CPN
Computational Process Networks
CPN::ThresholdQueue Class Reference

A version of the ThresholdQueue that provides the CPN Queue interface. This queue implementation creates a memory-mapped object whose minimum size is the machine page size, so there is an effective minimum queue size: any requested queue size smaller than the page size is expanded to the page size.

#include <CPNThresholdQueue.h>


Classes

class  TQImpl
 

Public Member Functions

 ThresholdQueue (KernelBase *k, const SimpleQueueAttr &attr, bool usembs)
 
 ThresholdQueue (KernelBase *k, const SimpleQueueAttr &attr, unsigned length, bool usembs)
 
 ~ThresholdQueue ()
 
const void * GetRawDequeuePtr (unsigned thresh, unsigned chan)
 
void Dequeue (unsigned count)
 
bool RawDequeue (void *data, unsigned count, unsigned numChans, unsigned chanStride)
 
bool RawDequeue (void *data, unsigned count)
 
void * GetRawEnqueuePtr (unsigned thresh, unsigned chan)
 
void Enqueue (unsigned count)
 
void RawEnqueue (const void *data, unsigned count, unsigned numChans, unsigned chanStride)
 
void RawEnqueue (const void *data, unsigned count)
 
void Reset ()
 
void Flush ()
 
bool Flushed () const
 
unsigned NumChannels () const
 
unsigned Count () const
 
bool Empty () const
 
unsigned Freespace () const
 
bool Full () const
 
unsigned MaxThreshold () const
 
unsigned QueueLength () const
 
unsigned EnqueueChannelStride () const
 
unsigned DequeueChannelStride () const
 
void Grow (unsigned queueLen, unsigned maxThresh)
 
Key_t GetWriterKey () const
 
Key_t GetReaderKey () const
 
const std::string & GetDatatype () const
 
void ShutdownReader ()
 Called by the QueueReader when no more data will be read.
 
void ShutdownWriter ()
 Called by the QueueWriter when no more data will be written.
 
void NotifyTerminate ()
 Used to tell any waiting threads that the network is terminating.
 
void Lock () const
 
void Unlock () const
 
unsigned ReadRequest ()
 For unit tests.
 
unsigned WriteRequest ()
 For unit tests.
 
bool IsReaderShutdown ()
 
bool IsWriterShutdown ()
 
virtual void LogState ()
 For debugging ONLY!! Otherwise non deterministic output.
 
unsigned NumEnqueued () const
 
unsigned NumDequeued () const
 
void SetReaderNode (shared_ptr< Node > n)
 
void SetWriterNode (shared_ptr< Node > n)
 
void SignalReaderTagChanged ()
 
void SignalWriterTagChanged ()
 

Protected Member Functions

virtual void * InternalGetRawEnqueuePtr (unsigned thresh, unsigned chan)
 
virtual void InternalEnqueue (unsigned count)
 
virtual const void * InternalGetRawDequeuePtr (unsigned thresh, unsigned chan)
 
virtual void InternalDequeue (unsigned count)
 
virtual void InternalReset ()
 
virtual unsigned UnlockedNumChannels () const
 
virtual unsigned UnlockedMaxThreshold () const
 
virtual unsigned UnlockedQueueLength () const
 
virtual unsigned UnlockedFreespace () const
 
virtual bool UnlockedFull () const
 
virtual unsigned UnlockedCount () const
 
virtual bool UnlockedEmpty () const
 
virtual unsigned UnlockedEnqueueChannelStride () const
 
virtual unsigned UnlockedDequeueChannelStride () const
 
unsigned UnlockedNumEnqueued () const
 
unsigned UnlockedNumDequeued () const
 
virtual void UnlockedGrow (unsigned queueLen, unsigned maxThresh)
 
virtual void WaitForData ()
 
virtual bool ReadBlocked ()
 
void NotifyData ()
 
virtual void WaitForFreespace ()
 
virtual bool WriteBlocked ()
 
void NotifyFreespace ()
 
virtual void Wait ()
 
virtual void Signal ()
 
virtual void Detect ()
 
virtual void InternalFlush ()
 
virtual void UnlockedShutdownReader ()
 
virtual void UnlockedShutdownWriter ()
 
void ReadBlock ()
 
void WriteBlock (unsigned qsize)
 
virtual void UnlockedSignalReaderTagChanged ()
 
virtual void UnlockedSignalWriterTagChanged ()
 

Protected Attributes

TQImpl * queue
 
TQImpl * oldqueue
 
bool enqueueUseOld
 
bool dequeueUseOld
 
const Key_t readerkey
 
const Key_t writerkey
 
bool readshutdown
 
bool writeshutdown
 
unsigned prepad
 
unsigned postpad
 
unsigned readrequest
 
unsigned writerequest
 
unsigned enqueuethresh
 
unsigned dequeuethresh
 
bool indequeue
 
bool inenqueue
 
bool flushed
 
KernelBase * kernel
 
bool useD4R
 
Logger logger
 
PthreadMutex lock
 
PthreadCondition cond
 
std::string datatype
 
shared_ptr< Node > reader
 
shared_ptr< Node > writer
 
bool readtagchanged
 
bool writetagchanged
 
bool incomm
 

Detailed Description

A version of the ThresholdQueue that provides the CPN Queue interface. This queue implementation creates a memory-mapped object whose minimum size is the machine page size, so there is an effective minimum queue size: any requested queue size smaller than the page size is expanded to the page size.
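
As a rough illustration of the minimum-size rule, the sketch below clamps a requested length to the page size. EffectiveQueueLength is a hypothetical helper and not part of CPN; the page size is passed in explicitly (on POSIX systems it could be obtained from sysconf(_SC_PAGESIZE)). The description does not say whether larger requests are rounded to a page multiple, so this sketch leaves them unchanged.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper, not part of CPN: the length a queue would effectively
// get under the "at least one page" rule described in the text.
// page_size is supplied by the caller so the function stays portable.
inline std::size_t EffectiveQueueLength(std::size_t requested, std::size_t page_size) {
    assert(page_size > 0);
    // Requests below one page are expanded to one page; larger ones are kept.
    return std::max(requested, page_size);
}
```

With a 4096-byte page, a request for 100 bytes would yield 4096 under this assumption, while a request for 8192 bytes would stay at 8192.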

See Also
QueueBase and ThresholdQueueBase

Definition at line 45 of file CPNThresholdQueue.h.

Constructor & Destructor Documentation

ThresholdQueue< T >::ThresholdQueue ( KernelBase *  k,
const SimpleQueueAttr &  attr,
bool  usembs 
)

Definition at line 32 of file CPNThresholdQueue.cc.

References CPN::SimpleQueueAttr::GetLength(), CPN::SimpleQueueAttr::GetMaxThreshold(), CPN::SimpleQueueAttr::GetNumChannels(), and queue.

33  : QueueBase(k, attr), queue(0), oldqueue(0), enqueueUseOld(false), dequeueUseOld(false)
34  {
35  ThresholdQueueAttr qattr(attr.GetLength(), attr.GetMaxThreshold(),
36  attr.GetNumChannels(), usembs);
37  queue = new TQImpl(qattr);
38  }


ThresholdQueue< T >::ThresholdQueue ( KernelBase *  k,
const SimpleQueueAttr &  attr,
unsigned  length,
bool  usembs 
)

Definition at line 40 of file CPNThresholdQueue.cc.

References CPN::SimpleQueueAttr::GetMaxThreshold(), CPN::SimpleQueueAttr::GetNumChannels(), and queue.

42  : QueueBase(k, attr), queue(0), oldqueue(0), enqueueUseOld(false), dequeueUseOld(false)
43  {
44  ThresholdQueueAttr qattr(length, attr.GetMaxThreshold(),
45  attr.GetNumChannels(), usembs);
46  queue = new TQImpl(qattr);
47  }


ThresholdQueue< T >::~ThresholdQueue ( )

Definition at line 50 of file CPNThresholdQueue.cc.

References oldqueue, and queue.

50  {
51  delete queue;
52  queue = 0;
53  delete oldqueue;
54  oldqueue = 0;
55  }

Member Function Documentation

unsigned CPN::QueueBase::Count ( ) const
inherited
Returns
the number of bytes in the queue.

Definition at line 205 of file QueueBase.cc.

References CPN::QueueBase::UnlockedCount().

205  {
206  AutoLock<const QueueBase> al(*this);
207  return UnlockedCount();
208  }

void CPN::QueueBase::Dequeue ( unsigned  count)
inherited

Removes data from the queue. Each call removes count bytes.

Parameters
count  the number of bytes to remove from the queue

Definition at line 92 of file QueueBase.cc.

References CPN::QueueBase::dequeuethresh, CPN::QueueBase::GetRawDequeuePtr(), CPN::QueueBase::indequeue, CPN::QueueBase::InternalDequeue(), CPN::QueueBase::NotifyFreespace(), CPN::QueueBase::readerkey, and CPN::QueueBase::readshutdown.

Referenced by CPN::QueueBase::RawDequeue().

92  {
93  if (!GetRawDequeuePtr(count, 0)) { throw BrokenQueueException(readerkey); }
94  AutoLock<QueueBase> al(*this);
95  dequeuethresh = 0;
96  indequeue = false;
97  if (readshutdown) { throw BrokenQueueException(readerkey); }
98  InternalDequeue(count);
99  NotifyFreespace();
100  }

unsigned CPN::QueueBase::DequeueChannelStride ( ) const
inherited
Returns
The current dequeue channel stride. This value is only guaranteed to be consistent between a call to GetRawDequeuePtr and the matching call to Dequeue.

Definition at line 240 of file QueueBase.cc.

References CPN::QueueBase::UnlockedDequeueChannelStride().

240  {
241  AutoLock<const QueueBase> al(*this);
242  return UnlockedDequeueChannelStride();
243  }

void CPN::QueueBase::Detect ( )
protected virtual inherited

Called by the D4R algorithm when it has detected an artificial deadlock and this queue should be changed.

Implements D4R::QueueBase.

Definition at line 373 of file QueueBase.cc.

References CPN::KernelBase::CalculateGrowSize(), Logger::Debug(), CPN::QueueBase::kernel, CPN::QueueBase::logger, CPN::QueueBase::UnlockedCount(), CPN::QueueBase::UnlockedGrow(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::UnlockedQueueLength(), and CPN::QueueBase::writerequest.

373  {
374  unsigned size = kernel->CalculateGrowSize(UnlockedCount(), writerequest);
375  logger.Debug("Detect: Grow(%u, %u)", size, writerequest);
376  UnlockedGrow(size, writerequest);
377  logger.Debug("New size: (%u, %u)", UnlockedQueueLength(), UnlockedMaxThreshold());
378  }

bool CPN::QueueBase::Empty ( ) const
inherited
Returns
true if the queue is empty

Definition at line 210 of file QueueBase.cc.

References CPN::QueueBase::UnlockedEmpty().

210  {
211  AutoLock<const QueueBase> al(*this);
212  return UnlockedEmpty();
213  }

void CPN::QueueBase::Enqueue ( unsigned  count)
inherited

This function is used to release the buffer obtained with GetRawEnqueuePtr. The count specifies the number of entries that we want to be placed in the buffer.

Note
A call to this function without an accompanying call to GetRawEnqueuePtr is undefined.
Parameters
count  the number of bytes to be placed in the buffer
Invariant
count <= thresh from GetRawEnqueuePtr
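
The invariant can be illustrated with a small stand-in queue: space is reserved for thresh bytes, but only the bytes actually written are committed. ToyWriterQueue and WriteMessage are hypothetical names used only in this sketch; the toy is single-threaded, ignores the channel argument, and returns a null pointer instead of waiting for free space.

```cpp
#include <cassert>
#include <cstring>
#include <vector>

// Hypothetical stand-in that mimics only the method names of the enqueue side.
class ToyWriterQueue {
public:
    explicit ToyWriterQueue(unsigned capacity) : buf_(capacity), used_(0), reserved_(0) {}

    // Returns writable space for thresh bytes, or nullptr if it does not fit.
    void *GetRawEnqueuePtr(unsigned thresh, unsigned /*chan*/) {
        if (thresh > buf_.size() - used_) { return nullptr; }
        reserved_ = thresh;
        return buf_.data() + used_;
    }

    // Commits count bytes; count must not exceed the reserved threshold.
    void Enqueue(unsigned count) {
        assert(count <= reserved_);
        used_ += count;
        reserved_ = 0;
    }

    unsigned Count() const { return used_; }
    unsigned Freespace() const { return static_cast<unsigned>(buf_.size()) - used_; }

private:
    std::vector<char> buf_;
    unsigned used_;      // bytes committed so far
    unsigned reserved_;  // threshold of the outstanding reservation
};

// Reserves room for thresh bytes but commits only the len bytes written.
inline bool WriteMessage(ToyWriterQueue &q, const char *msg, unsigned len, unsigned thresh) {
    assert(len <= thresh);
    void *ptr = q.GetRawEnqueuePtr(thresh, 0);
    if (!ptr) { return false; }
    std::memcpy(ptr, msg, len);
    q.Enqueue(len);  // count <= thresh, as the invariant requires
    return true;
}
```

Reserving more than is finally committed is useful when the writer does not know the exact output size in advance.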

Definition at line 160 of file QueueBase.cc.

References CPN::QueueBase::enqueuethresh, CPN::QueueBase::inenqueue, CPN::QueueBase::InternalEnqueue(), CPN::QueueBase::NotifyData(), CPN::QueueBase::writerkey, and CPN::QueueBase::writeshutdown.

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and CPN::QueueBase::RawEnqueue().

160  {
161  AutoLock<QueueBase> al(*this);
162  enqueuethresh = 0;
163  inenqueue = false;
164  if (writeshutdown) { throw BrokenQueueException(writerkey); }
165  InternalEnqueue(count);
166  NotifyData();
167  }

unsigned CPN::QueueBase::EnqueueChannelStride ( ) const
inherited
Returns
The current enqueue channel stride. This value is only guaranteed to be consistent between a call to GetRawEnqueuePtr and the matching call to Enqueue.
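
A minimal sketch of how a channel stride might be used, assuming that channel chan of a buffer starts chan * stride bytes after the pointer for channel 0 (the addressing that appears in the InternalFlush listing). CopyChannels is a hypothetical helper, not a CPN function.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical helper (not a CPN function): copies count bytes for each of
// numChans channels from a source with srcStride bytes between channels into
// a destination with dstStride bytes between channels.
inline void CopyChannels(void *dst, std::size_t dstStride,
                         const void *src, std::size_t srcStride,
                         std::size_t numChans, std::size_t count) {
    assert(count <= dstStride && count <= srcStride);
    char *d = static_cast<char *>(dst);
    const char *s = static_cast<const char *>(src);
    for (std::size_t chan = 0; chan < numChans; ++chan) {
        // Channel chan starts chan * stride bytes after the base pointer.
        std::memcpy(d + chan * dstStride, s + chan * srcStride, count);
    }
}
```

In real use the destination stride would be read from EnqueueChannelStride() while the enqueue pointer is held, since the text only guarantees the value within that window.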

Definition at line 235 of file QueueBase.cc.

References CPN::QueueBase::UnlockedEnqueueChannelStride().

235  {
236  AutoLock<const QueueBase> al(*this);
237  return UnlockedEnqueueChannelStride();
238  }

void CPN::QueueBase::Flush ( )
inherited

Definition at line 186 of file QueueBase.cc.

References CPN::QueueBase::InternalFlush().

186  {
187  AutoLock<const QueueBase> al(*this);
188  InternalFlush();
189  }

bool CPN::QueueBase::Flushed ( ) const
inherited

Definition at line 191 of file QueueBase.cc.

References CPN::QueueBase::flushed.

191  {
192  AutoLock<const QueueBase> al(*this);
193  return flushed;
194  }

unsigned CPN::QueueBase::Freespace ( ) const
inherited
Returns
the number of bytes we can add to the queue without blocking.

Definition at line 215 of file QueueBase.cc.

References CPN::QueueBase::UnlockedFreespace().

215  {
216  AutoLock<const QueueBase> al(*this);
217  return UnlockedFreespace();
218  }

bool CPN::QueueBase::Full ( ) const
inherited
Returns
true if the queue is full, false otherwise

Definition at line 220 of file QueueBase.cc.

References CPN::QueueBase::UnlockedFull().

220  {
221  AutoLock<const QueueBase> al(*this);
222  return UnlockedFull();
223  }

const std::string& CPN::QueueBase::GetDatatype ( ) const
inline inherited
Returns
the datatype name associated with this queue

Definition at line 213 of file QueueBase.h.

213 { return datatype; }

const void * CPN::QueueBase::GetRawDequeuePtr ( unsigned  thresh,
unsigned  chan 
)
inherited

Get a pointer to a buffer containing elements.

Note
Accessing the memory that the returned pointer refers to after Dequeue has been called is undefined.
Parameters
thresh  the number of bytes to get
chan    the channel to use
Returns
A void* to a block of memory containing thresh bytes or 0 if there are not thresh bytes available.
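
The peek-then-release pattern this function supports can be sketched with a stand-in queue. ToyQueue and ReadBytes are hypothetical and exist only for this example; the toy is single-threaded, ignores the channel argument, and never blocks, whereas the listing for the real function waits for data.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in that mimics only the method names of the dequeue side.
class ToyQueue {
public:
    explicit ToyQueue(const std::string &initial)
        : data_(initial.begin(), initial.end()), head_(0) {}

    // Returns a pointer to thresh readable bytes, or nullptr if fewer are queued.
    const void *GetRawDequeuePtr(unsigned thresh, unsigned /*chan*/) const {
        return (Count() >= thresh) ? data_.data() + head_ : nullptr;
    }

    // Releases count bytes; pointers obtained earlier must not be used afterwards.
    void Dequeue(unsigned count) {
        assert(count <= Count());
        head_ += count;
    }

    unsigned Count() const { return static_cast<unsigned>(data_.size() - head_); }

private:
    std::vector<char> data_;
    std::size_t head_;  // index of the next unread byte
};

// Peeks at want bytes, copies them out, then releases them from the queue.
inline std::string ReadBytes(ToyQueue &q, unsigned want) {
    const void *ptr = q.GetRawDequeuePtr(want, 0);
    if (!ptr) { return std::string(); }                     // not enough data queued
    std::string out(static_cast<const char *>(ptr), want);  // copy before releasing
    q.Dequeue(want);                                        // ptr is invalid from here on
    return out;
}
```

Copying the data out before calling Dequeue respects the note that the pointer must not be used once Dequeue has been called.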

Definition at line 63 of file QueueBase.cc.

References ASSERT, CPN::KernelBase::CheckTerminated(), CPN::QueueBase::dequeuethresh, CPN::QueueBase::flushed, CPN::KernelBase::GrowQueueMaxThreshold(), CPN::QueueBase::indequeue, CPN::QueueBase::InternalGetRawDequeuePtr(), CPN::QueueBase::kernel, CPN::QueueBase::readerkey, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::Signal(), CPN::QueueBase::UnlockedGrow(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::WaitForData(), CPN::QueueBase::WriteBlocked(), CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::Dequeue(), and CPN::QueueBase::RawDequeue().

63  {
64  kernel->CheckTerminated();
65  AutoLock<QueueBase> al(*this);
66  if (indequeue) { ASSERT(dequeuethresh >= thresh); }
67  else { dequeuethresh = thresh; }
68  while (true) {
69  const void *ptr = InternalGetRawDequeuePtr(thresh, chan);
70  if (ptr ||
72  || flushed) {
73  if (ptr) { indequeue = true; }
74  return ptr;
75  }
76  if (readshutdown) { throw BrokenQueueException(readerkey); }
77  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
78  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
79  UnlockedGrow(2*thresh, thresh);
80  Signal();
81  } else if (WriteBlocked() && kernel->GrowQueueMaxThreshold()) {
82  UnlockedGrow(writerequest + thresh, thresh);
83  Signal();
84  } else {
85  readrequest = thresh;
86  WaitForData();
87  readrequest = 0;
88  }
89  }
90  }

void * CPN::QueueBase::GetRawEnqueuePtr ( unsigned  thresh,
unsigned  chan 
)
inherited

Return a pointer to a buffer of memory that contains thresh entries that we can write into.

Note
A call to this function without an accompanying call to Enqueue is undefined.
An access to the memory locations defined by the return value is undefined after a call to Enqueue.
Parameters
thresh  the number of bytes we need in the returned buffer
chan    the channel to use
Returns
void* to the memory buffer, 0 if not enough space available

Definition at line 121 of file QueueBase.cc.

References ASSERT, CPN::KernelBase::CheckTerminated(), CPN::QueueBase::enqueuethresh, CPN::QueueBase::flushed, CPN::KernelBase::GrowQueueMaxThreshold(), CPN::QueueBase::inenqueue, CPN::QueueBase::InternalGetRawEnqueuePtr(), CPN::QueueBase::kernel, CPN::QueueBase::ReadBlocked(), CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::Signal(), CPN::QueueBase::UnlockedGrow(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::useD4R, CPN::QueueBase::Wait(), CPN::QueueBase::WaitForFreespace(), D4R::QueueBase::WriteBlock(), CPN::QueueBase::writerequest, CPN::QueueBase::writerkey, and CPN::QueueBase::writeshutdown.

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and CPN::QueueBase::RawEnqueue().

121  {
122  kernel->CheckTerminated();
123  AutoLock<QueueBase> al(*this);
124  // Only the enqueuer may call flush and reset will clear all state as if we
125  // had not done anything yet, so block here
126  while (flushed) {
127  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
128  if (useD4R) {
129  WriteBlock(-1);
130  } else {
131  Wait();
132  }
133  }
134  if (inenqueue) { ASSERT(enqueuethresh >= thresh); }
135  else { enqueuethresh = thresh; }
136  bool grown = false;
137  while (true) {
138  void *ptr = InternalGetRawEnqueuePtr(thresh, chan);
139  if (ptr) {
140  inenqueue = true;
141  return ptr;
142  }
143  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
144  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
145  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
146  UnlockedGrow(2*thresh, thresh);
147  Signal();
148  } else if (!grown && ReadBlocked() && kernel->GrowQueueMaxThreshold()) {
149  UnlockedGrow(readrequest + thresh, thresh);
150  Signal();
151  grown = true;
152  } else {
153  writerequest = thresh;
154  WaitForFreespace();
155  writerequest = 0;
156  }
157  }
158  }

Key_t CPN::QueueBase::GetReaderKey ( ) const
inline inherited
Returns
the reader key associated with this queue

Definition at line 211 of file QueueBase.h.

211 { return readerkey; }

Key_t CPN::QueueBase::GetWriterKey ( ) const
inline inherited
Returns
the writer key associated with this queue

Definition at line 209 of file QueueBase.h.

209 { return writerkey; }

void CPN::QueueBase::Grow ( unsigned  queueLen,
unsigned  maxThresh 
)
inherited

Ensure that this queue has at least queueLen bytes of space and can support a maximum threshold of at least maxThresh. The new queue length will be max(queueLen, QueueLength()) and the new max threshold will be max(maxThresh, MaxThreshold()).

Note
The caller must guarantee that an enqueue and a dequeue are not both outstanding; otherwise this call will fail.
Parameters
queueLen   the next queue length
maxThresh  the next max threshold
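
The sizing rule stated for Grow can be written out as a small pure function. QueueDims and GrownDims are hypothetical names for this sketch only; it models just the max() arithmetic and none of the buffer reallocation.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical value type for illustration; CPN does not define it.
struct QueueDims {
    unsigned length;        // current queue length in bytes
    unsigned maxThreshold;  // current maximum threshold in bytes
};

// Applies the documented rule: neither dimension ever shrinks.
inline QueueDims GrownDims(QueueDims current, unsigned queueLen, unsigned maxThresh) {
    assert(current.length > 0);
    return QueueDims{ std::max(queueLen, current.length),
                      std::max(maxThresh, current.maxThreshold) };
}
```

For example, asking a queue of length 4096 and max threshold 512 to grow to (1024, 2048) would leave the length at 4096 and raise the max threshold to 2048.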

Definition at line 245 of file QueueBase.cc.

References CPN::QueueBase::UnlockedGrow().

245  {
246  AutoLock<QueueBase> al(*this);
247  UnlockedGrow(queueLen, maxThresh);
248  }

void ThresholdQueue< T >::InternalDequeue ( unsigned  count)
protected virtual

Implements CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 140 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Dequeue(), dequeueUseOld, oldqueue, and queue.

Referenced by CPN::RemoteQueue::InternalDequeue(), and CPN::RemoteQueue::SendEnqueuePacket().

140  {
141  if (dequeueUseOld) {
142  dequeueUseOld = false;
143  delete oldqueue;
144  oldqueue = 0;
145  }
146  queue->Dequeue(count);
147  }

void ThresholdQueue< T >::InternalEnqueue ( unsigned  count)
protected virtual

Implements CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 69 of file CPNThresholdQueue.cc.

References ASSERT, ThresholdQueueBase::Count(), ThresholdQueueBase::Dequeue(), ThresholdQueueBase::Enqueue(), enqueueUseOld, ThresholdQueueBase::GetRawDequeuePtr(), ThresholdQueueBase::GetRawEnqueuePtr(), ThresholdQueueBase::MaxThreshold(), ThresholdQueueBase::NumChannels(), oldqueue, and queue.

Referenced by CPN::RemoteQueue::EnqueuePacket(), and CPN::RemoteQueue::InternalEnqueue().

69  {
70  if (enqueueUseOld) {
72  oldqueue->Enqueue(count);
73 
74  unsigned count;
75  while ( (count = oldqueue->Count()) != 0 ) {
76  if (count > oldqueue->MaxThreshold()) {
77  count = oldqueue->MaxThreshold();
78  }
79  for (unsigned chan = 0; chan < queue->NumChannels(); chan++) {
80  const void* src = oldqueue->GetRawDequeuePtr(count, chan);
81  void* dst = queue->GetRawEnqueuePtr(count, chan);
82  ASSERT(src && dst);
83  memcpy(dst,src,count);
84  }
85 
86  queue->Enqueue(count);
87  oldqueue->Dequeue(count);
88  }
89 
90  enqueueUseOld = false;
91  delete oldqueue;
92  oldqueue = 0;
93  } else {
94  queue->Enqueue(count);
95  }
96  }

void CPN::QueueBase::InternalFlush ( )
protected virtual inherited

Reimplemented in CPN::RemoteQueue.

Definition at line 265 of file QueueBase.cc.

References PthreadCondition::Broadcast(), CPN::QueueBase::cond, CPN::QueueBase::flushed, CPN::QueueBase::InternalEnqueue(), CPN::QueueBase::InternalGetRawEnqueuePtr(), CPN::QueueBase::postpad, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedEnqueueChannelStride(), CPN::QueueBase::UnlockedNumChannels(), CPN::QueueBase::WaitForFreespace(), CPN::QueueBase::writerequest, CPN::QueueBase::writerkey, and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::Flush(), and CPN::RemoteQueue::InternalFlush().

265  {
266  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
267  if (postpad > 0) {
268  while (true) {
269  void *ptr = InternalGetRawEnqueuePtr(postpad, 0);
270  if (ptr) {
271  unsigned num_chans = UnlockedNumChannels();
272  unsigned chanstride = UnlockedEnqueueChannelStride();
273  for (unsigned chan = 0; chan < num_chans; ++chan) {
274  memset((char*)ptr + chanstride*chan, 0, postpad);
275  }
277  } else {
278  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
281  writerequest = 0;
282  }
283  }
284  }
285  flushed = true;
286  cond.Broadcast();
287  }

const void * ThresholdQueue< T >::InternalGetRawDequeuePtr ( unsigned  thresh,
unsigned  chan 
)
protected virtual

Implements CPN::QueueBase.

Definition at line 126 of file CPNThresholdQueue.cc.

References ASSERT, dequeueUseOld, ThresholdQueueBase::GetRawDequeuePtr(), CPN::QueueBase::indequeue, oldqueue, and queue.

Referenced by CPN::RemoteQueue::SendEnqueuePacket().

126  {
127  const void *ret = 0;
128  if (dequeueUseOld) {
129  // The ONLY reason this code path should be followed is if the node made a getdequeueptr
130  // then called getdequeueptr again before dequeue when a grow happens inbetween
131  ASSERT(indequeue);
132  ret = oldqueue->GetRawDequeuePtr(thresh, chan);
133  ASSERT(ret);
134  } else {
135  ret = queue->GetRawDequeuePtr(thresh, chan);
136  }
137  return ret;
138  }

void * ThresholdQueue< T >::InternalGetRawEnqueuePtr ( unsigned  thresh,
unsigned  chan 
)
protected virtual

Implements CPN::QueueBase.

Definition at line 57 of file CPNThresholdQueue.cc.

References ASSERT, enqueueUseOld, ThresholdQueueBase::GetRawEnqueuePtr(), CPN::QueueBase::inenqueue, oldqueue, and queue.

Referenced by CPN::RemoteQueue::EnqueuePacket().

57  {
58  void *ret = 0;
59  if (enqueueUseOld) {
60  ASSERT(inenqueue);
61  ret = oldqueue->GetRawEnqueuePtr(thresh, chan);
62  ASSERT(ret);
63  } else {
64  ret = queue->GetRawEnqueuePtr(thresh, chan);
65  }
66  return ret;
67  }

void ThresholdQueue< T >::InternalReset ( )
protected virtual

Reimplemented from CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 173 of file CPNThresholdQueue.cc.

References dequeueUseOld, enqueueUseOld, CPN::QueueBase::InternalReset(), oldqueue, queue, and ThresholdQueueBase::Reset().

Referenced by CPN::RemoteQueue::InternalReset().

173  {
174  if (dequeueUseOld || enqueueUseOld) {
175  dequeueUseOld = false;
176  enqueueUseOld = false;
177  delete oldqueue;
178  oldqueue = 0;
179  }
180  queue->Reset();
181  QueueBase::InternalReset();
182  }

bool CPN::QueueBase::IsReaderShutdown ( )
inherited

Definition at line 390 of file QueueBase.cc.

References CPN::QueueBase::readshutdown.

390  {
391  AutoLock<QueueBase> al(*this);
392  return readshutdown;
393  }

bool CPN::QueueBase::IsWriterShutdown ( )
inherited

Definition at line 395 of file QueueBase.cc.

References CPN::QueueBase::writeshutdown.

395  {
396  AutoLock<QueueBase> al(*this);
397  return writeshutdown;
398  }

void CPN::QueueBase::Lock ( ) const
inline virtual inherited

These functions are to access the lock for the queue.
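
The listings on this page take the lock through a scope guard written as AutoLock<...> al(*this). Its definition is not shown here, so the following is only a sketch of how such a guard could be built on Lock() and Unlock(). ScopedLock and CountingLockable are hypothetical names, and the stand-in counts calls instead of wrapping a real mutex.

```cpp
#include <cassert>

// Hypothetical stand-in for an object exposing const Lock()/Unlock();
// it records calls rather than locking anything.
class CountingLockable {
public:
    void Lock() const { ++depth_; ++totalLocks_; }
    void Unlock() const { assert(depth_ > 0); --depth_; }
    int Depth() const { return depth_; }
    int TotalLocks() const { return totalLocks_; }

private:
    mutable int depth_ = 0;       // currently held locks
    mutable int totalLocks_ = 0;  // number of Lock() calls so far
};

// Sketch of a scope guard: locks in the constructor, unlocks in the destructor.
template <typename Lockable>
class ScopedLock {
public:
    explicit ScopedLock(Lockable &l) : target_(l) { target_.Lock(); }
    ~ScopedLock() { target_.Unlock(); }
    ScopedLock(const ScopedLock &) = delete;
    ScopedLock &operator=(const ScopedLock &) = delete;

private:
    Lockable &target_;
};

// The guard is held for the duration of this function and released on return.
inline int DepthInsideGuard(const CountingLockable &q) {
    ScopedLock<const CountingLockable> guard(q);
    return q.Depth();
}
```

Because Lock() and Unlock() are const, a guard of this kind can also be used from const accessors, which matches the AutoLock<const QueueBase> form seen in the listings.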

Implements D4R::QueueBase.

Definition at line 221 of file QueueBase.h.

References lock, and PthreadMutex::Lock().

221 { lock.Lock(); }

void CPN::QueueBase::LogState ( )
virtual inherited

For debugging ONLY!! Otherwise non deterministic output.

Reimplemented in CPN::RemoteQueue.

Definition at line 400 of file QueueBase.cc.

References CPN::QueueBase::dequeuethresh, CPN::QueueBase::enqueuethresh, Logger::Error(), CPN::QueueBase::flushed, CPN::QueueBase::indequeue, CPN::QueueBase::inenqueue, CPN::QueueBase::logger, CPN::QueueBase::readerkey, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedCount(), CPN::QueueBase::UnlockedFreespace(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::UnlockedNumDequeued(), CPN::QueueBase::UnlockedNumEnqueued(), CPN::QueueBase::UnlockedQueueLength(), CPN::QueueBase::writerequest, CPN::QueueBase::writerkey, and CPN::QueueBase::writeshutdown.

Referenced by CPN::RemoteQueue::LogState().

400  {
401  logger.Error("Printing state (w:%llu r:%llu)", readerkey, writerkey);
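402  logger.Error("size: %u, maxthresh: %u count: %u free: %u, flushed: %d",
403  UnlockedQueueLength(), UnlockedMaxThreshold(), UnlockedCount(), UnlockedFreespace(),
404  (int)flushed);
405  logger.Error("readrequest: %u, writerequest: %u numenqueued: %u, numdequeued: %u",
406  readrequest, writerequest, UnlockedNumEnqueued(), UnlockedNumDequeued());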
407  if (indequeue) {
408  logger.Error("Indequeue (thresh: %u)", dequeuethresh);
409  }
410  if (inenqueue) {
411  logger.Error("Inenqueue (thresh: %u)", enqueuethresh);
412  }
413  if (readshutdown) {
414  logger.Error("Reader shutdown");
415  }
416  if (writeshutdown) {
417  logger.Error("Writer shutdown");
418  }
419  }

unsigned CPN::QueueBase::MaxThreshold ( ) const
inherited
Returns
the maximum threshold this queue supports in bytes

Definition at line 225 of file QueueBase.cc.

References CPN::QueueBase::UnlockedMaxThreshold().

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and UnlockedGrow().

225  {
226  AutoLock<const QueueBase> al(*this);
227  return UnlockedMaxThreshold();
228  }

void CPN::QueueBase::NotifyData ( )
protected inherited

Definition at line 341 of file QueueBase.cc.

References PthreadCondition::Broadcast(), CPN::QueueBase::cond, CPN::QueueBase::readrequest, and CPN::QueueBase::UnlockedCount().

Referenced by CPN::QueueBase::Enqueue(), and CPN::RemoteQueue::EnqueuePacket().

341  {
342  if (UnlockedCount() >= readrequest) {
343  cond.Broadcast();
344  }
345  }

void CPN::QueueBase::NotifyFreespace ( )
protectedinherited

Definition at line 362 of file QueueBase.cc.

References PthreadCondition::Broadcast(), CPN::QueueBase::cond, CPN::QueueBase::UnlockedFreespace(), and CPN::QueueBase::writerequest.

Referenced by CPN::QueueBase::Dequeue(), and CPN::RemoteQueue::SendEnqueuePacket().

362  {
363  if (UnlockedFreespace() >= writerequest) {
364  cond.Broadcast();
365  }
366  }
PthreadCondition & Broadcast(void)
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyTerminate ( )
inherited

Used to tell any waiting threads that the network is terminating.

Definition at line 368 of file QueueBase.cc.

References PthreadCondition::Broadcast(), and CPN::QueueBase::cond.

368  {
369  AutoLock<QueueBase> al(*this);
370  cond.Broadcast();
371  }
PthreadCondition & Broadcast(void)
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumChannels ( ) const
inherited
Returns
the number of channels supported by this queue.

Definition at line 200 of file QueueBase.cc.

References CPN::QueueBase::UnlockedNumChannels().

200  {
201  AutoLock<const QueueBase> al(*this);
202  return UnlockedNumChannels();
203  }
virtual unsigned UnlockedNumChannels() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumDequeued ( ) const
inherited

Definition at line 321 of file QueueBase.cc.

References CPN::QueueBase::UnlockedNumDequeued().

321  {
322  AutoLock<const QueueBase> al(*this);
323  return UnlockedNumDequeued();
324  }
virtual unsigned UnlockedNumDequeued() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumEnqueued ( ) const
inherited

Definition at line 316 of file QueueBase.cc.

References CPN::QueueBase::UnlockedNumEnqueued().

316  {
317  AutoLock<const QueueBase> al(*this);
318  return UnlockedNumEnqueued();
319  }
virtual unsigned UnlockedNumEnqueued() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::QueueLength ( ) const
inherited
Returns
the maximum number of bytes that can be put in this queue.

Definition at line 230 of file QueueBase.cc.

References CPN::QueueBase::UnlockedQueueLength().

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), CPN::RemoteQueue::GrowPacket(), UnlockedGrow(), and CPN::RemoteQueue::UnlockedGrow().

230  {
231  AutoLock<const QueueBase> al(*this);
232  return UnlockedQueueLength();
233  }
virtual unsigned UnlockedQueueLength() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::RawDequeue ( void *  data,
unsigned  count,
unsigned  numChans,
unsigned  chanStride 
)
inherited

Dequeue data from the queue directly into the memory pointed to by data. This function shall be equivalent to a call to GetRawDequeuePtr then a memcpy and then a call to Dequeue.

Parameters
data: pointer to the memory to dequeue into
count: the number of bytes to copy from each channel into data
numChans: the number of channels to write to
chanStride: the distance in bytes between the beginning of the channels in data
Returns
true on success, false on failure

Definition at line 102 of file QueueBase.cc.

References ASSERT, CPN::QueueBase::Dequeue(), and CPN::QueueBase::GetRawDequeuePtr().

Referenced by CPN::QueueBase::RawDequeue().

102  {
103  const void *src = GetRawDequeuePtr(count, 0);
104  char *dest = (char*)data;
105  if (!src) { return false; }
106  memcpy(dest, src, count);
107  for (unsigned chan = 1; chan < numChans; ++chan) {
108  src = GetRawDequeuePtr(count, chan);
109  ASSERT(src);
110  dest += chanStride;
111  memcpy(dest, src, count);
112  }
113  Dequeue(count);
114  return true;
115  }
void Dequeue(unsigned count)
Definition: QueueBase.cc:92
const void * GetRawDequeuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:63
#define ASSERT(exp,...)
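The role of chanStride is easiest to see in isolation: channel c lands at byte offset c * chanStride in the destination. The sketch below shows only that copy step. GatherChannels is a hypothetical helper, and each entry of channels stands in for the pointer that GetRawDequeuePtr(count, c) would return.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical helper: copy `count` bytes from each channel into `data`,
// placing channel c at byte offset c * chanStride.
inline void GatherChannels(const std::vector<const char *> &channels,
                           void *data, unsigned count, unsigned chanStride) {
    // With more than one channel, the stride must leave room for `count` bytes.
    assert(channels.size() <= 1 || chanStride >= count);
    char *dest = static_cast<char *>(data);
    for (std::size_t c = 0; c < channels.size(); ++c) {
        std::memcpy(dest + c * chanStride, channels[c], count);
    }
}
```

The caller is responsible for sizing data to at least (numChans - 1) * chanStride + count bytes.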

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::RawDequeue ( void *  data,
unsigned  count 
)
inherited

A version of RawDequeue to use when there is only 1 channel.

Parameters
data: pointer to the memory to dequeue into
count: the number of bytes to dequeue
Returns
true on success, false if we have reached the end of the queue

Definition at line 117 of file QueueBase.cc.

References CPN::QueueBase::RawDequeue().

117  {
118  return RawDequeue(data, count, 1, 0);
119  }
bool RawDequeue(void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:102

+ Here is the call graph for this function:

void CPN::QueueBase::RawEnqueue ( const void *  data,
unsigned  count,
unsigned  numChans,
unsigned  chanStride 
)
inherited

This function shall be equivalent to a call to GetRawEnqueuePtr, then a memcpy, and then a call to Enqueue.

The underlying implementation may implement either GetRawEnqueuePtr and Enqueue, or RawEnqueue, and then implement the other in terms of the one implemented.

Parameters
data: pointer to the memory to enqueue
count: the number of bytes to enqueue
numChans: the number of channels to write to
chanStride: the distance in bytes between the beginning of the channels in data

Definition at line 169 of file QueueBase.cc.

References CPN::QueueBase::Enqueue(), and CPN::QueueBase::GetRawEnqueuePtr().

Referenced by CPN::QueueBase::RawEnqueue().

169  {
170  void *dest = GetRawEnqueuePtr(count, 0);
171  const char *src = (char*)data;
172  memcpy(dest, src, count);
173  for (unsigned chan = 1; chan < numChans; ++chan) {
174  dest = GetRawEnqueuePtr(count, chan);
175  src += chanStride;
176  memcpy(dest, src, count);
177  }
178  Enqueue(count);
179  }
void * GetRawEnqueuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:121
void Enqueue(unsigned count)
Definition: QueueBase.cc:160
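The description above allows an implementation to provide one interface and derive the other from it. Below is a minimal single-channel sketch of the direction used in the listing: GetRawEnqueuePtr and Enqueue are the primitives and RawEnqueue is built from them. LinearQueue is hypothetical. Unlike the real queue, it never blocks, and its RawEnqueue returns a bool to report when the data does not fit.

```cpp
#include <cassert>
#include <cstring>
#include <vector>

// Hypothetical single-channel queue; a plain vector stands in for the storage.
class LinearQueue {
public:
    explicit LinearQueue(unsigned capacity) : buffer(capacity), tail(0) {}

    // Primitive 1: expose `thresh` writable bytes, or nullptr if they do not fit.
    void *GetRawEnqueuePtr(unsigned thresh) {
        if (thresh > buffer.size() - tail) { return nullptr; }
        return buffer.data() + tail;
    }

    // Primitive 2: commit `count` bytes already written through the pointer.
    void Enqueue(unsigned count) {
        assert(count <= buffer.size() - tail);
        tail += count;
    }

    // Derived operation: request a pointer, memcpy, then commit.
    bool RawEnqueue(const void *data, unsigned count) {
        void *dest = GetRawEnqueuePtr(count);
        if (!dest) { return false; }
        std::memcpy(dest, data, count);
        Enqueue(count);
        return true;
    }

    unsigned Count() const { return tail; }
    const char *Data() const { return buffer.data(); }

private:
    std::vector<char> buffer;
    unsigned tail;  // number of committed bytes
};
```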

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::RawEnqueue ( const void *  data,
unsigned  count 
)
inherited

A version of RawEnqueue to use when there is only 1 channel.

Parameters
data: pointer to the memory to enqueue
count: the number of bytes to enqueue

Definition at line 196 of file QueueBase.cc.

References CPN::QueueBase::RawEnqueue().

196  {
197  return RawEnqueue(data, count, 1, 0);
198  }
void RawEnqueue(const void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:169

+ Here is the call graph for this function:

void D4R::QueueBase::ReadBlock ( )
protectedinherited

reader ===> writer. ReadBlock requires that you already hold the lock and, if the lock is reentrant, that a single unlock will release it.

Exceptions
D4R::DeadlockException

Definition at line 71 of file D4RQueue.cc.

References D4R::QueueBase::incomm, D4R::QueueBase::ReadBlocked(), D4R::QueueBase::reader, D4R::QueueBase::Signal(), D4R::QueueBase::Wait(), D4R::QueueBase::writer, and D4R::QueueBase::writetagchanged.

Referenced by CPN::QueueBase::WaitForData().

71  {
72  if (!writer) {
73  while (ReadBlocked() && !writer) {
74  Wait();
75  }
76  }
77  if (!ReadBlocked()) { return; }
78  writetagchanged = false;
79  try {
80  while (incomm) { Wait(); }
81  ScopeSetter<bool> ss(incomm, true);
82  AutoUnlock<QueueBase> au(*this);
83  reader->Block(writer->GetPublicTag(), -1);
84  } catch (...) { Signal(); throw; }
85  Signal();
86  while (ReadBlocked()) {
87  if (writetagchanged) {
88  writetagchanged = false;
89  bool detect;
90  try {
91  while (incomm) { Wait(); }
92  ScopeSetter<bool> ss(incomm, true);
93  AutoUnlock<QueueBase> au(*this);
94  detect = reader->Transmit(writer->GetPublicTag());
95  } catch (...) { Signal(); throw; }
96  Signal();
97  if (detect) {
98  throw DeadlockException("True deadlock detected");
99  }
100  } else {
101  Wait();
102  }
103  }
104  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual void Wait()=0
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual bool ReadBlocked()=0
virtual void Signal()=0
bool writetagchanged
Definition: D4RQueue.h:125
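ReadBlock calls into the reader node with the queue lock released while the incomm flag marks a communication in progress. Both are managed by scope-bound helpers. The sketch below shows how such helpers can work. ScopedValue, ScopedUnlock and CallUnlocked are hypothetical names, and the assumption that the flag is restored to its previous value on scope exit is mine, not taken from the CPN sources.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical RAII helper: set a variable for the duration of a scope,
// then restore its previous value.
template <typename T>
class ScopedValue {
public:
    ScopedValue(T &target, T value) : ref(target), saved(target) { ref = value; }
    ~ScopedValue() { ref = saved; }
    ScopedValue(const ScopedValue &) = delete;
    ScopedValue &operator=(const ScopedValue &) = delete;

private:
    T &ref;
    T saved;
};

// Hypothetical RAII helper: release a held mutex for a scope, then re-acquire it.
class ScopedUnlock {
public:
    explicit ScopedUnlock(std::mutex &m) : mutex(m) { mutex.unlock(); }
    ~ScopedUnlock() { mutex.lock(); }
    ScopedUnlock(const ScopedUnlock &) = delete;
    ScopedUnlock &operator=(const ScopedUnlock &) = delete;

private:
    std::mutex &mutex;
};

// Run `call` with `lock` released and `busy` set. The caller must hold `lock`.
template <typename Fn>
void CallUnlocked(std::mutex &lock, bool &busy, Fn call) {
    ScopedValue<bool> flag(busy, true);
    ScopedUnlock unlock(lock);
    call();
    // Destructors run in reverse order: the lock is re-acquired first,
    // then the flag is restored while the lock is held.
}
```

Because both helpers act in their destructors, the lock is re-acquired and the flag restored even when the call throws.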

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::ReadBlocked ( )
protectedvirtualinherited
Returns
true if we are blocked

Implements D4R::QueueBase.

Definition at line 336 of file QueueBase.cc.

References CPN::KernelBase::CheckTerminated(), CPN::QueueBase::flushed, CPN::QueueBase::kernel, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedCount(), and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::GetRawEnqueuePtr(), CPN::RemoteQueue::WaitForData(), and CPN::QueueBase::WaitForData().

336  {
339  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedCount() const =0
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::ReadRequest ( )
inherited

For unit tests.

Definition at line 380 of file QueueBase.cc.

References CPN::QueueBase::readrequest.

380  {
381  AutoLock<QueueBase> al(*this);
382  return readrequest;
383  }
unsigned readrequest
Definition: QueueBase.h:279
void CPN::QueueBase::Reset ( )
inherited

Definition at line 181 of file QueueBase.cc.

References CPN::QueueBase::InternalReset().

181  {
182  AutoLock<const QueueBase> al(*this);
183  InternalReset();
184  }
virtual void InternalReset()
Definition: QueueBase.cc:289

+ Here is the call graph for this function:

void D4R::QueueBase::SetReaderNode ( shared_ptr< Node >  n)
inherited

Set the node which is reading from this queue

Parameters
n: the node

Definition at line 59 of file D4RQueue.cc.

References D4R::QueueBase::reader, and D4R::QueueBase::Signal().

Referenced by CPN::RemoteQueue::RemoteQueue().

59  {
60  AutoLock<QueueBase> al(*this);
61  reader = n;
62  Signal();
63  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::SetWriterNode ( shared_ptr< Node >  n)
inherited

Set the node which is writing to this queue.

Parameters
n: the node

Definition at line 65 of file D4RQueue.cc.

References D4R::QueueBase::Signal(), and D4R::QueueBase::writer.

Referenced by CPN::RemoteQueue::RemoteQueue().

65  {
66  AutoLock<QueueBase> al(*this);
67  writer = n;
68  Signal();
69  }
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::ShutdownReader ( )
inherited

Called by the QueueReader when no more data will be read.

Definition at line 250 of file QueueBase.cc.

References CPN::QueueBase::UnlockedShutdownReader().

Referenced by CPN::RemoteQueue::FileThreadEntryPoint().

250  {
251  AutoLock<QueueBase> al(*this);
252  UnlockedShutdownReader();
253  }
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::ShutdownWriter ( )
inherited

Called by the QueueWriter when no more data will be written.

Definition at line 260 of file QueueBase.cc.

References CPN::QueueBase::UnlockedShutdownWriter().

Referenced by CPN::RemoteQueue::FileThreadEntryPoint().

260  {
261  AutoLock<QueueBase> al(*this);
262  UnlockedShutdownWriter();
263  }
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::Signal ( )
inlineprotectedvirtualinherited

Signal that Wait should return

Implements D4R::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 247 of file QueueBase.h.

Referenced by CPN::QueueBase::GetRawDequeuePtr(), CPN::QueueBase::GetRawEnqueuePtr(), CPN::RemoteQueue::Signal(), CPN::QueueBase::UnlockedShutdownReader(), and CPN::QueueBase::UnlockedShutdownWriter().

247 { cond.Broadcast(); }
PthreadCondition & Broadcast(void)
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the caller graph for this function:

void D4R::QueueBase::SignalReaderTagChanged ( )
inherited

These functions are called by the D4R::Node when their tag changes.

Definition at line 139 of file D4RQueue.cc.

References D4R::QueueBase::UnlockedSignalReaderTagChanged().

139  {
140  AutoLock<QueueBase> al(*this);
141  UnlockedSignalReaderTagChanged();
142  }
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144

+ Here is the call graph for this function:

void D4R::QueueBase::SignalWriterTagChanged ( )
inherited

Definition at line 150 of file D4RQueue.cc.

References D4R::QueueBase::UnlockedSignalWriterTagChanged().

150  {
151  AutoLock<QueueBase> al(*this);
152  UnlockedSignalWriterTagChanged();
153  }
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155

+ Here is the call graph for this function:

void CPN::QueueBase::Unlock ( void  ) const
inlinevirtualinherited

Implements D4R::QueueBase.

Definition at line 222 of file QueueBase.h.

References lock, and PthreadMutex::Unlock().

222 { lock.Unlock(); }
PthreadMutex & Unlock(void)
Definition: PthreadMutex.h:50
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedCount ( ) const
protectedvirtual

Implements CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 149 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Count(), and queue.

149  {
150  return queue->Count();
151  }
ulong Count(void) const

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedDequeueChannelStride ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 110 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ChannelStride(), dequeueUseOld, oldqueue, and queue.

110  {
111  if (dequeueUseOld) {
112  return oldqueue->ChannelStride();
113  } else {
114  return queue->ChannelStride();
115  }
116  }
ulong ChannelStride(void) const

+ Here is the call graph for this function:

bool ThresholdQueue< T >::UnlockedEmpty ( ) const
protectedvirtual

Implements CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 153 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Empty(), and queue.

153  {
154  return queue->Empty();
155  }
bool Empty(void) const

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedEnqueueChannelStride ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 102 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ChannelStride(), enqueueUseOld, oldqueue, and queue.

102  {
103  if (enqueueUseOld) {
104  return oldqueue->ChannelStride();
105  } else {
106  return queue->ChannelStride();
107  }
108  }
ulong ChannelStride(void) const

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedFreespace ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 118 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Freespace(), and queue.

Referenced by CPN::RemoteQueue::GetState().

118  {
119  return queue->Freespace();
120  }
ulong Freespace(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ThresholdQueue< T >::UnlockedFull ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 122 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Full(), and queue.

122  {
123  return queue->Full();
124  }
bool Full(void) const

+ Here is the call graph for this function:

void ThresholdQueue< T >::UnlockedGrow ( unsigned  queueLen,
unsigned  maxThresh 
)
protectedvirtual

Implements CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 184 of file CPNThresholdQueue.cc.

References ASSERT, dequeueUseOld, enqueueUseOld, CPN::ThresholdQueue::TQImpl::Grow(), CPN::QueueBase::indequeue, CPN::QueueBase::inenqueue, CPN::QueueBase::MaxThreshold(), oldqueue, queue, and CPN::QueueBase::QueueLength().

Referenced by CPN::RemoteQueue::EnqueuePacket(), CPN::RemoteQueue::GrowPacket(), and CPN::RemoteQueue::UnlockedGrow().

184  {
185  if (queueLen <= queue->QueueLength() && maxThresh <= queue->MaxThreshold()) return;
186  ASSERT(!(inenqueue && indequeue), "Unhandled grow case of having an outstanding dequeue and enqueue");
187  if (oldqueue) {
188  // If the old queue is still around we have to still be in the same state
191  queue->Grow(queueLen, maxThresh, false);
192  } else if (!inenqueue && !indequeue) {
193  queue->Grow(queueLen, maxThresh, false);
194  } else {
197  // this should make a duplicate
198  oldqueue = queue->Grow(queueLen, maxThresh, true);
199  if (!oldqueue) {
200  enqueueUseOld = false;
201  dequeueUseOld = false;
202  }
203  }
204  }
unsigned QueueLength() const
Definition: QueueBase.cc:230
unsigned MaxThreshold() const
Definition: QueueBase.cc:225
TQImpl * Grow(unsigned queueLen, unsigned maxThresh, bool copy)
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedMaxThreshold ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 157 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::MaxThreshold(), and queue.

Referenced by CPN::RemoteQueue::GetState(), and CPN::RemoteQueue::UnlockedGrow().

157  {
158  return queue->MaxThreshold();
159  }
ulong MaxThreshold(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedNumChannels ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 98 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::NumChannels(), and queue.

98  {
99  return queue->NumChannels();
100  }
ulong NumChannels(void) const

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedNumDequeued ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 169 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ElementsDequeued(), and queue.

Referenced by CPN::RemoteQueue::GetState().

169  {
170  return queue->ElementsDequeued();
171  }
ulong ElementsDequeued(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedNumEnqueued ( ) const
protectedvirtual

Implements CPN::QueueBase.

Definition at line 165 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ElementsEnqueued(), and queue.

Referenced by CPN::RemoteQueue::GetState().

165  {
166  return queue->ElementsEnqueued();
167  }
ulong ElementsEnqueued(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedQueueLength ( ) const
protectedvirtual

Implements CPN::QueueBase.

Reimplemented in CPN::RemoteQueue.

Definition at line 161 of file CPNThresholdQueue.cc.

References queue, and ThresholdQueueBase::QueueLength().

161  {
162  return queue->QueueLength();
163  }
ulong QueueLength(void) const

+ Here is the call graph for this function:

void CPN::QueueBase::UnlockedShutdownReader ( )
protectedvirtualinherited

Definition at line 255 of file QueueBase.cc.

References CPN::QueueBase::readshutdown, and CPN::QueueBase::Signal().

Referenced by CPN::RemoteQueue::EndOfReadPacket(), CPN::RemoteQueue::InternalCheckStatus(), and CPN::QueueBase::ShutdownReader().

255  {
256  readshutdown = true;
257  Signal();
258  }
virtual void Signal()
Definition: QueueBase.h:247
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::UnlockedShutdownWriter ( )
protectedvirtualinherited

Definition at line 311 of file QueueBase.cc.

References CPN::QueueBase::Signal(), and CPN::QueueBase::writeshutdown.

Referenced by CPN::RemoteQueue::EndOfWritePacket(), CPN::RemoteQueue::InternalCheckStatus(), and CPN::QueueBase::ShutdownWriter().

311  {
312  writeshutdown = true;
313  Signal();
314  }
virtual void Signal()
Definition: QueueBase.h:247
bool writeshutdown
Definition: QueueBase.h:276

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::UnlockedSignalReaderTagChanged ( )
protectedvirtualinherited

Reimplemented in CPN::RemoteQueue.

Definition at line 144 of file D4RQueue.cc.

References DEBUG, D4R::QueueBase::reader, D4R::QueueBase::readtagchanged, D4R::QueueBase::Signal(), and D4R::QueueBase::writer.

Referenced by CPN::RemoteQueue::D4RTagPacket(), D4R::QueueBase::SignalReaderTagChanged(), and CPN::RemoteQueue::UnlockedSignalReaderTagChanged().

144  {
145  DEBUG("%s: (%llu -> %llu)\n", __PRETTY_FUNCTION__, (writer ? writer->GetPrivateTag().Key() : 0), (reader ? reader->GetPrivateTag().Key() : 0));
146  readtagchanged = true;
147  Signal();
148  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
#define DEBUG(fmt,...)
Definition: D4RQueue.cc:34
shared_ptr< Node > writer
Definition: D4RQueue.h:123
bool readtagchanged
Definition: D4RQueue.h:124
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::UnlockedSignalWriterTagChanged ( )
protectedvirtualinherited

Reimplemented in CPN::RemoteQueue.

Definition at line 155 of file D4RQueue.cc.

References DEBUG, D4R::QueueBase::reader, D4R::QueueBase::Signal(), D4R::QueueBase::writer, and D4R::QueueBase::writetagchanged.

Referenced by CPN::RemoteQueue::D4RTagPacket(), D4R::QueueBase::SignalWriterTagChanged(), and CPN::RemoteQueue::UnlockedSignalWriterTagChanged().

155  {
156  DEBUG("%s: (%llu -> %llu)\n", __PRETTY_FUNCTION__, (writer ? writer->GetPrivateTag().Key() : 0), (reader ? reader->GetPrivateTag().Key() : 0));
157  writetagchanged = true;
158  Signal();
159  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
#define DEBUG(fmt,...)
Definition: D4RQueue.cc:34
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual void Signal()=0
bool writetagchanged
Definition: D4RQueue.h:125

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::Wait ( )
inlineprotectedvirtualinherited

Wait until Signal is called. The caller must hold the lock.

Implements D4R::QueueBase.

Definition at line 246 of file QueueBase.h.

References lock.

Referenced by CPN::RemoteQueue::ActionThreadEntryPoint(), CPN::QueueBase::GetRawEnqueuePtr(), CPN::RemoteQueue::WaitForData(), and CPN::RemoteQueue::WaitForFreespace().

246 { cond.Wait(lock); }
PthreadCondition & Wait(PthreadMutex &mutex)
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the caller graph for this function:

void CPN::QueueBase::WaitForData ( )
protectedvirtualinherited

Reimplemented in CPN::RemoteQueue.

Definition at line 326 of file QueueBase.cc.

References CPN::QueueBase::cond, CPN::QueueBase::lock, D4R::QueueBase::ReadBlock(), CPN::QueueBase::ReadBlocked(), CPN::QueueBase::useD4R, and PthreadCondition::Wait().

Referenced by CPN::QueueBase::GetRawDequeuePtr(), and CPN::RemoteQueue::WaitForData().

326  {
327  if (useD4R) {
328  ReadBlock();
329  } else {
330  while (ReadBlocked()) {
331  cond.Wait(lock);
332  }
333  }
334  }
PthreadCondition & Wait(PthreadMutex &mutex)
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
void ReadBlock()
Definition: D4RQueue.cc:71
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289
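The non-D4R branch of WaitForData is a standard predicate loop around a condition wait. The sketch below shows that loop with std::condition_variable. WaitSketch is hypothetical, and its blocking condition (fewer bytes than requested and the writer still active) is an assumption, because the body of ReadBlocked is not shown on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical sketch of a predicate-loop wait; the blocking condition is assumed.
class WaitSketch {
public:
    // Block until `request` bytes are queued or the writer has shut down.
    // Returns true if the request can be satisfied.
    bool WaitForData(unsigned request) {
        std::unique_lock<std::mutex> guard(lock);
        // Re-test after every wakeup: a broadcast or a spurious wakeup
        // does not guarantee that the condition now holds.
        while (count < request && !writeshutdown) {
            cond.wait(guard);
        }
        return count >= request;
    }

    void Enqueue(unsigned bytes) {
        std::lock_guard<std::mutex> guard(lock);
        count += bytes;
        cond.notify_all();
    }

    void ShutdownWriter() {
        std::lock_guard<std::mutex> guard(lock);
        writeshutdown = true;
        cond.notify_all();
    }

private:
    std::mutex lock;
    std::condition_variable cond;
    unsigned count = 0;
    bool writeshutdown = false;
};
```

The loop matters because the queue broadcasts on one condition variable for several events, so a woken thread has to check whether its own condition is the one that changed.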

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::WaitForFreespace ( )
protectedvirtualinherited

Reimplemented in CPN::RemoteQueue.

Definition at line 347 of file QueueBase.cc.

References CPN::QueueBase::cond, CPN::QueueBase::lock, CPN::QueueBase::UnlockedQueueLength(), CPN::QueueBase::useD4R, PthreadCondition::Wait(), D4R::QueueBase::WriteBlock(), and CPN::QueueBase::WriteBlocked().

Referenced by CPN::QueueBase::GetRawEnqueuePtr(), CPN::QueueBase::InternalFlush(), and CPN::RemoteQueue::WaitForFreespace().

347  {
348  if (useD4R) {
349  WriteBlock(UnlockedQueueLength());
350  } else {
351  while (WriteBlocked()) {
352  cond.Wait(lock);
353  }
354  }
355  }
PthreadCondition & Wait(PthreadMutex &mutex)
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
void WriteBlock(unsigned qsize)
Definition: D4RQueue.cc:106
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289
virtual unsigned UnlockedQueueLength() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::WriteBlock ( unsigned  qsize)
protectedinherited

writer ===> reader. WriteBlock requires that you already hold the lock and, if the lock is reentrant, that a single unlock will release it.

Definition at line 106 of file D4RQueue.cc.

References D4R::QueueBase::Detect(), D4R::QueueBase::incomm, D4R::QueueBase::reader, D4R::QueueBase::readtagchanged, D4R::QueueBase::Signal(), D4R::QueueBase::Wait(), D4R::QueueBase::WriteBlocked(), and D4R::QueueBase::writer.

Referenced by CPN::QueueBase::GetRawEnqueuePtr(), and CPN::QueueBase::WaitForFreespace().

106  {
107  if (!reader) {
108  while (WriteBlocked() && !reader) {
109  Wait();
110  }
111  }
112  if (!WriteBlocked()) { return; }
113  readtagchanged = false;
114  try {
115  while (incomm) { Wait(); }
116  ScopeSetter<bool> ss(incomm, true);
117  AutoUnlock<QueueBase> au(*this);
118  writer->Block(reader->GetPublicTag(), qsize);
119  } catch (...) { Signal(); throw; }
120  Signal();
121  while (WriteBlocked()) {
122  if (readtagchanged) {
123  readtagchanged = false;
124  bool detect;
125  try {
126  while (incomm) { Wait(); }
127  ScopeSetter<bool> ss(incomm, true);
128  AutoUnlock<QueueBase> au(*this);
129  detect = writer->Transmit(reader->GetPublicTag());
130  } catch (...) { Signal(); throw; }
131  Signal();
132  if (detect) { Detect(); }
133  } else {
134  Wait();
135  }
136  }
137  }
virtual void Detect()=0
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual bool WriteBlocked()=0
virtual void Wait()=0
shared_ptr< Node > writer
Definition: D4RQueue.h:123
bool readtagchanged
Definition: D4RQueue.h:124
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::WriteBlocked ( )
protectedvirtualinherited
Returns
true if we are blocked.

Implements D4R::QueueBase.

Definition at line 357 of file QueueBase.cc.

References CPN::KernelBase::CheckTerminated(), CPN::QueueBase::flushed, CPN::QueueBase::kernel, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedFreespace(), CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::GetRawDequeuePtr(), CPN::RemoteQueue::WaitForFreespace(), and CPN::QueueBase::WaitForFreespace().

357  {
360  }
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::WriteRequest ( )
inherited

For unit tests.

Definition at line 385 of file QueueBase.cc.

References CPN::QueueBase::writerequest.

385  {
386  AutoLock<QueueBase> al(*this);
387  return writerequest;
388  }
unsigned writerequest
Definition: QueueBase.h:280

Member Data Documentation

PthreadCondition CPN::QueueBase::cond
protectedinherited
std::string CPN::QueueBase::datatype
protectedinherited

Definition at line 291 of file QueueBase.h.

unsigned CPN::QueueBase::dequeuethresh
protectedinherited
bool CPN::ThresholdQueue::dequeueUseOld
protected
unsigned CPN::QueueBase::enqueuethresh
protectedinherited
bool CPN::ThresholdQueue::enqueueUseOld
protected
bool CPN::QueueBase::flushed
protectedinherited
bool D4R::QueueBase::incomm
protectedinherited

Definition at line 126 of file D4RQueue.h.

Referenced by D4R::QueueBase::ReadBlock(), and D4R::QueueBase::WriteBlock().

bool CPN::QueueBase::indequeue
protectedinherited
bool CPN::QueueBase::inenqueue
protectedinherited
KernelBase* CPN::QueueBase::kernel
protectedinherited
PthreadMutex CPN::QueueBase::lock
mutableprotectedinherited
Logger CPN::QueueBase::logger
protectedinherited
TQImpl* CPN::ThresholdQueue::oldqueue
protected
unsigned CPN::QueueBase::postpad
protectedinherited

Definition at line 278 of file QueueBase.h.

Referenced by CPN::QueueBase::InternalFlush().

unsigned CPN::QueueBase::prepad
protectedinherited

Definition at line 277 of file QueueBase.h.

Referenced by CPN::QueueBase::InternalReset().

TQImpl* CPN::ThresholdQueue::queue
protected
shared_ptr<Node> D4R::QueueBase::reader
protectedinherited
const Key_t CPN::QueueBase::readerkey
protectedinherited
unsigned CPN::QueueBase::readrequest
protectedinherited
bool CPN::QueueBase::readshutdown
protectedinherited
bool D4R::QueueBase::readtagchanged
protectedinherited
bool CPN::QueueBase::useD4R
protectedinherited
shared_ptr<Node> D4R::QueueBase::writer
protectedinherited
unsigned CPN::QueueBase::writerequest
protectedinherited
const Key_t CPN::QueueBase::writerkey
protectedinherited
bool CPN::QueueBase::writeshutdown
protectedinherited
bool D4R::QueueBase::writetagchanged
protectedinherited

The documentation for this class was generated from the following files: