23 #include "common_priv.h"
41 #define FUNC_TRACE(logger) logger.Trace("%s %s", __PRETTY_FUNCTION__, GetState().c_str())
43 #define FUNC_TRACE(logger)
52 QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), mode_), usembs),
56 alpha(attr.GetAlpha()),
57 maxwritethreshold(attr.GetMaxWriteThreshold()),
60 mocknode(new D4R::Node(mode_ == READ ? attr.GetWriterNodeKey() : attr.GetReaderNodeKey())),
64 readerlength(QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), READ)),
65 writerlength(QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), WRITE)),
77 pattr.StackSize(1<<16);
85 std::ostringstream oss;
86 oss <<
"RemoteQueue(m:";
100 logger.
Trace(
"Destructed (c: %s)", clockstr.c_str());
166 uint64_t saveclock =
clock + 1;
181 uint64_t saveclock =
clock + 1;
248 unsigned count = packet.
Count();
250 logger.
Warn(
"Enqueue packet too large, silently growing queue. "
251 "Packet size: %u, Freespace: %u, MaxThresh: %u, QueueLength: %u",
255 std::vector<iovec> iovs;
256 for (
unsigned i = 0; i < numchannels; ++i) {
266 unsigned numread = 0;
269 while (numread < numtoread) {
270 unsigned num =
sock.
Readv(&iovs[i], iovs.size() - i);
277 if (numread == numtoread)
break;
278 while (iovs[i].iov_len <= num) {
279 num -= iovs[i].iov_len;
282 iovs[i].iov_len -= num;
283 iovs[i].iov_base = ((
char*)iovs[i].iov_base) + num;
301 if (maxwrite == 0) maxwrite = 1;
302 if (count > maxwrite) { count = maxwrite; }
304 if (count > maxthresh) { count = maxthresh; }
305 if (count > expectedfree) { count = expectedfree; }
306 if (count == 0) {
return; }
308 const unsigned datalength = count * numchannels;
313 std::vector<iovec> iovs;
315 iovs.push_back(header);
316 for (
unsigned i = 0; i < numchannels; ++i) {
320 iov.iov_len = packet.
Count();
441 const unsigned queueLen = packet.
QueueSize();
463 unsigned numread =
sock.
Read(&tag,
sizeof(tag));
464 ASSERT(numread ==
sizeof(tag));
482 tag =
reader->GetPublicTag();
484 tag =
writer->GetPublicTag();
533 ASSERT(
false,
"Unexpected packet");
537 ASSERT(
false,
"Unexpected packet");
542 unsigned numtoread = 0;
544 unsigned numread =
sock.
Recv(ptr, numtoread,
false);
558 for (
unsigned i = 0; i < iovcnt; ++i) { total += iov[i].iov_len; }
559 unsigned numwritten = 0;
560 while (numwritten < total) {
563 if (numwritten == total)
break;
564 while (iov->iov_len <= num) {
571 unsigned amount = num;
572 while (amount < iov->iov_len) {
573 amount +=
sock.
Write(((
char*)iov->iov_base) + amount, iov->iov_len - amount);
577 numwritten += amount - num;
580 ASSERT(total == numwritten);
589 logger.
Debug(
"Forced Shutdown (c: %s)", clockstr.c_str());
596 logger.
Debug(
"Shutdown (c: %s)", clockstr.c_str());
602 shared_ptr<Sync::Future<int> > conn;
640 logger.
Debug(
"Forced Shutdown threw exception (c: %s)", clockstr.c_str());
692 logger.
Error(
"Eof detected but not shutdown! (c: %s)", clockstr.c_str());
694 ASSERT(
false,
"EOF detected but not shutdown! (c: %s)", clockstr.c_str());
749 logger.
Debug(
"Closing the socket (c: %s)", clockstr.c_str());
793 logger.
Debug(
"Closing the socket (c: %s)", clockstr.c_str());
820 std::ostringstream oss;
842 logger.
Error(
"Exception in RemoteQueue rethrowing (c: %s e: %d): %s",
851 unsigned writerlen = unsigned(((
double)length)*alpha);
853 return std::max<unsigned>(length - writerlen, maxthresh);
855 return std::max<unsigned>(writerlen, maxthresh);
868 std::ostringstream oss;
878 oss <<
",readshutdown";
881 oss <<
",writeshutdown";
889 logger.
Error(
"Mode: %s, Readerlength: %u, Writerlength %u, Clock: %s, bytecount: %u",
891 logger.
Error(
"PendingBlock: %s, SentEnd: %s, PendingGrow: %s, PendingD4R: %s, Dead: %s"
892 ", PendingFlush: %s, PendingReset: %s",
895 logger.
Error(
"FileThread id: %llu, Running: %s, ActionThread id: %llu, Running: %s",
unsigned maxwritethreshold
PthreadCondition & Wait(PthreadMutex &mutex)
void ReadBlockPacket(const Packet &packet)
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
ConnectionServer *const server
void WriteBlockPacket(const Packet &packet)
virtual bool ReadBlocked()
unsigned QueueLength() const
virtual void UnlockedShutdownWriter()
uint32_t QueueSize() const
const char * BoolString(bool tf)
shared_ptr< Node > reader
void DequeuePacket(const Packet &packet)
void UnlockedSignalWriterTagChanged()
virtual void InternalDequeue(unsigned count)
virtual unsigned UnlockedMaxThreshold() const
void ShutdownWrite()
Shut down the write end of this socket. Any future attempt to write to this socket will fail...
PthreadCondition & Broadcast(void)
virtual void InternalFlush()
virtual void WaitForFreespace()
void SendEndOfWritePacket()
void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
void SetReaderNode(shared_ptr< Node > n)
void InternalDequeue(unsigned count)
unsigned Recv(void *ptr, unsigned len, bool block)
void SendEndOfReadPacket()
Definition of the queue attributes.
uint64_t WriteClock() const
void UpdateClock(const Packet &packet)
void SetupPacket(Packet &packet)
void Trace(const char *fmt,...)
virtual void WaitForData()
void SendPacket(const Packet &packet)
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)
void SendWriteBlockPacket()
ulong MaxThreshold(void) const
void ShutdownReader()
Called by the QueueReader when no more data will be read.
uint32_t DataLength() const
void Error(const char *fmt,...)
shared_ptr< Node > writer
virtual bool WriteBlocked()
uint32_t MaxThreshold() const
uint64_t ReadClock() const
ulong NumChannels(void) const
virtual void UnlockedShutdownReader()
#define FUNC_TRACE(logger)
auto_ptr< Pthread > fileThread
void EndOfReadPacket(const Packet &packet)
An object to hold references to RemoteQueues so they can continue to work after the node has gone away...
The exceptions specified for the CPN network.
void FlushPacket(const Packet &packet)
WakeupHandle * GetWakeup()
static int Poll(IteratorRef< FileHandle * > begin, IteratorRef< FileHandle * > end, double timeout)
poll a list of FileHandles for any activity and call the appropriate On method.
void SendReadBlockPacket()
void * ActionThreadEntryPoint()
void HandleError(const ErrnoException &e)
ulong QueueLength(void) const
void Close()
Close the file and reset the internal state.
#define ASSERT_ABORT(exp,...)
void D4RTagPacket(const Packet &packet)
void SetNoDelay(bool nodelay)
void * FileThreadEntryPoint()
shared_ptr< D4R::Node > mocknode
auto_ptr< Pthread > actionThread
unsigned UnlockedNumDequeued() const
const std::string & Name() const
ulong Freespace(void) const
unsigned Writev(const iovec *iov, int iovcnt)
scatter gather io version of Write
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
shared_ptr< Sync::Future< int > > ConnectReader(Key_t readerkey)
void EndOfWritePacket(const Packet &packet)
void IDWriterPacket(const Packet &packet)
void UnlockedSignalReaderTagChanged()
virtual void InternalEnqueue(unsigned count)
void LogState()
For debug ONLY!
virtual void UnlockedSignalReaderTagChanged()
virtual void UnlockedSignalWriterTagChanged()
void CleanupQueue(Key_t key)
std::string ClockString() const
unsigned UnlockedCount() const
A version of the ThresholdQueue that provides the CPN Queue interface. This queue implementation creat...
virtual unsigned UnlockedFreespace() const
RemoteQueue(KernelBase *k, Mode_t mode, ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr, bool usembs)
An exception indicating that the Kernel has shut down.
void Reset()
Clear all internal state, including the file descriptor. WARNING: does not close the file! ...
void ResetPacket(const Packet &packet)
void InternalCheckStatus()
std::string GetState()
For debugging.
RemoteQueueHolder *const holder
void IDReaderPacket(const Packet &packet)
uint32_t Requested() const
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)
PthreadCondition actionCond
bool Readable(bool r)
Set that the file is currently readable or not.
void WriteBytes(const iovec *iov, unsigned iovcnt)
void Warn(const char *fmt,...)
virtual const char * what() const
void ReleaseDecoderBytes(unsigned amount)
void EnqueuePacket(const Packet &packet)
virtual void InternalReset()
unsigned UnlockedQueueLength() const
unsigned UnlockedNumEnqueued() const
virtual bool IsTerminated()=0
virtual void LogState()
For debugging ONLY! Otherwise the output is non-deterministic.
void SetWriterNode(shared_ptr< Node > n)
unsigned Readv(const iovec *iov, int iovcnt)
scatter gather io version of Read
This is a simplified internal representation of the queue attributes needed to create a queue...
void ShutdownWriter()
Called by the QueueWriter when no more data will be written.
unsigned Read(void *ptr, unsigned len)
Read data from the file descriptor.
void GrowPacket(const Packet &packet)
Generic file handle could be a file, or a socket or a device.
shared_ptr< Sync::Future< int > > ConnectWriter(Key_t writerkey)
bool UnlockedEmpty() const
void * GetDecoderBytes(unsigned &amount)
Automatic locking on the stack.
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
void InternalEnqueue(unsigned count)
void Debug(const char *fmt,...)