oculus1

view libovr/Src/OVR_ThreadCommandQueue.cpp @ 17:cfe4979ab3eb

ops, minor error in the last commit
author John Tsiombikas <nuclear@member.fsf.org>
date Sat, 21 Sep 2013 07:09:48 +0300
parents
children
line source
1 /************************************************************************************
3 PublicHeader: None
4 Filename : OVR_ThreadCommandQueue.cpp
5 Content : Command queue for operations executed on a thread
6 Created : October 29, 2012
8 Copyright : Copyright 2012 Oculus VR, Inc. All Rights reserved.
10 Use of this software is subject to the terms of the Oculus license
11 agreement provided at the time of installation or download, or which
12 otherwise accompanies this software in either electronic or hard copy form.
14 ************************************************************************************/
16 #include "OVR_ThreadCommandQueue.h"
18 namespace OVR {
21 //------------------------------------------------------------------------
22 // ***** CircularBuffer
24 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
25 // which allows writing and reading variable-size data chunks. Write fails
26 // if buffer is full.
28 class CircularBuffer
29 {
30 enum {
31 AlignSize = 16,
32 AlignMask = AlignSize - 1
33 };
35 UByte* pBuffer;
36 UPInt Size;
37 UPInt Tail; // Byte offset of next item to be popped.
38 UPInt Head; // Byte offset of where next push will take place.
39 UPInt End; // When Head < Tail, this is used instead of Size.
41 inline UPInt roundUpSize(UPInt size)
42 { return (size + AlignMask) & ~(UPInt)AlignMask; }
44 public:
46 CircularBuffer(UPInt size)
47 : Size(size), Tail(0), Head(0), End(0)
48 {
49 pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
50 }
51 ~CircularBuffer()
52 {
53 // For ThreadCommands, we must consume everything before shutdown.
54 OVR_ASSERT(IsEmpty());
55 OVR_FREE_ALIGNED(pBuffer);
56 }
58 bool IsEmpty() const { return (Head == Tail); }
60 // Allocates a state block of specified size and advances pointers,
61 // returning 0 if buffer is full.
62 UByte* Write(UPInt size);
64 // Returns a pointer to next available data block; 0 if none available.
65 UByte* ReadBegin()
66 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
67 // Consumes data of specified size; this must match size passed to Write.
68 void ReadEnd(UPInt size);
69 };
72 // Allocates a state block of specified size and advances pointers,
73 // returning 0 if buffer is full.
74 UByte* CircularBuffer::Write(UPInt size)
75 {
76 UByte* p = 0;
78 size = roundUpSize(size);
79 // Since this is circular buffer, always allow at least one item.
80 OVR_ASSERT(size < Size/2);
82 if (Head >= Tail)
83 {
84 OVR_ASSERT(End == 0);
86 if (size <= (Size - Head))
87 {
88 p = pBuffer + Head;
89 Head += size;
90 }
91 else if (size < Tail)
92 {
93 p = pBuffer;
94 End = Head;
95 Head = size;
96 OVR_ASSERT(Head != Tail);
97 }
98 }
99 else
100 {
101 OVR_ASSERT(End != 0);
103 if ((Tail - Head) > size)
104 {
105 p = pBuffer + Head;
106 Head += size;
107 OVR_ASSERT(Head != Tail);
108 }
109 }
111 return p;
112 }
114 void CircularBuffer::ReadEnd(UPInt size)
115 {
116 OVR_ASSERT(Head != Tail);
117 size = roundUpSize(size);
119 Tail += size;
120 if (Tail == End)
121 {
122 Tail = End = 0;
123 }
124 else if (Tail == Head)
125 {
126 OVR_ASSERT(End == 0);
127 Tail = Head = 0;
128 }
129 }
132 //-------------------------------------------------------------------------------------
133 // ***** ThreadCommand
135 ThreadCommand::PopBuffer::~PopBuffer()
136 {
137 if (Size)
138 Destruct<ThreadCommand>(toCommand());
139 }
141 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
142 {
143 ThreadCommand* cmd = (ThreadCommand*)data;
144 OVR_ASSERT(cmd->Size <= MaxSize);
146 if (Size)
147 Destruct<ThreadCommand>(toCommand());
148 Size = cmd->Size;
149 memcpy(Buffer, (void*)cmd, Size);
150 }
152 void ThreadCommand::PopBuffer::Execute()
153 {
154 ThreadCommand* command = toCommand();
155 OVR_ASSERT(command);
156 command->Execute();
157 if (NeedsWait())
158 GetEvent()->PulseEvent();
159 }
161 //-------------------------------------------------------------------------------------
163 class ThreadCommandQueueImpl : public NewOverrideBase
164 {
165 typedef ThreadCommand::NotifyEvent NotifyEvent;
166 friend class ThreadCommandQueue;
168 public:
170 ThreadCommandQueueImpl(ThreadCommandQueue* queue)
171 : pQueue(queue), CommandBuffer(2048),
172 ExitEnqueued(false), ExitProcessed(false)
173 {
174 }
175 ~ThreadCommandQueueImpl();
178 bool PushCommand(const ThreadCommand& command);
179 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
182 // ExitCommand is used by notify us that Thread is shutting down.
183 struct ExitCommand : public ThreadCommand
184 {
185 ThreadCommandQueueImpl* pImpl;
187 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
188 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
190 virtual void Execute() const
191 {
192 Lock::Locker lock(&pImpl->QueueLock);
193 pImpl->ExitProcessed = true;
194 }
195 virtual ThreadCommand* CopyConstruct(void* p) const
196 { return Construct<ExitCommand>(p, *this); }
197 };
200 NotifyEvent* AllocNotifyEvent_NTS()
201 {
202 NotifyEvent* p = AvailableEvents.GetFirst();
204 if (!AvailableEvents.IsNull(p))
205 p->RemoveNode();
206 else
207 p = new NotifyEvent;
208 return p;
209 }
211 void FreeNotifyEvent_NTS(NotifyEvent* p)
212 {
213 AvailableEvents.PushBack(p);
214 }
216 void FreeNotifyEvents_NTS()
217 {
218 while(!AvailableEvents.IsEmpty())
219 {
220 NotifyEvent* p = AvailableEvents.GetFirst();
221 p->RemoveNode();
222 delete p;
223 }
224 }
226 ThreadCommandQueue* pQueue;
227 Lock QueueLock;
228 volatile bool ExitEnqueued;
229 volatile bool ExitProcessed;
230 List<NotifyEvent> AvailableEvents;
231 List<NotifyEvent> BlockedProducers;
232 CircularBuffer CommandBuffer;
233 };
237 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
238 {
239 Lock::Locker lock(&QueueLock);
240 OVR_ASSERT(BlockedProducers.IsEmpty());
241 FreeNotifyEvents_NTS();
242 }
244 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
245 {
246 ThreadCommand::NotifyEvent* completeEvent = 0;
247 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
249 // Repeat writing command into buffer until it is available.
250 do {
252 { // Lock Scope
253 Lock::Locker lock(&QueueLock);
255 if (queueAvailableEvent)
256 {
257 FreeNotifyEvent_NTS(queueAvailableEvent);
258 queueAvailableEvent = 0;
259 }
261 // Don't allow any commands after PushExitCommand() is called.
262 if (ExitEnqueued && !command.ExitFlag)
263 return false;
266 bool bufferWasEmpty = CommandBuffer.IsEmpty();
267 UByte* buffer = CommandBuffer.Write(command.GetSize());
268 if (buffer)
269 {
270 ThreadCommand* c = command.CopyConstruct(buffer);
271 if (c->NeedsWait())
272 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
273 // Signal-waker consumer when we add data to buffer.
274 if (bufferWasEmpty)
275 pQueue->OnPushNonEmpty_Locked();
276 break;
277 }
279 queueAvailableEvent = AllocNotifyEvent_NTS();
280 BlockedProducers.PushBack(queueAvailableEvent);
281 } // Lock Scope
283 queueAvailableEvent->Wait();
285 } while(1);
287 // Command was enqueued, wait if necessary.
288 if (completeEvent)
289 {
290 completeEvent->Wait();
291 Lock::Locker lock(&QueueLock);
292 FreeNotifyEvent_NTS(completeEvent);
293 }
295 return true;
296 }
299 // Pops the next command from the thread queue, if any is available.
300 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
301 {
302 Lock::Locker lock(&QueueLock);
304 UByte* buffer = CommandBuffer.ReadBegin();
305 if (!buffer)
306 {
307 // Notify thread while in lock scope, enabling initialization of wait.
308 pQueue->OnPopEmpty_Locked();
309 return false;
310 }
312 popBuffer->InitFromBuffer(buffer);
313 CommandBuffer.ReadEnd(popBuffer->GetSize());
315 if (!BlockedProducers.IsEmpty())
316 {
317 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
318 queueAvailableEvent->RemoveNode();
319 queueAvailableEvent->PulseEvent();
320 // Event is freed later by waiter.
321 }
322 return true;
323 }
326 //-------------------------------------------------------------------------------------
328 ThreadCommandQueue::ThreadCommandQueue()
329 {
330 pImpl = new ThreadCommandQueueImpl(this);
331 }
332 ThreadCommandQueue::~ThreadCommandQueue()
333 {
334 delete pImpl;
335 }
337 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
338 {
339 return pImpl->PushCommand(command);
340 }
342 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
343 {
344 return pImpl->PopCommand(popBuffer);
345 }
347 void ThreadCommandQueue::PushExitCommand(bool wait)
348 {
349 // Exit is processed in two stages:
350 // - First, ExitEnqueued flag is set to block further commands from queuing up.
351 // - Second, the actual exit call is processed on the consumer thread, flushing
352 // any prior commands.
353 // IsExiting() only returns true after exit has flushed.
354 {
355 Lock::Locker lock(&pImpl->QueueLock);
356 if (pImpl->ExitEnqueued)
357 return;
358 pImpl->ExitEnqueued = true;
359 }
361 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
362 }
364 bool ThreadCommandQueue::IsExiting() const
365 {
366 return pImpl->ExitProcessed;
367 }
370 } // namespace OVR