oculus1
diff libovr/Src/OVR_ThreadCommandQueue.cpp @ 1:e2f9e4603129
added LibOVR and started a simple vr wrapper.
author | John Tsiombikas <nuclear@member.fsf.org> |
---|---|
date | Sat, 14 Sep 2013 16:14:59 +0300 |
parents | |
children |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/libovr/Src/OVR_ThreadCommandQueue.cpp Sat Sep 14 16:14:59 2013 +0300 1.3 @@ -0,0 +1,370 @@ 1.4 +/************************************************************************************ 1.5 + 1.6 +PublicHeader: None 1.7 +Filename : OVR_ThreadCommandQueue.cpp 1.8 +Content : Command queue for operations executed on a thread 1.9 +Created : October 29, 2012 1.10 + 1.11 +Copyright : Copyright 2012 Oculus VR, Inc. All Rights reserved. 1.12 + 1.13 +Use of this software is subject to the terms of the Oculus license 1.14 +agreement provided at the time of installation or download, or which 1.15 +otherwise accompanies this software in either electronic or hard copy form. 1.16 + 1.17 +************************************************************************************/ 1.18 + 1.19 +#include "OVR_ThreadCommandQueue.h" 1.20 + 1.21 +namespace OVR { 1.22 + 1.23 + 1.24 +//------------------------------------------------------------------------ 1.25 +// ***** CircularBuffer 1.26 + 1.27 +// CircularBuffer is a FIFO buffer implemented in a single block of memory, 1.28 +// which allows writing and reading variable-size data chucks. Write fails 1.29 +// if buffer is full. 1.30 + 1.31 +class CircularBuffer 1.32 +{ 1.33 + enum { 1.34 + AlignSize = 16, 1.35 + AlignMask = AlignSize - 1 1.36 + }; 1.37 + 1.38 + UByte* pBuffer; 1.39 + UPInt Size; 1.40 + UPInt Tail; // Byte offset of next item to be popped. 1.41 + UPInt Head; // Byte offset of where next push will take place. 1.42 + UPInt End; // When Head < Tail, this is used instead of Size. 1.43 + 1.44 + inline UPInt roundUpSize(UPInt size) 1.45 + { return (size + AlignMask) & ~(UPInt)AlignMask; } 1.46 + 1.47 +public: 1.48 + 1.49 + CircularBuffer(UPInt size) 1.50 + : Size(size), Tail(0), Head(0), End(0) 1.51 + { 1.52 + pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize); 1.53 + } 1.54 + ~CircularBuffer() 1.55 + { 1.56 + // For ThreadCommands, we must consume everything before shutdown. 1.57 + OVR_ASSERT(IsEmpty()); 1.58 + OVR_FREE_ALIGNED(pBuffer); 1.59 + } 1.60 + 1.61 + bool IsEmpty() const { return (Head == Tail); } 1.62 + 1.63 + // Allocates a state block of specified size and advances pointers, 1.64 + // returning 0 if buffer is full. 1.65 + UByte* Write(UPInt size); 1.66 + 1.67 + // Returns a pointer to next available data block; 0 if none available. 1.68 + UByte* ReadBegin() 1.69 + { return (Head != Tail) ? (pBuffer + Tail) : 0; } 1.70 + // Consumes data of specified size; this must match size passed to Write. 1.71 + void ReadEnd(UPInt size); 1.72 +}; 1.73 + 1.74 + 1.75 +// Allocates a state block of specified size and advances pointers, 1.76 +// returning 0 if buffer is full. 1.77 +UByte* CircularBuffer::Write(UPInt size) 1.78 +{ 1.79 + UByte* p = 0; 1.80 + 1.81 + size = roundUpSize(size); 1.82 + // Since this is circular buffer, always allow at least one item. 1.83 + OVR_ASSERT(size < Size/2); 1.84 + 1.85 + if (Head >= Tail) 1.86 + { 1.87 + OVR_ASSERT(End == 0); 1.88 + 1.89 + if (size <= (Size - Head)) 1.90 + { 1.91 + p = pBuffer + Head; 1.92 + Head += size; 1.93 + } 1.94 + else if (size < Tail) 1.95 + { 1.96 + p = pBuffer; 1.97 + End = Head; 1.98 + Head = size; 1.99 + OVR_ASSERT(Head != Tail); 1.100 + } 1.101 + } 1.102 + else 1.103 + { 1.104 + OVR_ASSERT(End != 0); 1.105 + 1.106 + if ((Tail - Head) > size) 1.107 + { 1.108 + p = pBuffer + Head; 1.109 + Head += size; 1.110 + OVR_ASSERT(Head != Tail); 1.111 + } 1.112 + } 1.113 + 1.114 + return p; 1.115 +} 1.116 + 1.117 +void CircularBuffer::ReadEnd(UPInt size) 1.118 +{ 1.119 + OVR_ASSERT(Head != Tail); 1.120 + size = roundUpSize(size); 1.121 + 1.122 + Tail += size; 1.123 + if (Tail == End) 1.124 + { 1.125 + Tail = End = 0; 1.126 + } 1.127 + else if (Tail == Head) 1.128 + { 1.129 + OVR_ASSERT(End == 0); 1.130 + Tail = Head = 0; 1.131 + } 1.132 +} 1.133 + 1.134 + 1.135 +//------------------------------------------------------------------------------------- 1.136 +// ***** ThreadCommand 1.137 + 1.138 +ThreadCommand::PopBuffer::~PopBuffer() 1.139 +{ 1.140 + if (Size) 1.141 + Destruct<ThreadCommand>(toCommand()); 1.142 +} 1.143 + 1.144 +void ThreadCommand::PopBuffer::InitFromBuffer(void* data) 1.145 +{ 1.146 + ThreadCommand* cmd = (ThreadCommand*)data; 1.147 + OVR_ASSERT(cmd->Size <= MaxSize); 1.148 + 1.149 + if (Size) 1.150 + Destruct<ThreadCommand>(toCommand()); 1.151 + Size = cmd->Size; 1.152 + memcpy(Buffer, (void*)cmd, Size); 1.153 +} 1.154 + 1.155 +void ThreadCommand::PopBuffer::Execute() 1.156 +{ 1.157 + ThreadCommand* command = toCommand(); 1.158 + OVR_ASSERT(command); 1.159 + command->Execute(); 1.160 + if (NeedsWait()) 1.161 + GetEvent()->PulseEvent(); 1.162 +} 1.163 + 1.164 +//------------------------------------------------------------------------------------- 1.165 + 1.166 +class ThreadCommandQueueImpl : public NewOverrideBase 1.167 +{ 1.168 + typedef ThreadCommand::NotifyEvent NotifyEvent; 1.169 + friend class ThreadCommandQueue; 1.170 + 1.171 +public: 1.172 + 1.173 + ThreadCommandQueueImpl(ThreadCommandQueue* queue) 1.174 + : pQueue(queue), CommandBuffer(2048), 1.175 + ExitEnqueued(false), ExitProcessed(false) 1.176 + { 1.177 + } 1.178 + ~ThreadCommandQueueImpl(); 1.179 + 1.180 + 1.181 + bool PushCommand(const ThreadCommand& command); 1.182 + bool PopCommand(ThreadCommand::PopBuffer* popBuffer); 1.183 + 1.184 + 1.185 + // ExitCommand is used by notify us that Thread is shutting down. 1.186 + struct ExitCommand : public ThreadCommand 1.187 + { 1.188 + ThreadCommandQueueImpl* pImpl; 1.189 + 1.190 + ExitCommand(ThreadCommandQueueImpl* impl, bool wait) 1.191 + : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { } 1.192 + 1.193 + virtual void Execute() const 1.194 + { 1.195 + Lock::Locker lock(&pImpl->QueueLock); 1.196 + pImpl->ExitProcessed = true; 1.197 + } 1.198 + virtual ThreadCommand* CopyConstruct(void* p) const 1.199 + { return Construct<ExitCommand>(p, *this); } 1.200 + }; 1.201 + 1.202 + 1.203 + NotifyEvent* AllocNotifyEvent_NTS() 1.204 + { 1.205 + NotifyEvent* p = AvailableEvents.GetFirst(); 1.206 + 1.207 + if (!AvailableEvents.IsNull(p)) 1.208 + p->RemoveNode(); 1.209 + else 1.210 + p = new NotifyEvent; 1.211 + return p; 1.212 + } 1.213 + 1.214 + void FreeNotifyEvent_NTS(NotifyEvent* p) 1.215 + { 1.216 + AvailableEvents.PushBack(p); 1.217 + } 1.218 + 1.219 + void FreeNotifyEvents_NTS() 1.220 + { 1.221 + while(!AvailableEvents.IsEmpty()) 1.222 + { 1.223 + NotifyEvent* p = AvailableEvents.GetFirst(); 1.224 + p->RemoveNode(); 1.225 + delete p; 1.226 + } 1.227 + } 1.228 + 1.229 + ThreadCommandQueue* pQueue; 1.230 + Lock QueueLock; 1.231 + volatile bool ExitEnqueued; 1.232 + volatile bool ExitProcessed; 1.233 + List<NotifyEvent> AvailableEvents; 1.234 + List<NotifyEvent> BlockedProducers; 1.235 + CircularBuffer CommandBuffer; 1.236 +}; 1.237 + 1.238 + 1.239 + 1.240 +ThreadCommandQueueImpl::~ThreadCommandQueueImpl() 1.241 +{ 1.242 + Lock::Locker lock(&QueueLock); 1.243 + OVR_ASSERT(BlockedProducers.IsEmpty()); 1.244 + FreeNotifyEvents_NTS(); 1.245 +} 1.246 + 1.247 +bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command) 1.248 +{ 1.249 + ThreadCommand::NotifyEvent* completeEvent = 0; 1.250 + ThreadCommand::NotifyEvent* queueAvailableEvent = 0; 1.251 + 1.252 + // Repeat writing command into buffer until it is available. 1.253 + do { 1.254 + 1.255 + { // Lock Scope 1.256 + Lock::Locker lock(&QueueLock); 1.257 + 1.258 + if (queueAvailableEvent) 1.259 + { 1.260 + FreeNotifyEvent_NTS(queueAvailableEvent); 1.261 + queueAvailableEvent = 0; 1.262 + } 1.263 + 1.264 + // Don't allow any commands after PushExitCommand() is called. 1.265 + if (ExitEnqueued && !command.ExitFlag) 1.266 + return false; 1.267 + 1.268 + 1.269 + bool bufferWasEmpty = CommandBuffer.IsEmpty(); 1.270 + UByte* buffer = CommandBuffer.Write(command.GetSize()); 1.271 + if (buffer) 1.272 + { 1.273 + ThreadCommand* c = command.CopyConstruct(buffer); 1.274 + if (c->NeedsWait()) 1.275 + completeEvent = c->pEvent = AllocNotifyEvent_NTS(); 1.276 + // Signal-waker consumer when we add data to buffer. 1.277 + if (bufferWasEmpty) 1.278 + pQueue->OnPushNonEmpty_Locked(); 1.279 + break; 1.280 + } 1.281 + 1.282 + queueAvailableEvent = AllocNotifyEvent_NTS(); 1.283 + BlockedProducers.PushBack(queueAvailableEvent); 1.284 + } // Lock Scope 1.285 + 1.286 + queueAvailableEvent->Wait(); 1.287 + 1.288 + } while(1); 1.289 + 1.290 + // Command was enqueued, wait if necessary. 1.291 + if (completeEvent) 1.292 + { 1.293 + completeEvent->Wait(); 1.294 + Lock::Locker lock(&QueueLock); 1.295 + FreeNotifyEvent_NTS(completeEvent); 1.296 + } 1.297 + 1.298 + return true; 1.299 +} 1.300 + 1.301 + 1.302 +// Pops the next command from the thread queue, if any is available. 1.303 +bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer) 1.304 +{ 1.305 + Lock::Locker lock(&QueueLock); 1.306 + 1.307 + UByte* buffer = CommandBuffer.ReadBegin(); 1.308 + if (!buffer) 1.309 + { 1.310 + // Notify thread while in lock scope, enabling initialization of wait. 1.311 + pQueue->OnPopEmpty_Locked(); 1.312 + return false; 1.313 + } 1.314 + 1.315 + popBuffer->InitFromBuffer(buffer); 1.316 + CommandBuffer.ReadEnd(popBuffer->GetSize()); 1.317 + 1.318 + if (!BlockedProducers.IsEmpty()) 1.319 + { 1.320 + ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst(); 1.321 + queueAvailableEvent->RemoveNode(); 1.322 + queueAvailableEvent->PulseEvent(); 1.323 + // Event is freed later by waiter. 1.324 + } 1.325 + return true; 1.326 +} 1.327 + 1.328 + 1.329 +//------------------------------------------------------------------------------------- 1.330 + 1.331 +ThreadCommandQueue::ThreadCommandQueue() 1.332 +{ 1.333 + pImpl = new ThreadCommandQueueImpl(this); 1.334 +} 1.335 +ThreadCommandQueue::~ThreadCommandQueue() 1.336 +{ 1.337 + delete pImpl; 1.338 +} 1.339 + 1.340 +bool ThreadCommandQueue::PushCommand(const ThreadCommand& command) 1.341 +{ 1.342 + return pImpl->PushCommand(command); 1.343 +} 1.344 + 1.345 +bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer) 1.346 +{ 1.347 + return pImpl->PopCommand(popBuffer); 1.348 +} 1.349 + 1.350 +void ThreadCommandQueue::PushExitCommand(bool wait) 1.351 +{ 1.352 + // Exit is processed in two stages: 1.353 + // - First, ExitEnqueued flag is set to block further commands from queuing up. 1.354 + // - Second, the actual exit call is processed on the consumer thread, flushing 1.355 + // any prior commands. 1.356 + // IsExiting() only returns true after exit has flushed. 1.357 + { 1.358 + Lock::Locker lock(&pImpl->QueueLock); 1.359 + if (pImpl->ExitEnqueued) 1.360 + return; 1.361 + pImpl->ExitEnqueued = true; 1.362 + } 1.363 + 1.364 + PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait)); 1.365 +} 1.366 + 1.367 +bool ThreadCommandQueue::IsExiting() const 1.368 +{ 1.369 + return pImpl->ExitProcessed; 1.370 +} 1.371 + 1.372 + 1.373 +} // namespace OVR