ovr_sdk

annotate LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp @ 0:1b39a1b46319

initial 0.4.4
author John Tsiombikas <nuclear@member.fsf.org>
date Wed, 14 Jan 2015 06:51:16 +0200
parents
children
rev   line source
nuclear@0 1 /************************************************************************************
nuclear@0 2
nuclear@0 3 PublicHeader: None
nuclear@0 4 Filename : OVR_ThreadCommandQueue.cpp
nuclear@0 5 Content : Command queue for operations executed on a thread
nuclear@0 6 Created : October 29, 2012
nuclear@0 7
nuclear@0 8 Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved.
nuclear@0 9
nuclear@0 10 Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License");
nuclear@0 11 you may not use the Oculus VR Rift SDK except in compliance with the License,
nuclear@0 12 which is provided at the time of installation or download, or which
nuclear@0 13 otherwise accompanies this software in either electronic or hard copy form.
nuclear@0 14
nuclear@0 15 You may obtain a copy of the License at
nuclear@0 16
nuclear@0 17 http://www.oculusvr.com/licenses/LICENSE-3.2
nuclear@0 18
nuclear@0 19 Unless required by applicable law or agreed to in writing, the Oculus VR SDK
nuclear@0 20 distributed under the License is distributed on an "AS IS" BASIS,
nuclear@0 21 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
nuclear@0 22 See the License for the specific language governing permissions and
nuclear@0 23 limitations under the License.
nuclear@0 24
nuclear@0 25 ************************************************************************************/
nuclear@0 26
nuclear@0 27 #include "OVR_ThreadCommandQueue.h"
nuclear@0 28
nuclear@0 29 namespace OVR {
nuclear@0 30
nuclear@0 31
nuclear@0 32 //------------------------------------------------------------------------
nuclear@0 33 // ***** CircularBuffer
nuclear@0 34
nuclear@0 35 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
nuclear@0 36 // which allows writing and reading variable-size data chucks. Write fails
nuclear@0 37 // if buffer is full.
nuclear@0 38
nuclear@0 39 class CircularBuffer
nuclear@0 40 {
nuclear@0 41 enum {
nuclear@0 42 AlignSize = 16,
nuclear@0 43 AlignMask = AlignSize - 1
nuclear@0 44 };
nuclear@0 45
nuclear@0 46 uint8_t* pBuffer;
nuclear@0 47 size_t Size;
nuclear@0 48 size_t Tail; // Byte offset of next item to be popped.
nuclear@0 49 size_t Head; // Byte offset of where next push will take place.
nuclear@0 50 size_t End; // When Head < Tail, this is used instead of Size.
nuclear@0 51
nuclear@0 52 inline size_t roundUpSize(size_t size)
nuclear@0 53 { return (size + AlignMask) & ~(size_t)AlignMask; }
nuclear@0 54
nuclear@0 55 public:
nuclear@0 56
nuclear@0 57 CircularBuffer(size_t size)
nuclear@0 58 : Size(size), Tail(0), Head(0), End(0)
nuclear@0 59 {
nuclear@0 60 pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
nuclear@0 61 }
nuclear@0 62 ~CircularBuffer()
nuclear@0 63 {
nuclear@0 64 // For ThreadCommands, we must consume everything before shutdown.
nuclear@0 65 OVR_ASSERT(IsEmpty());
nuclear@0 66 OVR_FREE_ALIGNED(pBuffer);
nuclear@0 67 }
nuclear@0 68
nuclear@0 69 bool IsEmpty() const { return (Head == Tail); }
nuclear@0 70
nuclear@0 71 // Allocates a state block of specified size and advances pointers,
nuclear@0 72 // returning 0 if buffer is full.
nuclear@0 73 uint8_t* Write(size_t size);
nuclear@0 74
nuclear@0 75 // Returns a pointer to next available data block; 0 if none available.
nuclear@0 76 uint8_t* ReadBegin()
nuclear@0 77 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
nuclear@0 78 // Consumes data of specified size; this must match size passed to Write.
nuclear@0 79 void ReadEnd(size_t size);
nuclear@0 80 };
nuclear@0 81
nuclear@0 82
nuclear@0 83 // Allocates a state block of specified size and advances pointers,
nuclear@0 84 // returning 0 if buffer is full.
nuclear@0 85 uint8_t* CircularBuffer::Write(size_t size)
nuclear@0 86 {
nuclear@0 87 uint8_t* p = 0;
nuclear@0 88
nuclear@0 89 size = roundUpSize(size);
nuclear@0 90 // Since this is circular buffer, always allow at least one item.
nuclear@0 91 OVR_ASSERT(size < Size/2);
nuclear@0 92
nuclear@0 93 if (Head >= Tail)
nuclear@0 94 {
nuclear@0 95 OVR_ASSERT(End == 0);
nuclear@0 96
nuclear@0 97 if (size <= (Size - Head))
nuclear@0 98 {
nuclear@0 99 p = pBuffer + Head;
nuclear@0 100 Head += size;
nuclear@0 101 }
nuclear@0 102 else if (size < Tail)
nuclear@0 103 {
nuclear@0 104 p = pBuffer;
nuclear@0 105 End = Head;
nuclear@0 106 Head = size;
nuclear@0 107 OVR_ASSERT(Head != Tail);
nuclear@0 108 }
nuclear@0 109 }
nuclear@0 110 else
nuclear@0 111 {
nuclear@0 112 OVR_ASSERT(End != 0);
nuclear@0 113
nuclear@0 114 if ((Tail - Head) > size)
nuclear@0 115 {
nuclear@0 116 p = pBuffer + Head;
nuclear@0 117 Head += size;
nuclear@0 118 OVR_ASSERT(Head != Tail);
nuclear@0 119 }
nuclear@0 120 }
nuclear@0 121
nuclear@0 122 return p;
nuclear@0 123 }
nuclear@0 124
nuclear@0 125 void CircularBuffer::ReadEnd(size_t size)
nuclear@0 126 {
nuclear@0 127 OVR_ASSERT(Head != Tail);
nuclear@0 128 size = roundUpSize(size);
nuclear@0 129
nuclear@0 130 Tail += size;
nuclear@0 131 if (Tail == End)
nuclear@0 132 {
nuclear@0 133 Tail = End = 0;
nuclear@0 134 }
nuclear@0 135 else if (Tail == Head)
nuclear@0 136 {
nuclear@0 137 OVR_ASSERT(End == 0);
nuclear@0 138 Tail = Head = 0;
nuclear@0 139 }
nuclear@0 140 }
nuclear@0 141
nuclear@0 142
nuclear@0 143 //-------------------------------------------------------------------------------------
nuclear@0 144 // ***** ThreadCommand
nuclear@0 145
nuclear@0 146 ThreadCommand::PopBuffer::~PopBuffer()
nuclear@0 147 {
nuclear@0 148 if (Size) {
nuclear@0 149 Destruct<ThreadCommand>(toCommand());
nuclear@0 150 }
nuclear@0 151 }
nuclear@0 152
nuclear@0 153 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
nuclear@0 154 {
nuclear@0 155 ThreadCommand* cmd = (ThreadCommand*)data;
nuclear@0 156 OVR_ASSERT(cmd->Size <= MaxSize);
nuclear@0 157
nuclear@0 158 if (Size) {
nuclear@0 159 Destruct<ThreadCommand>(toCommand());
nuclear@0 160 }
nuclear@0 161 Size = cmd->Size;
nuclear@0 162 memcpy(Buffer, (void*)cmd, Size);
nuclear@0 163 }
nuclear@0 164
nuclear@0 165 void ThreadCommand::PopBuffer::Execute()
nuclear@0 166 {
nuclear@0 167 ThreadCommand* command = toCommand();
nuclear@0 168 OVR_ASSERT(command);
nuclear@0 169 command->Execute();
nuclear@0 170 if (NeedsWait()) {
nuclear@0 171 GetEvent()->PulseEvent();
nuclear@0 172 }
nuclear@0 173 }
nuclear@0 174
nuclear@0 175 //-------------------------------------------------------------------------------------
nuclear@0 176
nuclear@0 177 class ThreadCommandQueueImpl : public NewOverrideBase
nuclear@0 178 {
nuclear@0 179 typedef ThreadCommand::NotifyEvent NotifyEvent;
nuclear@0 180 friend class ThreadCommandQueue;
nuclear@0 181
nuclear@0 182 public:
nuclear@0 183
nuclear@0 184 ThreadCommandQueueImpl(ThreadCommandQueue* queue) :
nuclear@0 185 pQueue(queue),
nuclear@0 186 ExitEnqueued(false),
nuclear@0 187 ExitProcessed(false),
nuclear@0 188 CommandBuffer(2048),
nuclear@0 189 PullThreadId(0)
nuclear@0 190 {
nuclear@0 191 }
nuclear@0 192 ~ThreadCommandQueueImpl();
nuclear@0 193
nuclear@0 194
nuclear@0 195 bool PushCommand(const ThreadCommand& command);
nuclear@0 196 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
nuclear@0 197
nuclear@0 198
nuclear@0 199 // ExitCommand is used by notify us that Thread is shutting down.
nuclear@0 200 struct ExitCommand : public ThreadCommand
nuclear@0 201 {
nuclear@0 202 ThreadCommandQueueImpl* pImpl;
nuclear@0 203
nuclear@0 204 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
nuclear@0 205 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
nuclear@0 206
nuclear@0 207 virtual void Execute() const
nuclear@0 208 {
nuclear@0 209 Lock::Locker lock(&pImpl->QueueLock);
nuclear@0 210 pImpl->ExitProcessed = true;
nuclear@0 211 }
nuclear@0 212 virtual ThreadCommand* CopyConstruct(void* p) const
nuclear@0 213 { return Construct<ExitCommand>(p, *this); }
nuclear@0 214 };
nuclear@0 215
nuclear@0 216
nuclear@0 217 NotifyEvent* AllocNotifyEvent_NTS()
nuclear@0 218 {
nuclear@0 219 NotifyEvent* p = AvailableEvents.GetFirst();
nuclear@0 220
nuclear@0 221 if (!AvailableEvents.IsNull(p))
nuclear@0 222 p->RemoveNode();
nuclear@0 223 else
nuclear@0 224 p = new NotifyEvent;
nuclear@0 225 return p;
nuclear@0 226 }
nuclear@0 227
nuclear@0 228 void FreeNotifyEvent_NTS(NotifyEvent* p)
nuclear@0 229 {
nuclear@0 230 AvailableEvents.PushBack(p);
nuclear@0 231 }
nuclear@0 232
nuclear@0 233 void FreeNotifyEvents_NTS()
nuclear@0 234 {
nuclear@0 235 while(!AvailableEvents.IsEmpty())
nuclear@0 236 {
nuclear@0 237 NotifyEvent* p = AvailableEvents.GetFirst();
nuclear@0 238 p->RemoveNode();
nuclear@0 239 delete p;
nuclear@0 240 }
nuclear@0 241 }
nuclear@0 242
nuclear@0 243 ThreadCommandQueue* pQueue;
nuclear@0 244 Lock QueueLock;
nuclear@0 245 volatile bool ExitEnqueued;
nuclear@0 246 volatile bool ExitProcessed;
nuclear@0 247 List<NotifyEvent> AvailableEvents;
nuclear@0 248 List<NotifyEvent> BlockedProducers;
nuclear@0 249 CircularBuffer CommandBuffer;
nuclear@0 250
nuclear@0 251 // The pull thread id is set to the last thread that pulled commands.
nuclear@0 252 // Since this thread command queue is designed for a single thread,
nuclear@0 253 // reentrant behavior that would cause a dead-lock for messages that
nuclear@0 254 // wait for completion can be avoided by simply comparing the
nuclear@0 255 // thread id of the last pull.
nuclear@0 256 OVR::ThreadId PullThreadId;
nuclear@0 257 };
nuclear@0 258
nuclear@0 259 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
nuclear@0 260 {
nuclear@0 261 Lock::Locker lock(&QueueLock);
nuclear@0 262 OVR_ASSERT(BlockedProducers.IsEmpty());
nuclear@0 263 FreeNotifyEvents_NTS();
nuclear@0 264 }
nuclear@0 265
nuclear@0 266 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
nuclear@0 267 {
nuclear@0 268 if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId())
nuclear@0 269 {
nuclear@0 270 command.Execute();
nuclear@0 271 return true;
nuclear@0 272 }
nuclear@0 273
nuclear@0 274 ThreadCommand::NotifyEvent* completeEvent = 0;
nuclear@0 275 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
nuclear@0 276
nuclear@0 277 // Repeat writing command into buffer until it is available.
nuclear@0 278 for (;;) {
nuclear@0 279 { // Lock Scope
nuclear@0 280 Lock::Locker lock(&QueueLock);
nuclear@0 281
nuclear@0 282 if (queueAvailableEvent) {
nuclear@0 283 FreeNotifyEvent_NTS(queueAvailableEvent);
nuclear@0 284 queueAvailableEvent = 0;
nuclear@0 285 }
nuclear@0 286
nuclear@0 287 // Don't allow any commands after PushExitCommand() is called.
nuclear@0 288 if (ExitEnqueued && !command.ExitFlag) {
nuclear@0 289 return false;
nuclear@0 290 }
nuclear@0 291
nuclear@0 292 bool bufferWasEmpty = CommandBuffer.IsEmpty();
nuclear@0 293 uint8_t* buffer = CommandBuffer.Write(command.GetSize());
nuclear@0 294
nuclear@0 295 if (buffer) {
nuclear@0 296 ThreadCommand* c = command.CopyConstruct(buffer);
nuclear@0 297
nuclear@0 298 if (c->NeedsWait()) {
nuclear@0 299 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
nuclear@0 300 }
nuclear@0 301
nuclear@0 302 // Signal-waker consumer when we add data to buffer.
nuclear@0 303 if (bufferWasEmpty) {
nuclear@0 304 pQueue->OnPushNonEmpty_Locked();
nuclear@0 305 }
nuclear@0 306
nuclear@0 307 break;
nuclear@0 308 }
nuclear@0 309
nuclear@0 310 queueAvailableEvent = AllocNotifyEvent_NTS();
nuclear@0 311 BlockedProducers.PushBack(queueAvailableEvent);
nuclear@0 312 } // Lock Scope
nuclear@0 313
nuclear@0 314 queueAvailableEvent->Wait();
nuclear@0 315 } // Intentional infinite loop
nuclear@0 316
nuclear@0 317 // Command was enqueued, wait if necessary.
nuclear@0 318 if (completeEvent) {
nuclear@0 319 completeEvent->Wait();
nuclear@0 320 Lock::Locker lock(&QueueLock);
nuclear@0 321 FreeNotifyEvent_NTS(completeEvent);
nuclear@0 322 }
nuclear@0 323
nuclear@0 324 return true;
nuclear@0 325 }
nuclear@0 326
nuclear@0 327
nuclear@0 328 // Pops the next command from the thread queue, if any is available.
nuclear@0 329 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
nuclear@0 330 {
nuclear@0 331 PullThreadId = OVR::GetCurrentThreadId();
nuclear@0 332
nuclear@0 333 Lock::Locker lock(&QueueLock);
nuclear@0 334
nuclear@0 335 uint8_t* buffer = CommandBuffer.ReadBegin();
nuclear@0 336 if (!buffer)
nuclear@0 337 {
nuclear@0 338 // Notify thread while in lock scope, enabling initialization of wait.
nuclear@0 339 pQueue->OnPopEmpty_Locked();
nuclear@0 340 return false;
nuclear@0 341 }
nuclear@0 342
nuclear@0 343 popBuffer->InitFromBuffer(buffer);
nuclear@0 344 CommandBuffer.ReadEnd(popBuffer->GetSize());
nuclear@0 345
nuclear@0 346 if (!BlockedProducers.IsEmpty())
nuclear@0 347 {
nuclear@0 348 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
nuclear@0 349 queueAvailableEvent->RemoveNode();
nuclear@0 350 queueAvailableEvent->PulseEvent();
nuclear@0 351 // Event is freed later by waiter.
nuclear@0 352 }
nuclear@0 353 return true;
nuclear@0 354 }
nuclear@0 355
nuclear@0 356
nuclear@0 357 //-------------------------------------------------------------------------------------
nuclear@0 358
nuclear@0 359 ThreadCommandQueue::ThreadCommandQueue()
nuclear@0 360 {
nuclear@0 361 pImpl = new ThreadCommandQueueImpl(this);
nuclear@0 362 }
nuclear@0 363 ThreadCommandQueue::~ThreadCommandQueue()
nuclear@0 364 {
nuclear@0 365 delete pImpl;
nuclear@0 366 }
nuclear@0 367
nuclear@0 368 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
nuclear@0 369 {
nuclear@0 370 return pImpl->PushCommand(command);
nuclear@0 371 }
nuclear@0 372
nuclear@0 373 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
nuclear@0 374 {
nuclear@0 375 return pImpl->PopCommand(popBuffer);
nuclear@0 376 }
nuclear@0 377
nuclear@0 378 void ThreadCommandQueue::PushExitCommand(bool wait)
nuclear@0 379 {
nuclear@0 380 // Exit is processed in two stages:
nuclear@0 381 // - First, ExitEnqueued flag is set to block further commands from queuing up.
nuclear@0 382 // - Second, the actual exit call is processed on the consumer thread, flushing
nuclear@0 383 // any prior commands.
nuclear@0 384 // IsExiting() only returns true after exit has flushed.
nuclear@0 385 {
nuclear@0 386 Lock::Locker lock(&pImpl->QueueLock);
nuclear@0 387 if (pImpl->ExitEnqueued)
nuclear@0 388 return;
nuclear@0 389 pImpl->ExitEnqueued = true;
nuclear@0 390 }
nuclear@0 391
nuclear@0 392 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
nuclear@0 393 }
nuclear@0 394
nuclear@0 395 bool ThreadCommandQueue::IsExiting() const
nuclear@0 396 {
nuclear@0 397 return pImpl->ExitProcessed;
nuclear@0 398 }
nuclear@0 399
nuclear@0 400
nuclear@0 401 } // namespace OVR