CPN
Computational Process Networks
PseudoNode.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
20 #include "common_priv.h"
21 #include <cpn/bits/PseudoNode.h>
22 #include <cpn/d4r/D4RNode.h>
23 #include <cpn/Context.h>
24 #include <cpn/bits/QueueReader.h>
25 #include <cpn/bits/QueueWriter.h>
26 #include "QueueBase.h"
27 
28 
29 namespace CPN {
30 
31  PseudoNode::PseudoNode(const std::string &n, Key_t k, shared_ptr<Context> ctx)
32  : logger(ctx.get(), Logger::WARNING, n),
33  name(n),
34  nodekey(k),
35  d4rnode(new D4R::Node(k)),
36  context(ctx)
37  {
38  }
39 
41  }
42 
43  shared_ptr<QueueReader> PseudoNode::GetIQueue(const std::string &portname) {
44  context->CheckTerminated();
45  Key_t ekey = context->GetCreateReaderKey(nodekey, portname);
46  return GetReader(ekey);
47  }
48 
49  shared_ptr<QueueWriter> PseudoNode::GetOQueue(const std::string &portname) {
50  context->CheckTerminated();
51  Key_t ekey = context->GetCreateWriterKey(nodekey, portname);
52  return GetWriter(ekey);
53  }
54 
55  void PseudoNode::CreateReader(shared_ptr<QueueBase> q) {
57  Key_t readerkey = q->GetReaderKey();
58  d4rnode->AddReader(q);
59  q->SetReaderNode(d4rnode);
60  q->SignalReaderTagChanged();
61  ASSERT(readermap.find(readerkey) == readermap.end(), "The reader already exists");
62  shared_ptr<QueueReader> reader;
63  reader = shared_ptr<QueueReader>(new QueueReader(this, q));
64  readermap.insert(std::make_pair(readerkey, reader));
65  cond.Signal();
66  }
67 
68  void PseudoNode::CreateWriter(shared_ptr<QueueBase> q) {
70  Key_t writerkey = q->GetWriterKey();
71  d4rnode->AddWriter(q);
72  q->SetWriterNode(d4rnode);
73  q->SignalWriterTagChanged();
74  ASSERT(writermap.find(writerkey) == writermap.end(), "The writer already exists.");
75  shared_ptr<QueueWriter> writer;
76  writer = shared_ptr<QueueWriter>(new QueueWriter(this, q));
77  writermap.insert(std::make_pair(writerkey, writer));
78  cond.Signal();
79  }
80 
83  ReaderMap readers;
84  readers.swap(readermap);
85  WriterMap writers;
86  writers.swap(writermap);
87  arl.Unlock();
88  for (ReaderMap::iterator i = readers.begin(); i != readers.end(); ++i) {
89  i->second->GetQueue()->ShutdownReader();
90  }
91  for (WriterMap::iterator i = writers.begin(); i != writers.end(); ++i) {
92  i->second->GetQueue()->ShutdownWriter();
93  }
94  readers.clear();
95  writers.clear();
96 
97  }
98 
100  shared_ptr<QueueReader> reader;
102  ReaderMap::iterator entry = readermap.find(ekey);
103  if (entry != readermap.end()) {
104  reader = entry->second;
105  readermap.erase(entry);
106  }
107  arl.Unlock();
108  reader.reset();
109  }
110 
112  shared_ptr<QueueWriter> writer;
114  WriterMap::iterator entry = writermap.find(ekey);
115  if (entry != writermap.end()) {
116  writer = entry->second;
117  writermap.erase(entry);
118  }
119  arl.Unlock();
120  writer.reset();
121  }
122 
125  cond.Signal();
126  WriterMap::iterator witr = writermap.begin();
127  while (witr != writermap.end()) { (witr++)->second->NotifyTerminate(); }
128  ReaderMap::iterator ritr = readermap.begin();
129  while (ritr != readermap.end()) { (ritr++)->second->NotifyTerminate(); }
130  }
131 
132  shared_ptr<QueueReader> PseudoNode::GetReader(Key_t ekey) {
134  shared_ptr<QueueReader> reader;
135  while (!reader) {
136  ReaderMap::iterator entry = readermap.find(ekey);
137  if (entry == readermap.end()) {
138  context->CheckTerminated();
139  cond.Wait(lock);
140  } else {
141  reader = shared_ptr<QueueReader>(entry->second);
142  }
143  }
144  return reader;
145  }
146 
147  shared_ptr<QueueWriter> PseudoNode::GetWriter(Key_t ekey) {
149  shared_ptr<QueueWriter> writer;
150  while (!writer) {
151  WriterMap::iterator entry = writermap.find(ekey);
152  if (entry == writermap.end()) {
153  context->CheckTerminated();
154  cond.Wait(lock);
155  } else {
156  writer = shared_ptr<QueueWriter>(entry->second);
157  }
158  }
159  return writer;
160 
161  }
162 
164  return true;
165  }
166 
168  logger.Error("Logging (key: %llu), %u readers, %u writers",
169  nodekey, readermap.size(), writermap.size());
170  D4R::Tag tag = d4rnode->GetPrivateTag();
171  logger.Error("Private key: (%llu, %llu, %llu, %llu)", tag.Count(), tag.Key(), tag.QueueSize(), tag.QueueKey());
172  tag = d4rnode->GetPublicTag();
173  logger.Error("Public key: (%llu, %llu, %llu, %llu)", tag.Count(), tag.Key(), tag.QueueSize(), tag.QueueKey());
174  ReaderMap::iterator r = readermap.begin();
175  while (r != readermap.end()) {
176  r->second->GetQueue()->LogState();
177  ++r;
178  }
179  WriterMap::iterator w = writermap.begin();
180  while (w != writermap.end()) {
181  w->second->GetQueue()->LogState();
182  ++w;
183  }
184  }
185 }
186 
Logger object that is used for forwarding log messages.
Definition: Logger.h:57
shared_ptr< D4R::Node > d4rnode
Definition: PseudoNode.h:89
void Wait(ReentrantLock &lock) const
Top Representations of generic queues for the CPN library.
void CreateWriter(shared_ptr< QueueBase > q)
for use by the CPN::Kernel to create a new writer endpoint.
Definition: PseudoNode.cc:68
const Key_t nodekey
Definition: PseudoNode.h:88
shared_ptr< QueueReader > GetReader(Key_t ekey)
Definition: PseudoNode.cc:132
shared_ptr< QueueWriter > GetWriter(Key_t ekey)
Definition: PseudoNode.cc:147
virtual void LogState()
For debugging ONLY!
Definition: PseudoNode.cc:167
The Context abstract data type.
PseudoNode(const std::string &name_, Key_t k, shared_ptr< Context > ctx)
Definition: PseudoNode.cc:31
uint64_t QueueKey() const
Definition: D4RTag.h:71
WriterMap writermap
Definition: PseudoNode.h:94
void Error(const char *fmt,...)
Definition: Logger.cc:159
Sync::ReentrantLock lock
Definition: PseudoNode.h:85
void Unlock()
Definition: AutoLock.h:61
uint64_t Key_t
Definition: common.h:79
virtual bool IsPurePseudo()
Definition: PseudoNode.cc:163
std::map< Key_t, shared_ptr< QueueWriter > > WriterMap
Definition: PseudoNode.h:92
uint64_t Key() const
Definition: D4RTag.h:62
ReaderMap readermap
Definition: PseudoNode.h:93
virtual void Shutdown()
Perform actions (like joining a thread) before destruction.
Definition: PseudoNode.cc:81
virtual ~PseudoNode()
Definition: PseudoNode.cc:40
shared_ptr< QueueReader > GetIQueue(const std::string &portname)
This method is for use by the user to aquire a reader endpoint. This function blocks until the CPN::K...
Definition: PseudoNode.cc:43
shared_ptr< QueueWriter > GetOQueue(const std::string &portname)
This method is for use by the user to aquire a writer endpoint. This function blocks until the CPN::K...
Definition: PseudoNode.cc:49
Sync::ReentrantCondition cond
Definition: PseudoNode.h:86
shared_ptr< Context > context
Definition: PseudoNode.h:96
Definition of the reader portion of the CPN queue class.
Definition: QueueReader.h:37
void CreateReader(shared_ptr< QueueBase > q)
for use by the CPN::Kernel to create a new read endpoint.
Definition: PseudoNode.cc:55
The definition of reader end of the queue.
Definition for the Queue writer inteface.
void NotifyTerminate()
Called by the kernel when it is shutting down.
Definition: PseudoNode.cc:123
void ReleaseWriter(Key_t ekey)
Definition: PseudoNode.cc:111
uint64_t QueueSize() const
Definition: D4RTag.h:66
std::map< Key_t, shared_ptr< QueueReader > > ReaderMap
Definition: PseudoNode.h:91
Definition of the writer portion of the CPN queue class.
Definition: QueueWriter.h:37
void ReleaseReader(Key_t ekey)
Definition: PseudoNode.cc:99
uint64_t Count() const
Definition: D4RTag.h:58
#define ASSERT(exp,...)