nuclear@1: /************************************************************************************ nuclear@1: nuclear@1: PublicHeader: None nuclear@1: Filename : OVR_ThreadCommandQueue.cpp nuclear@1: Content : Command queue for operations executed on a thread nuclear@1: Created : October 29, 2012 nuclear@1: nuclear@1: Copyright : Copyright 2012 Oculus VR, Inc. All Rights reserved. nuclear@1: nuclear@1: Use of this software is subject to the terms of the Oculus license nuclear@1: agreement provided at the time of installation or download, or which nuclear@1: otherwise accompanies this software in either electronic or hard copy form. nuclear@1: nuclear@1: ************************************************************************************/ nuclear@1: nuclear@1: #include "OVR_ThreadCommandQueue.h" nuclear@1: nuclear@1: namespace OVR { nuclear@1: nuclear@1: nuclear@1: //------------------------------------------------------------------------ nuclear@1: // ***** CircularBuffer nuclear@1: nuclear@1: // CircularBuffer is a FIFO buffer implemented in a single block of memory, nuclear@1: // which allows writing and reading variable-size data chucks. Write fails nuclear@1: // if buffer is full. nuclear@1: nuclear@1: class CircularBuffer nuclear@1: { nuclear@1: enum { nuclear@1: AlignSize = 16, nuclear@1: AlignMask = AlignSize - 1 nuclear@1: }; nuclear@1: nuclear@1: UByte* pBuffer; nuclear@1: UPInt Size; nuclear@1: UPInt Tail; // Byte offset of next item to be popped. nuclear@1: UPInt Head; // Byte offset of where next push will take place. nuclear@1: UPInt End; // When Head < Tail, this is used instead of Size. nuclear@1: nuclear@1: inline UPInt roundUpSize(UPInt size) nuclear@1: { return (size + AlignMask) & ~(UPInt)AlignMask; } nuclear@1: nuclear@1: public: nuclear@1: nuclear@1: CircularBuffer(UPInt size) nuclear@1: : Size(size), Tail(0), Head(0), End(0) nuclear@1: { nuclear@1: pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize); nuclear@1: } nuclear@1: ~CircularBuffer() nuclear@1: { nuclear@1: // For ThreadCommands, we must consume everything before shutdown. nuclear@1: OVR_ASSERT(IsEmpty()); nuclear@1: OVR_FREE_ALIGNED(pBuffer); nuclear@1: } nuclear@1: nuclear@1: bool IsEmpty() const { return (Head == Tail); } nuclear@1: nuclear@1: // Allocates a state block of specified size and advances pointers, nuclear@1: // returning 0 if buffer is full. nuclear@1: UByte* Write(UPInt size); nuclear@1: nuclear@1: // Returns a pointer to next available data block; 0 if none available. nuclear@1: UByte* ReadBegin() nuclear@1: { return (Head != Tail) ? (pBuffer + Tail) : 0; } nuclear@1: // Consumes data of specified size; this must match size passed to Write. nuclear@1: void ReadEnd(UPInt size); nuclear@1: }; nuclear@1: nuclear@1: nuclear@1: // Allocates a state block of specified size and advances pointers, nuclear@1: // returning 0 if buffer is full. nuclear@1: UByte* CircularBuffer::Write(UPInt size) nuclear@1: { nuclear@1: UByte* p = 0; nuclear@1: nuclear@1: size = roundUpSize(size); nuclear@1: // Since this is circular buffer, always allow at least one item. nuclear@1: OVR_ASSERT(size < Size/2); nuclear@1: nuclear@1: if (Head >= Tail) nuclear@1: { nuclear@1: OVR_ASSERT(End == 0); nuclear@1: nuclear@1: if (size <= (Size - Head)) nuclear@1: { nuclear@1: p = pBuffer + Head; nuclear@1: Head += size; nuclear@1: } nuclear@1: else if (size < Tail) nuclear@1: { nuclear@1: p = pBuffer; nuclear@1: End = Head; nuclear@1: Head = size; nuclear@1: OVR_ASSERT(Head != Tail); nuclear@1: } nuclear@1: } nuclear@1: else nuclear@1: { nuclear@1: OVR_ASSERT(End != 0); nuclear@1: nuclear@1: if ((Tail - Head) > size) nuclear@1: { nuclear@1: p = pBuffer + Head; nuclear@1: Head += size; nuclear@1: OVR_ASSERT(Head != Tail); nuclear@1: } nuclear@1: } nuclear@1: nuclear@1: return p; nuclear@1: } nuclear@1: nuclear@1: void CircularBuffer::ReadEnd(UPInt size) nuclear@1: { nuclear@1: OVR_ASSERT(Head != Tail); nuclear@1: size = roundUpSize(size); nuclear@1: nuclear@1: Tail += size; nuclear@1: if (Tail == End) nuclear@1: { nuclear@1: Tail = End = 0; nuclear@1: } nuclear@1: else if (Tail == Head) nuclear@1: { nuclear@1: OVR_ASSERT(End == 0); nuclear@1: Tail = Head = 0; nuclear@1: } nuclear@1: } nuclear@1: nuclear@1: nuclear@1: //------------------------------------------------------------------------------------- nuclear@1: // ***** ThreadCommand nuclear@1: nuclear@1: ThreadCommand::PopBuffer::~PopBuffer() nuclear@1: { nuclear@1: if (Size) nuclear@1: Destruct(toCommand()); nuclear@1: } nuclear@1: nuclear@1: void ThreadCommand::PopBuffer::InitFromBuffer(void* data) nuclear@1: { nuclear@1: ThreadCommand* cmd = (ThreadCommand*)data; nuclear@1: OVR_ASSERT(cmd->Size <= MaxSize); nuclear@1: nuclear@1: if (Size) nuclear@1: Destruct(toCommand()); nuclear@1: Size = cmd->Size; nuclear@1: memcpy(Buffer, (void*)cmd, Size); nuclear@1: } nuclear@1: nuclear@1: void ThreadCommand::PopBuffer::Execute() nuclear@1: { nuclear@1: ThreadCommand* command = toCommand(); nuclear@1: OVR_ASSERT(command); nuclear@1: command->Execute(); nuclear@1: if (NeedsWait()) nuclear@1: GetEvent()->PulseEvent(); nuclear@1: } nuclear@1: nuclear@1: //------------------------------------------------------------------------------------- nuclear@1: nuclear@1: class ThreadCommandQueueImpl : public NewOverrideBase nuclear@1: { nuclear@1: typedef ThreadCommand::NotifyEvent NotifyEvent; nuclear@1: friend class ThreadCommandQueue; nuclear@1: nuclear@1: public: nuclear@1: nuclear@1: ThreadCommandQueueImpl(ThreadCommandQueue* queue) nuclear@1: : pQueue(queue), CommandBuffer(2048), nuclear@1: ExitEnqueued(false), ExitProcessed(false) nuclear@1: { nuclear@1: } nuclear@1: ~ThreadCommandQueueImpl(); nuclear@1: nuclear@1: nuclear@1: bool PushCommand(const ThreadCommand& command); nuclear@1: bool PopCommand(ThreadCommand::PopBuffer* popBuffer); nuclear@1: nuclear@1: nuclear@1: // ExitCommand is used by notify us that Thread is shutting down. nuclear@1: struct ExitCommand : public ThreadCommand nuclear@1: { nuclear@1: ThreadCommandQueueImpl* pImpl; nuclear@1: nuclear@1: ExitCommand(ThreadCommandQueueImpl* impl, bool wait) nuclear@1: : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { } nuclear@1: nuclear@1: virtual void Execute() const nuclear@1: { nuclear@1: Lock::Locker lock(&pImpl->QueueLock); nuclear@1: pImpl->ExitProcessed = true; nuclear@1: } nuclear@1: virtual ThreadCommand* CopyConstruct(void* p) const nuclear@1: { return Construct(p, *this); } nuclear@1: }; nuclear@1: nuclear@1: nuclear@1: NotifyEvent* AllocNotifyEvent_NTS() nuclear@1: { nuclear@1: NotifyEvent* p = AvailableEvents.GetFirst(); nuclear@1: nuclear@1: if (!AvailableEvents.IsNull(p)) nuclear@1: p->RemoveNode(); nuclear@1: else nuclear@1: p = new NotifyEvent; nuclear@1: return p; nuclear@1: } nuclear@1: nuclear@1: void FreeNotifyEvent_NTS(NotifyEvent* p) nuclear@1: { nuclear@1: AvailableEvents.PushBack(p); nuclear@1: } nuclear@1: nuclear@1: void FreeNotifyEvents_NTS() nuclear@1: { nuclear@1: while(!AvailableEvents.IsEmpty()) nuclear@1: { nuclear@1: NotifyEvent* p = AvailableEvents.GetFirst(); nuclear@1: p->RemoveNode(); nuclear@1: delete p; nuclear@1: } nuclear@1: } nuclear@1: nuclear@1: ThreadCommandQueue* pQueue; nuclear@1: Lock QueueLock; nuclear@1: volatile bool ExitEnqueued; nuclear@1: volatile bool ExitProcessed; nuclear@1: List AvailableEvents; nuclear@1: List BlockedProducers; nuclear@1: CircularBuffer CommandBuffer; nuclear@1: }; nuclear@1: nuclear@1: nuclear@1: nuclear@1: ThreadCommandQueueImpl::~ThreadCommandQueueImpl() nuclear@1: { nuclear@1: Lock::Locker lock(&QueueLock); nuclear@1: OVR_ASSERT(BlockedProducers.IsEmpty()); nuclear@1: FreeNotifyEvents_NTS(); nuclear@1: } nuclear@1: nuclear@1: bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command) nuclear@1: { nuclear@1: ThreadCommand::NotifyEvent* completeEvent = 0; nuclear@1: ThreadCommand::NotifyEvent* queueAvailableEvent = 0; nuclear@1: nuclear@1: // Repeat writing command into buffer until it is available. nuclear@1: do { nuclear@1: nuclear@1: { // Lock Scope nuclear@1: Lock::Locker lock(&QueueLock); nuclear@1: nuclear@1: if (queueAvailableEvent) nuclear@1: { nuclear@1: FreeNotifyEvent_NTS(queueAvailableEvent); nuclear@1: queueAvailableEvent = 0; nuclear@1: } nuclear@1: nuclear@1: // Don't allow any commands after PushExitCommand() is called. nuclear@1: if (ExitEnqueued && !command.ExitFlag) nuclear@1: return false; nuclear@1: nuclear@1: nuclear@1: bool bufferWasEmpty = CommandBuffer.IsEmpty(); nuclear@1: UByte* buffer = CommandBuffer.Write(command.GetSize()); nuclear@1: if (buffer) nuclear@1: { nuclear@1: ThreadCommand* c = command.CopyConstruct(buffer); nuclear@1: if (c->NeedsWait()) nuclear@1: completeEvent = c->pEvent = AllocNotifyEvent_NTS(); nuclear@1: // Signal-waker consumer when we add data to buffer. nuclear@1: if (bufferWasEmpty) nuclear@1: pQueue->OnPushNonEmpty_Locked(); nuclear@1: break; nuclear@1: } nuclear@1: nuclear@1: queueAvailableEvent = AllocNotifyEvent_NTS(); nuclear@1: BlockedProducers.PushBack(queueAvailableEvent); nuclear@1: } // Lock Scope nuclear@1: nuclear@1: queueAvailableEvent->Wait(); nuclear@1: nuclear@1: } while(1); nuclear@1: nuclear@1: // Command was enqueued, wait if necessary. nuclear@1: if (completeEvent) nuclear@1: { nuclear@1: completeEvent->Wait(); nuclear@1: Lock::Locker lock(&QueueLock); nuclear@1: FreeNotifyEvent_NTS(completeEvent); nuclear@1: } nuclear@1: nuclear@1: return true; nuclear@1: } nuclear@1: nuclear@1: nuclear@1: // Pops the next command from the thread queue, if any is available. nuclear@1: bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer) nuclear@1: { nuclear@1: Lock::Locker lock(&QueueLock); nuclear@1: nuclear@1: UByte* buffer = CommandBuffer.ReadBegin(); nuclear@1: if (!buffer) nuclear@1: { nuclear@1: // Notify thread while in lock scope, enabling initialization of wait. nuclear@1: pQueue->OnPopEmpty_Locked(); nuclear@1: return false; nuclear@1: } nuclear@1: nuclear@1: popBuffer->InitFromBuffer(buffer); nuclear@1: CommandBuffer.ReadEnd(popBuffer->GetSize()); nuclear@1: nuclear@1: if (!BlockedProducers.IsEmpty()) nuclear@1: { nuclear@1: ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst(); nuclear@1: queueAvailableEvent->RemoveNode(); nuclear@1: queueAvailableEvent->PulseEvent(); nuclear@1: // Event is freed later by waiter. nuclear@1: } nuclear@1: return true; nuclear@1: } nuclear@1: nuclear@1: nuclear@1: //------------------------------------------------------------------------------------- nuclear@1: nuclear@1: ThreadCommandQueue::ThreadCommandQueue() nuclear@1: { nuclear@1: pImpl = new ThreadCommandQueueImpl(this); nuclear@1: } nuclear@1: ThreadCommandQueue::~ThreadCommandQueue() nuclear@1: { nuclear@1: delete pImpl; nuclear@1: } nuclear@1: nuclear@1: bool ThreadCommandQueue::PushCommand(const ThreadCommand& command) nuclear@1: { nuclear@1: return pImpl->PushCommand(command); nuclear@1: } nuclear@1: nuclear@1: bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer) nuclear@1: { nuclear@1: return pImpl->PopCommand(popBuffer); nuclear@1: } nuclear@1: nuclear@1: void ThreadCommandQueue::PushExitCommand(bool wait) nuclear@1: { nuclear@1: // Exit is processed in two stages: nuclear@1: // - First, ExitEnqueued flag is set to block further commands from queuing up. nuclear@1: // - Second, the actual exit call is processed on the consumer thread, flushing nuclear@1: // any prior commands. nuclear@1: // IsExiting() only returns true after exit has flushed. nuclear@1: { nuclear@1: Lock::Locker lock(&pImpl->QueueLock); nuclear@1: if (pImpl->ExitEnqueued) nuclear@1: return; nuclear@1: pImpl->ExitEnqueued = true; nuclear@1: } nuclear@1: nuclear@1: PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait)); nuclear@1: } nuclear@1: nuclear@1: bool ThreadCommandQueue::IsExiting() const nuclear@1: { nuclear@1: return pImpl->ExitProcessed; nuclear@1: } nuclear@1: nuclear@1: nuclear@1: } // namespace OVR