CPN
Computational Process Networks
RDMAQueue.h
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #ifndef CPN_RDMA_QUEUE_H
24 #define CPN_RDMA_QUEUE_H
25 #pragma once
26 #include "common_priv.h"
27 #include "QueueBase.h"
28 #include "RemoteQueueBase.h"
29 #include "MirrorBufferSet.h"
30 #include <exception>
31 
32 #if ENABLE_RDMA
33 #include <infiniband/verbs.h>
34 
35 class Pthread;
36 
37 namespace CPN {
38 
39  namespace Internal {
40 
/**
 * Index bookkeeping for a circular byte buffer.
 *
 * The model does not allocate or free anything; it only tracks the
 * position that AdvanceHead() moves (head), the position that
 * AdvanceTail() moves (tail), and the buffer they index into.
 * Count() is always head - tail.
 */
struct QueueModel {
    /** Build a model with no backing buffer and every field zeroed. */
    QueueModel()
        : base(0),
          length(0),
          head(0),
          tail(0),
          stride(0),
          lkey(0)
    {
    }

    /**
     * Build an empty model over an existing buffer.
     *
     * @param b  first byte of the backing buffer
     * @param s  number of bytes the model may index (stored as length)
     */
    QueueModel(char *b, uint64_t s)
        : base(b),
          length(s),
          head(0),
          tail(0),
          stride(0),
          lkey(0)
    {
    }

    /** Address of the byte `offset` positions past tail, wrapped modulo length. */
    char *GetTail(uint64_t offset = 0) {
        const uint64_t index = (tail + offset) % length;
        return base + index;
    }

    /** Address of the byte `offset` positions past head, wrapped modulo length. */
    char *GetHead(uint64_t offset = 0) {
        const uint64_t index = (head + offset) % length;
        return base + index;
    }

    /**
     * Move tail forward by `count`.
     *
     * Whenever tail ends up strictly greater than length, both indices are
     * shifted down by length, which leaves Count() unchanged.
     */
    void AdvanceTail(uint64_t count) {
        tail += count;
        while (tail > length) {
            head -= length;
            tail -= length;
        }
    }

    /** Move head forward by `count`; no wrapping is applied here. */
    void AdvanceHead(uint64_t count) {
        head += count;
    }

    /** Distance between head and tail. */
    uint64_t Count() const {
        return head - tail;
    }

    /** True when head and tail coincide. */
    bool Empty() const {
        return Count() == 0;
    }

    /** True when Count() has reached Size(). */
    bool Full() const {
        return Size() == Count();
    }

    /** The configured length. */
    uint64_t Size() const {
        return length;
    }

    /** Size() minus Count(). */
    uint64_t Freespace() const {
        return Size() - Count();
    }

    char *base;       // start of the backing buffer
    uint64_t length, head, tail;
    uint64_t stride;  // not used by any method of this struct
    uint32_t lkey;    // not used by any method of this struct
};
67 
/**
 * Discriminator stored in Command::type.
 *
 * The numeric values are the ones the compiler would assign implicitly
 * (consecutive from zero); they are written out so the mapping is visible
 * at a glance.
 */
enum CommandType {
    CT_UPDATE   = 0,
    CT_ENQUEUE  = 1,
    CT_DEQUEUE  = 2,
    CT_BLOCK    = 3,
    CT_SHUTDOWN = 4,
    CT_GROW     = 5,
    CT_D4R      = 6,
    CT_CS_REQ   = 7,
    CT_CS_OK    = 8,
    CT_FLUSH    = 9,
    CT_RESET    = 10
};
81 
82  struct Command {
83  CommandType type;
84  uint64_t clock, remote_clock;
85  union Data {
86  struct Update {
87  char *base;
88  uint64_t length, head, tail, stride;
89  uint32_t lkey;
90  } update;
91  struct Dequeue {
92  uint64_t amount;
93  } dequeue;
94  struct Enqueue {
95  uint64_t amount;
96  } enqueue;
97  struct Block {
98  uint64_t amount;
99  } block;
100  struct Grow {
101  uint64_t length, threshold;
102  } grow;
103  struct D4R {
104  uint64_t count, key, qsize, qkey;
105  } d4r;
106  struct CriticalSectionInfo {
107  uint64_t timestamp;
108  } cs_info;
109  struct Flush {
110  uint64_t num_enqueued;
111  } flush;
112  } data;
113  };
114 
115  struct CommandQueue {
116  public:
117  CommandQueue(uint64_t size);
118  ~CommandQueue();
119  Command *NewCommand();
120  void Enqueue();
121  Command *GetCommand();
122  void Dequeue();
123 
124  void Reset();
125  uint64_t Count() const { return queue.Count()/sizeof(Command); }
126  bool Empty() const { return queue.Empty(); }
127  bool Full() const { return Freespace() == 0; }
128  uint64_t Size() const { return queue.Size()/sizeof(Command); }
129  uint64_t Freespace() const { return queue.Freespace()/sizeof(Command) - outstanding_commands; }
130  uint64_t Outstanding() const { return outstanding_commands; }
131 
132  char *GetBase() { return queue.base; }
133  void SetKey(uint32_t k) { queue.lkey = k; }
134  uint32_t GetKey() const { return queue.lkey; }
135  private:
136  uint64_t outstanding_commands;
137  QueueModel queue;
138  };
139 
/**
 * Lifecycle states of an RDMAQueue.
 *
 * Values are the implicit consecutive ones, written out explicitly.
 */
enum State {
    /* Just initialized, cannot communicate yet. */
    S_INIT     = 0,
    /* Normal operation. */
    S_RUNNING  = 1,
    /* Preparing to shut down; flush all data if necessary. */
    S_FLUSHING = 2,
    /* All data has been flushed; confirm that the other end knows we are dying. */
    S_DIEING   = 3,
    /* We have confirmed that the other end knows we are shut down
       and will not send us any more messages. */
    S_DEAD     = 4,
    /* Something went wrong; the state of the RDMAQueue is corrupted. */
    S_ERROR    = 5
};
149  }
150 
151  class RDMAQueue : public QueueBase, public RemoteQueueBase {
152  public:
153  enum Mode {
154  READ, WRITE
155  };
156 
157  RDMAQueue(KernelBase *k, Mode m,
158  ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr);
159 
160  virtual ~RDMAQueue();
161 
162  void Start();
163  void Shutdown();
164  Key_t GetKey() const { return mode == READ ? readerkey : writerkey; }
165  void LogState();
166  protected:
167 
168  virtual void WaitForData();
169  virtual void WaitForFreespace();
170 
171  virtual const void *InternalGetRawDequeuePtr(unsigned thresh, unsigned chan);
172  virtual void InternalDequeue(unsigned count);
173  virtual void *InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan);
174  virtual void InternalEnqueue(unsigned count);
175 
176  virtual void InternalFlush();
177  virtual void InternalReset();
178  virtual unsigned UnlockedNumChannels() const { return num_channels; }
179  virtual unsigned UnlockedCount() const { return local_queue.Count() + remote_queue.Count(); }
180  virtual bool UnlockedEmpty() const { return local_queue.Empty() && remote_queue.Empty(); }
181  virtual unsigned UnlockedFreespace() const { return local_queue.Freespace(); }
182  virtual bool UnlockedFull() const { return local_queue.Full(); }
183  virtual unsigned UnlockedMaxThreshold() const { return max_threshold; }
184  virtual unsigned UnlockedQueueLength() const { return local_queue.Size() + remote_queue.Size(); }
185  virtual unsigned UnlockedEnqueueChannelStride() const;
186  virtual unsigned UnlockedDequeueChannelStride() const;
187  virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh);
188  virtual void UnlockedShutdownReader();
189  virtual void UnlockedShutdownWriter();
190  virtual unsigned UnlockedNumEnqueued() const { return num_enqueued; }
191  virtual unsigned UnlockedNumDequeued() const { return num_dequeued; }
192 
193  virtual void UnlockedSignalReaderTagChanged();
194  virtual void UnlockedSignalWriterTagChanged();
195 
196  private:
197 
198  struct PortInfo {
199  int lid;
200  int qp_num;
201  };
202 
203  void *EventThread();
204  void PostDequeue(unsigned count);
205  void PostEnqueue(unsigned count);
206  void PostBlock(unsigned count);
207  void PostShutdown();
208  void PostGrow();
209  void PostD4RTag();
210  void PostCSRequest();
211  void PostCSOK();
212  void PostFlush();
213  void PostReset();
214  void CheckRDMAWrite();
215  void CheckDequeue();
216  void PostUpdateQueue();
217  Internal::Command *NewCommand(Internal::CommandType type);
218  void PostCommand(Internal::Command *command);
219  void PostReceives();
220  void HandleWorkCompletion(ibv_wc *wc);
221  void ProcessEvents();
222  void ProcessCommand(Internal::Command *command);
223  void Initialize();
224  void Cleanup();
225  void Connect();
226 
227  void Cork();
228  void Uncork();
229 
230  void EnterCriticalSection();
231  void LeaveCriticalSection();
232 
233  void InternalGrow();
234 
235  bool ActiveTransfers();
236 
237  bool IsDead();
238  bool UnlockedIsDead();
239  bool UnlockedIsNormal();
240 
241  const char* ClockStr();
242  const char* UnlockedClockStr();
243 
244  void ShutdownError();
245 
246  const Mode mode;
247  Internal::State state;
248 
249  auto_ptr<MirrorBufferSet> mbs;
250  auto_ptr<MirrorBufferSet> mbs_old;
251  auto_ptr<Pthread> event_thread;
252 
253  Internal::QueueModel local_queue;
254  Internal::QueueModel local_old_queue;
255  Internal::QueueModel remote_queue;
256 
257  Internal::CommandQueue read_command_queue;
258  Internal::CommandQueue write_command_queue;
259 
260  unsigned max_threshold;
261  unsigned num_channels;
262  uint64_t num_enqueued;
263  uint64_t num_dequeued;
264  double alpha;
265  unsigned max_send_wr;
266  unsigned max_recv_wr;
267  unsigned num_send_wr;
268  unsigned pending_dequeue;
269  unsigned pending_rdma;
270  unsigned confirmed_rdma;
271  unsigned cork;
272  bool queue_updated;
273  bool pending_grow;
274  bool grow_ack;
275  bool pending_flush;
276 
277  uint64_t clock;
278  uint64_t remote_clock;
279  uint64_t old_clock;
280  std::string clock_str;
281 
282  uint64_t local_cs_ts;
283  bool local_want_cs;
284  bool remote_want_cs;
285  bool cs_ok;
286 
287  std::string if_name;
288  int if_port;
289 
290  bool enqueue_use_old;
291  bool dequeue_use_old;
292  uint64_t grow_length;
293  uint64_t grow_thresh;
294 
295  ConnectionServer *const server;
296  RemoteQueueHolder *const holder;
297  shared_ptr<D4R::Node> mocknode;
298 
299  PortInfo local_info;
300  PortInfo remote_info;
301 
302  ibv_context *ctx;
303  ibv_pd *pd;
304  ibv_comp_channel *cc;
305  ibv_cq *cq;
306  ibv_qp *qp;
307  ibv_mr *mbs_mr;
308  ibv_mr *mbs_old_mr;
309  ibv_mr *read_command_queue_mr;
310  ibv_mr *write_command_queue_mr;
311 
312  };
313 }
314 #endif // ENABLE_RDMA
315 
316 #endif
Top Representations of generic queues for the CPN library.
uint64_t Key_t
Definition: common.h:79