ovr_sdk

view LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp @ 0:1b39a1b46319

initial 0.4.4
author John Tsiombikas <nuclear@member.fsf.org>
date Wed, 14 Jan 2015 06:51:16 +0200
parents
children
line source
1 /************************************************************************************
3 PublicHeader: None
4 Filename : OVR_ThreadCommandQueue.cpp
5 Content : Command queue for operations executed on a thread
6 Created : October 29, 2012
8 Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved.
10 Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License");
11 you may not use the Oculus VR Rift SDK except in compliance with the License,
12 which is provided at the time of installation or download, or which
13 otherwise accompanies this software in either electronic or hard copy form.
15 You may obtain a copy of the License at
17 http://www.oculusvr.com/licenses/LICENSE-3.2
19 Unless required by applicable law or agreed to in writing, the Oculus VR SDK
20 distributed under the License is distributed on an "AS IS" BASIS,
21 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 See the License for the specific language governing permissions and
23 limitations under the License.
25 ************************************************************************************/
27 #include "OVR_ThreadCommandQueue.h"
29 namespace OVR {
32 //------------------------------------------------------------------------
33 // ***** CircularBuffer
35 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
36 // which allows writing and reading variable-size data chucks. Write fails
37 // if buffer is full.
39 class CircularBuffer
40 {
41 enum {
42 AlignSize = 16,
43 AlignMask = AlignSize - 1
44 };
46 uint8_t* pBuffer;
47 size_t Size;
48 size_t Tail; // Byte offset of next item to be popped.
49 size_t Head; // Byte offset of where next push will take place.
50 size_t End; // When Head < Tail, this is used instead of Size.
52 inline size_t roundUpSize(size_t size)
53 { return (size + AlignMask) & ~(size_t)AlignMask; }
55 public:
57 CircularBuffer(size_t size)
58 : Size(size), Tail(0), Head(0), End(0)
59 {
60 pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
61 }
62 ~CircularBuffer()
63 {
64 // For ThreadCommands, we must consume everything before shutdown.
65 OVR_ASSERT(IsEmpty());
66 OVR_FREE_ALIGNED(pBuffer);
67 }
69 bool IsEmpty() const { return (Head == Tail); }
71 // Allocates a state block of specified size and advances pointers,
72 // returning 0 if buffer is full.
73 uint8_t* Write(size_t size);
75 // Returns a pointer to next available data block; 0 if none available.
76 uint8_t* ReadBegin()
77 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
78 // Consumes data of specified size; this must match size passed to Write.
79 void ReadEnd(size_t size);
80 };
83 // Allocates a state block of specified size and advances pointers,
84 // returning 0 if buffer is full.
85 uint8_t* CircularBuffer::Write(size_t size)
86 {
87 uint8_t* p = 0;
89 size = roundUpSize(size);
90 // Since this is circular buffer, always allow at least one item.
91 OVR_ASSERT(size < Size/2);
93 if (Head >= Tail)
94 {
95 OVR_ASSERT(End == 0);
97 if (size <= (Size - Head))
98 {
99 p = pBuffer + Head;
100 Head += size;
101 }
102 else if (size < Tail)
103 {
104 p = pBuffer;
105 End = Head;
106 Head = size;
107 OVR_ASSERT(Head != Tail);
108 }
109 }
110 else
111 {
112 OVR_ASSERT(End != 0);
114 if ((Tail - Head) > size)
115 {
116 p = pBuffer + Head;
117 Head += size;
118 OVR_ASSERT(Head != Tail);
119 }
120 }
122 return p;
123 }
125 void CircularBuffer::ReadEnd(size_t size)
126 {
127 OVR_ASSERT(Head != Tail);
128 size = roundUpSize(size);
130 Tail += size;
131 if (Tail == End)
132 {
133 Tail = End = 0;
134 }
135 else if (Tail == Head)
136 {
137 OVR_ASSERT(End == 0);
138 Tail = Head = 0;
139 }
140 }
143 //-------------------------------------------------------------------------------------
144 // ***** ThreadCommand
146 ThreadCommand::PopBuffer::~PopBuffer()
147 {
148 if (Size) {
149 Destruct<ThreadCommand>(toCommand());
150 }
151 }
153 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
154 {
155 ThreadCommand* cmd = (ThreadCommand*)data;
156 OVR_ASSERT(cmd->Size <= MaxSize);
158 if (Size) {
159 Destruct<ThreadCommand>(toCommand());
160 }
161 Size = cmd->Size;
162 memcpy(Buffer, (void*)cmd, Size);
163 }
165 void ThreadCommand::PopBuffer::Execute()
166 {
167 ThreadCommand* command = toCommand();
168 OVR_ASSERT(command);
169 command->Execute();
170 if (NeedsWait()) {
171 GetEvent()->PulseEvent();
172 }
173 }
175 //-------------------------------------------------------------------------------------
177 class ThreadCommandQueueImpl : public NewOverrideBase
178 {
179 typedef ThreadCommand::NotifyEvent NotifyEvent;
180 friend class ThreadCommandQueue;
182 public:
184 ThreadCommandQueueImpl(ThreadCommandQueue* queue) :
185 pQueue(queue),
186 ExitEnqueued(false),
187 ExitProcessed(false),
188 CommandBuffer(2048),
189 PullThreadId(0)
190 {
191 }
192 ~ThreadCommandQueueImpl();
195 bool PushCommand(const ThreadCommand& command);
196 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
199 // ExitCommand is used by notify us that Thread is shutting down.
200 struct ExitCommand : public ThreadCommand
201 {
202 ThreadCommandQueueImpl* pImpl;
204 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
205 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
207 virtual void Execute() const
208 {
209 Lock::Locker lock(&pImpl->QueueLock);
210 pImpl->ExitProcessed = true;
211 }
212 virtual ThreadCommand* CopyConstruct(void* p) const
213 { return Construct<ExitCommand>(p, *this); }
214 };
217 NotifyEvent* AllocNotifyEvent_NTS()
218 {
219 NotifyEvent* p = AvailableEvents.GetFirst();
221 if (!AvailableEvents.IsNull(p))
222 p->RemoveNode();
223 else
224 p = new NotifyEvent;
225 return p;
226 }
228 void FreeNotifyEvent_NTS(NotifyEvent* p)
229 {
230 AvailableEvents.PushBack(p);
231 }
233 void FreeNotifyEvents_NTS()
234 {
235 while(!AvailableEvents.IsEmpty())
236 {
237 NotifyEvent* p = AvailableEvents.GetFirst();
238 p->RemoveNode();
239 delete p;
240 }
241 }
243 ThreadCommandQueue* pQueue;
244 Lock QueueLock;
245 volatile bool ExitEnqueued;
246 volatile bool ExitProcessed;
247 List<NotifyEvent> AvailableEvents;
248 List<NotifyEvent> BlockedProducers;
249 CircularBuffer CommandBuffer;
251 // The pull thread id is set to the last thread that pulled commands.
252 // Since this thread command queue is designed for a single thread,
253 // reentrant behavior that would cause a dead-lock for messages that
254 // wait for completion can be avoided by simply comparing the
255 // thread id of the last pull.
256 OVR::ThreadId PullThreadId;
257 };
259 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
260 {
261 Lock::Locker lock(&QueueLock);
262 OVR_ASSERT(BlockedProducers.IsEmpty());
263 FreeNotifyEvents_NTS();
264 }
266 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
267 {
268 if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId())
269 {
270 command.Execute();
271 return true;
272 }
274 ThreadCommand::NotifyEvent* completeEvent = 0;
275 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
277 // Repeat writing command into buffer until it is available.
278 for (;;) {
279 { // Lock Scope
280 Lock::Locker lock(&QueueLock);
282 if (queueAvailableEvent) {
283 FreeNotifyEvent_NTS(queueAvailableEvent);
284 queueAvailableEvent = 0;
285 }
287 // Don't allow any commands after PushExitCommand() is called.
288 if (ExitEnqueued && !command.ExitFlag) {
289 return false;
290 }
292 bool bufferWasEmpty = CommandBuffer.IsEmpty();
293 uint8_t* buffer = CommandBuffer.Write(command.GetSize());
295 if (buffer) {
296 ThreadCommand* c = command.CopyConstruct(buffer);
298 if (c->NeedsWait()) {
299 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
300 }
302 // Signal-waker consumer when we add data to buffer.
303 if (bufferWasEmpty) {
304 pQueue->OnPushNonEmpty_Locked();
305 }
307 break;
308 }
310 queueAvailableEvent = AllocNotifyEvent_NTS();
311 BlockedProducers.PushBack(queueAvailableEvent);
312 } // Lock Scope
314 queueAvailableEvent->Wait();
315 } // Intentional infinite loop
317 // Command was enqueued, wait if necessary.
318 if (completeEvent) {
319 completeEvent->Wait();
320 Lock::Locker lock(&QueueLock);
321 FreeNotifyEvent_NTS(completeEvent);
322 }
324 return true;
325 }
328 // Pops the next command from the thread queue, if any is available.
329 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
330 {
331 PullThreadId = OVR::GetCurrentThreadId();
333 Lock::Locker lock(&QueueLock);
335 uint8_t* buffer = CommandBuffer.ReadBegin();
336 if (!buffer)
337 {
338 // Notify thread while in lock scope, enabling initialization of wait.
339 pQueue->OnPopEmpty_Locked();
340 return false;
341 }
343 popBuffer->InitFromBuffer(buffer);
344 CommandBuffer.ReadEnd(popBuffer->GetSize());
346 if (!BlockedProducers.IsEmpty())
347 {
348 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
349 queueAvailableEvent->RemoveNode();
350 queueAvailableEvent->PulseEvent();
351 // Event is freed later by waiter.
352 }
353 return true;
354 }
357 //-------------------------------------------------------------------------------------
359 ThreadCommandQueue::ThreadCommandQueue()
360 {
361 pImpl = new ThreadCommandQueueImpl(this);
362 }
363 ThreadCommandQueue::~ThreadCommandQueue()
364 {
365 delete pImpl;
366 }
368 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
369 {
370 return pImpl->PushCommand(command);
371 }
373 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
374 {
375 return pImpl->PopCommand(popBuffer);
376 }
378 void ThreadCommandQueue::PushExitCommand(bool wait)
379 {
380 // Exit is processed in two stages:
381 // - First, ExitEnqueued flag is set to block further commands from queuing up.
382 // - Second, the actual exit call is processed on the consumer thread, flushing
383 // any prior commands.
384 // IsExiting() only returns true after exit has flushed.
385 {
386 Lock::Locker lock(&pImpl->QueueLock);
387 if (pImpl->ExitEnqueued)
388 return;
389 pImpl->ExitEnqueued = true;
390 }
392 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
393 }
395 bool ThreadCommandQueue::IsExiting() const
396 {
397 return pImpl->ExitProcessed;
398 }
401 } // namespace OVR