ovr_sdk
diff LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp @ 0:1b39a1b46319
initial 0.4.4
author | John Tsiombikas <nuclear@member.fsf.org> |
---|---|
date | Wed, 14 Jan 2015 06:51:16 +0200 |
parents | |
children |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp Wed Jan 14 06:51:16 2015 +0200 1.3 @@ -0,0 +1,401 @@ 1.4 +/************************************************************************************ 1.5 + 1.6 +PublicHeader: None 1.7 +Filename : OVR_ThreadCommandQueue.cpp 1.8 +Content : Command queue for operations executed on a thread 1.9 +Created : October 29, 2012 1.10 + 1.11 +Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved. 1.12 + 1.13 +Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License"); 1.14 +you may not use the Oculus VR Rift SDK except in compliance with the License, 1.15 +which is provided at the time of installation or download, or which 1.16 +otherwise accompanies this software in either electronic or hard copy form. 1.17 + 1.18 +You may obtain a copy of the License at 1.19 + 1.20 +http://www.oculusvr.com/licenses/LICENSE-3.2 1.21 + 1.22 +Unless required by applicable law or agreed to in writing, the Oculus VR SDK 1.23 +distributed under the License is distributed on an "AS IS" BASIS, 1.24 +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1.25 +See the License for the specific language governing permissions and 1.26 +limitations under the License. 1.27 + 1.28 +************************************************************************************/ 1.29 + 1.30 +#include "OVR_ThreadCommandQueue.h" 1.31 + 1.32 +namespace OVR { 1.33 + 1.34 + 1.35 +//------------------------------------------------------------------------ 1.36 +// ***** CircularBuffer 1.37 + 1.38 +// CircularBuffer is a FIFO buffer implemented in a single block of memory, 1.39 +// which allows writing and reading variable-size data chucks. Write fails 1.40 +// if buffer is full. 1.41 + 1.42 +class CircularBuffer 1.43 +{ 1.44 + enum { 1.45 + AlignSize = 16, 1.46 + AlignMask = AlignSize - 1 1.47 + }; 1.48 + 1.49 + uint8_t* pBuffer; 1.50 + size_t Size; 1.51 + size_t Tail; // Byte offset of next item to be popped. 1.52 + size_t Head; // Byte offset of where next push will take place. 1.53 + size_t End; // When Head < Tail, this is used instead of Size. 1.54 + 1.55 + inline size_t roundUpSize(size_t size) 1.56 + { return (size + AlignMask) & ~(size_t)AlignMask; } 1.57 + 1.58 +public: 1.59 + 1.60 + CircularBuffer(size_t size) 1.61 + : Size(size), Tail(0), Head(0), End(0) 1.62 + { 1.63 + pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize); 1.64 + } 1.65 + ~CircularBuffer() 1.66 + { 1.67 + // For ThreadCommands, we must consume everything before shutdown. 1.68 + OVR_ASSERT(IsEmpty()); 1.69 + OVR_FREE_ALIGNED(pBuffer); 1.70 + } 1.71 + 1.72 + bool IsEmpty() const { return (Head == Tail); } 1.73 + 1.74 + // Allocates a state block of specified size and advances pointers, 1.75 + // returning 0 if buffer is full. 1.76 + uint8_t* Write(size_t size); 1.77 + 1.78 + // Returns a pointer to next available data block; 0 if none available. 1.79 + uint8_t* ReadBegin() 1.80 + { return (Head != Tail) ? (pBuffer + Tail) : 0; } 1.81 + // Consumes data of specified size; this must match size passed to Write. 1.82 + void ReadEnd(size_t size); 1.83 +}; 1.84 + 1.85 + 1.86 +// Allocates a state block of specified size and advances pointers, 1.87 +// returning 0 if buffer is full. 1.88 +uint8_t* CircularBuffer::Write(size_t size) 1.89 +{ 1.90 + uint8_t* p = 0; 1.91 + 1.92 + size = roundUpSize(size); 1.93 + // Since this is circular buffer, always allow at least one item. 1.94 + OVR_ASSERT(size < Size/2); 1.95 + 1.96 + if (Head >= Tail) 1.97 + { 1.98 + OVR_ASSERT(End == 0); 1.99 + 1.100 + if (size <= (Size - Head)) 1.101 + { 1.102 + p = pBuffer + Head; 1.103 + Head += size; 1.104 + } 1.105 + else if (size < Tail) 1.106 + { 1.107 + p = pBuffer; 1.108 + End = Head; 1.109 + Head = size; 1.110 + OVR_ASSERT(Head != Tail); 1.111 + } 1.112 + } 1.113 + else 1.114 + { 1.115 + OVR_ASSERT(End != 0); 1.116 + 1.117 + if ((Tail - Head) > size) 1.118 + { 1.119 + p = pBuffer + Head; 1.120 + Head += size; 1.121 + OVR_ASSERT(Head != Tail); 1.122 + } 1.123 + } 1.124 + 1.125 + return p; 1.126 +} 1.127 + 1.128 +void CircularBuffer::ReadEnd(size_t size) 1.129 +{ 1.130 + OVR_ASSERT(Head != Tail); 1.131 + size = roundUpSize(size); 1.132 + 1.133 + Tail += size; 1.134 + if (Tail == End) 1.135 + { 1.136 + Tail = End = 0; 1.137 + } 1.138 + else if (Tail == Head) 1.139 + { 1.140 + OVR_ASSERT(End == 0); 1.141 + Tail = Head = 0; 1.142 + } 1.143 +} 1.144 + 1.145 + 1.146 +//------------------------------------------------------------------------------------- 1.147 +// ***** ThreadCommand 1.148 + 1.149 +ThreadCommand::PopBuffer::~PopBuffer() 1.150 +{ 1.151 + if (Size) { 1.152 + Destruct<ThreadCommand>(toCommand()); 1.153 + } 1.154 +} 1.155 + 1.156 +void ThreadCommand::PopBuffer::InitFromBuffer(void* data) 1.157 +{ 1.158 + ThreadCommand* cmd = (ThreadCommand*)data; 1.159 + OVR_ASSERT(cmd->Size <= MaxSize); 1.160 + 1.161 + if (Size) { 1.162 + Destruct<ThreadCommand>(toCommand()); 1.163 + } 1.164 + Size = cmd->Size; 1.165 + memcpy(Buffer, (void*)cmd, Size); 1.166 +} 1.167 + 1.168 +void ThreadCommand::PopBuffer::Execute() 1.169 +{ 1.170 + ThreadCommand* command = toCommand(); 1.171 + OVR_ASSERT(command); 1.172 + command->Execute(); 1.173 + if (NeedsWait()) { 1.174 + GetEvent()->PulseEvent(); 1.175 + } 1.176 +} 1.177 + 1.178 +//------------------------------------------------------------------------------------- 1.179 + 1.180 +class ThreadCommandQueueImpl : public NewOverrideBase 1.181 +{ 1.182 + typedef ThreadCommand::NotifyEvent NotifyEvent; 1.183 + friend class ThreadCommandQueue; 1.184 + 1.185 +public: 1.186 + 1.187 + ThreadCommandQueueImpl(ThreadCommandQueue* queue) : 1.188 + pQueue(queue), 1.189 + ExitEnqueued(false), 1.190 + ExitProcessed(false), 1.191 + CommandBuffer(2048), 1.192 + PullThreadId(0) 1.193 + { 1.194 + } 1.195 + ~ThreadCommandQueueImpl(); 1.196 + 1.197 + 1.198 + bool PushCommand(const ThreadCommand& command); 1.199 + bool PopCommand(ThreadCommand::PopBuffer* popBuffer); 1.200 + 1.201 + 1.202 + // ExitCommand is used by notify us that Thread is shutting down. 1.203 + struct ExitCommand : public ThreadCommand 1.204 + { 1.205 + ThreadCommandQueueImpl* pImpl; 1.206 + 1.207 + ExitCommand(ThreadCommandQueueImpl* impl, bool wait) 1.208 + : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { } 1.209 + 1.210 + virtual void Execute() const 1.211 + { 1.212 + Lock::Locker lock(&pImpl->QueueLock); 1.213 + pImpl->ExitProcessed = true; 1.214 + } 1.215 + virtual ThreadCommand* CopyConstruct(void* p) const 1.216 + { return Construct<ExitCommand>(p, *this); } 1.217 + }; 1.218 + 1.219 + 1.220 + NotifyEvent* AllocNotifyEvent_NTS() 1.221 + { 1.222 + NotifyEvent* p = AvailableEvents.GetFirst(); 1.223 + 1.224 + if (!AvailableEvents.IsNull(p)) 1.225 + p->RemoveNode(); 1.226 + else 1.227 + p = new NotifyEvent; 1.228 + return p; 1.229 + } 1.230 + 1.231 + void FreeNotifyEvent_NTS(NotifyEvent* p) 1.232 + { 1.233 + AvailableEvents.PushBack(p); 1.234 + } 1.235 + 1.236 + void FreeNotifyEvents_NTS() 1.237 + { 1.238 + while(!AvailableEvents.IsEmpty()) 1.239 + { 1.240 + NotifyEvent* p = AvailableEvents.GetFirst(); 1.241 + p->RemoveNode(); 1.242 + delete p; 1.243 + } 1.244 + } 1.245 + 1.246 + ThreadCommandQueue* pQueue; 1.247 + Lock QueueLock; 1.248 + volatile bool ExitEnqueued; 1.249 + volatile bool ExitProcessed; 1.250 + List<NotifyEvent> AvailableEvents; 1.251 + List<NotifyEvent> BlockedProducers; 1.252 + CircularBuffer CommandBuffer; 1.253 + 1.254 + // The pull thread id is set to the last thread that pulled commands. 1.255 + // Since this thread command queue is designed for a single thread, 1.256 + // reentrant behavior that would cause a dead-lock for messages that 1.257 + // wait for completion can be avoided by simply comparing the 1.258 + // thread id of the last pull. 1.259 + OVR::ThreadId PullThreadId; 1.260 +}; 1.261 + 1.262 +ThreadCommandQueueImpl::~ThreadCommandQueueImpl() 1.263 +{ 1.264 + Lock::Locker lock(&QueueLock); 1.265 + OVR_ASSERT(BlockedProducers.IsEmpty()); 1.266 + FreeNotifyEvents_NTS(); 1.267 +} 1.268 + 1.269 +bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command) 1.270 +{ 1.271 + if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId()) 1.272 + { 1.273 + command.Execute(); 1.274 + return true; 1.275 + } 1.276 + 1.277 + ThreadCommand::NotifyEvent* completeEvent = 0; 1.278 + ThreadCommand::NotifyEvent* queueAvailableEvent = 0; 1.279 + 1.280 + // Repeat writing command into buffer until it is available. 1.281 + for (;;) { 1.282 + { // Lock Scope 1.283 + Lock::Locker lock(&QueueLock); 1.284 + 1.285 + if (queueAvailableEvent) { 1.286 + FreeNotifyEvent_NTS(queueAvailableEvent); 1.287 + queueAvailableEvent = 0; 1.288 + } 1.289 + 1.290 + // Don't allow any commands after PushExitCommand() is called. 1.291 + if (ExitEnqueued && !command.ExitFlag) { 1.292 + return false; 1.293 + } 1.294 + 1.295 + bool bufferWasEmpty = CommandBuffer.IsEmpty(); 1.296 + uint8_t* buffer = CommandBuffer.Write(command.GetSize()); 1.297 + 1.298 + if (buffer) { 1.299 + ThreadCommand* c = command.CopyConstruct(buffer); 1.300 + 1.301 + if (c->NeedsWait()) { 1.302 + completeEvent = c->pEvent = AllocNotifyEvent_NTS(); 1.303 + } 1.304 + 1.305 + // Signal-waker consumer when we add data to buffer. 1.306 + if (bufferWasEmpty) { 1.307 + pQueue->OnPushNonEmpty_Locked(); 1.308 + } 1.309 + 1.310 + break; 1.311 + } 1.312 + 1.313 + queueAvailableEvent = AllocNotifyEvent_NTS(); 1.314 + BlockedProducers.PushBack(queueAvailableEvent); 1.315 + } // Lock Scope 1.316 + 1.317 + queueAvailableEvent->Wait(); 1.318 + } // Intentional infinite loop 1.319 + 1.320 + // Command was enqueued, wait if necessary. 1.321 + if (completeEvent) { 1.322 + completeEvent->Wait(); 1.323 + Lock::Locker lock(&QueueLock); 1.324 + FreeNotifyEvent_NTS(completeEvent); 1.325 + } 1.326 + 1.327 + return true; 1.328 +} 1.329 + 1.330 + 1.331 +// Pops the next command from the thread queue, if any is available. 1.332 +bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer) 1.333 +{ 1.334 + PullThreadId = OVR::GetCurrentThreadId(); 1.335 + 1.336 + Lock::Locker lock(&QueueLock); 1.337 + 1.338 + uint8_t* buffer = CommandBuffer.ReadBegin(); 1.339 + if (!buffer) 1.340 + { 1.341 + // Notify thread while in lock scope, enabling initialization of wait. 1.342 + pQueue->OnPopEmpty_Locked(); 1.343 + return false; 1.344 + } 1.345 + 1.346 + popBuffer->InitFromBuffer(buffer); 1.347 + CommandBuffer.ReadEnd(popBuffer->GetSize()); 1.348 + 1.349 + if (!BlockedProducers.IsEmpty()) 1.350 + { 1.351 + ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst(); 1.352 + queueAvailableEvent->RemoveNode(); 1.353 + queueAvailableEvent->PulseEvent(); 1.354 + // Event is freed later by waiter. 1.355 + } 1.356 + return true; 1.357 +} 1.358 + 1.359 + 1.360 +//------------------------------------------------------------------------------------- 1.361 + 1.362 +ThreadCommandQueue::ThreadCommandQueue() 1.363 +{ 1.364 + pImpl = new ThreadCommandQueueImpl(this); 1.365 +} 1.366 +ThreadCommandQueue::~ThreadCommandQueue() 1.367 +{ 1.368 + delete pImpl; 1.369 +} 1.370 + 1.371 +bool ThreadCommandQueue::PushCommand(const ThreadCommand& command) 1.372 +{ 1.373 + return pImpl->PushCommand(command); 1.374 +} 1.375 + 1.376 +bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer) 1.377 +{ 1.378 + return pImpl->PopCommand(popBuffer); 1.379 +} 1.380 + 1.381 +void ThreadCommandQueue::PushExitCommand(bool wait) 1.382 +{ 1.383 + // Exit is processed in two stages: 1.384 + // - First, ExitEnqueued flag is set to block further commands from queuing up. 1.385 + // - Second, the actual exit call is processed on the consumer thread, flushing 1.386 + // any prior commands. 1.387 + // IsExiting() only returns true after exit has flushed. 1.388 + { 1.389 + Lock::Locker lock(&pImpl->QueueLock); 1.390 + if (pImpl->ExitEnqueued) 1.391 + return; 1.392 + pImpl->ExitEnqueued = true; 1.393 + } 1.394 + 1.395 + PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait)); 1.396 +} 1.397 + 1.398 +bool ThreadCommandQueue::IsExiting() const 1.399 +{ 1.400 + return pImpl->ExitProcessed; 1.401 +} 1.402 + 1.403 + 1.404 +} // namespace OVR