rev |
line source |
nuclear@1
|
1 /************************************************************************************
|
nuclear@1
|
2
|
nuclear@1
|
3 PublicHeader: None
|
nuclear@1
|
4 Filename : OVR_ThreadCommandQueue.cpp
|
nuclear@1
|
5 Content : Command queue for operations executed on a thread
|
nuclear@1
|
6 Created : October 29, 2012
|
nuclear@1
|
7
|
nuclear@1
|
8 Copyright : Copyright 2012 Oculus VR, Inc. All Rights reserved.
|
nuclear@1
|
9
|
nuclear@1
|
10 Use of this software is subject to the terms of the Oculus license
|
nuclear@1
|
11 agreement provided at the time of installation or download, or which
|
nuclear@1
|
12 otherwise accompanies this software in either electronic or hard copy form.
|
nuclear@1
|
13
|
nuclear@1
|
14 ************************************************************************************/
|
nuclear@1
|
15
|
nuclear@1
|
16 #include "OVR_ThreadCommandQueue.h"
|
nuclear@1
|
17
|
nuclear@1
|
18 namespace OVR {
|
nuclear@1
|
19
|
nuclear@1
|
20
|
nuclear@1
|
21 //------------------------------------------------------------------------
|
nuclear@1
|
22 // ***** CircularBuffer
|
nuclear@1
|
23
|
nuclear@1
|
24 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
|
nuclear@1
|
25 // which allows writing and reading variable-size data chucks. Write fails
|
nuclear@1
|
26 // if buffer is full.
|
nuclear@1
|
27
|
nuclear@1
|
28 class CircularBuffer
|
nuclear@1
|
29 {
|
nuclear@1
|
30 enum {
|
nuclear@1
|
31 AlignSize = 16,
|
nuclear@1
|
32 AlignMask = AlignSize - 1
|
nuclear@1
|
33 };
|
nuclear@1
|
34
|
nuclear@1
|
35 UByte* pBuffer;
|
nuclear@1
|
36 UPInt Size;
|
nuclear@1
|
37 UPInt Tail; // Byte offset of next item to be popped.
|
nuclear@1
|
38 UPInt Head; // Byte offset of where next push will take place.
|
nuclear@1
|
39 UPInt End; // When Head < Tail, this is used instead of Size.
|
nuclear@1
|
40
|
nuclear@1
|
41 inline UPInt roundUpSize(UPInt size)
|
nuclear@1
|
42 { return (size + AlignMask) & ~(UPInt)AlignMask; }
|
nuclear@1
|
43
|
nuclear@1
|
44 public:
|
nuclear@1
|
45
|
nuclear@1
|
46 CircularBuffer(UPInt size)
|
nuclear@1
|
47 : Size(size), Tail(0), Head(0), End(0)
|
nuclear@1
|
48 {
|
nuclear@1
|
49 pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
|
nuclear@1
|
50 }
|
nuclear@1
|
51 ~CircularBuffer()
|
nuclear@1
|
52 {
|
nuclear@1
|
53 // For ThreadCommands, we must consume everything before shutdown.
|
nuclear@1
|
54 OVR_ASSERT(IsEmpty());
|
nuclear@1
|
55 OVR_FREE_ALIGNED(pBuffer);
|
nuclear@1
|
56 }
|
nuclear@1
|
57
|
nuclear@1
|
58 bool IsEmpty() const { return (Head == Tail); }
|
nuclear@1
|
59
|
nuclear@1
|
60 // Allocates a state block of specified size and advances pointers,
|
nuclear@1
|
61 // returning 0 if buffer is full.
|
nuclear@1
|
62 UByte* Write(UPInt size);
|
nuclear@1
|
63
|
nuclear@1
|
64 // Returns a pointer to next available data block; 0 if none available.
|
nuclear@1
|
65 UByte* ReadBegin()
|
nuclear@1
|
66 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
|
nuclear@1
|
67 // Consumes data of specified size; this must match size passed to Write.
|
nuclear@1
|
68 void ReadEnd(UPInt size);
|
nuclear@1
|
69 };
|
nuclear@1
|
70
|
nuclear@1
|
71
|
nuclear@1
|
72 // Allocates a state block of specified size and advances pointers,
|
nuclear@1
|
73 // returning 0 if buffer is full.
|
nuclear@1
|
74 UByte* CircularBuffer::Write(UPInt size)
|
nuclear@1
|
75 {
|
nuclear@1
|
76 UByte* p = 0;
|
nuclear@1
|
77
|
nuclear@1
|
78 size = roundUpSize(size);
|
nuclear@1
|
79 // Since this is circular buffer, always allow at least one item.
|
nuclear@1
|
80 OVR_ASSERT(size < Size/2);
|
nuclear@1
|
81
|
nuclear@1
|
82 if (Head >= Tail)
|
nuclear@1
|
83 {
|
nuclear@1
|
84 OVR_ASSERT(End == 0);
|
nuclear@1
|
85
|
nuclear@1
|
86 if (size <= (Size - Head))
|
nuclear@1
|
87 {
|
nuclear@1
|
88 p = pBuffer + Head;
|
nuclear@1
|
89 Head += size;
|
nuclear@1
|
90 }
|
nuclear@1
|
91 else if (size < Tail)
|
nuclear@1
|
92 {
|
nuclear@1
|
93 p = pBuffer;
|
nuclear@1
|
94 End = Head;
|
nuclear@1
|
95 Head = size;
|
nuclear@1
|
96 OVR_ASSERT(Head != Tail);
|
nuclear@1
|
97 }
|
nuclear@1
|
98 }
|
nuclear@1
|
99 else
|
nuclear@1
|
100 {
|
nuclear@1
|
101 OVR_ASSERT(End != 0);
|
nuclear@1
|
102
|
nuclear@1
|
103 if ((Tail - Head) > size)
|
nuclear@1
|
104 {
|
nuclear@1
|
105 p = pBuffer + Head;
|
nuclear@1
|
106 Head += size;
|
nuclear@1
|
107 OVR_ASSERT(Head != Tail);
|
nuclear@1
|
108 }
|
nuclear@1
|
109 }
|
nuclear@1
|
110
|
nuclear@1
|
111 return p;
|
nuclear@1
|
112 }
|
nuclear@1
|
113
|
nuclear@1
|
114 void CircularBuffer::ReadEnd(UPInt size)
|
nuclear@1
|
115 {
|
nuclear@1
|
116 OVR_ASSERT(Head != Tail);
|
nuclear@1
|
117 size = roundUpSize(size);
|
nuclear@1
|
118
|
nuclear@1
|
119 Tail += size;
|
nuclear@1
|
120 if (Tail == End)
|
nuclear@1
|
121 {
|
nuclear@1
|
122 Tail = End = 0;
|
nuclear@1
|
123 }
|
nuclear@1
|
124 else if (Tail == Head)
|
nuclear@1
|
125 {
|
nuclear@1
|
126 OVR_ASSERT(End == 0);
|
nuclear@1
|
127 Tail = Head = 0;
|
nuclear@1
|
128 }
|
nuclear@1
|
129 }
|
nuclear@1
|
130
|
nuclear@1
|
131
|
nuclear@1
|
132 //-------------------------------------------------------------------------------------
|
nuclear@1
|
133 // ***** ThreadCommand
|
nuclear@1
|
134
|
nuclear@1
|
135 ThreadCommand::PopBuffer::~PopBuffer()
|
nuclear@1
|
136 {
|
nuclear@1
|
137 if (Size)
|
nuclear@1
|
138 Destruct<ThreadCommand>(toCommand());
|
nuclear@1
|
139 }
|
nuclear@1
|
140
|
nuclear@1
|
141 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
|
nuclear@1
|
142 {
|
nuclear@1
|
143 ThreadCommand* cmd = (ThreadCommand*)data;
|
nuclear@1
|
144 OVR_ASSERT(cmd->Size <= MaxSize);
|
nuclear@1
|
145
|
nuclear@1
|
146 if (Size)
|
nuclear@1
|
147 Destruct<ThreadCommand>(toCommand());
|
nuclear@1
|
148 Size = cmd->Size;
|
nuclear@1
|
149 memcpy(Buffer, (void*)cmd, Size);
|
nuclear@1
|
150 }
|
nuclear@1
|
151
|
nuclear@1
|
152 void ThreadCommand::PopBuffer::Execute()
|
nuclear@1
|
153 {
|
nuclear@1
|
154 ThreadCommand* command = toCommand();
|
nuclear@1
|
155 OVR_ASSERT(command);
|
nuclear@1
|
156 command->Execute();
|
nuclear@1
|
157 if (NeedsWait())
|
nuclear@1
|
158 GetEvent()->PulseEvent();
|
nuclear@1
|
159 }
|
nuclear@1
|
160
|
nuclear@1
|
161 //-------------------------------------------------------------------------------------
|
nuclear@1
|
162
|
nuclear@1
|
163 class ThreadCommandQueueImpl : public NewOverrideBase
|
nuclear@1
|
164 {
|
nuclear@1
|
165 typedef ThreadCommand::NotifyEvent NotifyEvent;
|
nuclear@1
|
166 friend class ThreadCommandQueue;
|
nuclear@1
|
167
|
nuclear@1
|
168 public:
|
nuclear@1
|
169
|
nuclear@1
|
170 ThreadCommandQueueImpl(ThreadCommandQueue* queue)
|
nuclear@1
|
171 : pQueue(queue), CommandBuffer(2048),
|
nuclear@1
|
172 ExitEnqueued(false), ExitProcessed(false)
|
nuclear@1
|
173 {
|
nuclear@1
|
174 }
|
nuclear@1
|
175 ~ThreadCommandQueueImpl();
|
nuclear@1
|
176
|
nuclear@1
|
177
|
nuclear@1
|
178 bool PushCommand(const ThreadCommand& command);
|
nuclear@1
|
179 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
|
nuclear@1
|
180
|
nuclear@1
|
181
|
nuclear@1
|
182 // ExitCommand is used by notify us that Thread is shutting down.
|
nuclear@1
|
183 struct ExitCommand : public ThreadCommand
|
nuclear@1
|
184 {
|
nuclear@1
|
185 ThreadCommandQueueImpl* pImpl;
|
nuclear@1
|
186
|
nuclear@1
|
187 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
|
nuclear@1
|
188 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
|
nuclear@1
|
189
|
nuclear@1
|
190 virtual void Execute() const
|
nuclear@1
|
191 {
|
nuclear@1
|
192 Lock::Locker lock(&pImpl->QueueLock);
|
nuclear@1
|
193 pImpl->ExitProcessed = true;
|
nuclear@1
|
194 }
|
nuclear@1
|
195 virtual ThreadCommand* CopyConstruct(void* p) const
|
nuclear@1
|
196 { return Construct<ExitCommand>(p, *this); }
|
nuclear@1
|
197 };
|
nuclear@1
|
198
|
nuclear@1
|
199
|
nuclear@1
|
200 NotifyEvent* AllocNotifyEvent_NTS()
|
nuclear@1
|
201 {
|
nuclear@1
|
202 NotifyEvent* p = AvailableEvents.GetFirst();
|
nuclear@1
|
203
|
nuclear@1
|
204 if (!AvailableEvents.IsNull(p))
|
nuclear@1
|
205 p->RemoveNode();
|
nuclear@1
|
206 else
|
nuclear@1
|
207 p = new NotifyEvent;
|
nuclear@1
|
208 return p;
|
nuclear@1
|
209 }
|
nuclear@1
|
210
|
nuclear@1
|
211 void FreeNotifyEvent_NTS(NotifyEvent* p)
|
nuclear@1
|
212 {
|
nuclear@1
|
213 AvailableEvents.PushBack(p);
|
nuclear@1
|
214 }
|
nuclear@1
|
215
|
nuclear@1
|
216 void FreeNotifyEvents_NTS()
|
nuclear@1
|
217 {
|
nuclear@1
|
218 while(!AvailableEvents.IsEmpty())
|
nuclear@1
|
219 {
|
nuclear@1
|
220 NotifyEvent* p = AvailableEvents.GetFirst();
|
nuclear@1
|
221 p->RemoveNode();
|
nuclear@1
|
222 delete p;
|
nuclear@1
|
223 }
|
nuclear@1
|
224 }
|
nuclear@1
|
225
|
nuclear@1
|
226 ThreadCommandQueue* pQueue;
|
nuclear@1
|
227 Lock QueueLock;
|
nuclear@1
|
228 volatile bool ExitEnqueued;
|
nuclear@1
|
229 volatile bool ExitProcessed;
|
nuclear@1
|
230 List<NotifyEvent> AvailableEvents;
|
nuclear@1
|
231 List<NotifyEvent> BlockedProducers;
|
nuclear@1
|
232 CircularBuffer CommandBuffer;
|
nuclear@1
|
233 };
|
nuclear@1
|
234
|
nuclear@1
|
235
|
nuclear@1
|
236
|
nuclear@1
|
237 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
|
nuclear@1
|
238 {
|
nuclear@1
|
239 Lock::Locker lock(&QueueLock);
|
nuclear@1
|
240 OVR_ASSERT(BlockedProducers.IsEmpty());
|
nuclear@1
|
241 FreeNotifyEvents_NTS();
|
nuclear@1
|
242 }
|
nuclear@1
|
243
|
nuclear@1
|
244 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
|
nuclear@1
|
245 {
|
nuclear@1
|
246 ThreadCommand::NotifyEvent* completeEvent = 0;
|
nuclear@1
|
247 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
|
nuclear@1
|
248
|
nuclear@1
|
249 // Repeat writing command into buffer until it is available.
|
nuclear@1
|
250 do {
|
nuclear@1
|
251
|
nuclear@1
|
252 { // Lock Scope
|
nuclear@1
|
253 Lock::Locker lock(&QueueLock);
|
nuclear@1
|
254
|
nuclear@1
|
255 if (queueAvailableEvent)
|
nuclear@1
|
256 {
|
nuclear@1
|
257 FreeNotifyEvent_NTS(queueAvailableEvent);
|
nuclear@1
|
258 queueAvailableEvent = 0;
|
nuclear@1
|
259 }
|
nuclear@1
|
260
|
nuclear@1
|
261 // Don't allow any commands after PushExitCommand() is called.
|
nuclear@1
|
262 if (ExitEnqueued && !command.ExitFlag)
|
nuclear@1
|
263 return false;
|
nuclear@1
|
264
|
nuclear@1
|
265
|
nuclear@1
|
266 bool bufferWasEmpty = CommandBuffer.IsEmpty();
|
nuclear@1
|
267 UByte* buffer = CommandBuffer.Write(command.GetSize());
|
nuclear@1
|
268 if (buffer)
|
nuclear@1
|
269 {
|
nuclear@1
|
270 ThreadCommand* c = command.CopyConstruct(buffer);
|
nuclear@1
|
271 if (c->NeedsWait())
|
nuclear@1
|
272 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
|
nuclear@1
|
273 // Signal-waker consumer when we add data to buffer.
|
nuclear@1
|
274 if (bufferWasEmpty)
|
nuclear@1
|
275 pQueue->OnPushNonEmpty_Locked();
|
nuclear@1
|
276 break;
|
nuclear@1
|
277 }
|
nuclear@1
|
278
|
nuclear@1
|
279 queueAvailableEvent = AllocNotifyEvent_NTS();
|
nuclear@1
|
280 BlockedProducers.PushBack(queueAvailableEvent);
|
nuclear@1
|
281 } // Lock Scope
|
nuclear@1
|
282
|
nuclear@1
|
283 queueAvailableEvent->Wait();
|
nuclear@1
|
284
|
nuclear@1
|
285 } while(1);
|
nuclear@1
|
286
|
nuclear@1
|
287 // Command was enqueued, wait if necessary.
|
nuclear@1
|
288 if (completeEvent)
|
nuclear@1
|
289 {
|
nuclear@1
|
290 completeEvent->Wait();
|
nuclear@1
|
291 Lock::Locker lock(&QueueLock);
|
nuclear@1
|
292 FreeNotifyEvent_NTS(completeEvent);
|
nuclear@1
|
293 }
|
nuclear@1
|
294
|
nuclear@1
|
295 return true;
|
nuclear@1
|
296 }
|
nuclear@1
|
297
|
nuclear@1
|
298
|
nuclear@1
|
299 // Pops the next command from the thread queue, if any is available.
|
nuclear@1
|
300 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
|
nuclear@1
|
301 {
|
nuclear@1
|
302 Lock::Locker lock(&QueueLock);
|
nuclear@1
|
303
|
nuclear@1
|
304 UByte* buffer = CommandBuffer.ReadBegin();
|
nuclear@1
|
305 if (!buffer)
|
nuclear@1
|
306 {
|
nuclear@1
|
307 // Notify thread while in lock scope, enabling initialization of wait.
|
nuclear@1
|
308 pQueue->OnPopEmpty_Locked();
|
nuclear@1
|
309 return false;
|
nuclear@1
|
310 }
|
nuclear@1
|
311
|
nuclear@1
|
312 popBuffer->InitFromBuffer(buffer);
|
nuclear@1
|
313 CommandBuffer.ReadEnd(popBuffer->GetSize());
|
nuclear@1
|
314
|
nuclear@1
|
315 if (!BlockedProducers.IsEmpty())
|
nuclear@1
|
316 {
|
nuclear@1
|
317 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
|
nuclear@1
|
318 queueAvailableEvent->RemoveNode();
|
nuclear@1
|
319 queueAvailableEvent->PulseEvent();
|
nuclear@1
|
320 // Event is freed later by waiter.
|
nuclear@1
|
321 }
|
nuclear@1
|
322 return true;
|
nuclear@1
|
323 }
|
nuclear@1
|
324
|
nuclear@1
|
325
|
nuclear@1
|
326 //-------------------------------------------------------------------------------------
|
nuclear@1
|
327
|
nuclear@1
|
328 ThreadCommandQueue::ThreadCommandQueue()
|
nuclear@1
|
329 {
|
nuclear@1
|
330 pImpl = new ThreadCommandQueueImpl(this);
|
nuclear@1
|
331 }
|
nuclear@1
|
332 ThreadCommandQueue::~ThreadCommandQueue()
|
nuclear@1
|
333 {
|
nuclear@1
|
334 delete pImpl;
|
nuclear@1
|
335 }
|
nuclear@1
|
336
|
nuclear@1
|
337 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
|
nuclear@1
|
338 {
|
nuclear@1
|
339 return pImpl->PushCommand(command);
|
nuclear@1
|
340 }
|
nuclear@1
|
341
|
nuclear@1
|
342 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
|
nuclear@1
|
343 {
|
nuclear@1
|
344 return pImpl->PopCommand(popBuffer);
|
nuclear@1
|
345 }
|
nuclear@1
|
346
|
nuclear@1
|
347 void ThreadCommandQueue::PushExitCommand(bool wait)
|
nuclear@1
|
348 {
|
nuclear@1
|
349 // Exit is processed in two stages:
|
nuclear@1
|
350 // - First, ExitEnqueued flag is set to block further commands from queuing up.
|
nuclear@1
|
351 // - Second, the actual exit call is processed on the consumer thread, flushing
|
nuclear@1
|
352 // any prior commands.
|
nuclear@1
|
353 // IsExiting() only returns true after exit has flushed.
|
nuclear@1
|
354 {
|
nuclear@1
|
355 Lock::Locker lock(&pImpl->QueueLock);
|
nuclear@1
|
356 if (pImpl->ExitEnqueued)
|
nuclear@1
|
357 return;
|
nuclear@1
|
358 pImpl->ExitEnqueued = true;
|
nuclear@1
|
359 }
|
nuclear@1
|
360
|
nuclear@1
|
361 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
|
nuclear@1
|
362 }
|
nuclear@1
|
363
|
nuclear@1
|
364 bool ThreadCommandQueue::IsExiting() const
|
nuclear@1
|
365 {
|
nuclear@1
|
366 return pImpl->ExitProcessed;
|
nuclear@1
|
367 }
|
nuclear@1
|
368
|
nuclear@1
|
369
|
nuclear@1
|
370 } // namespace OVR
|