nuclear@0: /************************************************************************************ nuclear@0: nuclear@0: PublicHeader: None nuclear@0: Filename : OVR_ThreadCommandQueue.cpp nuclear@0: Content : Command queue for operations executed on a thread nuclear@0: Created : October 29, 2012 nuclear@0: nuclear@0: Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved. nuclear@0: nuclear@0: Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License"); nuclear@0: you may not use the Oculus VR Rift SDK except in compliance with the License, nuclear@0: which is provided at the time of installation or download, or which nuclear@0: otherwise accompanies this software in either electronic or hard copy form. nuclear@0: nuclear@0: You may obtain a copy of the License at nuclear@0: nuclear@0: http://www.oculusvr.com/licenses/LICENSE-3.2 nuclear@0: nuclear@0: Unless required by applicable law or agreed to in writing, the Oculus VR SDK nuclear@0: distributed under the License is distributed on an "AS IS" BASIS, nuclear@0: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. nuclear@0: See the License for the specific language governing permissions and nuclear@0: limitations under the License. nuclear@0: nuclear@0: ************************************************************************************/ nuclear@0: nuclear@0: #include "OVR_ThreadCommandQueue.h" nuclear@0: nuclear@0: namespace OVR { nuclear@0: nuclear@0: nuclear@0: //------------------------------------------------------------------------ nuclear@0: // ***** CircularBuffer nuclear@0: nuclear@0: // CircularBuffer is a FIFO buffer implemented in a single block of memory, nuclear@0: // which allows writing and reading variable-size data chucks. Write fails nuclear@0: // if buffer is full. nuclear@0: nuclear@0: class CircularBuffer nuclear@0: { nuclear@0: enum { nuclear@0: AlignSize = 16, nuclear@0: AlignMask = AlignSize - 1 nuclear@0: }; nuclear@0: nuclear@0: uint8_t* pBuffer; nuclear@0: size_t Size; nuclear@0: size_t Tail; // Byte offset of next item to be popped. nuclear@0: size_t Head; // Byte offset of where next push will take place. nuclear@0: size_t End; // When Head < Tail, this is used instead of Size. nuclear@0: nuclear@0: inline size_t roundUpSize(size_t size) nuclear@0: { return (size + AlignMask) & ~(size_t)AlignMask; } nuclear@0: nuclear@0: public: nuclear@0: nuclear@0: CircularBuffer(size_t size) nuclear@0: : Size(size), Tail(0), Head(0), End(0) nuclear@0: { nuclear@0: pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize); nuclear@0: } nuclear@0: ~CircularBuffer() nuclear@0: { nuclear@0: // For ThreadCommands, we must consume everything before shutdown. nuclear@0: OVR_ASSERT(IsEmpty()); nuclear@0: OVR_FREE_ALIGNED(pBuffer); nuclear@0: } nuclear@0: nuclear@0: bool IsEmpty() const { return (Head == Tail); } nuclear@0: nuclear@0: // Allocates a state block of specified size and advances pointers, nuclear@0: // returning 0 if buffer is full. nuclear@0: uint8_t* Write(size_t size); nuclear@0: nuclear@0: // Returns a pointer to next available data block; 0 if none available. nuclear@0: uint8_t* ReadBegin() nuclear@0: { return (Head != Tail) ? (pBuffer + Tail) : 0; } nuclear@0: // Consumes data of specified size; this must match size passed to Write. nuclear@0: void ReadEnd(size_t size); nuclear@0: }; nuclear@0: nuclear@0: nuclear@0: // Allocates a state block of specified size and advances pointers, nuclear@0: // returning 0 if buffer is full. nuclear@0: uint8_t* CircularBuffer::Write(size_t size) nuclear@0: { nuclear@0: uint8_t* p = 0; nuclear@0: nuclear@0: size = roundUpSize(size); nuclear@0: // Since this is circular buffer, always allow at least one item. nuclear@0: OVR_ASSERT(size < Size/2); nuclear@0: nuclear@0: if (Head >= Tail) nuclear@0: { nuclear@0: OVR_ASSERT(End == 0); nuclear@0: nuclear@0: if (size <= (Size - Head)) nuclear@0: { nuclear@0: p = pBuffer + Head; nuclear@0: Head += size; nuclear@0: } nuclear@0: else if (size < Tail) nuclear@0: { nuclear@0: p = pBuffer; nuclear@0: End = Head; nuclear@0: Head = size; nuclear@0: OVR_ASSERT(Head != Tail); nuclear@0: } nuclear@0: } nuclear@0: else nuclear@0: { nuclear@0: OVR_ASSERT(End != 0); nuclear@0: nuclear@0: if ((Tail - Head) > size) nuclear@0: { nuclear@0: p = pBuffer + Head; nuclear@0: Head += size; nuclear@0: OVR_ASSERT(Head != Tail); nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: return p; nuclear@0: } nuclear@0: nuclear@0: void CircularBuffer::ReadEnd(size_t size) nuclear@0: { nuclear@0: OVR_ASSERT(Head != Tail); nuclear@0: size = roundUpSize(size); nuclear@0: nuclear@0: Tail += size; nuclear@0: if (Tail == End) nuclear@0: { nuclear@0: Tail = End = 0; nuclear@0: } nuclear@0: else if (Tail == Head) nuclear@0: { nuclear@0: OVR_ASSERT(End == 0); nuclear@0: Tail = Head = 0; nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: nuclear@0: //------------------------------------------------------------------------------------- nuclear@0: // ***** ThreadCommand nuclear@0: nuclear@0: ThreadCommand::PopBuffer::~PopBuffer() nuclear@0: { nuclear@0: if (Size) { nuclear@0: Destruct(toCommand()); nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: void ThreadCommand::PopBuffer::InitFromBuffer(void* data) nuclear@0: { nuclear@0: ThreadCommand* cmd = (ThreadCommand*)data; nuclear@0: OVR_ASSERT(cmd->Size <= MaxSize); nuclear@0: nuclear@0: if (Size) { nuclear@0: Destruct(toCommand()); nuclear@0: } nuclear@0: Size = cmd->Size; nuclear@0: memcpy(Buffer, (void*)cmd, Size); nuclear@0: } nuclear@0: nuclear@0: void ThreadCommand::PopBuffer::Execute() nuclear@0: { nuclear@0: ThreadCommand* command = toCommand(); nuclear@0: OVR_ASSERT(command); nuclear@0: command->Execute(); nuclear@0: if (NeedsWait()) { nuclear@0: GetEvent()->PulseEvent(); nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: //------------------------------------------------------------------------------------- nuclear@0: nuclear@0: class ThreadCommandQueueImpl : public NewOverrideBase nuclear@0: { nuclear@0: typedef ThreadCommand::NotifyEvent NotifyEvent; nuclear@0: friend class ThreadCommandQueue; nuclear@0: nuclear@0: public: nuclear@0: nuclear@0: ThreadCommandQueueImpl(ThreadCommandQueue* queue) : nuclear@0: pQueue(queue), nuclear@0: ExitEnqueued(false), nuclear@0: ExitProcessed(false), nuclear@0: CommandBuffer(2048), nuclear@0: PullThreadId(0) nuclear@0: { nuclear@0: } nuclear@0: ~ThreadCommandQueueImpl(); nuclear@0: nuclear@0: nuclear@0: bool PushCommand(const ThreadCommand& command); nuclear@0: bool PopCommand(ThreadCommand::PopBuffer* popBuffer); nuclear@0: nuclear@0: nuclear@0: // ExitCommand is used by notify us that Thread is shutting down. nuclear@0: struct ExitCommand : public ThreadCommand nuclear@0: { nuclear@0: ThreadCommandQueueImpl* pImpl; nuclear@0: nuclear@0: ExitCommand(ThreadCommandQueueImpl* impl, bool wait) nuclear@0: : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { } nuclear@0: nuclear@0: virtual void Execute() const nuclear@0: { nuclear@0: Lock::Locker lock(&pImpl->QueueLock); nuclear@0: pImpl->ExitProcessed = true; nuclear@0: } nuclear@0: virtual ThreadCommand* CopyConstruct(void* p) const nuclear@0: { return Construct(p, *this); } nuclear@0: }; nuclear@0: nuclear@0: nuclear@0: NotifyEvent* AllocNotifyEvent_NTS() nuclear@0: { nuclear@0: NotifyEvent* p = AvailableEvents.GetFirst(); nuclear@0: nuclear@0: if (!AvailableEvents.IsNull(p)) nuclear@0: p->RemoveNode(); nuclear@0: else nuclear@0: p = new NotifyEvent; nuclear@0: return p; nuclear@0: } nuclear@0: nuclear@0: void FreeNotifyEvent_NTS(NotifyEvent* p) nuclear@0: { nuclear@0: AvailableEvents.PushBack(p); nuclear@0: } nuclear@0: nuclear@0: void FreeNotifyEvents_NTS() nuclear@0: { nuclear@0: while(!AvailableEvents.IsEmpty()) nuclear@0: { nuclear@0: NotifyEvent* p = AvailableEvents.GetFirst(); nuclear@0: p->RemoveNode(); nuclear@0: delete p; nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: ThreadCommandQueue* pQueue; nuclear@0: Lock QueueLock; nuclear@0: volatile bool ExitEnqueued; nuclear@0: volatile bool ExitProcessed; nuclear@0: List AvailableEvents; nuclear@0: List BlockedProducers; nuclear@0: CircularBuffer CommandBuffer; nuclear@0: nuclear@0: // The pull thread id is set to the last thread that pulled commands. nuclear@0: // Since this thread command queue is designed for a single thread, nuclear@0: // reentrant behavior that would cause a dead-lock for messages that nuclear@0: // wait for completion can be avoided by simply comparing the nuclear@0: // thread id of the last pull. nuclear@0: OVR::ThreadId PullThreadId; nuclear@0: }; nuclear@0: nuclear@0: ThreadCommandQueueImpl::~ThreadCommandQueueImpl() nuclear@0: { nuclear@0: Lock::Locker lock(&QueueLock); nuclear@0: OVR_ASSERT(BlockedProducers.IsEmpty()); nuclear@0: FreeNotifyEvents_NTS(); nuclear@0: } nuclear@0: nuclear@0: bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command) nuclear@0: { nuclear@0: if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId()) nuclear@0: { nuclear@0: command.Execute(); nuclear@0: return true; nuclear@0: } nuclear@0: nuclear@0: ThreadCommand::NotifyEvent* completeEvent = 0; nuclear@0: ThreadCommand::NotifyEvent* queueAvailableEvent = 0; nuclear@0: nuclear@0: // Repeat writing command into buffer until it is available. nuclear@0: for (;;) { nuclear@0: { // Lock Scope nuclear@0: Lock::Locker lock(&QueueLock); nuclear@0: nuclear@0: if (queueAvailableEvent) { nuclear@0: FreeNotifyEvent_NTS(queueAvailableEvent); nuclear@0: queueAvailableEvent = 0; nuclear@0: } nuclear@0: nuclear@0: // Don't allow any commands after PushExitCommand() is called. nuclear@0: if (ExitEnqueued && !command.ExitFlag) { nuclear@0: return false; nuclear@0: } nuclear@0: nuclear@0: bool bufferWasEmpty = CommandBuffer.IsEmpty(); nuclear@0: uint8_t* buffer = CommandBuffer.Write(command.GetSize()); nuclear@0: nuclear@0: if (buffer) { nuclear@0: ThreadCommand* c = command.CopyConstruct(buffer); nuclear@0: nuclear@0: if (c->NeedsWait()) { nuclear@0: completeEvent = c->pEvent = AllocNotifyEvent_NTS(); nuclear@0: } nuclear@0: nuclear@0: // Signal-waker consumer when we add data to buffer. nuclear@0: if (bufferWasEmpty) { nuclear@0: pQueue->OnPushNonEmpty_Locked(); nuclear@0: } nuclear@0: nuclear@0: break; nuclear@0: } nuclear@0: nuclear@0: queueAvailableEvent = AllocNotifyEvent_NTS(); nuclear@0: BlockedProducers.PushBack(queueAvailableEvent); nuclear@0: } // Lock Scope nuclear@0: nuclear@0: queueAvailableEvent->Wait(); nuclear@0: } // Intentional infinite loop nuclear@0: nuclear@0: // Command was enqueued, wait if necessary. nuclear@0: if (completeEvent) { nuclear@0: completeEvent->Wait(); nuclear@0: Lock::Locker lock(&QueueLock); nuclear@0: FreeNotifyEvent_NTS(completeEvent); nuclear@0: } nuclear@0: nuclear@0: return true; nuclear@0: } nuclear@0: nuclear@0: nuclear@0: // Pops the next command from the thread queue, if any is available. nuclear@0: bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer) nuclear@0: { nuclear@0: PullThreadId = OVR::GetCurrentThreadId(); nuclear@0: nuclear@0: Lock::Locker lock(&QueueLock); nuclear@0: nuclear@0: uint8_t* buffer = CommandBuffer.ReadBegin(); nuclear@0: if (!buffer) nuclear@0: { nuclear@0: // Notify thread while in lock scope, enabling initialization of wait. nuclear@0: pQueue->OnPopEmpty_Locked(); nuclear@0: return false; nuclear@0: } nuclear@0: nuclear@0: popBuffer->InitFromBuffer(buffer); nuclear@0: CommandBuffer.ReadEnd(popBuffer->GetSize()); nuclear@0: nuclear@0: if (!BlockedProducers.IsEmpty()) nuclear@0: { nuclear@0: ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst(); nuclear@0: queueAvailableEvent->RemoveNode(); nuclear@0: queueAvailableEvent->PulseEvent(); nuclear@0: // Event is freed later by waiter. nuclear@0: } nuclear@0: return true; nuclear@0: } nuclear@0: nuclear@0: nuclear@0: //------------------------------------------------------------------------------------- nuclear@0: nuclear@0: ThreadCommandQueue::ThreadCommandQueue() nuclear@0: { nuclear@0: pImpl = new ThreadCommandQueueImpl(this); nuclear@0: } nuclear@0: ThreadCommandQueue::~ThreadCommandQueue() nuclear@0: { nuclear@0: delete pImpl; nuclear@0: } nuclear@0: nuclear@0: bool ThreadCommandQueue::PushCommand(const ThreadCommand& command) nuclear@0: { nuclear@0: return pImpl->PushCommand(command); nuclear@0: } nuclear@0: nuclear@0: bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer) nuclear@0: { nuclear@0: return pImpl->PopCommand(popBuffer); nuclear@0: } nuclear@0: nuclear@0: void ThreadCommandQueue::PushExitCommand(bool wait) nuclear@0: { nuclear@0: // Exit is processed in two stages: nuclear@0: // - First, ExitEnqueued flag is set to block further commands from queuing up. nuclear@0: // - Second, the actual exit call is processed on the consumer thread, flushing nuclear@0: // any prior commands. nuclear@0: // IsExiting() only returns true after exit has flushed. nuclear@0: { nuclear@0: Lock::Locker lock(&pImpl->QueueLock); nuclear@0: if (pImpl->ExitEnqueued) nuclear@0: return; nuclear@0: pImpl->ExitEnqueued = true; nuclear@0: } nuclear@0: nuclear@0: PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait)); nuclear@0: } nuclear@0: nuclear@0: bool ThreadCommandQueue::IsExiting() const nuclear@0: { nuclear@0: return pImpl->ExitProcessed; nuclear@0: } nuclear@0: nuclear@0: nuclear@0: } // namespace OVR