CPN
Computational Process Networks
CPN::Kernel Class Reference

The Kernel declaration.

#include <Kernel.h>


Public Member Functions

 Kernel (const KernelAttr &kattr)
 
 ~Kernel ()
 
void Wait ()
 
void Terminate ()
 
bool IsTerminated ()
 
void CheckTerminated ()
 Convenience method that checks IsTerminated and if so throws a ShutdownException.
 
Key_t CreateNode (const NodeAttr &attr)
 
Key_t CreateNode (const std::string &name_, const std::string &nodetype_)
 Same as CreateNode that takes a NodeAttr except only the required parameters are set.
 
void CreateExternalReader (const std::string &name)
 Create an external reader.
 
void CreateExternalWriter (const std::string &name)
 
shared_ptr< QueueWriter > GetExternalOQueue (const std::string &name)
 
shared_ptr< QueueReader > GetExternalIQueue (const std::string &name)
 
void DestroyExternalEndpoint (const std::string &name)
 
void WaitForNode (const std::string &nodename)
 
void WaitForAllNodes ()
 Waits until there are no running nodes.
 
void WaitForNodeStart (const std::string &nodename)
 Wait for the given node to start.
 
void CreateQueue (const QueueAttr &attr)
 Create a new queue.
 
LoggerOutput * GetLogger ()
 
const std::string GetName () const
 
Key_t GetKey () const
 
shared_ptr< Context > GetContext () const
 
void NodeTerminated (Key_t key)
 Called by the node in the cleanup routine. TO BE CALLED ONLY BY THE CPN INTERNALS.
 
void LoadSharedLib (const std::string &libname)
 Attempts to load the given dynamic library and make the symbols inside available to be searched for node types. The library will be unloaded on destruction.
 
void LoadNodeList (const std::string &filename)
 
void SearchDirectory (const std::string &dirname)
 
NodeFactory * GetNodeFactory (const std::string &nodetype)
 Return a pointer to the node factory that produces the given node type. May load a shared library to find the node factory.
 
void RegisterNodeFactory (shared_ptr< NodeFactory > factory)
 A function that lets others register node factories.
 
unsigned CalculateGrowSize (unsigned currentsize, unsigned request)
 Calculate the new queue size when a queue needs to grow.
 
template<typename Function >
Key_t CreateFunctionNode (const std::string &nodename, Function func)
 Instantiating a function node.
 
template<typename Function , typename Argument1 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1)
 
template<typename Function , typename Argument1 , typename Argument2 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3, Argument4 arg4)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3, Argument4 arg4, Argument5 arg5)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3, Argument4 arg4, Argument5 arg5, Argument6 arg6)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 , typename Argument7 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3, Argument4 arg4, Argument5 arg5, Argument6 arg6, Argument7 arg7)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 , typename Argument7 , typename Argument8 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3, Argument4 arg4, Argument5 arg5, Argument6 arg6, Argument7 arg7, Argument8 arg8)
 
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 , typename Argument7 , typename Argument8 , typename Argument9 >
Key_t CreateFunctionNode (const std::string &nodename, Function func, Argument1 arg1, Argument2 arg2, Argument3 arg3, Argument4 arg4, Argument5 arg5, Argument6 arg6, Argument7 arg7, Argument8 arg8, Argument9 arg9)
 
void NotifyTerminate ()
 
void RemoteCreateWriter (SimpleQueueAttr attr)
 
void RemoteCreateReader (SimpleQueueAttr attr)
 
void RemoteCreateQueue (SimpleQueueAttr attr)
 
void RemoteCreateNode (NodeAttr attr)
 
bool UseD4R ()
 Whether or not D4R should be used.
 
bool UseD4R (bool u)
 
bool GrowQueueMaxThreshold ()
 Whether the queue should grow when a threshold larger than the current max threshold is requested.
 
bool GrowQueueMaxThreshold (bool grow)
 
bool SwallowBrokenQueueExceptions ()
 Whether the node should by default swallow the broken queue exceptions or let them propagate as an error.
 
bool SwallowBrokenQueueExceptions (bool sbqe)
 

Private Types

enum  KernelStatus_t { INITIALIZED, RUNNING, TERMINATE, DONE }
 
typedef std::map< Key_t, shared_ptr< PseudoNode > > NodeMap
 
typedef std::vector< shared_ptr< PseudoNode > > NodeList
 

Private Member Functions

 Kernel (const Kernel &)
 
Kernel & operator= (const Kernel &)
 
void CreateExternalEndpoint (const std::string &name, bool iswriter)
 
void CreateReaderEndpoint (const SimpleQueueAttr &attr)
 
void CreateWriterEndpoint (const SimpleQueueAttr &attr)
 
void CreateLocalQueue (const SimpleQueueAttr &attr)
 
void InternalCreateNode (NodeAttr &nodeattr)
 
void ClearGarbage ()
 
void * EntryPoint ()
 
void SendWakeup ()
 
void LogState ()
 

Private Attributes

Sync::StatusHandler< KernelStatus_t > status
 
auto_ptr< Pthread > thread
 
const std::string kernelname
 
Key_t kernelkey
 
Logger logger
 
shared_ptr< Context > context
 
auto_ptr< ConnectionServer > server
 
auto_ptr< RemoteQueueHolder > remotequeueholder
 
bool useremote
 
NodeLoader nodeloader
 
Sync::ReentrantLock nodelock
 
Sync::ReentrantCondition nodecond
 
bool nodecond_signal
 
NodeMap nodemap
 
Sync::ReentrantLock garbagelock
 
NodeList garbagenodes
 
Sync::ReentrantLock datalock
 
bool useD4R
 
bool swallowbrokenqueue
 
bool growmaxthresh
 

Detailed Description

The Kernel declaration.

The kernel object keeps track of all the queues and nodes on a particular kernel, ensures that they are instantiated and destroyed correctly, and provides a unified interface to the user of the process network.

Definition at line 55 of file Kernel.h.

Member Typedef Documentation

typedef std::vector< shared_ptr<PseudoNode> > CPN::Kernel::NodeList
private

Definition at line 356 of file Kernel.h.

typedef std::map<Key_t, shared_ptr<PseudoNode> > CPN::Kernel::NodeMap
private

Definition at line 355 of file Kernel.h.

Member Enumeration Documentation

enum CPN::Kernel::KernelStatus_t
private

Enumerator
INITIALIZED 
RUNNING 
TERMINATE 
DONE 

Definition at line 56 of file Kernel.h.

Constructor & Destructor Documentation

CPN::Kernel::Kernel ( const KernelAttr &  kattr)

Construct a new kernel object with the given attributes. The Kernel starts processing immediately.

Definition at line 53 of file Kernel.cc.

References Sync::StatusHandler< Status_t >::CompareAndWait(), context, SocketAddress::CreateIP(), CreatePthreadFunctional(), EntryPoint(), FUNCBEGIN, SocketAddress::GetHostName(), CPN::KernelAttr::GetHostName(), CPN::KernelAttr::GetNodeLists(), CPN::KernelAttr::GetNodeSearchPaths(), SocketAddress::GetServName(), CPN::KernelAttr::GetServName(), CPN::KernelAttr::GetSharedLibs(), Logger::Info(), INITIALIZED, kernelkey, kernelname, CPN::NodeLoader::LoadNodeList(), CPN::NodeLoader::LoadSharedLib(), CPN::Context::Local(), logger, Logger::LogLevel(), Logger::Name(), nodeloader, Logger::Output(), remotequeueholder, CPN::NodeLoader::SearchDirectory(), server, status, thread, and useremote.

55  kernelname(kattr.GetName()),
56  kernelkey(0),
57  context(kattr.GetContext()),
58  useremote(kattr.GetRemoteEnabled()),
59  nodecond_signal(false),
60  useD4R(kattr.UseD4R()),
61  swallowbrokenqueue(kattr.SwallowBrokenQueueExceptions()),
62  growmaxthresh(kattr.GrowQueueMaxThreshold())
63  {
64  FUNCBEGIN;
65 #ifdef CPN_NODELIBDIR
66  try {
67  nodeloader.SearchDirectory(CPN_NODELIBDIR);
68  } catch (const ErrnoException &e) {}
69 #endif
70  nodeloader.SearchDirectory(kattr.GetNodeSearchPaths());
71  nodeloader.LoadSharedLib(kattr.GetSharedLibs());
72  nodeloader.LoadNodeList(kattr.GetNodeLists());
74  if (!context) {
76  }
77  if (context->RequireRemote()) {
78  useremote = true;
79  }
80  logger.Output(context.get());
81  logger.LogLevel(context->LogLevel());
83 
84  if (useremote) {
85  SockAddrList addrlist = SocketAddress::CreateIP(kattr.GetHostName(),
86  kattr.GetServName());
87  server.reset(new ConnectionServer(addrlist, context));
88 
89  SocketAddress addr = server->GetAddress();
90  kernelkey = context->SetupKernel(kernelname, addr.GetHostName(), addr.GetServName(), this);
91  remotequeueholder.reset(new RemoteQueueHolder());
92 
93  logger.Info("New kernel, listening on %s:%s", addr.GetHostName().c_str(), addr.GetServName().c_str());
94  } else {
95  kernelkey = context->SetupKernel(kernelname, this);
96  logger.Info("New kernel");
97  }
98  // Start up and don't finish until actually started.
99  thread->Start();
101  }

CPN::Kernel::~Kernel ( )

Definition at line 103 of file Kernel.cc.

References FUNCBEGIN, FUNCEND, Terminate(), thread, and Wait().

103  {
104  FUNCBEGIN;
105  Terminate();
106  Wait();
107  thread->Join();
108  FUNCEND;
109  }
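
The destructor listing above asks the kernel to stop, waits for it, and only then joins the worker thread. A minimal sketch of that ordering with standard C++ threads follows. `Worker` and its members are hypothetical names chosen for illustration and are not part of CPN; the sketch also folds the Wait and Join steps into a single `join()`.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for an object that owns a background thread.
// Only the shutdown order (request stop, then join) mirrors the listing.
class Worker {
public:
    // thread_ is declared last so stop_ is initialized before the thread starts.
    Worker() : stop_(false), thread_([this] { Run(); }) {}

    ~Worker() {
        Terminate();                              // 1. ask the loop to stop
        if (thread_.joinable()) thread_.join();   // 2. wait for the thread to exit
    }

    void Terminate() { stop_.store(true); }
    bool IsTerminated() const { return stop_.load(); }

private:
    void Run() {
        // Poll the stop flag; a real kernel would do useful work here.
        while (!stop_.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> stop_;
    std::thread thread_;
};
```

Because the destructor requests termination itself, letting a `Worker` go out of scope is enough to stop and join its thread.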

CPN::Kernel::Kernel ( const Kernel & )
private

Member Function Documentation

unsigned CPN::Kernel::CalculateGrowSize ( unsigned  currentsize,
unsigned  request 
)
inline virtual

Calculate the new queue size when a queue needs to grow.

Returns
the new queue size

Implements CPN::KernelBase.

Definition at line 317 of file Kernel.h.

317  {
318  return context->CalculateGrowSize(currentsize, request);
319  }
void CPN::Kernel::CheckTerminated ( )
virtual

Convenience method that checks IsTerminated and if so throws a ShutdownException.

Exceptions
ShutdownException

Implements CPN::KernelBase.

Definition at line 128 of file Kernel.cc.

References context.

128  {
129  context->CheckTerminated();
130  }
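
The check-then-throw convenience described above can be sketched in isolation. In the listing the real method simply delegates to the context; the class below is a hypothetical, self-contained model, and `ShutdownError` is a stand-in name rather than CPN's ShutdownException.

```cpp
#include <atomic>
#include <stdexcept>

// Hypothetical exception type standing in for CPN's ShutdownException.
struct ShutdownError : std::runtime_error {
    ShutdownError() : std::runtime_error("shutdown requested") {}
};

// Hypothetical minimal model of the IsTerminated / CheckTerminated pair.
class TerminationFlag {
public:
    void Terminate() { terminated_.store(true); }
    bool IsTerminated() const { return terminated_.load(); }

    // Throws instead of returning a flag, so a long-running loop can leave
    // through ordinary exception unwinding.
    void CheckTerminated() const {
        if (IsTerminated()) throw ShutdownError();
    }

private:
    std::atomic<bool> terminated_{false};
};
```

A loop body can call `CheckTerminated()` once per iteration and rely on a single catch block at the top level to handle shutdown.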
void CPN::Kernel::ClearGarbage ( )
private

Definition at line 516 of file Kernel.cc.

References FUNCBEGIN, garbagelock, and garbagenodes.

Referenced by EntryPoint().

516  {
518  FUNCBEGIN;
519  while (!garbagenodes.empty()) {
520  garbagenodes.back()->Shutdown();
521  garbagenodes.pop_back();
522  }
523  }

void CPN::Kernel::CreateExternalEndpoint ( const std::string &  name,
bool  iswriter 
)
private

Definition at line 186 of file Kernel.cc.

References context, kernelkey, nodelock, nodemap, and AutoLock< Lockable >::Unlock().

Referenced by CreateExternalReader(), and CreateExternalWriter().

186  {
187  Key_t nodekey = context->CreateNodeKey(kernelkey, name);
188  shared_ptr<PseudoNode> pnode;
189  pnode.reset(new ExternalEndpoint(name, nodekey, context, iswriter));
191  nodemap.insert(std::make_pair(nodekey, pnode));
192  arlock.Unlock();
193  context->SignalNodeStart(nodekey);
194  }

void CPN::Kernel::CreateExternalReader ( const std::string &  name)

Create an external reader.

An external reader is a handle for applications to use outside of the network to input and output data.

Note that an external reader can only be used on the kernel it is created on.

Implementation detail: External endpoints create a node internally with the same name. This means that they can be waited on with WaitForNodeStart and WaitForNode. Also, WaitForAllNodes will not return until all external endpoints have been destroyed. Additionally, external endpoint names must be unique across the network and cannot share the same name as any node.

Parameters
name  the name of the reader

Definition at line 178 of file Kernel.cc.

References CreateExternalEndpoint().

178  {
179  CreateExternalEndpoint(name, false);
180  }

void CPN::Kernel::CreateExternalWriter ( const std::string &  name)

Create an external writer. Dual of CreateExternalReader.

Definition at line 182 of file Kernel.cc.

References CreateExternalEndpoint().

182  {
183  CreateExternalEndpoint(name, true);
184  }

template<typename Function >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func 
)

Instantiating a function node.

The idea of a function node is a simple function with minimal state. A node is created and then the function is called with the given parameters.

This list of prototypes was generated by FunctionNode.py
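
As an illustration of "a function called with the given parameters", the sketch below binds a callable and its arguments into a deferred call. It is not the CPN implementation: `FunctionNodeSketch` and `MakeFunctionNode` are hypothetical names, the variadic template is a modern substitute for the nine generated overloads, and the exact way CPN invokes the function is an assumption.

```cpp
#include <functional>
#include <string>

// Hypothetical record describing a function node: a name plus a bound call.
struct FunctionNodeSketch {
    std::string name;
    std::function<void()> body;  // invoked when the node runs
};

// Variadic stand-in for the generated one-to-nine argument overloads.
template <typename Function, typename... Arguments>
FunctionNodeSketch MakeFunctionNode(const std::string &nodename,
                                    Function func, Arguments... args) {
    // Arguments are copied into the closure, matching the by-value
    // parameters of the prototypes on this page.
    return FunctionNodeSketch{nodename, [func, args...]() { func(args...); }};
}
```

Calling `body()` on the returned record runs the stored function with the stored arguments, which is the "minimal state" idea: everything the node needs is captured at creation time.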

template<typename Function , typename Argument1 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1 
)
template<typename Function , typename Argument1 , typename Argument2 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3,
Argument4  arg4 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3,
Argument4  arg4,
Argument5  arg5 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3,
Argument4  arg4,
Argument5  arg5,
Argument6  arg6 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 , typename Argument7 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3,
Argument4  arg4,
Argument5  arg5,
Argument6  arg6,
Argument7  arg7 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 , typename Argument7 , typename Argument8 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3,
Argument4  arg4,
Argument5  arg5,
Argument6  arg6,
Argument7  arg7,
Argument8  arg8 
)
template<typename Function , typename Argument1 , typename Argument2 , typename Argument3 , typename Argument4 , typename Argument5 , typename Argument6 , typename Argument7 , typename Argument8 , typename Argument9 >
Key_t CPN::Kernel::CreateFunctionNode ( const std::string &  nodename,
Function  func,
Argument1  arg1,
Argument2  arg2,
Argument3  arg3,
Argument4  arg4,
Argument5  arg5,
Argument6  arg6,
Argument7  arg7,
Argument8  arg8,
Argument9  arg9 
)
void CPN::Kernel::CreateLocalQueue ( const SimpleQueueAttr &  attr)
private

Definition at line 457 of file Kernel.cc.

References CPN::SimpleQueueAttr::GetHint(), CPN::SimpleQueueAttr::GetReaderNodeKey(), CPN::SimpleQueueAttr::GetWriterNodeKey(), nodelock, nodemap, CPN::ParseQueueHintForMBS(), and CPN::QUEUEHINT_DEFAULT.

Referenced by CreateQueue(), and RemoteCreateQueue().

457  {
458  QueueHintStatus hint = ParseQueueHintForMBS(attr.GetHint());
459  shared_ptr<QueueBase> queue;
460  queue = shared_ptr<QueueBase>(new ThresholdQueue(this, attr, hint != QUEUEHINT_DEFAULT));
461 
463  NodeMap::iterator readentry = nodemap.find(attr.GetReaderNodeKey());
464  shared_ptr<PseudoNode> readnode;
465  if (readentry != nodemap.end()) {
466  readnode = readentry->second;
467  }
468 
469  NodeMap::iterator writeentry = nodemap.find(attr.GetWriterNodeKey());
470  shared_ptr<PseudoNode> writenode;
471  if (writeentry != nodemap.end()) {
472  writenode = writeentry->second;
473  }
474  arlock.Unlock();
475 
476  if (writenode) { writenode->CreateWriter(queue); }
477  else { queue->ShutdownWriter(); }
478  if (readnode) { readnode->CreateReader(queue); }
479  else { queue->ShutdownReader(); }
480  }
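
The listing above looks up the reader and writer nodes and leaves the pointer empty when a node is missing, so that side of the queue can be shut down instead. A minimal sketch of that lookup pattern follows. `FindOrNull` is a hypothetical helper, not a CPN function, and it uses `std::shared_ptr` where the listing's unqualified `shared_ptr` may come from a different library.

```cpp
#include <cstdint>
#include <map>
#include <memory>

typedef std::uint64_t Key_t;  // CPN's common.h defines Key_t as uint64_t

// Hypothetical helper: returns the mapped pointer, or an empty shared_ptr
// when the key is absent. The iterator is dereferenced only after the
// end() check.
template <typename T>
std::shared_ptr<T> FindOrNull(const std::map<Key_t, std::shared_ptr<T> > &m,
                              Key_t key) {
    typename std::map<Key_t, std::shared_ptr<T> >::const_iterator it = m.find(key);
    if (it == m.end()) return std::shared_ptr<T>();
    return it->second;
}
```

The caller can then branch on the returned pointer, for example attaching the queue when it is non-empty and shutting that endpoint down otherwise.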

Key_t CPN::Kernel::CreateNode ( const NodeAttr &  attr)

Create a new node.

Parameters
attr  The NodeAttr that describes the new node to create.
Returns
the new node's key
Exceptions
KernelShutdownException  if Wait has completed or Terminate has been called.

Definition at line 139 of file Kernel.cc.

References context, FUNCBEGIN, CPN::NodeAttr::GetKernel(), CPN::NodeAttr::GetKernelKey(), CPN::NodeAttr::GetName(), InternalCreateNode(), kernelkey, CPN::NodeAttr::SetKernelKey(), and CPN::NodeAttr::SetKey().

Referenced by CPN::VariantCPNLoader::LoadNode().

139  {
140  FUNCBEGIN;
141 
142  NodeAttr nodeattr = attr;
143 
144  if (nodeattr.GetKernelKey() == 0) {
145  Key_t key = 0;
146  if (nodeattr.GetKernel().empty()) {
147  key = kernelkey;
148  } else {
149  key = context->WaitForKernelStart(nodeattr.GetKernel());
150  }
151  nodeattr.SetKernelKey(key);
152  }
153  Key_t nodekey = context->CreateNodeKey(nodeattr.GetKernelKey(), nodeattr.GetName());
154  nodeattr.SetKey(nodekey);
155 
156  // check the kernel the node should go on and send
157  // to that particular kernel
158  if (nodeattr.GetKernelKey() == kernelkey) {
159  InternalCreateNode(nodeattr);
160  } else {
161  context->SendCreateNode(nodeattr.GetKernelKey(), nodeattr);
162  }
163  return nodekey;
164  }
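
The placement decision in the listing above can be reduced to two small functions: one that resolves which kernel a node belongs on, and one that decides whether to create it in-process or forward the request. This is a sketch under assumptions. `ResolveKernelKey` and `IsLocal` are hypothetical names, and `lookup` stands in for the context call that waits for a named kernel to start.

```cpp
#include <cstdint>
#include <functional>
#include <string>

typedef std::uint64_t Key_t;  // CPN's common.h defines Key_t as uint64_t

// Hypothetical reduction of the kernel-key resolution in the listing.
Key_t ResolveKernelKey(Key_t requestedKey, const std::string &kernelName,
                       Key_t localKey,
                       const std::function<Key_t(const std::string &)> &lookup) {
    if (requestedKey != 0) return requestedKey;  // an explicit key wins
    if (kernelName.empty()) return localKey;     // no name given: place locally
    return lookup(kernelName);                   // named kernel: resolve its key
}

// True when the node should be created in-process rather than forwarded.
bool IsLocal(Key_t targetKey, Key_t localKey) { return targetKey == localKey; }
```

A key of 0 is treated as "unset" here because the listing tests `GetKernelKey() == 0` before resolving a kernel.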

Key_t CPN::Kernel::CreateNode ( const std::string &  name_,
const std::string &  nodetype_ 
)
inline

Same as CreateNode that takes a NodeAttr except only the required parameters are set.

Parameters
name  the name of the node
nodetype  the node type name
Returns
the new node's key

Definition at line 107 of file Kernel.h.

107  {
108  return CreateNode(NodeAttr(name_,nodetype_));
109  }
void CPN::Kernel::CreateQueue ( const QueueAttr &  attr)

Create a new queue.

Note that the nodes for the queue must already exist.

Parameters
attr  the attribute to use to create the queue
See Also
QueueAttr

Definition at line 251 of file Kernel.cc.

References ASSERT, context, CreateLocalQueue(), CreateReaderEndpoint(), CreateWriterEndpoint(), FUNCBEGIN, CPN::QueueAttr::GetName(), CPN::SimpleQueueAttr::GetReaderKey(), CPN::QueueAttr::GetReaderNode(), CPN::SimpleQueueAttr::GetReaderNodeKey(), CPN::QueueAttr::GetReaderPort(), CPN::SimpleQueueAttr::GetWriterKey(), CPN::QueueAttr::GetWriterNode(), CPN::SimpleQueueAttr::GetWriterNodeKey(), CPN::QueueAttr::GetWriterPort(), kernelkey, CPN::SimpleQueueAttr::SetReaderKey(), CPN::SimpleQueueAttr::SetReaderNodeKey(), CPN::SimpleQueueAttr::SetWriterKey(), CPN::SimpleQueueAttr::SetWriterNodeKey(), and useremote.

Referenced by CPN::VariantCPNLoader::LoadQueue().

251  {
252  FUNCBEGIN;
253  // Normalize the QueueAttr into a SimpleQueueAttr
254  // This gets rid of the names and translates to IDs
255  SimpleQueueAttr attr = qattr;
256  if (attr.GetReaderKey() == 0) {
257  if (attr.GetReaderNodeKey() == 0) {
258  if (qattr.GetReaderNode().empty()) {
259  throw std::invalid_argument(
260  "The reader side must be specified with the reader key"
261  " or the reader name and node key or the reader name and"
262  " node name.");
263  }
264  attr.SetReaderNodeKey(context->WaitForNodeStart(qattr.GetReaderNode()));
265  }
266  if (qattr.GetReaderPort().empty()) {
267  throw std::invalid_argument("Ether the port key or port name must be specified.");
268  }
269  attr.SetReaderKey(context->GetCreateReaderKey(attr.GetReaderNodeKey(),
270  qattr.GetReaderPort()));
271  } else if (attr.GetReaderNodeKey() == 0) {
272  attr.SetReaderNodeKey(context->GetReaderNode(attr.GetReaderKey()));
273  }
274 
275  if (attr.GetWriterKey() == 0) {
276  if (attr.GetWriterNodeKey() == 0) {
277  if (qattr.GetWriterNode().empty()) {
278  throw std::invalid_argument(
279  "The writer side must be specified with the writer key"
280  " or the writer name and node key or the writer name and"
281  " node name.");
282  }
283  attr.SetWriterNodeKey(context->WaitForNodeStart(qattr.GetWriterNode()));
284  }
285  if (qattr.GetWriterPort().empty()) {
286  throw std::invalid_argument("Ether the port key or port name must be specified.");
287  }
288  attr.SetWriterKey(context->GetCreateWriterKey(attr.GetWriterNodeKey(),
289  qattr.GetWriterPort()));
290  } else if (attr.GetWriterNodeKey() == 0) {
291  attr.SetWriterNodeKey(context->GetWriterNode(attr.GetWriterKey()));
292  }
293 
294  context->ConnectEndpoints(attr.GetWriterKey(), attr.GetReaderKey(), qattr.GetName());
295 
296  Key_t readerkernel = context->GetNodeKernel(attr.GetReaderNodeKey());
297  Key_t writerkernel = context->GetNodeKernel(attr.GetWriterNodeKey());
298 
299  if (readerkernel == writerkernel) {
300  if (readerkernel == kernelkey) {
301  CreateLocalQueue(attr);
302  } else {
303  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
304  // Send a message to the other kernel to create a local queue
305  context->SendCreateQueue(readerkernel, attr);
306  }
307  } else if (readerkernel == kernelkey) {
308  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
309  // Create the reader end here and queue up a message
310  // to the writer kernel that they need to create an endpoint
311  CreateReaderEndpoint(attr);
312  context->SendCreateWriter(writerkernel, attr);
313  } else if (writerkernel == kernelkey) {
314  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
315  // Create the writer end here and queue up a message to
316  // the reader kernel that they need to create an endpoint
317  CreateWriterEndpoint(attr);
318  context->SendCreateReader(readerkernel, attr);
319  } else {
320  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
321  // Queue up a message to both the reader and writer kernel
322  // to create endpoints
323  context->SendCreateWriter(writerkernel, attr);
324  context->SendCreateReader(readerkernel, attr);
325  }
326  }
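
The final branch of the listing above chooses where the queue and its endpoints are created, based on which kernels host the reader and writer nodes. The sketch below isolates that decision as a pure function. `QueuePlan` and `PlanQueue` are hypothetical names introduced for illustration; the real method performs the creation and messaging directly instead of returning a label.

```cpp
#include <cstdint>

typedef std::uint64_t Key_t;  // CPN's common.h defines Key_t as uint64_t

// Hypothetical labels for the outcomes of the dispatch.
enum class QueuePlan {
    LocalQueue,    // both ends on this kernel: create a local queue
    ForwardQueue,  // both ends on one other kernel: ask it to create the queue
    ReaderHere,    // reader local, writer remote
    WriterHere,    // writer local, reader remote
    BothRemote     // ends on two other kernels: message both
};

QueuePlan PlanQueue(Key_t readerKernel, Key_t writerKernel, Key_t self) {
    if (readerKernel == writerKernel) {
        return readerKernel == self ? QueuePlan::LocalQueue
                                    : QueuePlan::ForwardQueue;
    }
    if (readerKernel == self) return QueuePlan::ReaderHere;
    if (writerKernel == self) return QueuePlan::WriterHere;
    return QueuePlan::BothRemote;
}
```

Every outcome except `LocalQueue` corresponds to a branch in the listing that asserts remote operations are enabled.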

void CPN::Kernel::CreateReaderEndpoint ( const SimpleQueueAttr &  attr)
private

Definition at line 363 of file Kernel.cc.

References ASSERT, CPN::SimpleQueueAttr::GetHint(), CPN::SimpleQueueAttr::GetReaderNodeKey(), nodelock, nodemap, CPN::ParseQueueHintForMBS(), CPN::QUEUEHINT_DEFAULT, CPN::QUEUEHINT_RDMA, CPN::RemoteQueue::READ, remotequeueholder, server, AutoLock< Lockable >::Unlock(), and useremote.

Referenced by CreateQueue(), and RemoteCreateReader().

363  {
364  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
365 
366  QueueHintStatus hint = ParseQueueHintForMBS(attr.GetHint());
367  shared_ptr<RemoteQueueBase> endp;
368  shared_ptr<QueueBase> queue;
369 #if ENABLE_RDMA
370  if (hint == QUEUEHINT_RDMA) {
371  shared_ptr<RDMAQueue> q = shared_ptr<RDMAQueue>(
372  new RDMAQueue(
373  this,
374  RDMAQueue::READ,
375  server.get(),
376  remotequeueholder.get(),
377  attr
378  )
379  );
380  endp = q;
381  queue = q;
382  } else
383 #endif
384  {
385  shared_ptr<RemoteQueue> q = shared_ptr<RemoteQueue>(
386  new RemoteQueue(
387  this,
389  server.get(),
390  remotequeueholder.get(),
391  attr,
392  hint != QUEUEHINT_DEFAULT
393  ));
394  endp = q;
395  queue = q;
396  }
398  NodeMap::iterator entry = nodemap.find(attr.GetReaderNodeKey());
399  shared_ptr<PseudoNode> node;
400  if (entry != nodemap.end()) {
401  node = entry->second;
402  }
403  arlock.Unlock();
404  remotequeueholder->AddQueue(endp);
405  endp->Start();
406  if (node) { node->CreateReader(queue); }
407  else { queue->ShutdownReader(); }
408  }

void CPN::Kernel::CreateWriterEndpoint ( const SimpleQueueAttr &  attr)
private

Definition at line 410 of file Kernel.cc.

References ASSERT, CPN::SimpleQueueAttr::GetHint(), CPN::SimpleQueueAttr::GetWriterNodeKey(), nodelock, nodemap, CPN::ParseQueueHintForMBS(), CPN::QUEUEHINT_DEFAULT, CPN::QUEUEHINT_RDMA, remotequeueholder, server, AutoLock< Lockable >::Unlock(), useremote, and CPN::RemoteQueue::WRITE.

Referenced by CreateQueue(), and RemoteCreateWriter().

410  {
411  ASSERT(useremote, "Cannot create remote queue without enabling remote operations.");
412 
413  QueueHintStatus hint = ParseQueueHintForMBS(attr.GetHint());
414  shared_ptr<RemoteQueueBase> endp;
415  shared_ptr<QueueBase> queue;
416 #if ENABLE_RDMA
417  if (hint == QUEUEHINT_RDMA) {
418  shared_ptr<RDMAQueue> q = shared_ptr<RDMAQueue>(
419  new RDMAQueue(
420  this,
421  RDMAQueue::WRITE,
422  server.get(),
423  remotequeueholder.get(),
424  attr
425  )
426  );
427  endp = q;
428  queue = q;
429  } else
430 #endif
431  {
432  shared_ptr<RemoteQueue> q = shared_ptr<RemoteQueue>(
433  new RemoteQueue(
434  this,
436  server.get(),
437  remotequeueholder.get(),
438  attr,
439  hint != QUEUEHINT_DEFAULT
440  ));
441  endp = q;
442  queue = q;
443  }
445  NodeMap::iterator entry = nodemap.find(attr.GetWriterNodeKey());
446  shared_ptr<PseudoNode> node;
447  if (entry != nodemap.end()) {
448  node = entry->second;
449  }
450  arlock.Unlock();
451  remotequeueholder->AddQueue(endp);
452  endp->Start();
453  if (node) { node->CreateWriter(queue); }
454  else { queue->ShutdownWriter(); }
455  }

void CPN::Kernel::DestroyExternalEndpoint ( const std::string &  name)

Clean up the external endpoint. Anybody waiting on it will then return. Note that calling WaitForAllNodes will block until all external endpoints have died or Terminate is called.

Definition at line 226 of file Kernel.cc.

References context, nodelock, nodemap, NodeTerminated(), and AutoLock< Lockable >::Unlock().

226  {
227  Key_t key = context->GetNodeKey(name);
229  NodeMap::iterator entry = nodemap.find(key);
230  if (entry == nodemap.end() || !entry->second->IsPurePseudo()) {
231  throw std::invalid_argument("Not a valid external endpoint.");
232  }
233  arlock.Unlock();
234  NodeTerminated(key);
235  }
shared_ptr< Context > context
Definition: Kernel.h:349
uint64_t Key_t
Definition: common.h:79
NodeMap nodemap
Definition: Kernel.h:362
void NodeTerminated(Key_t key)
Called by the node in the cleanup routine. TO BE CALLED ONLY BY THE CPN INTERNALS.
Definition: Kernel.cc:495
Sync::ReentrantLock nodelock
Definition: Kernel.h:359


void * CPN::Kernel::EntryPoint ( )
private

Definition at line 534 of file Kernel.cc.

References ClearGarbage(), Sync::StatusHandler< Status_t >::CompareAndPost(), context, DONE, FUNCBEGIN, FUNCEND, Sync::StatusHandler< Status_t >::Get(), INITIALIZED, kernelkey, AutoLock< Lockable >::Lock(), logger, nodecond, nodecond_signal, nodelock, nodemap, Sync::StatusHandler< Status_t >::Post(), remotequeueholder, RUNNING, server, status, AutoLock< Lockable >::Unlock(), useremote, Sync::ReentrantCondition::Wait(), and Logger::Warn().

Referenced by Kernel().

534  {
535  FUNCBEGIN;
537  try {
538  context->SignalKernelStart(kernelkey);
539  if (useremote) {
540  while (status.Get() == RUNNING) {
541  ClearGarbage();
542  remotequeueholder->Cleanup();
543  server->Poll();
544  }
545  } else {
546  ClearGarbage();
548  while (status.Get() == RUNNING) {
549  if (!nodecond_signal) {
551  nodecond_signal = false;
552  }
553  arlock.Unlock();
554  ClearGarbage();
555  arlock.Lock();
556  }
557  }
558  } catch (const ShutdownException &e) {
559  logger.Warn("Kernel forced shutdown");
560  }
561  if (useremote) {
562  server->Close();
563  remotequeueholder->Shutdown();
564  }
565  {
567  NodeMap mapcopy = nodemap;
568  arlock.Unlock();
569  NodeMap::iterator nitr = mapcopy.begin();
570  while (nitr != mapcopy.end()) {
571  (nitr++)->second->NotifyTerminate();
572  }
573  ClearGarbage();
574  arlock.Lock();
575  // Wait for all nodes to end
576  while (!nodemap.empty()) {
577  if (!nodecond_signal) {
579  nodecond_signal = false;
580  }
581  arlock.Unlock();
582  ClearGarbage();
583  arlock.Lock();
584  }
585  }
586  ClearGarbage();
587  context->SignalKernelEnd(kernelkey);
588  status.Post(DONE);
589  FUNCEND;
590  return 0;
591  }
void Wait(ReentrantLock &lock) const
shared_ptr< Context > context
Definition: Kernel.h:349
#define FUNCEND
Definition: Kernel.cc:48
#define FUNCBEGIN
Definition: Kernel.cc:47
bool useremote
Definition: Kernel.h:352
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
Status_t Get() const
Definition: StatusHandler.h:84
auto_ptr< ConnectionServer > server
Definition: Kernel.h:350
NodeMap nodemap
Definition: Kernel.h:362
bool CompareAndPost(Status_t oldStatus, Status_t newStatus)
Definition: StatusHandler.h:96
Key_t kernelkey
Definition: Kernel.h:347
Logger logger
Definition: Kernel.h:348
void ClearGarbage()
Definition: Kernel.cc:516
std::map< Key_t, shared_ptr< PseudoNode > > NodeMap
Definition: Kernel.h:355
void Warn(const char *fmt,...)
Definition: Logger.cc:168
Sync::ReentrantCondition nodecond
Definition: Kernel.h:360
auto_ptr< RemoteQueueHolder > remotequeueholder
Definition: Kernel.h:351
Sync::ReentrantLock nodelock
Definition: Kernel.h:359
void Post(Status_t newStatus)
Definition: StatusHandler.h:75
bool nodecond_signal
Definition: Kernel.h:361
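
The main loop above pairs a condition variable (nodecond) with a boolean flag (nodecond_signal), and several numbered lines of the listing are missing from this page. The following self-contained sketch shows the general pattern such a pairing usually implements: a wakeup that is posted before the waiter goes to sleep is not lost. It uses the C++ standard library rather than Sync::ReentrantCondition, and DemoWakeupFlag is a hypothetical name; treat it as an illustration of the technique, not as the CPN code.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical reduction of the nodecond / nodecond_signal pairing.
class DemoWakeupFlag {
public:
    // Set the flag under the lock, then signal the condition.
    void Signal() {
        {
            std::lock_guard<std::mutex> guard(lock_);
            signalled_ = true;
        }
        cond_.notify_one();
    }

    // Block until a wakeup has been posted, then consume it.
    void WaitAndClear() {
        std::unique_lock<std::mutex> guard(lock_);
        cond_.wait(guard, [this] { return signalled_; });
        assert(signalled_);
        signalled_ = false;
    }

private:
    std::mutex lock_;
    std::condition_variable cond_;
    bool signalled_ = false;
};

// Demo: one wakeup arrives before the wait, one arrives from another thread.
inline int CountHandledWakeups() {
    DemoWakeupFlag flag;
    int handled = 0;

    flag.Signal();        // posted early; the flag remembers it
    flag.WaitAndClear();  // returns at once because the flag is already set
    ++handled;

    std::thread worker([&flag] { flag.Signal(); });
    flag.WaitAndClear();  // blocks until the worker signals
    ++handled;
    worker.join();
    return handled;
}
```

Without the flag, a signal sent while the loop is busy outside the wait would go unnoticed and the loop could sleep through it.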


shared_ptr<Context> CPN::Kernel::GetContext ( ) const
inline virtual
Returns
the context this kernel is using

Implements CPN::KernelBase.

Definition at line 227 of file Kernel.h.

Referenced by CPN::NodeBase::EntryPoint().

227 { return context; }
shared_ptr< Context > context
Definition: Kernel.h:349


shared_ptr< QueueReader > CPN::Kernel::GetExternalIQueue ( const std::string &  name)

Get an IQueue for the given external reader.

Parameters
name: the name of the reader
Returns
A pointer to an object that IQueue will accept.

Definition at line 211 of file Kernel.cc.

References context, nodelock, nodemap, and AutoLock< Lockable >::Unlock().

211  {
212  Key_t key = context->GetNodeKey(name);
214  NodeMap::iterator entry = nodemap.find(key);
215  if (entry == nodemap.end() || !entry->second->IsPurePseudo()) {
216  throw std::invalid_argument("Not a valid external writer.");
217  }
218  shared_ptr<ExternalEndpoint> pnode = dynamic_pointer_cast<ExternalEndpoint>(entry->second);
219  arlock.Unlock();
220  if (pnode && !pnode->IsWriter()) {
221  return pnode->GetIQueue(name);
222  }
223  throw std::invalid_argument("Not a valid external writer.");
224  }
shared_ptr< Context > context
Definition: Kernel.h:349
uint64_t Key_t
Definition: common.h:79
NodeMap nodemap
Definition: Kernel.h:362
Sync::ReentrantLock nodelock
Definition: Kernel.h:359


shared_ptr< QueueWriter > CPN::Kernel::GetExternalOQueue ( const std::string &  name)

Get an OQueue for the given external writer.

Parameters
name: the name of the writer
Returns
A pointer to an object that OQueue will accept.

Definition at line 196 of file Kernel.cc.

References context, nodelock, nodemap, and AutoLock< Lockable >::Unlock().

196  {
197  Key_t key = context->GetNodeKey(name);
199  NodeMap::iterator entry = nodemap.find(key);
200  if (entry == nodemap.end() || !entry->second->IsPurePseudo()) {
201  throw std::invalid_argument("Not a valid external reader.");
202  }
203  shared_ptr<ExternalEndpoint> pnode = dynamic_pointer_cast<ExternalEndpoint>(entry->second);
204  arlock.Unlock();
205  if (pnode && pnode->IsWriter()) {
206  return pnode->GetOQueue(name);
207  }
208  throw std::invalid_argument("Not a valid external reader.");
209  }
shared_ptr< Context > context
Definition: Kernel.h:349
uint64_t Key_t
Definition: common.h:79
NodeMap nodemap
Definition: Kernel.h:362
Sync::ReentrantLock nodelock
Definition: Kernel.h:359
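
GetExternalIQueue and GetExternalOQueue share one lookup shape: find the entry by key, reject anything that is not a pure pseudo node, downcast to the endpoint type, then check the direction. The sketch below reproduces that shape with standard-library types only. DemoPseudoNode, DemoEndpoint, DemoNodeMap and FindEndpoint are hypothetical names introduced for illustration, and the locking done in the real functions is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <stdexcept>

// Hypothetical base type stored in the node map.
struct DemoPseudoNode {
    virtual ~DemoPseudoNode() {}
    virtual bool IsPurePseudo() const { return false; }
};

// Hypothetical external endpoint that is either a writer or a reader.
struct DemoEndpoint : DemoPseudoNode {
    explicit DemoEndpoint(bool writer) : writer_(writer) {}
    bool IsPurePseudo() const override { return true; }
    bool IsWriter() const { return writer_; }
    bool writer_;
};

typedef std::map<std::uint64_t, std::shared_ptr<DemoPseudoNode> > DemoNodeMap;

// Find an endpoint by key and insist that it has the requested direction.
inline std::shared_ptr<DemoEndpoint> FindEndpoint(const DemoNodeMap &nodes,
                                                  std::uint64_t key,
                                                  bool want_writer) {
    DemoNodeMap::const_iterator entry = nodes.find(key);
    if (entry == nodes.end() || !entry->second->IsPurePseudo()) {
        throw std::invalid_argument("Not a valid external endpoint.");
    }
    std::shared_ptr<DemoEndpoint> endpoint =
        std::dynamic_pointer_cast<DemoEndpoint>(entry->second);
    if (!endpoint || endpoint->IsWriter() != want_writer) {
        throw std::invalid_argument("Endpoint has the wrong direction.");
    }
    return endpoint;
}

// Demo: a writer endpoint is found as a writer and rejected as a reader.
inline bool RejectsWrongDirection() {
    DemoNodeMap nodes;
    nodes[7] = std::make_shared<DemoEndpoint>(true);
    assert(FindEndpoint(nodes, 7, true)->IsWriter());
    try {
        FindEndpoint(nodes, 7, false);
    } catch (const std::invalid_argument &) {
        return true;
    }
    return false;
}
```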


Key_t CPN::Kernel::GetKey ( ) const
inline
Returns
the unique key for this kernel

Definition at line 223 of file Kernel.h.

223 { return kernelkey; }
Key_t kernelkey
Definition: Kernel.h:347
LoggerOutput* CPN::Kernel::GetLogger ( )
inline
Returns
A pointer to the logger instance used by the kernel.

Definition at line 214 of file Kernel.h.

214 { return &logger; }
Logger logger
Definition: Kernel.h:348
const std::string CPN::Kernel::GetName ( ) const
inline
Returns
the name of this kernel.

Definition at line 219 of file Kernel.h.

219 { return kernelname; }
const std::string kernelname
Definition: Kernel.h:346
NodeFactory* CPN::Kernel::GetNodeFactory ( const std::string &  nodetype)
inline

Return a pointer to the node factory that produces the given node type. May load a shared library to find the node factory.

If no node factory is already available, attempt to find a function named "cpninitnodetype" and call it to get the factory. If no such function exists, query the library loader for a library with the name nodetype and try again. If all of this fails, throw a runtime_error exception.

Parameters
nodetype: the type of the node
Returns
a node factory for the node type
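
The lookup order described above (use a registered factory, otherwise try to load one and look again, otherwise throw) can be sketched as follows. This is a simplified model and not the NodeLoader implementation: DemoFactory, DemoFactoryRegistry and the loader callback are hypothetical, and the callback stands in for both the init-function search and the shared-library search.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical factory record; a real factory would create nodes.
struct DemoFactory {
    std::string type;
};

class DemoFactoryRegistry {
public:
    // The loader returns true if it managed to register the requested type.
    typedef std::function<bool(DemoFactoryRegistry &, const std::string &)> Loader;

    explicit DemoFactoryRegistry(Loader loader) : loader_(std::move(loader)) {}

    void Register(const std::string &type) {
        factories_[type] = DemoFactory{type};
    }

    // Registered factory first, then one load attempt, then give up.
    const DemoFactory &Get(const std::string &type) {
        std::map<std::string, DemoFactory>::iterator it = factories_.find(type);
        if (it == factories_.end() && loader_ && loader_(*this, type)) {
            it = factories_.find(type);  // try again after loading
        }
        if (it == factories_.end()) {
            throw std::runtime_error("No factory for node type " + type);
        }
        return it->second;
    }

private:
    std::map<std::string, DemoFactory> factories_;
    Loader loader_;
};

// Demo: "source" is registered up front, "adder" is loaded on demand,
// and "missing" cannot be found at all.
inline bool LoadsOnDemand() {
    int loader_calls = 0;
    DemoFactoryRegistry registry(
        [&loader_calls](DemoFactoryRegistry &r, const std::string &type) {
            ++loader_calls;
            if (type != "adder") return false;
            r.Register(type);
            return true;
        });
    registry.Register("source");
    assert(registry.Get("source").type == "source");
    assert(loader_calls == 0);  // no load attempt for a registered type
    assert(registry.Get("adder").type == "adder");
    try {
        registry.Get("missing");
    } catch (const std::runtime_error &) {
        return loader_calls == 2;  // one call for "adder", one for "missing"
    }
    return false;
}
```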

Definition at line 276 of file Kernel.h.

Referenced by InternalCreateNode().

276 { return nodeloader.GetFactory(nodetype); }
NodeLoader nodeloader
Definition: Kernel.h:353
NodeFactory * GetFactory(const std::string &nodename)
Definition: NodeLoader.cc:99


bool CPN::Kernel::GrowQueueMaxThreshold ( )
virtual

Whether the queue should grow when a threshold larger than the current max threshold is requested.

Returns
true or false (default true)

Implements CPN::KernelBase.

Definition at line 666 of file Kernel.cc.

References datalock, and growmaxthresh.

666  {
668  return growmaxthresh;
669  }
Sync::ReentrantLock datalock
Definition: Kernel.h:366
bool growmaxthresh
Definition: Kernel.h:369
bool CPN::Kernel::GrowQueueMaxThreshold ( bool  grow)

Definition at line 671 of file Kernel.cc.

References datalock, and growmaxthresh.

671  {
673  return growmaxthresh = grow;
674  }
Sync::ReentrantLock datalock
Definition: Kernel.h:366
bool growmaxthresh
Definition: Kernel.h:369
void CPN::Kernel::InternalCreateNode ( NodeAttr &  nodeattr)
private

Definition at line 482 of file Kernel.cc.

References ASSERT, CPN::NodeFactory::Create(), FUNCBEGIN, Sync::StatusHandler< Status_t >::Get(), CPN::NodeAttr::GetKey(), GetNodeFactory(), CPN::NodeAttr::GetTypeName(), nodelock, nodemap, RUNNING, and status.

Referenced by CreateNode(), and RemoteCreateNode().

482  {
484  FUNCBEGIN;
485  ASSERT(status.Get() == RUNNING);
486  NodeFactory *factory = GetNodeFactory(nodeattr.GetTypeName());
487  if (!factory) {
488  throw std::invalid_argument("No such node type " + nodeattr.GetTypeName());
489  }
490  shared_ptr<NodeBase> node = factory->Create(*this, nodeattr);
491  nodemap.insert(std::make_pair(nodeattr.GetKey(), node));
492  node->Start();
493  }
#define FUNCBEGIN
Definition: Kernel.cc:47
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
Status_t Get() const
Definition: StatusHandler.h:84
NodeMap nodemap
Definition: Kernel.h:362
Sync::ReentrantLock nodelock
Definition: Kernel.h:359
NodeFactory * GetNodeFactory(const std::string &nodetype)
Return a pointer to the node factory that produces the given node type. May load a shared library to ...
Definition: Kernel.h:276
virtual shared_ptr< NodeBase > Create(Kernel &ker, const NodeAttr &attr)=0
#define ASSERT(exp,...)


bool CPN::Kernel::IsTerminated ( )
virtual
Returns
true if Terminate has been called

Implements CPN::KernelBase.

Definition at line 124 of file Kernel.cc.

References context.

124  {
125  return context->IsTerminated();
126  }
shared_ptr< Context > context
Definition: Kernel.h:349
void CPN::Kernel::LoadNodeList ( const std::string &  filename)
inline

Load a node list into the node loader.

Definition at line 258 of file Kernel.h.

258 { nodeloader.LoadNodeList(filename); }
NodeLoader nodeloader
Definition: Kernel.h:353
void LoadNodeList(const std::string &filename)
Definition: NodeLoader.cc:68
void CPN::Kernel::LoadSharedLib ( const std::string &  libname)
inline

Attempts to load the given dynamic library and make the symbols inside available to be searched for node types. The library will be unloaded on destruction.

Parameters
libname: the library name and path

Definition at line 254 of file Kernel.h.

254 { nodeloader.LoadSharedLib(libname); }
NodeLoader nodeloader
Definition: Kernel.h:353
void LoadSharedLib(const std::string &libname)
Definition: NodeLoader.cc:57
void CPN::Kernel::LogState ( )
private

Definition at line 614 of file Kernel.cc.

References DONE, Logger::Error(), garbagenodes, Sync::StatusHandler< Status_t >::Get(), INITIALIZED, kernelkey, kernelname, logger, nodemap, RUNNING, server, status, TERMINATE, and useremote.

614  {
615  // Note that this function does not acquire the lock...
616  // this is because it is meant to be called from the debugger while
617  // the application is halted and may already be in the lock or not.
618  std::string statename;
619  switch (status.Get()) {
620  case INITIALIZED:
621  statename = "initialized";
622  break;
623  case RUNNING:
624  statename = "running";
625  break;
626  case TERMINATE:
627  statename = "terminated";
628  break;
629  case DONE:
630  statename = "done";
631  break;
632  }
633  logger.Error("Kernel %s (%llu) in state %s", kernelname.c_str(), kernelkey, statename.c_str());
634  logger.Error("Active nodes: %u, Garbage nodes: %u", nodemap.size(), garbagenodes.size());
635  if (useremote) {
636  server->LogState();
637  }
638  NodeMap::iterator node = nodemap.begin();
639  while (node != nodemap.end()) {
640  node->second->LogState();
641  ++node;
642  }
643  }
NodeList garbagenodes
Definition: Kernel.h:364
const std::string kernelname
Definition: Kernel.h:346
bool useremote
Definition: Kernel.h:352
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
Status_t Get() const
Definition: StatusHandler.h:84
auto_ptr< ConnectionServer > server
Definition: Kernel.h:350
void Error(const char *fmt,...)
Definition: Logger.cc:159
NodeMap nodemap
Definition: Kernel.h:362
Key_t kernelkey
Definition: Kernel.h:347
Logger logger
Definition: Kernel.h:348


void CPN::Kernel::NodeTerminated ( Key_t  key)

Called by the node in the cleanup routine. TO BE CALLED ONLY BY THE CPN INTERNALS.

Definition at line 495 of file Kernel.cc.

References ASSERT, context, DONE, FUNCBEGIN, garbagelock, garbagenodes, Sync::StatusHandler< Status_t >::Get(), logger, nodecond, nodecond_signal, nodelock, nodemap, SendWakeup(), Sync::ReentrantCondition::Signal(), status, and Logger::Warn().

Referenced by DestroyExternalEndpoint(), and CPN::NodeBase::EntryPoint().

495  {
496  context->SignalNodeEnd(key);
498  FUNCBEGIN;
499  if (status.Get() == DONE) {
500  logger.Warn("Nodes running after shutdown");
501  } else {
502  NodeMap::iterator entry = nodemap.find(key);
503  ASSERT(entry != nodemap.end());
504  shared_ptr<PseudoNode> node = entry->second;
505  nodemap.erase(entry);
506  {
508  garbagenodes.push_back(node);
509  }
510  SendWakeup();
511  }
512  nodecond.Signal();
513  nodecond_signal = true;
514  }
shared_ptr< Context > context
Definition: Kernel.h:349
NodeList garbagenodes
Definition: Kernel.h:364
Sync::ReentrantLock garbagelock
Definition: Kernel.h:363
#define FUNCBEGIN
Definition: Kernel.cc:47
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
Status_t Get() const
Definition: StatusHandler.h:84
NodeMap nodemap
Definition: Kernel.h:362
void SendWakeup()
Definition: Kernel.cc:525
Logger logger
Definition: Kernel.h:348
void Warn(const char *fmt,...)
Definition: Logger.cc:168
Sync::ReentrantCondition nodecond
Definition: Kernel.h:360
Sync::ReentrantLock nodelock
Definition: Kernel.h:359
#define ASSERT(exp,...)
bool nodecond_signal
Definition: Kernel.h:361
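
NodeTerminated does not destroy the node it removes from nodemap. It parks the node on garbagenodes, and ClearGarbage (whose body is not shown in this section) is called later from the kernel's main loop. A plausible reason is that a node should not be torn down from inside its own cleanup routine, but that is an inference. The sketch below illustrates the deferred-destruction pattern with standard-library types; DemoNode and DemoNodeRegistry are hypothetical, and the ClearGarbage shown here is an assumed shape.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical node that counts destructions so the demo can observe them.
struct DemoNode {
    explicit DemoNode(int *destroyed) : destroyed_(destroyed) {}
    ~DemoNode() { ++*destroyed_; }
    int *destroyed_;
};

class DemoNodeRegistry {
public:
    void Add(std::uint64_t key, std::shared_ptr<DemoNode> node) {
        std::lock_guard<std::mutex> guard(nodelock_);
        nodemap_[key] = std::move(node);
    }

    // Unlink the node from the active map and park it on the garbage list
    // instead of destroying it here.
    bool Retire(std::uint64_t key) {
        std::lock_guard<std::mutex> guard(nodelock_);
        std::map<std::uint64_t, std::shared_ptr<DemoNode> >::iterator entry =
            nodemap_.find(key);
        if (entry == nodemap_.end()) return false;
        std::shared_ptr<DemoNode> node = entry->second;
        nodemap_.erase(entry);
        {
            std::lock_guard<std::mutex> garbage_guard(garbagelock_);
            garbage_.push_back(std::move(node));
        }
        return true;
    }

    // Assumed shape of a collector: take the list while holding the lock,
    // then let the nodes be destroyed after the lock has been released.
    std::size_t ClearGarbage() {
        std::vector<std::shared_ptr<DemoNode> > doomed;
        {
            std::lock_guard<std::mutex> guard(garbagelock_);
            doomed.swap(garbage_);
        }
        return doomed.size();  // the nodes die when doomed leaves scope
    }

private:
    std::mutex nodelock_;
    std::map<std::uint64_t, std::shared_ptr<DemoNode> > nodemap_;
    std::mutex garbagelock_;
    std::vector<std::shared_ptr<DemoNode> > garbage_;
};

// Demo: retiring a node does not destroy it; collecting the garbage does.
inline int RetireAndCollect() {
    int destroyed = 0;
    DemoNodeRegistry registry;
    registry.Add(1, std::make_shared<DemoNode>(&destroyed));
    registry.Add(2, std::make_shared<DemoNode>(&destroyed));
    bool retired = registry.Retire(1);
    assert(retired);
    assert(destroyed == 0);  // parked, not yet destroyed
    std::size_t collected = registry.ClearGarbage();
    assert(collected == 1);
    return destroyed;        // only the retired node has been destroyed
}
```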


void CPN::Kernel::NotifyTerminate ( )
virtual

These functions are called by the context to create things remotely. Should not be called by anyone but the context!

Reimplemented from CPN::KernelBase.

Definition at line 132 of file Kernel.cc.

References Sync::StatusHandler< Status_t >::CompareAndPost(), FUNCBEGIN, RUNNING, SendWakeup(), status, and TERMINATE.

132  {
133  FUNCBEGIN;
135  SendWakeup();
136  }
137  }
#define FUNCBEGIN
Definition: Kernel.cc:47
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
void SendWakeup()
Definition: Kernel.cc:525
bool CompareAndPost(Status_t oldStatus, Status_t newStatus)
Definition: StatusHandler.h:96


Kernel& CPN::Kernel::operator= ( const Kernel & )
private
void CPN::Kernel::RegisterNodeFactory ( shared_ptr< NodeFactory >  factory)
inline

A function that lets others register node factories.

Parameters
factory: the node factory

Definition at line 281 of file Kernel.h.

281 { nodeloader.RegisterFactory(factory); }
NodeLoader nodeloader
Definition: Kernel.h:353
void RegisterFactory(shared_ptr< NodeFactory > factory)
Definition: NodeLoader.cc:110
void CPN::Kernel::RemoteCreateNode ( NodeAttr  attr)
virtual

Reimplemented from CPN::KernelBase.

Definition at line 608 of file Kernel.cc.

References ASSERT, FUNCBEGIN, InternalCreateNode(), and useremote.

608  {
609  FUNCBEGIN;
610  ASSERT(useremote);
611  InternalCreateNode(attr);
612  }
#define FUNCBEGIN
Definition: Kernel.cc:47
bool useremote
Definition: Kernel.h:352
void InternalCreateNode(NodeAttr &nodeattr)
Definition: Kernel.cc:482
#define ASSERT(exp,...)


void CPN::Kernel::RemoteCreateQueue ( SimpleQueueAttr  attr)
virtual

Reimplemented from CPN::KernelBase.

Definition at line 603 of file Kernel.cc.

References ASSERT, CreateLocalQueue(), FUNCBEGIN, and useremote.

603  {
604  FUNCBEGIN;
605  ASSERT(useremote);
606  CreateLocalQueue(attr);
607  }
#define FUNCBEGIN
Definition: Kernel.cc:47
bool useremote
Definition: Kernel.h:352
void CreateLocalQueue(const SimpleQueueAttr &attr)
Definition: Kernel.cc:457
#define ASSERT(exp,...)


void CPN::Kernel::RemoteCreateReader ( SimpleQueueAttr  attr)
virtual

Reimplemented from CPN::KernelBase.

Definition at line 598 of file Kernel.cc.

References ASSERT, CreateReaderEndpoint(), FUNCBEGIN, and useremote.

598  {
599  FUNCBEGIN;
600  ASSERT(useremote);
601  CreateReaderEndpoint(attr);
602  }
#define FUNCBEGIN
Definition: Kernel.cc:47
bool useremote
Definition: Kernel.h:352
void CreateReaderEndpoint(const SimpleQueueAttr &attr)
Definition: Kernel.cc:363
#define ASSERT(exp,...)


void CPN::Kernel::RemoteCreateWriter ( SimpleQueueAttr  attr)
virtual

Reimplemented from CPN::KernelBase.

Definition at line 593 of file Kernel.cc.

References ASSERT, CreateWriterEndpoint(), FUNCBEGIN, and useremote.

593  {
594  FUNCBEGIN;
595  ASSERT(useremote);
596  CreateWriterEndpoint(attr);
597  }
#define FUNCBEGIN
Definition: Kernel.cc:47
bool useremote
Definition: Kernel.h:352
void CreateWriterEndpoint(const SimpleQueueAttr &attr)
Definition: Kernel.cc:410
#define ASSERT(exp,...)


void CPN::Kernel::SearchDirectory ( const std::string &  dirname)
inline

Add all node lists in the given directory.

Definition at line 262 of file Kernel.h.

262 { nodeloader.SearchDirectory(dirname); }
NodeLoader nodeloader
Definition: Kernel.h:353
void SearchDirectory(const std::string &dirname)
Definition: NodeLoader.cc:78
void CPN::Kernel::SendWakeup ( )
private

Definition at line 525 of file Kernel.cc.

References nodecond, nodecond_signal, nodelock, server, Sync::ReentrantCondition::Signal(), and useremote.

Referenced by NodeTerminated(), and NotifyTerminate().

525  {
527  if (useremote) {
528  server->Wakeup();
529  }
530  nodecond.Signal();
531  nodecond_signal = true;
532  }
bool useremote
Definition: Kernel.h:352
auto_ptr< ConnectionServer > server
Definition: Kernel.h:350
Sync::ReentrantCondition nodecond
Definition: Kernel.h:360
Sync::ReentrantLock nodelock
Definition: Kernel.h:359
bool nodecond_signal
Definition: Kernel.h:361


bool CPN::Kernel::SwallowBrokenQueueExceptions ( )
virtual

Whether the node should, by default, swallow broken queue exceptions or let them propagate as an error.

Returns
true or false

Implements CPN::KernelBase.

Definition at line 656 of file Kernel.cc.

References datalock, and swallowbrokenqueue.

Referenced by CPN::NodeBase::EntryPoint().

656  {
658  return swallowbrokenqueue;
659  }
bool swallowbrokenqueue
Definition: Kernel.h:368
Sync::ReentrantLock datalock
Definition: Kernel.h:366


bool CPN::Kernel::SwallowBrokenQueueExceptions ( bool  sbqe)

Definition at line 661 of file Kernel.cc.

References datalock, and swallowbrokenqueue.

661  {
663  return swallowbrokenqueue = sbqe;
664  }
bool swallowbrokenqueue
Definition: Kernel.h:368
Sync::ReentrantLock datalock
Definition: Kernel.h:366
void CPN::Kernel::Terminate ( )

Force the kernel to terminate. Returns immediately; use Wait to wait for completion.

Definition at line 120 of file Kernel.cc.

References context.

Referenced by ~Kernel().

120  {
121  context->Terminate();
122  }
shared_ptr< Context > context
Definition: Kernel.h:349


bool CPN::Kernel::UseD4R ( )
virtual

Whether or not D4R should be used.

These values may be cached by queues and nodes. Setting these values after any node or queue has been created may give nondeterministic behavior.

Returns
true or false (default true)

Implements CPN::KernelBase.

Definition at line 646 of file Kernel.cc.

References datalock, and useD4R.

646  {
648  return useD4R;
649  }
bool useD4R
Definition: Kernel.h:367
Sync::ReentrantLock datalock
Definition: Kernel.h:366
bool CPN::Kernel::UseD4R ( bool  u)

Definition at line 651 of file Kernel.cc.

References datalock, and useD4R.

651  {
653  return useD4R = u;
654  }
bool useD4R
Definition: Kernel.h:367
Sync::ReentrantLock datalock
Definition: Kernel.h:366
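
UseD4R, SwallowBrokenQueueExceptions and GrowQueueMaxThreshold each come as a pair of overloads: a getter with no arguments and a setter that takes the new value and returns it. The sketch below shows that accessor style on a single option. It uses std::mutex in place of Sync::ReentrantLock, and DemoKernelOptions is a hypothetical class written for illustration only.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical holder for one lock-protected option.
class DemoKernelOptions {
public:
    // Getter overload: read the current value under the lock.
    bool UseD4R() const {
        std::lock_guard<std::mutex> guard(datalock_);
        return use_d4r_;
    }

    // Setter overload: store the new value and return it.
    bool UseD4R(bool u) {
        std::lock_guard<std::mutex> guard(datalock_);
        return use_d4r_ = u;
    }

private:
    mutable std::mutex datalock_;
    bool use_d4r_ = true;  // the documented default is true
};

// Demo: change the option before anything could have cached it.
inline bool ConfigureBeforeCreatingNodes() {
    DemoKernelOptions options;
    assert(options.UseD4R());          // default
    bool now = options.UseD4R(false);  // the setter echoes the new value
    assert(!now);
    return options.UseD4R();
}
```

Since the values may be cached by queues and nodes, the setter belongs in the setup phase, before the first node or queue is created.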
void CPN::Kernel::Wait ( )

Waits until the main loop terminates. This does not happen until a terminate signal is received or Terminate is called. Use WaitForNode if you wish to wait for the nodes to be done.

Definition at line 111 of file Kernel.cc.

References Sync::StatusHandler< Status_t >::CompareAndWait(), DONE, FUNCBEGIN, FUNCEND, Sync::StatusHandler< Status_t >::Get(), and status.

Referenced by ~Kernel().

111  {
112  FUNCBEGIN;
113  KernelStatus_t s = status.Get();
114  while (s != DONE) {
115  s = status.CompareAndWait(s);
116  }
117  FUNCEND;
118  }
KernelStatus_t
Definition: Kernel.h:56
#define FUNCEND
Definition: Kernel.cc:48
#define FUNCBEGIN
Definition: Kernel.cc:47
Sync::StatusHandler< KernelStatus_t > status
Definition: Kernel.h:341
Status_t Get() const
Definition: StatusHandler.h:84
Status_t CompareAndWait(Status_t oldStatus) const
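
The loop in Wait() relies on CompareAndWait, whose definition is not reproduced on this page. The sketch below assumes it blocks while the status still equals the value passed in and then returns the new status, which is what the loop's usage suggests. DemoStatus is a hypothetical stand-in built on the C++ standard library, not the Sync::StatusHandler implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a status holder with wait support.
template <typename Status>
class DemoStatus {
public:
    explicit DemoStatus(Status initial) : value_(initial) {}

    Status Get() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return value_;
    }

    // Unconditionally store a new status and wake every waiter.
    void Post(Status next) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            value_ = next;
        }
        changed_.notify_all();
    }

    // Store next only if the current status equals expected.
    bool CompareAndPost(Status expected, Status next) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (!(value_ == expected)) return false;
            value_ = next;
        }
        changed_.notify_all();
        return true;
    }

    // Block while the status still equals old_status; return the new one.
    Status CompareAndWait(Status old_status) const {
        std::unique_lock<std::mutex> guard(mutex_);
        changed_.wait(guard, [&] { return !(value_ == old_status); });
        return value_;
    }

private:
    mutable std::mutex mutex_;
    mutable std::condition_variable changed_;
    Status value_;
};

enum DemoKernelState { INITIALIZED, RUNNING, TERMINATE, DONE };

// Same loop shape as Wait(): keep waiting until the status reaches DONE.
inline void WaitUntilDone(const DemoStatus<DemoKernelState> &status) {
    DemoKernelState s = status.Get();
    while (s != DONE) {
        s = status.CompareAndWait(s);
    }
    assert(s == DONE);
}

// Demo: another thread walks the status to DONE while this one waits.
inline bool RunLifecycleDemo() {
    DemoStatus<DemoKernelState> status(INITIALIZED);
    bool started = status.CompareAndPost(INITIALIZED, RUNNING);
    bool restarted = status.CompareAndPost(INITIALIZED, RUNNING);  // refused
    std::thread mainloop([&status] {
        status.CompareAndPost(RUNNING, TERMINATE);
        status.Post(DONE);
    });
    WaitUntilDone(status);
    mainloop.join();
    return started && !restarted && status.Get() == DONE;
}
```

Passing the last observed status into each wait means an intermediate transition such as RUNNING to TERMINATE wakes the waiter, which then simply waits again until DONE is posted.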


void CPN::Kernel::WaitForAllNodes ( )

Waits until there are no running nodes.

Definition at line 242 of file Kernel.cc.

References context.

242  {
243  context->WaitForAllNodeEnd();
244  }
shared_ptr< Context > context
Definition: Kernel.h:349
void CPN::Kernel::WaitForNode ( const std::string &  nodename)

Check for the existence of the given node. Returns when it detects that the given node does not exist.

Parameters
nodename: the name of the node to wait for
Exceptions
ShutdownException: if the kernel has shut down

Definition at line 237 of file Kernel.cc.

References context, and FUNCBEGIN.

237  {
238  FUNCBEGIN;
239  context->WaitForNodeEnd(nodename);
240  }
shared_ptr< Context > context
Definition: Kernel.h:349
#define FUNCBEGIN
Definition: Kernel.cc:47
void CPN::Kernel::WaitForNodeStart ( const std::string &  nodename)

Wait for the given node to start.

Parameters
nodename: the name of the node to wait for

Definition at line 246 of file Kernel.cc.

References context, and FUNCBEGIN.

246  {
247  FUNCBEGIN;
248  context->WaitForNodeStart(nodename);
249  }
shared_ptr< Context > context
Definition: Kernel.h:349
#define FUNCBEGIN
Definition: Kernel.cc:47

Member Data Documentation

shared_ptr<Context> CPN::Kernel::context
private
Sync::ReentrantLock CPN::Kernel::datalock
private

Definition at line 366 of file Kernel.h.

Referenced by GrowQueueMaxThreshold(), SwallowBrokenQueueExceptions(), and UseD4R().

Sync::ReentrantLock CPN::Kernel::garbagelock
private

Definition at line 363 of file Kernel.h.

Referenced by ClearGarbage(), and NodeTerminated().

NodeList CPN::Kernel::garbagenodes
private

Definition at line 364 of file Kernel.h.

Referenced by ClearGarbage(), LogState(), and NodeTerminated().

bool CPN::Kernel::growmaxthresh
private

Definition at line 369 of file Kernel.h.

Referenced by GrowQueueMaxThreshold().

Key_t CPN::Kernel::kernelkey
private

Definition at line 347 of file Kernel.h.

Referenced by CreateExternalEndpoint(), CreateNode(), CreateQueue(), EntryPoint(), Kernel(), and LogState().

const std::string CPN::Kernel::kernelname
private

Definition at line 346 of file Kernel.h.

Referenced by Kernel(), and LogState().

Logger CPN::Kernel::logger
private

Definition at line 348 of file Kernel.h.

Referenced by EntryPoint(), Kernel(), LogState(), and NodeTerminated().

Sync::ReentrantCondition CPN::Kernel::nodecond
private

Definition at line 360 of file Kernel.h.

Referenced by EntryPoint(), NodeTerminated(), and SendWakeup().

bool CPN::Kernel::nodecond_signal
private

Definition at line 361 of file Kernel.h.

Referenced by EntryPoint(), NodeTerminated(), and SendWakeup().

NodeLoader CPN::Kernel::nodeloader
private

Definition at line 353 of file Kernel.h.

Referenced by Kernel().

Sync::ReentrantLock CPN::Kernel::nodelock
private
NodeMap CPN::Kernel::nodemap
private
auto_ptr<RemoteQueueHolder> CPN::Kernel::remotequeueholder
private

Definition at line 351 of file Kernel.h.

Referenced by CreateReaderEndpoint(), CreateWriterEndpoint(), EntryPoint(), and Kernel().

auto_ptr<ConnectionServer> CPN::Kernel::server
private
Sync::StatusHandler<KernelStatus_t> CPN::Kernel::status
private
bool CPN::Kernel::swallowbrokenqueue
private

Definition at line 368 of file Kernel.h.

Referenced by SwallowBrokenQueueExceptions().

auto_ptr<Pthread> CPN::Kernel::thread
private

Definition at line 345 of file Kernel.h.

Referenced by Kernel(), and ~Kernel().

bool CPN::Kernel::useD4R
private

Definition at line 367 of file Kernel.h.

Referenced by UseD4R().

bool CPN::Kernel::useremote
private

The documentation for this class was generated from the following files: