23 #include "common_priv.h"
36 : trancounter(0), shutdown(false), loglevel(
Logger::WARNING)
59 Variant msg(Variant::MapType);
67 const std::string &servname,
KernelBase *kernel) {
70 if (!kernel) {
throw std::invalid_argument(
"Must have non null Kernel."); }
71 Variant msg(Variant::MapType);
74 msg[
"hostname"] = hostname;
75 msg[
"servname"] = servname;
77 if (!reply.Get(
"success",
false).IsTrue()) {
78 throw std::invalid_argument(
"Cannot create two kernels with the same name");
80 Key_t key = reply[
"kernelinfo"][
"key"].AsNumber<
Key_t>();
81 kernels.insert(std::make_pair(key, kernel));
88 Variant msg(Variant::MapType);
92 if (!reply.Get(
"success",
false).IsTrue()) {
93 throw std::invalid_argument(
"No such kernel");
95 return reply[
"kernelinfo"][
"key"].AsNumber<
Key_t>();
101 Variant msg(Variant::MapType);
103 msg[
"key"] = kernelkey;
105 if (!reply.Get(
"success",
false).IsTrue()) {
106 throw std::invalid_argument(
"No such kernel");
108 return reply[
"kernelinfo"][
"name"].AsString();
114 Variant msg(Variant::MapType);
116 msg[
"key"] = kernelkey;
118 if (!reply.Get(
"success",
false).IsTrue()) {
119 throw std::invalid_argument(
"No such kernel");
121 Variant kernelinfo = reply[
"kernelinfo"];
122 hostname = kernelinfo[
"hostname"].AsString();
123 servname = kernelinfo[
"servname"].AsString();
128 Variant msg(Variant::MapType);
130 msg[
"key"] = kernelkey;
139 Variant msg(Variant::MapType);
141 msg[
"name"] = kernel;
143 if (reply.Get(
"success",
false).IsTrue()) {
144 Variant kernelinfo = reply[
"kernelinfo"];
145 if (kernelinfo.Get(
"live",
false).IsTrue()) {
146 return kernelinfo[
"key"].AsNumber<
Key_t>();
150 if (genwait->messages.empty()) {
151 genwait->cond.Wait(
lock);
154 Variant msg = genwait->messages.front();
155 genwait->messages.pop_front();
156 if (msg.Contains(
"kernelinfo")) {
157 Variant kernelinfo = msg[
"kernelinfo"];
158 if (kernelinfo[
"name"].AsString() == kernel && kernelinfo[
"live"].IsTrue()) {
159 return kernelinfo[
"key"].AsNumber<
Key_t>();
169 Variant msg(Variant::MapType);
170 msg[
"key"] = kernelkey;
196 Variant msg(Variant::MapType);
197 msg[
"msgtype"] =
"kernel";
199 msg[
"kernelkey"] = kernelkey;
201 nodeattr[
"kernelkey"] = kernelkey;
203 nodeattr[
"name"] = attr.
GetName();
205 const std::map<std::string, std::string> ¶ms = attr.
GetParams();
206 for (std::map<std::string, std::string>::const_iterator i = params.begin(),
207 e = params.end(); i != e; ++i) {
208 nodeattr[
"param"][i->first] = i->second;
210 nodeattr[
"key"] = attr.
GetKey();
211 msg[
"nodeattr"] = nodeattr;
216 NodeAttr attr(msg[
"name"].AsString(), msg[
"nodetype"].AsString());
217 attr.
SetKernel(msg[
"kernel"].AsString());
218 attr.
SetKey(msg[
"key"].AsNumber<Key_t>());
219 if (msg.Contains(
"param")) {
220 for (Variant::ConstMapIterator i = msg[
"param"].MapBegin(), e = msg[
"param"].MapEnd();
223 attr.
SetParam(i->first, i->second.AsString());
230 Variant msg(Variant::MapType);
231 msg[
"msgtype"] =
"kernel";
232 msg[
"type"] = msgtype;
233 msg[
"kernelkey"] = kernelkey;
235 queueattr[
"queuehint"] = attr.
GetHint();
237 queueattr[
"queueLength"] = attr.
GetLength();
244 queueattr[
"alpha"] = attr.
GetAlpha();
246 msg[
"queueattr"] = queueattr;
252 attr.
SetHint(msg[
"queuehint"].AsString());
254 attr.
SetLength(msg[
"queueLength"].AsUnsigned());
261 attr.
SetAlpha(msg[
"alpha"].AsDouble());
269 Variant msg(Variant::MapType);
271 msg[
"kernelkey"] = kernelkey;
272 msg[
"name"] = nodename;
274 if (!reply.Get(
"success",
false).IsTrue()) {
275 throw std::invalid_argument(
"Node " + nodename +
" already exists.");
277 return reply[
"nodeinfo"][
"key"].AsNumber<
Key_t>();
283 Variant msg(Variant::MapType);
285 msg[
"name"] = nodename;
287 if (!reply.Get(
"success",
false).IsTrue()) {
288 throw std::invalid_argument(
"No such node");
290 return reply[
"nodeinfo"][
"key"].AsNumber<
Key_t>();
296 Variant msg(Variant::MapType);
298 msg[
"key"] = nodekey;
300 if (!reply.Get(
"success",
false).IsTrue()) {
301 throw std::invalid_argument(
"No such node");
303 return reply[
"nodeinfo"][
"name"].AsString();
309 Variant msg(Variant::MapType);
311 msg[
"key"] = nodekey;
317 Variant msg(Variant::MapType);
319 msg[
"key"] = nodekey;
327 Variant msg(Variant::MapType);
329 msg[
"name"] = nodename;
332 if (reply.Get(
"success",
false).IsTrue()) {
333 Variant nodeinfo = reply[
"nodeinfo"];
334 if (nodeinfo[
"started"].IsTrue()) {
335 return nodeinfo[
"key"].AsNumber<
Key_t>();
339 if (genwait->messages.empty()) {
340 genwait->cond.Wait(
lock);
343 Variant msg = genwait->messages.front();
344 genwait->messages.pop_front();
345 if (msg.Contains(
"nodeinfo")) {
346 Variant nodeinfo = msg[
"nodeinfo"];
347 if (nodeinfo[
"started"].IsTrue() && nodeinfo[
"name"].AsString() == nodename) {
348 return nodeinfo[
"key"].AsNumber<
Key_t>();
360 Variant msg(Variant::MapType);
362 msg[
"name"] = nodename;
369 if (reply.Get(
"success",
false).IsTrue()) {
370 if (reply[
"nodeinfo"][
"dead"].IsTrue()) {
375 if (genwait->messages.empty()) {
376 genwait->cond.Wait(
lock);
379 Variant msg = genwait->messages.front();
380 genwait->messages.pop_front();
381 if (msg.Contains(
"nodeinfo")) {
382 Variant nodeinfo = msg[
"nodeinfo"];
383 if (nodeinfo[
"dead"].IsTrue() && nodeinfo[
"name"].AsString() == nodename) {
395 Variant msg(Variant::MapType);
403 ASSERT(reply.Get(
"success",
false).IsTrue(),
"msg: %s", libvariant::SerializeJSON(reply).c_str());
404 if (reply[
"numlivenodes"].AsUnsigned() == 0) {
408 if (genwait->messages.empty()) {
409 genwait->cond.Wait(
lock);
412 Variant msg = genwait->messages.front();
413 genwait->messages.pop_front();
414 if (msg.Get(
"numlivenodes", -1).AsUnsigned() == 0) {
424 Variant msg(Variant::MapType);
426 msg[
"key"] = nodekey;
428 if (!reply.Get(
"success",
false).IsTrue()) {
429 throw std::invalid_argument(
"No such node");
431 return reply[
"nodeinfo"][
"kernelkey"].AsNumber<
Key_t>();
443 return info[
"nodekey"].AsNumber<
Key_t>();
449 return info[
"kernelkey"].AsNumber<
Key_t>();
455 return info[
"name"].AsString();
466 return info[
"nodekey"].AsNumber<
Key_t>();
472 return info[
"kernelkey"].AsNumber<
Key_t>();
478 return info[
"name"].AsString();
484 Variant msg(Variant::MapType);
486 msg[
"readerkey"] = readerkey;
487 msg[
"writerkey"] = writerkey;
488 msg[
"qname"] = qname;
495 return info[
"writerkey"].AsNumber<
Key_t>();
501 return info[
"readerkey"].AsNumber<
Key_t>();
508 Variant msg(Variant::MapType);
520 KernelMap::iterator itr, end;
521 itr = mapcopy.begin();
524 itr->second->NotifyTerminate();
535 cwitr->second->cond.Signal();
538 WaiterList::iterator gwitr;
540 while (gwitr !=
waiters.end()) {
581 if (!msg.IsMap()) {
return; }
590 std::string msgtype = msg[
"msgtype"].AsString();
591 if (msgtype ==
"reply") {
592 unsigned tranid = msg[
"msgid"].AsUnsigned();
593 WaiterMap::iterator entry;
596 entry->second->msg = msg;
597 entry->second->signaled =
true;
598 entry->second->cond.Signal();
600 }
else if (msgtype ==
"broadcast") {
601 WaiterList::iterator entry;
603 while (entry !=
waiters.end()) {
606 waiter->messages.push_back(msg);
607 waiter->cond.Signal();
613 }
else if (msgtype ==
"kernel") {
614 Key_t kernelkey = msg[
"kernelkey"].AsNumber<
Key_t>();
615 KernelMap::iterator entry =
kernels.find(kernelkey);
621 switch (msg[
"type"].AsNumber<RCTXMT_t>()) {
635 ASSERT(
false,
"Unknown kernel message type");
638 ASSERT(
false,
"Message for kernel %llu but said kernel doesn't exist!", kernelkey);
647 Variant msg(Variant::MapType);
648 msg[
"type"] = msgtype;
649 msg[
"nodekey"] = nodekey;
650 msg[
"name"] = portname;
652 if (!reply.Get(
"success",
false).IsTrue()) {
653 throw std::invalid_argument(
"No such port");
655 return reply[
"endpointinfo"][
"key"].AsNumber<
Key_t>();
660 Variant msg(Variant::MapType);
661 msg[
"type"] = msgtype;
662 msg[
"key"] = portkey;
664 ASSERT(reply.Get(
"success",
false).IsTrue(),
"msg: %s", libvariant::SerializeJSON(reply).c_str());
665 return reply[
"endpointinfo"];
Logger object that is used for forwarding log messages.
virtual void WaitForNodeEnd(const std::string &nodename)
Waits for the given node to signal end.
virtual ~RemoteContextClient()
unsigned GetLength() const
PthreadCondition & Wait(PthreadMutex &mutex)
virtual int LogLevel() const
void SendQueueMsg(CPN::Key_t kernelkey, RCTXMT_t msgtype, const CPN::SimpleQueueAttr &attr)
SimpleQueueAttr & SetHint(std::string hint)
virtual CPN::Key_t GetReaderKernel(CPN::Key_t portkey)
RCTXMT_t
RCTXMT Remote Context Message Type These are the message types that the remote context uses to send i...
unsigned GetNumChannels() const
Variant GetEndpointInfo(RCTXMT_t msgtype, CPN::Key_t portkey)
Key_t GetWriterKey() const
SimpleQueueAttr & SetReaderNodeKey(Key_t k)
SimpleQueueAttr & SetMaxWriteThreshold(unsigned mwt)
Variant RemoteCall(Variant msg)
virtual CPN::Key_t GetNodeKernel(CPN::Key_t nodekey)
virtual std::string GetWriterName(CPN::Key_t portkey)
virtual void GetKernelConnectionInfo(CPN::Key_t kernelkey, std::string &hostname, std::string &servname)
obtain the connection information for the given kernel
NodeAttr & SetKernel(const std::string &kname)
virtual std::string GetKernelName(CPN::Key_t kernelkey)
virtual CPN::Key_t GetReadersWriter(CPN::Key_t readerkey)
CPN::Key_t GetCreateEndpointKey(RCTXMT_t msgtype, CPN::Key_t nodekey, const std::string &portname)
virtual CPN::Key_t GetCreateWriterKey(CPN::Key_t nodekey, const std::string &portname)
void InternalCheckTerminated()
void AddWaiter(WaiterInfo *info)
void DispatchMessage(const Variant &msg)
virtual void Terminate()
Signal to the Context that the network is terminating. After this call most methods will throw a Shut...
GenericWaiterPtr NewGenericWaiter()
virtual void SignalKernelEnd(CPN::Key_t kernelkey)
Signal to the Context that the given kernel is dead.
virtual CPN::Key_t SetupKernel(const std::string &name, const std::string &hostname, const std::string &servname, CPN::KernelBase *kernel)
Called by the Kernel when it has successfully set it self up. This gives the Context a way to notify ...
NodeAttr & SetKey(Key_t key_)
virtual CPN::Key_t CreateNodeKey(CPN::Key_t kernelkey, const std::string &nodename)
Tell the context to allocate a new node key and data structure for a node with nodename which is on k...
virtual void SignalNodeStart(CPN::Key_t nodekey)
Called by the node startup routine to indicate that the node has started.
virtual CPN::Key_t WaitForKernelStart(const std::string &kernel)
Does not return until the given kernel has started.
virtual CPN::Key_t GetKernelKey(const std::string &kernel)
SimpleQueueAttr & SetNumChannels(unsigned numchans)
virtual void SendCreateNode(CPN::Key_t kernelkey, const CPN::NodeAttr &attr)
Tell a given kernel that it needs to create a node.
SimpleQueueAttr & SetLength(unsigned length)
const std::map< std::string, std::string > & GetParams() const
The exceptions specified for the CPN network.
SimpleQueueAttr & SetAlpha(double a)
virtual std::string GetReaderName(CPN::Key_t portkey)
virtual CPN::Key_t GetWritersReader(CPN::Key_t writerkey)
virtual bool IsTerminated()
virtual CPN::Key_t WaitForNodeStart(const std::string &nodename)
Waits until the node starts and returns the key, if the node is already started returns the key...
virtual void SendCreateWriter(CPN::Key_t kernelkey, const CPN::SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue write end.
virtual void RemoteCreateNode(NodeAttr attr)
virtual bool RequireRemote()
Lets the kernel know that this context type requires remote activity. This overrides the kernel optio...
virtual void RemoteCreateWriter(SimpleQueueAttr attr)
NodeAttr & SetParam(const std::string &key, const std::string value)
unsigned GetMaxThreshold() const
virtual void Log(int level, const std::string &logmsg)
Log a message to this outputer.
std::map< CPN::Key_t, CPN::KernelBase * > KernelMap
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
SimpleQueueAttr MsgToQueueAttr(const Variant &msg)
virtual void ConnectEndpoints(CPN::Key_t writerkey, CPN::Key_t readerkey, const std::string &qname)
Called by the kernel when a queue is created. Note that the endpoints may have been created when the ...
virtual CPN::Key_t GetCreateReaderKey(CPN::Key_t nodekey, const std::string &portname)
Get the key associated with the given endpoint for the given node. Creates the information if it does...
Key_t GetReaderKey() const
virtual void SendCreateReader(CPN::Key_t kernelkey, const CPN::SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue read end.
virtual void SendMessage(const Variant &msg)=0
virtual void SendCreateQueue(CPN::Key_t kernelkey, const CPN::SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue.
virtual void SignalNodeEnd(CPN::Key_t nodekey)
Called by the node cleanup routine to indicate that the node has ended.
An exception indicating that the Kernel has shut down.
const std::string & GetName() const
SimpleQueueAttr & SetWriterNodeKey(Key_t k)
Key_t GetWriterNodeKey() const
virtual CPN::Key_t GetWriterKernel(CPN::Key_t portkey)
virtual std::string GetNodeName(CPN::Key_t nodekey)
auto_ptr< Pthread > terminateThread
virtual void RemoteCreateQueue(SimpleQueueAttr attr)
virtual void SignalKernelStart(CPN::Key_t kernelkey)
Signal to the context that the given kernel has started.
const std::string & GetDatatype() const
shared_ptr< GenericWaiter > GenericWaiterPtr
const std::string & GetTypeName() const
SimpleQueueAttr & SetWriterKey(Key_t k)
unsigned GetMaxWriteThreshold() const
SimpleQueueAttr & SetDatatype(const std::string &type)
Key_t GetReaderNodeKey() const
std::string GetHint() const
NodeAttr MsgToNodeAttr(const Variant &msg)
SimpleQueueAttr & SetMaxThreshold(unsigned maxthresh)
virtual CPN::Key_t GetReaderNode(CPN::Key_t portkey)
This is a simplified internal representation of the queue attributes needed to create a queue...
virtual void RemoteCreateReader(SimpleQueueAttr attr)
virtual CPN::Key_t GetWriterNode(CPN::Key_t portkey)
const std::string & GetKernel() const
virtual CPN::Key_t GetNodeKey(const std::string &nodename)
SimpleQueueAttr & SetReaderKey(Key_t k)
virtual void WaitForAllNodeEnd()
Convenience method which waits until there are no nodes running. If no node have started then this wi...