CPN
Computational Process Networks
Kernel.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
24 #include "common_priv.h"
25 #include <cpn/Kernel.h>
26 #include <cpn/Exceptions.h>
27 #include <cpn/NodeFactory.h>
28 #include <cpn/NodeBase.h>
29 #include "CPNThresholdQueue.h"
30 #include "ConnectionServer.h"
31 #include "RemoteQueueHolder.h"
32 #include "RemoteQueue.h"
33 #include "RDMAQueue.h"
34 #include <cpn/io/SocketAddress.h>
36 #include <cpn/utils/Logger.h>
39 #include <stdexcept>
40 
// Optional function entry/exit tracing.
// When KERNEL_FUNC_TRACE is defined, FUNCBEGIN and FUNCEND print the enclosing
// function (__PRETTY_FUNCTION__) and kernelname with printf, so they can only
// be used where a `kernelname` string is in scope. Otherwise both macros
// expand to nothing.
41 //#define KERNEL_FUNC_TRACE
42 #ifdef KERNEL_FUNC_TRACE
43 #include <stdio.h>
44 #define FUNCBEGIN printf("%s begin %s\n",__PRETTY_FUNCTION__, kernelname.c_str())
45 #define FUNCEND printf("%s end %s\n",__PRETTY_FUNCTION__, kernelname.c_str())
46 #else
47 #define FUNCBEGIN
48 #define FUNCEND
49 #endif
50 
51 namespace CPN {
52 
// Construct a kernel from the given attributes.
// Copies name, context and option flags out of kattr, registers the kernel
// with the context (which yields kernelkey) and starts the kernel thread.
// NOTE(review): several listing lines (70-73, 75, 82, 85, 91, 100) are absent
// from this view, so parts of the body below are incomplete as shown.
53  Kernel::Kernel(const KernelAttr &kattr)
54  : status(INITIALIZED),
55  kernelname(kattr.GetName()),
56  kernelkey(0),
57  context(kattr.GetContext()),
58  useremote(kattr.GetRemoteEnabled()),
59  nodecond_signal(false),
60  useD4R(kattr.UseD4R()),
61  swallowbrokenqueue(kattr.SwallowBrokenQueueExceptions()),
62  growmaxthresh(kattr.GrowQueueMaxThreshold())
63  {
64  FUNCBEGIN;
65 #ifdef CPN_NODELIBDIR
  // Best effort: an ErrnoException from searching the default node library
  // directory is deliberately ignored.
66  try {
67  nodeloader.SearchDirectory(CPN_NODELIBDIR);
68  } catch (const ErrnoException &e) {}
69 #endif
  // No context supplied by the attributes. The body of this branch is not
  // visible here; presumably a default context is installed, because the
  // code right after dereferences `context` unconditionally -- TODO confirm.
74  if (!context) {
76  }
  // The context can force remote operation even if the attributes did not ask for it.
77  if (context->RequireRemote()) {
78  useremote = true;
79  }
80  logger.Output(context.get());
81  logger.LogLevel(context->LogLevel());
83 
84  if (useremote) {
  // The statement that builds `addrlist` starts on a line that is missing
  // from this view; only its last argument is visible.
86  kattr.GetServName());
87  server.reset(new ConnectionServer(addrlist, context));
88 
  // Register with the address the server actually reports.
89  SocketAddress addr = server->GetAddress();
90  kernelkey = context->SetupKernel(kernelname, addr.GetHostName(), addr.GetServName(), this);
92 
93  logger.Info("New kernel, listening on %s:%s", addr.GetHostName().c_str(), addr.GetServName().c_str());
94  } else {
95  kernelkey = context->SetupKernel(kernelname, this);
96  logger.Info("New kernel");
97  }
98  // Start up and don't finish until actually started.
99  thread->Start();
101  }
102 
// NOTE(review): the signature line (listing line 103) is missing from this
// view; from its position and content this is presumably the body of
// Kernel::~Kernel -- confirm.
// Requests termination, blocks in Wait() until the status is DONE, then joins
// the kernel thread.
104  FUNCBEGIN;
105  Terminate();
106  Wait();
107  thread->Join();
108  FUNCEND;
109  }
110 
111  void Kernel::Wait() {
112  FUNCBEGIN;
113  KernelStatus_t s = status.Get();
114  while (s != DONE) {
115  s = status.CompareAndWait(s);
116  }
117  FUNCEND;
118  }
119 
// Body of Kernel::Terminate() (signature on listing line 120 is not in this view).
// Forwards the termination request to the shared context.
121  context->Terminate();
122  }
123 
// Body of Kernel::IsTerminated() (signature on listing line 124 is not in this view).
// Returns whatever the context reports as its terminated state.
125  return context->IsTerminated();
126  }
127 
// Body of Kernel::CheckTerminated() (signature on listing line 128 is not in this view).
// Delegates the check to the context.
129  context->CheckTerminated();
130  }
131 
// Body of Kernel::NotifyTerminate() (signature on listing line 132 is not in this view).
// NOTE(review): listing line 134 is missing; the extra closing brace below
// shows that SendWakeup() sits inside a block opened on that line,
// presumably a condition on the kernel status -- confirm.
133  FUNCBEGIN;
135  SendWakeup();
136  }
137  }
138 
// Body of Kernel::CreateNode(const NodeAttr &attr) (signature on listing
// line 139 is not in this view).
// Works on a private copy of the attributes, fills in the kernel key and the
// node key, then creates the node locally or forwards the request to the
// owning kernel. Returns the new node's key.
140  FUNCBEGIN;
141 
142  NodeAttr nodeattr = attr;
143 
  // A zero kernel key means "not resolved yet": an empty kernel name selects
  // this kernel, otherwise wait for the named kernel to start and use its key.
144  if (nodeattr.GetKernelKey() == 0) {
145  Key_t key = 0;
146  if (nodeattr.GetKernel().empty()) {
147  key = kernelkey;
148  } else {
149  key = context->WaitForKernelStart(nodeattr.GetKernel());
150  }
151  nodeattr.SetKernelKey(key);
152  }
153  Key_t nodekey = context->CreateNodeKey(nodeattr.GetKernelKey(), nodeattr.GetName());
154  nodeattr.SetKey(nodekey);
155 
156  // check the kernel the node should go on and send
157  // to that particular kernel
158  if (nodeattr.GetKernelKey() == kernelkey) {
159  InternalCreateNode(nodeattr);
160  } else {
161  context->SendCreateNode(nodeattr.GetKernelKey(), nodeattr);
162  }
163  return nodekey;
164  }
165 
166  class ExternalEndpoint : public PseudoNode {
167  public:
168  ExternalEndpoint(const std::string &name, Key_t nodekey, shared_ptr<Context> context,
169  bool iswriter_)
170  : PseudoNode(name, nodekey, context), iswriter(iswriter_) {}
171  virtual ~ExternalEndpoint() {}
172 
173  bool IsWriter() const { return iswriter; }
174  private:
175  bool iswriter;
176  };
177 
178  void Kernel::CreateExternalReader(const std::string &name) {
179  CreateExternalEndpoint(name, false);
180  }
181 
182  void Kernel::CreateExternalWriter(const std::string &name) {
183  CreateExternalEndpoint(name, true);
184  }
185 
// Create an ExternalEndpoint pseudo node for `name`, register it in nodemap
// under a freshly created node key and signal the context that it started.
// `iswriter` selects the direction stored in the endpoint.
// NOTE(review): listing line 190 is missing from this view; it presumably
// declares `arlock`, which is unlocked below -- confirm.
186  void Kernel::CreateExternalEndpoint(const std::string &name, bool iswriter) {
187  Key_t nodekey = context->CreateNodeKey(kernelkey, name);
188  shared_ptr<PseudoNode> pnode;
189  pnode.reset(new ExternalEndpoint(name, nodekey, context, iswriter));
191  nodemap.insert(std::make_pair(nodekey, pnode));
192  arlock.Unlock();
  // Signalled after the lock has been released.
193  context->SignalNodeStart(nodekey);
194  }
195 
// Return the output queue `name` of the external endpoint `name`.
// The endpoint must exist in nodemap, be a pure pseudo node and have been
// created as a writer (CreateExternalWriter); otherwise std::invalid_argument
// is thrown. The messages now name the writer, matching the IsWriter() check.
// NOTE(review): listing line 198 is missing from this view; it presumably
// declares `arlock`, which is unlocked below -- confirm.
196  shared_ptr<QueueWriter> Kernel::GetExternalOQueue(const std::string &name) {
197  Key_t key = context->GetNodeKey(name);
199  NodeMap::iterator entry = nodemap.find(key);
200  if (entry == nodemap.end() || !entry->second->IsPurePseudo()) {
201  throw std::invalid_argument("Not a valid external writer.");
202  }
203  shared_ptr<ExternalEndpoint> pnode = dynamic_pointer_cast<ExternalEndpoint>(entry->second);
204  arlock.Unlock();
  // pnode is null when the pseudo node is not an ExternalEndpoint.
205  if (pnode && pnode->IsWriter()) {
206  return pnode->GetOQueue(name);
207  }
208  throw std::invalid_argument("Not a valid external writer.");
209  }
210 
// Return the input queue `name` of the external endpoint `name`.
// The endpoint must exist in nodemap, be a pure pseudo node and have been
// created as a reader (CreateExternalReader); otherwise std::invalid_argument
// is thrown. The messages now name the reader, matching the !IsWriter() check.
// NOTE(review): listing line 213 is missing from this view; it presumably
// declares `arlock`, which is unlocked below -- confirm.
211  shared_ptr<QueueReader> Kernel::GetExternalIQueue(const std::string &name) {
212  Key_t key = context->GetNodeKey(name);
214  NodeMap::iterator entry = nodemap.find(key);
215  if (entry == nodemap.end() || !entry->second->IsPurePseudo()) {
216  throw std::invalid_argument("Not a valid external reader.");
217  }
218  shared_ptr<ExternalEndpoint> pnode = dynamic_pointer_cast<ExternalEndpoint>(entry->second);
219  arlock.Unlock();
  // pnode is null when the pseudo node is not an ExternalEndpoint.
220  if (pnode && !pnode->IsWriter()) {
221  return pnode->GetIQueue(name);
222  }
223  throw std::invalid_argument("Not a valid external reader.");
224  }
225 
// Remove the external endpoint called `name`.
// Throws std::invalid_argument when no pure pseudo node is registered under
// that name; otherwise hands the key to NodeTerminated().
// NOTE(review): listing line 228 is missing from this view; it presumably
// declares `arlock`, which is unlocked below -- confirm.
226  void Kernel::DestroyExternalEndpoint(const std::string &name) {
227  Key_t key = context->GetNodeKey(name);
229  NodeMap::iterator entry = nodemap.find(key);
230  if (entry == nodemap.end() || !entry->second->IsPurePseudo()) {
231  throw std::invalid_argument("Not a valid external endpoint.");
232  }
233  arlock.Unlock();
234  NodeTerminated(key);
235  }
236 
237  void Kernel::WaitForNode(const std::string &nodename) {
238  FUNCBEGIN;
239  context->WaitForNodeEnd(nodename);
240  }
241 
// Body of Kernel::WaitForAllNodes() (signature on listing line 242 is not in this view).
// Delegates to the context's wait for the end of all nodes.
243  context->WaitForAllNodeEnd();
244  }
245 
246  void Kernel::WaitForNodeStart(const std::string &nodename) {
247  FUNCBEGIN;
248  context->WaitForNodeStart(nodename);
249  }
250 
251  void Kernel::CreateQueue(const QueueAttr &qattr) {
252  FUNCBEGIN;
253  // Normalize the QueueAttr into a SimpleQueueAttr
254  // This gets rid of the names and translates to IDs
255  SimpleQueueAttr attr = qattr;
256  if (attr.GetReaderKey() == 0) {
257  if (attr.GetReaderNodeKey() == 0) {
258  if (qattr.GetReaderNode().empty()) {
259  throw std::invalid_argument(
260  "The reader side must be specified with the reader key"
261  " or the reader name and node key or the reader name and"
262  " node name.");
263  }
264  attr.SetReaderNodeKey(context->WaitForNodeStart(qattr.GetReaderNode()));
265  }
266  if (qattr.GetReaderPort().empty()) {
267  throw std::invalid_argument("Ether the port key or port name must be specified.");
268  }
269  attr.SetReaderKey(context->GetCreateReaderKey(attr.GetReaderNodeKey(),
270  qattr.GetReaderPort()));
271  } else if (attr.GetReaderNodeKey() == 0) {
272  attr.SetReaderNodeKey(context->GetReaderNode(attr.GetReaderKey()));
273  }
274 
275  if (attr.GetWriterKey() == 0) {
276  if (attr.GetWriterNodeKey() == 0) {
277  if (qattr.GetWriterNode().empty()) {
278  throw std::invalid_argument(
279  "The writer side must be specified with the writer key"
280  " or the writer name and node key or the writer name and"
281  " node name.");
282  }
283  attr.SetWriterNodeKey(context->WaitForNodeStart(qattr.GetWriterNode()));
284  }
285  if (qattr.GetWriterPort().empty()) {
286  throw std::invalid_argument("Ether the port key or port name must be specified.");
287  }
288  attr.SetWriterKey(context->GetCreateWriterKey(attr.GetWriterNodeKey(),
289  qattr.GetWriterPort()));
290  } else if (attr.GetWriterNodeKey() == 0) {
291  attr.SetWriterNodeKey(context->GetWriterNode(attr.GetWriterKey()));
292  }
293 
294  context->ConnectEndpoints(attr.GetWriterKey(), attr.GetReaderKey(), qattr.GetName());
295 
296  Key_t readerkernel = context->GetNodeKernel(attr.GetReaderNodeKey());
297  Key_t writerkernel = context->GetNodeKernel(attr.GetWriterNodeKey());
298 
299  if (readerkernel == writerkernel) {
300  if (readerkernel == kernelkey) {
301  CreateLocalQueue(attr);
302  } else {
303  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
304  // Send a message to the other kernel to create a local queue
305  context->SendCreateQueue(readerkernel, attr);
306  }
307  } else if (readerkernel == kernelkey) {
308  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
309  // Create the reader end here and queue up a message
310  // to the writer kernel that they need to create an endpoint
311  CreateReaderEndpoint(attr);
312  context->SendCreateWriter(writerkernel, attr);
313  } else if (writerkernel == kernelkey) {
314  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
315  // Create the writer end here and queue up a message to
316  // the reader kernel that they need to create an endpoint
317  CreateWriterEndpoint(attr);
318  context->SendCreateReader(readerkernel, attr);
319  } else {
320  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
321  // Queue up a message to both the reader and writer kernel
322  // to create endpoints
323  context->SendCreateWriter(writerkernel, attr);
324  context->SendCreateReader(readerkernel, attr);
325  }
326  }
327 
332  };
333 
334  static QueueHintStatus ParseQueueHintForMBS(std::string hint) {
335  static const char threshold_str[] = "threshold";
336  static const char *threshold_end = threshold_str + sizeof(threshold_str) - 1;
337  static const char rdma_str[] = "rdma";
338  static const char *rdma_end = rdma_str + sizeof(rdma_str) - 1;
339  bool match = true;
340  std::string::iterator h_cur = hint.begin();
341  std::string::iterator h_end = hint.end();
342  for (const char *cur = &threshold_str[0]; cur != threshold_end; ++cur) {
343  if (h_cur == h_end || *h_cur != *cur) {
344  match = false;
345  break;
346  }
347  ++h_cur;
348  }
349  if (match) { return QUEUEHINT_THRESHOLD; }
350  match = true;
351  h_cur = hint.begin();
352  for (const char *cur = &rdma_str[0]; cur != rdma_end; ++cur) {
353  if (h_cur == h_end || *h_cur != *cur) {
354  match = false;
355  break;
356  }
357  ++h_cur;
358  }
359  if (match) { return QUEUEHINT_RDMA; }
360  return QUEUEHINT_DEFAULT;
361  }
362 
// Body of Kernel::CreateReaderEndpoint(const SimpleQueueAttr &attr)
// (signature on listing line 363 is not in this view).
// Builds the reader half of a remote queue (RDMAQueue when ENABLE_RDMA is set
// and the hint asks for it, RemoteQueue otherwise), registers it with the
// remote queue holder, starts it and attaches it to the reader node. If the
// reader node is not in nodemap the reader side of the queue is shut down.
// NOTE(review): listing lines 366, 388 and 397 are missing from this view;
// they presumably define `hint`, pass the RemoteQueue direction argument and
// declare `arlock` -- confirm.
364  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
365 
367  shared_ptr<RemoteQueueBase> endp;
368  shared_ptr<QueueBase> queue;
369 #if ENABLE_RDMA
370  if (hint == QUEUEHINT_RDMA) {
371  shared_ptr<RDMAQueue> q = shared_ptr<RDMAQueue>(
372  new RDMAQueue(
373  this,
374  RDMAQueue::READ,
375  server.get(),
376  remotequeueholder.get(),
377  attr
378  )
379  );
380  endp = q;
381  queue = q;
382  } else
383 #endif
384  {
385  shared_ptr<RemoteQueue> q = shared_ptr<RemoteQueue>(
386  new RemoteQueue(
387  this,
389  server.get(),
390  remotequeueholder.get(),
391  attr,
392  hint != QUEUEHINT_DEFAULT
393  ));
394  endp = q;
395  queue = q;
396  }
398  NodeMap::iterator entry = nodemap.find(attr.GetReaderNodeKey());
  // Start with an empty pointer and only read the map entry after the
  // end() check: dereferencing the end iterator is undefined behaviour, and
  // a missing node is an expected case (handled by ShutdownReader below).
399  shared_ptr<PseudoNode> node;
400  if (entry != nodemap.end()) {
401  node = entry->second;
402  }
403  arlock.Unlock();
404  remotequeueholder->AddQueue(endp);
405  endp->Start();
406  if (node) { node->CreateReader(queue); }
407  else { queue->ShutdownReader(); }
408  }
409 
// Body of Kernel::CreateWriterEndpoint(const SimpleQueueAttr &attr)
// (signature on listing line 410 is not in this view).
// Builds the writer half of a remote queue (RDMAQueue when ENABLE_RDMA is set
// and the hint asks for it, RemoteQueue otherwise), registers it with the
// remote queue holder, starts it and attaches it to the writer node. If the
// writer node is not in nodemap the writer side of the queue is shut down.
// NOTE(review): listing lines 413, 435 and 444 are missing from this view;
// they presumably define `hint`, pass the RemoteQueue direction argument and
// declare `arlock` -- confirm.
411  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
412 
414  shared_ptr<RemoteQueueBase> endp;
415  shared_ptr<QueueBase> queue;
416 #if ENABLE_RDMA
417  if (hint == QUEUEHINT_RDMA) {
418  shared_ptr<RDMAQueue> q = shared_ptr<RDMAQueue>(
419  new RDMAQueue(
420  this,
421  RDMAQueue::WRITE,
422  server.get(),
423  remotequeueholder.get(),
424  attr
425  )
426  );
427  endp = q;
428  queue = q;
429  } else
430 #endif
431  {
432  shared_ptr<RemoteQueue> q = shared_ptr<RemoteQueue>(
433  new RemoteQueue(
434  this,
436  server.get(),
437  remotequeueholder.get(),
438  attr,
439  hint != QUEUEHINT_DEFAULT
440  ));
441  endp = q;
442  queue = q;
443  }
  // `node` stays empty when the writer node is not registered.
445  NodeMap::iterator entry = nodemap.find(attr.GetWriterNodeKey());
446  shared_ptr<PseudoNode> node;
447  if (entry != nodemap.end()) {
448  node = entry->second;
449  }
450  arlock.Unlock();
451  remotequeueholder->AddQueue(endp);
452  endp->Start();
453  if (node) { node->CreateWriter(queue); }
454  else { queue->ShutdownWriter(); }
455  }
456 
// Body of Kernel::CreateLocalQueue(const SimpleQueueAttr &attr)
// (signature on listing line 457 is not in this view).
// Creates a ThresholdQueue and attaches it to the writer and reader nodes.
// A side whose node is not in nodemap is shut down on the queue instead.
// NOTE(review): listing lines 458 and 462 are missing from this view; they
// presumably define `hint` and declare `arlock` -- confirm.
459  shared_ptr<QueueBase> queue;
460  queue = shared_ptr<QueueBase>(new ThresholdQueue(this, attr, hint != QUEUEHINT_DEFAULT));
461 
463  NodeMap::iterator readentry = nodemap.find(attr.GetReaderNodeKey());
464  shared_ptr<PseudoNode> readnode;
465  if (readentry != nodemap.end()) {
466  readnode = readentry->second;
467  }
468 
469  NodeMap::iterator writeentry = nodemap.find(attr.GetWriterNodeKey());
470  shared_ptr<PseudoNode> writenode;
471  if (writeentry != nodemap.end()) {
472  writenode = writeentry->second;
473  }
474  arlock.Unlock();
475 
  // The nodes are called after arlock has been released; the writer side is
  // handled before the reader side.
476  if (writenode) { writenode->CreateWriter(queue); }
477  else { queue->ShutdownWriter(); }
478  if (readnode) { readnode->CreateReader(queue); }
479  else { queue->ShutdownReader(); }
480  }
481 
// Body of Kernel::InternalCreateNode(NodeAttr &nodeattr) (signature on
// listing line 482 is not in this view; line 483 is missing as well).
// Asserts the kernel is RUNNING, looks up the factory for the node's type
// name, creates the node, records it in nodemap under its key and starts it.
// Throws std::invalid_argument when no factory exists for the type name.
484  FUNCBEGIN;
485  ASSERT(status.Get() == RUNNING);
486  NodeFactory *factory = GetNodeFactory(nodeattr.GetTypeName());
487  if (!factory) {
488  throw std::invalid_argument("No such node type " + nodeattr.GetTypeName());
489  }
490  shared_ptr<NodeBase> node = factory->Create(*this, nodeattr);
491  nodemap.insert(std::make_pair(nodeattr.GetKey(), node));
492  node->Start();
493  }
494 
// Body of Kernel::NodeTerminated(Key_t key) (signature on listing line 495
// is not in this view).
// Tells the context the node ended, then moves the node from nodemap to
// garbagenodes so it can be shut down later by ClearGarbage(), and wakes the
// kernel. When the kernel is already DONE only a warning is logged.
// NOTE(review): listing lines 497 and 507 are missing from this view;
// presumably they acquire locks (the inner braces delimit the scope of the
// one around garbagenodes) -- confirm.
496  context->SignalNodeEnd(key);
498  FUNCBEGIN;
499  if (status.Get() == DONE) {
500  logger.Warn("Nodes running after shutdown");
501  } else {
502  NodeMap::iterator entry = nodemap.find(key);
503  ASSERT(entry != nodemap.end());
  // Keep the node alive in a local pointer before erasing its map entry.
504  shared_ptr<PseudoNode> node = entry->second;
505  nodemap.erase(entry);
506  {
508  garbagenodes.push_back(node);
509  }
510  SendWakeup();
511  }
  // Signalled on both paths, including the DONE case.
512  nodecond.Signal();
513  nodecond_signal = true;
514  }
515 
// Body of Kernel::ClearGarbage() (signature on listing line 516 is not in
// this view; line 517 is missing as well, presumably a lock -- confirm).
// Calls Shutdown() on every node in garbagenodes, last element first,
// removing each one after its Shutdown() returns.
518  FUNCBEGIN;
519  while (!garbagenodes.empty()) {
520  garbagenodes.back()->Shutdown();
521  garbagenodes.pop_back();
522  }
523  }
524 
// Body of Kernel::SendWakeup() (signature on listing line 525 is not in this
// view; line 526 is missing as well).
// Wakes the connection server when remote operation is enabled, then signals
// nodecond and records the signal in nodecond_signal.
527  if (useremote) {
528  server->Wakeup();
529  }
530  nodecond.Signal();
531  nodecond_signal = true;
532  }
533 
// Body of Kernel::EntryPoint() (signature on listing line 534 is not in this
// view). The index lists it as returning void*; the body returns 0.
// Main loop of the kernel: runs while the status is RUNNING, then shuts down
// the remote machinery, notifies all nodes, waits until nodemap is empty and
// posts DONE.
// NOTE(review): listing lines 536, 547, 550, 566 and 578 are missing from
// this view. Presumably 536 moves the status to RUNNING, 547/566 declare
// `arlock`, and 550/578 wait on nodecond inside the `if` bodies -- confirm.
535  FUNCBEGIN;
537  try {
538  context->SignalKernelStart(kernelkey);
539  if (useremote) {
  // Remote mode: the loop is driven by server->Poll().
540  while (status.Get() == RUNNING) {
541  ClearGarbage();
542  remotequeueholder->Cleanup();
543  server->Poll();
544  }
545  } else {
546  ClearGarbage();
548  while (status.Get() == RUNNING) {
549  if (!nodecond_signal) {
551  nodecond_signal = false;
552  }
  // ClearGarbage() runs with arlock released.
553  arlock.Unlock();
554  ClearGarbage();
555  arlock.Lock();
556  }
557  }
558  } catch (const ShutdownException &e) {
  // A ShutdownException ends the loop; shutdown continues below.
559  logger.Warn("Kernel forced shutdown");
560  }
561  if (useremote) {
562  server->Close();
563  remotequeueholder->Shutdown();
564  }
565  {
  // Notify from a copy of the map so the calls are made with arlock released.
567  NodeMap mapcopy = nodemap;
568  arlock.Unlock();
569  NodeMap::iterator nitr = mapcopy.begin();
570  while (nitr != mapcopy.end()) {
571  (nitr++)->second->NotifyTerminate();
572  }
573  ClearGarbage();
574  arlock.Lock();
575  // Wait for all nodes to end
576  while (!nodemap.empty()) {
577  if (!nodecond_signal) {
579  nodecond_signal = false;
580  }
581  arlock.Unlock();
582  ClearGarbage();
583  arlock.Lock();
584  }
585  }
586  ClearGarbage();
587  context->SignalKernelEnd(kernelkey);
  // Posting DONE is what releases callers blocked in Kernel::Wait().
588  status.Post(DONE);
589  FUNCEND;
590  return 0;
591  }
592 
// Body of Kernel::RemoteCreateWriter(SimpleQueueAttr attr) (signature on
// listing line 593 is not in this view).
// Asserts remote operation is enabled and creates the writer endpoint here.
594  FUNCBEGIN;
595  ASSERT(useremote);
596  CreateWriterEndpoint(attr);
597  }
// Body of Kernel::RemoteCreateReader(SimpleQueueAttr attr) (signature on
// listing line 598 is not in this view).
// Asserts remote operation is enabled and creates the reader endpoint here.
599  FUNCBEGIN;
600  ASSERT(useremote);
601  CreateReaderEndpoint(attr);
602  }
// Body of Kernel::RemoteCreateQueue(SimpleQueueAttr attr) (signature on
// listing line 603 is not in this view).
// Asserts remote operation is enabled and creates the queue locally.
604  FUNCBEGIN;
605  ASSERT(useremote);
606  CreateLocalQueue(attr);
607  }
// Body of Kernel::RemoteCreateNode(NodeAttr attr) (signature on listing
// line 608 is not in this view).
// Asserts remote operation is enabled and creates the node on this kernel.
609  FUNCBEGIN;
610  ASSERT(useremote);
611  InternalCreateNode(attr);
612  }
613 
615  // Note that this function does not aquire the lock...
616  // this is because it is meant to be called from the debugger while
617  // the application is halted and may already be in the lock or not.
618  std::string statename;
619  switch (status.Get()) {
620  case INITIALIZED:
621  statename = "initialized";
622  break;
623  case RUNNING:
624  statename = "running";
625  break;
626  case TERMINATE:
627  statename = "terminated";
628  break;
629  case DONE:
630  statename = "done";
631  break;
632  }
633  logger.Error("Kernel %s (%llu) in state %s", kernelname.c_str(), kernelkey, statename.c_str());
634  logger.Error("Active nodes: %u, Garbage nodes: %u", nodemap.size(), garbagenodes.size());
635  if (useremote) {
636  server->LogState();
637  }
638  NodeMap::iterator node = nodemap.begin();
639  while (node != nodemap.end()) {
640  node->second->LogState();
641  ++node;
642  }
643  }
644 
645 
// Return the current value of the useD4R flag.
// NOTE(review): listing line 647 is missing from this view; presumably it
// takes a lock around the read -- confirm.
646  bool Kernel::UseD4R() {
648  return useD4R;
649  }
650 
// Set the useD4R flag to `u` and return the newly stored value.
// NOTE(review): listing line 652 is missing from this view; presumably it
// takes a lock around the write -- confirm.
651  bool Kernel::UseD4R(bool u) {
653  return useD4R = u;
654  }
655 
// Body of Kernel::SwallowBrokenQueueExceptions() (signature on listing
// line 656 is not in this view; line 657 is missing as well).
// Returns the current swallowbrokenqueue flag.
658  return swallowbrokenqueue;
659  }
660 
// NOTE(review): the signature (around listing line 661) is not in this view;
// presumably the setter overload of SwallowBrokenQueueExceptions taking
// `sbqe` -- confirm.
// Stores sbqe in swallowbrokenqueue and returns the stored value.
663  return swallowbrokenqueue = sbqe;
664  }
665 
// Body of Kernel::GrowQueueMaxThreshold() (signature on listing line 666 is
// not in this view; line 667 is missing as well).
// Returns the current growmaxthresh flag.
668  return growmaxthresh;
669  }
670 
// NOTE(review): the signature (around listing line 671) is not in this view;
// presumably the setter overload of GrowQueueMaxThreshold taking `grow` --
// confirm.
// Stores grow in growmaxthresh and returns the stored value.
673  return growmaxthresh = grow;
674  }
675 
676 }
677 
const std::string name
Definition: PseudoNode.h:87
void Wait(ReentrantLock &lock) const
Key_t GetKey() const
Definition: NodeAttr.h:116
KernelStatus_t
Definition: Kernel.h:56
bool useD4R
Definition: Kernel.h:367
shared_ptr< QueueReader > GetExternalIQueue(const std::string &name)
Definition: Kernel.cc:211
The base definition of all nodes.
NodeLoader nodeloader
Definition: Kernel.h:353
shared_ptr< Context > context
Definition: Kernel.h:349
Attributes for a node.
Definition: NodeAttr.h:58
bool UseD4R()
Whether or not D4R should be used.
Definition: Kernel.cc:646
NodeList garbagenodes
Definition: Kernel.h:364
static shared_ptr< Context > Local()
Create a local context.
Definition: Context.cc:29
void LoadSharedLib(const std::string &libname)
Definition: NodeLoader.cc:57
void DestroyExternalEndpoint(const std::string &name)
Definition: Kernel.cc:226
Key_t GetWriterKey() const
Definition: QueueAttr.h:323
SimpleQueueAttr & SetReaderNodeKey(Key_t k)
Definition: QueueAttr.h:300
void NotifyTerminate()
Definition: Kernel.cc:132
#define FUNCEND
Definition: Kernel.cc:48
const std::vector< std::string > & GetNodeLists() const
Definition: KernelAttr.h:137
An abstraction of a socket address with convenience methods.
Definition: SocketAddress.h:42
void CreateExternalReader(const std::string &name)
Create an external reader.
Definition: Kernel.cc:178
LoggerOutput * Output()
Definition: Logger.cc:98
bool IsWriter() const
Definition: Kernel.cc:173
void WaitForNode(const std::string &nodename)
Definition: Kernel.cc:237
const Key_t nodekey
Definition: PseudoNode.h:88
Sync::ReentrantLock garbagelock
Definition: Kernel.h:363
virtual ~ExternalEndpoint()
Definition: Kernel.cc:171
Definition for the kernel object.
The attribute for the Kernel.
Definition: KernelAttr.h:40
void RemoteCreateNode(NodeAttr attr)
Definition: Kernel.cc:608
const std::string kernelname
Definition: Kernel.h:346
void SearchDirectory(const std::string &dirname)
Definition: NodeLoader.cc:78
#define FUNCBEGIN
Definition: Kernel.cc:47
NodeAttr & SetKernelKey(Key_t k)
Definition: NodeAttr.h:84
bool useremote
Definition: Kernel.h:352
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
Status_t Get() const
Definition: StatusHandler.h:84
std::string GetHostName(bool numerichost=true) const
shared_ptr< QueueWriter > GetExternalOQueue(const std::string &name)
Definition: Kernel.cc:196
void RemoteCreateQueue(SimpleQueueAttr attr)
Definition: Kernel.cc:603
NodeAttr & SetKey(Key_t key_)
Definition: NodeAttr.h:102
auto_ptr< ConnectionServer > server
Definition: Kernel.h:350
void Error(const char *fmt,...)
Definition: Logger.cc:159
Status_t CompareAndWait(Status_t oldStatus) const
void CreateQueue(const QueueAttr &attr)
Create a new queue.
Definition: Kernel.cc:251
const std::string & GetReaderPort() const
Definition: QueueAttr.h:200
QueueHintStatus
Definition: Kernel.cc:328
void Unlock()
Definition: AutoLock.h:61
uint64_t Key_t
Definition: common.h:79
void CreateExternalWriter(const std::string &name)
Definition: Kernel.cc:182
void CreateReaderEndpoint(const SimpleQueueAttr &attr)
Definition: Kernel.cc:363
An object to hold references to RemoteQueues so they can continue to work after the node has gone awa...
The exceptions specified for the CPN network.
bool GrowQueueMaxThreshold()
Whether the queue should grow when a threshold larger than the current max threshold is requested...
Definition: Kernel.cc:666
void WaitForNodeStart(const std::string &nodename)
Wait for the given node to start.
Definition: Kernel.cc:246
const std::vector< std::string > & GetNodeSearchPaths() const
Definition: KernelAttr.h:139
void CheckTerminated()
Convenience method that checks IsTerminated and if so throws a ShutdownException. ...
Definition: Kernel.cc:128
bool swallowbrokenqueue
Definition: Kernel.h:368
NodeMap nodemap
Definition: Kernel.h:362
const std::string & GetServName() const
Definition: KernelAttr.h:123
void Info(const char *fmt,...)
Definition: Logger.cc:177
const std::string & GetWriterNode() const
Definition: QueueAttr.h:197
bool SwallowBrokenQueueExceptions()
Whether the node should by default swallow the broken queue exceptions or let them propagate as an er...
Definition: Kernel.cc:656
void CreateLocalQueue(const SimpleQueueAttr &attr)
Definition: Kernel.cc:457
void CreateExternalEndpoint(const std::string &name, bool iswriter)
Definition: Kernel.cc:186
Kernel(const KernelAttr &kattr)
Definition: Kernel.cc:53
void SendWakeup()
Definition: Kernel.cc:525
const std::string & Name() const
Definition: Logger.cc:88
void Lock()
Definition: AutoLock.h:69
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
const std::string & GetName() const
Definition: QueueAttr.h:211
bool CompareAndPost(Status_t oldStatus, Status_t newStatus)
Definition: StatusHandler.h:96
Key_t kernelkey
Definition: Kernel.h:347
void NodeTerminated(Key_t key)
Called by the node in the cleanup routine. TO BE CALLED ONLY BY THE CPN INTERNALS.
Definition: Kernel.cc:495
std::string GetServName() const
Logger logger
Definition: Kernel.h:348
Key_t GetReaderKey() const
Definition: QueueAttr.h:324
shared_ptr< Context > context
Definition: PseudoNode.h:96
const std::vector< std::string > & GetSharedLibs() const
Definition: KernelAttr.h:135
void LoadNodeList(const std::string &filename)
Definition: NodeLoader.cc:68
void WaitForAllNodes()
Waits until there are no running nodes.
Definition: Kernel.cc:242
void ClearGarbage()
Definition: Kernel.cc:516
int LogLevel() const
Definition: Logger.cc:58
A very simple logging interface.
A version of the ThresholdQueue that provides the CPN Queue interface This queue implementation creat...
An exception indicating that the Kernel has shut down.
Definition: Exceptions.h:38
const std::string & GetName() const
Definition: NodeAttr.h:110
const std::string & GetWriterPort() const
Definition: QueueAttr.h:198
static QueueHintStatus ParseQueueHintForMBS(std::string hint)
Definition: Kernel.cc:334
ExternalEndpoint(const std::string &name, Key_t nodekey, shared_ptr< Context > context, bool iswriter_)
Definition: Kernel.cc:168
std::map< Key_t, shared_ptr< PseudoNode > > NodeMap
Definition: Kernel.h:355
Default threshold queue implementation.
void RemoteCreateWriter(SimpleQueueAttr attr)
Definition: Kernel.cc:593
SimpleQueueAttr & SetWriterNodeKey(Key_t k)
Definition: QueueAttr.h:305
Key_t GetWriterNodeKey() const
Definition: QueueAttr.h:321
static SockAddrList CreateIP(unsigned serv)
Return a list of valid socket address for the given service number or port number.
void RemoteCreateReader(SimpleQueueAttr attr)
Definition: Kernel.cc:598
void Warn(const char *fmt,...)
Definition: Logger.cc:168
auto_ptr< Pthread > thread
Definition: Kernel.h:345
void CreateWriterEndpoint(const SimpleQueueAttr &attr)
Definition: Kernel.cc:410
bool IsTerminated()
Definition: Kernel.cc:124
Sync::ReentrantCondition nodecond
Definition: Kernel.h:360
std::vector< SocketAddress > SockAddrList
Definition: SocketAddress.h:35
const std::string & GetReaderNode() const
Definition: QueueAttr.h:199
auto_ptr< RemoteQueueHolder > remotequeueholder
Definition: Kernel.h:351
The attributes for a queue.
Definition: QueueAttr.h:55
const std::string & GetTypeName() const
Definition: NodeAttr.h:112
SimpleQueueAttr & SetWriterKey(Key_t k)
Definition: QueueAttr.h:315
Sync::ReentrantLock datalock
Definition: Kernel.h:366
Key_t GetReaderNodeKey() const
Definition: QueueAttr.h:322
void LogState()
Definition: Kernel.cc:614
std::string GetHint() const
Definition: QueueAttr.h:328
void Terminate()
Definition: Kernel.cc:120
Sync::ReentrantLock nodelock
Definition: Kernel.h:359
The node factory provides a method for the kernel to create arbitrary user defined Nodes...
Definition: NodeFactory.h:37
NodeFactory * GetNodeFactory(const std::string &nodetype)
Return a pointer to the node factory that produces the given node type. May load a shared library to ...
Definition: Kernel.h:276
virtual shared_ptr< NodeBase > Create(Kernel &ker, const NodeAttr &attr)=0
bool growmaxthresh
Definition: Kernel.h:369
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
const std::string & GetKernel() const
Definition: NodeAttr.h:107
void InternalCreateNode(NodeAttr &nodeattr)
Definition: Kernel.cc:482
void Wait()
Definition: Kernel.cc:111
Key_t GetKernelKey() const
Definition: NodeAttr.h:108
void * EntryPoint()
Definition: Kernel.cc:534
#define ASSERT(exp,...)
const std::string & GetHostName() const
Definition: KernelAttr.h:121
SimpleQueueAttr & SetReaderKey(Key_t k)
Definition: QueueAttr.h:310
void Post(Status_t newStatus)
Definition: StatusHandler.h:75
Key_t CreateNode(const NodeAttr &attr)
Definition: Kernel.cc:139
Definition of the NodeFactory.
bool nodecond_signal
Definition: Kernel.h:361