rev |
line source |
nuclear@0
|
1 /************************************************************************************
|
nuclear@0
|
2
|
nuclear@0
|
3 PublicHeader: None
|
nuclear@0
|
4 Filename : OVR_ThreadCommandQueue.cpp
|
nuclear@0
|
5 Content : Command queue for operations executed on a thread
|
nuclear@0
|
6 Created : October 29, 2012
|
nuclear@0
|
7
|
nuclear@0
|
8 Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved.
|
nuclear@0
|
9
|
nuclear@0
|
10 Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License");
|
nuclear@0
|
11 you may not use the Oculus VR Rift SDK except in compliance with the License,
|
nuclear@0
|
12 which is provided at the time of installation or download, or which
|
nuclear@0
|
13 otherwise accompanies this software in either electronic or hard copy form.
|
nuclear@0
|
14
|
nuclear@0
|
15 You may obtain a copy of the License at
|
nuclear@0
|
16
|
nuclear@0
|
17 http://www.oculusvr.com/licenses/LICENSE-3.2
|
nuclear@0
|
18
|
nuclear@0
|
19 Unless required by applicable law or agreed to in writing, the Oculus VR SDK
|
nuclear@0
|
20 distributed under the License is distributed on an "AS IS" BASIS,
|
nuclear@0
|
21 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
nuclear@0
|
22 See the License for the specific language governing permissions and
|
nuclear@0
|
23 limitations under the License.
|
nuclear@0
|
24
|
nuclear@0
|
25 ************************************************************************************/
|
nuclear@0
|
26
|
nuclear@0
|
27 #include "OVR_ThreadCommandQueue.h"
|
nuclear@0
|
28
|
nuclear@0
|
29 namespace OVR {
|
nuclear@0
|
30
|
nuclear@0
|
31
|
nuclear@0
|
32 //------------------------------------------------------------------------
|
nuclear@0
|
33 // ***** CircularBuffer
|
nuclear@0
|
34
|
nuclear@0
|
35 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
|
nuclear@0
|
36 // which allows writing and reading variable-size data chunks. Write fails
|
nuclear@0
|
37 // if buffer is full.
|
nuclear@0
|
38
|
nuclear@0
|
39 class CircularBuffer
|
nuclear@0
|
40 {
|
nuclear@0
|
41 enum {
|
nuclear@0
|
42 AlignSize = 16,
|
nuclear@0
|
43 AlignMask = AlignSize - 1
|
nuclear@0
|
44 };
|
nuclear@0
|
45
|
nuclear@0
|
46 uint8_t* pBuffer;
|
nuclear@0
|
47 size_t Size;
|
nuclear@0
|
48 size_t Tail; // Byte offset of next item to be popped.
|
nuclear@0
|
49 size_t Head; // Byte offset of where next push will take place.
|
nuclear@0
|
50 size_t End; // When Head < Tail, this is used instead of Size.
|
nuclear@0
|
51
|
nuclear@0
|
52 inline size_t roundUpSize(size_t size)
|
nuclear@0
|
53 { return (size + AlignMask) & ~(size_t)AlignMask; }
|
nuclear@0
|
54
|
nuclear@0
|
55 public:
|
nuclear@0
|
56
|
nuclear@0
|
57 CircularBuffer(size_t size)
|
nuclear@0
|
58 : Size(size), Tail(0), Head(0), End(0)
|
nuclear@0
|
59 {
|
nuclear@0
|
60 pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
|
nuclear@0
|
61 }
|
nuclear@0
|
62 ~CircularBuffer()
|
nuclear@0
|
63 {
|
nuclear@0
|
64 // For ThreadCommands, we must consume everything before shutdown.
|
nuclear@0
|
65 OVR_ASSERT(IsEmpty());
|
nuclear@0
|
66 OVR_FREE_ALIGNED(pBuffer);
|
nuclear@0
|
67 }
|
nuclear@0
|
68
|
nuclear@0
|
69 bool IsEmpty() const { return (Head == Tail); }
|
nuclear@0
|
70
|
nuclear@0
|
71 // Allocates a state block of specified size and advances pointers,
|
nuclear@0
|
72 // returning 0 if buffer is full.
|
nuclear@0
|
73 uint8_t* Write(size_t size);
|
nuclear@0
|
74
|
nuclear@0
|
75 // Returns a pointer to next available data block; 0 if none available.
|
nuclear@0
|
76 uint8_t* ReadBegin()
|
nuclear@0
|
77 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
|
nuclear@0
|
78 // Consumes data of specified size; this must match size passed to Write.
|
nuclear@0
|
79 void ReadEnd(size_t size);
|
nuclear@0
|
80 };
|
nuclear@0
|
81
|
nuclear@0
|
82
|
nuclear@0
|
83 // Allocates a state block of specified size and advances pointers,
|
nuclear@0
|
84 // returning 0 if buffer is full.
|
nuclear@0
|
85 uint8_t* CircularBuffer::Write(size_t size)
|
nuclear@0
|
86 {
|
nuclear@0
|
87 uint8_t* p = 0;
|
nuclear@0
|
88
|
nuclear@0
|
89 size = roundUpSize(size);
|
nuclear@0
|
90 // Since this is circular buffer, always allow at least one item.
|
nuclear@0
|
91 OVR_ASSERT(size < Size/2);
|
nuclear@0
|
92
|
nuclear@0
|
93 if (Head >= Tail)
|
nuclear@0
|
94 {
|
nuclear@0
|
95 OVR_ASSERT(End == 0);
|
nuclear@0
|
96
|
nuclear@0
|
97 if (size <= (Size - Head))
|
nuclear@0
|
98 {
|
nuclear@0
|
99 p = pBuffer + Head;
|
nuclear@0
|
100 Head += size;
|
nuclear@0
|
101 }
|
nuclear@0
|
102 else if (size < Tail)
|
nuclear@0
|
103 {
|
nuclear@0
|
104 p = pBuffer;
|
nuclear@0
|
105 End = Head;
|
nuclear@0
|
106 Head = size;
|
nuclear@0
|
107 OVR_ASSERT(Head != Tail);
|
nuclear@0
|
108 }
|
nuclear@0
|
109 }
|
nuclear@0
|
110 else
|
nuclear@0
|
111 {
|
nuclear@0
|
112 OVR_ASSERT(End != 0);
|
nuclear@0
|
113
|
nuclear@0
|
114 if ((Tail - Head) > size)
|
nuclear@0
|
115 {
|
nuclear@0
|
116 p = pBuffer + Head;
|
nuclear@0
|
117 Head += size;
|
nuclear@0
|
118 OVR_ASSERT(Head != Tail);
|
nuclear@0
|
119 }
|
nuclear@0
|
120 }
|
nuclear@0
|
121
|
nuclear@0
|
122 return p;
|
nuclear@0
|
123 }
|
nuclear@0
|
124
|
nuclear@0
|
125 void CircularBuffer::ReadEnd(size_t size)
|
nuclear@0
|
126 {
|
nuclear@0
|
127 OVR_ASSERT(Head != Tail);
|
nuclear@0
|
128 size = roundUpSize(size);
|
nuclear@0
|
129
|
nuclear@0
|
130 Tail += size;
|
nuclear@0
|
131 if (Tail == End)
|
nuclear@0
|
132 {
|
nuclear@0
|
133 Tail = End = 0;
|
nuclear@0
|
134 }
|
nuclear@0
|
135 else if (Tail == Head)
|
nuclear@0
|
136 {
|
nuclear@0
|
137 OVR_ASSERT(End == 0);
|
nuclear@0
|
138 Tail = Head = 0;
|
nuclear@0
|
139 }
|
nuclear@0
|
140 }
|
nuclear@0
|
141
|
nuclear@0
|
142
|
nuclear@0
|
143 //-------------------------------------------------------------------------------------
|
nuclear@0
|
144 // ***** ThreadCommand
|
nuclear@0
|
145
|
nuclear@0
|
146 ThreadCommand::PopBuffer::~PopBuffer()
|
nuclear@0
|
147 {
|
nuclear@0
|
148 if (Size) {
|
nuclear@0
|
149 Destruct<ThreadCommand>(toCommand());
|
nuclear@0
|
150 }
|
nuclear@0
|
151 }
|
nuclear@0
|
152
|
nuclear@0
|
153 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
|
nuclear@0
|
154 {
|
nuclear@0
|
155 ThreadCommand* cmd = (ThreadCommand*)data;
|
nuclear@0
|
156 OVR_ASSERT(cmd->Size <= MaxSize);
|
nuclear@0
|
157
|
nuclear@0
|
158 if (Size) {
|
nuclear@0
|
159 Destruct<ThreadCommand>(toCommand());
|
nuclear@0
|
160 }
|
nuclear@0
|
161 Size = cmd->Size;
|
nuclear@0
|
162 memcpy(Buffer, (void*)cmd, Size);
|
nuclear@0
|
163 }
|
nuclear@0
|
164
|
nuclear@0
|
165 void ThreadCommand::PopBuffer::Execute()
|
nuclear@0
|
166 {
|
nuclear@0
|
167 ThreadCommand* command = toCommand();
|
nuclear@0
|
168 OVR_ASSERT(command);
|
nuclear@0
|
169 command->Execute();
|
nuclear@0
|
170 if (NeedsWait()) {
|
nuclear@0
|
171 GetEvent()->PulseEvent();
|
nuclear@0
|
172 }
|
nuclear@0
|
173 }
|
nuclear@0
|
174
|
nuclear@0
|
175 //-------------------------------------------------------------------------------------
|
nuclear@0
|
176
|
nuclear@0
|
177 class ThreadCommandQueueImpl : public NewOverrideBase
|
nuclear@0
|
178 {
|
nuclear@0
|
179 typedef ThreadCommand::NotifyEvent NotifyEvent;
|
nuclear@0
|
180 friend class ThreadCommandQueue;
|
nuclear@0
|
181
|
nuclear@0
|
182 public:
|
nuclear@0
|
183
|
nuclear@0
|
184 ThreadCommandQueueImpl(ThreadCommandQueue* queue) :
|
nuclear@0
|
185 pQueue(queue),
|
nuclear@0
|
186 ExitEnqueued(false),
|
nuclear@0
|
187 ExitProcessed(false),
|
nuclear@0
|
188 CommandBuffer(2048),
|
nuclear@0
|
189 PullThreadId(0)
|
nuclear@0
|
190 {
|
nuclear@0
|
191 }
|
nuclear@0
|
192 ~ThreadCommandQueueImpl();
|
nuclear@0
|
193
|
nuclear@0
|
194
|
nuclear@0
|
195 bool PushCommand(const ThreadCommand& command);
|
nuclear@0
|
196 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
|
nuclear@0
|
197
|
nuclear@0
|
198
|
nuclear@0
|
199 // ExitCommand is used by notify us that Thread is shutting down.
|
nuclear@0
|
200 struct ExitCommand : public ThreadCommand
|
nuclear@0
|
201 {
|
nuclear@0
|
202 ThreadCommandQueueImpl* pImpl;
|
nuclear@0
|
203
|
nuclear@0
|
204 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
|
nuclear@0
|
205 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
|
nuclear@0
|
206
|
nuclear@0
|
207 virtual void Execute() const
|
nuclear@0
|
208 {
|
nuclear@0
|
209 Lock::Locker lock(&pImpl->QueueLock);
|
nuclear@0
|
210 pImpl->ExitProcessed = true;
|
nuclear@0
|
211 }
|
nuclear@0
|
212 virtual ThreadCommand* CopyConstruct(void* p) const
|
nuclear@0
|
213 { return Construct<ExitCommand>(p, *this); }
|
nuclear@0
|
214 };
|
nuclear@0
|
215
|
nuclear@0
|
216
|
nuclear@0
|
217 NotifyEvent* AllocNotifyEvent_NTS()
|
nuclear@0
|
218 {
|
nuclear@0
|
219 NotifyEvent* p = AvailableEvents.GetFirst();
|
nuclear@0
|
220
|
nuclear@0
|
221 if (!AvailableEvents.IsNull(p))
|
nuclear@0
|
222 p->RemoveNode();
|
nuclear@0
|
223 else
|
nuclear@0
|
224 p = new NotifyEvent;
|
nuclear@0
|
225 return p;
|
nuclear@0
|
226 }
|
nuclear@0
|
227
|
nuclear@0
|
228 void FreeNotifyEvent_NTS(NotifyEvent* p)
|
nuclear@0
|
229 {
|
nuclear@0
|
230 AvailableEvents.PushBack(p);
|
nuclear@0
|
231 }
|
nuclear@0
|
232
|
nuclear@0
|
233 void FreeNotifyEvents_NTS()
|
nuclear@0
|
234 {
|
nuclear@0
|
235 while(!AvailableEvents.IsEmpty())
|
nuclear@0
|
236 {
|
nuclear@0
|
237 NotifyEvent* p = AvailableEvents.GetFirst();
|
nuclear@0
|
238 p->RemoveNode();
|
nuclear@0
|
239 delete p;
|
nuclear@0
|
240 }
|
nuclear@0
|
241 }
|
nuclear@0
|
242
|
nuclear@0
|
243 ThreadCommandQueue* pQueue;
|
nuclear@0
|
244 Lock QueueLock;
|
nuclear@0
|
245 volatile bool ExitEnqueued;
|
nuclear@0
|
246 volatile bool ExitProcessed;
|
nuclear@0
|
247 List<NotifyEvent> AvailableEvents;
|
nuclear@0
|
248 List<NotifyEvent> BlockedProducers;
|
nuclear@0
|
249 CircularBuffer CommandBuffer;
|
nuclear@0
|
250
|
nuclear@0
|
251 // The pull thread id is set to the last thread that pulled commands.
|
nuclear@0
|
252 // Since this thread command queue is designed for a single thread,
|
nuclear@0
|
253 // reentrant behavior that would cause a dead-lock for messages that
|
nuclear@0
|
254 // wait for completion can be avoided by simply comparing the
|
nuclear@0
|
255 // thread id of the last pull.
|
nuclear@0
|
256 OVR::ThreadId PullThreadId;
|
nuclear@0
|
257 };
|
nuclear@0
|
258
|
nuclear@0
|
259 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
|
nuclear@0
|
260 {
|
nuclear@0
|
261 Lock::Locker lock(&QueueLock);
|
nuclear@0
|
262 OVR_ASSERT(BlockedProducers.IsEmpty());
|
nuclear@0
|
263 FreeNotifyEvents_NTS();
|
nuclear@0
|
264 }
|
nuclear@0
|
265
|
nuclear@0
|
266 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
|
nuclear@0
|
267 {
|
nuclear@0
|
268 if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId())
|
nuclear@0
|
269 {
|
nuclear@0
|
270 command.Execute();
|
nuclear@0
|
271 return true;
|
nuclear@0
|
272 }
|
nuclear@0
|
273
|
nuclear@0
|
274 ThreadCommand::NotifyEvent* completeEvent = 0;
|
nuclear@0
|
275 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
|
nuclear@0
|
276
|
nuclear@0
|
277 // Repeat writing command into buffer until it is available.
|
nuclear@0
|
278 for (;;) {
|
nuclear@0
|
279 { // Lock Scope
|
nuclear@0
|
280 Lock::Locker lock(&QueueLock);
|
nuclear@0
|
281
|
nuclear@0
|
282 if (queueAvailableEvent) {
|
nuclear@0
|
283 FreeNotifyEvent_NTS(queueAvailableEvent);
|
nuclear@0
|
284 queueAvailableEvent = 0;
|
nuclear@0
|
285 }
|
nuclear@0
|
286
|
nuclear@0
|
287 // Don't allow any commands after PushExitCommand() is called.
|
nuclear@0
|
288 if (ExitEnqueued && !command.ExitFlag) {
|
nuclear@0
|
289 return false;
|
nuclear@0
|
290 }
|
nuclear@0
|
291
|
nuclear@0
|
292 bool bufferWasEmpty = CommandBuffer.IsEmpty();
|
nuclear@0
|
293 uint8_t* buffer = CommandBuffer.Write(command.GetSize());
|
nuclear@0
|
294
|
nuclear@0
|
295 if (buffer) {
|
nuclear@0
|
296 ThreadCommand* c = command.CopyConstruct(buffer);
|
nuclear@0
|
297
|
nuclear@0
|
298 if (c->NeedsWait()) {
|
nuclear@0
|
299 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
|
nuclear@0
|
300 }
|
nuclear@0
|
301
|
nuclear@0
|
302 // Signal-waker consumer when we add data to buffer.
|
nuclear@0
|
303 if (bufferWasEmpty) {
|
nuclear@0
|
304 pQueue->OnPushNonEmpty_Locked();
|
nuclear@0
|
305 }
|
nuclear@0
|
306
|
nuclear@0
|
307 break;
|
nuclear@0
|
308 }
|
nuclear@0
|
309
|
nuclear@0
|
310 queueAvailableEvent = AllocNotifyEvent_NTS();
|
nuclear@0
|
311 BlockedProducers.PushBack(queueAvailableEvent);
|
nuclear@0
|
312 } // Lock Scope
|
nuclear@0
|
313
|
nuclear@0
|
314 queueAvailableEvent->Wait();
|
nuclear@0
|
315 } // Intentional infinite loop
|
nuclear@0
|
316
|
nuclear@0
|
317 // Command was enqueued, wait if necessary.
|
nuclear@0
|
318 if (completeEvent) {
|
nuclear@0
|
319 completeEvent->Wait();
|
nuclear@0
|
320 Lock::Locker lock(&QueueLock);
|
nuclear@0
|
321 FreeNotifyEvent_NTS(completeEvent);
|
nuclear@0
|
322 }
|
nuclear@0
|
323
|
nuclear@0
|
324 return true;
|
nuclear@0
|
325 }
|
nuclear@0
|
326
|
nuclear@0
|
327
|
nuclear@0
|
328 // Pops the next command from the thread queue, if any is available.
|
nuclear@0
|
329 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
|
nuclear@0
|
330 {
|
nuclear@0
|
331 PullThreadId = OVR::GetCurrentThreadId();
|
nuclear@0
|
332
|
nuclear@0
|
333 Lock::Locker lock(&QueueLock);
|
nuclear@0
|
334
|
nuclear@0
|
335 uint8_t* buffer = CommandBuffer.ReadBegin();
|
nuclear@0
|
336 if (!buffer)
|
nuclear@0
|
337 {
|
nuclear@0
|
338 // Notify thread while in lock scope, enabling initialization of wait.
|
nuclear@0
|
339 pQueue->OnPopEmpty_Locked();
|
nuclear@0
|
340 return false;
|
nuclear@0
|
341 }
|
nuclear@0
|
342
|
nuclear@0
|
343 popBuffer->InitFromBuffer(buffer);
|
nuclear@0
|
344 CommandBuffer.ReadEnd(popBuffer->GetSize());
|
nuclear@0
|
345
|
nuclear@0
|
346 if (!BlockedProducers.IsEmpty())
|
nuclear@0
|
347 {
|
nuclear@0
|
348 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
|
nuclear@0
|
349 queueAvailableEvent->RemoveNode();
|
nuclear@0
|
350 queueAvailableEvent->PulseEvent();
|
nuclear@0
|
351 // Event is freed later by waiter.
|
nuclear@0
|
352 }
|
nuclear@0
|
353 return true;
|
nuclear@0
|
354 }
|
nuclear@0
|
355
|
nuclear@0
|
356
|
nuclear@0
|
357 //-------------------------------------------------------------------------------------
|
nuclear@0
|
358
|
nuclear@0
|
359 ThreadCommandQueue::ThreadCommandQueue()
|
nuclear@0
|
360 {
|
nuclear@0
|
361 pImpl = new ThreadCommandQueueImpl(this);
|
nuclear@0
|
362 }
|
nuclear@0
|
363 ThreadCommandQueue::~ThreadCommandQueue()
|
nuclear@0
|
364 {
|
nuclear@0
|
365 delete pImpl;
|
nuclear@0
|
366 }
|
nuclear@0
|
367
|
nuclear@0
|
368 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
|
nuclear@0
|
369 {
|
nuclear@0
|
370 return pImpl->PushCommand(command);
|
nuclear@0
|
371 }
|
nuclear@0
|
372
|
nuclear@0
|
373 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
|
nuclear@0
|
374 {
|
nuclear@0
|
375 return pImpl->PopCommand(popBuffer);
|
nuclear@0
|
376 }
|
nuclear@0
|
377
|
nuclear@0
|
378 void ThreadCommandQueue::PushExitCommand(bool wait)
|
nuclear@0
|
379 {
|
nuclear@0
|
380 // Exit is processed in two stages:
|
nuclear@0
|
381 // - First, ExitEnqueued flag is set to block further commands from queuing up.
|
nuclear@0
|
382 // - Second, the actual exit call is processed on the consumer thread, flushing
|
nuclear@0
|
383 // any prior commands.
|
nuclear@0
|
384 // IsExiting() only returns true after exit has flushed.
|
nuclear@0
|
385 {
|
nuclear@0
|
386 Lock::Locker lock(&pImpl->QueueLock);
|
nuclear@0
|
387 if (pImpl->ExitEnqueued)
|
nuclear@0
|
388 return;
|
nuclear@0
|
389 pImpl->ExitEnqueued = true;
|
nuclear@0
|
390 }
|
nuclear@0
|
391
|
nuclear@0
|
392 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
|
nuclear@0
|
393 }
|
nuclear@0
|
394
|
nuclear@0
|
395 bool ThreadCommandQueue::IsExiting() const
|
nuclear@0
|
396 {
|
nuclear@0
|
397 return pImpl->ExitProcessed;
|
nuclear@0
|
398 }
|
nuclear@0
|
399
|
nuclear@0
|
400
|
nuclear@0
|
401 } // namespace OVR
|