24 #include "common_priv.h"
42 #ifdef KERNEL_FUNC_TRACE
44 #define FUNCBEGIN printf("%s begin %s\n",__PRETTY_FUNCTION__, kernelname.c_str())
45 #define FUNCEND printf("%s end %s\n",__PRETTY_FUNCTION__, kernelname.c_str())
54 : status(INITIALIZED),
55 kernelname(kattr.GetName()),
57 context(kattr.GetContext()),
58 useremote(kattr.GetRemoteEnabled()),
59 nodecond_signal(false),
60 useD4R(kattr.UseD4R()),
61 swallowbrokenqueue(kattr.SwallowBrokenQueueExceptions()),
62 growmaxthresh(kattr.GrowQueueMaxThreshold())
125 return context->IsTerminated();
188 shared_ptr<PseudoNode> pnode;
191 nodemap.insert(std::make_pair(nodekey, pnode));
193 context->SignalNodeStart(nodekey);
199 NodeMap::iterator entry =
nodemap.find(key);
200 if (entry ==
nodemap.end() || !entry->second->IsPurePseudo()) {
201 throw std::invalid_argument(
"Not a valid external reader.");
203 shared_ptr<ExternalEndpoint> pnode = dynamic_pointer_cast<
ExternalEndpoint>(entry->second);
205 if (pnode && pnode->IsWriter()) {
206 return pnode->GetOQueue(name);
208 throw std::invalid_argument(
"Not a valid external reader.");
214 NodeMap::iterator entry =
nodemap.find(key);
215 if (entry ==
nodemap.end() || !entry->second->IsPurePseudo()) {
216 throw std::invalid_argument(
"Not a valid external writer.");
218 shared_ptr<ExternalEndpoint> pnode = dynamic_pointer_cast<
ExternalEndpoint>(entry->second);
220 if (pnode && !pnode->IsWriter()) {
221 return pnode->GetIQueue(name);
223 throw std::invalid_argument(
"Not a valid external writer.");
229 NodeMap::iterator entry =
nodemap.find(key);
230 if (entry ==
nodemap.end() || !entry->second->IsPurePseudo()) {
231 throw std::invalid_argument(
"Not a valid external endpoint.");
239 context->WaitForNodeEnd(nodename);
248 context->WaitForNodeStart(nodename);
259 throw std::invalid_argument(
260 "The reader side must be specified with the reader key"
261 " or the reader name and node key or the reader name and"
267 throw std::invalid_argument(
"Ether the port key or port name must be specified.");
278 throw std::invalid_argument(
279 "The writer side must be specified with the writer key"
280 " or the writer name and node key or the writer name and"
286 throw std::invalid_argument(
"Ether the port key or port name must be specified.");
299 if (readerkernel == writerkernel) {
303 ASSERT(
useremote,
"Cannot create remote queue without enabling remote operations.");
305 context->SendCreateQueue(readerkernel, attr);
308 ASSERT(
useremote,
"Cannot create remote queue without enabling remote operations.");
312 context->SendCreateWriter(writerkernel, attr);
314 ASSERT(
useremote,
"Cannot create remote queue without enabling remote operations.");
318 context->SendCreateReader(readerkernel, attr);
320 ASSERT(
useremote,
"Cannot create remote queue without enabling remote operations.");
323 context->SendCreateWriter(writerkernel, attr);
324 context->SendCreateReader(readerkernel, attr);
335 static const char threshold_str[] =
"threshold";
336 static const char *threshold_end = threshold_str +
sizeof(threshold_str) - 1;
337 static const char rdma_str[] =
"rdma";
338 static const char *rdma_end = rdma_str +
sizeof(rdma_str) - 1;
340 std::string::iterator h_cur = hint.begin();
341 std::string::iterator h_end = hint.end();
342 for (
const char *cur = &threshold_str[0]; cur != threshold_end; ++cur) {
343 if (h_cur == h_end || *h_cur != *cur) {
351 h_cur = hint.begin();
352 for (
const char *cur = &rdma_str[0]; cur != rdma_end; ++cur) {
353 if (h_cur == h_end || *h_cur != *cur) {
364 ASSERT(
useremote,
"Cannot create remote queue without enabling remote operations.");
367 shared_ptr<RemoteQueueBase> endp;
368 shared_ptr<QueueBase> queue;
371 shared_ptr<RDMAQueue> q = shared_ptr<RDMAQueue>(
385 shared_ptr<RemoteQueue> q = shared_ptr<RemoteQueue>(
399 shared_ptr<PseudoNode> node = entry->second;
401 node = entry->second;
406 if (node) { node->CreateReader(queue); }
407 else { queue->ShutdownReader(); }
411 ASSERT(
useremote,
"Cannot create remote queue without enabling remote operations.");
414 shared_ptr<RemoteQueueBase> endp;
415 shared_ptr<QueueBase> queue;
418 shared_ptr<RDMAQueue> q = shared_ptr<RDMAQueue>(
432 shared_ptr<RemoteQueue> q = shared_ptr<RemoteQueue>(
446 shared_ptr<PseudoNode> node;
448 node = entry->second;
453 if (node) { node->CreateWriter(queue); }
454 else { queue->ShutdownWriter(); }
459 shared_ptr<QueueBase> queue;
464 shared_ptr<PseudoNode> readnode;
465 if (readentry !=
nodemap.end()) {
466 readnode = readentry->second;
470 shared_ptr<PseudoNode> writenode;
471 if (writeentry !=
nodemap.end()) {
472 writenode = writeentry->second;
476 if (writenode) { writenode->CreateWriter(queue); }
477 else { queue->ShutdownWriter(); }
478 if (readnode) { readnode->CreateReader(queue); }
479 else { queue->ShutdownReader(); }
488 throw std::invalid_argument(
"No such node type " + nodeattr.
GetTypeName());
490 shared_ptr<NodeBase> node = factory->
Create(*
this, nodeattr);
502 NodeMap::iterator entry =
nodemap.find(key);
504 shared_ptr<PseudoNode> node = entry->second;
569 NodeMap::iterator nitr = mapcopy.begin();
570 while (nitr != mapcopy.end()) {
571 (nitr++)->second->NotifyTerminate();
618 std::string statename;
621 statename =
"initialized";
624 statename =
"running";
627 statename =
"terminated";
638 NodeMap::iterator node =
nodemap.begin();
639 while (node !=
nodemap.end()) {
640 node->second->LogState();
void Wait(ReentrantLock &lock) const
shared_ptr< QueueReader > GetExternalIQueue(const std::string &name)
The base definition of all nodes.
shared_ptr< Context > context
bool UseD4R()
Whether or not D4R should be used.
static shared_ptr< Context > Local()
Create a local context.
void LoadSharedLib(const std::string &libname)
void DestroyExternalEndpoint(const std::string &name)
Key_t GetWriterKey() const
SimpleQueueAttr & SetReaderNodeKey(Key_t k)
const std::vector< std::string > & GetNodeLists() const
An abstraction of a socket address with convenience methods.
void CreateExternalReader(const std::string &name)
Create an external reader.
void WaitForNode(const std::string &nodename)
Sync::ReentrantLock garbagelock
virtual ~ExternalEndpoint()
Definition for the kernel object.
The attribute for the Kernel.
void RemoteCreateNode(NodeAttr attr)
const std::string kernelname
void SearchDirectory(const std::string &dirname)
NodeAttr & SetKernelKey(Key_t k)
Sync::StatusHandler< KernelStatus_t > status
std::string GetHostName(bool numerichost=true) const
shared_ptr< QueueWriter > GetExternalOQueue(const std::string &name)
void RemoteCreateQueue(SimpleQueueAttr attr)
NodeAttr & SetKey(Key_t key_)
auto_ptr< ConnectionServer > server
void Error(const char *fmt,...)
Status_t CompareAndWait(Status_t oldStatus) const
void CreateQueue(const QueueAttr &attr)
Create a new queue.
const std::string & GetReaderPort() const
void CreateExternalWriter(const std::string &name)
void CreateReaderEndpoint(const SimpleQueueAttr &attr)
An object to hold references to RemoteQueues so they can continue to work after the node has gone awa...
The exceptions specified for the CPN network.
bool GrowQueueMaxThreshold()
Whether the queue should grow when a threshold larger than the current max threshold is requested...
void WaitForNodeStart(const std::string &nodename)
Wait for the given node to start.
const std::vector< std::string > & GetNodeSearchPaths() const
void CheckTerminated()
Convenience method that checks IsTerminated and if so throws a ShutdownException. ...
const std::string & GetServName() const
void Info(const char *fmt,...)
const std::string & GetWriterNode() const
bool SwallowBrokenQueueExceptions()
Whether the node should by default swallow the broken queue exceptions or let them propagate as an er...
void CreateLocalQueue(const SimpleQueueAttr &attr)
void CreateExternalEndpoint(const std::string &name, bool iswriter)
Kernel(const KernelAttr &kattr)
const std::string & Name() const
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
const std::string & GetName() const
bool CompareAndPost(Status_t oldStatus, Status_t newStatus)
void NodeTerminated(Key_t key)
Called by the node in the cleanup routine. TO BE CALLED ONLY BY THE CPN INTERNALS.
std::string GetServName() const
Key_t GetReaderKey() const
shared_ptr< Context > context
const std::vector< std::string > & GetSharedLibs() const
void LoadNodeList(const std::string &filename)
void WaitForAllNodes()
Waits until there are no running nodes.
A very simple logging interface.
A version of the ThresholdQueue that provides the CPN Queue interface. This queue implementation creat...
An exception indicating that the Kernel has shut down.
const std::string & GetName() const
const std::string & GetWriterPort() const
static QueueHintStatus ParseQueueHintForMBS(std::string hint)
ExternalEndpoint(const std::string &name, Key_t nodekey, shared_ptr< Context > context, bool iswriter_)
std::map< Key_t, shared_ptr< PseudoNode > > NodeMap
Default threshold queue implementation.
void RemoteCreateWriter(SimpleQueueAttr attr)
SimpleQueueAttr & SetWriterNodeKey(Key_t k)
Key_t GetWriterNodeKey() const
static SockAddrList CreateIP(unsigned serv)
Return a list of valid socket addresses for the given service number or port number.
void RemoteCreateReader(SimpleQueueAttr attr)
void Warn(const char *fmt,...)
auto_ptr< Pthread > thread
void CreateWriterEndpoint(const SimpleQueueAttr &attr)
Sync::ReentrantCondition nodecond
std::vector< SocketAddress > SockAddrList
const std::string & GetReaderNode() const
auto_ptr< RemoteQueueHolder > remotequeueholder
The attributes for a queue.
const std::string & GetTypeName() const
SimpleQueueAttr & SetWriterKey(Key_t k)
Sync::ReentrantLock datalock
Key_t GetReaderNodeKey() const
std::string GetHint() const
Sync::ReentrantLock nodelock
The node factory provides a method for the kernel to create arbitrary user-defined Nodes...
NodeFactory * GetNodeFactory(const std::string &nodetype)
Return a pointer to the node factory that produces the given node type. May load a shared library to ...
virtual shared_ptr< NodeBase > Create(Kernel &ker, const NodeAttr &attr)=0
This is a simplified internal representation of the queue attributes needed to create a queue...
const std::string & GetKernel() const
void InternalCreateNode(NodeAttr &nodeattr)
Key_t GetKernelKey() const
const std::string & GetHostName() const
SimpleQueueAttr & SetReaderKey(Key_t k)
void Post(Status_t newStatus)
Key_t CreateNode(const NodeAttr &attr)
Definition of the NodeFactory.