CPN
Computational Process Networks
Public Types | Public Member Functions | Protected Member Functions | Protected Attributes | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
CPN::RemoteQueue Class Reference

#include <RemoteQueue.h>

+ Inheritance diagram for CPN::RemoteQueue:
+ Collaboration diagram for CPN::RemoteQueue:

Public Types

enum  Mode_t { READ, WRITE }
 

Public Member Functions

 RemoteQueue (KernelBase *k, Mode_t mode, ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr, bool usembs)
 
 ~RemoteQueue ()
 
void Start ()
 
Mode_t GetMode () const
 
Key_t GetKey () const
 
void Shutdown ()
 
void LogState ()
 For debug ONLY! More...
 
const void * GetRawDequeuePtr (unsigned thresh, unsigned chan)
 
void Dequeue (unsigned count)
 
bool RawDequeue (void *data, unsigned count, unsigned numChans, unsigned chanStride)
 
bool RawDequeue (void *data, unsigned count)
 
void * GetRawEnqueuePtr (unsigned thresh, unsigned chan)
 
void Enqueue (unsigned count)
 
void RawEnqueue (const void *data, unsigned count, unsigned numChans, unsigned chanStride)
 
void RawEnqueue (const void *data, unsigned count)
 
void Reset ()
 
void Flush ()
 
bool Flushed () const
 
unsigned NumChannels () const
 
unsigned Count () const
 
bool Empty () const
 
unsigned Freespace () const
 
bool Full () const
 
unsigned MaxThreshold () const
 
unsigned QueueLength () const
 
unsigned EnqueueChannelStride () const
 
unsigned DequeueChannelStride () const
 
void Grow (unsigned queueLen, unsigned maxThresh)
 
Key_t GetWriterKey () const
 
Key_t GetReaderKey () const
 
const std::string & GetDatatype () const
 
void ShutdownReader ()
 Called by the QueueReader when no more data will be read. More...
 
void ShutdownWriter ()
 Called by the QueueWriter when no more data will be written. More...
 
void NotifyTerminate ()
 Used to tell any waiting threads that the network is terminating. More...
 
void Lock () const
 
void Unlock () const
 
unsigned ReadRequest ()
 For unit tests. More...
 
unsigned WriteRequest ()
 For unit tests. More...
 
bool IsReaderShutdown ()
 
bool IsWriterShutdown ()
 
unsigned NumEnqueued () const
 
unsigned NumDequeued () const
 
void SetReaderNode (shared_ptr< Node > n)
 
void SetWriterNode (shared_ptr< Node > n)
 
void SignalReaderTagChanged ()
 
void SignalWriterTagChanged ()
 

Protected Member Functions

virtual void * InternalGetRawEnqueuePtr (unsigned thresh, unsigned chan)
 
virtual const void * InternalGetRawDequeuePtr (unsigned thresh, unsigned chan)
 
virtual unsigned UnlockedNumChannels () const
 
virtual unsigned UnlockedMaxThreshold () const
 
virtual unsigned UnlockedFreespace () const
 
virtual bool UnlockedFull () const
 
virtual unsigned UnlockedEnqueueChannelStride () const
 
virtual unsigned UnlockedDequeueChannelStride () const
 
unsigned UnlockedNumEnqueued () const
 
unsigned UnlockedNumDequeued () const
 
virtual bool ReadBlocked ()
 
void NotifyData ()
 
virtual bool WriteBlocked ()
 
void NotifyFreespace ()
 
virtual void Wait ()
 
virtual void Detect ()
 
virtual void UnlockedShutdownReader ()
 
virtual void UnlockedShutdownWriter ()
 
void ReadBlock ()
 
void WriteBlock (unsigned qsize)
 

Protected Attributes

TQImpl * queue
 
TQImpl * oldqueue
 
bool enqueueUseOld
 
bool dequeueUseOld
 
const Key_t readerkey
 
const Key_t writerkey
 
bool readshutdown
 
bool writeshutdown
 
unsigned prepad
 
unsigned postpad
 
unsigned readrequest
 
unsigned writerequest
 
unsigned enqueuethresh
 
unsigned dequeuethresh
 
bool indequeue
 
bool inenqueue
 
bool flushed
 
KernelBase * kernel
 
bool useD4R
 
Logger logger
 
PthreadMutex lock
 
PthreadCondition cond
 
std::string datatype
 
shared_ptr< Node > reader
 
shared_ptr< Node > writer
 
bool readtagchanged
 
bool writetagchanged
 
bool incomm
 

Private Member Functions

void UnlockedShutdown ()
 
unsigned UnlockedCount () const
 
bool UnlockedEmpty () const
 
unsigned UnlockedQueueLength () const
 
void UnlockedGrow (unsigned queueLen, unsigned maxThresh)
 
void Signal ()
 
void WaitForData ()
 
void WaitForFreespace ()
 
void InternalDequeue (unsigned count)
 
void InternalEnqueue (unsigned count)
 
void InternalFlush ()
 
void InternalReset ()
 
void UnlockedSignalReaderTagChanged ()
 
void UnlockedSignalWriterTagChanged ()
 
void SetupPacket (Packet &packet)
 
void EnqueuePacket (const Packet &packet)
 
void SendEnqueuePacket ()
 
void DequeuePacket (const Packet &packet)
 
void SendDequeuePacket ()
 
void ReadBlockPacket (const Packet &packet)
 
void SendReadBlockPacket ()
 
void WriteBlockPacket (const Packet &packet)
 
void SendWriteBlockPacket ()
 
void EndOfWritePacket (const Packet &packet)
 
void SendEndOfWritePacket ()
 
void EndOfReadPacket (const Packet &packet)
 
void SendEndOfReadPacket ()
 
void GrowPacket (const Packet &packet)
 
void SendGrowPacket ()
 
void D4RTagPacket (const Packet &packet)
 
void SendD4RTagPacket ()
 
void FlushPacket (const Packet &packet)
 
void SendFlushPacket ()
 
void ResetPacket (const Packet &packet)
 
void SendResetPacket ()
 
void IDReaderPacket (const Packet &packet)
 
void IDWriterPacket (const Packet &packet)
 
void Read ()
 
void WriteBytes (const iovec *iov, unsigned iovcnt)
 
void * FileThreadEntryPoint ()
 
void * ActionThreadEntryPoint ()
 
void InternalCheckStatus ()
 
void UpdateClock (const Packet &packet)
 
void TickClock ()
 
std::string ClockString () const
 
void HandleError (const ErrnoException &e)
 
std::string GetState ()
 For debugging. More...
 
template<typename Queue_t >
void SendEnqueue (const Packet &packet, Queue_t &queue)
 
void SendPacket (const Packet &packet)
 
void SendPacket (const Packet &packet, void *data)
 
void * GetDecoderBytes (unsigned &amount)
 
void ReleaseDecoderBytes (unsigned amount)
 
void Reset ()
 
unsigned NumBytes () const
 
void FirePacket (const Packet &packet)
 

Static Private Member Functions

static unsigned QueueLength (unsigned length, unsigned maxthresh, double alpha, Mode_t mode)
 

Private Attributes

auto_ptr< Pthread > fileThread
 
auto_ptr< Pthread > actionThread
 
bool pendingAction
 
bool actionTick
 
PthreadCondition actionCond
 
const Mode_t mode
 
const double alpha
 
unsigned maxwritethreshold
 
ConnectionServer *const server
 
RemoteQueueHolder *const holder
 
SocketHandle sock
 
shared_ptr< D4R::Node > mocknode
 
uint64_t clock
 
uint64_t readclock
 
uint64_t writeclock
 
unsigned readerlength
 
unsigned writerlength
 
unsigned bytecount
 
bool pendingBlock
 
bool sentEnd
 
bool pendingGrow
 
bool pendingD4RTag
 
bool tagUpdated
 
bool pendingFlush
 
bool pendingReset
 
bool dead
 

Detailed Description

The RemoteQueue is a specialization of the ThresholdQueue which is split in half across a socket. This class works closely with ConnectionServer and RemoteQueueHolder.

Definition at line 49 of file RemoteQueue.h.

Member Enumeration Documentation

Enumerator
READ 
WRITE 

Definition at line 57 of file RemoteQueue.h.

Constructor & Destructor Documentation

CPN::RemoteQueue::RemoteQueue ( KernelBase * k,
Mode_t  mode,
ConnectionServer * s,
RemoteQueueHolder * h,
const SimpleQueueAttr & attr,
bool  usembs 
)

Definition at line 49 of file RemoteQueue.cc.

References actionThread, ActionThreadEntryPoint(), CreatePthreadFunctional(), fileThread, FileThreadEntryPoint(), CPN::QueueBase::logger, mocknode, mode, Logger::Name(), READ, CPN::QueueBase::readerkey, D4R::QueueBase::SetReaderNode(), D4R::QueueBase::SetWriterNode(), Logger::Trace(), and CPN::QueueBase::writerkey.

51  : ThresholdQueue(k, attr,
52  QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), mode_), usembs),
53  pendingAction(false),
54  actionTick(false),
55  mode(mode_),
56  alpha(attr.GetAlpha()),
57  maxwritethreshold(attr.GetMaxWriteThreshold()),
58  server(s),
59  holder(h),
60  mocknode(new D4R::Node(mode_ == READ ? attr.GetWriterNodeKey() : attr.GetReaderNodeKey())),
61  clock(0),
62  readclock(0),
63  writeclock(0),
64  readerlength(QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), READ)),
65  writerlength(QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), WRITE)),
66  bytecount(0),
67  pendingBlock(false),
68  sentEnd(false),
69  pendingGrow(false),
70  pendingD4RTag(false),
71  tagUpdated(false),
72  pendingFlush(false),
73  pendingReset(false),
74  dead(false)
75  {
76  PthreadAttr pattr;
77  pattr.StackSize(1<<16); // magic number, the file thread shouldn't use any more than this
78  fileThread = auto_ptr<Pthread>(CreatePthreadFunctional(this, &RemoteQueue::FileThreadEntryPoint, pattr));
80  if (mode == READ) {
82  } else {
84  }
85  std::ostringstream oss;
86  oss << "RemoteQueue(m:";
87  if (mode == READ) { oss << "r"; }
88  else { oss << "w"; }
89  oss << ", r:" << readerkey << ", w:" << writerkey << ")";
90  logger.Name(oss.str());
91  logger.Trace("Constructed");
92  }
unsigned maxwritethreshold
Definition: RemoteQueue.h:150
ThresholdQueue(KernelBase *k, const SimpleQueueAttr &attr, bool usembs)
ConnectionServer *const server
Definition: RemoteQueue.h:151
unsigned QueueLength() const
Definition: QueueBase.cc:230
unsigned readerlength
Definition: RemoteQueue.h:159
Logger logger
Definition: QueueBase.h:288
void SetReaderNode(shared_ptr< Node > n)
Definition: D4RQueue.cc:59
void Trace(const char *fmt,...)
Definition: Logger.cc:195
unsigned writerlength
Definition: RemoteQueue.h:160
auto_ptr< Pthread > fileThread
Definition: RemoteQueue.h:142
void * ActionThreadEntryPoint()
Definition: RemoteQueue.cc:662
void * FileThreadEntryPoint()
Definition: RemoteQueue.cc:583
shared_ptr< D4R::Node > mocknode
Definition: RemoteQueue.h:154
auto_ptr< Pthread > actionThread
Definition: RemoteQueue.h:143
const std::string & Name() const
Definition: Logger.cc:88
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
const Key_t readerkey
Definition: QueueBase.h:273
RemoteQueueHolder *const holder
Definition: RemoteQueue.h:152
const Mode_t mode
Definition: RemoteQueue.h:148
const double alpha
Definition: RemoteQueue.h:149
unsigned bytecount
Definition: RemoteQueue.h:168
void SetWriterNode(shared_ptr< Node > n)
Definition: D4RQueue.cc:65
uint64_t readclock
Definition: RemoteQueue.h:156
const Key_t writerkey
Definition: QueueBase.h:274
uint64_t writeclock
Definition: RemoteQueue.h:157

+ Here is the call graph for this function:

CPN::RemoteQueue::~RemoteQueue ( )

Definition at line 94 of file RemoteQueue.cc.

References actionCond, actionThread, ASSERT_ABORT, PthreadCondition::Broadcast(), ClockString(), dead, fileThread, FUNC_TRACE, CPN::QueueBase::logger, Signal(), and Logger::Trace().

94  {
95  ASSERT_ABORT(dead, "Shutdown but not dead!?");
96  Signal();
99  std::string clockstr = ClockString();
100  logger.Trace("Destructed (c: %s)", clockstr.c_str());
101  fileThread->Start();
102  fileThread->Join();
103  actionThread->Start();
104  actionThread->Join();
105  fileThread.reset();
106  actionThread.reset();
107  }
PthreadCondition & Broadcast(void)
Logger logger
Definition: QueueBase.h:288
void Trace(const char *fmt,...)
Definition: Logger.cc:195
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
auto_ptr< Pthread > fileThread
Definition: RemoteQueue.h:142
#define ASSERT_ABORT(exp,...)
auto_ptr< Pthread > actionThread
Definition: RemoteQueue.h:143
std::string ClockString() const
Definition: RemoteQueue.cc:819
PthreadCondition actionCond
Definition: RemoteQueue.h:146

+ Here is the call graph for this function:

Member Function Documentation

void * CPN::RemoteQueue::ActionThreadEntryPoint ( )
private

Definition at line 662 of file RemoteQueue.cc.

References actionCond, actionTick, PthreadCondition::Broadcast(), dead, InternalCheckStatus(), pendingAction, and CPN::QueueBase::Wait().

Referenced by RemoteQueue().

662  {
663  AutoLock<QueueBase> al(*this);
664  while (!dead) {
666  actionTick = true;
668  while(!pendingAction && !dead) {
669  Wait();
670  }
671  pendingAction = false;
672  }
673  return 0;
674  }
PthreadCondition & Broadcast(void)
virtual void Wait()
Definition: QueueBase.h:246
void InternalCheckStatus()
Definition: RemoteQueue.cc:676
PthreadCondition actionCond
Definition: RemoteQueue.h:146

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string CPN::RemoteQueue::ClockString ( ) const
private

Definition at line 819 of file RemoteQueue.cc.

References clock, readclock, and writeclock.

Referenced by FileThreadEntryPoint(), GetState(), HandleError(), InternalCheckStatus(), LogState(), and ~RemoteQueue().

819  {
820  std::ostringstream oss;
821  oss << "<" << clock << "," << readclock << "," << writeclock << ">";
822  return oss.str();
823  }
uint64_t readclock
Definition: RemoteQueue.h:156
uint64_t writeclock
Definition: RemoteQueue.h:157

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::Count ( ) const
inherited
Returns
the number of bytes in the queue.

Definition at line 205 of file QueueBase.cc.

References CPN::QueueBase::UnlockedCount().

205  {
206  AutoLock<const QueueBase> al(*this);
207  return UnlockedCount();
208  }
virtual unsigned UnlockedCount() const =0

+ Here is the call graph for this function:

void CPN::RemoteQueue::D4RTagPacket ( const Packet & packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 458 of file RemoteQueue.cc.

References ASSERT, CPN::Packet::DataLength(), FUNC_TRACE, CPN::QueueBase::logger, mocknode, mode, FileHandle::Read(), Signal(), sock, tagUpdated, D4R::QueueBase::UnlockedSignalReaderTagChanged(), D4R::QueueBase::UnlockedSignalWriterTagChanged(), UpdateClock(), and WRITE.

458  {
459  UpdateClock(packet);
461  ASSERT(packet.DataLength() == sizeof(D4R::Tag));
462  D4R::Tag tag;
463  unsigned numread = sock.Read(&tag, sizeof(tag));
464  ASSERT(numread == sizeof(tag));
465  tagUpdated = true;
466  Signal();
467  mocknode->SetPublicTag(tag);
468  if (mode == WRITE) {
470  } else {
472  }
473  }
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
SocketHandle sock
Definition: RemoteQueue.h:153
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
shared_ptr< D4R::Node > mocknode
Definition: RemoteQueue.h:154
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155
const Mode_t mode
Definition: RemoteQueue.h:148
unsigned Read(void *ptr, unsigned len)
Read data from the file descriptor.
Definition: FileHandle.cc:159
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::QueueBase::Dequeue ( unsigned  count)
inherited

This function is used to remove elements from the queue. count elements will be removed from the queue when this function is called.

Parameters
count: the number of bytes to remove from the queue

Definition at line 92 of file QueueBase.cc.

References CPN::QueueBase::dequeuethresh, CPN::QueueBase::GetRawDequeuePtr(), CPN::QueueBase::indequeue, CPN::QueueBase::InternalDequeue(), CPN::QueueBase::NotifyFreespace(), CPN::QueueBase::readerkey, and CPN::QueueBase::readshutdown.

Referenced by CPN::QueueBase::RawDequeue().

92  {
93  if (!GetRawDequeuePtr(count, 0)) { throw BrokenQueueException(readerkey); }
94  AutoLock<QueueBase> al(*this);
95  dequeuethresh = 0;
96  indequeue = false;
97  if (readshutdown) { throw BrokenQueueException(readerkey); }
98  InternalDequeue(count);
100  }
virtual void InternalDequeue(unsigned count)=0
unsigned dequeuethresh
Definition: QueueBase.h:282
const Key_t readerkey
Definition: QueueBase.h:273
const void * GetRawDequeuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:63
void NotifyFreespace()
Definition: QueueBase.cc:362
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::DequeueChannelStride ( ) const
inherited
Returns
The current dequeue channel stride. This value is only guaranteed to be consistent in between a call to GetDequeuePtr and Dequeue.

Definition at line 240 of file QueueBase.cc.

References CPN::QueueBase::UnlockedDequeueChannelStride().

240  {
241  AutoLock<const QueueBase> al(*this);
243  }
virtual unsigned UnlockedDequeueChannelStride() const =0

+ Here is the call graph for this function:

void CPN::RemoteQueue::DequeuePacket ( const Packet & packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 335 of file RemoteQueue.cc.

References ASSERT, bytecount, CPN::Packet::Count(), FUNC_TRACE, CPN::QueueBase::logger, mode, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, UpdateClock(), and WRITE.

335  {
336  UpdateClock(packet);
338  ASSERT(mode == WRITE);
340  readrequest = 0;
341  bytecount -= packet.Count();
342  }
unsigned readrequest
Definition: QueueBase.h:279
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
bool readshutdown
Definition: QueueBase.h:275
unsigned bytecount
Definition: RemoteQueue.h:168
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::QueueBase::Detect ( )
protected virtual inherited

Called by the D4R algorithm when it has detected an artificial deadlock and this queue should be changed.

Implements D4R::QueueBase.

Definition at line 373 of file QueueBase.cc.

References CPN::KernelBase::CalculateGrowSize(), Logger::Debug(), CPN::QueueBase::kernel, CPN::QueueBase::logger, CPN::QueueBase::UnlockedCount(), CPN::QueueBase::UnlockedGrow(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::UnlockedQueueLength(), and CPN::QueueBase::writerequest.

373  {
374  unsigned size = kernel->CalculateGrowSize(UnlockedCount(), writerequest);
375  logger.Debug("Detect: Grow(%u, %u)", size, writerequest);
376  UnlockedGrow(size, writerequest);
377  logger.Debug("New size: (%u, %u)", UnlockedQueueLength(), UnlockedMaxThreshold());
378  }
virtual unsigned UnlockedMaxThreshold() const =0
Logger logger
Definition: QueueBase.h:288
virtual unsigned UnlockedCount() const =0
unsigned writerequest
Definition: QueueBase.h:280
KernelBase * kernel
Definition: QueueBase.h:286
virtual unsigned CalculateGrowSize(unsigned currentsize, unsigned request)=0
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
virtual unsigned UnlockedQueueLength() const =0
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

bool CPN::QueueBase::Empty ( ) const
inherited
Returns
true if the queue is empty

Definition at line 210 of file QueueBase.cc.

References CPN::QueueBase::UnlockedEmpty().

210  {
211  AutoLock<const QueueBase> al(*this);
212  return UnlockedEmpty();
213  }
virtual bool UnlockedEmpty() const =0

+ Here is the call graph for this function:

void CPN::RemoteQueue::EndOfReadPacket ( const Packet & packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 417 of file RemoteQueue.cc.

References ASSERT, FUNC_TRACE, CPN::QueueBase::logger, mode, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedShutdownReader(), UpdateClock(), and WRITE.

417  {
418  UpdateClock(packet);
420  ASSERT(mode == WRITE);
423  }
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
bool readshutdown
Definition: QueueBase.h:275
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::EndOfWritePacket ( const Packet & packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 396 of file RemoteQueue.cc.

References ASSERT, FUNC_TRACE, CPN::QueueBase::logger, mode, READ, CPN::QueueBase::UnlockedShutdownWriter(), UpdateClock(), and CPN::QueueBase::writeshutdown.

396  {
397  UpdateClock(packet);
399  ASSERT(mode == READ);
402  }
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
bool writeshutdown
Definition: QueueBase.h:276
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::QueueBase::Enqueue ( unsigned  count)
inherited

This function is used to release the buffer obtained with GetRawEnqueuePtr. The count specifies the number of entries that we want to be placed in the buffer.

Note
A call to this function without an accompanying call to GetRawEnqueuePtr is undefined.
Parameters
count: the number of bytes to be placed in the buffer
Invariant
count <= thresh from GetRawEnqueuePtr

Definition at line 160 of file QueueBase.cc.

References CPN::QueueBase::enqueuethresh, CPN::QueueBase::inenqueue, CPN::QueueBase::InternalEnqueue(), CPN::QueueBase::NotifyData(), CPN::QueueBase::writerkey, and CPN::QueueBase::writeshutdown.

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and CPN::QueueBase::RawEnqueue().

160  {
161  AutoLock<QueueBase> al(*this);
162  enqueuethresh = 0;
163  inenqueue = false;
164  if (writeshutdown) { throw BrokenQueueException(writerkey); }
165  InternalEnqueue(count);
166  NotifyData();
167  }
virtual void InternalEnqueue(unsigned count)=0
void NotifyData()
Definition: QueueBase.cc:341
bool writeshutdown
Definition: QueueBase.h:276
unsigned enqueuethresh
Definition: QueueBase.h:281
const Key_t writerkey
Definition: QueueBase.h:274

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::EnqueueChannelStride ( ) const
inherited
Returns
The current enqueue channel stride. This value is only guaranteed to be consistent in between a call to GetEnqueuePtr and Enqueue.

Definition at line 235 of file QueueBase.cc.

References CPN::QueueBase::UnlockedEnqueueChannelStride().

235  {
236  AutoLock<const QueueBase> al(*this);
238  }
virtual unsigned UnlockedEnqueueChannelStride() const =0

+ Here is the call graph for this function:

void CPN::RemoteQueue::EnqueuePacket ( const Packet & packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 242 of file RemoteQueue.cc.

References ASSERT, bytecount, clock, ThresholdQueueBase::Count(), CPN::Packet::Count(), CPN::Packet::DataLength(), FileHandle::Eof(), ThresholdQueueBase::Freespace(), FUNC_TRACE, CPN::QueueBase::inenqueue, CPN::ThresholdQueue::InternalEnqueue(), CPN::ThresholdQueue::InternalGetRawEnqueuePtr(), CPN::QueueBase::logger, ThresholdQueueBase::MaxThreshold(), mode, CPN::QueueBase::NotifyData(), ThresholdQueueBase::NumChannels(), CPN::ThresholdQueue::queue, ThresholdQueueBase::QueueLength(), READ, readclock, FileHandle::Readv(), sock, CPN::ThresholdQueue::UnlockedGrow(), UpdateClock(), Logger::Warn(), writeclock, CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

242  {
243  UpdateClock(packet);
245  ASSERT(mode == READ);
247  const unsigned numchannels = queue->NumChannels();
248  unsigned count = packet.Count();
249  if (count > queue->Freespace() || count > queue->MaxThreshold()) {
250  logger.Warn("Enqueue packet too large, silently growing queue. "
251  "Packet size: %u, Freespace: %u, MaxThresh: %u, QueueLength: %u",
252  count, queue->Freespace(), queue->MaxThreshold(), queue->QueueLength());
253  ThresholdQueue::UnlockedGrow(queue->Count() + count, count);
254  }
255  std::vector<iovec> iovs;
256  for (unsigned i = 0; i < numchannels; ++i) {
257  iovec iov;
258  iov.iov_base = ThresholdQueue::InternalGetRawEnqueuePtr(count, i);
259  ASSERT(iov.iov_base, "Internal throttle failed! (c: <%llu,%llu,%llu>)", clock, readclock, writeclock);
260  iov.iov_len = count;
261  iovs.push_back(iov);
262  }
263  inenqueue = true;
264  {
265  AutoUnlock<QueueBase> aul(*this);
266  unsigned numread = 0;
267  unsigned numtoread = packet.DataLength();
268  unsigned i = 0;
269  while (numread < numtoread) {
270  unsigned num = sock.Readv(&iovs[i], iovs.size() - i);
271  if (num == 0) {
272  if (sock.Eof()) {
273  return;
274  }
275  }
276  numread += num;
277  if (numread == numtoread) break;
278  while (iovs[i].iov_len <= num) {
279  num -= iovs[i].iov_len;
280  ++i;
281  }
282  iovs[i].iov_len -= num;
283  iovs[i].iov_base = ((char*)iovs[i].iov_base) + num;
284  }
285  ASSERT(numread == packet.DataLength());
286  }
287  inenqueue = false;
289  bytecount += count;
290  writerequest = 0;
291  NotifyData();
292  }
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)
ulong MaxThreshold(void) const
unsigned writerequest
Definition: QueueBase.h:280
SocketHandle sock
Definition: RemoteQueue.h:153
ulong Count(void) const
ulong NumChannels(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
ulong QueueLength(void) const
ulong Freespace(void) const
void NotifyData()
Definition: QueueBase.cc:341
virtual void InternalEnqueue(unsigned count)
bool writeshutdown
Definition: QueueBase.h:276
const Mode_t mode
Definition: RemoteQueue.h:148
void Warn(const char *fmt,...)
Definition: Logger.cc:168
unsigned bytecount
Definition: RemoteQueue.h:168
bool Eof() const
Definition: FileHandle.h:115
uint64_t readclock
Definition: RemoteQueue.h:156
unsigned Readv(const iovec *iov, int iovcnt)
scatter gather io version of Read
Definition: FileHandle.cc:190
#define ASSERT(exp,...)
uint64_t writeclock
Definition: RemoteQueue.h:157

+ Here is the call graph for this function:

void * CPN::RemoteQueue::FileThreadEntryPoint ( )
private

Definition at line 583 of file RemoteQueue.cc.

References actionCond, actionThread, actionTick, CPN::RemoteQueueHolder::CleanupQueue(), ClockString(), FileHandle::Closed(), CPN::ConnectionServer::ConnectReader(), CPN::ConnectionServer::ConnectWriter(), dead, Logger::Debug(), Logger::Error(), FileHandle::FD(), GetKey(), CPN::RemoteQueueHolder::GetWakeup(), HandleError(), holder, CPN::KernelBase::IsTerminated(), CPN::QueueBase::kernel, CPN::QueueBase::lock, CPN::QueueBase::logger, mode, FileHandle::Poll(), FileHandle::Reset(), server, SocketHandle::SetNoDelay(), Shutdown(), CPN::QueueBase::ShutdownReader(), CPN::QueueBase::ShutdownWriter(), Signal(), sock, PthreadCondition::Wait(), CPN::ConnectionServer::Wakeup(), ErrnoException::what(), and WRITE.

Referenced by RemoteQueue().

583  {
584  try {
585  while (true) {
586  try {
587  if (kernel->IsTerminated()) {
588  std::string clockstr = ClockString();
589  logger.Debug("Forced Shutdown (c: %s)", clockstr.c_str());
590  Shutdown();
591  }
592  {
593  AutoLock<QueueBase> al(*this);
594  if (dead) {
595  std::string clockstr = ClockString();
596  logger.Debug("Shutdown (c: %s)", clockstr.c_str());
597  break;
598  }
599  }
600  if (sock.Closed()) {
601  logger.Debug("Connecting");
602  shared_ptr<Sync::Future<int> > conn;
603  if (mode == WRITE) {
604  conn = server->ConnectWriter(GetKey());
605  } else {
606  conn = server->ConnectReader(GetKey());
607  }
608  if (conn) {
609  sock.Reset();
610  sock.FD(conn->Get());
611  }
612  if (sock.Closed()) {
613  logger.Debug("Connection Failed");
614  } else {
615  logger.Debug("Connected");
616  actionThread->Start();
617  sock.SetNoDelay(true);
618  }
619  } else {
620  FileHandle *fds[2];
621  fds[0] = &sock;
622  fds[1] = holder->GetWakeup();
623  FileHandle::Poll(fds, fds+2, -1);
624  {
625  AutoLock<QueueBase> al(*this);
626  Signal();
627  actionTick = false;
628  while (!actionTick && !dead) {
630  }
631  }
632  }
633  } catch (const ErrnoException &e) {
634  AutoLock<QueueBase> al(*this);
635  HandleError(e);
636  }
637  }
638  } catch (const ShutdownException &e) {
639  std::string clockstr = ClockString();
640  logger.Debug("Forced Shutdown threw exception (c: %s)", clockstr.c_str());
641  // Do nothing, this just breaks us out of the loop
642  // Can be thrown from ConnectWriter or ConnectReader
643  } catch (const ErrnoException &e) {
644  logger.Error(e.what());
645  ShutdownReader();
646  ShutdownWriter();
647  Shutdown();
648  } catch (...) {
650  throw;
651  }
653  server->Wakeup();
654  return 0;
655  }
PthreadCondition & Wait(PthreadMutex &mutex)
ConnectionServer *const server
Definition: RemoteQueue.h:151
Logger logger
Definition: QueueBase.h:288
void ShutdownReader()
Called by the QueueReader when no more data will be read.
Definition: QueueBase.cc:250
void Error(const char *fmt,...)
Definition: Logger.cc:159
SocketHandle sock
Definition: RemoteQueue.h:153
WakeupHandle * GetWakeup()
static int Poll(IteratorRef< FileHandle * > begin, IteratorRef< FileHandle * > end, double timeout)
poll a list of FileHandles for any activity and call the appropriate On method.
Definition: FileHandle.cc:34
bool Closed() const
Definition: FileHandle.h:128
void HandleError(const ErrnoException &e)
Definition: RemoteQueue.cc:825
void SetNoDelay(bool nodelay)
auto_ptr< Pthread > actionThread
Definition: RemoteQueue.h:143
shared_ptr< Sync::Future< int > > ConnectReader(Key_t readerkey)
KernelBase * kernel
Definition: QueueBase.h:286
void CleanupQueue(Key_t key)
std::string ClockString() const
Definition: RemoteQueue.cc:819
void Reset()
Clear all internal state including the file descriptor! WARNING does not close the file! ...
Definition: FileHandle.cc:138
int FD() const
Definition: FileHandle.h:106
RemoteQueueHolder *const holder
Definition: RemoteQueue.h:152
const Mode_t mode
Definition: RemoteQueue.h:148
PthreadCondition actionCond
Definition: RemoteQueue.h:146
virtual const char * what() const
virtual bool IsTerminated()=0
Key_t GetKey() const
Definition: RemoteQueue.h:74
void ShutdownWriter()
Called by the QueueWriter when no more data will be written.
Definition: QueueBase.cc:260
PthreadMutex lock
Definition: QueueBase.h:289
Generic file handle could be a file, or a socket or a device.
Definition: FileHandle.h:41
shared_ptr< Sync::Future< int > > ConnectWriter(Key_t writerkey)
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::PacketHandler::FirePacket ( const Packet & packet)
inherited

Definition at line 31 of file PacketHeader.cc.

References ASSERT, CPN::PacketHandler::D4RTagPacket(), CPN::PacketHandler::DequeuePacket(), CPN::PacketHandler::EndOfReadPacket(), CPN::PacketHandler::EndOfWritePacket(), CPN::PacketHandler::EnqueuePacket(), CPN::PacketHandler::FlushPacket(), CPN::PacketHandler::GrowPacket(), CPN::PacketHandler::IDReaderPacket(), CPN::PacketHandler::IDWriterPacket(), CPN::PACKET_D4RTAG, CPN::PACKET_DEQUEUE, CPN::PACKET_ENDOFREAD, CPN::PACKET_ENDOFWRITE, CPN::PACKET_ENQUEUE, CPN::PACKET_FLUSH, CPN::PACKET_GROW, CPN::PACKET_ID_READER, CPN::PACKET_ID_WRITER, CPN::PACKET_READBLOCK, CPN::PACKET_RESET, CPN::PACKET_WRITEBLOCK, CPN::PacketHandler::ReadBlockPacket(), CPN::PacketHandler::ResetPacket(), CPN::Packet::Type(), CPN::Packet::Valid(), and CPN::PacketHandler::WriteBlockPacket().

Referenced by CPN::PacketDecoder::ReleaseDecoderBytes().

31  {
32  ASSERT(packet.Valid(), "Invalid packet");
33  switch (packet.Type()) {
34  case PACKET_ENQUEUE:
35  EnqueuePacket(packet);
36  break;
37  case PACKET_DEQUEUE:
38  DequeuePacket(packet);
39  break;
40  case PACKET_READBLOCK:
41  ReadBlockPacket(packet);
42  break;
43  case PACKET_WRITEBLOCK:
44  WriteBlockPacket(packet);
45  break;
46  case PACKET_ENDOFWRITE:
47  EndOfWritePacket(packet);
48  break;
49  case PACKET_ENDOFREAD:
50  EndOfReadPacket(packet);
51  break;
52  case PACKET_GROW:
53  GrowPacket(packet);
54  break;
55  case PACKET_D4RTAG:
56  D4RTagPacket(packet);
57  break;
58  case PACKET_FLUSH:
59  FlushPacket(packet);
60  break;
61  case PACKET_RESET:
62  ResetPacket(packet);
63  break;
64  case PACKET_ID_READER:
65  IDReaderPacket(packet);
66  break;
67  case PACKET_ID_WRITER:
68  IDWriterPacket(packet);
69  break;
70  default:
71  ASSERT(false, "Invalid packet type.");
72  }
73  }
virtual void IDWriterPacket(const Packet &packet)=0
virtual void WriteBlockPacket(const Packet &packet)=0
virtual void D4RTagPacket(const Packet &packet)=0
virtual void FlushPacket(const Packet &packet)=0
virtual void EnqueuePacket(const Packet &packet)=0
virtual void IDReaderPacket(const Packet &packet)=0
virtual void ReadBlockPacket(const Packet &packet)=0
virtual void EndOfReadPacket(const Packet &packet)=0
virtual void GrowPacket(const Packet &packet)=0
virtual void ResetPacket(const Packet &packet)=0
virtual void EndOfWritePacket(const Packet &packet)=0
virtual void DequeuePacket(const Packet &packet)=0
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::Flush ( )
inherited

Definition at line 186 of file QueueBase.cc.

References CPN::QueueBase::InternalFlush().

186  {
187  AutoLock<const QueueBase> al(*this);
188  InternalFlush();
189  }
virtual void InternalFlush()
Definition: QueueBase.cc:265

+ Here is the call graph for this function:

bool CPN::QueueBase::Flushed ( ) const
inherited

Definition at line 191 of file QueueBase.cc.

References CPN::QueueBase::flushed.

191  {
192  AutoLock<const QueueBase> al(*this);
193  return flushed;
194  }
void CPN::RemoteQueue::FlushPacket ( const Packet packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 489 of file RemoteQueue.cc.

References CPN::QueueBase::flushed, FUNC_TRACE, CPN::QueueBase::logger, Signal(), and UpdateClock().

489  {
490  UpdateClock(packet);
492  flushed = true;
493  Signal();
494  }
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43

+ Here is the call graph for this function:

unsigned CPN::QueueBase::Freespace ( ) const
inherited
Returns
the number of bytes we can add to the queue without blocking.

Definition at line 215 of file QueueBase.cc.

References CPN::QueueBase::UnlockedFreespace().

215  {
216  AutoLock<const QueueBase> al(*this);
217  return UnlockedFreespace();
218  }
virtual unsigned UnlockedFreespace() const =0

+ Here is the call graph for this function:

bool CPN::QueueBase::Full ( ) const
inherited
Returns
true if the queue is full, false otherwise

Definition at line 220 of file QueueBase.cc.

References CPN::QueueBase::UnlockedFull().

220  {
221  AutoLock<const QueueBase> al(*this);
222  return UnlockedFull();
223  }
virtual bool UnlockedFull() const =0

+ Here is the call graph for this function:

const std::string& CPN::QueueBase::GetDatatype ( ) const
inline inherited
Returns
the datatype name associated with this queue

Definition at line 213 of file QueueBase.h.

213 { return datatype; }
std::string datatype
Definition: QueueBase.h:291
void * CPN::PacketDecoder::GetDecoderBytes ( unsigned &  amount)
inherited

Definition at line 37 of file PacketDecoder.cc.

References ASSERT, CPN::PacketDecoder::header, CPN::Packet::header, and CPN::PacketDecoder::numbytes.

Referenced by Read().

37  {
38  char *ptr = reinterpret_cast<char*>(&header.header);
39  amount = sizeof(header.header) - numbytes;
40  ASSERT(amount != 0, "Zero decoder bytes");
41  return ptr + numbytes;
42  }
PacketHeader header
Definition: PacketHeader.h:148
#define ASSERT(exp,...)

+ Here is the caller graph for this function:

Key_t CPN::RemoteQueue::GetKey ( ) const
inline virtual

Implements CPN::RemoteQueueBase.

Definition at line 74 of file RemoteQueue.h.

References mode, READ, CPN::QueueBase::readerkey, and CPN::QueueBase::writerkey.

Referenced by FileThreadEntryPoint().

74 { return mode == READ ? readerkey : writerkey; }
const Key_t readerkey
Definition: QueueBase.h:273
const Mode_t mode
Definition: RemoteQueue.h:148
const Key_t writerkey
Definition: QueueBase.h:274

+ Here is the caller graph for this function:

Mode_t CPN::RemoteQueue::GetMode ( ) const
inline

Definition at line 73 of file RemoteQueue.h.

References mode.

73 { return mode; }
const Mode_t mode
Definition: RemoteQueue.h:148
const void * CPN::QueueBase::GetRawDequeuePtr ( unsigned  thresh,
unsigned  chan 
)
inherited

Get a pointer to a buffer containing elements.

Note
Access to the memory locations pointed to by the returned pointer after Dequeue has been called is undefined.
Parameters
thresh: the number of bytes to get
chan: the channel to use
Returns
A void* to a block of memory containing thresh bytes or 0 if there are not thresh bytes available.

Definition at line 63 of file QueueBase.cc.

References ASSERT, CPN::KernelBase::CheckTerminated(), CPN::QueueBase::dequeuethresh, CPN::QueueBase::flushed, CPN::KernelBase::GrowQueueMaxThreshold(), CPN::QueueBase::indequeue, CPN::QueueBase::InternalGetRawDequeuePtr(), CPN::QueueBase::kernel, CPN::QueueBase::readerkey, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::Signal(), CPN::QueueBase::UnlockedGrow(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::WaitForData(), CPN::QueueBase::WriteBlocked(), CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::Dequeue(), and CPN::QueueBase::RawDequeue().

63  {
65  AutoLock<QueueBase> al(*this);
66  if (indequeue) { ASSERT(dequeuethresh >= thresh); }
67  else { dequeuethresh = thresh; }
68  while (true) {
69  const void *ptr = InternalGetRawDequeuePtr(thresh, chan);
70  if (ptr ||
72  || flushed) {
73  if (ptr) { indequeue = true; }
74  return ptr;
75  }
76  if (readshutdown) { throw BrokenQueueException(readerkey); }
77  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
78  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
79  UnlockedGrow(2*thresh, thresh);
80  Signal();
81  } else if (WriteBlocked() && kernel->GrowQueueMaxThreshold()) {
82  UnlockedGrow(writerequest + thresh, thresh);
83  Signal();
84  } else {
85  readrequest = thresh;
86  WaitForData();
87  readrequest = 0;
88  }
89  }
90  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedMaxThreshold() const =0
virtual void WaitForData()
Definition: QueueBase.cc:326
unsigned writerequest
Definition: QueueBase.h:280
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
virtual bool GrowQueueMaxThreshold()=0
virtual void Signal()
Definition: QueueBase.h:247
unsigned dequeuethresh
Definition: QueueBase.h:282
virtual void CheckTerminated()=0
const Key_t readerkey
Definition: QueueBase.h:273
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)=0
bool readshutdown
Definition: QueueBase.h:275
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void * CPN::QueueBase::GetRawEnqueuePtr ( unsigned  thresh,
unsigned  chan 
)
inherited

Return a pointer to a buffer of memory that contains thresh entries that we can write into.

Note
A call to this function without an accompanying call to Enqueue is undefined.
An access to the memory locations defined by the return value is undefined after a call to Enqueue.
Parameters
thresh: the number of bytes we need in the returned buffer.
chan: the channel to use
Returns
void* to the memory buffer, 0 if not enough space available

Definition at line 121 of file QueueBase.cc.

References ASSERT, CPN::KernelBase::CheckTerminated(), CPN::QueueBase::enqueuethresh, CPN::QueueBase::flushed, CPN::KernelBase::GrowQueueMaxThreshold(), CPN::QueueBase::inenqueue, CPN::QueueBase::InternalGetRawEnqueuePtr(), CPN::QueueBase::kernel, CPN::QueueBase::ReadBlocked(), CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::Signal(), CPN::QueueBase::UnlockedGrow(), CPN::QueueBase::UnlockedMaxThreshold(), CPN::QueueBase::useD4R, CPN::QueueBase::Wait(), CPN::QueueBase::WaitForFreespace(), D4R::QueueBase::WriteBlock(), CPN::QueueBase::writerequest, CPN::QueueBase::writerkey, and CPN::QueueBase::writeshutdown.

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and CPN::QueueBase::RawEnqueue().

121  {
123  AutoLock<QueueBase> al(*this);
124  // Only the enqueuer may call flush and reset will clear all state as if we
125  // had not done anything yet, so block here
126  while (flushed) {
127  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
128  if (useD4R) {
129  WriteBlock(-1);
130  } else {
131  Wait();
132  }
133  }
134  if (inenqueue) { ASSERT(enqueuethresh >= thresh); }
135  else { enqueuethresh = thresh; }
136  bool grown = false;
137  while (true) {
138  void *ptr = InternalGetRawEnqueuePtr(thresh, chan);
139  if (ptr) {
140  inenqueue = true;
141  return ptr;
142  }
143  if (readshutdown || writeshutdown) { throw BrokenQueueException(writerkey); }
144  if (thresh > UnlockedMaxThreshold() && kernel->GrowQueueMaxThreshold()) {
145  //printf("Grow(%u, %u)\n", 2*thresh, thresh);
146  UnlockedGrow(2*thresh, thresh);
147  Signal();
148  } else if (!grown && ReadBlocked() && kernel->GrowQueueMaxThreshold()) {
149  UnlockedGrow(readrequest + thresh, thresh);
150  Signal();
151  grown = true;
152  } else {
153  writerequest = thresh;
155  writerequest = 0;
156  }
157  }
158  }
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedMaxThreshold() const =0
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
virtual void Wait()
Definition: QueueBase.h:246
unsigned writerequest
Definition: QueueBase.h:280
virtual bool GrowQueueMaxThreshold()=0
void WriteBlock(unsigned qsize)
Definition: D4RQueue.cc:106
virtual void Signal()
Definition: QueueBase.h:247
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)=0
bool readshutdown
Definition: QueueBase.h:275
unsigned enqueuethresh
Definition: QueueBase.h:281
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0
const Key_t writerkey
Definition: QueueBase.h:274
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Key_t CPN::QueueBase::GetReaderKey ( ) const
inline inherited
Returns
the reader key associated with this queue

Definition at line 211 of file QueueBase.h.

211 { return readerkey; }
const Key_t readerkey
Definition: QueueBase.h:273
std::string CPN::RemoteQueue::GetState ( )
private

For debugging.

Definition at line 867 of file RemoteQueue.cc.

References bytecount, ClockString(), dead, mode, pendingBlock, pendingD4RTag, pendingGrow, READ, readerlength, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, sentEnd, UnlockedCount(), CPN::ThresholdQueue::UnlockedFreespace(), CPN::ThresholdQueue::UnlockedMaxThreshold(), CPN::ThresholdQueue::UnlockedNumDequeued(), CPN::ThresholdQueue::UnlockedNumEnqueued(), UnlockedQueueLength(), CPN::QueueBase::writerequest, writerlength, and CPN::QueueBase::writeshutdown.

867  {
868  std::ostringstream oss;
869  oss << "s: " << UnlockedQueueLength() << ",mt: " << UnlockedMaxThreshold() << ",c: " << UnlockedCount()
870  << ",f: " << UnlockedFreespace() << ",rr: " << readrequest << ",wr: " << writerequest
871  << ",te: " << UnlockedNumEnqueued() << ",td: " << UnlockedNumDequeued()
872  << ",M: " << (mode == READ ? "r" : "w") << ",rl: " << readerlength
873  << ",wl: " << writerlength << ",c: " << ClockString() << ",bc: "
874  << bytecount << ",pb: " << pendingBlock << ",se: " << sentEnd
875  << ",pg: " << pendingGrow << ",pd4r: " << pendingD4RTag
876  << ",d: " << dead;
877  if (readshutdown) {
878  oss << ",readshutdown";
879  }
880  if (writeshutdown) {
881  oss << ",writeshutdown";
882  }
883  return oss.str();
884  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedMaxThreshold() const
unsigned readerlength
Definition: RemoteQueue.h:159
unsigned writerlength
Definition: RemoteQueue.h:160
unsigned writerequest
Definition: QueueBase.h:280
unsigned UnlockedNumDequeued() const
std::string ClockString() const
Definition: RemoteQueue.cc:819
unsigned UnlockedCount() const
Definition: RemoteQueue.cc:125
bool writeshutdown
Definition: QueueBase.h:276
virtual unsigned UnlockedFreespace() const
const Mode_t mode
Definition: RemoteQueue.h:148
bool readshutdown
Definition: QueueBase.h:275
unsigned UnlockedQueueLength() const
Definition: RemoteQueue.cc:141
unsigned UnlockedNumEnqueued() const
unsigned bytecount
Definition: RemoteQueue.h:168

+ Here is the call graph for this function:

Key_t CPN::QueueBase::GetWriterKey ( ) const
inline inherited
Returns
the writer key associated with this queue

Definition at line 209 of file QueueBase.h.

209 { return writerkey; }
const Key_t writerkey
Definition: QueueBase.h:274
void CPN::QueueBase::Grow ( unsigned  queueLen,
unsigned  maxThresh 
)
inherited

Ensure that this queue has at least queueLen bytes of space and can support at least maxThresh as the maximum threshold. The new queue length will be max(queueLen, QueueLength()) and the new max threshold will be max(maxThresh, MaxThreshold()).

Note
The caller must guarantee that an enqueue and a dequeue are not both outstanding; otherwise this will fail.
Parameters
queueLen: the next queue length
maxThresh: the next max threshold

Definition at line 245 of file QueueBase.cc.

References CPN::QueueBase::UnlockedGrow().

245  {
246  AutoLock<QueueBase> al(*this);
247  UnlockedGrow(queueLen, maxThresh);
248  }
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)=0

+ Here is the call graph for this function:

void CPN::RemoteQueue::GrowPacket ( const Packet packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 438 of file RemoteQueue.cc.

References alpha, FUNC_TRACE, CPN::QueueBase::logger, ThresholdQueueBase::MaxThreshold(), CPN::Packet::MaxThreshold(), mode, CPN::ThresholdQueue::queue, CPN::QueueBase::QueueLength(), CPN::Packet::QueueSize(), READ, readerlength, CPN::ThresholdQueue::UnlockedGrow(), UpdateClock(), WRITE, and writerlength.

438  {
439  UpdateClock(packet);
441  const unsigned queueLen = packet.QueueSize();
442  const unsigned maxthresh = std::max<unsigned>(queue->MaxThreshold(), packet.MaxThreshold());
443  readerlength = QueueLength(queueLen, maxthresh, alpha, READ);
444  writerlength = QueueLength(queueLen, maxthresh, alpha, WRITE);
445  const unsigned newlen = (mode == WRITE ? writerlength : readerlength);
446  ThresholdQueue::UnlockedGrow(newlen, maxthresh);
447  }
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
unsigned QueueLength() const
Definition: QueueBase.cc:230
unsigned readerlength
Definition: RemoteQueue.h:159
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
unsigned writerlength
Definition: RemoteQueue.h:160
ulong MaxThreshold(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
const double alpha
Definition: RemoteQueue.h:149

+ Here is the call graph for this function:

void CPN::RemoteQueue::HandleError ( const ErrnoException e)
private

Definition at line 825 of file RemoteQueue.cc.

References actionCond, PthreadCondition::Broadcast(), ClockString(), FileHandle::Close(), dead, ErrnoException::Error(), Logger::Error(), CPN::QueueBase::logger, CPN::QueueBase::readshutdown, Signal(), sock, ErrnoException::what(), and CPN::QueueBase::writeshutdown.

Referenced by FileThreadEntryPoint(), and InternalCheckStatus().

825  {
826  if (readshutdown || writeshutdown) {
827  Signal();
828  }
829  switch (e.Error()) {
830  case EPIPE:
831  case EBADF:
832  case ECONNRESET:
833  try {
834  sock.Close();
835  } catch (const ErrnoException &e) {}
836  dead = true;
838  break;
839  default:
840  {
841  std::string clockstr = ClockString();
842  logger.Error("Exception in RemoteQueue rethrowing (c: %s e: %d): %s",
843  clockstr.c_str(), e.Error(), e.what());
844  }
845  throw;
846  break;
847  }
848  }
PthreadCondition & Broadcast(void)
Logger logger
Definition: QueueBase.h:288
void Error(const char *fmt,...)
Definition: Logger.cc:159
SocketHandle sock
Definition: RemoteQueue.h:153
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
std::string ClockString() const
Definition: RemoteQueue.cc:819
bool writeshutdown
Definition: QueueBase.h:276
PthreadCondition actionCond
Definition: RemoteQueue.h:146
virtual const char * what() const
bool readshutdown
Definition: QueueBase.h:275
int Error() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::IDReaderPacket ( const Packet packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 532 of file RemoteQueue.cc.

References ASSERT.

532  {
533  ASSERT(false, "Unexpected packet");
534  }
#define ASSERT(exp,...)
void CPN::RemoteQueue::IDWriterPacket ( const Packet packet)
private virtual

Implements CPN::PacketHandler.

Definition at line 536 of file RemoteQueue.cc.

References ASSERT.

536  {
537  ASSERT(false, "Unexpected packet");
538  }
#define ASSERT(exp,...)
void CPN::RemoteQueue::InternalCheckStatus ( )
private

Definition at line 676 of file RemoteQueue.cc.

References actionCond, ASSERT, PthreadCondition::Broadcast(), bytecount, ClockString(), FileHandle::Close(), FileHandle::Closed(), ThresholdQueueBase::Count(), dead, Logger::DEBUG, Logger::Debug(), ThresholdQueueBase::Empty(), FileHandle::Eof(), Logger::Error(), ThresholdQueueBase::Freespace(), HandleError(), CPN::KernelBase::IsTerminated(), CPN::QueueBase::kernel, CPN::QueueBase::logger, Logger::LogLevel(), LogState(), mode, pendingBlock, pendingD4RTag, pendingFlush, pendingGrow, pendingReset, CPN::ThresholdQueue::queue, Read(), D4R::QueueBase::reader, readerlength, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, SendD4RTagPacket(), SendDequeuePacket(), SendEndOfReadPacket(), SendEndOfWritePacket(), SendEnqueuePacket(), SendFlushPacket(), SendGrowPacket(), SendReadBlockPacket(), SendResetPacket(), SendWriteBlockPacket(), sentEnd, sock, UnlockedShutdown(), CPN::QueueBase::UnlockedShutdownReader(), CPN::QueueBase::UnlockedShutdownWriter(), WRITE, D4R::QueueBase::writer, CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

Referenced by ActionThreadEntryPoint().

676  {
677  if (sock.Closed()) {
678  return;
679  }
680 
681  bool terminated = kernel->IsTerminated();
682  if (sock.Eof() && !(readshutdown || writeshutdown)) {
683  if (terminated) {
684  if (!sock.Closed()) {
685  sock.Close();
686  }
688  return;
689  }
690  // We /should/ never get here, output copious amounts of internal information.
691  std::string clockstr = ClockString();
692  logger.Error("Eof detected but not shutdown! (c: %s)", clockstr.c_str());
693  LogState();
694  ASSERT(false, "EOF detected but not shutdown! (c: %s)", clockstr.c_str());
695  }
696 
697  try {
698  Read();
699 
700  if (pendingGrow && !sentEnd) {
701  SendGrowPacket();
702  pendingGrow = false;
703  }
704 
705  if (mode == WRITE) {
706  if (!sentEnd) {
707  if (terminated) {
709  }
710  // Write as much as we can
711  while (!queue->Empty() && (bytecount < readerlength) && !dead) {
713  }
714  if (dead) return;
715  if (pendingFlush && queue->Empty()) {
716  SendFlushPacket();
717  }
718  // A pending block is present
719  if (pendingBlock) {
720  // If we have received dequeue packets
721  // since the block happened don't bother sending anything
722  if (writerequest > queue->Freespace()) {
724  }
725  pendingBlock = false;
726  }
727 
728  if (pendingD4RTag) {
729  if (writer && !writeshutdown) {
731  pendingD4RTag = false;
732  }
733  }
734  }
735  if (writeshutdown) {
736  if (!sentEnd && (queue->Empty() || terminated)) {
738  sentEnd = true;
739  }
740  }
741 
742  if (readshutdown) {
743  if (!sentEnd) {
745  sentEnd = true;
746  }
747  if (logger.LogLevel() <= Logger::DEBUG) {
748  std::string clockstr = ClockString();
749  logger.Debug("Closing the socket (c: %s)", clockstr.c_str());
750  }
751  sock.Close();
752  dead = true;
754  }
755  } else {
756  if (!sentEnd) {
757  if (terminated) {
759  }
760  if (pendingReset) {
761  SendResetPacket();
762  }
763  // If some bytes have been read from the queue
764  if (bytecount > queue->Count()) {
766  }
767 
768  if (pendingBlock) {
769  // May have received enqueue packets...
770  if (readrequest > queue->Count()) {
772  }
773  pendingBlock = false;
774  }
775  if (pendingD4RTag) {
776  if (reader && !readshutdown) {
778  pendingD4RTag = false;
779  }
780  }
781  if (readshutdown) {
783  sentEnd = true;
784  }
785  }
786  if (writeshutdown) {
787  if (!sentEnd) {
789  sentEnd = true;
790  }
791  if (logger.LogLevel() <= Logger::DEBUG) {
792  std::string clockstr = ClockString();
793  logger.Debug("Closing the socket (c: %s)", clockstr.c_str());
794  }
795  sock.Close();
796  dead = true;
798  }
799  }
800  } catch (const ErrnoException &e) {
801  HandleError(e);
802  }
803  }
void SendD4RTagPacket()
Definition: RemoteQueue.cc:475
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311
shared_ptr< Node > reader
Definition: D4RQueue.h:122
unsigned readrequest
Definition: QueueBase.h:279
unsigned readerlength
Definition: RemoteQueue.h:159
void SendEnqueuePacket()
Definition: RemoteQueue.cc:294
PthreadCondition & Broadcast(void)
Logger logger
Definition: QueueBase.h:288
void SendEndOfWritePacket()
Definition: RemoteQueue.cc:404
void SendEndOfReadPacket()
Definition: RemoteQueue.cc:425
void SendWriteBlockPacket()
Definition: RemoteQueue.cc:388
void Error(const char *fmt,...)
Definition: Logger.cc:159
unsigned writerequest
Definition: QueueBase.h:280
SocketHandle sock
Definition: RemoteQueue.h:153
ulong Count(void) const
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255
void SendDequeuePacket()
Definition: RemoteQueue.cc:344
void UnlockedShutdown()
Definition: RemoteQueue.cc:118
bool Closed() const
Definition: FileHandle.h:128
void SendReadBlockPacket()
Definition: RemoteQueue.cc:367
void HandleError(const ErrnoException &e)
Definition: RemoteQueue.cc:825
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
ulong Freespace(void) const
KernelBase * kernel
Definition: QueueBase.h:286
int LogLevel() const
Definition: Logger.cc:58
void LogState()
For debug ONLY!
Definition: RemoteQueue.cc:886
std::string ClockString() const
Definition: RemoteQueue.cc:819
bool writeshutdown
Definition: QueueBase.h:276
void SendResetPacket()
Definition: RemoteQueue.cc:524
const Mode_t mode
Definition: RemoteQueue.h:148
PthreadCondition actionCond
Definition: RemoteQueue.h:146
bool readshutdown
Definition: QueueBase.h:275
bool Empty(void) const
virtual bool IsTerminated()=0
unsigned bytecount
Definition: RemoteQueue.h:168
bool Eof() const
Definition: FileHandle.h:115
#define ASSERT(exp,...)
void SendFlushPacket()
Definition: RemoteQueue.cc:496
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::InternalDequeue ( unsigned  count)
private virtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 191 of file RemoteQueue.cc.

References ASSERT, FUNC_TRACE, CPN::ThresholdQueue::InternalDequeue(), CPN::QueueBase::logger, mode, READ, and Signal().

191  {
192  ASSERT(mode == READ);
195  Signal();
196  }
virtual void InternalDequeue(unsigned count)
Logger logger
Definition: QueueBase.h:288
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::InternalEnqueue ( unsigned  count)
private virtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 198 of file RemoteQueue.cc.

References ASSERT, FUNC_TRACE, CPN::ThresholdQueue::InternalEnqueue(), CPN::QueueBase::logger, mode, Signal(), and WRITE.

198  {
199  ASSERT(mode == WRITE);
202  Signal();
203  }
Logger logger
Definition: QueueBase.h:288
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
virtual void InternalEnqueue(unsigned count)
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::InternalFlush ( )
private virtual

Reimplemented from CPN::QueueBase.

Definition at line 205 of file RemoteQueue.cc.

References ASSERT, CPN::QueueBase::InternalFlush(), mode, pendingFlush, Signal(), and WRITE.

205  {
206  ASSERT(mode == WRITE);
208  pendingFlush = true;
209  Signal();
210  }
virtual void InternalFlush()
Definition: QueueBase.cc:265
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

const void * ThresholdQueue< T >::InternalGetRawDequeuePtr ( unsigned  thresh,
unsigned  chan 
)
protected virtual inherited

Implements CPN::QueueBase.

Definition at line 126 of file CPNThresholdQueue.cc.

References ASSERT, CPN::ThresholdQueue::dequeueUseOld, ThresholdQueueBase::GetRawDequeuePtr(), CPN::QueueBase::indequeue, CPN::ThresholdQueue::oldqueue, and CPN::ThresholdQueue::queue.

Referenced by SendEnqueuePacket().

126  {
127  const void *ret = 0;
128  if (dequeueUseOld) {
129  // The ONLY reason this code path should be followed is if the node made a getdequeueptr
130  // then called getdequeueptr again before dequeue when a grow happens in between
131  ASSERT(indequeue);
132  ret = oldqueue->GetRawDequeuePtr(thresh, chan);
133  ASSERT(ret);
134  } else {
135  ret = queue->GetRawDequeuePtr(thresh, chan);
136  }
137  return ret;
138  }
const void * GetRawDequeuePtr(ulong dequeueThresh, ulong chan=0) const
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void * ThresholdQueue< T >::InternalGetRawEnqueuePtr ( unsigned  thresh,
unsigned  chan 
)
protected virtual inherited

Implements CPN::QueueBase.

Definition at line 57 of file CPNThresholdQueue.cc.

References ASSERT, CPN::ThresholdQueue::enqueueUseOld, ThresholdQueueBase::GetRawEnqueuePtr(), CPN::QueueBase::inenqueue, CPN::ThresholdQueue::oldqueue, and CPN::ThresholdQueue::queue.

Referenced by EnqueuePacket().

57  {
58  void *ret = 0;
59  if (enqueueUseOld) {
61  ret = oldqueue->GetRawEnqueuePtr(thresh, chan);
62  ASSERT(ret);
63  } else {
64  ret = queue->GetRawEnqueuePtr(thresh, chan);
65  }
66  return ret;
67  }
void * GetRawEnqueuePtr(ulong enqueueThresh, ulong chan=0) const
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::InternalReset ( )
private virtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 212 of file RemoteQueue.cc.

References ASSERT, CPN::ThresholdQueue::InternalReset(), mode, pendingReset, READ, and Signal().

212  {
213  ASSERT(mode == READ);
215  pendingReset = true;
216  Signal();
217  }
const Mode_t mode
Definition: RemoteQueue.h:148
virtual void InternalReset()
#define ASSERT(exp,...)

+ Here is the call graph for this function:

bool CPN::QueueBase::IsReaderShutdown ( )
inherited

Definition at line 390 of file QueueBase.cc.

References CPN::QueueBase::readshutdown.

390  {
391  AutoLock<QueueBase> al(*this);
392  return readshutdown;
393  }
bool readshutdown
Definition: QueueBase.h:275
bool CPN::QueueBase::IsWriterShutdown ( )
inherited

Definition at line 395 of file QueueBase.cc.

References CPN::QueueBase::writeshutdown.

395  {
396  AutoLock<QueueBase> al(*this);
397  return writeshutdown;
398  }
bool writeshutdown
Definition: QueueBase.h:276
void CPN::QueueBase::Lock ( ) const
inline virtual inherited

These functions are to access the lock for the queue.

Implements D4R::QueueBase.

Definition at line 221 of file QueueBase.h.

References lock, and PthreadMutex::Lock().

221 { lock.Lock(); }
PthreadMutex & Lock(void)
Definition: PthreadMutex.h:49
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

void CPN::RemoteQueue::LogState ( )
virtual

For debug ONLY!

Implements CPN::RemoteQueueBase.

Definition at line 886 of file RemoteQueue.cc.

References actionThread, CPN::BoolString(), bytecount, ClockString(), FileHandle::Closed(), dead, Logger::Error(), fileThread, CPN::QueueBase::logger, CPN::QueueBase::LogState(), mode, pendingBlock, pendingD4RTag, pendingFlush, pendingGrow, pendingReset, READ, readerlength, sentEnd, sock, and writerlength.

Referenced by InternalCheckStatus().

886  {
888  std::string clockstr = ClockString();
889  logger.Error("Mode: %s, Readerlength: %u, Writerlength %u, Clock: %s, bytecount: %u",
890  mode == READ ? "read" : "write", readerlength, writerlength, clockstr.c_str(), bytecount);
891  logger.Error("PendingBlock: %s, SentEnd: %s, PendingGrow: %s, PendingD4R: %s, Dead: %s"
892  ", PendingFlush: %s, PendingReset: %s",
895  logger.Error("FileThread id: %llu, Running: %s, ActionThread id: %llu, Running: %s",
896  (unsigned long long)((pthread_t)(*fileThread)), BoolString(fileThread->Running()),
897  (unsigned long long)((pthread_t)(*actionThread)), BoolString(actionThread->Running()));
898  if (sock.Closed()) {
899  logger.Error("Socket closed");
900  }
901  }
const char * BoolString(bool tf)
Definition: RemoteQueue.cc:859
unsigned readerlength
Definition: RemoteQueue.h:159
Logger logger
Definition: QueueBase.h:288
unsigned writerlength
Definition: RemoteQueue.h:160
void Error(const char *fmt,...)
Definition: Logger.cc:159
SocketHandle sock
Definition: RemoteQueue.h:153
auto_ptr< Pthread > fileThread
Definition: RemoteQueue.h:142
bool Closed() const
Definition: FileHandle.h:128
auto_ptr< Pthread > actionThread
Definition: RemoteQueue.h:143
std::string ClockString() const
Definition: RemoteQueue.cc:819
const Mode_t mode
Definition: RemoteQueue.h:148
virtual void LogState()
For debugging ONLY!! Otherwise non deterministic output.
Definition: QueueBase.cc:400
unsigned bytecount
Definition: RemoteQueue.h:168

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::MaxThreshold ( ) const
inherited
Returns
the maximum threshold this queue supports in bytes

Definition at line 225 of file QueueBase.cc.

References CPN::QueueBase::UnlockedMaxThreshold().

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), and CPN::ThresholdQueue::UnlockedGrow().

225  {
226  AutoLock<const QueueBase> al(*this);
227  return UnlockedMaxThreshold();
228  }
virtual unsigned UnlockedMaxThreshold() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyData ( )
protected inherited

Definition at line 341 of file QueueBase.cc.

References PthreadCondition::Broadcast(), CPN::QueueBase::cond, CPN::QueueBase::readrequest, and CPN::QueueBase::UnlockedCount().

Referenced by CPN::QueueBase::Enqueue(), and EnqueuePacket().

341  {
342  if (UnlockedCount() >= readrequest) {
343  cond.Broadcast();
344  }
345  }
unsigned readrequest
Definition: QueueBase.h:279
PthreadCondition & Broadcast(void)
virtual unsigned UnlockedCount() const =0
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyFreespace ( )
protected inherited

Definition at line 362 of file QueueBase.cc.

References PthreadCondition::Broadcast(), CPN::QueueBase::cond, CPN::QueueBase::UnlockedFreespace(), and CPN::QueueBase::writerequest.

Referenced by CPN::QueueBase::Dequeue(), and SendEnqueuePacket().

362  {
363  if (UnlockedFreespace() >= writerequest) {
364  cond.Broadcast();
365  }
366  }
PthreadCondition & Broadcast(void)
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::NotifyTerminate ( )
inherited

Used to tell any waiting threads that the network is terminating.

Definition at line 368 of file QueueBase.cc.

References PthreadCondition::Broadcast(), and CPN::QueueBase::cond.

368  {
369  AutoLock<QueueBase> al(*this);
370  cond.Broadcast();
371  }
PthreadCondition & Broadcast(void)
PthreadCondition cond
Definition: QueueBase.h:290

+ Here is the call graph for this function:

unsigned CPN::PacketDecoder::NumBytes ( ) const
inlineinherited

Definition at line 48 of file PacketDecoder.h.

48 { return numbytes; }
unsigned CPN::QueueBase::NumChannels ( ) const
inherited
Returns
the number of channels supported by this queue.

Definition at line 200 of file QueueBase.cc.

References CPN::QueueBase::UnlockedNumChannels().

200  {
201  AutoLock<const QueueBase> al(*this);
202  return UnlockedNumChannels();
203  }
virtual unsigned UnlockedNumChannels() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumDequeued ( ) const
inherited

Definition at line 321 of file QueueBase.cc.

References CPN::QueueBase::UnlockedNumDequeued().

321  {
322  AutoLock<const QueueBase> al(*this);
323  return UnlockedNumDequeued();
324  }
virtual unsigned UnlockedNumDequeued() const =0

+ Here is the call graph for this function:

unsigned CPN::QueueBase::NumEnqueued ( ) const
inherited

Definition at line 316 of file QueueBase.cc.

References CPN::QueueBase::UnlockedNumEnqueued().

316  {
317  AutoLock<const QueueBase> al(*this);
318  return UnlockedNumEnqueued();
319  }
virtual unsigned UnlockedNumEnqueued() const =0

+ Here is the call graph for this function:

unsigned CPN::RemoteQueue::QueueLength ( unsigned  length,
unsigned  maxthresh,
double  alpha,
Mode_t  mode 
)
staticprivate

Definition at line 850 of file RemoteQueue.cc.

References READ.

850  {
851  unsigned writerlen = unsigned(((double)length)*alpha);
852  if (mode == READ) {
853  return std::max<unsigned>(length - writerlen, maxthresh);
854  } else {
855  return std::max<unsigned>(writerlen, maxthresh);
856  }
857  }
const Mode_t mode
Definition: RemoteQueue.h:148
const double alpha
Definition: RemoteQueue.h:149
unsigned CPN::QueueBase::QueueLength ( ) const
inherited
Returns
the maximum number of bytes that can be put in this queue.

Definition at line 230 of file QueueBase.cc.

References CPN::QueueBase::UnlockedQueueLength().

Referenced by CPN::ThresholdQueue::TQImpl::Grow(), GrowPacket(), CPN::ThresholdQueue::UnlockedGrow(), and UnlockedGrow().

230  {
231  AutoLock<const QueueBase> al(*this);
232  return UnlockedQueueLength();
233  }
virtual unsigned UnlockedQueueLength() const =0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::RawDequeue ( void *  data,
unsigned  count,
unsigned  numChans,
unsigned  chanStride 
)
inherited

Dequeue data from the queue directly into the memory pointed to by data. This function shall be equivalent to a call to GetRawDequeuePtr then a memcpy and then a call to Dequeue.

Parameters
data: pointer to memory to dequeue to
count: the number of bytes to copy into data
numChans: the number of channels to write to
chanStride: the distance in bytes between the beginning of the channels in data.
Returns
true on success false on failure

Definition at line 102 of file QueueBase.cc.

References ASSERT, CPN::QueueBase::Dequeue(), and CPN::QueueBase::GetRawDequeuePtr().

Referenced by CPN::QueueBase::RawDequeue().

102  {
103  const void *src = GetRawDequeuePtr(count, 0);
104  char *dest = (char*)data;
105  if (!src) { return false; }
106  memcpy(dest, src, count);
107  for (unsigned chan = 1; chan < numChans; ++chan) {
108  src = GetRawDequeuePtr(count, chan);
109  ASSERT(src);
110  dest += chanStride;
111  memcpy(dest, src, count);
112  }
113  Dequeue(count);
114  return true;
115  }
void Dequeue(unsigned count)
Definition: QueueBase.cc:92
const void * GetRawDequeuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:63
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::RawDequeue ( void *  data,
unsigned  count 
)
inherited

A version of RawDequeue to use when there is only 1 channel.

Parameters
data: pointer to memory to dequeue to
count: the number of bytes to dequeue
Returns
true on success false if we have reached the end of the queue

Definition at line 117 of file QueueBase.cc.

References CPN::QueueBase::RawDequeue().

117  {
118  return RawDequeue(data, count, 1, 0);
119  }
bool RawDequeue(void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:102

+ Here is the call graph for this function:

void CPN::QueueBase::RawEnqueue ( const void *  data,
unsigned  count,
unsigned  numChans,
unsigned  chanStride 
)
inherited

This function shall be equivalent to a call to GetRawEnqueuePtr, then a memcpy, and then a call to Enqueue.

The underlying implementation may implement either GetRawEnqueuePtr and Enqueue, or RawEnqueue, and then implement the other in terms of the one implemented.

Parameters
data: pointer to the memory to enqueue
count: the number of bytes to enqueue
numChans: the number of channels to write to
chanStride: the distance in bytes between the beginning of the channels in data.

Definition at line 169 of file QueueBase.cc.

References CPN::QueueBase::Enqueue(), and CPN::QueueBase::GetRawEnqueuePtr().

Referenced by CPN::QueueBase::RawEnqueue().

169  {
170  void *dest = GetRawEnqueuePtr(count, 0);
171  const char *src = (char*)data;
172  memcpy(dest, src, count);
173  for (unsigned chan = 1; chan < numChans; ++chan) {
174  dest = GetRawEnqueuePtr(count, chan);
175  src += chanStride;
176  memcpy(dest, src, count);
177  }
178  Enqueue(count);
179  }
void * GetRawEnqueuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:121
void Enqueue(unsigned count)
Definition: QueueBase.cc:160

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::RawEnqueue ( const void *  data,
unsigned  count 
)
inherited

A version of RawEnqueue to use when there is only 1 channel.

Parameters
data: pointer to the memory to enqueue
count: the number of bytes to enqueue

Definition at line 196 of file QueueBase.cc.

References CPN::QueueBase::RawEnqueue().

196  {
197  return RawEnqueue(data, count, 1, 0);
198  }
void RawEnqueue(const void *data, unsigned count, unsigned numChans, unsigned chanStride)
Definition: QueueBase.cc:169

+ Here is the call graph for this function:

void CPN::RemoteQueue::Read ( )
private

Definition at line 540 of file RemoteQueue.cc.

References Logger::Debug(), FileHandle::Eof(), CPN::PacketDecoder::GetDecoderBytes(), CPN::QueueBase::logger, FileHandle::Readable(), SocketHandle::Recv(), CPN::PacketDecoder::ReleaseDecoderBytes(), and sock.

Referenced by InternalCheckStatus().

540  {
541  while (sock.Readable()) {
542  unsigned numtoread = 0;
543  void *ptr = PacketDecoder::GetDecoderBytes(numtoread);
544  unsigned numread = sock.Recv(ptr, numtoread, false);
545  if (numread == 0) {
546  if (sock.Eof()) {
547  logger.Debug("Read EOF");
548  }
549  break;
550  } else {
552  }
553  }
554  }
Logger logger
Definition: QueueBase.h:288
unsigned Recv(void *ptr, unsigned len, bool block)
SocketHandle sock
Definition: RemoteQueue.h:153
bool Readable(bool r)
Set that the file is currently readable or not.
Definition: FileHandle.h:86
void ReleaseDecoderBytes(unsigned amount)
bool Eof() const
Definition: FileHandle.h:115
void * GetDecoderBytes(unsigned &amount)
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::ReadBlock ( )
protectedinherited

reader ===> writer. ReadBlock requires that you already hold the lock, and if the lock is reentrant then a single unlock will release it.

Exceptions
D4R::DeadlockException

Definition at line 71 of file D4RQueue.cc.

References D4R::QueueBase::incomm, D4R::QueueBase::ReadBlocked(), D4R::QueueBase::reader, D4R::QueueBase::Signal(), D4R::QueueBase::Wait(), D4R::QueueBase::writer, and D4R::QueueBase::writetagchanged.

Referenced by CPN::QueueBase::WaitForData().

71  {
72  if (!writer) {
73  while (ReadBlocked() && !writer) {
74  Wait();
75  }
76  }
77  if (!ReadBlocked()) { return; }
78  writetagchanged = false;
79  try {
80  while (incomm) { Wait(); }
81  ScopeSetter<bool> ss(incomm, true);
82  AutoUnlock<QueueBase> au(*this);
83  reader->Block(writer->GetPublicTag(), -1);
84  } catch (...) { Signal(); throw; }
85  Signal();
86  while (ReadBlocked()) {
87  if (writetagchanged) {
88  writetagchanged = false;
89  bool detect;
90  try {
91  while (incomm) { Wait(); }
92  ScopeSetter<bool> ss(incomm, true);
93  AutoUnlock<QueueBase> au(*this);
94  detect = reader->Transmit(writer->GetPublicTag());
95  } catch (...) { Signal(); throw; }
96  Signal();
97  if (detect) {
98  throw DeadlockException("True deadlock detected");
99  }
100  } else {
101  Wait();
102  }
103  }
104  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual void Wait()=0
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual bool ReadBlocked()=0
virtual void Signal()=0
bool writetagchanged
Definition: D4RQueue.h:125

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::ReadBlocked ( )
protectedvirtualinherited
Returns
true if we are blocked

Implements D4R::QueueBase.

Definition at line 336 of file QueueBase.cc.

References CPN::KernelBase::CheckTerminated(), CPN::QueueBase::flushed, CPN::QueueBase::kernel, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedCount(), and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::GetRawEnqueuePtr(), WaitForData(), and CPN::QueueBase::WaitForData().

336  {
339  }
unsigned readrequest
Definition: QueueBase.h:279
virtual unsigned UnlockedCount() const =0
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::ReadBlockPacket ( const Packet packet)
privatevirtual

Implements CPN::PacketHandler.

Definition at line 354 of file RemoteQueue.cc.

References ASSERT, bytecount, ThresholdQueueBase::Count(), FUNC_TRACE, CPN::QueueBase::logger, mode, pendingD4RTag, CPN::ThresholdQueue::queue, CPN::QueueBase::readrequest, CPN::QueueBase::readshutdown, CPN::Packet::Requested(), UpdateClock(), CPN::QueueBase::useD4R, and WRITE.

354  {
355  UpdateClock(packet);
357  ASSERT(mode == WRITE);
359  readrequest = packet.Requested();
360  if (readrequest > queue->Count() + bytecount) {
361  if (useD4R) {
362  pendingD4RTag = true;
363  }
364  }
365  }
unsigned readrequest
Definition: QueueBase.h:279
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
ulong Count(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
bool readshutdown
Definition: QueueBase.h:275
unsigned bytecount
Definition: RemoteQueue.h:168
#define ASSERT(exp,...)

+ Here is the call graph for this function:

unsigned CPN::QueueBase::ReadRequest ( )
inherited

For unit tests.

Definition at line 380 of file QueueBase.cc.

References CPN::QueueBase::readrequest.

380  {
381  AutoLock<QueueBase> al(*this);
382  return readrequest;
383  }
unsigned readrequest
Definition: QueueBase.h:279
void CPN::PacketDecoder::ReleaseDecoderBytes ( unsigned  amount)
inherited

Definition at line 44 of file PacketDecoder.cc.

References ASSERT, CPN::PacketHandler::FirePacket(), CPN::PacketDecoder::header, CPN::Packet::header, CPN::PacketDecoder::numbytes, and CPN::Packet::Valid().

Referenced by Read().

44  {
45  ASSERT(numbytes + amount <= sizeof(header.header));
46  numbytes += amount;
47  if (numbytes == sizeof(header.header)) {
48  if (header.Valid()) {
50  numbytes = 0;
51  } else {
52  // If it is not valid, what do we do??
53  // Well, we can search for the header word
54  // wont always work...
55  char *ptr = reinterpret_cast<char*>(&header.header);
56  numbytes -= 1;
57  memmove(ptr, ptr + 1, numbytes);
58  }
59  }
60  }
PacketHeader header
Definition: PacketHeader.h:148
bool Valid() const
Definition: PacketHeader.h:134
void FirePacket(const Packet &packet)
Definition: PacketHeader.cc:31
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::PacketDecoder::Reset ( )
inherited

Definition at line 62 of file PacketDecoder.cc.

References CPN::PacketDecoder::numbytes.

62  {
63  numbytes = 0;
64  }
void CPN::QueueBase::Reset ( )
inherited

Definition at line 181 of file QueueBase.cc.

References CPN::QueueBase::InternalReset().

181  {
182  AutoLock<const QueueBase> al(*this);
183  InternalReset();
184  }
virtual void InternalReset()
Definition: QueueBase.cc:289

+ Here is the call graph for this function:

void CPN::RemoteQueue::ResetPacket ( const Packet packet)
privatevirtual

Implements CPN::PacketHandler.

Definition at line 504 of file RemoteQueue.cc.

References CPN::QueueBase::dequeuethresh, CPN::ThresholdQueue::dequeueUseOld, CPN::QueueBase::enqueuethresh, CPN::ThresholdQueue::enqueueUseOld, CPN::QueueBase::flushed, FUNC_TRACE, CPN::QueueBase::indequeue, CPN::QueueBase::inenqueue, CPN::QueueBase::logger, CPN::ThresholdQueue::oldqueue, CPN::ThresholdQueue::queue, CPN::QueueBase::readrequest, ThresholdQueueBase::Reset(), Signal(), UpdateClock(), and CPN::QueueBase::writerequest.

504  {
505  UpdateClock(packet);
507  if (dequeueUseOld || enqueueUseOld) {
508  dequeueUseOld = false;
509  enqueueUseOld = false;
510  delete oldqueue;
511  oldqueue = 0;
512  }
513  queue->Reset();
514  readrequest = 0;
515  writerequest = 0;
516  enqueuethresh = 0;
517  dequeuethresh = 0;
518  indequeue = false;
519  inenqueue = false;
520  flushed = false;
521  Signal();
522  }
unsigned readrequest
Definition: QueueBase.h:279
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
unsigned writerequest
Definition: QueueBase.h:280
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
unsigned dequeuethresh
Definition: QueueBase.h:282
unsigned enqueuethresh
Definition: QueueBase.h:281

+ Here is the call graph for this function:

void CPN::RemoteQueue::SendD4RTagPacket ( )
private

Definition at line 475 of file RemoteQueue.cc.

References CPN::Packet::DataLength(), FUNC_TRACE, CPN::QueueBase::logger, mode, CPN::PACKET_D4RTAG, READ, D4R::QueueBase::reader, CPN::PacketEncoder::SendPacket(), SetupPacket(), and D4R::QueueBase::writer.

Referenced by InternalCheckStatus().

475  {
477  Packet packet(PACKET_D4RTAG);
478  SetupPacket(packet);
479  packet.DataLength(sizeof(D4R::Tag));
480  D4R::Tag tag;
481  if (mode == READ) {
482  tag = reader->GetPublicTag();
483  } else {
484  tag = writer->GetPublicTag();
485  }
486  PacketEncoder::SendPacket(packet, &tag);
487  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
shared_ptr< Node > writer
Definition: D4RQueue.h:123
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendDequeuePacket ( )
private

Definition at line 344 of file RemoteQueue.cc.

References bytecount, ThresholdQueueBase::Count(), CPN::Packet::Count(), FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_DEQUEUE, CPN::ThresholdQueue::queue, CPN::PacketEncoder::SendPacket(), and SetupPacket().

Referenced by InternalCheckStatus().

344  {
346  Packet packet(PACKET_DEQUEUE);
347  SetupPacket(packet);
348  const unsigned count = bytecount - queue->Count();
349  packet.Count(count);
351  bytecount -= count;
352  }
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
ulong Count(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
unsigned bytecount
Definition: RemoteQueue.h:168

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendEndOfReadPacket ( )
private

Definition at line 425 of file RemoteQueue.cc.

References ASSERT, Logger::Debug(), FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_ENDOFREAD, CPN::PacketEncoder::SendPacket(), sentEnd, SetupPacket(), SocketHandle::ShutdownWrite(), sock, and ErrnoException::what().

Referenced by InternalCheckStatus().

425  {
427  ASSERT(!sentEnd);
428  Packet packet(PACKET_ENDOFREAD);
429  SetupPacket(packet);
431  try {
433  } catch (const ErrnoException &e) {
434  logger.Debug("Error trying to close the read end: %s", e.what());
435  }
436  }
void ShutdownWrite()
Shutdown the write end of this socket. Any future attempt to write to this socket will fail...
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
SocketHandle sock
Definition: RemoteQueue.h:153
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
virtual const char * what() const
#define ASSERT(exp,...)
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendEndOfWritePacket ( )
private

Definition at line 404 of file RemoteQueue.cc.

References ASSERT, Logger::Debug(), FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_ENDOFWRITE, CPN::PacketEncoder::SendPacket(), sentEnd, SetupPacket(), SocketHandle::ShutdownWrite(), sock, and ErrnoException::what().

Referenced by InternalCheckStatus().

404  {
406  ASSERT(!sentEnd);
407  Packet packet(PACKET_ENDOFWRITE);
408  SetupPacket(packet);
410  try {
412  } catch (const ErrnoException &e) {
413  logger.Debug("Error trying to close the write end: %s", e.what());
414  }
415  }
void ShutdownWrite()
Shutdown the write end of this socket. Any future attempt to write to this socket will fail...
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
SocketHandle sock
Definition: RemoteQueue.h:153
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
virtual const char * what() const
#define ASSERT(exp,...)
void Debug(const char *fmt,...)
Definition: Logger.cc:186

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename Queue_t >
void CPN::PacketEncoder::SendEnqueue ( const Packet packet,
Queue_t &  queue 
)
inlineinherited

Definition at line 44 of file PacketEncoder.h.

References ASSERT, CPN::Packet::Count(), CPN::Packet::header, CPN::PACKET_ENQUEUE, and CPN::Packet::Type().

44  {
45  ASSERT(packet.Type() == PACKET_ENQUEUE);
46  std::vector<iovec> iovs;
47  // Must use const_cast here because iovec.iov_base isn't const... :(
48  iovec header = { const_cast<PacketHeader*>(&packet.header), sizeof(packet.header) };
49  iovs.push_back(header);
50  for (unsigned i = 0; i < queue.NumChannels(); ++i) {
51  iovec iov;
52  iov.iov_base = const_cast<void*>(queue.GetRawDequeuePtr(packet.Count(), i));
53  ASSERT(iov.iov_base);
54  iov.iov_len = packet.Count();
55  iovs.push_back(iov);
56  }
57  WriteBytes(&iovs[0], iovs.size());
58  queue.Dequeue(packet.Count());
59  }
virtual void WriteBytes(const iovec *iov, unsigned iovcnt)=0
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::SendEnqueuePacket ( )
private

Definition at line 294 of file RemoteQueue.cc.

References ASSERT, bytecount, ThresholdQueueBase::Count(), CPN::Packet::Count(), FUNC_TRACE, CPN::PacketDecoder::header, CPN::Packet::header, CPN::QueueBase::indequeue, CPN::ThresholdQueue::InternalDequeue(), CPN::ThresholdQueue::InternalGetRawDequeuePtr(), CPN::QueueBase::logger, ThresholdQueueBase::MaxThreshold(), maxwritethreshold, CPN::QueueBase::NotifyFreespace(), ThresholdQueueBase::NumChannels(), CPN::PACKET_ENQUEUE, CPN::ThresholdQueue::queue, readerlength, SetupPacket(), and WriteBytes().

Referenced by InternalCheckStatus().

294  {
295  unsigned count = queue->Count();
296  const unsigned numchannels = queue->NumChannels();
297  const unsigned maxthresh = queue->MaxThreshold();
298  const unsigned expectedfree = std::max(readerlength, maxthresh) - bytecount;
299  if (maxwritethreshold > 0) {
300  unsigned maxwrite = maxwritethreshold/numchannels;
301  if (maxwrite == 0) maxwrite = 1;
302  if (count > maxwrite) { count = maxwrite; }
303  }
304  if (count > maxthresh) { count = maxthresh; }
305  if (count > expectedfree) { count = expectedfree; }
306  if (count == 0) { return; }
308  const unsigned datalength = count * numchannels;
309  Packet packet(datalength, PACKET_ENQUEUE);
310  SetupPacket(packet);
311  packet.Count(count);
312 
313  std::vector<iovec> iovs;
314  iovec header = { &packet.header, sizeof(packet.header) };
315  iovs.push_back(header);
316  for (unsigned i = 0; i < numchannels; ++i) {
317  iovec iov;
318  iov.iov_base = const_cast<void*>(ThresholdQueue::InternalGetRawDequeuePtr(packet.Count(), i));
319  ASSERT(iov.iov_base);
320  iov.iov_len = packet.Count();
321  iovs.push_back(iov);
322  }
323  indequeue = true;
324  {
325  AutoUnlock<QueueBase> aul(*this);
326  WriteBytes(&iovs[0], iovs.size());
327  }
328  indequeue = false;
329  ThresholdQueue::InternalDequeue(packet.Count());
330 
331  bytecount += count;
333  }
unsigned maxwritethreshold
Definition: RemoteQueue.h:150
virtual void InternalDequeue(unsigned count)
unsigned readerlength
Definition: RemoteQueue.h:159
PacketHeader header
Definition: PacketHeader.h:148
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
ulong MaxThreshold(void) const
ulong Count(void) const
ulong NumChannels(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
void NotifyFreespace()
Definition: QueueBase.cc:362
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)
void WriteBytes(const iovec *iov, unsigned iovcnt)
Definition: RemoteQueue.cc:556
unsigned bytecount
Definition: RemoteQueue.h:168
#define ASSERT(exp,...)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendFlushPacket ( )
private

Definition at line 496 of file RemoteQueue.cc.

References FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_FLUSH, pendingFlush, CPN::PacketEncoder::SendPacket(), and SetupPacket().

Referenced by InternalCheckStatus().

496  {
498  Packet packet(PACKET_FLUSH);
499  SetupPacket(packet);
501  pendingFlush = false;
502  }
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendGrowPacket ( )
private

Definition at line 449 of file RemoteQueue.cc.

References FUNC_TRACE, CPN::QueueBase::logger, ThresholdQueueBase::MaxThreshold(), CPN::Packet::MaxThreshold(), CPN::PACKET_GROW, CPN::ThresholdQueue::queue, CPN::Packet::QueueSize(), readerlength, CPN::PacketEncoder::SendPacket(), SetupPacket(), and writerlength.

Referenced by InternalCheckStatus().

449  {
451  Packet packet(PACKET_GROW);
452  SetupPacket(packet);
453  packet.QueueSize(readerlength + writerlength);
454  packet.MaxThreshold(queue->MaxThreshold());
456  }
unsigned readerlength
Definition: RemoteQueue.h:159
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
unsigned writerlength
Definition: RemoteQueue.h:160
ulong MaxThreshold(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::PacketEncoder::SendPacket ( const Packet packet)
inherited

Definition at line 32 of file PacketEncoder.cc.

References CPN::Packet::header, and CPN::PacketEncoder::WriteBytes().

Referenced by SendD4RTagPacket(), SendDequeuePacket(), SendEndOfReadPacket(), SendEndOfWritePacket(), SendFlushPacket(), SendGrowPacket(), SendReadBlockPacket(), SendResetPacket(), and SendWriteBlockPacket().

32  {
33  iovec iov;
34  iov.iov_base = const_cast<PacketHeader*>(&packet.header);
35  iov.iov_len = sizeof(packet.header);
36  WriteBytes(&iov, 1);
37  }
virtual void WriteBytes(const iovec *iov, unsigned iovcnt)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::PacketEncoder::SendPacket ( const Packet packet,
void *  data 
)
inherited

Definition at line 39 of file PacketEncoder.cc.

References ASSERT, CPN::Packet::DataLength(), CPN::Packet::header, and CPN::PacketEncoder::WriteBytes().

39  {
40  ASSERT(data);
41  iovec iov[2];
42  iov[0].iov_base = const_cast<PacketHeader*>(&packet.header);
43  iov[0].iov_len = sizeof(packet.header);
44  iov[1].iov_base = data;
45  iov[1].iov_len = packet.DataLength();
46  WriteBytes(&iov[0], 2);
47  }
virtual void WriteBytes(const iovec *iov, unsigned iovcnt)=0
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::SendReadBlockPacket ( )
private

Definition at line 367 of file RemoteQueue.cc.

References FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_READBLOCK, CPN::QueueBase::readrequest, CPN::Packet::Requested(), CPN::PacketEncoder::SendPacket(), and SetupPacket().

Referenced by InternalCheckStatus().

367  {
369  Packet packet(PACKET_READBLOCK);
370  SetupPacket(packet);
371  packet.Requested(readrequest);
373  }
unsigned readrequest
Definition: QueueBase.h:279
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendResetPacket ( )
private

Definition at line 524 of file RemoteQueue.cc.

References FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_RESET, pendingReset, CPN::PacketEncoder::SendPacket(), and SetupPacket().

Referenced by InternalCheckStatus().

524  {
526  Packet packet(PACKET_RESET);
527  SetupPacket(packet);
529  pendingReset = false;
530  }
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SendWriteBlockPacket ( )
private

Definition at line 388 of file RemoteQueue.cc.

References FUNC_TRACE, CPN::QueueBase::logger, CPN::PACKET_WRITEBLOCK, CPN::Packet::Requested(), CPN::PacketEncoder::SendPacket(), SetupPacket(), and CPN::QueueBase::writerequest.

Referenced by InternalCheckStatus().

388  {
390  Packet packet(PACKET_WRITEBLOCK);
391  SetupPacket(packet);
392  packet.Requested(writerequest);
394  }
Logger logger
Definition: QueueBase.h:288
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void SendPacket(const Packet &packet)
unsigned writerequest
Definition: QueueBase.h:280
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::SetReaderNode ( shared_ptr< Node > n )
inherited

Set the node which is reading from this queue

Parameters
n: the node

Definition at line 59 of file D4RQueue.cc.

References D4R::QueueBase::reader, and D4R::QueueBase::Signal().

Referenced by RemoteQueue().

59  {
60  AutoLock<QueueBase> al(*this);
61  reader = n;
62  Signal();
63  }
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::SetupPacket ( Packet packet)
private

Definition at line 233 of file RemoteQueue.cc.

References clock, mode, READ, CPN::Packet::ReadClock(), readclock, TickClock(), and writeclock.

Referenced by SendD4RTagPacket(), SendDequeuePacket(), SendEndOfReadPacket(), SendEndOfWritePacket(), SendEnqueuePacket(), SendFlushPacket(), SendGrowPacket(), SendReadBlockPacket(), SendResetPacket(), and SendWriteBlockPacket().

233  {
234  TickClock();
235  if (mode == READ) {
236  packet.ReadClock(clock).WriteClock(writeclock);
237  } else {
238  packet.ReadClock(readclock).WriteClock(clock);
239  }
240  }
const Mode_t mode
Definition: RemoteQueue.h:148
uint64_t readclock
Definition: RemoteQueue.h:156
uint64_t writeclock
Definition: RemoteQueue.h:157

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::SetWriterNode ( shared_ptr< Node > n )
inherited

Set the node which is writing to this queue.

Parameters
n: the node

Definition at line 65 of file D4RQueue.cc.

References D4R::QueueBase::Signal(), and D4R::QueueBase::writer.

Referenced by RemoteQueue().

65  {
66  AutoLock<QueueBase> al(*this);
67  writer = n;
68  Signal();
69  }
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::Shutdown ( )
virtual

Tell the queue that CPN is shutting down and the queue should start cleaning up.

Implements CPN::RemoteQueueBase.

Definition at line 113 of file RemoteQueue.cc.

References UnlockedShutdown().

Referenced by FileThreadEntryPoint().

113  {
114  AutoLock<const QueueBase> al(*this);
116  }
void UnlockedShutdown()
Definition: RemoteQueue.cc:118

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::ShutdownReader ( )
inherited

Called by the QueueReader when no more data will be read.

Definition at line 250 of file QueueBase.cc.

References CPN::QueueBase::UnlockedShutdownReader().

Referenced by FileThreadEntryPoint().

250  {
251  AutoLock<QueueBase> al(*this);
253  }
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::ShutdownWriter ( )
inherited

Called by the QueueWriter when no more data will be written.

Definition at line 260 of file QueueBase.cc.

References CPN::QueueBase::UnlockedShutdownWriter().

Referenced by FileThreadEntryPoint().

260  {
261  AutoLock<QueueBase> al(*this);
263  }
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::Signal ( )
privatevirtual

Signal that Wait should return

Reimplemented from CPN::QueueBase.

Definition at line 657 of file RemoteQueue.cc.

References pendingAction, and CPN::QueueBase::Signal().

Referenced by D4RTagPacket(), FileThreadEntryPoint(), FlushPacket(), HandleError(), InternalDequeue(), InternalEnqueue(), InternalFlush(), InternalReset(), ResetPacket(), UnlockedGrow(), UnlockedShutdown(), WaitForData(), WaitForFreespace(), and ~RemoteQueue().

657  {
658  pendingAction = true;
660  }
virtual void Signal()
Definition: QueueBase.h:247

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void D4R::QueueBase::SignalReaderTagChanged ( )
inherited

These functions are called by the D4R::Node when their tag changes.

Definition at line 139 of file D4RQueue.cc.

References D4R::QueueBase::UnlockedSignalReaderTagChanged().

139  {
140  AutoLock<QueueBase> al(*this);
142  }
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144

+ Here is the call graph for this function:

void D4R::QueueBase::SignalWriterTagChanged ( )
inherited

Definition at line 150 of file D4RQueue.cc.

References D4R::QueueBase::UnlockedSignalWriterTagChanged().

150  {
151  AutoLock<QueueBase> al(*this);
153  }
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155

+ Here is the call graph for this function:

void CPN::RemoteQueue::Start ( void  )
virtual

Start the internals of this queue.

Implements CPN::RemoteQueueBase.

Definition at line 109 of file RemoteQueue.cc.

References fileThread.

109  {
110  fileThread->Start();
111  }
auto_ptr< Pthread > fileThread
Definition: RemoteQueue.h:142
void CPN::RemoteQueue::TickClock ( )
private

Definition at line 815 of file RemoteQueue.cc.

References clock.

Referenced by SetupPacket().

815  {
816  ++clock;
817  }

+ Here is the caller graph for this function:

void CPN::QueueBase::Unlock ( void  ) const
inlinevirtualinherited

Implements D4R::QueueBase.

Definition at line 222 of file QueueBase.h.

References lock, and PthreadMutex::Unlock().

222 { lock.Unlock(); }
PthreadMutex & Unlock(void)
Definition: PthreadMutex.h:50
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

unsigned CPN::RemoteQueue::UnlockedCount ( ) const
privatevirtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 125 of file RemoteQueue.cc.

References bytecount, ThresholdQueueBase::Count(), mode, CPN::ThresholdQueue::queue, and READ.

Referenced by GetState().

125  {
126  if (mode == READ) {
127  return queue->Count();
128  } else {
129  return queue->Count() + bytecount;
130  }
131  }
ulong Count(void) const
const Mode_t mode
Definition: RemoteQueue.h:148
unsigned bytecount
Definition: RemoteQueue.h:168

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedDequeueChannelStride ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 110 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ChannelStride(), CPN::ThresholdQueue::dequeueUseOld, CPN::ThresholdQueue::oldqueue, and CPN::ThresholdQueue::queue.

110  {
111  if (dequeueUseOld) {
112  return oldqueue->ChannelStride();
113  } else {
114  return queue->ChannelStride();
115  }
116  }
ulong ChannelStride(void) const

+ Here is the call graph for this function:

bool CPN::RemoteQueue::UnlockedEmpty ( ) const
privatevirtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 133 of file RemoteQueue.cc.

References bytecount, ThresholdQueueBase::Empty(), mode, CPN::ThresholdQueue::queue, and READ.

133  {
134  if (mode == READ) {
135  return queue->Empty();
136  } else {
137  return queue->Empty() && (bytecount == 0);
138  }
139  }
const Mode_t mode
Definition: RemoteQueue.h:148
bool Empty(void) const
unsigned bytecount
Definition: RemoteQueue.h:168

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedEnqueueChannelStride ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 102 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ChannelStride(), CPN::ThresholdQueue::enqueueUseOld, CPN::ThresholdQueue::oldqueue, and CPN::ThresholdQueue::queue.

102  {
103  if (enqueueUseOld) {
104  return oldqueue->ChannelStride();
105  } else {
106  return queue->ChannelStride();
107  }
108  }
ulong ChannelStride(void) const

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedFreespace ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 118 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Freespace(), and CPN::ThresholdQueue::queue.

Referenced by GetState().

118  {
119  return queue->Freespace();
120  }
ulong Freespace(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ThresholdQueue< T >::UnlockedFull ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 122 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::Full(), and CPN::ThresholdQueue::queue.

122  {
123  return queue->Full();
124  }
bool Full(void) const

+ Here is the call graph for this function:

void CPN::RemoteQueue::UnlockedGrow ( unsigned  queueLen,
unsigned  maxThresh 
)
privatevirtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 145 of file RemoteQueue.cc.

References actionCond, alpha, dead, FUNC_TRACE, CPN::QueueBase::lock, CPN::QueueBase::logger, ThresholdQueueBase::MaxThreshold(), mode, pendingGrow, CPN::ThresholdQueue::queue, CPN::QueueBase::QueueLength(), READ, readerlength, Signal(), CPN::ThresholdQueue::UnlockedGrow(), CPN::ThresholdQueue::UnlockedMaxThreshold(), UnlockedQueueLength(), PthreadCondition::Wait(), WRITE, and writerlength.

145  {
147  if (queueLen <= UnlockedQueueLength() && maxThresh <= UnlockedMaxThreshold()) return;
148  while (pendingGrow && !dead) {
149  Signal();
151  }
152  const unsigned maxthresh = std::max<unsigned>(queue->MaxThreshold(), maxThresh);
153  readerlength = QueueLength(queueLen, maxthresh, alpha, READ);
154  writerlength = QueueLength(queueLen, maxthresh, alpha, WRITE);
155  const unsigned newlen = (mode == WRITE ? writerlength : readerlength);
156  ThresholdQueue::UnlockedGrow(newlen, maxthresh);
157  pendingGrow = true;
158  Signal();
159  }
PthreadCondition & Wait(PthreadMutex &mutex)
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
unsigned QueueLength() const
Definition: QueueBase.cc:230
virtual unsigned UnlockedMaxThreshold() const
unsigned readerlength
Definition: RemoteQueue.h:159
Logger logger
Definition: QueueBase.h:288
unsigned writerlength
Definition: RemoteQueue.h:160
ulong MaxThreshold(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
PthreadCondition actionCond
Definition: RemoteQueue.h:146
const double alpha
Definition: RemoteQueue.h:149
unsigned UnlockedQueueLength() const
Definition: RemoteQueue.cc:141
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedMaxThreshold ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 157 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::MaxThreshold(), and CPN::ThresholdQueue::queue.

Referenced by GetState(), and UnlockedGrow().

157  {
158  return queue->MaxThreshold();
159  }
ulong MaxThreshold(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedNumChannels ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 98 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::NumChannels(), and CPN::ThresholdQueue::queue.

98  {
99  return queue->NumChannels();
100  }
ulong NumChannels(void) const

+ Here is the call graph for this function:

unsigned ThresholdQueue< T >::UnlockedNumDequeued ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 169 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ElementsDequeued(), and CPN::ThresholdQueue::queue.

Referenced by GetState().

169  {
170  return queue->ElementsDequeued();
171  }
ulong ElementsDequeued(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned ThresholdQueue< T >::UnlockedNumEnqueued ( ) const
protectedvirtualinherited

Implements CPN::QueueBase.

Definition at line 165 of file CPNThresholdQueue.cc.

References ThresholdQueueBase::ElementsEnqueued(), and CPN::ThresholdQueue::queue.

Referenced by GetState().

165  {
166  return queue->ElementsEnqueued();
167  }
ulong ElementsEnqueued(void) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::RemoteQueue::UnlockedQueueLength ( ) const
privatevirtual

Reimplemented from CPN::ThresholdQueue.

Definition at line 141 of file RemoteQueue.cc.

References readerlength, and writerlength.

Referenced by GetState(), and UnlockedGrow().

141  {
142  return readerlength + writerlength;
143  }
unsigned readerlength
Definition: RemoteQueue.h:159
unsigned writerlength
Definition: RemoteQueue.h:160

+ Here is the caller graph for this function:

void CPN::RemoteQueue::UnlockedShutdown ( )
private

Definition at line 118 of file RemoteQueue.cc.

References actionCond, PthreadCondition::Broadcast(), dead, FUNC_TRACE, CPN::QueueBase::logger, and Signal().

Referenced by InternalCheckStatus(), and Shutdown().

118  {
120  dead = true;
122  Signal();
123  }
PthreadCondition & Broadcast(void)
Logger logger
Definition: QueueBase.h:288
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
PthreadCondition actionCond
Definition: RemoteQueue.h:146

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::UnlockedShutdownReader ( )
protectedvirtualinherited

Definition at line 255 of file QueueBase.cc.

References CPN::QueueBase::readshutdown, and CPN::QueueBase::Signal().

Referenced by EndOfReadPacket(), InternalCheckStatus(), and CPN::QueueBase::ShutdownReader().

255  {
256  readshutdown = true;
257  Signal();
258  }
virtual void Signal()
Definition: QueueBase.h:247
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::QueueBase::UnlockedShutdownWriter ( )
protectedvirtualinherited

Definition at line 311 of file QueueBase.cc.

References CPN::QueueBase::Signal(), and CPN::QueueBase::writeshutdown.

Referenced by EndOfWritePacket(), InternalCheckStatus(), and CPN::QueueBase::ShutdownWriter().

311  {
312  writeshutdown = true;
313  Signal();
314  }
virtual void Signal()
Definition: QueueBase.h:247
bool writeshutdown
Definition: QueueBase.h:276

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::UnlockedSignalReaderTagChanged ( )
privatevirtual

Reimplemented from D4R::QueueBase.

Definition at line 219 of file RemoteQueue.cc.

References ASSERT, FUNC_TRACE, CPN::QueueBase::logger, mode, pendingD4RTag, READ, and D4R::QueueBase::UnlockedSignalReaderTagChanged().

219  {
221  ASSERT(mode == READ);
222  pendingD4RTag = true;
224  }
Logger logger
Definition: QueueBase.h:288
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::UnlockedSignalWriterTagChanged ( )
privatevirtual

Reimplemented from D4R::QueueBase.

Definition at line 226 of file RemoteQueue.cc.

References ASSERT, FUNC_TRACE, CPN::QueueBase::logger, mode, pendingD4RTag, D4R::QueueBase::UnlockedSignalWriterTagChanged(), and WRITE.

226  {
228  ASSERT(mode == WRITE);
229  pendingD4RTag = true;
231  }
Logger logger
Definition: QueueBase.h:288
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::UpdateClock ( const Packet packet)
private

Definition at line 805 of file RemoteQueue.cc.

References clock, mode, READ, CPN::Packet::ReadClock(), readclock, CPN::Packet::WriteClock(), and writeclock.

Referenced by D4RTagPacket(), DequeuePacket(), EndOfReadPacket(), EndOfWritePacket(), EnqueuePacket(), FlushPacket(), GrowPacket(), ReadBlockPacket(), ResetPacket(), and WriteBlockPacket().

805  {
806  writeclock = packet.WriteClock();
807  readclock = packet.ReadClock();
808  if (mode == READ) {
809  clock = std::max(writeclock, clock) + 1;
810  } else {
811  clock = std::max(readclock, clock) + 1;
812  }
813  }
const Mode_t mode
Definition: RemoteQueue.h:148
uint64_t readclock
Definition: RemoteQueue.h:156
uint64_t writeclock
Definition: RemoteQueue.h:157

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual void CPN::QueueBase::Wait ( )
inlineprotectedvirtualinherited

Wait until Signal is called. The caller must be holding the lock when calling this.

Implements D4R::QueueBase.

Definition at line 246 of file QueueBase.h.

References lock.

Referenced by ActionThreadEntryPoint(), CPN::QueueBase::GetRawEnqueuePtr(), WaitForData(), and WaitForFreespace().

246 { cond.Wait(lock); }
PthreadCondition & Wait(PthreadMutex &mutex)
PthreadCondition cond
Definition: QueueBase.h:290
PthreadMutex lock
Definition: QueueBase.h:289

+ Here is the caller graph for this function:

void CPN::RemoteQueue::WaitForData ( )
privatevirtual

Reimplemented from CPN::QueueBase.

Definition at line 161 of file RemoteQueue.cc.

References ASSERT, clock, FUNC_TRACE, CPN::QueueBase::logger, mode, pendingBlock, READ, CPN::QueueBase::ReadBlocked(), readclock, Signal(), tagUpdated, CPN::QueueBase::Wait(), and CPN::QueueBase::WaitForData().

161  {
163  ASSERT(mode == READ);
164  pendingBlock = true;
165  tagUpdated = false;
166  uint64_t saveclock = clock + 1;
167  Signal();
168  while (ReadBlocked() && (!tagUpdated || saveclock > readclock)) {
169  Wait();
170  }
171  if (ReadBlocked() && tagUpdated) {
173  }
174  }
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
Logger logger
Definition: QueueBase.h:288
virtual void Wait()
Definition: QueueBase.h:246
virtual void WaitForData()
Definition: QueueBase.cc:326
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
uint64_t readclock
Definition: RemoteQueue.h:156
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::WaitForFreespace ( )
privatevirtual

Reimplemented from CPN::QueueBase.

Definition at line 176 of file RemoteQueue.cc.

References ASSERT, clock, FUNC_TRACE, CPN::QueueBase::logger, mode, pendingBlock, Signal(), tagUpdated, CPN::QueueBase::Wait(), CPN::QueueBase::WaitForFreespace(), WRITE, CPN::QueueBase::WriteBlocked(), and writeclock.

176  {
178  ASSERT(mode == WRITE);
179  pendingBlock = true;
180  tagUpdated = false;
181  uint64_t saveclock = clock + 1;
182  Signal();
183  while (WriteBlocked() && (!tagUpdated || saveclock > writeclock)) {
184  Wait();
185  }
186  if (WriteBlocked() && tagUpdated) {
188  }
189  }
Logger logger
Definition: QueueBase.h:288
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
virtual void Wait()
Definition: QueueBase.h:246
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)
uint64_t writeclock
Definition: RemoteQueue.h:157

+ Here is the call graph for this function:

void D4R::QueueBase::WriteBlock ( unsigned  qsize)
protectedinherited

writer ===> reader. WriteBlock requires that the caller already holds the lock and, if the lock is reentrant, that a single unlock will release it.

Definition at line 106 of file D4RQueue.cc.

References D4R::QueueBase::Detect(), D4R::QueueBase::incomm, D4R::QueueBase::reader, D4R::QueueBase::readtagchanged, D4R::QueueBase::Signal(), D4R::QueueBase::Wait(), D4R::QueueBase::WriteBlocked(), and D4R::QueueBase::writer.

Referenced by CPN::QueueBase::GetRawEnqueuePtr(), and CPN::QueueBase::WaitForFreespace().

106  {
107  if (!reader) {
108  while (WriteBlocked() && !reader) {
109  Wait();
110  }
111  }
112  if (!WriteBlocked()) { return; }
113  readtagchanged = false;
114  try {
115  while (incomm) { Wait(); }
116  ScopeSetter<bool> ss(incomm, true);
117  AutoUnlock<QueueBase> au(*this);
118  writer->Block(reader->GetPublicTag(), qsize);
119  } catch (...) { Signal(); throw; }
120  Signal();
121  while (WriteBlocked()) {
122  if (readtagchanged) {
123  readtagchanged = false;
124  bool detect;
125  try {
126  while (incomm) { Wait(); }
127  ScopeSetter<bool> ss(incomm, true);
128  AutoUnlock<QueueBase> au(*this);
129  detect = writer->Transmit(reader->GetPublicTag());
130  } catch (...) { Signal(); throw; }
131  Signal();
132  if (detect) { Detect(); }
133  } else {
134  Wait();
135  }
136  }
137  }
virtual void Detect()=0
shared_ptr< Node > reader
Definition: D4RQueue.h:122
virtual bool WriteBlocked()=0
virtual void Wait()=0
shared_ptr< Node > writer
Definition: D4RQueue.h:123
bool readtagchanged
Definition: D4RQueue.h:124
virtual void Signal()=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool CPN::QueueBase::WriteBlocked ( )
protectedvirtualinherited
Returns
true if we are blocked.

Implements D4R::QueueBase.

Definition at line 357 of file QueueBase.cc.

References CPN::KernelBase::CheckTerminated(), CPN::QueueBase::flushed, CPN::QueueBase::kernel, CPN::QueueBase::readshutdown, CPN::QueueBase::UnlockedFreespace(), CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

Referenced by CPN::QueueBase::GetRawDequeuePtr(), WaitForFreespace(), and CPN::QueueBase::WaitForFreespace().

357  {
360  }
unsigned writerequest
Definition: QueueBase.h:280
virtual unsigned UnlockedFreespace() const =0
virtual void CheckTerminated()=0
KernelBase * kernel
Definition: QueueBase.h:286
bool writeshutdown
Definition: QueueBase.h:276
bool readshutdown
Definition: QueueBase.h:275

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void CPN::RemoteQueue::WriteBlockPacket ( const Packet packet)
privatevirtual

Implements CPN::PacketHandler.

Definition at line 375 of file RemoteQueue.cc.

References ASSERT, ThresholdQueueBase::Count(), FUNC_TRACE, CPN::QueueBase::logger, mode, pendingD4RTag, CPN::ThresholdQueue::queue, READ, readerlength, CPN::Packet::Requested(), UpdateClock(), CPN::QueueBase::useD4R, CPN::QueueBase::writerequest, and CPN::QueueBase::writeshutdown.

375  {
376  UpdateClock(packet);
378  ASSERT(mode == READ);
380  writerequest = packet.Requested();
382  if (useD4R) {
383  pendingD4RTag = true;
384  }
385  }
386  }
unsigned readerlength
Definition: RemoteQueue.h:159
Logger logger
Definition: QueueBase.h:288
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
unsigned writerequest
Definition: QueueBase.h:280
ulong Count(void) const
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
bool writeshutdown
Definition: QueueBase.h:276
const Mode_t mode
Definition: RemoteQueue.h:148
#define ASSERT(exp,...)

+ Here is the call graph for this function:

void CPN::RemoteQueue::WriteBytes ( const iovec *  iov,
unsigned  iovcnt 
)
privatevirtual

Implements CPN::PacketEncoder.

Definition at line 556 of file RemoteQueue.cc.

References ASSERT, sock, FileHandle::Write(), and FileHandle::Writev().

Referenced by SendEnqueuePacket().

556  {
557  unsigned total = 0;
558  for (unsigned i = 0; i < iovcnt; ++i) { total += iov[i].iov_len; }
559  unsigned numwritten = 0;
560  while (numwritten < total) {
561  unsigned num = sock.Writev(iov, iovcnt);
562  numwritten += num;
563  if (numwritten == total) break;
564  while (iov->iov_len <= num) {
565  num -= iov->iov_len;
566  ++iov;
567  --iovcnt;
568  }
569  if (num > 0) {
570  // Finish writing this section...
571  unsigned amount = num;
572  while (amount < iov->iov_len) {
573  amount += sock.Write(((char*)iov->iov_base) + amount, iov->iov_len - amount);
574  }
575  ++iov;
576  --iovcnt;
577  numwritten += amount - num;
578  }
579  }
580  ASSERT(total == numwritten);
581  }
SocketHandle sock
Definition: RemoteQueue.h:153
unsigned Writev(const iovec *iov, int iovcnt)
scatter gather io version of Write
Definition: FileHandle.cc:261
#define ASSERT(exp,...)
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
Definition: FileHandle.cc:225

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

unsigned CPN::QueueBase::WriteRequest ( )
inherited

For unit tests.

Definition at line 385 of file QueueBase.cc.

References CPN::QueueBase::writerequest.

385  {
386  AutoLock<QueueBase> al(*this);
387  return writerequest;
388  }
unsigned writerequest
Definition: QueueBase.h:280

Member Data Documentation

PthreadCondition CPN::RemoteQueue::actionCond
private
auto_ptr<Pthread> CPN::RemoteQueue::actionThread
private

Definition at line 143 of file RemoteQueue.h.

Referenced by FileThreadEntryPoint(), LogState(), RemoteQueue(), and ~RemoteQueue().

bool CPN::RemoteQueue::actionTick
private

Definition at line 145 of file RemoteQueue.h.

Referenced by ActionThreadEntryPoint(), and FileThreadEntryPoint().

const double CPN::RemoteQueue::alpha
private

Definition at line 149 of file RemoteQueue.h.

Referenced by GrowPacket(), and UnlockedGrow().

unsigned CPN::RemoteQueue::bytecount
private

When in write mode this is the number of bytes that we think are in the queue on the other side. When in read mode this is the number of bytes that we have read from the socket minus the number of bytes that have been dequeued.

Definition at line 168 of file RemoteQueue.h.

Referenced by DequeuePacket(), EnqueuePacket(), GetState(), InternalCheckStatus(), LogState(), ReadBlockPacket(), SendDequeuePacket(), SendEnqueuePacket(), UnlockedCount(), and UnlockedEmpty().

uint64_t CPN::RemoteQueue::clock
private
PthreadCondition CPN::QueueBase::cond
protectedinherited
std::string CPN::QueueBase::datatype
protectedinherited

Definition at line 291 of file QueueBase.h.

bool CPN::RemoteQueue::dead
private
unsigned CPN::QueueBase::dequeuethresh
protectedinherited
bool CPN::ThresholdQueue::dequeueUseOld
protectedinherited
unsigned CPN::QueueBase::enqueuethresh
protectedinherited
bool CPN::ThresholdQueue::enqueueUseOld
protectedinherited
auto_ptr<Pthread> CPN::RemoteQueue::fileThread
private

Definition at line 142 of file RemoteQueue.h.

Referenced by LogState(), RemoteQueue(), Start(), and ~RemoteQueue().

bool CPN::QueueBase::flushed
protectedinherited
RemoteQueueHolder* const CPN::RemoteQueue::holder
private

Definition at line 152 of file RemoteQueue.h.

Referenced by FileThreadEntryPoint().

bool D4R::QueueBase::incomm
protectedinherited

Definition at line 126 of file D4RQueue.h.

Referenced by D4R::QueueBase::ReadBlock(), and D4R::QueueBase::WriteBlock().

bool CPN::QueueBase::indequeue
protectedinherited
bool CPN::QueueBase::inenqueue
protectedinherited
KernelBase* CPN::QueueBase::kernel
protectedinherited
PthreadMutex CPN::QueueBase::lock
mutableprotectedinherited
Logger CPN::QueueBase::logger
protectedinherited
unsigned CPN::RemoteQueue::maxwritethreshold
private

Definition at line 150 of file RemoteQueue.h.

Referenced by SendEnqueuePacket().

shared_ptr<D4R::Node> CPN::RemoteQueue::mocknode
private

Definition at line 154 of file RemoteQueue.h.

Referenced by D4RTagPacket(), and RemoteQueue().

const Mode_t CPN::RemoteQueue::mode
private
TQImpl* CPN::ThresholdQueue::oldqueue
protectedinherited
bool CPN::RemoteQueue::pendingAction
private

Definition at line 144 of file RemoteQueue.h.

Referenced by ActionThreadEntryPoint(), and Signal().

bool CPN::RemoteQueue::pendingBlock
private

Definition at line 170 of file RemoteQueue.h.

Referenced by GetState(), InternalCheckStatus(), LogState(), WaitForData(), and WaitForFreespace().

bool CPN::RemoteQueue::pendingD4RTag
private
bool CPN::RemoteQueue::pendingFlush
private

Definition at line 175 of file RemoteQueue.h.

Referenced by InternalCheckStatus(), InternalFlush(), LogState(), and SendFlushPacket().

bool CPN::RemoteQueue::pendingGrow
private

Definition at line 172 of file RemoteQueue.h.

Referenced by GetState(), InternalCheckStatus(), LogState(), and UnlockedGrow().

bool CPN::RemoteQueue::pendingReset
private

Definition at line 176 of file RemoteQueue.h.

Referenced by InternalCheckStatus(), InternalReset(), LogState(), and SendResetPacket().

unsigned CPN::QueueBase::postpad
protectedinherited

Definition at line 278 of file QueueBase.h.

Referenced by CPN::QueueBase::InternalFlush().

unsigned CPN::QueueBase::prepad
protectedinherited

Definition at line 277 of file QueueBase.h.

Referenced by CPN::QueueBase::InternalReset().

TQImpl* CPN::ThresholdQueue::queue
protectedinherited
uint64_t CPN::RemoteQueue::readclock
private

Definition at line 156 of file RemoteQueue.h.

Referenced by ClockString(), EnqueuePacket(), SetupPacket(), UpdateClock(), and WaitForData().

shared_ptr<Node> D4R::QueueBase::reader
protectedinherited
const Key_t CPN::QueueBase::readerkey
protectedinherited
unsigned CPN::RemoteQueue::readerlength
private
unsigned CPN::QueueBase::readrequest
protectedinherited
bool CPN::QueueBase::readshutdown
protectedinherited
bool D4R::QueueBase::readtagchanged
protectedinherited
bool CPN::RemoteQueue::sentEnd
private
ConnectionServer* const CPN::RemoteQueue::server
private

Definition at line 151 of file RemoteQueue.h.

Referenced by FileThreadEntryPoint().

SocketHandle CPN::RemoteQueue::sock
private
bool CPN::RemoteQueue::tagUpdated
private

Definition at line 174 of file RemoteQueue.h.

Referenced by D4RTagPacket(), WaitForData(), and WaitForFreespace().

bool CPN::QueueBase::useD4R
protectedinherited
uint64_t CPN::RemoteQueue::writeclock
private

Definition at line 157 of file RemoteQueue.h.

Referenced by ClockString(), EnqueuePacket(), SetupPacket(), UpdateClock(), and WaitForFreespace().

shared_ptr<Node> D4R::QueueBase::writer
protectedinherited
unsigned CPN::QueueBase::writerequest
protectedinherited
const Key_t CPN::QueueBase::writerkey
protectedinherited
unsigned CPN::RemoteQueue::writerlength
private
bool CPN::QueueBase::writeshutdown
protectedinherited
bool D4R::QueueBase::writetagchanged
protectedinherited

The documentation for this class was generated from the following files: