CPN
Computational Process Networks
RemoteQueue.h
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #ifndef CPN_REMOTEQUEUE_H
24 #define CPN_REMOTEQUEUE_H
25 #pragma once
26 #include "common_priv.h"
27 #include "RemoteQueueBase.h"
28 #include "CPNThresholdQueue.h"
29 #include "PacketDecoder.h"
30 #include "PacketEncoder.h"
31 #include <cpn/io/SocketHandle.h>
32 
33 /*
34  * Forward declarations.
35  */
36 class ErrnoException;
37 class Pthread;
38 namespace D4R {
39  class Node;
40 }
41 
42 namespace CPN {
43  class RemoteQueueHolder;
44 
50  : public ThresholdQueue,
51  public RemoteQueueBase,
52  private PacketEncoder,
53  private PacketDecoder
54  {
55  public:
56 
57  enum Mode_t {
60  };
61 
64  bool usembs);
65 
66  ~RemoteQueue();
67 
71  void Start();
72 
73  Mode_t GetMode() const { return mode; }
74  Key_t GetKey() const { return mode == READ ? readerkey : writerkey; }
79  void Shutdown();
80 
81 
83  void LogState();
84  private:
85  void UnlockedShutdown();
86 
87  unsigned UnlockedCount() const;
88  bool UnlockedEmpty() const;
89  unsigned UnlockedQueueLength() const;
90  void UnlockedGrow(unsigned queueLen, unsigned maxThresh);
91 
92  void Signal();
93  void WaitForData();
94  void WaitForFreespace();
95  void InternalDequeue(unsigned count);
96  void InternalEnqueue(unsigned count);
97  void InternalFlush();
98  void InternalReset();
101 
102  void SetupPacket(Packet &packet);
103  void EnqueuePacket(const Packet &packet);
104  void SendEnqueuePacket();
105  void DequeuePacket(const Packet &packet);
106  void SendDequeuePacket();
107  void ReadBlockPacket(const Packet &packet);
108  void SendReadBlockPacket();
109  void WriteBlockPacket(const Packet &packet);
110  void SendWriteBlockPacket();
111  void EndOfWritePacket(const Packet &packet);
112  void SendEndOfWritePacket();
113  void EndOfReadPacket(const Packet &packet);
114  void SendEndOfReadPacket();
115  void GrowPacket(const Packet &packet);
116  void SendGrowPacket();
117  void D4RTagPacket(const Packet &packet);
118  void SendD4RTagPacket();
119  void FlushPacket(const Packet &packet);
120  void SendFlushPacket();
121  void ResetPacket(const Packet &packet);
122  void SendResetPacket();
123  void IDReaderPacket(const Packet &packet);
124  void IDWriterPacket(const Packet &packet);
125 
126  void Read();
127  void WriteBytes(const iovec *iov, unsigned iovcnt);
128 
129  void *FileThreadEntryPoint();
130  void *ActionThreadEntryPoint();
131  void InternalCheckStatus();
132  void UpdateClock(const Packet &packet);
133  void TickClock();
134  std::string ClockString() const;
135 
136  void HandleError(const ErrnoException &e);
137  static unsigned QueueLength(unsigned length, unsigned maxthresh, double alpha, Mode_t mode);
138 
140  std::string GetState();
141 
142  auto_ptr<Pthread> fileThread;
143  auto_ptr<Pthread> actionThread;
147 
148  const Mode_t mode;
149  const double alpha;
154  shared_ptr<D4R::Node> mocknode;
155  uint64_t clock; // Our clock value
156  uint64_t readclock; // Last knowledge of the reader clock
157  uint64_t writeclock; // Last knowledge of the writer clock
158 
159  unsigned readerlength;
160  unsigned writerlength;
161 
168  unsigned bytecount;
169 
171  bool sentEnd;
177 
178  bool dead;
179  };
180 }
181 #endif
void SendD4RTagPacket()
Definition: RemoteQueue.cc:475
unsigned maxwritethreshold
Definition: RemoteQueue.h:150
void ReadBlockPacket(const Packet &packet)
Definition: RemoteQueue.cc:354
ConnectionServer *const server
Definition: RemoteQueue.h:151
void WriteBlockPacket(const Packet &packet)
Definition: RemoteQueue.cc:375
unsigned QueueLength() const
Definition: QueueBase.cc:230
void DequeuePacket(const Packet &packet)
Definition: RemoteQueue.cc:335
void UnlockedSignalWriterTagChanged()
Definition: RemoteQueue.cc:226
unsigned readerlength
Definition: RemoteQueue.h:159
void SendEnqueuePacket()
Definition: RemoteQueue.cc:294
A FileHandle customized with some socket specific functionality and functions.
Definition: SocketHandle.h:34
void SendEndOfWritePacket()
Definition: RemoteQueue.cc:404
void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
Definition: RemoteQueue.cc:145
void InternalDequeue(unsigned count)
Definition: RemoteQueue.cc:191
void SendEndOfReadPacket()
Definition: RemoteQueue.cc:425
void UpdateClock(const Packet &packet)
Definition: RemoteQueue.cc:805
void SetupPacket(Packet &packet)
Definition: RemoteQueue.cc:233
unsigned writerlength
Definition: RemoteQueue.h:160
void SendWriteBlockPacket()
Definition: RemoteQueue.cc:388
void WaitForFreespace()
Definition: RemoteQueue.cc:176
SocketHandle sock
Definition: RemoteQueue.h:153
uint64_t Key_t
Definition: common.h:79
void SendDequeuePacket()
Definition: RemoteQueue.cc:344
auto_ptr< Pthread > fileThread
Definition: RemoteQueue.h:142
void EndOfReadPacket(const Packet &packet)
Definition: RemoteQueue.cc:417
void FlushPacket(const Packet &packet)
Definition: RemoteQueue.cc:489
Mode_t GetMode() const
Definition: RemoteQueue.h:73
void UnlockedShutdown()
Definition: RemoteQueue.cc:118
void SendReadBlockPacket()
Definition: RemoteQueue.cc:367
void * ActionThreadEntryPoint()
Definition: RemoteQueue.cc:662
void HandleError(const ErrnoException &e)
Definition: RemoteQueue.cc:825
void D4RTagPacket(const Packet &packet)
Definition: RemoteQueue.cc:458
void * FileThreadEntryPoint()
Definition: RemoteQueue.cc:583
shared_ptr< D4R::Node > mocknode
Definition: RemoteQueue.h:154
auto_ptr< Pthread > actionThread
Definition: RemoteQueue.h:143
const Key_t readerkey
Definition: QueueBase.h:273
void EndOfWritePacket(const Packet &packet)
Definition: RemoteQueue.cc:396
void IDWriterPacket(const Packet &packet)
Definition: RemoteQueue.cc:536
void UnlockedSignalReaderTagChanged()
Definition: RemoteQueue.cc:219
void LogState()
For debug ONLY!
Definition: RemoteQueue.cc:886
std::string ClockString() const
Definition: RemoteQueue.cc:819
unsigned UnlockedCount() const
Definition: RemoteQueue.cc:125
A version of the ThresholdQueue that provides the CPN Queue interface This queue implementation creat...
RemoteQueue(KernelBase *k, Mode_t mode, ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr, bool usembs)
Definition: RemoteQueue.cc:49
void ResetPacket(const Packet &packet)
Definition: RemoteQueue.cc:504
Default threshold queue implementation.
void InternalCheckStatus()
Definition: RemoteQueue.cc:676
std::string GetState()
For debugging.
Definition: RemoteQueue.cc:867
RemoteQueueHolder *const holder
Definition: RemoteQueue.h:152
void SendResetPacket()
Definition: RemoteQueue.cc:524
void IDReaderPacket(const Packet &packet)
Definition: RemoteQueue.cc:532
const Mode_t mode
Definition: RemoteQueue.h:148
PthreadCondition actionCond
Definition: RemoteQueue.h:146
void WriteBytes(const iovec *iov, unsigned iovcnt)
Definition: RemoteQueue.cc:556
const double alpha
Definition: RemoteQueue.h:149
void EnqueuePacket(const Packet &packet)
Definition: RemoteQueue.cc:242
unsigned UnlockedQueueLength() const
Definition: RemoteQueue.cc:141
Key_t GetKey() const
Definition: RemoteQueue.h:74
unsigned bytecount
Definition: RemoteQueue.h:168
uint64_t readclock
Definition: RemoteQueue.h:156
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
void GrowPacket(const Packet &packet)
Definition: RemoteQueue.cc:438
const Key_t writerkey
Definition: QueueBase.h:274
void SendFlushPacket()
Definition: RemoteQueue.cc:496
uint64_t writeclock
Definition: RemoteQueue.h:157
bool UnlockedEmpty() const
Definition: RemoteQueue.cc:133
void InternalEnqueue(unsigned count)
Definition: RemoteQueue.cc:198