CPN
Computational Process Networks
QueueBase.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
24 #include "common_priv.h"
25 #include "QueueBase.h"
26 #include <cpn/Exceptions.h>
27 #include <cpn/QueueAttr.h>
28 #include <cpn/bits/KernelBase.h>
29 #include <cpn/bits/QueueReleaser.h>
30 #include <cpn/Context.h>
31 #include <sstream>
32 #include <string.h>
33 
34 namespace CPN {
35 
36 
38  : readerkey(attr.GetReaderKey()),
39  writerkey(attr.GetWriterKey()),
40  readshutdown(false),
41  writeshutdown(false),
42  prepad(0),
43  postpad(0),
44  readrequest(0),
45  writerequest(0),
46  enqueuethresh(0),
47  dequeuethresh(0),
48  indequeue(false),
49  inenqueue(false),
50  flushed(false),
51  kernel(k),
52  useD4R(kernel->UseD4R()),
53  logger(kernel->GetContext().get(), Logger::DEBUG),
54  datatype(attr.GetDatatype())
55  {
56  std::ostringstream oss;
57  oss << "Queue(" << writerkey << ", " << readerkey << ")";
58  logger.Name(oss.str());
59  }
60 
62 
// Obtain a read pointer for `thresh` units on channel `chan`.
//
// Loops on InternalGetRawDequeuePtr(thresh, chan) until it yields a non-null
// pointer or the queue is flushed; in the flushed case the (null) pointer is
// returned as-is. NOTE(review): this view of the listing is missing lines
// 64, 71 and 76, so the return condition has at least one more term and
// there may be further checks that are not documented here.
63  const void *QueueBase::GetRawDequeuePtr(unsigned thresh, unsigned chan) {
65  AutoLock<QueueBase> al(*this);
// A dequeue already in progress must have been started with a threshold at
// least as large as this one; otherwise remember the threshold for this one.
66  if (indequeue) { ASSERT(dequeuethresh >= thresh); }
67  else { dequeuethresh = thresh; }
68  while (true) {
69  const void *ptr = InternalGetRawDequeuePtr(thresh, chan);
70  if (ptr ||
72  || flushed) {
// Only a successful fetch marks the dequeue as in progress.
73  if (ptr) { indequeue = true; }
74  return ptr;
75  }
// No data yet. First choice: the request exceeds the current maximum
// threshold and the kernel permits growth, so grow to (2*thresh, thresh).
77  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
78  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
79  UnlockedGrow(2*thresh, thresh);
80  Signal();
// Second choice: WriteBlocked() reports true and the kernel permits growth,
// so grow to hold the pending write request plus this read.
81  } else if (WriteBlocked() && kernel->GrowQueueMaxThreshold()) {
82  UnlockedGrow(writerequest + thresh, thresh);
83  Signal();
84  } else {
// Otherwise publish the outstanding read size, wait, then clear it before
// retrying the fetch.
85  readrequest = thresh;
86  WaitForData();
87  readrequest = 0;
88  }
89  }
90  }
91 
// Consume `count` units from the queue.
//
// Throws BrokenQueueException(readerkey) when GetRawDequeuePtr(count, 0)
// returns null. Otherwise clears the in-progress dequeue state and calls
// InternalDequeue(count).
// NOTE(review): listing lines 97 and 99 are missing from this view, so any
// steps before or after InternalDequeue are not documented here.
92  void QueueBase::Dequeue(unsigned count) {
93  if (!GetRawDequeuePtr(count, 0)) { throw BrokenQueueException(readerkey); }
94  AutoLock<QueueBase> al(*this);
// The dequeue is finished: reset the threshold and the in-progress flag.
95  dequeuethresh = 0;
96  indequeue = false;
98  InternalDequeue(count);
100  }
101 
102  bool QueueBase::RawDequeue(void* data, unsigned count, unsigned numChans, unsigned chanStride) {
103  const void *src = GetRawDequeuePtr(count, 0);
104  char *dest = (char*)data;
105  if (!src) { return false; }
106  memcpy(dest, src, count);
107  for (unsigned chan = 1; chan < numChans; ++chan) {
108  src = GetRawDequeuePtr(count, chan);
109  ASSERT(src);
110  dest += chanStride;
111  memcpy(dest, src, count);
112  }
113  Dequeue(count);
114  return true;
115  }
116 
117  bool QueueBase::RawDequeue(void *data, unsigned count) {
118  return RawDequeue(data, count, 1, 0);
119  }
120 
// Obtain a write pointer for `thresh` units on channel `chan`.
//
// Blocks while the queue is flushed, then loops on
// InternalGetRawEnqueuePtr(thresh, chan) until it yields a non-null pointer,
// growing the queue or waiting in between.
// NOTE(review): listing lines 122, 127, 143 and 154 are missing from this
// view; checks performed on those lines are not documented here.
121  void *QueueBase::GetRawEnqueuePtr(unsigned thresh, unsigned chan) {
123  AutoLock<QueueBase> al(*this);
124  // Only the enqueuer may call flush and reset will clear all state as if we
125  // had not done anything yet, so block here
126  while (flushed) {
// Two ways of blocking, selected by the useD4R flag.
128  if (useD4R) {
129  WriteBlock(-1);
130  } else {
131  Wait();
132  }
133  }
// An enqueue already in progress must have been started with a threshold at
// least as large as this one; otherwise remember the threshold for this one.
134  if (inenqueue) { ASSERT(enqueuethresh >= thresh); }
135  else { enqueuethresh = thresh; }
// Limits the ReadBlocked()-triggered growth below to once per call.
136  bool grown = false;
137  while (true) {
138  void *ptr = InternalGetRawEnqueuePtr(thresh, chan);
139  if (ptr) {
140  inenqueue = true;
141  return ptr;
142  }
// No room yet. First choice: the request exceeds the current maximum
// threshold and the kernel permits growth, so grow to (2*thresh, thresh).
144  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
145  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
146  UnlockedGrow(2*thresh, thresh);
147  Signal();
// Second choice (at most once): ReadBlocked() reports true and the kernel
// permits growth, so grow to hold the pending read request plus this write.
148  } else if (!grown && ReadBlocked() && kernel->GrowQueueMaxThreshold()) {
149  UnlockedGrow(readrequest + thresh, thresh);
150  Signal();
151  grown = true;
152  } else {
// Publish the outstanding write size and clear it again afterwards.
// NOTE(review): the statement between these two assignments (listing line
// 154) is not visible; presumably it waits for free space -- confirm.
153  writerequest = thresh;
155  writerequest = 0;
156  }
157  }
158  }
159 
// Commit `count` units to the queue.
//
// Clears the in-progress enqueue state, calls InternalEnqueue(count) and
// then NotifyData().
// NOTE(review): listing line 164 is missing from this view, so any check
// made just before InternalEnqueue is not documented here.
160  void QueueBase::Enqueue(unsigned count) {
161  AutoLock<QueueBase> al(*this);
// The enqueue is finished: reset the threshold and the in-progress flag.
162  enqueuethresh = 0;
163  inenqueue = false;
165  InternalEnqueue(count);
166  NotifyData();
167  }
168 
169  void QueueBase::RawEnqueue(const void *data, unsigned count, unsigned numChans, unsigned chanStride) {
170  void *dest = GetRawEnqueuePtr(count, 0);
171  const char *src = (char*)data;
172  memcpy(dest, src, count);
173  for (unsigned chan = 1; chan < numChans; ++chan) {
174  dest = GetRawEnqueuePtr(count, chan);
175  src += chanStride;
176  memcpy(dest, src, count);
177  }
178  Enqueue(count);
179  }
180 
182  AutoLock<const QueueBase> al(*this);
183  InternalReset();
184  }
185 
187  AutoLock<const QueueBase> al(*this);
188  InternalFlush();
189  }
190 
191  bool QueueBase::Flushed() const {
192  AutoLock<const QueueBase> al(*this);
193  return flushed;
194  }
195 
196  void QueueBase::RawEnqueue(const void* data, unsigned count) {
197  return RawEnqueue(data, count, 1, 0);
198  }
199 
200  unsigned QueueBase::NumChannels() const {
201  AutoLock<const QueueBase> al(*this);
202  return UnlockedNumChannels();
203  }
204 
205  unsigned QueueBase::Count() const {
206  AutoLock<const QueueBase> al(*this);
207  return UnlockedCount();
208  }
209 
210  bool QueueBase::Empty() const {
211  AutoLock<const QueueBase> al(*this);
212  return UnlockedEmpty();
213  }
214 
215  unsigned QueueBase::Freespace() const {
216  AutoLock<const QueueBase> al(*this);
217  return UnlockedFreespace();
218  }
219 
220  bool QueueBase::Full() const {
221  AutoLock<const QueueBase> al(*this);
222  return UnlockedFull();
223  }
224 
225  unsigned QueueBase::MaxThreshold() const {
226  AutoLock<const QueueBase> al(*this);
227  return UnlockedMaxThreshold();
228  }
229 
230  unsigned QueueBase::QueueLength() const {
231  AutoLock<const QueueBase> al(*this);
232  return UnlockedQueueLength();
233  }
234 
236  AutoLock<const QueueBase> al(*this);
238  }
239 
241  AutoLock<const QueueBase> al(*this);
243  }
244 
245  void QueueBase::Grow(unsigned queueLen, unsigned maxThresh) {
246  AutoLock<QueueBase> al(*this);
247  UnlockedGrow(queueLen, maxThresh);
248  }
249 
251  AutoLock<QueueBase> al(*this);
253  }
254 
256  readshutdown = true;
257  Signal();
258  }
259 
261  AutoLock<QueueBase> al(*this);
263  }
264 
267  if (postpad > 0) {
268  while (true) {
269  void *ptr = InternalGetRawEnqueuePtr(postpad, 0);
270  if (ptr) {
271  unsigned num_chans = UnlockedNumChannels();
272  unsigned chanstride = UnlockedEnqueueChannelStride();
273  for (unsigned chan = 0; chan < num_chans; ++chan) {
274  memset((char*)ptr + chanstride*chan, 0, postpad);
275  }
277  } else {
281  writerequest = 0;
282  }
283  }
284  }
285  flushed = true;
286  cond.Broadcast();
287  }
288 
290  ASSERT(flushed);
291  readrequest = 0;
292  writerequest = 0;
293  enqueuethresh = 0;
294  dequeuethresh = 0;
295  indequeue = false;
296  inenqueue = false;
297  flushed = false;
298  if (prepad > 0) {
299  void *ptr = InternalGetRawEnqueuePtr(prepad, 0);
300  ASSERT(ptr);
301  unsigned num_chans = UnlockedNumChannels();
302  unsigned chanstride = UnlockedEnqueueChannelStride();
303  for (unsigned chan = 0; chan < num_chans; ++chan) {
304  memset((char*)ptr + chanstride*chan, 0, prepad);
305  }
307  }
308  cond.Broadcast();
309  }
310 
312  writeshutdown = true;
313  Signal();
314  }
315 
316  unsigned QueueBase::NumEnqueued() const {
317  AutoLock<const QueueBase> al(*this);
318  return UnlockedNumEnqueued();
319  }
320 
321  unsigned QueueBase::NumDequeued() const {
322  AutoLock<const QueueBase> al(*this);
323  return UnlockedNumDequeued();
324  }
325 
327  if (useD4R) {
328  ReadBlock();
329  } else {
330  while (ReadBlocked()) {
331  cond.Wait(lock);
332  }
333  }
334  }
335 
339  }
340 
342  if (UnlockedCount() >= readrequest) {
343  cond.Broadcast();
344  }
345  }
346 
348  if (useD4R) {
350  } else {
351  while (WriteBlocked()) {
352  cond.Wait(lock);
353  }
354  }
355  }
356 
360  }
361 
363  if (UnlockedFreespace() >= writerequest) {
364  cond.Broadcast();
365  }
366  }
367 
369  AutoLock<QueueBase> al(*this);
370  cond.Broadcast();
371  }
372 
374  unsigned size = kernel->CalculateGrowSize(UnlockedCount(), writerequest);
375  logger.Debug("Detect: Grow(%u, %u)", size, writerequest);
376  UnlockedGrow(size, writerequest);
377  logger.Debug("New size: (%u, %u)", UnlockedQueueLength(), UnlockedMaxThreshold());
378  }
379 
381  AutoLock<QueueBase> al(*this);
382  return readrequest;
383  }
384 
386  AutoLock<QueueBase> al(*this);
387  return writerequest;
388  }
389 
391  AutoLock<QueueBase> al(*this);
392  return readshutdown;
393  }
394 
396  AutoLock<QueueBase> al(*this);
397  return writeshutdown;
398  }
399 
401  logger.Error("Printing state (w:%llu r:%llu)", readerkey, writerkey);
402  logger.Error("size: %u, maxthresh: %u count: %u free: %u, flushed: %d",
404  (int)flushed);
405  logger.Error("readrequest: %u, writerequest: %u numenqueued: %u, numdequeued: %u",
407  if (indequeue) {
408  logger.Error("Indequeue (thresh: %u)", dequeuethresh);
409  }
410  if (inenqueue) {
411  logger.Error("Inenqueue (thresh: %u)", enqueuethresh);
412  }
413  if (readshutdown) {
414  logger.Error("Reader shutdown");
415  }
416  if (writeshutdown) {
417  logger.Error("Writer shutdown");
418  }
419  }
420 
422 }
423 
Logger object that is used for forwarding log messages.
Definition: Logger.h:57
PthreadCondition & Wait(PthreadMutex &mutex)
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
unsigned QueueLength() const
Definition: QueueBase.cc:230
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311
Top Representations of generic queues for the CPN library.
unsigned readrequest
Definition: QueueBase.h:279
unsigned prepad
Definition: QueueBase.h:277
unsigned Count() const
Definition: QueueBase.cc:205
void Grow(unsigned queueLen, unsigned maxThresh)
Definition: QueueBase.cc:245
virtual unsigned UnlockedMaxThreshold() const =0
bool IsReaderShutdown()
Definition: QueueBase.cc:390
void * GetRawEnqueuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:121
PthreadCondition & Broadcast(void)
unsigned EnqueueChannelStride() const
Definition: QueueBase.cc:235
virtual unsigned UnlockedNumEnqueued() const =0
#define DEBUG(fmt,...)
Definition: D4RNode.cc:33
virtual void InternalFlush()
Definition: QueueBase.cc:265
Logger logger
Definition: QueueBase.h:288
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
unsigned NumChannels() const
Definition: QueueBase.cc:200
unsigned NumEnqueued() const
Definition: QueueBase.cc:316
virtual void Wait()
Definition: QueueBase.h:246
Definition of the queue attributes.
virtual void WaitForData()
Definition: QueueBase.cc:326
bool IsWriterShutdown()
Definition: QueueBase.cc:395
virtual unsigned UnlockedCount() const =0
The Context abstract data type.
void ShutdownReader()
Called by the QueueReader when no more data will be read.
Definition: QueueBase.cc:250
virtual ~QueueBase()
Definition: QueueBase.cc:61
bool RawDequeue(void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:102
void Error(const char *fmt,...)
Definition: Logger.cc:159
void Dequeue(unsigned count)
Definition: QueueBase.cc:92
bool Flushed() const
Definition: QueueBase.cc:191
virtual unsigned UnlockedDequeueChannelStride() const =0
unsigned writerequest
Definition: QueueBase.h:280
An exception indicating that the node tried to write to a shutdown queue.
Definition: Exceptions.h:65
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
bool Empty() const
Definition: QueueBase.cc:210
unsigned postpad
Definition: QueueBase.h:278
virtual bool GrowQueueMaxThreshold()=0
virtual void InternalEnqueue(unsigned count)=0
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255
bool Full() const
Definition: QueueBase.cc:220
The exceptions specified for the CPN network.
virtual void InternalDequeue(unsigned count)=0
unsigned MaxThreshold() const
Definition: QueueBase.cc:225
virtual void InternalReset()
Definition: QueueBase.cc:289
virtual unsigned UnlockedFreespace() const =0
unsigned WriteRequest()
For unit tests.
Definition: QueueBase.cc:385
void WriteBlock(unsigned qsize)
Definition: D4RQueue.cc:106
virtual void Signal()
Definition: QueueBase.h:247
unsigned ReadRequest()
For unit tests.
Definition: QueueBase.cc:380
virtual unsigned UnlockedNumChannels() const =0
const std::string & Name() const
Definition: Logger.cc:88
void Enqueue(unsigned count)
Definition: QueueBase.cc:160
unsigned dequeuethresh
Definition: QueueBase.h:282
virtual void CheckTerminated()=0
const Key_t readerkey
Definition: QueueBase.h:273
const void * GetRawDequeuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:63
void NotifyData()
Definition: QueueBase.cc:341
void NotifyTerminate()
Used to tell any waiting threads that the network is terminating.
Definition: QueueBase.cc:368
virtual unsigned UnlockedEnqueueChannelStride() const =0
void NotifyFreespace()
Definition: QueueBase.cc:362
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
virtual void Detect()
Definition: QueueBase.cc:373
void ReadBlock()
Definition: D4RQueue.cc:71
void RawEnqueue(const void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:169
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)=0
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)=0
unsigned DequeueChannelStride() const
Definition: QueueBase.cc:240
virtual ~QueueReleaser()
Definition: QueueBase.cc:421
unsigned NumDequeued() const
Definition: QueueBase.cc:321
virtual bool UnlockedEmpty() const =0
unsigned Freespace() const
Definition: QueueBase.cc:215
bool readshutdown
Definition: QueueBase.h:275
unsigned enqueuethresh
Definition: QueueBase.h:281
PthreadCondition cond
Definition: QueueBase.h:290
virtual unsigned CalculateGrowSize(unsigned currentsize, unsigned request)=0
virtual void LogState()
For debugging ONLY!! Otherwise non-deterministic output.
Definition: QueueBase.cc:400
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
void ShutdownWriter()
Called by the QueueWriter when no more data will be written.
Definition: QueueBase.cc:260
PthreadMutex lock
Definition: QueueBase.h:289
const Key_t writerkey
Definition: QueueBase.h:274
virtual bool UnlockedFull() const =0
virtual unsigned UnlockedNumDequeued() const =0
#define ASSERT(exp,...)
virtual unsigned UnlockedQueueLength() const =0
void Debug(const char *fmt,...)
Definition: Logger.cc:186