CPN
Computational Process Networks
D4RNode.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
24 #include <cpn/d4r/D4RNode.h>
25 #include <cpn/d4r/D4RQueue.h>
26 #include <cpn/utils/AutoLock.h>
27 #include <algorithm>
28 
29 #if D4RTRACE
30 #include <stdio.h>
31 #define DEBUG(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
32 #else
33 #define DEBUG(fmt, ...)
34 #endif
35 
36 namespace D4R {
37 
38  Node::Node(uint64_t key)
39  : publicTag(key),
40  privateTag(key)
41  {
42  }
43 
45 
48  return publicTag;
49  }
50 
51  void Node::SetPublicTag(const Tag &t) {
53  publicTag = t;
54  }
55 
58  return privateTag;
59  }
60 
61  void Node::SetPrivateTag(const Tag &t) {
63  privateTag = t;
64  }
65 
66  void Node::Block(const Tag &t, unsigned qsize) {
68  privateTag.QueueSize(qsize);
69  privateTag.Count(std::max(publicTag.Count(), t.Count()) + 1);
70  DEBUG("Node %llu:%llu block %d\n", privateTag.Count(), privateTag.Key(), (int)privateTag.QueueSize());
72  al.Unlock();
74  }
75 
76  bool Node::Transmit(const Tag &t) {
78  if (publicTag < t) {
79 
80  DEBUG("Transfer: publicTag < t\n\tPrivate: (%llu, %llu, %d, %llu)\n\tPublic: (%llu, %llu, %d, %llu)\n\t t: (%llu, %llu, %d, %llu)\n",
83  t.Count(), t.Key(), (int)t.QueueSize(), t.QueueKey());
84 
85  uint128_t priority = std::min(privateTag.Priority(), t.Priority());
86  publicTag = t;
87  publicTag.Priority(priority);
88  al.Unlock();
90  } else if (publicTag == t) {
91 
92  DEBUG("Transfer: publicTag == t\n\tPrivate: (%llu, %llu, %d, %llu)\n\tPublic: (%llu, %llu, %d, %llu)\n\t t: (%llu, %llu, %d, %llu)\n",
95  t.Count(), t.Key(), (int)t.QueueSize(), t.QueueKey());
96 
97  return publicTag.Priority() == privateTag.Priority();
98  } else {
99  DEBUG("Transfer: publicTag > t NOP\n\tPrivate: (%llu, %llu, %d, %llu)\n\tPublic: (%llu, %llu, %d, %llu)\n\t t: (%llu, %llu, %d, %llu)\n",
102  t.Count(), t.Key(), (int)t.QueueSize(), t.QueueKey());
103  }
104  return false;
105  }
106 
107  void Node::AddReader(weak_ptr<QueueBase> q) {
109  readerlist.push_back(q);
110  }
111  void Node::AddWriter(weak_ptr<QueueBase> q) {
113  writerlist.push_back(q);
114  }
115 
116  void SignalReader(shared_ptr<QueueBase> q) {
117  q->SignalReaderTagChanged();
118  }
119 
120  void SignalWriter(shared_ptr<QueueBase> q) {
121  q->SignalWriterTagChanged();
122  }
123 
124  typedef std::list<weak_ptr<QueueBase> > QueueList;
125  void GetQueues(QueueList &qlist, std::list<shared_ptr<QueueBase> > &out) {
126  QueueList::iterator itr, end;
127  itr = qlist.begin();
128  end = qlist.end();
129  while (itr != end) {
130  shared_ptr<QueueBase> q = itr->lock();
131  if (q) {
132  out.push_back(q);
133  ++itr;
134  } else {
135  itr = qlist.erase(itr);
136  }
137  }
138  }
139 
141  std::list<shared_ptr<QueueBase> > readers;
142  std::list<shared_ptr<QueueBase> > writers;
143  {
145  GetQueues(readerlist, readers);
146  GetQueues(writerlist, writers);
147  }
148  std::for_each(readers.begin(), readers.end(), &SignalReader);
149  std::for_each(writers.begin(), writers.end(), &SignalWriter);
150  }
151 
152 }
153 
void AddReader(weak_ptr< QueueBase > q)
Definition: D4RNode.cc:107
void SetPrivateTag(const Tag &t)
Definition: D4RNode.cc:61
Node(uint64_t key)
Definition: D4RNode.cc:38
#define DEBUG(fmt,...)
Definition: D4RNode.cc:33
Tag GetPrivateTag() const
Definition: D4RNode.cc:56
void SignalWriter(shared_ptr< QueueBase > q)
Definition: D4RNode.cc:120
uint64_t QueueKey() const
Definition: D4RTag.h:71
void Unlock()
Definition: AutoLock.h:61
Tag publicTag
Definition: D4RNode.h:126
std::list< weak_ptr< QueueBase > > writerlist
Definition: D4RNode.h:130
uint64_t Key() const
Definition: D4RTag.h:62
void SetPublicTag(const Tag &t)
Definition: D4RNode.cc:51
std::list< weak_ptr< QueueBase > > QueueList
Definition: D4RNode.cc:124
PthreadMutex taglock
Definition: D4RNode.h:128
Tag GetPublicTag() const
Definition: D4RNode.cc:46
void GetQueues(QueueList &qlist, std::list< shared_ptr< QueueBase > > &out)
Definition: D4RNode.cc:125
void SignalTagChanged()
Definition: D4RNode.cc:140
void SignalReader(shared_ptr< QueueBase > q)
Definition: D4RNode.cc:116
const uint128_t & Priority() const
Definition: D4RTag.h:76
void AddWriter(weak_ptr< QueueBase > q)
Definition: D4RNode.cc:111
std::list< weak_ptr< QueueBase > > readerlist
Definition: D4RNode.h:129
Tag privateTag
Definition: D4RNode.h:127
uint64_t QueueSize() const
Definition: D4RTag.h:66
void Block(const Tag &t, unsigned qsize)
Definition: D4RNode.cc:66
bool Transmit(const Tag &t)
Definition: D4RNode.cc:76
uint64_t Count() const
Definition: D4RTag.h:58
Automatic locking on the stack.