CPN
Computational Process Networks
RemoteQueue.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
24 #include "RemoteQueue.h"
25 #include "RemoteQueueHolder.h"
26 #include <cpn/QueueAttr.h>
27 #include <cpn/bits/KernelBase.h>
28 #include <cpn/Exceptions.h>
29 #include <cpn/utils/AutoLock.h>
30 #include <cpn/utils/AutoUnlock.h>
32 #include "ConnectionServer.h"
35 #include <cpn/d4r/D4RNode.h>
36 #include <errno.h>
37 #include <algorithm>
38 #include <sstream>
39 
40 #if REMOTEQUEUETRACE
41 #define FUNC_TRACE(logger) logger.Trace("%s %s", __PRETTY_FUNCTION__, GetState().c_str())
42 #else
43 #define FUNC_TRACE(logger)
44 #endif
45 
46 
47 namespace CPN {
48 
50  ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr, bool usembs)
51  : ThresholdQueue(k, attr,
52  QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), mode_), usembs),
53  pendingAction(false),
54  actionTick(false),
55  mode(mode_),
56  alpha(attr.GetAlpha()),
57  maxwritethreshold(attr.GetMaxWriteThreshold()),
58  server(s),
59  holder(h),
60  mocknode(new D4R::Node(mode_ == READ ? attr.GetWriterNodeKey() : attr.GetReaderNodeKey())),
61  clock(0),
62  readclock(0),
63  writeclock(0),
64  readerlength(QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), READ)),
65  writerlength(QueueLength(attr.GetLength(), attr.GetMaxThreshold(), attr.GetAlpha(), WRITE)),
66  bytecount(0),
67  pendingBlock(false),
68  sentEnd(false),
69  pendingGrow(false),
70  pendingD4RTag(false),
71  tagUpdated(false),
72  pendingFlush(false),
73  pendingReset(false),
74  dead(false)
75  {
76  PthreadAttr pattr;
77  pattr.StackSize(1<<16); // magic number, the file thread shouldn't use any more than this
78  fileThread = auto_ptr<Pthread>(CreatePthreadFunctional(this, &RemoteQueue::FileThreadEntryPoint, pattr));
80  if (mode == READ) {
82  } else {
84  }
85  std::ostringstream oss;
86  oss << "RemoteQueue(m:";
87  if (mode == READ) { oss << "r"; }
88  else { oss << "w"; }
89  oss << ", r:" << readerkey << ", w:" << writerkey << ")";
90  logger.Name(oss.str());
91  logger.Trace("Constructed");
92  }
93 
95  ASSERT_ABORT(dead, "Shutdown but not dead!?");
96  Signal();
99  std::string clockstr = ClockString();
100  logger.Trace("Destructed (c: %s)", clockstr.c_str());
101  fileThread->Start();
102  fileThread->Join();
103  actionThread->Start();
104  actionThread->Join();
105  fileThread.reset();
106  actionThread.reset();
107  }
108 
110  fileThread->Start();
111  }
112 
114  AutoLock<const QueueBase> al(*this);
116  }
117 
120  dead = true;
122  Signal();
123  }
124 
125  unsigned RemoteQueue::UnlockedCount() const {
126  if (mode == READ) {
127  return queue->Count();
128  } else {
129  return queue->Count() + bytecount;
130  }
131  }
132 
134  if (mode == READ) {
135  return queue->Empty();
136  } else {
137  return queue->Empty() && (bytecount == 0);
138  }
139  }
140 
142  return readerlength + writerlength;
143  }
144 
145  void RemoteQueue::UnlockedGrow(unsigned queueLen, unsigned maxThresh) {
147  if (queueLen <= UnlockedQueueLength() && maxThresh <= UnlockedMaxThreshold()) return;
148  while (pendingGrow && !dead) {
149  Signal();
151  }
152  const unsigned maxthresh = std::max<unsigned>(queue->MaxThreshold(), maxThresh);
153  readerlength = QueueLength(queueLen, maxthresh, alpha, READ);
154  writerlength = QueueLength(queueLen, maxthresh, alpha, WRITE);
155  const unsigned newlen = (mode == WRITE ? writerlength : readerlength);
156  ThresholdQueue::UnlockedGrow(newlen, maxthresh);
157  pendingGrow = true;
158  Signal();
159  }
160 
163  ASSERT(mode == READ);
164  pendingBlock = true;
165  tagUpdated = false;
166  uint64_t saveclock = clock + 1;
167  Signal();
168  while (ReadBlocked() && (!tagUpdated || saveclock > readclock)) {
169  Wait();
170  }
171  if (ReadBlocked() && tagUpdated) {
173  }
174  }
175 
178  ASSERT(mode == WRITE);
179  pendingBlock = true;
180  tagUpdated = false;
181  uint64_t saveclock = clock + 1;
182  Signal();
183  while (WriteBlocked() && (!tagUpdated || saveclock > writeclock)) {
184  Wait();
185  }
186  if (WriteBlocked() && tagUpdated) {
188  }
189  }
190 
191  void RemoteQueue::InternalDequeue(unsigned count) {
192  ASSERT(mode == READ);
195  Signal();
196  }
197 
198  void RemoteQueue::InternalEnqueue(unsigned count) {
199  ASSERT(mode == WRITE);
202  Signal();
203  }
204 
206  ASSERT(mode == WRITE);
208  pendingFlush = true;
209  Signal();
210  }
211 
213  ASSERT(mode == READ);
215  pendingReset = true;
216  Signal();
217  }
218 
221  ASSERT(mode == READ);
222  pendingD4RTag = true;
224  }
225 
228  ASSERT(mode == WRITE);
229  pendingD4RTag = true;
231  }
232 
234  TickClock();
235  if (mode == READ) {
236  packet.ReadClock(clock).WriteClock(writeclock);
237  } else {
238  packet.ReadClock(readclock).WriteClock(clock);
239  }
240  }
241 
242  void RemoteQueue::EnqueuePacket(const Packet &packet) {
243  UpdateClock(packet);
245  ASSERT(mode == READ);
247  const unsigned numchannels = queue->NumChannels();
248  unsigned count = packet.Count();
249  if (count > queue->Freespace() || count > queue->MaxThreshold()) {
250  logger.Warn("Enqueue packet too large, silently growing queue. "
251  "Packet size: %u, Freespace: %u, MaxThresh: %u, QueueLength: %u",
252  count, queue->Freespace(), queue->MaxThreshold(), queue->QueueLength());
253  ThresholdQueue::UnlockedGrow(queue->Count() + count, count);
254  }
255  std::vector<iovec> iovs;
256  for (unsigned i = 0; i < numchannels; ++i) {
257  iovec iov;
258  iov.iov_base = ThresholdQueue::InternalGetRawEnqueuePtr(count, i);
259  ASSERT(iov.iov_base, "Internal throttle failed! (c: <%llu,%llu,%llu>)", clock, readclock, writeclock);
260  iov.iov_len = count;
261  iovs.push_back(iov);
262  }
263  inenqueue = true;
264  {
265  AutoUnlock<QueueBase> aul(*this);
266  unsigned numread = 0;
267  unsigned numtoread = packet.DataLength();
268  unsigned i = 0;
269  while (numread < numtoread) {
270  unsigned num = sock.Readv(&iovs[i], iovs.size() - i);
271  if (num == 0) {
272  if (sock.Eof()) {
273  return;
274  }
275  }
276  numread += num;
277  if (numread == numtoread) break;
278  while (iovs[i].iov_len <= num) {
279  num -= iovs[i].iov_len;
280  ++i;
281  }
282  iovs[i].iov_len -= num;
283  iovs[i].iov_base = ((char*)iovs[i].iov_base) + num;
284  }
285  ASSERT(numread == packet.DataLength());
286  }
287  inenqueue = false;
289  bytecount += count;
290  writerequest = 0;
291  NotifyData();
292  }
293 
295  unsigned count = queue->Count();
296  const unsigned numchannels = queue->NumChannels();
297  const unsigned maxthresh = queue->MaxThreshold();
298  const unsigned expectedfree = std::max(readerlength, maxthresh) - bytecount;
299  if (maxwritethreshold > 0) {
300  unsigned maxwrite = maxwritethreshold/numchannels;
301  if (maxwrite == 0) maxwrite = 1;
302  if (count > maxwrite) { count = maxwrite; }
303  }
304  if (count > maxthresh) { count = maxthresh; }
305  if (count > expectedfree) { count = expectedfree; }
306  if (count == 0) { return; }
308  const unsigned datalength = count * numchannels;
309  Packet packet(datalength, PACKET_ENQUEUE);
310  SetupPacket(packet);
311  packet.Count(count);
312 
313  std::vector<iovec> iovs;
314  iovec header = { &packet.header, sizeof(packet.header) };
315  iovs.push_back(header);
316  for (unsigned i = 0; i < numchannels; ++i) {
317  iovec iov;
318  iov.iov_base = const_cast<void*>(ThresholdQueue::InternalGetRawDequeuePtr(packet.Count(), i));
319  ASSERT(iov.iov_base);
320  iov.iov_len = packet.Count();
321  iovs.push_back(iov);
322  }
323  indequeue = true;
324  {
325  AutoUnlock<QueueBase> aul(*this);
326  WriteBytes(&iovs[0], iovs.size());
327  }
328  indequeue = false;
330 
331  bytecount += count;
333  }
334 
335  void RemoteQueue::DequeuePacket(const Packet &packet) {
336  UpdateClock(packet);
338  ASSERT(mode == WRITE);
340  readrequest = 0;
341  bytecount -= packet.Count();
342  }
343 
346  Packet packet(PACKET_DEQUEUE);
347  SetupPacket(packet);
348  const unsigned count = bytecount - queue->Count();
349  packet.Count(count);
351  bytecount -= count;
352  }
353 
354  void RemoteQueue::ReadBlockPacket(const Packet &packet) {
355  UpdateClock(packet);
357  ASSERT(mode == WRITE);
359  readrequest = packet.Requested();
360  if (readrequest > queue->Count() + bytecount) {
361  if (useD4R) {
362  pendingD4RTag = true;
363  }
364  }
365  }
366 
369  Packet packet(PACKET_READBLOCK);
370  SetupPacket(packet);
371  packet.Requested(readrequest);
373  }
374 
375  void RemoteQueue::WriteBlockPacket(const Packet &packet) {
376  UpdateClock(packet);
378  ASSERT(mode == READ);
380  writerequest = packet.Requested();
382  if (useD4R) {
383  pendingD4RTag = true;
384  }
385  }
386  }
387 
390  Packet packet(PACKET_WRITEBLOCK);
391  SetupPacket(packet);
392  packet.Requested(writerequest);
394  }
395 
396  void RemoteQueue::EndOfWritePacket(const Packet &packet) {
397  UpdateClock(packet);
399  ASSERT(mode == READ);
402  }
403 
406  ASSERT(!sentEnd);
407  Packet packet(PACKET_ENDOFWRITE);
408  SetupPacket(packet);
410  try {
412  } catch (const ErrnoException &e) {
413  logger.Debug("Error trying to close the write end: %s", e.what());
414  }
415  }
416 
417  void RemoteQueue::EndOfReadPacket(const Packet &packet) {
418  UpdateClock(packet);
420  ASSERT(mode == WRITE);
423  }
424 
427  ASSERT(!sentEnd);
428  Packet packet(PACKET_ENDOFREAD);
429  SetupPacket(packet);
431  try {
433  } catch (const ErrnoException &e) {
434  logger.Debug("Error trying to close the read end: %s", e.what());
435  }
436  }
437 
438  void RemoteQueue::GrowPacket(const Packet &packet) {
439  UpdateClock(packet);
441  const unsigned queueLen = packet.QueueSize();
442  const unsigned maxthresh = std::max<unsigned>(queue->MaxThreshold(), packet.MaxThreshold());
443  readerlength = QueueLength(queueLen, maxthresh, alpha, READ);
444  writerlength = QueueLength(queueLen, maxthresh, alpha, WRITE);
445  const unsigned newlen = (mode == WRITE ? writerlength : readerlength);
446  ThresholdQueue::UnlockedGrow(newlen, maxthresh);
447  }
448 
451  Packet packet(PACKET_GROW);
452  SetupPacket(packet);
454  packet.MaxThreshold(queue->MaxThreshold());
456  }
457 
458  void RemoteQueue::D4RTagPacket(const Packet &packet) {
459  UpdateClock(packet);
461  ASSERT(packet.DataLength() == sizeof(D4R::Tag));
462  D4R::Tag tag;
463  unsigned numread = sock.Read(&tag, sizeof(tag));
464  ASSERT(numread == sizeof(tag));
465  tagUpdated = true;
466  Signal();
467  mocknode->SetPublicTag(tag);
468  if (mode == WRITE) {
470  } else {
472  }
473  }
474 
477  Packet packet(PACKET_D4RTAG);
478  SetupPacket(packet);
479  packet.DataLength(sizeof(D4R::Tag));
480  D4R::Tag tag;
481  if (mode == READ) {
482  tag = reader->GetPublicTag();
483  } else {
484  tag = writer->GetPublicTag();
485  }
486  PacketEncoder::SendPacket(packet, &tag);
487  }
488 
489  void RemoteQueue::FlushPacket(const Packet &packet) {
490  UpdateClock(packet);
492  flushed = true;
493  Signal();
494  }
495 
498  Packet packet(PACKET_FLUSH);
499  SetupPacket(packet);
501  pendingFlush = false;
502  }
503 
504  void RemoteQueue::ResetPacket(const Packet &packet) {
505  UpdateClock(packet);
507  if (dequeueUseOld || enqueueUseOld) {
508  dequeueUseOld = false;
509  enqueueUseOld = false;
510  delete oldqueue;
511  oldqueue = 0;
512  }
513  queue->Reset();
514  readrequest = 0;
515  writerequest = 0;
516  enqueuethresh = 0;
517  dequeuethresh = 0;
518  indequeue = false;
519  inenqueue = false;
520  flushed = false;
521  Signal();
522  }
523 
526  Packet packet(PACKET_RESET);
527  SetupPacket(packet);
529  pendingReset = false;
530  }
531 
532  void RemoteQueue::IDReaderPacket(const Packet &packet) {
533  ASSERT(false, "Unexpected packet");
534  }
535 
536  void RemoteQueue::IDWriterPacket(const Packet &packet) {
537  ASSERT(false, "Unexpected packet");
538  }
539 
541  while (sock.Readable()) {
542  unsigned numtoread = 0;
543  void *ptr = PacketDecoder::GetDecoderBytes(numtoread);
544  unsigned numread = sock.Recv(ptr, numtoread, false);
545  if (numread == 0) {
546  if (sock.Eof()) {
547  logger.Debug("Read EOF");
548  }
549  break;
550  } else {
552  }
553  }
554  }
555 
556  void RemoteQueue::WriteBytes(const iovec *iov, unsigned iovcnt) {
557  unsigned total = 0;
558  for (unsigned i = 0; i < iovcnt; ++i) { total += iov[i].iov_len; }
559  unsigned numwritten = 0;
560  while (numwritten < total) {
561  unsigned num = sock.Writev(iov, iovcnt);
562  numwritten += num;
563  if (numwritten == total) break;
564  while (iov->iov_len <= num) {
565  num -= iov->iov_len;
566  ++iov;
567  --iovcnt;
568  }
569  if (num > 0) {
570  // Finish writing this section...
571  unsigned amount = num;
572  while (amount < iov->iov_len) {
573  amount += sock.Write(((char*)iov->iov_base) + amount, iov->iov_len - amount);
574  }
575  ++iov;
576  --iovcnt;
577  numwritten += amount - num;
578  }
579  }
580  ASSERT(total == numwritten);
581  }
582 
584  try {
585  while (true) {
586  try {
587  if (kernel->IsTerminated()) {
588  std::string clockstr = ClockString();
589  logger.Debug("Forced Shutdown (c: %s)", clockstr.c_str());
590  Shutdown();
591  }
592  {
593  AutoLock<QueueBase> al(*this);
594  if (dead) {
595  std::string clockstr = ClockString();
596  logger.Debug("Shutdown (c: %s)", clockstr.c_str());
597  break;
598  }
599  }
600  if (sock.Closed()) {
601  logger.Debug("Connecting");
602  shared_ptr<Sync::Future<int> > conn;
603  if (mode == WRITE) {
604  conn = server->ConnectWriter(GetKey());
605  } else {
606  conn = server->ConnectReader(GetKey());
607  }
608  if (conn) {
609  sock.Reset();
610  sock.FD(conn->Get());
611  }
612  if (sock.Closed()) {
613  logger.Debug("Connection Failed");
614  } else {
615  logger.Debug("Connected");
616  actionThread->Start();
617  sock.SetNoDelay(true);
618  }
619  } else {
620  FileHandle *fds[2];
621  fds[0] = &sock;
622  fds[1] = holder->GetWakeup();
623  FileHandle::Poll(fds, fds+2, -1);
624  {
625  AutoLock<QueueBase> al(*this);
626  Signal();
627  actionTick = false;
628  while (!actionTick && !dead) {
630  }
631  }
632  }
633  } catch (const ErrnoException &e) {
634  AutoLock<QueueBase> al(*this);
635  HandleError(e);
636  }
637  }
638  } catch (const ShutdownException &e) {
639  std::string clockstr = ClockString();
640  logger.Debug("Forced Shutdown threw exception (c: %s)", clockstr.c_str());
641  // Do nothing, this just breaks us out of the loop
642  // Can be thrown from ConnectWriter or ConnectReader
643  } catch (const ErrnoException &e) {
644  logger.Error(e.what());
645  ShutdownReader();
646  ShutdownWriter();
647  Shutdown();
648  } catch (...) {
650  throw;
651  }
653  server->Wakeup();
654  return 0;
655  }
656 
658  pendingAction = true;
660  }
661 
663  AutoLock<QueueBase> al(*this);
664  while (!dead) {
666  actionTick = true;
668  while(!pendingAction && !dead) {
669  Wait();
670  }
671  pendingAction = false;
672  }
673  return 0;
674  }
675 
677  if (sock.Closed()) {
678  return;
679  }
680 
681  bool terminated = kernel->IsTerminated();
682  if (sock.Eof() && !(readshutdown || writeshutdown)) {
683  if (terminated) {
684  if (!sock.Closed()) {
685  sock.Close();
686  }
688  return;
689  }
690  // We /should/ never get here, output copious amounts of internal information.
691  std::string clockstr = ClockString();
692  logger.Error("Eof detected but not shutdown! (c: %s)", clockstr.c_str());
693  LogState();
694  ASSERT(false, "EOF detected but not shutdown! (c: %s)", clockstr.c_str());
695  }
696 
697  try {
698  Read();
699 
700  if (pendingGrow && !sentEnd) {
701  SendGrowPacket();
702  pendingGrow = false;
703  }
704 
705  if (mode == WRITE) {
706  if (!sentEnd) {
707  if (terminated) {
709  }
710  // Write as much as we can
711  while (!queue->Empty() && (bytecount < readerlength) && !dead) {
713  }
714  if (dead) return;
715  if (pendingFlush && queue->Empty()) {
716  SendFlushPacket();
717  }
718  // A pending block is present
719  if (pendingBlock) {
720  // If we have received dequeue packets
721  // sense the block happened don't bother sending anything
722  if (writerequest > queue->Freespace()) {
724  }
725  pendingBlock = false;
726  }
727 
728  if (pendingD4RTag) {
729  if (writer && !writeshutdown) {
731  pendingD4RTag = false;
732  }
733  }
734  }
735  if (writeshutdown) {
736  if (!sentEnd && (queue->Empty() || terminated)) {
738  sentEnd = true;
739  }
740  }
741 
742  if (readshutdown) {
743  if (!sentEnd) {
745  sentEnd = true;
746  }
747  if (logger.LogLevel() <= Logger::DEBUG) {
748  std::string clockstr = ClockString();
749  logger.Debug("Closing the socket (c: %s)", clockstr.c_str());
750  }
751  sock.Close();
752  dead = true;
754  }
755  } else {
756  if (!sentEnd) {
757  if (terminated) {
759  }
760  if (pendingReset) {
761  SendResetPacket();
762  }
763  // If some bytes have been read from the queue
764  if (bytecount > queue->Count()) {
766  }
767 
768  if (pendingBlock) {
769  // May have received enqueue packets...
770  if (readrequest > queue->Count()) {
772  }
773  pendingBlock = false;
774  }
775  if (pendingD4RTag) {
776  if (reader && !readshutdown) {
778  pendingD4RTag = false;
779  }
780  }
781  if (readshutdown) {
783  sentEnd = true;
784  }
785  }
786  if (writeshutdown) {
787  if (!sentEnd) {
789  sentEnd = true;
790  }
791  if (logger.LogLevel() <= Logger::DEBUG) {
792  std::string clockstr = ClockString();
793  logger.Debug("Closing the socket (c: %s)", clockstr.c_str());
794  }
795  sock.Close();
796  dead = true;
798  }
799  }
800  } catch (const ErrnoException &e) {
801  HandleError(e);
802  }
803  }
804 
805  void RemoteQueue::UpdateClock(const Packet &packet) {
806  writeclock = packet.WriteClock();
807  readclock = packet.ReadClock();
808  if (mode == READ) {
809  clock = std::max(writeclock, clock) + 1;
810  } else {
811  clock = std::max(readclock, clock) + 1;
812  }
813  }
814 
816  ++clock;
817  }
818 
819  std::string RemoteQueue::ClockString() const {
820  std::ostringstream oss;
821  oss << "<" << clock << "," << readclock << "," << writeclock << ">";
822  return oss.str();
823  }
824 
826  if (readshutdown || writeshutdown) {
827  Signal();
828  }
829  switch (e.Error()) {
830  case EPIPE:
831  case EBADF:
832  case ECONNRESET:
833  try {
834  sock.Close();
835  } catch (const ErrnoException &e) {}
836  dead = true;
838  break;
839  default:
840  {
841  std::string clockstr = ClockString();
842  logger.Error("Exception in RemoteQueue rethrowing (c: %s e: %d): %s",
843  clockstr.c_str(), e.Error(), e.what());
844  }
845  throw;
846  break;
847  }
848  }
849 
850  unsigned RemoteQueue::QueueLength(unsigned length, unsigned maxthresh, double alpha, Mode_t mode) {
851  unsigned writerlen = unsigned(((double)length)*alpha);
852  if (mode == READ) {
853  return std::max<unsigned>(length - writerlen, maxthresh);
854  } else {
855  return std::max<unsigned>(writerlen, maxthresh);
856  }
857  }
858 
859  const char *BoolString(bool tf) {
860  if (tf) {
861  return "t";
862  } else {
863  return "f";
864  }
865  }
866 
867  std::string RemoteQueue::GetState() {
868  std::ostringstream oss;
869  oss << "s: " << UnlockedQueueLength() << ",mt: " << UnlockedMaxThreshold() << ",c: " << UnlockedCount()
870  << ",f: " << UnlockedFreespace() << ",rr: " << readrequest << ",wr: " << writerequest
871  << ",te: " << UnlockedNumEnqueued() << ",td: " << UnlockedNumDequeued()
872  << ",M: " << (mode == READ ? "r" : "w") << ",rl: " << readerlength
873  << ",wl: " << writerlength << ",c: " << ClockString() << ",bc: "
874  << bytecount << ",pb: " << pendingBlock << ",se: " << sentEnd
875  << ",pg: " << pendingGrow << ",pd4r: " << pendingD4RTag
876  << ",d: " << dead;
877  if (readshutdown) {
878  oss << ",readshutdown";
879  }
880  if (writeshutdown) {
881  oss << ",writeshutdown";
882  }
883  return oss.str();
884  }
885 
888  std::string clockstr = ClockString();
889  logger.Error("Mode: %s, Readerlength: %u, Writerlength %u, Clock: %s, bytecount: %u",
890  mode == READ ? "read" : "write", readerlength, writerlength, clockstr.c_str(), bytecount);
891  logger.Error("PendingBlock: %s, SentEnd: %s, PendingGrow: %s, PendingD4R: %s, Dead: %s"
892  ", PendingFlush: %s, PendingReset: %s",
895  logger.Error("FileThread id: %llu, Running: %s, ActionThread id: %llu, Running: %s",
896  (unsigned long long)((pthread_t)(*fileThread)), BoolString(fileThread->Running()),
897  (unsigned long long)((pthread_t)(*actionThread)), BoolString(actionThread->Running()));
898  if (sock.Closed()) {
899  logger.Error("Socket closed");
900  }
901  }
902 }
903 
void SendD4RTagPacket()
Definition: RemoteQueue.cc:475
unsigned maxwritethreshold
Definition: RemoteQueue.h:150
PthreadCondition & Wait(PthreadMutex &mutex)
void ReadBlockPacket(const Packet &packet)
Definition: RemoteQueue.cc:354
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
ConnectionServer *const server
Definition: RemoteQueue.h:151
void WriteBlockPacket(const Packet &packet)
Definition: RemoteQueue.cc:375
virtual bool ReadBlocked()
Definition: QueueBase.cc:336
unsigned QueueLength() const
Definition: QueueBase.cc:230
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311
uint32_t QueueSize() const
Definition: PacketHeader.h:126
const char * BoolString(bool tf)
Definition: RemoteQueue.cc:859
shared_ptr< Node > reader
Definition: D4RQueue.h:122
void DequeuePacket(const Packet &packet)
Definition: RemoteQueue.cc:335
unsigned readrequest
Definition: QueueBase.h:279
void UnlockedSignalWriterTagChanged()
Definition: RemoteQueue.cc:226
virtual void InternalDequeue(unsigned count)
virtual unsigned UnlockedMaxThreshold() const
unsigned readerlength
Definition: RemoteQueue.h:159
PacketHeader header
Definition: PacketHeader.h:148
void ShutdownWrite()
Shutdown the write end of this socket. Any future attempt to write to this socket will fail...
void SendEnqueuePacket()
Definition: RemoteQueue.cc:294
PthreadCondition & Broadcast(void)
virtual void InternalFlush()
Definition: QueueBase.cc:265
Logger logger
Definition: QueueBase.h:288
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
void SendEndOfWritePacket()
Definition: RemoteQueue.cc:404
void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
Definition: RemoteQueue.cc:145
void SetReaderNode(shared_ptr< Node > n)
Definition: D4RQueue.cc:59
void InternalDequeue(unsigned count)
Definition: RemoteQueue.cc:191
unsigned Recv(void *ptr, unsigned len, bool block)
uint32_t Count() const
Definition: PacketHeader.h:131
void SendEndOfReadPacket()
Definition: RemoteQueue.cc:425
virtual void Wait()
Definition: QueueBase.h:246
Definition of the queue attributes.
uint64_t WriteClock() const
Definition: PacketHeader.h:133
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
void Trace(const char *fmt,...)
Definition: Logger.cc:195
virtual void WaitForData()
Definition: QueueBase.cc:326
void SendPacket(const Packet &packet)
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)
unsigned writerlength
Definition: RemoteQueue.h:160
void SendWriteBlockPacket()
Definition: RemoteQueue.cc:388
ulong MaxThreshold(void) const
void WaitForFreespace()
Definition: RemoteQueue.cc:176
void ShutdownReader()
Called by the QueueReader when no more data will be read.
Definition: QueueBase.cc:250
uint32_t DataLength() const
Definition: PacketHeader.h:124
void Error(const char *fmt,...)
Definition: Logger.cc:159
unsigned writerequest
Definition: QueueBase.h:280
SocketHandle sock
Definition: RemoteQueue.h:153
ulong Count(void) const
shared_ptr< Node > writer
Definition: D4RQueue.h:123
virtual bool WriteBlocked()
Definition: QueueBase.cc:357
uint32_t MaxThreshold() const
Definition: PacketHeader.h:130
uint64_t ReadClock() const
Definition: PacketHeader.h:132
ulong NumChannels(void) const
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255
void SendDequeuePacket()
Definition: RemoteQueue.cc:344
#define FUNC_TRACE(logger)
Definition: RemoteQueue.cc:43
auto_ptr< Pthread > fileThread
Definition: RemoteQueue.h:142
void EndOfReadPacket(const Packet &packet)
Definition: RemoteQueue.cc:417
An object to hold references to RemoteQueues so they can continue to work after the node has gone awa...
The exceptions specified for the CPN network.
void FlushPacket(const Packet &packet)
Definition: RemoteQueue.cc:489
WakeupHandle * GetWakeup()
static int Poll(IteratorRef< FileHandle * > begin, IteratorRef< FileHandle * > end, double timeout)
poll a list of FileHandles for any activity and call the appropriate On method.
Definition: FileHandle.cc:34
bool Closed() const
Definition: FileHandle.h:128
void UnlockedShutdown()
Definition: RemoteQueue.cc:118
void SendReadBlockPacket()
Definition: RemoteQueue.cc:367
void * ActionThreadEntryPoint()
Definition: RemoteQueue.cc:662
void HandleError(const ErrnoException &e)
Definition: RemoteQueue.cc:825
ulong QueueLength(void) const
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
#define ASSERT_ABORT(exp,...)
void D4RTagPacket(const Packet &packet)
Definition: RemoteQueue.cc:458
void SetNoDelay(bool nodelay)
virtual void Signal()
Definition: QueueBase.h:247
void * FileThreadEntryPoint()
Definition: RemoteQueue.cc:583
shared_ptr< D4R::Node > mocknode
Definition: RemoteQueue.h:154
auto_ptr< Pthread > actionThread
Definition: RemoteQueue.h:143
unsigned UnlockedNumDequeued() const
const std::string & Name() const
Definition: Logger.cc:88
ulong Freespace(void) const
unsigned Writev(const iovec *iov, int iovcnt)
scatter gather io version of Write
Definition: FileHandle.cc:261
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
unsigned dequeuethresh
Definition: QueueBase.h:282
shared_ptr< Sync::Future< int > > ConnectReader(Key_t readerkey)
const Key_t readerkey
Definition: QueueBase.h:273
void EndOfWritePacket(const Packet &packet)
Definition: RemoteQueue.cc:396
void IDWriterPacket(const Packet &packet)
Definition: RemoteQueue.cc:536
void UnlockedSignalReaderTagChanged()
Definition: RemoteQueue.cc:219
void NotifyData()
Definition: QueueBase.cc:341
void NotifyFreespace()
Definition: QueueBase.cc:362
KernelBase * kernel
Definition: QueueBase.h:286
int LogLevel() const
Definition: Logger.cc:58
virtual void InternalEnqueue(unsigned count)
void LogState()
For debug ONLY!
Definition: RemoteQueue.cc:886
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155
void CleanupQueue(Key_t key)
std::string ClockString() const
Definition: RemoteQueue.cc:819
unsigned UnlockedCount() const
Definition: RemoteQueue.cc:125
bool writeshutdown
Definition: QueueBase.h:276
A version of the ThresholdQueue that provides the CPN Queue interface This queue implementation creat...
virtual unsigned UnlockedFreespace() const
RemoteQueue(KernelBase *k, Mode_t mode, ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr, bool usembs)
Definition: RemoteQueue.cc:49
An exception indicating that the Kernel has shut down.
Definition: Exceptions.h:38
void Reset()
Clear all internal state including the file descriptor! WARNING does not close the file! ...
Definition: FileHandle.cc:138
void ResetPacket(const Packet &packet)
Definition: RemoteQueue.cc:504
int FD() const
Definition: FileHandle.h:106
void InternalCheckStatus()
Definition: RemoteQueue.cc:676
std::string GetState()
For debugging.
Definition: RemoteQueue.cc:867
RemoteQueueHolder *const holder
Definition: RemoteQueue.h:152
void SendResetPacket()
Definition: RemoteQueue.cc:524
void IDReaderPacket(const Packet &packet)
Definition: RemoteQueue.cc:532
const Mode_t mode
Definition: RemoteQueue.h:148
uint32_t Requested() const
Definition: PacketHeader.h:129
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)
PthreadCondition actionCond
Definition: RemoteQueue.h:146
bool Readable(bool r)
Set that the file is currently readable or not.
Definition: FileHandle.h:86
void WriteBytes(const iovec *iov, unsigned iovcnt)
Definition: RemoteQueue.cc:556
void Warn(const char *fmt,...)
Definition: Logger.cc:168
virtual const char * what() const
const double alpha
Definition: RemoteQueue.h:149
void ReleaseDecoderBytes(unsigned amount)
void EnqueuePacket(const Packet &packet)
Definition: RemoteQueue.cc:242
bool readshutdown
Definition: QueueBase.h:275
unsigned enqueuethresh
Definition: QueueBase.h:281
bool Empty(void) const
virtual void InternalReset()
unsigned UnlockedQueueLength() const
Definition: RemoteQueue.cc:141
unsigned UnlockedNumEnqueued() const
virtual bool IsTerminated()=0
Key_t GetKey() const
Definition: RemoteQueue.h:74
virtual void LogState()
For debugging ONLY!! Otherwise non deterministic output.
Definition: QueueBase.cc:400
unsigned bytecount
Definition: RemoteQueue.h:168
void SetWriterNode(shared_ptr< Node > n)
Definition: D4RQueue.cc:65
bool Eof() const
Definition: FileHandle.h:115
uint64_t readclock
Definition: RemoteQueue.h:156
unsigned Readv(const iovec *iov, int iovcnt)
scatter gather io version of Read
Definition: FileHandle.cc:190
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
void ShutdownWriter()
Called by the QueueWriter when no more data will be written.
Definition: QueueBase.cc:260
PthreadMutex lock
Definition: QueueBase.h:289
unsigned Read(void *ptr, unsigned len)
Read data from the file descriptor.
Definition: FileHandle.cc:159
void GrowPacket(const Packet &packet)
Definition: RemoteQueue.cc:438
Generic file handle could be a file, or a socket or a device.
Definition: FileHandle.h:41
const Key_t writerkey
Definition: QueueBase.h:274
#define ASSERT(exp,...)
void SendFlushPacket()
Definition: RemoteQueue.cc:496
shared_ptr< Sync::Future< int > > ConnectWriter(Key_t writerkey)
uint64_t writeclock
Definition: RemoteQueue.h:157
bool UnlockedEmpty() const
Definition: RemoteQueue.cc:133
void * GetDecoderBytes(unsigned &amount)
Automatic locking on the stack.
int Error() const
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
Definition: FileHandle.cc:225
void InternalEnqueue(unsigned count)
Definition: RemoteQueue.cc:198
void Debug(const char *fmt,...)
Definition: Logger.cc:186