CPN
Computational Process Networks
RemoteContextClient.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
25 #include <cpn/bits/KernelBase.h>
26 #include <cpn/Exceptions.h>
28 #include <cpn/utils/AutoUnlock.h>
31 #include <stdexcept>
32 
33 namespace CPN {
34 
36  : trancounter(0), shutdown(false), loglevel(Logger::WARNING)
37  {
38  }
39 
41  if (terminateThread.get()) {
42  terminateThread->Join();
43  }
44  }
45 
48  return loglevel;
49  }
50 
53  return loglevel = level;
54  }
55 
56  void RemoteContextClient::Log(int level, const std::string &logmsg) {
58  if (level >= loglevel) {
59  Variant msg(Variant::MapType);
60  msg["type"] = RCTXMT_LOG;
61  msg["msg"] = logmsg;
62  SendMessage(msg);
63  }
64  }
65 
66  Key_t RemoteContextClient::SetupKernel(const std::string &name, const std::string &hostname,
67  const std::string &servname, KernelBase *kernel) {
70  if (!kernel) { throw std::invalid_argument("Must have non null Kernel."); }
71  Variant msg(Variant::MapType);
72  msg["type"] = RCTXMT_SETUP_KERNEL;
73  msg["name"] = name;
74  msg["hostname"] = hostname;
75  msg["servname"] = servname;
76  Variant reply = RemoteCall(msg);
77  if (!reply.Get("success", false).IsTrue()) {
78  throw std::invalid_argument("Cannot create two kernels with the same name");
79  }
80  Key_t key = reply["kernelinfo"]["key"].AsNumber<Key_t>();
81  kernels.insert(std::make_pair(key, kernel));
82  return key;
83  }
84 
85  Key_t RemoteContextClient::GetKernelKey(const std::string &kernel) {
88  Variant msg(Variant::MapType);
89  msg["type"] = RCTXMT_GET_KERNEL_INFO;
90  msg["name"] = kernel;
91  Variant reply = RemoteCall(msg);
92  if (!reply.Get("success", false).IsTrue()) {
93  throw std::invalid_argument("No such kernel");
94  }
95  return reply["kernelinfo"]["key"].AsNumber<Key_t>();
96  }
97 
98  std::string RemoteContextClient::GetKernelName(Key_t kernelkey) {
101  Variant msg(Variant::MapType);
102  msg["type"] = RCTXMT_GET_KERNEL_INFO;
103  msg["key"] = kernelkey;
104  Variant reply = RemoteCall(msg);
105  if (!reply.Get("success", false).IsTrue()) {
106  throw std::invalid_argument("No such kernel");
107  }
108  return reply["kernelinfo"]["name"].AsString();
109  }
110 
111  void RemoteContextClient::GetKernelConnectionInfo(Key_t kernelkey, std::string &hostname, std::string &servname) {
114  Variant msg(Variant::MapType);
115  msg["type"] = RCTXMT_GET_KERNEL_INFO;
116  msg["key"] = kernelkey;
117  Variant reply = RemoteCall(msg);
118  if (!reply.Get("success", false).IsTrue()) {
119  throw std::invalid_argument("No such kernel");
120  }
121  Variant kernelinfo = reply["kernelinfo"];
122  hostname = kernelinfo["hostname"].AsString();
123  servname = kernelinfo["servname"].AsString();
124  }
125 
128  Variant msg(Variant::MapType);
129  msg["type"] = RCTXMT_SIGNAL_KERNEL_END;
130  msg["key"] = kernelkey;
131  SendMessage(msg);
132  kernels.erase(kernelkey);
133  }
134 
135  Key_t RemoteContextClient::WaitForKernelStart(const std::string &kernel) {
139  Variant msg(Variant::MapType);
140  msg["type"] = RCTXMT_GET_KERNEL_INFO;
141  msg["name"] = kernel;
142  Variant reply = RemoteCall(msg);
143  if (reply.Get("success", false).IsTrue()) {
144  Variant kernelinfo = reply["kernelinfo"];
145  if (kernelinfo.Get("live", false).IsTrue()) {
146  return kernelinfo["key"].AsNumber<Key_t>();
147  }
148  }
149  while (true) {
150  if (genwait->messages.empty()) {
151  genwait->cond.Wait(lock);
153  } else {
154  Variant msg = genwait->messages.front();
155  genwait->messages.pop_front();
156  if (msg.Contains("kernelinfo")) {
157  Variant kernelinfo = msg["kernelinfo"];
158  if (kernelinfo["name"].AsString() == kernel && kernelinfo["live"].IsTrue()) {
159  return kernelinfo["key"].AsNumber<Key_t>();
160  }
161  }
162  }
163  }
164  }
165 
169  Variant msg(Variant::MapType);
170  msg["key"] = kernelkey;
171  msg["type"] = RCTXMT_SIGNAL_KERNEL_START;
172  SendMessage(msg);
173  }
174 
178  SendQueueMsg(kernelkey, RCTXMT_CREATE_WRITER, attr);
179  }
180 
184  SendQueueMsg(kernelkey, RCTXMT_CREATE_READER, attr);
185  }
186 
190  SendQueueMsg(kernelkey, RCTXMT_CREATE_QUEUE, attr);
191  }
192 
193  void RemoteContextClient::SendCreateNode(Key_t kernelkey, const NodeAttr &attr) {
196  Variant msg(Variant::MapType);
197  msg["msgtype"] = "kernel";
198  msg["type"] = RCTXMT_CREATE_NODE;
199  msg["kernelkey"] = kernelkey;
200  Variant nodeattr;
201  nodeattr["kernelkey"] = kernelkey;
202  nodeattr["kernel"] = attr.GetKernel();
203  nodeattr["name"] = attr.GetName();
204  nodeattr["nodetype"] = attr.GetTypeName();
205  const std::map<std::string, std::string> &params = attr.GetParams();
206  for (std::map<std::string, std::string>::const_iterator i = params.begin(),
207  e = params.end(); i != e; ++i) {
208  nodeattr["param"][i->first] = i->second;
209  }
210  nodeattr["key"] = attr.GetKey();
211  msg["nodeattr"] = nodeattr;
212  SendMessage(msg);
213  }
214 
215  NodeAttr MsgToNodeAttr(const Variant &msg) {
216  NodeAttr attr(msg["name"].AsString(), msg["nodetype"].AsString());
217  attr.SetKernel(msg["kernel"].AsString());
218  attr.SetKey(msg["key"].AsNumber<Key_t>());
219  if (msg.Contains("param")) {
220  for (Variant::ConstMapIterator i = msg["param"].MapBegin(), e = msg["param"].MapEnd();
221  i != e; ++i)
222  {
223  attr.SetParam(i->first, i->second.AsString());
224  }
225  }
226  return attr;
227  }
228 
229  void RemoteContextClient::SendQueueMsg(Key_t kernelkey, RCTXMT_t msgtype, const SimpleQueueAttr &attr) {
230  Variant msg(Variant::MapType);
231  msg["msgtype"] = "kernel";
232  msg["type"] = msgtype;
233  msg["kernelkey"] = kernelkey;
234  Variant queueattr;
235  queueattr["queuehint"] = attr.GetHint();
236  queueattr["datatype"] = attr.GetDatatype();
237  queueattr["queueLength"] = attr.GetLength();
238  queueattr["maxThreshold"] = attr.GetMaxThreshold();
239  queueattr["numChannels"] = attr.GetNumChannels();
240  queueattr["readerkey"] = attr.GetReaderKey();
241  queueattr["writerkey"] = attr.GetWriterKey();
242  queueattr["readernodekey"] = attr.GetReaderNodeKey();
243  queueattr["writernodekey"] = attr.GetWriterNodeKey();
244  queueattr["alpha"] = attr.GetAlpha();
245  queueattr["maxwritethreshold"] = attr.GetMaxWriteThreshold();
246  msg["queueattr"] = queueattr;
247  SendMessage(msg);
248  }
249 
250  SimpleQueueAttr MsgToQueueAttr(const Variant &msg) {
251  SimpleQueueAttr attr;
252  attr.SetHint(msg["queuehint"].AsString());
253  attr.SetDatatype(msg["datatype"].AsString());
254  attr.SetLength(msg["queueLength"].AsUnsigned());
255  attr.SetMaxThreshold(msg["maxThreshold"].AsUnsigned());
256  attr.SetNumChannels(msg["numChannels"].AsUnsigned());
257  attr.SetReaderKey(msg["readerkey"].AsNumber<Key_t>());
258  attr.SetWriterKey(msg["writerkey"].AsNumber<Key_t>());
259  attr.SetReaderNodeKey(msg["readernodekey"].AsNumber<Key_t>());
260  attr.SetWriterNodeKey(msg["writernodekey"].AsNumber<Key_t>());
261  attr.SetAlpha(msg["alpha"].AsDouble());
262  attr.SetMaxWriteThreshold(msg["maxwritethreshold"].AsUnsigned());
263  return attr;
264  }
265 
266  Key_t RemoteContextClient::CreateNodeKey(Key_t kernelkey, const std::string &nodename) {
269  Variant msg(Variant::MapType);
270  msg["type"] = RCTXMT_CREATE_NODE_KEY;
271  msg["kernelkey"] = kernelkey;
272  msg["name"] = nodename;
273  Variant reply = RemoteCall(msg);
274  if (!reply.Get("success", false).IsTrue()) {
275  throw std::invalid_argument("Node " + nodename + " already exists.");
276  }
277  return reply["nodeinfo"]["key"].AsNumber<Key_t>();
278  }
279 
280  Key_t RemoteContextClient::GetNodeKey(const std::string &nodename) {
283  Variant msg(Variant::MapType);
284  msg["type"] = RCTXMT_GET_NODE_INFO;
285  msg["name"] = nodename;
286  Variant reply = RemoteCall(msg);
287  if (!reply.Get("success", false).IsTrue()) {
288  throw std::invalid_argument("No such node");
289  }
290  return reply["nodeinfo"]["key"].AsNumber<Key_t>();
291  }
292 
296  Variant msg(Variant::MapType);
297  msg["type"] = RCTXMT_GET_NODE_INFO;
298  msg["key"] = nodekey;
299  Variant reply = RemoteCall(msg);
300  if (!reply.Get("success", false).IsTrue()) {
301  throw std::invalid_argument("No such node");
302  }
303  return reply["nodeinfo"]["name"].AsString();
304  }
305 
309  Variant msg(Variant::MapType);
310  msg["type"] = RCTXMT_SIGNAL_NODE_START;
311  msg["key"] = nodekey;
312  SendMessage(msg);
313  }
314 
317  Variant msg(Variant::MapType);
318  msg["type"] = RCTXMT_SIGNAL_NODE_END;
319  msg["key"] = nodekey;
320  SendMessage(msg);
321  }
322 
323  Key_t RemoteContextClient::WaitForNodeStart(const std::string &nodename) {
327  Variant msg(Variant::MapType);
328  msg["type"] = RCTXMT_GET_NODE_INFO;
329  msg["name"] = nodename;
330  Variant reply = RemoteCall(msg);
331 
332  if (reply.Get("success", false).IsTrue()) {
333  Variant nodeinfo = reply["nodeinfo"];
334  if (nodeinfo["started"].IsTrue()) {
335  return nodeinfo["key"].AsNumber<Key_t>();
336  }
337  }
338  while (true) {
339  if (genwait->messages.empty()) {
340  genwait->cond.Wait(lock);
342  } else {
343  Variant msg = genwait->messages.front();
344  genwait->messages.pop_front();
345  if (msg.Contains("nodeinfo")) {
346  Variant nodeinfo = msg["nodeinfo"];
347  if (nodeinfo["started"].IsTrue() && nodeinfo["name"].AsString() == nodename) {
348  return nodeinfo["key"].AsNumber<Key_t>();
349  }
350  }
351  }
352  }
353  }
354 
355  void RemoteContextClient::WaitForNodeEnd(const std::string &nodename) {
357  if (shutdown) { return; }
359 
360  Variant msg(Variant::MapType);
361  msg["type"] = RCTXMT_GET_NODE_INFO;
362  msg["name"] = nodename;
363  Variant reply;
364  try {
365  reply = RemoteCall(msg);
366  } catch (const ShutdownException &e) {
367  return;
368  }
369  if (reply.Get("success", false).IsTrue()) {
370  if (reply["nodeinfo"]["dead"].IsTrue()) {
371  return;
372  }
373  }
374  while (true) {
375  if (genwait->messages.empty()) {
376  genwait->cond.Wait(lock);
377  if (shutdown) { return; }
378  } else {
379  Variant msg = genwait->messages.front();
380  genwait->messages.pop_front();
381  if (msg.Contains("nodeinfo")) {
382  Variant nodeinfo = msg["nodeinfo"];
383  if (nodeinfo["dead"].IsTrue() && nodeinfo["name"].AsString() == nodename) {
384  return;
385  }
386  }
387  }
388  }
389  }
390 
393  if (shutdown) { return; }
395  Variant msg(Variant::MapType);
396  msg["type"] = RCTXMT_GET_NUM_NODE_LIVE;
397  Variant reply;
398  try {
399  reply = RemoteCall(msg);
400  } catch (const ShutdownException &e) {
401  return;
402  }
403  ASSERT(reply.Get("success", false).IsTrue(), "msg: %s", libvariant::SerializeJSON(reply).c_str());
404  if (reply["numlivenodes"].AsUnsigned() == 0) {
405  return;
406  }
407  while (true) {
408  if (genwait->messages.empty()) {
409  genwait->cond.Wait(lock);
410  if (shutdown) { return; }
411  } else {
412  Variant msg = genwait->messages.front();
413  genwait->messages.pop_front();
414  if (msg.Get("numlivenodes", -1).AsUnsigned() == 0) {
415  return;
416  }
417  }
418  }
419  }
420 
424  Variant msg(Variant::MapType);
425  msg["type"] = RCTXMT_GET_NODE_INFO;
426  msg["key"] = nodekey;
427  Variant reply = RemoteCall(msg);
428  if (!reply.Get("success", false).IsTrue()) {
429  throw std::invalid_argument("No such node");
430  }
431  return reply["nodeinfo"]["kernelkey"].AsNumber<Key_t>();
432  }
433 
434 
435  Key_t RemoteContextClient::GetCreateReaderKey(Key_t nodekey, const std::string &portname) {
437  return GetCreateEndpointKey(RCTXMT_GET_CREATE_READER_KEY, nodekey, portname);
438  }
439 
442  Variant info = GetEndpointInfo(RCTXMT_GET_READER_INFO, portkey);
443  return info["nodekey"].AsNumber<Key_t>();
444  }
445 
448  Variant info = GetEndpointInfo(RCTXMT_GET_READER_INFO, portkey);
449  return info["kernelkey"].AsNumber<Key_t>();
450  }
451 
454  Variant info = GetEndpointInfo(RCTXMT_GET_READER_INFO, portkey);
455  return info["name"].AsString();
456  }
457 
458  Key_t RemoteContextClient::GetCreateWriterKey(Key_t nodekey, const std::string &portname) {
460  return GetCreateEndpointKey(RCTXMT_GET_CREATE_WRITER_KEY, nodekey, portname);
461  }
462 
465  Variant info = GetEndpointInfo(RCTXMT_GET_WRITER_INFO, portkey);
466  return info["nodekey"].AsNumber<Key_t>();
467  }
468 
471  Variant info = GetEndpointInfo(RCTXMT_GET_WRITER_INFO, portkey);
472  return info["kernelkey"].AsNumber<Key_t>();
473  }
474 
477  Variant info = GetEndpointInfo(RCTXMT_GET_WRITER_INFO, portkey);
478  return info["name"].AsString();
479  }
480 
481  void RemoteContextClient::ConnectEndpoints(Key_t writerkey, Key_t readerkey, const std::string &qname) {
484  Variant msg(Variant::MapType);
485  msg["type"] = RCTXMT_CONNECT_ENDPOINTS;
486  msg["readerkey"] = readerkey;
487  msg["writerkey"] = writerkey;
488  msg["qname"] = qname;
489  SendMessage(msg);
490  }
491 
494  Variant info = GetEndpointInfo(RCTXMT_GET_READER_INFO, readerkey);
495  return info["writerkey"].AsNumber<Key_t>();
496  }
497 
500  Variant info = GetEndpointInfo(RCTXMT_GET_WRITER_INFO, writerkey);
501  return info["readerkey"].AsNumber<Key_t>();
502  }
503 
506  if (!shutdown) {
508  Variant msg(Variant::MapType);
509  msg["type"] = RCTXMT_TERMINATE;
510  SendMessage(msg);
511  }
512  }
513 
515  KernelMap mapcopy;
516  {
518  mapcopy = kernels;
519  }
520  KernelMap::iterator itr, end;
521  itr = mapcopy.begin();
522  end = mapcopy.end();
523  while (itr != end) {
524  itr->second->NotifyTerminate();
525  ++itr;
526  }
527  return 0;
528  }
529 
531  if (shutdown) return;
532  shutdown = true;
533  WaiterMap::iterator cwitr = callwaiters.begin();
534  while (cwitr != callwaiters.end()) {
535  cwitr->second->cond.Signal();
536  ++cwitr;
537  }
538  WaiterList::iterator gwitr;
539  gwitr = waiters.begin();
540  while (gwitr != waiters.end()) {
541  GenericWaiterPtr ptr = gwitr->lock();
542  if (ptr) {
543  ptr->cond.Signal();
544  }
545  ++gwitr;
546  }
548  terminateThread->Start();
549  }
550 
553  return shutdown;
554  }
555 
557  return true;
558  }
559 
561  if (shutdown) {
562  throw ShutdownException();
563  }
564  }
565 
567  callwaiters.insert(std::make_pair(info->waiterid, info));
568  }
569 
572  waiters.push_back(gw);
573  return gw;
574  }
575 
577  return ++trancounter;
578  }
579 
580  void RemoteContextClient::DispatchMessage(const Variant &msg) {
581  if (!msg.IsMap()) { return; }
583  if (msg.Contains("type") && msg["type"].AsNumber<RCTXMT_t>() == RCTXMT_TERMINATE) {
585  return;
586  }
587  // Need to add for the general informative messages.
588  //
589  // The specific reply messages
590  std::string msgtype = msg["msgtype"].AsString();
591  if (msgtype == "reply") {
592  unsigned tranid = msg["msgid"].AsUnsigned();
593  WaiterMap::iterator entry;
594  entry = callwaiters.find(tranid);
595  ASSERT(entry != callwaiters.end());
596  entry->second->msg = msg;
597  entry->second->signaled = true;
598  entry->second->cond.Signal();
599  callwaiters.erase(entry);
600  } else if (msgtype == "broadcast") {
601  WaiterList::iterator entry;
602  entry = waiters.begin();
603  while (entry != waiters.end()) {
604  GenericWaiterPtr waiter = entry->lock();
605  if (waiter) {
606  waiter->messages.push_back(msg);
607  waiter->cond.Signal();
608  ++entry;
609  } else {
610  entry = waiters.erase(entry);
611  }
612  }
613  } else if (msgtype == "kernel") {
614  Key_t kernelkey = msg["kernelkey"].AsNumber<Key_t>();
615  KernelMap::iterator entry = kernels.find(kernelkey);
616  if (entry != kernels.end()) {
617  KernelBase *kernel = entry->second;
618  // Cannot call a kernel method with the lock.
619  plock.Unlock();
620  ASSERT(kernel);
621  switch (msg["type"].AsNumber<RCTXMT_t>()) {
622  case RCTXMT_CREATE_NODE:
623  kernel->RemoteCreateNode(MsgToNodeAttr(msg["nodeattr"]));
624  break;
625  case RCTXMT_CREATE_QUEUE:
626  kernel->RemoteCreateQueue(MsgToQueueAttr(msg["queueattr"]));
627  break;
629  kernel->RemoteCreateWriter(MsgToQueueAttr(msg["queueattr"]));
630  break;
632  kernel->RemoteCreateReader(MsgToQueueAttr(msg["queueattr"]));
633  break;
634  default:
635  ASSERT(false, "Unknown kernel message type");
636  }
637  } else {
638  ASSERT(false, "Message for kernel %llu but said kernel doesn't exist!", kernelkey);
639  }
640  } else {
641  ASSERT(false);
642  }
643  }
644 
645  Key_t RemoteContextClient::GetCreateEndpointKey(RCTXMT_t msgtype, Key_t nodekey, const std::string &portname) {
647  Variant msg(Variant::MapType);
648  msg["type"] = msgtype;
649  msg["nodekey"] = nodekey;
650  msg["name"] = portname;
651  Variant reply = RemoteCall(msg);
652  if (!reply.Get("success", false).IsTrue()) {
653  throw std::invalid_argument("No such port");
654  }
655  return reply["endpointinfo"]["key"].AsNumber<Key_t>();
656  }
657 
660  Variant msg(Variant::MapType);
661  msg["type"] = msgtype;
662  msg["key"] = portkey;
663  Variant reply = RemoteCall(msg);
664  ASSERT(reply.Get("success", false).IsTrue(), "msg: %s", libvariant::SerializeJSON(reply).c_str());
665  return reply["endpointinfo"];
666  }
667 
668  Variant RemoteContextClient::RemoteCall(Variant msg) {
669  WaiterInfo winfo(NewTranID());
670  AddWaiter(&winfo);
671  msg["msgid"] = winfo.waiterid;
672  SendMessage(msg);
673  while (!winfo.signaled) {
674  winfo.cond.Wait(lock);
676  }
677  return winfo.msg;
678  }
679 }
Logger object that is used for forwarding log messages.
Definition: Logger.h:57
virtual void WaitForNodeEnd(const std::string &nodename)
Waits for the given node to signal end.
Key_t GetKey() const
Definition: NodeAttr.h:116
unsigned GetLength() const
Definition: QueueAttr.h:325
PthreadCondition & Wait(PthreadMutex &mutex)
virtual int LogLevel() const
void SendQueueMsg(CPN::Key_t kernelkey, RCTXMT_t msgtype, const CPN::SimpleQueueAttr &attr)
SimpleQueueAttr & SetHint(std::string hint)
Definition: QueueAttr.h:269
virtual CPN::Key_t GetReaderKernel(CPN::Key_t portkey)
Attributes for a node.
Definition: NodeAttr.h:58
RCTXMT_t
RCTXMT Remote Context Message Type These are the message types that the remote context uses to send i...
Definition: RCTXMT.h:33
unsigned GetNumChannels() const
Definition: QueueAttr.h:327
Variant GetEndpointInfo(RCTXMT_t msgtype, CPN::Key_t portkey)
Key_t GetWriterKey() const
Definition: QueueAttr.h:323
SimpleQueueAttr & SetReaderNodeKey(Key_t k)
Definition: QueueAttr.h:300
SimpleQueueAttr & SetMaxWriteThreshold(unsigned mwt)
Definition: QueueAttr.h:264
Variant RemoteCall(Variant msg)
virtual CPN::Key_t GetNodeKernel(CPN::Key_t nodekey)
virtual std::string GetWriterName(CPN::Key_t portkey)
virtual void GetKernelConnectionInfo(CPN::Key_t kernelkey, std::string &hostname, std::string &servname)
obtain the connection information for the given kernel
NodeAttr & SetKernel(const std::string &kname)
Definition: NodeAttr.h:79
virtual std::string GetKernelName(CPN::Key_t kernelkey)
virtual CPN::Key_t GetReadersWriter(CPN::Key_t readerkey)
CPN::Key_t GetCreateEndpointKey(RCTXMT_t msgtype, CPN::Key_t nodekey, const std::string &portname)
virtual CPN::Key_t GetCreateWriterKey(CPN::Key_t nodekey, const std::string &portname)
void AddWaiter(WaiterInfo *info)
void DispatchMessage(const Variant &msg)
virtual void Terminate()
Signal to the Context that the network is terminating. After this call most methods will throw a Shut...
GenericWaiterPtr NewGenericWaiter()
virtual void SignalKernelEnd(CPN::Key_t kernelkey)
Signal to the Context that the given kernel is dead.
virtual CPN::Key_t SetupKernel(const std::string &name, const std::string &hostname, const std::string &servname, CPN::KernelBase *kernel)
Called by the Kernel when it has successfully set it self up. This gives the Context a way to notify ...
NodeAttr & SetKey(Key_t key_)
Definition: NodeAttr.h:102
virtual CPN::Key_t CreateNodeKey(CPN::Key_t kernelkey, const std::string &nodename)
Tell the context to allocate a new node key and data structure for a node with nodename which is on k...
virtual void SignalNodeStart(CPN::Key_t nodekey)
Called by the node startup routine to indicate that the node has started.
virtual CPN::Key_t WaitForKernelStart(const std::string &kernel)
Does not return until the given kernel has started.
virtual CPN::Key_t GetKernelKey(const std::string &kernel)
SimpleQueueAttr & SetNumChannels(unsigned numchans)
Definition: QueueAttr.h:295
void Unlock()
Definition: AutoLock.h:61
uint64_t Key_t
Definition: common.h:79
virtual void SendCreateNode(CPN::Key_t kernelkey, const CPN::NodeAttr &attr)
Tell a given kernel that it needs to create a node.
SimpleQueueAttr & SetLength(unsigned length)
Definition: QueueAttr.h:285
const std::map< std::string, std::string > & GetParams() const
Definition: NodeAttr.h:114
The exceptions specified for the CPN network.
SimpleQueueAttr & SetAlpha(double a)
Definition: QueueAttr.h:259
virtual std::string GetReaderName(CPN::Key_t portkey)
virtual CPN::Key_t GetWritersReader(CPN::Key_t writerkey)
virtual CPN::Key_t WaitForNodeStart(const std::string &nodename)
Waits until the node starts and returns the key, if the node is already started returns the key...
virtual void SendCreateWriter(CPN::Key_t kernelkey, const CPN::SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue write end.
virtual void RemoteCreateNode(NodeAttr attr)
Definition: KernelBase.cc:39
virtual bool RequireRemote()
Lets the kernel know that this context type requires remote activity. This overrides the kernel optio...
virtual void RemoteCreateWriter(SimpleQueueAttr attr)
Definition: KernelBase.cc:30
double GetAlpha() const
Definition: QueueAttr.h:330
NodeAttr & SetParam(const std::string &key, const std::string value)
Definition: NodeAttr.h:89
unsigned GetMaxThreshold() const
Definition: QueueAttr.h:326
virtual void Log(int level, const std::string &logmsg)
Log a message to this outputer.
std::map< CPN::Key_t, CPN::KernelBase * > KernelMap
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
SimpleQueueAttr MsgToQueueAttr(const Variant &msg)
virtual void ConnectEndpoints(CPN::Key_t writerkey, CPN::Key_t readerkey, const std::string &qname)
Called by the kernel when a queue is created. Note that the endpoints may have been created when the ...
virtual CPN::Key_t GetCreateReaderKey(CPN::Key_t nodekey, const std::string &portname)
Get the key associated with the given endpoint for the given node. Creates the information if it does...
Key_t GetReaderKey() const
Definition: QueueAttr.h:324
virtual void SendCreateReader(CPN::Key_t kernelkey, const CPN::SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue read end.
virtual void SendMessage(const Variant &msg)=0
virtual void SendCreateQueue(CPN::Key_t kernelkey, const CPN::SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue.
virtual void SignalNodeEnd(CPN::Key_t nodekey)
Called by the node cleanup routine to indicate that the node has ended.
An exception indicating that the Kernel has shut down.
Definition: Exceptions.h:38
const std::string & GetName() const
Definition: NodeAttr.h:110
SimpleQueueAttr & SetWriterNodeKey(Key_t k)
Definition: QueueAttr.h:305
Key_t GetWriterNodeKey() const
Definition: QueueAttr.h:321
virtual CPN::Key_t GetWriterKernel(CPN::Key_t portkey)
virtual std::string GetNodeName(CPN::Key_t nodekey)
auto_ptr< Pthread > terminateThread
virtual void RemoteCreateQueue(SimpleQueueAttr attr)
Definition: KernelBase.cc:36
virtual void SignalKernelStart(CPN::Key_t kernelkey)
Signal to the context that the given kernel has started.
const std::string & GetDatatype() const
Definition: QueueAttr.h:329
shared_ptr< GenericWaiter > GenericWaiterPtr
const std::string & GetTypeName() const
Definition: NodeAttr.h:112
SimpleQueueAttr & SetWriterKey(Key_t k)
Definition: QueueAttr.h:315
unsigned GetMaxWriteThreshold() const
Definition: QueueAttr.h:331
SimpleQueueAttr & SetDatatype(const std::string &type)
Definition: QueueAttr.h:274
Key_t GetReaderNodeKey() const
Definition: QueueAttr.h:322
std::string GetHint() const
Definition: QueueAttr.h:328
NodeAttr MsgToNodeAttr(const Variant &msg)
SimpleQueueAttr & SetMaxThreshold(unsigned maxthresh)
Definition: QueueAttr.h:290
virtual CPN::Key_t GetReaderNode(CPN::Key_t portkey)
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
virtual void RemoteCreateReader(SimpleQueueAttr attr)
Definition: KernelBase.cc:33
virtual CPN::Key_t GetWriterNode(CPN::Key_t portkey)
const std::string & GetKernel() const
Definition: NodeAttr.h:107
virtual CPN::Key_t GetNodeKey(const std::string &nodename)
#define ASSERT(exp,...)
SimpleQueueAttr & SetReaderKey(Key_t k)
Definition: QueueAttr.h:310
virtual void WaitForAllNodeEnd()
Convenience method which waits until there are no nodes running. If no node have started then this wi...