oculus1

annotate libovr/Src/OVR_ThreadCommandQueue.cpp @ 24:8419d8a13cee

foo
author John Tsiombikas <nuclear@member.fsf.org>
date Fri, 04 Oct 2013 14:50:26 +0300
parents
children
rev   line source
nuclear@1 1 /************************************************************************************
nuclear@1 2
nuclear@1 3 PublicHeader: None
nuclear@1 4 Filename : OVR_ThreadCommandQueue.cpp
nuclear@1 5 Content : Command queue for operations executed on a thread
nuclear@1 6 Created : October 29, 2012
nuclear@1 7
nuclear@1 8 Copyright : Copyright 2012 Oculus VR, Inc. All Rights reserved.
nuclear@1 9
nuclear@1 10 Use of this software is subject to the terms of the Oculus license
nuclear@1 11 agreement provided at the time of installation or download, or which
nuclear@1 12 otherwise accompanies this software in either electronic or hard copy form.
nuclear@1 13
nuclear@1 14 ************************************************************************************/
nuclear@1 15
nuclear@1 16 #include "OVR_ThreadCommandQueue.h"
nuclear@1 17
nuclear@1 18 namespace OVR {
nuclear@1 19
nuclear@1 20
nuclear@1 21 //------------------------------------------------------------------------
nuclear@1 22 // ***** CircularBuffer
nuclear@1 23
nuclear@1 24 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
nuclear@1 25 // which allows writing and reading variable-size data chunks. Write fails
nuclear@1 26 // if buffer is full.
nuclear@1 27
nuclear@1 28 class CircularBuffer
nuclear@1 29 {
nuclear@1 30 enum {
nuclear@1 31 AlignSize = 16,
nuclear@1 32 AlignMask = AlignSize - 1
nuclear@1 33 };
nuclear@1 34
nuclear@1 35 UByte* pBuffer;
nuclear@1 36 UPInt Size;
nuclear@1 37 UPInt Tail; // Byte offset of next item to be popped.
nuclear@1 38 UPInt Head; // Byte offset of where next push will take place.
nuclear@1 39 UPInt End; // When Head < Tail, this is used instead of Size.
nuclear@1 40
nuclear@1 41 inline UPInt roundUpSize(UPInt size)
nuclear@1 42 { return (size + AlignMask) & ~(UPInt)AlignMask; }
nuclear@1 43
nuclear@1 44 public:
nuclear@1 45
nuclear@1 46 CircularBuffer(UPInt size)
nuclear@1 47 : Size(size), Tail(0), Head(0), End(0)
nuclear@1 48 {
nuclear@1 49 pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
nuclear@1 50 }
nuclear@1 51 ~CircularBuffer()
nuclear@1 52 {
nuclear@1 53 // For ThreadCommands, we must consume everything before shutdown.
nuclear@1 54 OVR_ASSERT(IsEmpty());
nuclear@1 55 OVR_FREE_ALIGNED(pBuffer);
nuclear@1 56 }
nuclear@1 57
nuclear@1 58 bool IsEmpty() const { return (Head == Tail); }
nuclear@1 59
nuclear@1 60 // Allocates a state block of specified size and advances pointers,
nuclear@1 61 // returning 0 if buffer is full.
nuclear@1 62 UByte* Write(UPInt size);
nuclear@1 63
nuclear@1 64 // Returns a pointer to next available data block; 0 if none available.
nuclear@1 65 UByte* ReadBegin()
nuclear@1 66 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
nuclear@1 67 // Consumes data of specified size; this must match size passed to Write.
nuclear@1 68 void ReadEnd(UPInt size);
nuclear@1 69 };
nuclear@1 70
nuclear@1 71
nuclear@1 72 // Allocates a state block of specified size and advances pointers,
nuclear@1 73 // returning 0 if buffer is full.
nuclear@1 74 UByte* CircularBuffer::Write(UPInt size)
nuclear@1 75 {
nuclear@1 76 UByte* p = 0;
nuclear@1 77
nuclear@1 78 size = roundUpSize(size);
nuclear@1 79 // Since this is circular buffer, always allow at least one item.
nuclear@1 80 OVR_ASSERT(size < Size/2);
nuclear@1 81
nuclear@1 82 if (Head >= Tail)
nuclear@1 83 {
nuclear@1 84 OVR_ASSERT(End == 0);
nuclear@1 85
nuclear@1 86 if (size <= (Size - Head))
nuclear@1 87 {
nuclear@1 88 p = pBuffer + Head;
nuclear@1 89 Head += size;
nuclear@1 90 }
nuclear@1 91 else if (size < Tail)
nuclear@1 92 {
nuclear@1 93 p = pBuffer;
nuclear@1 94 End = Head;
nuclear@1 95 Head = size;
nuclear@1 96 OVR_ASSERT(Head != Tail);
nuclear@1 97 }
nuclear@1 98 }
nuclear@1 99 else
nuclear@1 100 {
nuclear@1 101 OVR_ASSERT(End != 0);
nuclear@1 102
nuclear@1 103 if ((Tail - Head) > size)
nuclear@1 104 {
nuclear@1 105 p = pBuffer + Head;
nuclear@1 106 Head += size;
nuclear@1 107 OVR_ASSERT(Head != Tail);
nuclear@1 108 }
nuclear@1 109 }
nuclear@1 110
nuclear@1 111 return p;
nuclear@1 112 }
nuclear@1 113
nuclear@1 114 void CircularBuffer::ReadEnd(UPInt size)
nuclear@1 115 {
nuclear@1 116 OVR_ASSERT(Head != Tail);
nuclear@1 117 size = roundUpSize(size);
nuclear@1 118
nuclear@1 119 Tail += size;
nuclear@1 120 if (Tail == End)
nuclear@1 121 {
nuclear@1 122 Tail = End = 0;
nuclear@1 123 }
nuclear@1 124 else if (Tail == Head)
nuclear@1 125 {
nuclear@1 126 OVR_ASSERT(End == 0);
nuclear@1 127 Tail = Head = 0;
nuclear@1 128 }
nuclear@1 129 }
nuclear@1 130
nuclear@1 131
nuclear@1 132 //-------------------------------------------------------------------------------------
nuclear@1 133 // ***** ThreadCommand
nuclear@1 134
nuclear@1 135 ThreadCommand::PopBuffer::~PopBuffer()
nuclear@1 136 {
nuclear@1 137 if (Size)
nuclear@1 138 Destruct<ThreadCommand>(toCommand());
nuclear@1 139 }
nuclear@1 140
nuclear@1 141 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
nuclear@1 142 {
nuclear@1 143 ThreadCommand* cmd = (ThreadCommand*)data;
nuclear@1 144 OVR_ASSERT(cmd->Size <= MaxSize);
nuclear@1 145
nuclear@1 146 if (Size)
nuclear@1 147 Destruct<ThreadCommand>(toCommand());
nuclear@1 148 Size = cmd->Size;
nuclear@1 149 memcpy(Buffer, (void*)cmd, Size);
nuclear@1 150 }
nuclear@1 151
nuclear@1 152 void ThreadCommand::PopBuffer::Execute()
nuclear@1 153 {
nuclear@1 154 ThreadCommand* command = toCommand();
nuclear@1 155 OVR_ASSERT(command);
nuclear@1 156 command->Execute();
nuclear@1 157 if (NeedsWait())
nuclear@1 158 GetEvent()->PulseEvent();
nuclear@1 159 }
nuclear@1 160
nuclear@1 161 //-------------------------------------------------------------------------------------
nuclear@1 162
nuclear@1 163 class ThreadCommandQueueImpl : public NewOverrideBase
nuclear@1 164 {
nuclear@1 165 typedef ThreadCommand::NotifyEvent NotifyEvent;
nuclear@1 166 friend class ThreadCommandQueue;
nuclear@1 167
nuclear@1 168 public:
nuclear@1 169
nuclear@1 170 ThreadCommandQueueImpl(ThreadCommandQueue* queue)
nuclear@1 171 : pQueue(queue), CommandBuffer(2048),
nuclear@1 172 ExitEnqueued(false), ExitProcessed(false)
nuclear@1 173 {
nuclear@1 174 }
nuclear@1 175 ~ThreadCommandQueueImpl();
nuclear@1 176
nuclear@1 177
nuclear@1 178 bool PushCommand(const ThreadCommand& command);
nuclear@1 179 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
nuclear@1 180
nuclear@1 181
nuclear@1 182 // ExitCommand is used by notify us that Thread is shutting down.
nuclear@1 183 struct ExitCommand : public ThreadCommand
nuclear@1 184 {
nuclear@1 185 ThreadCommandQueueImpl* pImpl;
nuclear@1 186
nuclear@1 187 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
nuclear@1 188 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
nuclear@1 189
nuclear@1 190 virtual void Execute() const
nuclear@1 191 {
nuclear@1 192 Lock::Locker lock(&pImpl->QueueLock);
nuclear@1 193 pImpl->ExitProcessed = true;
nuclear@1 194 }
nuclear@1 195 virtual ThreadCommand* CopyConstruct(void* p) const
nuclear@1 196 { return Construct<ExitCommand>(p, *this); }
nuclear@1 197 };
nuclear@1 198
nuclear@1 199
nuclear@1 200 NotifyEvent* AllocNotifyEvent_NTS()
nuclear@1 201 {
nuclear@1 202 NotifyEvent* p = AvailableEvents.GetFirst();
nuclear@1 203
nuclear@1 204 if (!AvailableEvents.IsNull(p))
nuclear@1 205 p->RemoveNode();
nuclear@1 206 else
nuclear@1 207 p = new NotifyEvent;
nuclear@1 208 return p;
nuclear@1 209 }
nuclear@1 210
nuclear@1 211 void FreeNotifyEvent_NTS(NotifyEvent* p)
nuclear@1 212 {
nuclear@1 213 AvailableEvents.PushBack(p);
nuclear@1 214 }
nuclear@1 215
nuclear@1 216 void FreeNotifyEvents_NTS()
nuclear@1 217 {
nuclear@1 218 while(!AvailableEvents.IsEmpty())
nuclear@1 219 {
nuclear@1 220 NotifyEvent* p = AvailableEvents.GetFirst();
nuclear@1 221 p->RemoveNode();
nuclear@1 222 delete p;
nuclear@1 223 }
nuclear@1 224 }
nuclear@1 225
nuclear@1 226 ThreadCommandQueue* pQueue;
nuclear@1 227 Lock QueueLock;
nuclear@1 228 volatile bool ExitEnqueued;
nuclear@1 229 volatile bool ExitProcessed;
nuclear@1 230 List<NotifyEvent> AvailableEvents;
nuclear@1 231 List<NotifyEvent> BlockedProducers;
nuclear@1 232 CircularBuffer CommandBuffer;
nuclear@1 233 };
nuclear@1 234
nuclear@1 235
nuclear@1 236
nuclear@1 237 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
nuclear@1 238 {
nuclear@1 239 Lock::Locker lock(&QueueLock);
nuclear@1 240 OVR_ASSERT(BlockedProducers.IsEmpty());
nuclear@1 241 FreeNotifyEvents_NTS();
nuclear@1 242 }
nuclear@1 243
nuclear@1 244 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
nuclear@1 245 {
nuclear@1 246 ThreadCommand::NotifyEvent* completeEvent = 0;
nuclear@1 247 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
nuclear@1 248
nuclear@1 249 // Repeat writing command into buffer until it is available.
nuclear@1 250 do {
nuclear@1 251
nuclear@1 252 { // Lock Scope
nuclear@1 253 Lock::Locker lock(&QueueLock);
nuclear@1 254
nuclear@1 255 if (queueAvailableEvent)
nuclear@1 256 {
nuclear@1 257 FreeNotifyEvent_NTS(queueAvailableEvent);
nuclear@1 258 queueAvailableEvent = 0;
nuclear@1 259 }
nuclear@1 260
nuclear@1 261 // Don't allow any commands after PushExitCommand() is called.
nuclear@1 262 if (ExitEnqueued && !command.ExitFlag)
nuclear@1 263 return false;
nuclear@1 264
nuclear@1 265
nuclear@1 266 bool bufferWasEmpty = CommandBuffer.IsEmpty();
nuclear@1 267 UByte* buffer = CommandBuffer.Write(command.GetSize());
nuclear@1 268 if (buffer)
nuclear@1 269 {
nuclear@1 270 ThreadCommand* c = command.CopyConstruct(buffer);
nuclear@1 271 if (c->NeedsWait())
nuclear@1 272 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
nuclear@1 273 // Signal-waker consumer when we add data to buffer.
nuclear@1 274 if (bufferWasEmpty)
nuclear@1 275 pQueue->OnPushNonEmpty_Locked();
nuclear@1 276 break;
nuclear@1 277 }
nuclear@1 278
nuclear@1 279 queueAvailableEvent = AllocNotifyEvent_NTS();
nuclear@1 280 BlockedProducers.PushBack(queueAvailableEvent);
nuclear@1 281 } // Lock Scope
nuclear@1 282
nuclear@1 283 queueAvailableEvent->Wait();
nuclear@1 284
nuclear@1 285 } while(1);
nuclear@1 286
nuclear@1 287 // Command was enqueued, wait if necessary.
nuclear@1 288 if (completeEvent)
nuclear@1 289 {
nuclear@1 290 completeEvent->Wait();
nuclear@1 291 Lock::Locker lock(&QueueLock);
nuclear@1 292 FreeNotifyEvent_NTS(completeEvent);
nuclear@1 293 }
nuclear@1 294
nuclear@1 295 return true;
nuclear@1 296 }
nuclear@1 297
nuclear@1 298
nuclear@1 299 // Pops the next command from the thread queue, if any is available.
nuclear@1 300 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
nuclear@1 301 {
nuclear@1 302 Lock::Locker lock(&QueueLock);
nuclear@1 303
nuclear@1 304 UByte* buffer = CommandBuffer.ReadBegin();
nuclear@1 305 if (!buffer)
nuclear@1 306 {
nuclear@1 307 // Notify thread while in lock scope, enabling initialization of wait.
nuclear@1 308 pQueue->OnPopEmpty_Locked();
nuclear@1 309 return false;
nuclear@1 310 }
nuclear@1 311
nuclear@1 312 popBuffer->InitFromBuffer(buffer);
nuclear@1 313 CommandBuffer.ReadEnd(popBuffer->GetSize());
nuclear@1 314
nuclear@1 315 if (!BlockedProducers.IsEmpty())
nuclear@1 316 {
nuclear@1 317 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
nuclear@1 318 queueAvailableEvent->RemoveNode();
nuclear@1 319 queueAvailableEvent->PulseEvent();
nuclear@1 320 // Event is freed later by waiter.
nuclear@1 321 }
nuclear@1 322 return true;
nuclear@1 323 }
nuclear@1 324
nuclear@1 325
nuclear@1 326 //-------------------------------------------------------------------------------------
nuclear@1 327
nuclear@1 328 ThreadCommandQueue::ThreadCommandQueue()
nuclear@1 329 {
nuclear@1 330 pImpl = new ThreadCommandQueueImpl(this);
nuclear@1 331 }
nuclear@1 332 ThreadCommandQueue::~ThreadCommandQueue()
nuclear@1 333 {
nuclear@1 334 delete pImpl;
nuclear@1 335 }
nuclear@1 336
nuclear@1 337 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
nuclear@1 338 {
nuclear@1 339 return pImpl->PushCommand(command);
nuclear@1 340 }
nuclear@1 341
nuclear@1 342 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
nuclear@1 343 {
nuclear@1 344 return pImpl->PopCommand(popBuffer);
nuclear@1 345 }
nuclear@1 346
nuclear@1 347 void ThreadCommandQueue::PushExitCommand(bool wait)
nuclear@1 348 {
nuclear@1 349 // Exit is processed in two stages:
nuclear@1 350 // - First, ExitEnqueued flag is set to block further commands from queuing up.
nuclear@1 351 // - Second, the actual exit call is processed on the consumer thread, flushing
nuclear@1 352 // any prior commands.
nuclear@1 353 // IsExiting() only returns true after exit has flushed.
nuclear@1 354 {
nuclear@1 355 Lock::Locker lock(&pImpl->QueueLock);
nuclear@1 356 if (pImpl->ExitEnqueued)
nuclear@1 357 return;
nuclear@1 358 pImpl->ExitEnqueued = true;
nuclear@1 359 }
nuclear@1 360
nuclear@1 361 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
nuclear@1 362 }
nuclear@1 363
nuclear@1 364 bool ThreadCommandQueue::IsExiting() const
nuclear@1 365 {
nuclear@1 366 return pImpl->ExitProcessed;
nuclear@1 367 }
nuclear@1 368
nuclear@1 369
nuclear@1 370 } // namespace OVR