nuclear@0: /************************************************************************************ nuclear@0: nuclear@0: Filename : OVR_RPC1.cpp nuclear@0: Content : A network plugin that provides remote procedure call functionality. nuclear@0: Created : June 10, 2014 nuclear@0: Authors : Kevin Jenkins nuclear@0: nuclear@0: Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved. nuclear@0: nuclear@0: Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License"); nuclear@0: you may not use the Oculus VR Rift SDK except in compliance with the License, nuclear@0: which is provided at the time of installation or download, or which nuclear@0: otherwise accompanies this software in either electronic or hard copy form. nuclear@0: nuclear@0: You may obtain a copy of the License at nuclear@0: nuclear@0: http://www.oculusvr.com/licenses/LICENSE-3.2 nuclear@0: nuclear@0: Unless required by applicable law or agreed to in writing, the Oculus VR SDK nuclear@0: distributed under the License is distributed on an "AS IS" BASIS, nuclear@0: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. nuclear@0: See the License for the specific language governing permissions and nuclear@0: limitations under the License. nuclear@0: nuclear@0: ************************************************************************************/ nuclear@0: nuclear@0: #include "OVR_RPC1.h" nuclear@0: #include "OVR_BitStream.h" nuclear@0: #include "../Kernel/OVR_Threads.h" // Thread::MSleep nuclear@0: #include "OVR_MessageIDTypes.h" nuclear@0: nuclear@0: namespace OVR { namespace Net { namespace Plugins { nuclear@0: nuclear@0: nuclear@0: //----------------------------------------------------------------------------- nuclear@0: // Types nuclear@0: nuclear@0: enum { nuclear@0: ID_RPC4_SIGNAL, nuclear@0: CALL_BLOCKING, nuclear@0: RPC_ERROR_FUNCTION_NOT_REGISTERED, nuclear@0: ID_RPC4_RETURN, nuclear@0: }; nuclear@0: nuclear@0: nuclear@0: //----------------------------------------------------------------------------- nuclear@0: // RPC1 nuclear@0: nuclear@0: RPC1::RPC1() nuclear@0: { nuclear@0: blockingOnThisConnection = 0; nuclear@0: blockingReturnValue = new BitStream(); nuclear@0: } nuclear@0: nuclear@0: RPC1::~RPC1() nuclear@0: { nuclear@0: slotHash.Clear(); nuclear@0: delete blockingReturnValue; nuclear@0: } nuclear@0: nuclear@0: void RPC1::RegisterSlot(OVR::String sharedIdentifier, OVR::Observer* rpcSlotObserver ) nuclear@0: { nuclear@0: slotHash.AddObserverToSubject(sharedIdentifier, rpcSlotObserver); nuclear@0: } nuclear@0: nuclear@0: bool RPC1::RegisterBlockingFunction(OVR::String uniqueID, RPCDelegate blockingFunction) nuclear@0: { nuclear@0: if (registeredBlockingFunctions.Get(uniqueID)) nuclear@0: return false; nuclear@0: nuclear@0: registeredBlockingFunctions.Set(uniqueID, blockingFunction); nuclear@0: return true; nuclear@0: } nuclear@0: nuclear@0: void RPC1::UnregisterBlockingFunction(OVR::String uniqueID) nuclear@0: { nuclear@0: registeredBlockingFunctions.Remove(uniqueID); nuclear@0: } nuclear@0: nuclear@0: bool RPC1::CallBlocking( OVR::String uniqueID, OVR::Net::BitStream* bitStream, Ptr pConnection, OVR::Net::BitStream* returnData ) nuclear@0: { nuclear@0: // If invalid parameters, nuclear@0: if (!pConnection) nuclear@0: { nuclear@0: // Note: This may happen if the endpoint disconnects just before the call nuclear@0: return false; nuclear@0: } nuclear@0: nuclear@0: OVR::Net::BitStream out; nuclear@0: out.Write((MessageID) OVRID_RPC1); nuclear@0: out.Write((MessageID) CALL_BLOCKING); nuclear@0: out.Write(uniqueID); nuclear@0: if (bitStream) nuclear@0: { nuclear@0: bitStream->ResetReadPointer(); nuclear@0: out.AlignWriteToByteBoundary(); nuclear@0: out.Write(bitStream); nuclear@0: } nuclear@0: nuclear@0: SendParameters sp(pConnection, out.GetData(), out.GetNumberOfBytesUsed()); nuclear@0: nuclear@0: if (returnData) nuclear@0: { nuclear@0: returnData->Reset(); nuclear@0: } nuclear@0: nuclear@0: // Only one thread call at a time nuclear@0: Lock::Locker singleRPCLocker(&singleRPCLock); nuclear@0: nuclear@0: // Note this does not prevent multiple calls at a time because .Wait will unlock it below. nuclear@0: // The purpose of this mutex is to synchronize the polling thread and this one, not prevent nuclear@0: // multiple threads from invoking RPC. nuclear@0: Mutex::Locker locker(&callBlockingMutex); nuclear@0: nuclear@0: blockingReturnValue->Reset(); nuclear@0: blockingOnThisConnection = pConnection; nuclear@0: nuclear@0: int bytesSent = pSession->Send(&sp); nuclear@0: if (bytesSent == sp.Bytes) nuclear@0: { nuclear@0: while (blockingOnThisConnection == pConnection) nuclear@0: { nuclear@0: callBlockingWait.Wait(&callBlockingMutex); nuclear@0: } nuclear@0: } nuclear@0: else nuclear@0: { nuclear@0: return false; nuclear@0: } nuclear@0: nuclear@0: if (returnData) nuclear@0: { nuclear@0: returnData->Write(blockingReturnValue); nuclear@0: returnData->ResetReadPointer(); nuclear@0: } nuclear@0: nuclear@0: return true; nuclear@0: } nuclear@0: nuclear@0: bool RPC1::Signal(OVR::String sharedIdentifier, OVR::Net::BitStream* bitStream, Ptr pConnection) nuclear@0: { nuclear@0: OVR::Net::BitStream out; nuclear@0: out.Write((MessageID) OVRID_RPC1); nuclear@0: out.Write((MessageID) ID_RPC4_SIGNAL); nuclear@0: //out.Write(PluginId); nuclear@0: out.Write(sharedIdentifier); nuclear@0: if (bitStream) nuclear@0: { nuclear@0: bitStream->ResetReadPointer(); nuclear@0: out.AlignWriteToByteBoundary(); nuclear@0: out.Write(bitStream); nuclear@0: } nuclear@0: SendParameters sp(pConnection, out.GetData(), out.GetNumberOfBytesUsed()); nuclear@0: int32_t bytesSent = pSession->Send(&sp); nuclear@0: return bytesSent == sp.Bytes; nuclear@0: } nuclear@0: void RPC1::BroadcastSignal(OVR::String sharedIdentifier, OVR::Net::BitStream* bitStream) nuclear@0: { nuclear@0: OVR::Net::BitStream out; nuclear@0: out.Write((MessageID) OVRID_RPC1); nuclear@0: out.Write((MessageID) ID_RPC4_SIGNAL); nuclear@0: //out.Write(PluginId); nuclear@0: out.Write(sharedIdentifier); nuclear@0: if (bitStream) nuclear@0: { nuclear@0: bitStream->ResetReadPointer(); nuclear@0: out.AlignWriteToByteBoundary(); nuclear@0: out.Write(bitStream); nuclear@0: } nuclear@0: BroadcastParameters p(out.GetData(), out.GetNumberOfBytesUsed()); nuclear@0: pSession->Broadcast(&p); nuclear@0: } nuclear@0: void RPC1::OnReceive(ReceivePayload *pPayload, ListenerReceiveResult *lrrOut) nuclear@0: { nuclear@0: OVR_UNUSED(lrrOut); nuclear@0: nuclear@0: if (pPayload->pData[0] == OVRID_RPC1) nuclear@0: { nuclear@0: OVR_ASSERT(pPayload->Bytes >= 2); nuclear@0: nuclear@0: OVR::Net::BitStream bsIn((char*)pPayload->pData, pPayload->Bytes, false); nuclear@0: bsIn.IgnoreBytes(2); nuclear@0: nuclear@0: if (pPayload->pData[1] == RPC_ERROR_FUNCTION_NOT_REGISTERED) nuclear@0: { nuclear@0: Mutex::Locker locker(&callBlockingMutex); nuclear@0: nuclear@0: blockingReturnValue->Reset(); nuclear@0: blockingOnThisConnection = 0; nuclear@0: callBlockingWait.NotifyAll(); nuclear@0: } nuclear@0: else if (pPayload->pData[1] == ID_RPC4_RETURN) nuclear@0: { nuclear@0: Mutex::Locker locker(&callBlockingMutex); nuclear@0: nuclear@0: blockingReturnValue->Reset(); nuclear@0: blockingReturnValue->Write(bsIn); nuclear@0: blockingOnThisConnection = 0; nuclear@0: callBlockingWait.NotifyAll(); nuclear@0: } nuclear@0: else if (pPayload->pData[1] == CALL_BLOCKING) nuclear@0: { nuclear@0: OVR::String uniqueId; nuclear@0: bsIn.Read(uniqueId); nuclear@0: nuclear@0: RPCDelegate *bf = registeredBlockingFunctions.Get(uniqueId); nuclear@0: if (bf==0) nuclear@0: { nuclear@0: OVR::Net::BitStream bsOut; nuclear@0: bsOut.Write((unsigned char) OVRID_RPC1); nuclear@0: bsOut.Write((unsigned char) RPC_ERROR_FUNCTION_NOT_REGISTERED); nuclear@0: nuclear@0: SendParameters sp(pPayload->pConnection, bsOut.GetData(), bsOut.GetNumberOfBytesUsed()); nuclear@0: pSession->Send(&sp); nuclear@0: nuclear@0: return; nuclear@0: } nuclear@0: nuclear@0: OVR::Net::BitStream returnData; nuclear@0: bsIn.AlignReadToByteBoundary(); nuclear@0: (*bf)(&bsIn, &returnData, pPayload); nuclear@0: nuclear@0: OVR::Net::BitStream out; nuclear@0: out.Write((MessageID) OVRID_RPC1); nuclear@0: out.Write((MessageID) ID_RPC4_RETURN); nuclear@0: returnData.ResetReadPointer(); nuclear@0: out.AlignWriteToByteBoundary(); nuclear@0: out.Write(returnData); nuclear@0: nuclear@0: SendParameters sp(pPayload->pConnection, out.GetData(), out.GetNumberOfBytesUsed()); nuclear@0: pSession->Send(&sp); nuclear@0: } nuclear@0: else if (pPayload->pData[1]==ID_RPC4_SIGNAL) nuclear@0: { nuclear@0: OVR::String sharedIdentifier; nuclear@0: bsIn.Read(sharedIdentifier); nuclear@0: nuclear@0: Observer *o = slotHash.GetSubject(sharedIdentifier); nuclear@0: nuclear@0: if (o) nuclear@0: { nuclear@0: bsIn.AlignReadToByteBoundary(); nuclear@0: nuclear@0: if (o) nuclear@0: { nuclear@0: OVR::Net::BitStream serializedParameters(bsIn.GetData() + bsIn.GetReadOffset()/8, bsIn.GetNumberOfUnreadBits()/8, false); nuclear@0: nuclear@0: o->Call(&serializedParameters, pPayload); nuclear@0: } nuclear@0: } nuclear@0: } nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: void RPC1::OnDisconnected(Connection* conn) nuclear@0: { nuclear@0: if (blockingOnThisConnection == conn) nuclear@0: { nuclear@0: blockingOnThisConnection = 0; nuclear@0: callBlockingWait.NotifyAll(); nuclear@0: } nuclear@0: } nuclear@0: nuclear@0: void RPC1::OnConnected(Connection* conn) nuclear@0: { nuclear@0: OVR_UNUSED(conn); nuclear@0: } nuclear@0: nuclear@0: nuclear@0: }}} // OVR::Net::Plugins