CPN
Computational Process Networks
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
CPN::QueueBase Class Reference (abstract)

The base class for all queues in the CPN library. More...

#include <QueueBase.h>

+ Inheritance diagram for CPN::QueueBase:
+ Collaboration diagram for CPN::QueueBase:

Public Member Functions

virtual ~QueueBase ()
 
const void * GetRawDequeuePtr (unsigned thresh, unsigned chan)
 
void Dequeue (unsigned count)
 
bool RawDequeue (void *data, unsigned count, unsigned numChans, unsigned chanStride)
 
bool RawDequeue (void *data, unsigned count)
 
void * GetRawEnqueuePtr (unsigned thresh, unsigned chan)
 
void Enqueue (unsigned count)
 
void RawEnqueue (const void *data, unsigned count, unsigned numChans, unsigned chanStride)
 
void RawEnqueue (const void *data, unsigned count)
 
void Reset ()
 
void Flush ()
 
bool Flushed () const
 
unsigned NumChannels () const
 
unsigned Count () const
 
bool Empty () const
 
unsigned Freespace () const
 
bool Full () const
 
unsigned MaxThreshold () const
 
unsigned QueueLength () const
 
unsigned EnqueueChannelStride () const
 
unsigned DequeueChannelStride () const
 
void Grow (unsigned queueLen, unsigned maxThresh)
 
Key_t GetWriterKey () const
 
Key_t GetReaderKey () const
 
const std::string & GetDatatype () const
 
void ShutdownReader ()
 Called by the QueueReader when no more data will be read. More...
 
void ShutdownWriter ()
 Called by the QueueWriter when no more data will be written. More...
 
void NotifyTerminate ()
 Used to tell any waiting threads that the network is terminating. More...
 
void Lock () const
 
void Unlock () const
 
unsigned ReadRequest ()
 For unit tests. More...
 
unsigned WriteRequest ()
 For unit tests. More...
 
bool IsReaderShutdown ()
 
bool IsWriterShutdown ()
 
virtual void LogState ()
 For debugging ONLY!! Otherwise non deterministic output. More...
 
unsigned NumEnqueued () const
 
unsigned NumDequeued () const
 
void SetReaderNode (shared_ptr< Node > n)
 
void SetWriterNode (shared_ptr< Node > n)
 
void SignalReaderTagChanged ()
 
void SignalWriterTagChanged ()
 

Protected Member Functions

 QueueBase (KernelBase *k, const SimpleQueueAttr &attr)
 
virtual void WaitForData ()
 
virtual bool ReadBlocked ()
 
void NotifyData ()
 
virtual void WaitForFreespace ()
 
virtual bool WriteBlocked ()
 
void NotifyFreespace ()
 
virtual void Wait ()
 
virtual void Signal ()
 
virtual void Detect ()
 
virtual const void * InternalGetRawDequeuePtr (unsigned thresh, unsigned chan)=0
 
virtual void InternalDequeue (unsigned count)=0
 
virtual void * InternalGetRawEnqueuePtr (unsigned thresh, unsigned chan)=0
 
virtual void InternalEnqueue (unsigned count)=0
 
virtual void InternalFlush ()
 
virtual void InternalReset ()
 
virtual unsigned UnlockedNumChannels () const =0
 
virtual unsigned UnlockedCount () const =0
 
virtual bool UnlockedEmpty () const =0
 
virtual unsigned UnlockedFreespace () const =0
 
virtual bool UnlockedFull () const =0
 
virtual unsigned UnlockedMaxThreshold () const =0
 
virtual unsigned UnlockedQueueLength () const =0
 
virtual unsigned UnlockedEnqueueChannelStride () const =0
 
virtual unsigned UnlockedDequeueChannelStride () const =0
 
virtual void UnlockedGrow (unsigned queueLen, unsigned maxThresh)=0
 
virtual void UnlockedShutdownReader ()
 
virtual void UnlockedShutdownWriter ()
 
virtual unsigned UnlockedNumEnqueued () const =0
 
virtual unsigned UnlockedNumDequeued () const =0
 
void ReadBlock ()
 
void WriteBlock (unsigned qsize)
 
virtual void UnlockedSignalReaderTagChanged ()
 
virtual void UnlockedSignalWriterTagChanged ()
 

Protected Attributes

const Key_t readerkey
 
const Key_t writerkey
 
bool readshutdown
 
bool writeshutdown
 
unsigned prepad
 
unsigned postpad
 
unsigned readrequest
 
unsigned writerequest
 
unsigned enqueuethresh
 
unsigned dequeuethresh
 
bool indequeue
 
bool inenqueue
 
bool flushed
 
KernelBase * kernel
 
bool useD4R
 
Logger logger
 
PthreadMutex lock
 
PthreadCondition cond
 
std::string datatype
 
shared_ptr< Node > reader
 
shared_ptr< Node > writer
 
bool readtagchanged
 
bool writetagchanged
 
bool incomm
 

Detailed Description

The base class for all queues in the CPN library.

Definition at line 39 of file QueueBase.h.

Constructor & Destructor Documentation

CPN::QueueBase::~QueueBase ( )
virtual

Reimplemented from D4R::QueueBase.

Definition at line 61 of file QueueBase.cc.

61 {}
CPN::QueueBase::QueueBase ( KernelBase * k,
const SimpleQueueAttr & attr 
)
protected

Definition at line 37 of file QueueBase.cc.

References logger, Logger::Name(), readerkey, and writerkey.

38  : readerkey(attr.GetReaderKey()),
39  writerkey(attr.GetWriterKey()),
40  readshutdown(false),
41  writeshutdown(false),
42  prepad(0),
43  postpad(0),
44  readrequest(0),
45  writerequest(0),
46  enqueuethresh(0),
47  dequeuethresh(0),
48  indequeue(false),
49  inenqueue(false),
50  flushed(false),
51  kernel(k),
52  useD4R(kernel->UseD4R()),
54  datatype(attr.GetDatatype())
55  {
56  std::ostringstream oss;
57  oss << "Queue(" << writerkey << ", " << readerkey << ")";
58  logger.Name(oss.str());
59  }
unsigned readrequest
Definition: QueueBase.h:279
unsigned prepad
Definition: QueueBase.h:277
Logger logger
Definition: QueueBase.h:288
unsigned writerequest
Definition: QueueBase.h:280
unsigned postpad
Definition: QueueBase.h:278
const std::string & Name() const
Definition: Logger.cc:88
unsigned dequeuethresh
Definition: QueueBase.h:282
const Key_t readerkey
Definition: QueueBase.h:273
virtual bool UseD4R()=0
std::string datatype
Definition: QueueBase.h:291
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
virtual shared_ptr< Context > GetContext() const =0
bool readshutdown
Definition: QueueBase.h:275
unsigned enqueuethresh
Definition: QueueBase.h:281
const Key_t writerkey
Definition: QueueBase.h:274

+ Here is the call graph for this function:

Member Function Documentation

unsigned CPN::QueueBase::Count ( ) const
Returns
the number of bytes in the queue.

Definition at line 205 of file QueueBase.cc.

References UnlockedCount().

205  {
206  AutoLock<const QueueBase> al(*this);
207  return UnlockedCount();
208  }
virtual unsigned UnlockedCount() const =0

+ Here is the call graph for this function:

void CPN::QueueBase::Dequeue ( unsigned  count)

This function is used to remove elements from the queue. count elements will be removed from the queue when this function is called.

Parameters
count: the number of bytes to remove from the queue

Definition at line 92 of file QueueBase.cc.

References dequeuethresh, GetRawDequeuePtr(), indequeue, InternalDequeue(), NotifyFreespace(), readerkey, and readshutdown.

Referenced by RawDequeue().

92  {
93  if (!GetRawDequeuePtr(count, 0)) { throw BrokenQueueException(readerkey); }
94  AutoLock<QueueBase> al(*this);
95  dequeuethresh = 0;
96  indequeue = false;
97  if (readshutdown) { throw BrokenQueueException(readerkey); }
98  InternalDequeue(count);
100  }
virtual void InternalDequeue(unsigned count)=0
unsigned dequeuethresh
Definition: QueueBase.h:282
const Key_t readerkey
Definition: QueueBase.h:273
const void * GetRawDequeuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:63
void NotifyFreespace()
Definition: QueueBase.cc:362
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::DequeueChannelStride ( ) const
Returns
The current dequeue channel stride. This value is only guaranteed to be consistent between a call to GetRawDequeuePtr and the matching call to Dequeue.

Definition at line 240 of file QueueBase.cc.

References UnlockedDequeueChannelStride().

240  {
241  AutoLock<const QueueBase> al(*this);
243  }
virtual unsigned UnlockedDequeueChannelStride() const =0

+ Here is the call graph for this function:

void CPN::QueueBase::Detect ( )
protected virtual

Called by the D4R algorithm when it has detected an artificial deadlock and this queue should be changed.

Implements D4R::QueueBase.

Definition at line 373 of file QueueBase.cc.

References CPN::KernelBase::CalculateGrowSize(), Logger::Debug(), kernel, logger, UnlockedCount(), UnlockedGrow(), UnlockedMaxThreshold(), UnlockedQueueLength(), and writerequest.

373  {
374  unsigned size = kernel->CalculateGrowSize(UnlockedCount(), writerequest);
375  logger.Debug("Detect: Grow(%u, %u)", size, writerequest);
376  UnlockedGrow(size, writerequest);
377  logger.Debug("New size: (%u, %u)", UnlockedQueueLength(), UnlockedMaxThreshold());
378  }
virtual unsigned UnlockedMaxThreshold() const =0
Logger logger
Definition: QueueBase.h:288
virtual unsigned UnlockedCount() const =0
unsigned writerequest
Definition: QueueBase.h:280
KernelBase * kernel
Definition: QueueBase.h:286
virtual unsigned CalculateGrowSize(unsigned currentsize, unsigned request)=0
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
virtual unsigned UnlockedQueueLength() const =0
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

bool CPN::QueueBase::Empty ( ) const
Returns
true if the queue is empty

Definition at line 210 of file QueueBase.cc.

References UnlockedEmpty().

210  {
211  AutoLock<const QueueBase> al(*this);
212  return UnlockedEmpty();
213  }
virtual bool UnlockedEmpty() const =0

+ Here is the call graph for this function:

void CPN::QueueBase::Enqueue ( unsigned  count)

This function is used to release the buffer obtained with GetRawEnqueuePtr. The count specifies the number of entries that we want to be placed in the buffer.

Note
A call to this function without an accompanying call to GetRawEnqueuePtr is undefined.
Parameters
count: the number of bytes to be placed in the buffer
Invariant
count <= thresh from GetRawEnqueuePtr

Definition at line 160 of file QueueBase.cc.

References enqueuethresh, inenqueue, InternalEnqueue(), NotifyData(), writerkey, and writeshutdown.

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and RawEnqueue().

160  {
161  AutoLock<QueueBase> al(*this);
162  enqueuethresh = 0;
163  inenqueue = false;
164  if (writeshutdown) { throw BrokenQueueException(writerkey); }
165  InternalEnqueue(count);
166  NotifyData();
167  }
virtual void InternalEnqueue(unsigned count)=0
void NotifyData()
Definition: QueueBase.cc:341
bool writeshutdown
Definition: QueueBase.h:276
unsigned enqueuethresh
Definition: QueueBase.h:281
const Key_t writerkey
Definition: QueueBase.h:274

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::EnqueueChannelStride ( ) const
Returns
The current enqueue channel stride. This value is only guaranteed to be consistent between a call to GetRawEnqueuePtr and the matching call to Enqueue.

Definition at line 235 of file QueueBase.cc.

References UnlockedEnqueueChannelStride().

235  {
236  AutoLock<const QueueBase> al(*this);
238  }
virtual unsigned UnlockedEnqueueChannelStride() const =0

+ Here is the call graph for this function:

void CPN::QueueBase::Flush ( )

Definition at line 186 of file QueueBase.cc.

References InternalFlush().

186  {
187  AutoLock<const QueueBase> al(*this);
188  InternalFlush();
189  }
virtual void InternalFlush()
Definition: QueueBase.cc:265

+ Here is the call graph for this function:

bool CPN::QueueBase::Flushed ( ) const

Definition at line 191 of file QueueBase.cc.

References flushed.

191  {
192  AutoLock<const QueueBase> al(*this);
193  return flushed;
194  }
unsigned CPN::QueueBase::Freespace ( ) const
Returns
the number of bytes we can add to the queue without blocking.

Definition at line 215 of file QueueBase.cc.

References UnlockedFreespace().

215  {
216  AutoLock<const QueueBase> al(*this);
217  return UnlockedFreespace();
218  }
virtual unsigned UnlockedFreespace() const =0

+ Here is the call graph for this function:

bool CPN::QueueBase::Full ( ) const
Returns
true if the queue is full, false otherwise

Definition at line 220 of file QueueBase.cc.

References UnlockedFull().

220  {
221  AutoLock<const QueueBase> al(*this);
222  return UnlockedFull();
223  }
virtual bool UnlockedFull() const =0

+ Here is the call graph for this function:

const std::string& CPN::QueueBase::GetDatatype ( ) const
inline
Returns
the datatype name associated with this queue

Definition at line 213 of file QueueBase.h.

213 { return datatype; }
std::string datatype
Definition: QueueBase.h:291
const void * CPN::QueueBase::GetRawDequeuePtr ( unsigned  thresh,
unsigned  chan 
)

Get a pointer to a buffer containing elements.

Note
Access to the memory locations pointed to by the returned pointer after Dequeue has been called is undefined.
Parameters
thresh: the number of bytes to get
chan: the channel to use
Returns
A void* to a block of memory containing thresh bytes or 0 if there are not thresh bytes available.

Definition at line 63 of file QueueBase.cc.

References ASSERT, CPN::KernelBase::CheckTerminated(), dequeuethresh, flushed, CPN::KernelBase::GrowQueueMaxThreshold(), indequeue, InternalGetRawDequeuePtr(), kernel, readerkey, readrequest, readshutdown, Signal(), UnlockedGrow(), UnlockedMaxThreshold(), WaitForData(), WriteBlocked(), writerequest, and writeshutdown.

Referenced by Dequeue(), and RawDequeue().

63  {
65  AutoLock<QueueBase> al(*this);
66  if (indequeue) { ASSERT(dequeuethresh >= thresh); }
67  else { dequeuethresh = thresh; }
68  while (true) {
69  const void *ptr = InternalGetRawDequeuePtr(thresh, chan);
70  if (ptr ||
72  || flushed) {
73  if (ptr) { indequeue = true; }
74  return ptr;
75  }
76  if (readshutdown) { throw BrokenQueueException(readerkey); }
77  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
78  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
79  UnlockedGrow(2*thresh, thresh);
80  Signal();
81  } else if (WriteBlocked() && kernel->GrowQueueMaxThreshold()) {
82  UnlockedGrow(writerequest + thresh, thresh);
83  Signal();
84  } else {
85  readrequest = thresh;
86  WaitForData();
87  readrequest = 0;
88  }
89  }
90  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedMaxThreshold() const =0
virtual void WaitForData()
Definition: QueueBase.cc:326
unsigned writerequest
Definition: QueueBase.h:280
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
virtual bool GrowQueueMaxThreshold()=0
virtual void Signal()
Definition: QueueBase.h:247
unsigned dequeuethresh
Definition: QueueBase.h:282
virtual void CheckTerminated()=0
const Key_t readerkey
Definition: QueueBase.h:273
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)=0
bool readshutdown
Definition: QueueBase.h:275
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void * CPN::QueueBase::GetRawEnqueuePtr ( unsigned  thresh,
unsigned  chan 
)

Return a pointer to a buffer of memory that contains thresh entries that we can write into.

Note
A call to this function without an accompanying call to Enqueue is undefined.
An access to the memory locations defined by the return value is undefined after a call to Enqueue.
Parameters
thresh: the number of bytes we need in the returned buffer.
chan: the channel to use
Returns
void* to the memory buffer, 0 if not enough space available

Definition at line 121 of file QueueBase.cc.

References ASSERT, CPN::KernelBase::CheckTerminated(), enqueuethresh, flushed, CPN::KernelBase::GrowQueueMaxThreshold(), inenqueue, InternalGetRawEnqueuePtr(), kernel, ReadBlocked(), readrequest, readshutdown, Signal(), UnlockedGrow(), UnlockedMaxThreshold(), useD4R, Wait(), WaitForFreespace(), D4R::QueueBase::WriteBlock(), writerequest, writerkey, and writeshutdown.

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and RawEnqueue().

121  {
123  AutoLock<QueueBase> al(*this);
124  // Only the enqueuer may call flush and reset will clear all state as if we
125  // had not done anything yet, so block here
126  while (flushed) {
127  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
128  if (useD4R) {
129  WriteBlock(-1);
130  } else {
131  Wait();
132  }
133  }
134  if (inenqueue) { ASSERT(enqueuethresh >= thresh); }
135  else { enqueuethresh = thresh; }
136  bool grown = false;
137  while (true) {
138  void *ptr = InternalGetRawEnqueuePtr(thresh, chan);
139  if (ptr) {
140  inenqueue = true;
141  return ptr;
142  }
143  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
144  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
145  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
146  UnlockedGrow(2*thresh, thresh);
147  Signal();
148  } else if (!grown && ReadBlocked() && kernel->GrowQueueMaxThreshold()) {
149  UnlockedGrow(readrequest + thresh, thresh);
150  Signal();
151  grown = true;
152  } else {
153  writerequest = thresh;
155  writerequest = 0;
156  }
157  }
158  }
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedMaxThreshold() const =0
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
virtual void Wait()
Definition: QueueBase.h:246
unsigned writerequest
Definition: QueueBase.h:280
virtual bool GrowQueueMaxThreshold()=0
void WriteBlock(unsigned qsize)
Definition: D4RQueue.cc:106
virtual void Signal()
Definition: QueueBase.h:247
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)=0
bool readshutdown
Definition: QueueBase.h:275
unsigned enqueuethresh
Definition: QueueBase.h:281
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
const Key_t writerkey
Definition: QueueBase.h:274
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Key_t CPN::QueueBase::GetReaderKey ( ) const
inline
Returns
the reader key associated with this queue

Definition at line 211 of file QueueBase.h.

211 { return readerkey; }
const Key_t readerkey
Definition: QueueBase.h:273
Key_t CPN::QueueBase::GetWriterKey ( ) const
inline
Returns
the writer key associated with this queue

Definition at line 209 of file QueueBase.h.

209 { return writerkey; }
const Key_t writerkey
Definition: QueueBase.h:274
void CPN::QueueBase::Grow ( unsigned  queueLen,
unsigned  maxThresh 
)

Ensure that this queue has at least queueLen bytes of space and can support at least maxThresh as the max threshold. The new queue length will be max(queueLen, QueueLength()) and the new max threshold will be max(maxThresh, MaxThreshold()).

Note
The caller must guarantee that an enqueue and a dequeue are not both outstanding; otherwise this will fail.
Parameters
queueLen: the next queue length
maxThresh: the next max threshold

Definition at line 245 of file QueueBase.cc.

References UnlockedGrow().

245  {
246  AutoLock<QueueBase> al(*this);
247  UnlockedGrow(queueLen, maxThresh);
248  }
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0

+ Here is the call graph for this function:

virtual void CPN::QueueBase::InternalDequeue ( unsigned  count)
protected pure virtual

Implemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Referenced by Dequeue().

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::InternalEnqueue ( unsigned  count)
protected pure virtual

Implemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Referenced by Enqueue(), InternalFlush(), and InternalReset().

+ Here is the caller graph for this function:

void CPN::QueueBase::InternalFlush ( )
protected virtual

Reimplemented in CPN::RemoteQueue.

Definition at line 265 of file QueueBase.cc.

References PthreadCondition::Broadcast(), cond, flushed, InternalEnqueue(), InternalGetRawEnqueuePtr(), postpad, readshutdown, UnlockedEnqueueChannelStride(), UnlockedNumChannels(), WaitForFreespace(), writerequest, writerkey, and writeshutdown.

Referenced by Flush(), and CPN::RemoteQueue::InternalFlush().

265  {
266  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
267  if (postpad > 0) {
268  while (true) {
269  void *ptr = InternalGetRawEnqueuePtr(postpad, 0);
270  if (ptr) {
271  unsigned num_chans = UnlockedNumChannels();
272  unsigned chanstride = UnlockedEnqueueChannelStride();
273  for (unsigned chan = 0; chan < num_chans; ++chan) {
274  memset((char*)ptr + chanstride*chan, 0, postpad);
275  }
277  } else {
278  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
281  writerequest = 0;
282  }
283  }
284  }
285  flushed = true;
286  cond.Broadcast();
287  }
PthreadCondition & Broadcast(void)
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
unsigned writerequest
Definition: QueueBase.h:280
unsigned postpad
Definition: QueueBase.h:278
virtual void InternalEnqueue(unsigned count)=0
virtual unsigned UnlockedNumChannels() const =0
virtual unsigned UnlockedEnqueueChannelStride() const =0
bool writeshutdown
Definition: QueueBase.h:276
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)=0
bool readshutdown
Definition: QueueBase.h:275
PthreadCondition cond
Definition: QueueBase.h:290
const Key_t writerkey
Definition: QueueBase.h:274

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual const void* CPN::QueueBase::InternalGetRawDequeuePtr ( unsigned  thresh,
unsigned  chan 
)
protected pure virtual

Implemented in CPN::ThresholdQueue.

Referenced by GetRawDequeuePtr().

+ Here is the caller graph for this function:

virtual void* CPN::QueueBase::InternalGetRawEnqueuePtr ( unsigned  thresh,
unsigned  chan 
)
protected pure virtual

Implemented in CPN::ThresholdQueue.

Referenced by GetRawEnqueuePtr(), InternalFlush(), and InternalReset().

+ Here is the caller graph for this function:

void CPN::QueueBase::InternalReset ( )
protected virtual

Reimplemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Definition at line 289 of file QueueBase.cc.

References ASSERT, PthreadCondition::Broadcast(), cond, dequeuethresh, enqueuethresh, flushed, indequeue, inenqueue, InternalEnqueue(), InternalGetRawEnqueuePtr(), prepad, readrequest, UnlockedEnqueueChannelStride(), UnlockedNumChannels(), and writerequest.

Referenced by CPN::ThresholdQueue::InternalReset(), and Reset().

289  {
290  ASSERT(flushed);
291  readrequest = 0;
292  writerequest = 0;
293  enqueuethresh = 0;
294  dequeuethresh = 0;
295  indequeue = false;
296  inenqueue = false;
297  flushed = false;
298  if (prepad > 0) {
299  void *ptr = InternalGetRawEnqueuePtr(prepad, 0);
300  ASSERT(ptr);
301  unsigned num_chans = UnlockedNumChannels();
302  unsigned chanstride = UnlockedEnqueueChannelStride();
303  for (unsigned chan = 0; chan < num_chans; ++chan) {
304  memset((char*)ptr + chanstride*chan, 0, prepad);
305  }
307  }
308  cond.Broadcast();
309  }
unsigned readrequest
Definition: QueueBase.h:279
unsigned prepad
Definition: QueueBase.h:277
PthreadCondition & Broadcast(void)
unsigned writerequest
Definition: QueueBase.h:280
virtual void InternalEnqueue(unsigned count)=0
virtual unsigned UnlockedNumChannels() const =0
unsigned dequeuethresh
Definition: QueueBase.h:282
virtual unsigned UnlockedEnqueueChannelStride() const =0
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)=0
unsigned enqueuethresh
Definition: QueueBase.h:281
PthreadCondition cond
Definition: QueueBase.h:290
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::IsReaderShutdown ( )

Definition at line 390 of file QueueBase.cc.

References readshutdown.

390  {
391  AutoLock<QueueBase> al(*this);
392  return readshutdown;
393  }
bool readshutdown
Definition: QueueBase.h:275
bool CPN::QueueBase::IsWriterShutdown ( )

Definition at line 395 of file QueueBase.cc.

References writeshutdown.

395  {
396  AutoLock<QueueBase> al(*this);
397  return writeshutdown;
398  }
bool writeshutdown
Definition: QueueBase.h:276
void CPN::QueueBase::Lock ( ) const
inline virtual

These functions are to access the lock for the queue.

Implements D4R::QueueBase.

Definition at line 221 of file QueueBase.h.

References lock, and PthreadMutex::Lock().

221 { lock.Lock(); }
PthreadMutex & Lock(void)
Definition: PthreadMutex.h:49
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

void CPN::QueueBase::LogState ( )
virtual

For debugging ONLY!! Otherwise non deterministic output.

Reimplemented in CPN::RemoteQueue.

Definition at line 400 of file QueueBase.cc.

References dequeuethresh, enqueuethresh, Logger::Error(), flushed, indequeue, inenqueue, logger, readerkey, readrequest, readshutdown, UnlockedCount(), UnlockedFreespace(), UnlockedMaxThreshold(), UnlockedNumDequeued(), UnlockedNumEnqueued(), UnlockedQueueLength(), writerequest, writerkey, and writeshutdown.

Referenced by CPN::RemoteQueue::LogState().

400  {
401  logger.Error("Printing state (w:%llu r:%llu)", readerkey, writerkey);
402  logger.Error("size: %u, maxthresh: %u count: %u free: %u, flushed: %d",
404  (int)flushed);
405  logger.Error("readrequest: %u, writerequest: %u numenqueued: %u, numdequeued: %u",
407  if (indequeue) {
408  logger.Error("Indequeue (thresh: %u)", dequeuethresh);
409  }
410  if (inenqueue) {
411  logger.Error("Inenqueue (thresh: %u)", enqueuethresh);
412  }
413  if (readshutdown) {
414  logger.Error("Reader shutdown");
415  }
416  if (writeshutdown) {
417  logger.Error("Writer shutdown");
418  }
419  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedMaxThreshold() const =0
virtual unsigned UnlockedNumEnqueued() const =0
Logger logger
Definition: QueueBase.h:288
virtual unsigned UnlockedCount() const =0
void Error(const char *fmt,...)
Definition: Logger.cc:159
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
unsigned dequeuethresh
Definition: QueueBase.h:282
const Key_t readerkey
Definition: QueueBase.h:273
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275
unsigned enqueuethresh
Definition: QueueBase.h:281
const Key_t writerkey
Definition: QueueBase.h:274
virtual unsigned UnlockedNumDequeued() const =0
virtual unsigned UnlockedQueueLength() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::MaxThreshold ( ) const
Returns
the maximum threshold this queue supports in bytes

Definition at line 225 of file QueueBase.cc.

References UnlockedMaxThreshold().

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and CPN::ThresholdQueue::UnlockedGrow().

225  {
226  AutoLock<const QueueBase> al(*this);
227  return UnlockedMaxThreshold();
228  }
virtual unsigned UnlockedMaxThreshold() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyData ( )
protected

Definition at line 341 of file QueueBase.cc.

References PthreadCondition::Broadcast(), cond, readrequest, and UnlockedCount().

Referenced by Enqueue(), and CPN::RemoteQueue::EnqueuePacket().

341  {
342  if (UnlockedCount() >= readrequest) {
343  cond.Broadcast();
344  }
345  }
unsigned readrequest
Definition: QueueBase.h:279
PthreadCondition & Broadcast(void)
virtual unsigned UnlockedCount() const =0
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyFreespace ( )
protected

Definition at line 362 of file QueueBase.cc.

References PthreadCondition::Broadcast(), cond, UnlockedFreespace(), and writerequest.

Referenced by Dequeue(), and CPN::RemoteQueue::SendEnqueuePacket().

362  {
363  if (UnlockedFreespace() >= writerequest) {
364  cond.Broadcast();
365  }
366  }
PthreadCondition & Broadcast(void)
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyTerminate ( )

Used to tell any waiting threads that the network is terminating.

Definition at line 368 of file QueueBase.cc.

References PthreadCondition::Broadcast(), and cond.

368  {
369  AutoLock<QueueBase> al(*this);
370  cond.Broadcast();
371  }
PthreadCondition & Broadcast(void)
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumChannels ( ) const
Returns
the number of channels supported by this queue.

Definition at line 200 of file QueueBase.cc.

References UnlockedNumChannels().

200  {
201  AutoLock<const QueueBase> al(*this);
202  return UnlockedNumChannels();
203  }
virtual unsigned UnlockedNumChannels() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumDequeued ( ) const

Definition at line 321 of file QueueBase.cc.

References UnlockedNumDequeued().

321  {
322  AutoLock<const QueueBase> al(*this);
323  return UnlockedNumDequeued();
324  }
virtual unsigned UnlockedNumDequeued() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumEnqueued ( ) const

Definition at line 316 of file QueueBase.cc.

References UnlockedNumEnqueued().

316  {
317  AutoLock<const QueueBase> al(*this);
318  return UnlockedNumEnqueued();
319  }
virtual unsigned UnlockedNumEnqueued() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::QueueLength ( ) const
Returns
the maximum number of bytes that can be put in this queue.

Definition at line 230 of file QueueBase.cc.

References UnlockedQueueLength().

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), CPN::RemoteQueue::GrowPacket(), CPN::ThresholdQueue::UnlockedGrow(), and CPN::RemoteQueue::UnlockedGrow().

230  {
231  AutoLock<const QueueBase> al(*this);
232  return UnlockedQueueLength();
233  }
virtual unsigned UnlockedQueueLength() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::RawDequeue ( void *  data,
unsigned  count,
unsigned  numChans,
unsigned  chanStride 
)

Dequeue data from the queue directly into the memory pointed to by data. This function shall be equivalent to a call to GetRawDequeuePtr then a memcpy and then a call to Dequeue.

Parameters
data: pointer to memory to dequeue to
count: the number of bytes to copy into data
numChans: the number of channels to write to
chanStride: the distance in bytes between the beginning of the channels in data.
Returns
true on success, false on failure

Definition at line 102 of file QueueBase.cc.

References ASSERT, Dequeue(), and GetRawDequeuePtr().

Referenced by RawDequeue().

102  {
103  const void *src = GetRawDequeuePtr(count, 0);
104  char *dest = (char*)data;
105  if (!src) { return false; }
106  memcpy(dest, src, count);
107  for (unsigned chan = 1; chan < numChans; ++chan) {
108  src = GetRawDequeuePtr(count, chan);
109  ASSERT(src);
110  dest += chanStride;
111  memcpy(dest, src, count);
112  }
113  Dequeue(count);
114  return true;
115  }
void Dequeue(unsigned count)
Definition: QueueBase.cc:92
const void * GetRawDequeuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:63
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::RawDequeue ( void *  data,
unsigned  count 
)

A version of RawDequeue to use when there is only 1 channel.

Parameters
data: pointer to memory to dequeue to
count: the number of bytes to dequeue
Returns
true on success, false if we have reached the end of the queue

Definition at line 117 of file QueueBase.cc.

References RawDequeue().

117  {
118  return RawDequeue(data, count, 1, 0);
119  }
bool RawDequeue(void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:102

+ Here is the call graph for this function:

void CPN::QueueBase::RawEnqueue ( const void *  data,
unsigned  count,
unsigned  numChans,
unsigned  chanStride 
)

This function shall be equivalent to a call to GetRawEnqueuePtr, then a memcpy, and then a call to Enqueue.

The underlying implementation may implement either GetRawEnqueuePtr and Enqueue, or RawEnqueue, and then implement the other in terms of the one implemented.

Parameters
datapointer to the memory to enqueue
countthe number of bytes to enqueue
numChansthe number of channels to write to
chanStridethe distance in bytes between the beginning of the channels in data.

Definition at line 169 of file QueueBase.cc.

References Enqueue(), and GetRawEnqueuePtr().

Referenced by RawEnqueue().

169  {
170  void *dest = GetRawEnqueuePtr(count, 0);
171  const char *src = (char*)data;
172  memcpy(dest, src, count);
173  for (unsigned chan = 1; chan < numChans; ++chan) {
174  dest = GetRawEnqueuePtr(count, chan);
175  src += chanStride;
176  memcpy(dest, src, count);
177  }
178  Enqueue(count);
179  }
void * GetRawEnqueuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:121
void Enqueue(unsigned count)
Definition: QueueBase.cc:160

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::RawEnqueue ( const void *  data,
unsigned  count 
)

A version of RawEnqueue to use when there is only 1 channel.

Parameters
datapointer to the memory to enqueue
countthe number of bytes to enqueue

Definition at line 196 of file QueueBase.cc.

References RawEnqueue().

196  {
197  return RawEnqueue(data, count, 1, 0);
198  }
void RawEnqueue(const void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:169

+ Here is the call graph for this function:

void D4R::QueueBase::ReadBlock ( )
protectedinherited

reader ===> writer. ReadBlock requires that you already hold the lock, and if it is reentrant then a single unlock will release the lock.

Exceptions
D4R::DeadlockException

Definition at line 71 of file D4RQueue.cc.

References D4R::QueueBase::incomm, D4R::QueueBase::ReadBlocked(), D4R::QueueBase::reader, D4R::QueueBase::Signal(), D4R::QueueBase::Wait(), D4R::QueueBase::writer, and D4R::QueueBase::writetagchanged.

Referenced by WaitForData().

71  {
72  if (!writer) {
73  while (ReadBlocked() && !writer) {
74  Wait();
75  }
76  }
77  if (!ReadBlocked()) { return; }
78  writetagchanged = false;
79  try {
80  while (incomm) { Wait(); }
81  ScopeSetter<bool> ss(incomm, true);
82  AutoUnlock<QueueBase> au(*this);
83  reader->Block(writer->GetPublicTag(), -1);
84  } catch (...) { Signal(); throw; }
85  Signal();
86  while (ReadBlocked()) {
87  if (writetagchanged) {
88  writetagchanged = false;
89  bool detect;
90  try {
91  while (incomm) { Wait(); }
92  ScopeSetter<bool> ss(incomm, true);
93  AutoUnlock<QueueBase> au(*this);
94  detect = reader->Transmit(writer->GetPublicTag());
95  } catch (...) { Signal(); throw; }
96  Signal();
97  if (detect) {
98  throw DeadlockException("True deadlock detected");
99  }
100  } else {
101  Wait();
102  }
103  }
104  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual void Wait()=0
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual bool ReadBlocked()=0
virtual void Signal()=0
bool writetagchanged
Definition: D4RQueue.h:125

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::ReadBlocked ( )
protectedvirtual
Returns
true if we are blocked

Implements D4R::QueueBase.

Definition at line 336 of file QueueBase.cc.

References CPN::KernelBase::CheckTerminated(), flushed, kernel, readrequest, readshutdown, UnlockedCount(), and writeshutdown.

Referenced by GetRawEnqueuePtr(), CPN::RemoteQueue::WaitForData(), and WaitForData().

336  {
339  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedCount() const =0
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::ReadRequest ( )

For unit tests.

Definition at line 380 of file QueueBase.cc.

References readrequest.

380  {
381  AutoLock<QueueBase> al(*this);
382  return readrequest;
383  }
unsigned readrequest
Definition: QueueBase.h:279
void CPN::QueueBase::Reset ( )

Definition at line 181 of file QueueBase.cc.

References InternalReset().

181  {
182  AutoLock<const QueueBase> al(*this);
183  InternalReset();
184  }
virtual void InternalReset()
Definition: QueueBase.cc:289

+ Here is the call graph for this function:

void D4R::QueueBase::SetReaderNode ( shared_ptr< Node >  n)
inherited

Set the node which is reading from this queue

Parameters
nthe node

Definition at line 59 of file D4RQueue.cc.

References D4R::QueueBase::reader, and D4R::QueueBase::Signal().

Referenced by CPN::RemoteQueue::RemoteQueue().

59  {
60  AutoLock<QueueBase> al(*this);
61  reader = n;
62  Signal();
63  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::SetWriterNode ( shared_ptr< Node >  n)
inherited

Set the node which is writing to this queue.

Parameters
nthe node

Definition at line 65 of file D4RQueue.cc.

References D4R::QueueBase::Signal(), and D4R::QueueBase::writer.

Referenced by CPN::RemoteQueue::RemoteQueue().

65  {
66  AutoLock<QueueBase> al(*this);
67  writer = n;
68  Signal();
69  }
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::ShutdownReader ( )

Called by the QueueReader when no more data will be read.

Definition at line 250 of file QueueBase.cc.

References UnlockedShutdownReader().

Referenced by CPN::RemoteQueue::FileThreadEntryPoint().

250  {
251  AutoLock<QueueBase> al(*this);
253  }
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::ShutdownWriter ( )

Called by the QueueWriter when no more data will be written.

Definition at line 260 of file QueueBase.cc.

References UnlockedShutdownWriter().

Referenced by CPN::RemoteQueue::FileThreadEntryPoint().

260  {
261  AutoLock<QueueBase> al(*this);
263  }
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::Signal ( )
inlineprotectedvirtual

Signal that Wait should return

Implements D4R::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 247 of file QueueBase.h.

Referenced by GetRawDequeuePtr(), GetRawEnqueuePtr(), CPN::RemoteQueue::Signal(), UnlockedShutdownReader(), and UnlockedShutdownWriter().

247 { cond.Broadcast(); }
PthreadCondition & Broadcast(void)
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the caller graph for this function:

void D4R::QueueBase::SignalReaderTagChanged ( )
inherited

These functions are called by the D4R::Node when their tag changes.

Definition at line 139 of file D4RQueue.cc.

References D4R::QueueBase::UnlockedSignalReaderTagChanged().

139  {
140  AutoLock<QueueBase> al(*this);
142  }
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144

+ Here is the call graph for this function:

void D4R::QueueBase::SignalWriterTagChanged ( )
inherited

Definition at line 150 of file D4RQueue.cc.

References D4R::QueueBase::UnlockedSignalWriterTagChanged().

150  {
151  AutoLock<QueueBase> al(*this);
153  }
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155

+ Here is the call graph for this function:

void CPN::QueueBase::Unlock ( void  ) const
inlinevirtual

Implements D4R::QueueBase.

Definition at line 222 of file QueueBase.h.

References lock, and PthreadMutex::Unlock().

222 { lock.Unlock(); }
PthreadMutex & Unlock(void)
Definition: PthreadMutex.h:50
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

virtual unsigned CPN::QueueBase::UnlockedCount ( ) const
protectedpure virtual

Implemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Referenced by Count(), Detect(), LogState(), NotifyData(), and ReadBlocked().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedDequeueChannelStride ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by DequeueChannelStride().

+ Here is the caller graph for this function:

virtual bool CPN::QueueBase::UnlockedEmpty ( ) const
protectedpure virtual

Implemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Referenced by Empty().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedEnqueueChannelStride ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by EnqueueChannelStride(), InternalFlush(), and InternalReset().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedFreespace ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by Freespace(), LogState(), NotifyFreespace(), and WriteBlocked().

+ Here is the caller graph for this function:

virtual bool CPN::QueueBase::UnlockedFull ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by Full().

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::UnlockedGrow ( unsigned  queueLen,
unsigned  maxThresh 
)
protectedpure virtual

Implemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Referenced by Detect(), GetRawDequeuePtr(), GetRawEnqueuePtr(), and Grow().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedMaxThreshold ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by Detect(), GetRawDequeuePtr(), GetRawEnqueuePtr(), LogState(), and MaxThreshold().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedNumChannels ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by InternalFlush(), InternalReset(), and NumChannels().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedNumDequeued ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by LogState(), and NumDequeued().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedNumEnqueued ( ) const
protectedpure virtual

Implemented in CPN::ThresholdQueue.

Referenced by LogState(), and NumEnqueued().

+ Here is the caller graph for this function:

virtual unsigned CPN::QueueBase::UnlockedQueueLength ( ) const
protectedpure virtual

Implemented in CPN::RemoteQueue, and CPN::ThresholdQueue.

Referenced by Detect(), LogState(), QueueLength(), and WaitForFreespace().

+ Here is the caller graph for this function:

void CPN::QueueBase::UnlockedShutdownReader ( )
protectedvirtual

Definition at line 255 of file QueueBase.cc.

References readshutdown, and Signal().

Referenced by CPN::RemoteQueue::EndOfReadPacket(), CPN::RemoteQueue::InternalCheckStatus(), and ShutdownReader().

255  {
256  readshutdown = true;
257  Signal();
258  }
virtual void Signal()
Definition: QueueBase.h:247
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::UnlockedShutdownWriter ( )
protectedvirtual

Definition at line 311 of file QueueBase.cc.

References Signal(), and writeshutdown.

Referenced by CPN::RemoteQueue::EndOfWritePacket(), CPN::RemoteQueue::InternalCheckStatus(), and ShutdownWriter().

311  {
312  writeshutdown = true;
313  Signal();
314  }
virtual void Signal()
Definition: QueueBase.h:247
bool writeshutdown
Definition: QueueBase.h:276

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::UnlockedSignalReaderTagChanged ( )
protectedvirtualinherited

Reimplemented in CPN::RemoteQueue.

Definition at line 144 of file D4RQueue.cc.

References DEBUG, D4R::QueueBase::reader, D4R::QueueBase::readtagchanged, D4R::QueueBase::Signal(), and D4R::QueueBase::writer.

Referenced by CPN::RemoteQueue::D4RTagPacket(), D4R::QueueBase::SignalReaderTagChanged(), and CPN::RemoteQueue::UnlockedSignalReaderTagChanged().

144  {
145  DEBUG("%s: (%llu -> %llu)\n", __PRETTY_FUNCTION__, (writer ? writer->GetPrivateTag().Key() : 0), (reader ? reader->GetPrivateTag().Key() : 0));
146  readtagchanged = true;
147  Signal();
148  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
#define DEBUG(fmt,...)
Definition: D4RQueue.cc:34
shared_ptr< Node > writer
Definition: D4RQueue.h:123
bool readtagchanged
Definition: D4RQueue.h:124
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::UnlockedSignalWriterTagChanged ( )
protectedvirtualinherited

Reimplemented in CPN::RemoteQueue.

Definition at line 155 of file D4RQueue.cc.

References DEBUG, D4R::QueueBase::reader, D4R::QueueBase::Signal(), D4R::QueueBase::writer, and D4R::QueueBase::writetagchanged.

Referenced by CPN::RemoteQueue::D4RTagPacket(), D4R::QueueBase::SignalWriterTagChanged(), and CPN::RemoteQueue::UnlockedSignalWriterTagChanged().

155  {
156  DEBUG("%s: (%llu -> %llu)\n", __PRETTY_FUNCTION__, (writer ? writer->GetPrivateTag().Key() : 0), (reader ? reader->GetPrivateTag().Key() : 0));
157  writetagchanged = true;
158  Signal();
159  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
#define DEBUG(fmt,...)
Definition: D4RQueue.cc:34
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual void Signal()=0
bool writetagchanged
Definition: D4RQueue.h:125

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::Wait ( )
inlineprotectedvirtual

Wait until Signal is called. Must be holding the lock to call.

Implements D4R::QueueBase.

Definition at line 246 of file QueueBase.h.

References lock.

Referenced by CPN::RemoteQueue::ActionThreadEntryPoint(), GetRawEnqueuePtr(), CPN::RemoteQueue::WaitForData(), and CPN::RemoteQueue::WaitForFreespace().

246 { cond.Wait(lock); }
PthreadCondition & Wait(PthreadMutex &mutex)
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the caller graph for this function:

void CPN::QueueBase::WaitForData ( )
protectedvirtual

Reimplemented in CPN::RemoteQueue.

Definition at line 326 of file QueueBase.cc.

References cond, lock, D4R::QueueBase::ReadBlock(), ReadBlocked(), useD4R, and PthreadCondition::Wait().

Referenced by GetRawDequeuePtr(), and CPN::RemoteQueue::WaitForData().

326  {
327  if (useD4R) {
328  ReadBlock();
329  } else {
330  while (ReadBlocked()) {
331  cond.Wait(lock);
332  }
333  }
334  }
PthreadCondition & Wait(PthreadMutex &mutex)
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
void ReadBlock()
Definition: D4RQueue.cc:71
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::WaitForFreespace ( )
protectedvirtual

Reimplemented in CPN::RemoteQueue.

Definition at line 347 of file QueueBase.cc.

References cond, lock, UnlockedQueueLength(), useD4R, PthreadCondition::Wait(), D4R::QueueBase::WriteBlock(), and WriteBlocked().

Referenced by GetRawEnqueuePtr(), InternalFlush(), and CPN::RemoteQueue::WaitForFreespace().

347  {
348  if (useD4R) {
350  } else {
351  while (WriteBlocked()) {
352  cond.Wait(lock);
353  }
354  }
355  }
PthreadCondition & Wait(PthreadMutex &mutex)
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
void WriteBlock(unsigned qsize)
Definition: D4RQueue.cc:106
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289
virtual unsigned UnlockedQueueLength() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::WriteBlock ( unsigned  qsize)
protectedinherited

writer ===> reader. WriteBlock requires that you already hold the lock, and if it is reentrant then a single unlock will release the lock.

Definition at line 106 of file D4RQueue.cc.

References D4R::QueueBase::Detect(), D4R::QueueBase::incomm, D4R::QueueBase::reader, D4R::QueueBase::readtagchanged, D4R::QueueBase::Signal(), D4R::QueueBase::Wait(), D4R::QueueBase::WriteBlocked(), and D4R::QueueBase::writer.

Referenced by GetRawEnqueuePtr(), and WaitForFreespace().

106  {
107  if (!reader) {
108  while (WriteBlocked() && !reader) {
109  Wait();
110  }
111  }
112  if (!WriteBlocked()) { return; }
113  readtagchanged = false;
114  try {
115  while (incomm) { Wait(); }
116  ScopeSetter<bool> ss(incomm, true);
117  AutoUnlock<QueueBase> au(*this);
118  writer->Block(reader->GetPublicTag(), qsize);
119  } catch (...) { Signal(); throw; }
120  Signal();
121  while (WriteBlocked()) {
122  if (readtagchanged) {
123  readtagchanged = false;
124  bool detect;
125  try {
126  while (incomm) { Wait(); }
127  ScopeSetter<bool> ss(incomm, true);
128  AutoUnlock<QueueBase> au(*this);
129  detect = writer->Transmit(reader->GetPublicTag());
130  } catch (...) { Signal(); throw; }
131  Signal();
132  if (detect) { Detect(); }
133  } else {
134  Wait();
135  }
136  }
137  }
virtual void Detect()=0
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual bool WriteBlocked()=0
virtual void Wait()=0
shared_ptr< Node > writer
Definition: D4RQueue.h:123
bool readtagchanged
Definition: D4RQueue.h:124
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::WriteBlocked ( )
protectedvirtual
Returns
true if we are blocked.

Implements D4R::QueueBase.

Definition at line 357 of file QueueBase.cc.

References CPN::KernelBase::CheckTerminated(), flushed, kernel, readshutdown, UnlockedFreespace(), writerequest, and writeshutdown.

Referenced by GetRawDequeuePtr(), CPN::RemoteQueue::WaitForFreespace(), and WaitForFreespace().

357  {
360  }
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::WriteRequest ( )

For unit tests.

Definition at line 385 of file QueueBase.cc.

References writerequest.

385  {
386  AutoLock<QueueBase> al(*this);
387  return writerequest;
388  }
unsigned writerequest
Definition: QueueBase.h:280

Member Data Documentation

PthreadCondition CPN::QueueBase::cond
protected
std::string CPN::QueueBase::datatype
protected

Definition at line 291 of file QueueBase.h.

unsigned CPN::QueueBase::dequeuethresh
protected
unsigned CPN::QueueBase::enqueuethresh
protected
bool CPN::QueueBase::flushed
protected
bool D4R::QueueBase::incomm
protectedinherited

Definition at line 126 of file D4RQueue.h.

Referenced by D4R::QueueBase::ReadBlock(), and D4R::QueueBase::WriteBlock().

bool CPN::QueueBase::indequeue
protected
bool CPN::QueueBase::inenqueue
protected
KernelBase* CPN::QueueBase::kernel
protected
PthreadMutex CPN::QueueBase::lock
mutableprotected
Logger CPN::QueueBase::logger
protected
unsigned CPN::QueueBase::postpad
protected

Definition at line 278 of file QueueBase.h.

Referenced by InternalFlush().

unsigned CPN::QueueBase::prepad
protected

Definition at line 277 of file QueueBase.h.

Referenced by InternalReset().

shared_ptr<Node> D4R::QueueBase::reader
protectedinherited
const Key_t CPN::QueueBase::readerkey
protected
unsigned CPN::QueueBase::readrequest
protected
bool CPN::QueueBase::readshutdown
protected
bool D4R::QueueBase::readtagchanged
protectedinherited
bool CPN::QueueBase::useD4R
protected
shared_ptr<Node> D4R::QueueBase::writer
protectedinherited
unsigned CPN::QueueBase::writerequest
protected
const Key_t CPN::QueueBase::writerkey
protected
bool CPN::QueueBase::writeshutdown
protected
bool D4R::QueueBase::writetagchanged
protectedinherited

The documentation for this class was generated from the following files: