CPN
Computational Process Networks
LocalContext.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
24 #include "LocalContext.h"
25 #include <cpn/bits/KernelBase.h>
26 #include <cpn/KernelAttr.h>
27 #include <cpn/Exceptions.h>
29 #include <iostream>
30 #include <stdexcept>
31 
32 namespace CPN {
33 
35  : loglevel(Logger::WARNING), numlivenodes(0), shutdown(false), counter(0)
36  {}
37 
39  }
40 
41  void LocalContext::Log(int level, const std::string &msg) {
43  if (level >= loglevel) {
44  std::cerr << level << ":" << msg << std::endl;
45  }
46  }
47 
48  int LocalContext::LogLevel() const {
50  return loglevel;
51  }
52 
53  int LocalContext::LogLevel(int level) {
55  return loglevel = level;
56  }
57 
58  Key_t LocalContext::SetupKernel(const std::string &name, const std::string &hostname,
59  const std::string &servname, KernelBase *kernel) {
62  if (!kernel) { throw std::invalid_argument("Must have non null Kernel."); }
63  if (kernelnames.find(name) != kernelnames.end()) {
64  throw std::invalid_argument("Cannot create two kernels with the same name");
65  }
66  shared_ptr<KernelInfo> hinfo = shared_ptr<KernelInfo>(new KernelInfo);
67  hinfo->name = name;
68  hinfo->hostname = hostname;
69  hinfo->servname = servname;
70  hinfo->kernel = kernel;
71  hinfo->dead = false;
72  hinfo->live = false;
73  hinfo->allowremote = true;
74  Key_t key = NewKey();
75  kernelmap.insert(std::make_pair(key, hinfo));
76  kernelnames.insert(std::make_pair(name, key));
77  return key;
78  }
79 
80  Key_t LocalContext::SetupKernel(const std::string &name, KernelBase *kernel) {
83  if (!kernel) { throw std::invalid_argument("Must have non null Kernel."); }
84  if (kernelnames.find(name) != kernelnames.end()) {
85  throw std::invalid_argument("Cannot create two kernels with the same name");
86  }
87  shared_ptr<KernelInfo> hinfo = shared_ptr<KernelInfo>(new KernelInfo);
88  hinfo->name = name;
89  hinfo->kernel = kernel;
90  hinfo->dead = false;
91  hinfo->live = false;
92  hinfo->allowremote = false;
93  Key_t key = NewKey();
94  kernelmap.insert(std::make_pair(key, hinfo));
95  kernelnames.insert(std::make_pair(name, key));
96  return key;
97 
98  }
99 
100  Key_t LocalContext::GetKernelKey(const std::string &kernel) {
103  NameMap::iterator entry = kernelnames.find(kernel);
104  if (entry == kernelnames.end()) {
105  throw std::invalid_argument("No such kernel");
106  }
107  return entry->second;
108  }
109 
110  std::string LocalContext::GetKernelName(Key_t kernelkey) {
113  KernelMap::iterator entry = kernelmap.find(kernelkey);
114  if (entry == kernelmap.end()) {
115  throw std::invalid_argument("No such kernel");
116  }
117  return entry->second->name;
118  }
119 
120  void LocalContext::GetKernelConnectionInfo(Key_t kernelkey, std::string &hostname, std::string &servname) {
123  KernelMap::iterator entry = kernelmap.find(kernelkey);
124  if (entry == kernelmap.end()) {
125  throw std::invalid_argument("No such kernel");
126  }
127  ASSERT(entry->second->allowremote, "Kernel does not have remote configured.");
128  hostname = entry->second->hostname;
129  servname = entry->second->servname;
130  }
131 
134  KernelMap::iterator entry = kernelmap.find(kernelkey);
135  if (entry == kernelmap.end()) {
136  throw std::invalid_argument("No such kernel");
137  }
138  entry->second->dead = true;
140  }
141 
142  Key_t LocalContext::WaitForKernelStart(const std::string &kernel) {
145  while (true) {
146  NameMap::iterator entry = kernelnames.find(kernel);
147  if (entry != kernelnames.end()) {
148  KernelMap::iterator hentry = kernelmap.find(entry->second);
149  if (hentry->second->live) {
150  return entry->second;
151  }
152  }
155  }
156  }
157 
161  KernelMap::iterator entry = kernelmap.find(kernelkey);
162  if (entry == kernelmap.end()) {
163  throw std::invalid_argument("No such kernel");
164  }
165  entry->second->live = true;
167  }
168 
169  void LocalContext::SendCreateWriter(Key_t kernelkey, const SimpleQueueAttr &attr) {
170  KernelBase *kernel;
171  {
174  shared_ptr<KernelInfo> hinfo = kernelmap[kernelkey];
175  kernel = hinfo->kernel;
176  }
177  ASSERT(kernel);
178  kernel->RemoteCreateWriter(attr);
179  }
180 
181  void LocalContext::SendCreateReader(Key_t kernelkey, const SimpleQueueAttr &attr) {
182  KernelBase *kernel;
183  {
186  shared_ptr<KernelInfo> hinfo = kernelmap[kernelkey];
187  kernel = hinfo->kernel;
188  }
189  ASSERT(kernel);
190  kernel->RemoteCreateReader(attr);
191  }
192 
193  void LocalContext::SendCreateQueue(Key_t kernelkey, const SimpleQueueAttr &attr) {
194  KernelBase *kernel;
195  {
198  shared_ptr<KernelInfo> hinfo = kernelmap[kernelkey];
199  kernel = hinfo->kernel;
200  }
201  ASSERT(kernel);
202  kernel->RemoteCreateQueue(attr);
203  }
204 
205  void LocalContext::SendCreateNode(Key_t kernelkey, const NodeAttr &attr) {
206  KernelBase *kernel;
207  {
210  shared_ptr<KernelInfo> hinfo = kernelmap[kernelkey];
211  kernel = hinfo->kernel;
212  }
213  ASSERT(kernel);
214  kernel->RemoteCreateNode(attr);
215  }
216 
217  Key_t LocalContext::CreateNodeKey(Key_t kernelkey, const std::string &nodename) {
220  NameMap::iterator nameentry = nodenames.find(nodename);
221  Key_t key;
222  if (nameentry == nodenames.end()) {
223  shared_ptr<NodeInfo> ninfo = shared_ptr<NodeInfo>(new NodeInfo);
224  ninfo->name = nodename;
225  ninfo->started = false;
226  ninfo->dead = false;
227  ninfo->kernelkey = kernelkey;
228  key = NewKey();
229  nodenames.insert(std::make_pair(nodename, key));
230  nodemap.insert(std::make_pair(key, ninfo));
231  } else {
232  throw std::invalid_argument("Node " + nodename + " already exists.");
233  }
234  return key;
235  }
236 
237  Key_t LocalContext::GetNodeKey(const std::string &nodename) {
240  NameMap::iterator nameentry = nodenames.find(nodename);
241  if (nameentry == nodenames.end()) {
242  throw std::invalid_argument("No such node");
243  } else {
244  return nameentry->second;
245  }
246  }
247 
248  std::string LocalContext::GetNodeName(Key_t nodekey) {
250  NodeMap::iterator entry = nodemap.find(nodekey);
251  if (entry == nodemap.end()) {
252  throw std::invalid_argument("No such node");
253  }
254  return entry->second->name;
255  }
256 
260  NodeMap::iterator entry = nodemap.find(nodekey);
261  if (entry == nodemap.end()) {
262  throw std::invalid_argument("No such node");
263  } else {
264  entry->second->started = true;
265  ++numlivenodes;
267  }
268  }
269 
272  NodeMap::iterator entry = nodemap.find(nodekey);
273  if (entry == nodemap.end()) {
274  throw std::invalid_argument("No such node");
275  } else {
276  entry->second->dead = true;
277  --numlivenodes;
279  }
280  }
281 
282  Key_t LocalContext::WaitForNodeStart(const std::string &nodename) {
285  while (true) {
286  NameMap::iterator nameentry = nodenames.find(nodename);
287  if (nameentry != nodenames.end()) {
288  NodeMap::iterator entry = nodemap.find(nameentry->second);
289  if (entry != nodemap.end()) {
290  if (entry->second->started) {
291  return entry->first;
292  }
293  }
294  }
297  }
298  }
299 
300  void LocalContext::WaitForNodeEnd(const std::string &nodename) {
302  while (!shutdown) {
303  NameMap::iterator nameentry = nodenames.find(nodename);
304  if (nameentry != nodenames.end()) {
305  NodeMap::iterator entry = nodemap.find(nameentry->second);
306  if (entry != nodemap.end()) {
307  if (entry->second->dead) {
308  ASSERT(entry->second->started, "Node died before it started!?");
309  return;
310  }
311  }
312  }
314  }
315  }
316 
319  while (numlivenodes > 0 && !shutdown) {
321  }
322  }
323 
327  NodeMap::iterator entry = nodemap.find(nodekey);
328  if (entry == nodemap.end()) {
329  throw std::invalid_argument("No such node");
330  }
331  return entry->second->kernelkey;
332  }
333 
334  Key_t LocalContext::GetCreateReaderKey(Key_t nodekey, const std::string &portname) {
337  NodeMap::iterator entry = nodemap.find(nodekey);
338  if (entry == nodemap.end()) {
339  throw std::invalid_argument("No such node");
340  }
341  NameMap::iterator portentry = entry->second->readers.find(portname);
342  if (portentry == entry->second->readers.end()) {
343  Key_t key = NewKey();
344  entry->second->readers.insert(std::make_pair(portname, key));
345  shared_ptr<PortInfo> pinfo = shared_ptr<PortInfo>(new PortInfo);
346  pinfo->name = portname;
347  pinfo->nodekey = nodekey;
348  pinfo->opposingport = 0;
349  pinfo->dead = false;
350  readports.insert(std::make_pair(key, pinfo));
351  return key;
352  } else {
353  return portentry->second;
354  }
355  }
359  PortMap::iterator entry = readports.find(portkey);
360  if (entry == readports.end()) {
361  throw std::invalid_argument("No such port");
362  }
363  return entry->second->nodekey;
364  }
365 
367  Key_t nodekey = GetReaderNode(portkey);
368  return GetNodeKernel(nodekey);
369  }
370 
371  std::string LocalContext::GetReaderName(Key_t portkey) {
374  PortMap::iterator entry = readports.find(portkey);
375  if (entry == readports.end()) {
376  throw std::invalid_argument("No such port");
377  }
378  return entry->second->name;
379  }
380 
381  Key_t LocalContext::GetCreateWriterKey(Key_t nodekey, const std::string &portname) {
384  NodeMap::iterator entry = nodemap.find(nodekey);
385  if (entry == nodemap.end()) {
386  throw std::invalid_argument("No such node");
387  }
388  NameMap::iterator portentry = entry->second->writers.find(portname);
389  if (portentry == entry->second->writers.end()) {
390  Key_t key = NewKey();
391  entry->second->writers.insert(std::make_pair(portname, key));
392  shared_ptr<PortInfo> pinfo = shared_ptr<PortInfo>(new PortInfo);
393  pinfo->name = portname;
394  pinfo->nodekey = nodekey;
395  pinfo->opposingport = 0;
396  pinfo->dead = false;
397  writeports.insert(std::make_pair(key, pinfo));
398  return key;
399  } else {
400  return portentry->second;
401  }
402  }
406  PortMap::iterator entry = writeports.find(portkey);
407  if (entry == writeports.end()) {
408  throw std::invalid_argument("No such port");
409  }
410  return entry->second->nodekey;
411  }
412 
414  Key_t nodekey = GetWriterNode(portkey);
415  return GetNodeKernel(nodekey);
416  }
417 
418  std::string LocalContext::GetWriterName(Key_t portkey) {
421  PortMap::iterator entry = writeports.find(portkey);
422  if (entry == writeports.end()) {
423  throw std::invalid_argument("No such port");
424  }
425  return entry->second->name;
426  }
427 
428  void LocalContext::ConnectEndpoints(Key_t writerkey, Key_t readerkey, const std::string &qname) {
431  PortMap::iterator writeentry = writeports.find(writerkey);
432  if (writeentry == writeports.end()) {
433  throw std::invalid_argument("Write port does not exist.");
434  }
435  PortMap::iterator readentry = readports.find(readerkey);
436  if (readentry == readports.end()) {
437  throw std::invalid_argument("Read port does not exist.");
438  }
439 
440  writeentry->second->opposingport = readerkey;
441  readentry->second->opposingport = writerkey;
442 
443  readentry->second->qname = qname;
444  writeentry->second->qname = qname;
445  }
446 
450  PortMap::iterator readentry = readports.find(readerkey);
451  if (readentry == readports.end()) {
452  throw std::invalid_argument("Read port does not exist.");
453  }
454  return readentry->second->opposingport;
455  }
456 
460  PortMap::iterator writeentry = writeports.find(writerkey);
461  if (writeentry == writeports.end()) {
462  throw std::invalid_argument("Write port does not exist.");
463  }
464  return writeentry->second->opposingport;
465  }
466 
468  KernelMap mapcopy;
469  {
471  shutdown = true;
474  mapcopy = kernelmap;
475  }
476  KernelMap::iterator itr = mapcopy.begin();
477  while (itr != mapcopy.end()) {
478  if (!itr->second->dead) {
479  itr->second->kernel->NotifyTerminate();
480  }
481  ++itr;
482  }
483 }
484 
487  return shutdown;
488  }
489 
491  if (shutdown) {
492  throw ShutdownException();
493  }
494  }
495 }
496 
Logger object that is used for forwarding log messages.
Definition: Logger.h:57
virtual Key_t GetWriterNode(Key_t portkey)
PthreadCondition & Wait(PthreadMutex &mutex)
virtual ~LocalContext()
Definition: LocalContext.cc:38
std::map< Key_t, shared_ptr< KernelInfo > > KernelMap
Definition: LocalContext.h:74
virtual void SendCreateReader(Key_t kernelkey, const SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue read end.
Attributes for a node.
Definition: NodeAttr.h:58
virtual Key_t GetWriterKernel(Key_t portkey)
virtual void SignalKernelEnd(Key_t kernelkey)
Signal to the Context that the given kernel is dead.
virtual std::string GetWriterName(Key_t portkey)
PthreadCondition & Broadcast(void)
void InternalCheckTerminated()
virtual Key_t GetNodeKey(const std::string &nodename)
unsigned numlivenodes
Definition: LocalContext.h:140
virtual Key_t SetupKernel(const std::string &name, const std::string &hostname, const std::string &servname, KernelBase *kernel)
Called by the Kernel when it has successfully set it self up. This gives the Context a way to notify ...
Definition: LocalContext.cc:58
virtual void ConnectEndpoints(Key_t writerkey, Key_t readerkey, const std::string &qname)
Called by the kernel when a queue is created. Note that the endpoints may have been created when the ...
virtual Key_t GetReadersWriter(Key_t readerkey)
virtual void SignalNodeEnd(Key_t nodekey)
Called by the node cleanup routine to indicate that the node has ended.
uint64_t Key_t
Definition: common.h:79
An implemenation of Context for a local process specific context.
virtual void GetKernelConnectionInfo(Key_t kernelkey, std::string &hostname, std::string &servname)
obtain the connection information for the given kernel
virtual void SignalNodeStart(Key_t nodekey)
Called by the node startup routine to indicate that the node has started.
The exceptions specified for the CPN network.
virtual void SendCreateQueue(Key_t kernelkey, const SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue.
KernelMap kernelmap
Definition: LocalContext.h:136
PthreadCondition kernellivedead
Definition: LocalContext.h:133
virtual void RemoteCreateNode(NodeAttr attr)
Definition: KernelBase.cc:39
virtual void RemoteCreateWriter(SimpleQueueAttr attr)
Definition: KernelBase.cc:30
virtual Key_t WaitForKernelStart(const std::string &kernel)
Does not return until the given kernel has started.
virtual Key_t GetWritersReader(Key_t writerkey)
virtual Key_t GetCreateWriterKey(Key_t nodekey, const std::string &portname)
virtual void SendCreateWriter(Key_t kernelkey, const SimpleQueueAttr &attr)
Tell a given kernel that it needs to create a queue write end.
virtual std::string GetReaderName(Key_t portkey)
virtual void WaitForAllNodeEnd()
Convenience method which waits until there are no nodes running. If no node have started then this wi...
virtual int LogLevel() const
Definition: LocalContext.cc:48
virtual void SignalKernelStart(Key_t kernelkey)
Signal to the context that the given kernel has started.
PthreadMutex lock
Definition: LocalContext.h:131
virtual std::string GetNodeName(Key_t nodekey)
virtual void Terminate()
Signal to the Context that the network is terminating. After this call most methods will throw a Shut...
An exception indicating that the Kernel has shut down.
Definition: Exceptions.h:38
virtual Key_t GetCreateReaderKey(Key_t nodekey, const std::string &portname)
Get the key associated with the given endpoint for the given node. Creates the information if it does...
virtual Key_t GetKernelKey(const std::string &kernel)
Definition of the kernel attributes.
virtual std::string GetKernelName(Key_t kernelkey)
virtual void RemoteCreateQueue(SimpleQueueAttr attr)
Definition: KernelBase.cc:36
virtual bool IsTerminated()
virtual void SendCreateNode(Key_t kernelkey, const NodeAttr &attr)
Tell a given kernel that it needs to create a node.
PthreadCondition nodelivedead
Definition: LocalContext.h:132
virtual Key_t GetReaderKernel(Key_t portkey)
virtual Key_t GetReaderNode(Key_t portkey)
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
virtual void RemoteCreateReader(SimpleQueueAttr attr)
Definition: KernelBase.cc:33
virtual void WaitForNodeEnd(const std::string &nodename)
Waits for the given node to signal end.
virtual void Log(int level, const std::string &msg)
Log a message to this outputer.
Definition: LocalContext.cc:41
virtual Key_t GetNodeKernel(Key_t nodekey)
virtual Key_t CreateNodeKey(Key_t kernelkey, const std::string &nodename)
Tell the context to allocate a new node key and data structure for a node with nodename which is on k...
virtual Key_t WaitForNodeStart(const std::string &nodename)
Waits until the node starts and returns the key, if the node is already started returns the key...
#define ASSERT(exp,...)