// RDMAQueue.cc — Computational Process Networks (CPN) class library
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
24 /*
25  * Some comments about how things work:
26  *
27  * The queue endpoint starts up in the S_INIT state. After the infiniband
28  * connection has completely come up we transition to the S_RUNNING state.
29  * When one of the endpoints shutdown it goes into the S_FLUSHING state if the
30  * other end is not already dead. The flushing state flushes all communication
31  * between the two endpoints and then goes through the S_DIEING state then
32  * finally ends in the S_DEAD state when we know that the other end will not
 33  * send any more commands.
34  *
35  * Note S_FLUSHING has nothing to do with the flush and reset functionality
36  * of the queue. S_FLUSHING has to do with the proper shutdown of the queue only.
37  *
38  * Comments on growing the queue:
39  *
40  * We cannot just copy the method from the RemoteQueue as the write end actually
41  * has address information about the read ends queue buffers. The write end
42  * must flush its RDMA operations before we can fully go over to the other
 43  * side. Let's walk through how things should proceed from both
 44  * sides.
45  *
 46  * There are problems if both sides try to grow at the same time, so we have
47  * to do some sort of mutual exclusion. The function EnterCriticalSection and
48  * LeaveCriticalSection implement Ricart and Agrawala's mutual exclusion
49  * algorithm.
50  *
51  * Grow initiated from read side:
52  *
53  * Read side:
54  * * Send grow command to write side.
55  * * Wait for acknowledgement from write side.
56  * * Create new buffer, copy data over, send command to write side for new buffer information
57  * * Lazy cleanup
58  * Write side:
59  * * When receive grow command, finish up pending enqueues but initiate no new ones
60  * * When finished, create new buffer, copy data over, and send acknowledgement.
61  * * Wait for new queue information, when got start up enqueues again.
62  * * Lazy cleanup
63  *
64  * Grow initiated from write side:
65  *
66  * Write side:
67  * * Create new buffer, copy data over, finish up pending enqueues but initiate no new ones
68  * * When finished, Send grow command to read side.
69  * * Wait for acknowledgement with new queue data then proceed with enqueues
70  * * Lazy cleanup
71  * Read side:
72  * * When receive grow command, Create new buffer, copy data over, send acknowledgement with queue data
73  * * Lazy cleanup
74  */
75 #include "common_priv.h"
76 #include "RDMAQueue.h"
77 #include <cpn/QueueAttr.h>
78 #include "RemoteQueueHolder.h"
79 #include <cpn/Exceptions.h>
80 #include <cpn/bits/KernelBase.h>
81 #include "ConnectionServer.h"
84 #include <cpn/io/SocketHandle.h>
85 #include <cpn/d4r/D4RNode.h>
86 #include <stdexcept>
87 #include <vector>
88 #include <map>
89 #include <algorithm>
90 #include <sys/select.h>
91 
92 #if ENABLE_RDMA
93 
94 #if RDMATRACE
95 #define TRACE(fmt, ...) logger.Trace(fmt, ## __VA_ARGS__)
96 #else
97 #define TRACE(fmt, ...)
98 #endif
99 
100 #define IBV_ACCESS_DEFAULT (IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE\
101  | IBV_ACCESS_REMOTE_READ)
102 
103 using std::max;
104 using std::min;
105 
106 namespace CPN {
107  namespace Internal {
108 
// Pre-allocate a ring of `size` Command slots in one malloc'd buffer;
// `queue` tracks head/tail positions in bytes.
CommandQueue::CommandQueue(uint64_t size)
    : outstanding_commands(0)
{
    queue.base = (char*)malloc(size*sizeof(Command));
    // NOTE(review): if ASSERT compiles out in release builds, a malloc
    // failure goes unchecked here — confirm ASSERT is always active.
    ASSERT(queue.base);
    queue.length = size*sizeof(Command);
}
116 
// Release the command-ring storage.
CommandQueue::~CommandQueue() {
    free(queue.base);
    queue.base = 0;
}
121 
// Reserve the next free slot past any already-reserved ones and zero it.
// Returns 0 when the ring is full. The slot only becomes visible to
// GetCommand() after a matching Enqueue().
Command *CommandQueue::NewCommand() {
    if (Full()) return 0;
    // Offset past the slots reserved but not yet committed by Enqueue().
    Command *ptr = (Command*)queue.GetHead(outstanding_commands*sizeof(Command));
    memset(ptr, 0, sizeof(Command));
    outstanding_commands++;
    return ptr;
}
129 
// Commit the oldest reserved slot by advancing the ring head past it.
void CommandQueue::Enqueue() {
    queue.AdvanceHead(sizeof(Command));
    --outstanding_commands;
}
134 
// Peek at the oldest committed command without consuming it; 0 when empty.
Command *CommandQueue::GetCommand() {
    if (Empty()) return 0;
    return (Command*)queue.GetTail();
}
139 
// Consume the oldest committed command (pairs with GetCommand()).
void CommandQueue::Dequeue() {
    queue.AdvanceTail(sizeof(Command));
}
143 
// Drop every command — reserved or committed — returning the ring to empty.
void CommandQueue::Reset() {
    outstanding_commands = 0;
    queue.head = 0;
    queue.tail = 0;
}
149 
150  const char* StateStr(State s) {
151  switch (s) {
152  case S_INIT: return "INIT";
153  case S_RUNNING: return "RUNNING";
154  case S_FLUSHING: return "FLUSHING";
155  case S_DIEING: return "DIEING";
156  case S_DEAD: return "DEAD";
157  case S_ERROR: return "ERROR";
158  }
159  return "UNKNOWN";
160  }
161 
162  const char* CommandTypeStr(CommandType ct) {
163  switch (ct) {
164  case CT_UPDATE: return "UPDATE";
165  case CT_ENQUEUE: return "ENQUEUE";
166  case CT_DEQUEUE: return "DEQUEUE";
167  case CT_BLOCK: return "BLOCK";
168  case CT_SHUTDOWN: return "SHUTDOWN";
169  case CT_GROW: return "GROW";
170  case CT_D4R: return "D4R";
171  case CT_CS_REQ: return "CS_REQ";
172  case CT_CS_OK: return "CS_OK";
173  case CT_FLUSH: return "FLUSH";
174  case CT_RESET: return "RESET";
175  }
176  return "UNKNOWN";
177  }
178  }
179 
180  using namespace Internal;
181 
182  unsigned ComputeQueueLength(unsigned length, unsigned maxthresh, double alpha, RDMAQueue::Mode mode) {
183  unsigned writerlen = unsigned(((double)length)*alpha);
184  if (mode == RDMAQueue::READ) {
185  return std::max<unsigned>(length - writerlen, maxthresh);
186  } else {
187  return std::max<unsigned>(writerlen, maxthresh);
188  }
189  }
190 
// Parse a queue hint of the form "key:value,key:value[,...]" into `result`.
// Whitespace is ignored everywhere; ':' switches from key to value; ','
// terminates a pair. Duplicate keys keep the first value (std::map::insert).
//
// Fixes two defects in the previous version: (1) key/value were never
// cleared after a ',' so later pairs concatenated onto earlier ones, and
// (2) the final pair was silently dropped unless the hint ended with a
// trailing comma.
void ParseQueueHint(const std::string &qhint, std::map<std::string, std::string> &result) {
    typedef std::string::const_iterator stritr;
    stritr cur = qhint.begin();
    stritr end = qhint.end();
    std::string key;
    std::string value;
    bool expect_key = true;
    while (cur != end) {
        switch (*cur) {
        case ' ':
        case '\t':
        case '\v':
        case '\n':
        case '\r':
            break;                      // skip all whitespace
        case ':':
            expect_key = false;         // subsequent chars belong to the value
            break;
        case ',':
            result.insert(std::make_pair(key, value));
            key.clear();                // BUG FIX: start the next pair fresh
            value.clear();
            expect_key = true;
            break;
        default:
            if (expect_key) {
                key.push_back(*cur);
            } else {
                value.push_back(*cur);
            }
        }
        ++cur;
    }
    // BUG FIX: commit the trailing pair when there is no trailing comma.
    if (!key.empty() || !value.empty()) {
        result.insert(std::make_pair(key, value));
    }
}
223 
// Construct one endpoint (reader or writer) of an RDMA-backed queue.
//
// Parses optional "if_name"/"if_port" overrides from the queue hint, sizes
// the local mirror-buffer set for this side, brings up the ibverbs resources
// (Initialize()), and registers the data buffer and both command rings as
// memory regions. The InfiniBand connection itself is established later by
// the event thread (see EventThread()); the endpoint starts in S_INIT.
RDMAQueue::RDMAQueue(KernelBase *k, Mode m,
        ConnectionServer *s, RemoteQueueHolder *h, const SimpleQueueAttr &attr)
    : QueueBase(k, attr),
      mode(m),
      state(S_INIT),
      // Command rings and WR limits are all sized identically: enough for
      // two commands per channel, with a floor of 100.
      read_command_queue(std::max(attr.GetNumChannels()*2, 100u)),
      write_command_queue(std::max(attr.GetNumChannels()*2, 100u)),
      max_threshold(attr.GetMaxThreshold()),
      num_channels(attr.GetNumChannels()),
      num_enqueued(0),
      num_dequeued(0),
      alpha(attr.GetAlpha()),
      max_send_wr(std::max(attr.GetNumChannels()*2, 100u)),
      max_recv_wr(std::max(attr.GetNumChannels()*2, 100u)),
      num_send_wr(0),
      pending_dequeue(0),
      pending_rdma(0),
      confirmed_rdma(0),
      cork(0),
      queue_updated(false),
      pending_grow(false),
      grow_ack(false),
      pending_flush(false),
      clock(0), remote_clock(0), old_clock(0),
      local_cs_ts(0),
      local_want_cs(false),
      remote_want_cs(false),
      cs_ok(false),
      if_name(),
      if_port(0),
      enqueue_use_old(false),
      dequeue_use_old(false),
      grow_length(0), grow_thresh(0),
      server(s),
      holder(h),
      // mocknode stands in for the REMOTE endpoint's D4R node on this side.
      mocknode(new D4R::Node(m == READ ? attr.GetWriterNodeKey() : attr.GetReaderNodeKey())),
      ctx(0), pd(0), cc(0), cq(0), qp(0), mbs_mr(0), mbs_old_mr(0),
      read_command_queue_mr(0), write_command_queue_mr(0)
{
    // Hint format: the first 4 characters are skipped (presumably a scheme
    // prefix — TODO confirm), the rest is "key:value,..." pairs.
    if (attr.GetHint().size() > 4) {
        std::map<std::string, std::string> options;
        ParseQueueHint(attr.GetHint().substr(4), options);
        if (options.find("if_name") != options.end()) {
            if_name = options["if_name"];
        }
        if (options.find("if_port") != options.end()) {
            std::istringstream iss(options["if_port"]);
            iss >> if_port;
            if (!iss) {
                if_port = 0;        // unparsable port: fall back to default
            }
        }
    }

    // Label the logger with mode and endpoint keys for traceability.
    std::ostringstream oss;
    oss << "RDMAQueue(m:";
    if (mode == READ) { oss << "r"; }
    else { oss << "w"; }
    oss << ", r:" << readerkey << ", w:" << writerkey << ")";
    logger.Name(oss.str());
    if (mode == READ) {
        SetWriterNode(mocknode);
    } else {
        SetReaderNode(mocknode);
    }
    // This side only holds its share of the requested length (see
    // ComputeQueueLength); the MirrorBufferSet may round the size up.
    local_queue.length = ComputeQueueLength(attr.GetLength(), max_threshold, alpha, mode);
    remote_queue.length = 0;   // unknown until the peer's CT_UPDATE arrives
    mbs.reset(new MirrorBufferSet(local_queue.length, max_threshold, num_channels));
    local_queue.length = mbs->BufferSize();
    local_queue.base = (char*)(void*)*mbs;
    local_queue.stride = mbs->BufferSize() + mbs->MirrorSize();
    // The mirror region bounds how large a contiguous threshold can be.
    max_threshold = std::min(mbs->MirrorSize() + 1, local_queue.length);
    event_thread.reset(CreatePthreadFunctional(this, &RDMAQueue::EventThread));
    try {
        Initialize();
        // Register the data buffers for local and remote RDMA access.
        mbs_mr = ibv_reg_mr(pd, local_queue.base, local_queue.stride*num_channels,
                IBV_ACCESS_DEFAULT);
        ASSERT(mbs_mr);
        local_queue.lkey = mbs_mr->lkey;
        logger.Trace("mbs constructed: base: %p, length: %llu, lkey: %llu",
                local_queue.base, (unsigned long long)local_queue.length,
                (unsigned long long)local_queue.lkey);

        // Receive ring: the HCA writes incoming commands into it, so it
        // needs IBV_ACCESS_LOCAL_WRITE.
        read_command_queue_mr = ibv_reg_mr(pd, read_command_queue.GetBase(),
                read_command_queue.Size()*sizeof(Command), IBV_ACCESS_LOCAL_WRITE);
        ASSERT(read_command_queue_mr);
        read_command_queue.SetKey(read_command_queue_mr->lkey);
        // Send ring: only read by the HCA, so no access flags needed.
        write_command_queue_mr = ibv_reg_mr(pd, write_command_queue.GetBase(),
                write_command_queue.Size()*sizeof(Command), 0);
        ASSERT(write_command_queue_mr);
        write_command_queue.SetKey(write_command_queue_mr->lkey);
    } catch (...) {
        Cleanup();     // release any partially constructed verbs resources
        throw;
    }
    logger.Trace("Constructed");
}
321 
// Tear down: initiate shutdown, wait for the event thread to drain and
// exit, then release verbs resources. Order matters — Cleanup() must not
// run while the event thread may still touch the connection.
RDMAQueue::~RDMAQueue() {
    Shutdown();
    event_thread->Join();
    logger.Trace("%s %s Destructed", StateStr(state), UnlockedClockStr());
    Cleanup();
}
328 
// Launch the event thread, which connects to the peer and then services
// completion-queue events until the queue dies.
void RDMAQueue::Start() {
    event_thread->Start();
}
332 
333  void RDMAQueue::Shutdown() {
334  AutoLock<QueueBase> al(*this);
335  logger.Trace("%s %s Shutdown", StateStr(state), UnlockedClockStr());
336  if (UnlockedIsDead()) return;
337  if (mode == WRITE) {
338  UnlockedShutdownWriter();
339  } else {
340  UnlockedShutdownReader();
341  }
342  }
343 
344  void RDMAQueue::LogState() {
346  logger.Error("State: %s, Clock: %s", StateStr(state), UnlockedClockStr());
347  logger.Error("local_queue: size: %llu, count: %llu, remote_queue: size: %llu, count: %llu, lstride: %llu",
348  (unsigned long long)local_queue.Size(),
349  (unsigned long long)local_queue.Count(),
350  (unsigned long long)remote_queue.Size(),
351  (unsigned long long)remote_queue.Count(),
352  (unsigned long long)local_queue.stride
353  );
354  logger.Error("num_send_wr: %u, pending_rdma: %u, confirmed_rdma: %u",
355  num_send_wr, pending_rdma, confirmed_rdma);
356  logger.Error("cork: %u, queue_updated: %d, pending_grow: %d, grow_ack: %d"
357  ", pending_flush: %d",
358  cork, (int)queue_updated, (int)pending_grow, (int)grow_ack, (int)pending_flush);
359  logger.Error("local_cs_ts: %llu, local_want_cs: %d, remote_want_cs: %d, cs_ok: %d",
360  (unsigned long long)local_cs_ts, (int)local_want_cs, (int)remote_want_cs,
361  (int)cs_ok);
362  }
363 
// Reader side: block until the current read request can be satisfied (or
// the queue shuts down). Tries passive waiting first, then explicitly asks
// the writer for data with a CT_BLOCK command.
void RDMAQueue::WaitForData() {
    ASSERT(mode == READ);
    ASSERT(!writeshutdown);
    if (state == S_INIT) {
        // Connection not up yet — nothing can arrive until the handshake
        // (first CT_UPDATE) completes.
        while (state == S_INIT && !readshutdown && !writeshutdown && ReadBlocked()) { Wait(); }
        return;
    }
    try {
        // In-flight transfers may already satisfy the request.
        while (ActiveTransfers() && ReadBlocked()) { Wait(); }
        if (!ReadBlocked()) return;
        // Still blocked: tell the writer how much we need, then wait for
        // the command exchange started before `sync_clock` to quiesce.
        PostBlock(readrequest);
        uint64_t sync_clock = clock;
        // NOTE(review): this loop tests WriteBlocked() on the READ side —
        // asymmetric with WaitForFreespace(); confirm it is intentional.
        while ((ActiveTransfers() || sync_clock > old_clock) && WriteBlocked()) { Wait(); }
        if (ReadBlocked()) { QueueBase::WaitForData(); }
    } catch (const BrokenQueueException &e) {}  // peer died; caller re-checks state
}
380 
// Writer side: block until the current write request can be satisfied (or
// the queue shuts down). Mirrors WaitForData(): drain in-flight transfers
// first, then request space from the reader with a CT_BLOCK command.
void RDMAQueue::WaitForFreespace() {
    ASSERT(mode == WRITE);
    ASSERT(!readshutdown);
    if (state == S_INIT) {
        // Connection not up yet — no space can open until the handshake completes.
        while (state == S_INIT && !readshutdown && !writeshutdown && WriteBlocked()) { Wait(); }
    }
    try {
        CheckRDMAWrite();   // kick any writes that can proceed right now
        while (ActiveTransfers() && WriteBlocked()) { Wait(); }
        if (!WriteBlocked()) return;
        // Still blocked: ask the reader for space, then wait for the
        // command exchange started before `sync_clock` to quiesce.
        PostBlock(writerequest);
        uint64_t sync_clock = clock;
        while ((ActiveTransfers() || sync_clock > old_clock) && WriteBlocked()) { Wait(); }
        if (WriteBlocked()) { QueueBase::WaitForFreespace(); }
    } catch (const BrokenQueueException &e) {}  // peer died; caller re-checks state
}
397 
// Return a read pointer into channel `chan` valid for `thresh` contiguous
// elements, or 0 if that much data is not available. While a pre-grow
// buffer is still being drained, the pointer comes from local_old_queue.
const void *RDMAQueue::InternalGetRawDequeuePtr(unsigned thresh, unsigned chan) {
    ASSERT(mode == READ);
    ASSERT(chan < num_channels);
    if (dequeue_use_old) {
        ASSERT(indequeue);
        return (const void*)(local_old_queue.GetTail() + chan*UnlockedDequeueChannelStride());
    } else {
        // The mirror region limits how much can be read contiguously.
        if (thresh > max_threshold) return 0;
        if (thresh > local_queue.Count()) return 0;
        return (const void*)(local_queue.GetTail() + chan*UnlockedDequeueChannelStride());
    }
}
410 
// Consume `count` elements from the local buffer and schedule a CT_DEQUEUE
// notification to the writer. The first dequeue after a grow also releases
// the fully drained pre-grow buffer and its memory region.
void RDMAQueue::InternalDequeue(unsigned count) {
    ASSERT(mode == READ);
    ASSERT(count <= local_queue.Count());
    if (dequeue_use_old) {
        // Old buffer is done with — lazy cleanup (see grow protocol above).
        dequeue_use_old = false;
        ibv_dereg_mr(mbs_old_mr);
        mbs_old_mr = 0;
        mbs_old.reset();
    }
    local_queue.AdvanceTail(count);
    pending_dequeue += count;
    CheckDequeue();          // may batch-post the dequeue notification now
    num_dequeued += count;
}
425 
// Return a write pointer into channel `chan` valid for `thresh` contiguous
// elements, or 0 if that much freespace is not available. While a pre-grow
// buffer is still in use for staging, the pointer comes from local_old_queue.
void *RDMAQueue::InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan) {
    ASSERT(mode == WRITE);
    ASSERT(chan < num_channels);
    if (enqueue_use_old) {
        ASSERT(inenqueue);
        return (void*)(local_old_queue.GetHead() + chan*UnlockedEnqueueChannelStride());
    } else {
        // The mirror region limits how much can be written contiguously.
        if (thresh > max_threshold) return 0;
        if (thresh > local_queue.Freespace()) return 0;
        return (void*)(local_queue.GetHead() + chan*UnlockedEnqueueChannelStride());
    }
}
438 
// Publish `count` newly written elements. If the caller staged the data in
// the old (pre-grow) buffer, copy it per-channel into the current buffer
// first, then release the old buffer and its memory region. Finally kick
// RDMA writes toward the reader.
void RDMAQueue::InternalEnqueue(unsigned count) {
    ASSERT(mode == WRITE);
    ASSERT(count <= local_queue.Freespace());
    if (enqueue_use_old) {
        // Move the staged data into the live buffer — lazy grow cleanup.
        for (unsigned chan = 0; chan < num_channels; ++chan) {
            memcpy((local_queue.GetHead() + chan*local_queue.stride),
                   (local_old_queue.GetHead() + chan*local_old_queue.stride),
                   count);
        }
        enqueue_use_old = false;
        ibv_dereg_mr(mbs_old_mr);
        mbs_old_mr = 0;
        mbs_old.reset();
    }
    local_queue.AdvanceHead(count);
    CheckRDMAWrite();        // may post the RDMA write(s) immediately
    num_enqueued += count;
}
457 
458  void RDMAQueue::InternalFlush() {
459  ASSERT(mode == WRITE);
461  if (state == S_RUNNING && !ActiveTransfers() && local_queue.Empty()) {
462  PostFlush();
463  } else {
464  pending_flush = true;
465  }
466  }
467 
// Reader-side reset after a flush: clear request/threshold bookkeeping,
// wait for the connection to be running, then discard everything the writer
// reported enqueued before acknowledging with CT_RESET.
// NOTE(review): num_enqueued on the read side is presumably synchronized
// from the writer's CT_FLUSH command — confirm in ProcessCommand.
void RDMAQueue::InternalReset() {
    ASSERT(mode == READ);
    ASSERT(flushed);
    readrequest = 0;
    writerequest = 0;
    enqueuethresh = 0;
    dequeuethresh = 0;
    indequeue = false;
    inenqueue = false;
    flushed = false;
    // The reset handshake needs a live connection.
    while (state != S_RUNNING && !UnlockedIsDead() && !writeshutdown && !readshutdown) { Wait(); }
    if (UnlockedIsDead() || writeshutdown || readshutdown) return;
    ASSERT(num_enqueued >= num_dequeued);
    // Drain (and discard) everything still in flight from the writer.
    while (num_enqueued > num_dequeued) {
        while (local_queue.Count() == 0 && !(UnlockedIsDead() || writeshutdown || readshutdown)) { Wait(); }
        if (UnlockedIsDead() || writeshutdown || readshutdown) return;
        InternalDequeue(local_queue.Count());
    }
    PostReset();
    num_enqueued = 0;
    num_dequeued = 0;
}
490 
491  unsigned RDMAQueue::UnlockedEnqueueChannelStride() const {
492  if (enqueue_use_old) {
493  return local_old_queue.stride;
494  } else {
495  return local_queue.stride;
496  }
497  }
498 
499  unsigned RDMAQueue::UnlockedDequeueChannelStride() const {
500  if (dequeue_use_old) {
501  return local_old_queue.stride;
502  } else {
503  return local_queue.stride;
504  }
505  }
506 
// Grow the queue to at least `queueLen` elements / `maxThresh` threshold.
// Implements the two-sided grow protocol described at the top of this file,
// guarded by the Ricart–Agrawala critical section so both endpoints cannot
// grow simultaneously. `clock`/`old_clock` provide the Lamport-style
// synchronization that tells us when the peer has seen our commands.
void RDMAQueue::UnlockedGrow(unsigned queueLen, unsigned maxThresh) {
    queueLen = max(queueLen, maxThresh);
    // Already big enough on both axes? Nothing to do.
    if (queueLen < UnlockedQueueLength() && maxThresh < UnlockedMaxThreshold()) return;
    while (state != S_RUNNING && !UnlockedIsDead() && !writeshutdown && !readshutdown) { Wait(); }
    if (UnlockedIsDead() || writeshutdown || readshutdown) return;
    EnterCriticalSection();
    // Re-check: the peer may have grown the queue while we waited for the CS.
    if (queueLen <= UnlockedQueueLength() && maxThresh <= UnlockedMaxThreshold()) {
        LeaveCriticalSection();
        return;
    }
    try {
        logger.Trace("Growing %s %s from %llu %llu to %llu %llu", UnlockedClockStr(), StateStr(state),
                (unsigned long long)UnlockedQueueLength(),
                (unsigned long long)UnlockedMaxThreshold(),
                (unsigned long long)queueLen,
                (unsigned long long)maxThresh
                );
        grow_length = max(grow_length, (uint64_t)queueLen);
        grow_thresh = max(grow_thresh, (uint64_t)maxThresh);
        Cork();    // suspend new RDMA writes for the duration of the grow
        if (mode == WRITE) {
            // Writer initiates: drain pending RDMA first — we hold the
            // reader's buffer addresses and must stop using them.
            while (ActiveTransfers() && UnlockedIsNormal()) { Wait(); }
            if (!UnlockedIsNormal()) { return; }
            PostGrow();
            queue_updated = false;
            uint64_t sync_clock = clock;
            InternalGrow();
            PostUpdateQueue();
            // Wait until the reader's CT_UPDATE (new buffer info) arrives.
            while (sync_clock > old_clock && UnlockedIsNormal()) {
                while (!queue_updated && UnlockedIsNormal()) { Wait(); }
                queue_updated = false;
            }
        } else {
            // Reader initiates: ask the writer to pause, wait for its
            // acknowledgement (CT_UPDATE), then swap buffers and reply.
            PostGrow();
            uint64_t sync_clock = clock;
            while (sync_clock > old_clock && UnlockedIsNormal()) {
                queue_updated = false;
                while (!queue_updated && UnlockedIsNormal()) { Wait(); }
            }
            if (!UnlockedIsNormal()) { return; }
            InternalGrow();
            PostUpdateQueue();
        }
        Uncork();
    } catch (...) {
        LeaveCriticalSection();
        throw;
    }
    LeaveCriticalSection();
}
557 
558  void RDMAQueue::UnlockedShutdownReader() {
560  if (UnlockedIsNormal()) {
561  PostShutdown();
562  State oldstate = state;
563  if (writeshutdown) {
564  state = S_DEAD;
565  } else {
566  state = S_FLUSHING;
567  }
568  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
569  StateStr(oldstate), StateStr(state));
570  }
571  }
572 
573  void RDMAQueue::UnlockedShutdownWriter() {
575  if (UnlockedIsNormal()) {
576  State oldstate = state;
577  state = S_FLUSHING;
578  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
579  StateStr(oldstate), StateStr(state));
580  if (local_queue.Empty() && !ActiveTransfers()) {
581  oldstate = state;
582  PostShutdown();
583  state = S_DIEING;;
584  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
585  StateStr(oldstate), StateStr(state));
586  }
587  }
588  }
589 
590  void RDMAQueue::UnlockedSignalReaderTagChanged() {
591  ASSERT(mode == READ);
592  if (UnlockedIsNormal()) {
593  PostD4RTag();
594  }
596  }
597 
598  void RDMAQueue::UnlockedSignalWriterTagChanged() {
599  ASSERT(mode == WRITE);
600  if (UnlockedIsNormal()) {
601  PostD4RTag();
602  }
604  }
605 
// Main event loop, run on its own thread. Establishes the connection, then
// multiplexes between the ibverbs completion channel and the holder's
// wakeup fd. Each CQ event is acknowledged, the CQ re-armed, and completed
// work requests are drained and dispatched; queue-level work is then done
// in ProcessEvents(). Exits (and unregisters from the holder) once the
// queue reaches S_DEAD or an unrecoverable error occurs.
void *RDMAQueue::EventThread() {
    try {
        Connect();
        std::vector<ibv_wc> wcs(max_send_wr + max_recv_wr);
        int wake_fd = holder->GetWakeup()->FD();
        while (!IsDead()) {
            kernel->CheckTerminated();
            // Sleep until either a completion arrives or someone wakes us.
            fd_set fdsr;
            FD_ZERO(&fdsr);
            int maxfd = max(cc->fd, wake_fd) + 1;
            FD_SET(cc->fd, &fdsr);
            FD_SET(wake_fd, &fdsr);
            select(maxfd, &fdsr, 0, 0, 0);
            if (FD_ISSET(cc->fd, &fdsr)) {
                ibv_cq *l_cq;
                void *context;
                if (ibv_get_cq_event(cc, &l_cq, &context)) {
                    throw ErrnoException();
                }
                // Re-arm notification BEFORE polling so no completion that
                // arrives during the poll loop is missed.
                ibv_req_notify_cq(cq, 0);
                while (true) {
                    int num = ibv_poll_cq(cq, wcs.size(), &wcs[0]);
                    for (int i = 0; i < num; ++i) {
                        ibv_wc *wc = &wcs[i];
                        if (wc->status != IBV_WC_SUCCESS) {
                            // Log as much context as we have, then abort
                            // the queue — verbs errors are not recoverable.
                            logger.Error("%s A work request did not complete, aborting!!!\n %s",
                                    ClockStr(), ibv_wc_status_str(wc->status));
                            switch (wc->opcode) {
                            case IBV_WC_SEND:
                                if (write_command_queue.GetCommand()) {
                                    logger.Error("Error with sending a %s command",
                                            CommandTypeStr(write_command_queue.GetCommand()->type));
                                } else {
                                    logger.Error("No commands in write command queue?!?!?");
                                }
                                break;
                            case IBV_WC_RDMA_WRITE:
                                logger.Error("Error with RDMA write");
                                break;
                            case IBV_WC_RECV:
                                logger.Error("Error on command receive");
                                break;
                            default:
                                logger.Error("opcode: %d", (int)wc->opcode);
                                break;
                            }
                            ShutdownError();
                        } else {
                            HandleWorkCompletion(wc);
                        }
                    }
                    ProcessEvents();
                    // A partial batch means the CQ is drained.
                    if (num < (int)wcs.size()) break;
                }
                ibv_ack_cq_events(l_cq, 1);
            }
        }
    } catch (const ShutdownException &e) {
        // Forced shutdown (kernel terminating): mark dead and wake waiters.
        {
            AutoLock<QueueBase> al(*this);
            if (!UnlockedIsDead()) { state = S_DEAD; }
            Signal();
        }
        logger.Debug("%s Forced Shutdown threw exception", ClockStr());
    } catch (const ErrnoException &e) {
        logger.Error("%s: %s", ClockStr(), e.what());
        ShutdownError();
    } catch (...) {
        ShutdownError();
        holder->CleanupQueue(GetKey());
        throw;
    }
    holder->CleanupQueue(GetKey());
    server->Wakeup();
    return 0;
}
682 
683  void RDMAQueue::PostDequeue(unsigned count) {
684  ASSERT(mode == READ);
685  ASSERT(UnlockedIsNormal());
686  Command *command = NewCommand(CT_DEQUEUE);
687  command->data.dequeue.amount = count;
688  PostCommand(command);
689  TRACE("%s Posting dequeue %u", UnlockedClockStr(), count);
690  }
691 
692  void RDMAQueue::PostEnqueue(unsigned count) {
693  ASSERT(mode == WRITE);
694  ASSERT(UnlockedIsNormal() || state == S_FLUSHING);
695  Command *command = NewCommand(CT_ENQUEUE);
696  command->data.enqueue.amount = count;
697  PostCommand(command);
698  TRACE("%s Posting enqueue %u", UnlockedClockStr(), count);
699  }
700 
701  void RDMAQueue::PostBlock(unsigned count) {
702  ASSERT(UnlockedIsNormal());
703  Command *command = NewCommand(CT_BLOCK);
704  command->data.block.amount = count;
705  PostCommand(command);
706  TRACE("%s Posting block %u", UnlockedClockStr(), count);
707  }
708 
709  void RDMAQueue::PostShutdown() {
710  Command *command = NewCommand(CT_SHUTDOWN);
711  PostCommand(command);
712  TRACE("%s Posting shutdown", UnlockedClockStr());
713  }
714 
715  void RDMAQueue::PostGrow() {
716  Command *command = NewCommand(CT_GROW);
717  command->data.grow.length = grow_length;
718  command->data.grow.threshold = grow_thresh;
719  PostCommand(command);
720  TRACE("%s Posted grow len: %u, thresh: %u", UnlockedClockStr(), grow_length, grow_thresh);
721  }
722 
723  void RDMAQueue::PostD4RTag() {
724  ASSERT(UnlockedIsNormal());
725  Command *command = NewCommand(CT_D4R);
726  D4R::Tag tag;
727  if (mode == READ) {
728  tag = reader->GetPublicTag();
729  } else {
730  tag = writer->GetPublicTag();
731  }
732  command->data.d4r.count = tag.Count();
733  command->data.d4r.key = tag.Key();
734  command->data.d4r.qsize = tag.QueueSize();
735  command->data.d4r.qkey = tag.QueueKey();
736  PostCommand(command);
737  TRACE("%s Posting D4R tag count: %llu, key: %llu, qsize: %llu, qkey: %llu",
738  UnlockedClockStr(),
739  (unsigned long long)tag.Count(),
740  (unsigned long long)tag.Key(),
741  (unsigned long long)tag.QueueSize(),
742  (unsigned long long)tag.QueueKey());
743  }
744 
745  void RDMAQueue::PostCSRequest() {
746  Command *command = NewCommand(CT_CS_REQ);
747  command->data.cs_info.timestamp = local_cs_ts;
748  PostCommand(command);
749  TRACE("%s Posting CS_REQ with ts: %llu", UnlockedClockStr(), (unsigned long long)local_cs_ts);
750  }
751 
752  void RDMAQueue::PostCSOK() {
753  Command *command = NewCommand(CT_CS_OK);
754  PostCommand(command);
755  TRACE("%s Posting CS_OK", UnlockedClockStr());
756  }
757 
758  void RDMAQueue::PostFlush() {
759  Command *command = NewCommand(CT_FLUSH);
760  command->data.flush.num_enqueued = num_enqueued;
761  PostCommand(command);
762  TRACE("%s Posting FLUSH", UnlockedClockStr());
763  }
764 
765  void RDMAQueue::PostReset() {
766  Command *command = NewCommand(CT_RESET);
767  PostCommand(command);
768  TRACE("%s Posting RESET", UnlockedClockStr());
769  }
770 
771  void RDMAQueue::CheckRDMAWrite() {
772  ASSERT(mode == WRITE);
773  if (readshutdown) return;
774  if (state != S_RUNNING && state != S_FLUSHING) return;
775  if (cork > 0) return;
776  unsigned old_pending_rdma = pending_rdma;
777  while (true) {
778  if (local_queue.Count() - pending_rdma <= 0) break;
779  if (num_send_wr >= max_send_wr/2) break;
780  unsigned count = std::min(remote_queue.Freespace() - pending_rdma,
781  local_queue.Count() - pending_rdma);
782  count = std::min(count, max_threshold);
783  if (count == 0) return;
784  std::vector<ibv_send_wr> wr(num_channels);
785  memset(&wr[0], 0, num_channels*sizeof(ibv_send_wr));
786  std::vector<ibv_sge> sge(num_channels);
787  memset(&sge[0], 0, num_channels*sizeof(ibv_sge));
788  for (unsigned chan = 0; chan < num_channels; ++chan) {
789  sge[chan].addr = (uint64_t)local_queue.GetTail(pending_rdma)
790  + chan*local_queue.stride;
791  sge[chan].length = count;
792  sge[chan].lkey = local_queue.lkey;
793  wr[chan].wr_id = 0;
794  wr[chan].sg_list = &sge[chan];
795  wr[chan].num_sge = 1;
796  wr[chan].opcode = IBV_WR_RDMA_WRITE;
797  wr[chan].wr.rdma.remote_addr = (uint64_t)remote_queue.GetHead(pending_rdma)
798  + chan*remote_queue.stride;
799  wr[chan].wr.rdma.rkey = remote_queue.lkey;
800  if (chan+1 < num_channels) {
801  wr[chan].next = &wr[chan+1];
802  }
803  }
804  wr[num_channels-1].wr_id = count;
805  ibv_send_wr *bad;
806  if (ibv_post_send(qp, &wr[0], &bad)) {
807  throw ErrnoException();
808  }
809  num_send_wr += num_channels;
810  pending_rdma += count;
811  }
812  if (pending_rdma > old_pending_rdma) {
813  TRACE("%s Posted RDMA write, pending_rdma: %llu, to key: %llu", UnlockedClockStr(),
814  (unsigned long long)pending_rdma, (unsigned long long)remote_queue.lkey);
815  }
816  }
817 
818  void RDMAQueue::CheckDequeue() {
819  if (pending_dequeue > 0 && num_send_wr < max_send_wr && UnlockedIsNormal() && !writeshutdown) {
820  PostDequeue(pending_dequeue);
821  pending_dequeue = 0;
822  }
823  }
824 
825  void RDMAQueue::PostUpdateQueue() {
826  Command *command = NewCommand(CT_UPDATE);
827  command->data.update.base = local_queue.base;
828  command->data.update.length = local_queue.length;
829  command->data.update.head = local_queue.head;
830  command->data.update.tail = local_queue.tail;
831  command->data.update.stride = local_queue.stride;
832  command->data.update.lkey = local_queue.lkey;
833  PostCommand(command);
834  TRACE("%s Posting update queue", UnlockedClockStr());
835  }
836 
837  Command *RDMAQueue::NewCommand(CommandType type) {
838  while (num_send_wr >= max_send_wr && !UnlockedIsDead()) {
839  ASSERT(!pthread_equal(pthread_self(), (pthread_t)*event_thread),
840  "Blocking in event thread");
841  Wait();
842  }
843  if (UnlockedIsDead()) throw BrokenQueueException(GetKey());
844  Command *command = write_command_queue.NewCommand();
845  memset(command, 0, sizeof(Command));
846  command->type = type;
847  clock++;
848  command->clock = clock;
849  command->remote_clock = remote_clock;
850  return command;
851  }
852 
853  void RDMAQueue::PostCommand(Command *command) {
854  if ((mode == READ ? writeshutdown : readshutdown)) {
855  ASSERT(false);
856  return;
857  }
858  ASSERT(num_send_wr < max_send_wr);
859  ibv_send_wr wr, *bad;
860  ibv_sge sge;
861  memset(&sge, 0, sizeof(sge));
862  sge.addr = (uint64_t)(void*)command;
863  sge.length = sizeof(Command);
864  sge.lkey = write_command_queue.GetKey();
865  memset(&wr, 0, sizeof(wr));
866  wr.sg_list = &sge;
867  wr.num_sge = 1;
868  wr.opcode = IBV_WR_SEND;
869  if (ibv_post_send(qp, &wr, &bad)) {
870  throw ErrnoException();
871  }
872  ++num_send_wr;
873  write_command_queue.Enqueue();
874  }
875 
876  void RDMAQueue::PostReceives() {
877  uint64_t addr = 0;
878  int num = read_command_queue.Freespace();
879  if (num <= 0) return;
880  std::vector<ibv_recv_wr> wr(num);
881  memset(&wr[0], 0, num*sizeof(ibv_recv_wr));
882  std::vector<ibv_sge> sge(num);
883  memset(&sge[0], 0, num*sizeof(ibv_sge));
884  for (int i = 0; i < num; ++i) {
885  addr = (uint64_t)(void*)read_command_queue.NewCommand();
886  sge[i].addr = addr;
887  sge[i].length = sizeof(Command);
888  sge[i].lkey = read_command_queue.GetKey();
889  wr[i].wr_id = addr;
890  wr[i].sg_list = &sge[i];
891  wr[i].num_sge = 1;
892  wr[i].next = (i + 1 < num ? &wr[i+1] : 0);
893  }
894  ibv_recv_wr *bad = 0;
895  if (ibv_post_recv(qp, &wr[0], &bad)) {
896  throw ErrnoException();
897  }
898  }
899 
900  void RDMAQueue::HandleWorkCompletion(ibv_wc *wc) {
901  AutoLock<QueueBase> al(*this);
902  uint64_t count;
903  switch (wc->opcode) {
904  case IBV_WC_SEND:
905  write_command_queue.Dequeue();
906  num_send_wr--;
907  Signal();
908  break;
909  case IBV_WC_RDMA_WRITE:
910  ASSERT(mode == WRITE);
911  count = wc->wr_id;
912  local_queue.AdvanceTail(count);
913  remote_queue.AdvanceHead(count);
914  pending_rdma -= count;
915  confirmed_rdma += count;
916  num_send_wr--;
917  Signal();
918  break;
919  case IBV_WC_RECV:
920  read_command_queue.Enqueue();
921  break;
922  default:
923  ASSERT(false, "Unknown work completion type: %d", wc->opcode);
924  break;
925  }
926  }
927 
928  void RDMAQueue::ProcessEvents() {
929  AutoLock<QueueBase> al(*this);
930  while (!read_command_queue.Empty()) {
931  ProcessCommand(read_command_queue.GetCommand());
932  read_command_queue.Dequeue();
933  }
934  PostReceives();
935  if (mode == WRITE && !readshutdown) {
936  CheckRDMAWrite();
937  if (confirmed_rdma > 0 && num_send_wr < max_send_wr) {
938  PostEnqueue(confirmed_rdma);
939  confirmed_rdma = 0;
940  }
941  if (pending_grow && pending_rdma == 0 && num_send_wr < max_send_wr) {
942  InternalGrow();
943  PostUpdateQueue();
944  pending_grow = false;
945  grow_ack = true;
946  }
947  if (pending_flush && !ActiveTransfers() && local_queue.Empty()) {
948  PostFlush();
949  pending_flush = false;
950  }
951  } else if (mode == READ && !writeshutdown) {
952  CheckDequeue();
953  }
954  switch (state) {
955  case S_INIT:
956  break;
957  case S_RUNNING:
958  if (writeshutdown || readshutdown) {
959  logger.Trace("Shutdown? writeshutdown: %d, readshutdown: %d",
960  (int)writeshutdown, (int)readshutdown);
961  if (mode == READ) {
962  if (num_send_wr == max_send_wr) { break; }
963  PostShutdown();
964  }
965  State oldstate = state;
966  state = S_FLUSHING;
967  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
968  StateStr(oldstate), StateStr(state));
969  } else {
970  break;
971  }
972  case S_FLUSHING:
973  if (mode == WRITE) {
974  State oldstate = state;
975  if (readshutdown) {
976  state = S_DEAD;
977  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
978  StateStr(oldstate), StateStr(state));
979  } else if (local_queue.Empty() && !ActiveTransfers()) {
980  PostShutdown();
981  state = S_DIEING;;
982  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
983  StateStr(oldstate), StateStr(state));
984  }
985  } else { // READ
986  if (writeshutdown || !ActiveTransfers()) {
987  State oldstate = state;
988  state = S_DEAD;
989  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
990  StateStr(oldstate), StateStr(state));
991  }
992  }
993  break;
994  case S_DIEING:
995  ASSERT(mode == WRITE);
996  if (!ActiveTransfers()) {
997  State oldstate = state;
998  state = S_DEAD;
999  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
1000  StateStr(oldstate), StateStr(state));
1001  }
1002  break;
1003  case S_DEAD:
1004  break;
1005  case S_ERROR:
1006  break;
1007  }
1008  }
1009 
1010  void RDMAQueue::ProcessCommand(Command *command) {
1011  clock = std::max(clock, command->clock) + 1;
1012  remote_clock = command->clock;
1013  old_clock = command->remote_clock;
1014  State oldstate = state;
1015  switch (command->type) {
1016  case CT_UPDATE:
1017  remote_queue.base = command->data.update.base;
1018  remote_queue.length = command->data.update.length;
1019  if (mode == WRITE) {
1020  remote_queue.head = command->data.update.head;
1021  remote_queue.tail = command->data.update.tail;
1022  }
1023  remote_queue.stride = command->data.update.stride;
1024  remote_queue.lkey = command->data.update.lkey;
1025  queue_updated = true;
1026  logger.Trace("%s Receved update command: base %p, length: %llu, "
1027  "head: %llu, tail: %llu, stride: %llu, lkey: %llu", UnlockedClockStr(),
1028  remote_queue.base, (unsigned long long)remote_queue.length,
1029  (unsigned long long)remote_queue.head,
1030  (unsigned long long)remote_queue.tail,
1031  (unsigned long long)remote_queue.stride,
1032  (unsigned long long)remote_queue.lkey);
1033  if (state == S_INIT) {
1034  state = S_RUNNING;
1035  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
1036  StateStr(oldstate), StateStr(state));
1037  }
1038  if (grow_ack && mode == WRITE) {
1039  Uncork();
1040  grow_ack = false;
1041  }
1042  Signal();
1043  break;
1044  case CT_ENQUEUE:
1045  ASSERT(mode == READ);
1046  TRACE("%s %s Received enqueue command: amount: %llu, free: %llu", StateStr(state),
1047  UnlockedClockStr(), (unsigned long long)command->data.enqueue.amount,
1048  (unsigned long long)local_queue.Freespace());
1049  ASSERT(local_queue.Freespace() >= command->data.enqueue.amount, "Freespace: %llu"
1050  "\n%s\n%llu", (unsigned long long)local_queue.Freespace(), UnlockedClockStr(),
1051  (unsigned long long)GetKey());
1052  local_queue.AdvanceHead(command->data.enqueue.amount);
1053  writerequest = 0;
1054  NotifyData();
1055  break;
1056  case CT_DEQUEUE:
1057  ASSERT(mode == WRITE);
1058  TRACE("%s %s Received dequeue command: amount: %llu count: %llu", StateStr(state),
1059  UnlockedClockStr(), (unsigned long long)command->data.dequeue.amount,
1060  (unsigned long long)remote_queue.Count());
1061  ASSERT(remote_queue.Count() >= command->data.dequeue.amount);
1062  remote_queue.AdvanceTail(command->data.dequeue.amount);
1063  readrequest = 0;
1064  break;
1065  case CT_BLOCK:
1066  TRACE("%s %s Received block command: amount: %llu, count: %llu",
1067  StateStr(state), UnlockedClockStr(),
1068  (unsigned long long)command->data.block.amount,
1069  (unsigned long long)local_queue.Count());
1070  if (mode == READ) {
1071  if (local_queue.Full()) {
1072  writerequest = command->data.block.amount;
1073  }
1074  } else {
1075  if (local_queue.Empty()) {
1076  readrequest = command->data.block.amount;
1077  }
1078  }
1079  if (useD4R && UnlockedIsNormal()) { PostD4RTag(); }
1080  break;
1081  case CT_SHUTDOWN:
1082  logger.Trace("%s %s Received shutdown command", StateStr(state), UnlockedClockStr());
1083  if (mode == READ) {
1085  state = S_DEAD;
1086  } else {
1088  state = S_DEAD;
1089  }
1090  logger.Trace("%s Transition from %s to %s state", UnlockedClockStr(),
1091  StateStr(oldstate), StateStr(state));
1092  break;
1093  case CT_GROW:
1094  logger.Trace("%s %s Received grow command len: %llu thresh: %llu",
1095  StateStr(state), UnlockedClockStr(),
1096  (unsigned long long)command->data.grow.length,
1097  (unsigned long long)command->data.grow.threshold);
1098  grow_length = max(grow_length, command->data.grow.length);
1099  grow_thresh = max(grow_thresh, command->data.grow.threshold);
1100  if (mode == WRITE) {
1101  Cork();
1102  pending_grow = true;
1103  } else {
1104  Cork();
1105  InternalGrow();
1106  PostUpdateQueue();
1107  Uncork();
1108  }
1109  break;
1110  case CT_D4R:
1111  TRACE("%s %s Received D4R tag", StateStr(state), UnlockedClockStr());
1112  {
1113  D4R::Tag tag;
1114  tag.Count(command->data.d4r.count);
1115  tag.Key(command->data.d4r.key);
1116  tag.QueueSize(command->data.d4r.qsize);
1117  tag.QueueKey(command->data.d4r.qkey);
1118  mocknode->SetPublicTag(tag);
1119  }
1120  if (mode == WRITE) {
1122  } else {
1124  }
1125  break;
1126  case CT_CS_REQ:
1127  {
1128  uint64_t timestamp = command->data.cs_info.timestamp;
1129  TRACE("%s %s Receive CS_REQ, ts: %llu", StateStr(state), UnlockedClockStr(),
1130  (unsigned long long)timestamp);
1131  if (
1132  !local_want_cs ||
1133  local_cs_ts > timestamp ||
1134  (local_cs_ts == timestamp && mode == READ)
1135  ) {
1136  PostCSOK();
1137  } else {
1138  remote_want_cs = true;
1139  }
1140  }
1141  break;
1142  case CT_CS_OK:
1143  TRACE("%s %s Receive CS_OK", StateStr(state), UnlockedClockStr());
1144  cs_ok = true;
1145  Signal();
1146  break;
1147  case CT_FLUSH:
1148  TRACE("%s %s Receive FLUSH", StateStr(state), UnlockedClockStr());
1149  num_enqueued = command->data.flush.num_enqueued;
1150  flushed = true;
1151  Signal();
1152  break;
1153  case CT_RESET:
1154  TRACE("%s %s Receive RESET", StateStr(state), UnlockedClockStr());
1155  num_enqueued = 0;
1156  num_dequeued = 0;
1158  break;
1159  default:
1160  ASSERT(false);
1161  }
1162  }
1163 
1164  void RDMAQueue::Initialize() {
1165  int num_devices = 0;
1166  ibv_device **devices = ibv_get_device_list(&num_devices);
1167  ibv_device *device = 0;
1168  if (num_devices < 1) {
1169  throw std::runtime_error("Error getting devices, no devices");
1170  }
1171  for (int i = 0; i < num_devices; ++i) {
1172  if (if_name.empty()) {
1173  device = devices[i];
1174  if_name = ibv_get_device_name(devices[i]);
1175  break;
1176  } else if (if_name == ibv_get_device_name(devices[i])) {
1177  device = devices[i];
1178  break;
1179  }
1180  }
1181  if (!device) {
1182  throw std::runtime_error("Error getting device, no matching device.");
1183  }
1184  ctx = ibv_open_device(device);
1185  if (!ctx) { throw ErrnoException(); }
1186  pd = ibv_alloc_pd(ctx);
1187  if (!pd) { throw ErrnoException(); }
1188  cc = ibv_create_comp_channel(ctx);
1189  if (!cc) { throw ErrnoException(); }
1190  cq = ibv_create_cq(ctx, max_send_wr + max_recv_wr, 0, cc, 0);
1191  if (!cq) { throw ErrnoException(); }
1192  ibv_device_attr device_attr;
1193  memset(&device_attr, 0, sizeof(device_attr));
1194  if (ibv_query_device(ctx, &device_attr)) { throw ErrnoException(); }
1195  ibv_qp_init_attr qp_attr;
1196  memset(&qp_attr, 0, sizeof(qp_attr));
1197  qp_attr.qp_context = 0;
1198  qp_attr.send_cq = cq;
1199  qp_attr.recv_cq = cq;
1200  qp_attr.sq_sig_all = 1;
1201  qp_attr.cap.max_send_wr = max_send_wr;
1202  qp_attr.cap.max_recv_wr = max_recv_wr;
1203  qp_attr.cap.max_send_sge = 1;
1204  qp_attr.cap.max_recv_sge = 1;
1205  qp_attr.qp_type = IBV_QPT_RC;
1206  qp = ibv_create_qp(pd, &qp_attr);
1207  if (!qp) { throw ErrnoException(); }
1208  local_info.qp_num = qp->qp_num;
1209  ibv_port_attr port_attr;
1210  memset(&port_attr, 0, sizeof(port_attr));
1211  if (if_port <= 0) {
1212  int i;
1213  for (i = 1; i <= device_attr.phys_port_cnt; ++i) {
1214  if (ibv_query_port(ctx, i, &port_attr)) {
1215  throw ErrnoException();
1216  }
1217  if (port_attr.state == IBV_PORT_ACTIVE) {
1218  if_port = i;
1219  local_info.lid = port_attr.lid;
1220  break;
1221  }
1222  }
1223  if (i > device_attr.phys_port_cnt) {
1224  throw std::runtime_error("Unable to find an active port");
1225  }
1226  } else {
1227  if (ibv_query_port(ctx, if_port, &port_attr)) {
1228  throw ErrnoException();
1229  }
1230  if (port_attr.state == IBV_PORT_ACTIVE) {
1231  local_info.lid = port_attr.lid;
1232  } else {
1233  throw std::runtime_error("Port is not active");
1234  }
1235  }
1236  }
1237 
// Tear down all InfiniBand resources created in Initialize/InternalGrow.
// Order matters: memory regions are deregistered before the protection
// domain is deallocated, and the queue pair is destroyed before its
// completion queue and completion channel. Each handle is reset to 0
// after release, so Cleanup() is idempotent and safe to call on a
// partially-initialized object.
void RDMAQueue::Cleanup() {
  if (write_command_queue_mr) {
    ibv_dereg_mr(write_command_queue_mr);
    write_command_queue_mr = 0;
  }
  if (read_command_queue_mr) {
    ibv_dereg_mr(read_command_queue_mr);
    read_command_queue_mr = 0;
  }
  if (mbs_mr) {
    ibv_dereg_mr(mbs_mr);
    mbs_mr = 0;
  }
  // Memory region of the old buffer set kept alive across a grow.
  if (mbs_old_mr) {
    ibv_dereg_mr(mbs_old_mr);
    mbs_old_mr = 0;
  }
  if (qp) {
    ibv_destroy_qp(qp);
    qp = 0;
  }
  if (cq) {
    ibv_destroy_cq(cq);
    cq = 0;
  }
  if (cc) {
    ibv_destroy_comp_channel(cc);
    cc = 0;
  }
  if (pd) {
    ibv_dealloc_pd(pd);
    pd = 0;
  }
  if (ctx) {
    ibv_close_device(ctx);
    ctx = 0;
  }
  // NOTE(review): the return codes of all ibv_* teardown calls are
  // ignored, so a failed destroy (e.g. a still-referenced CQ) is silent.
}
1276 
1277  static void ExchangeData(SocketHandle &sock, char *src, char *dst, unsigned len) {
1278  unsigned num_written = 0;
1279  while (num_written < len) {
1280  num_written += sock.Write(src + num_written, len - num_written);
1281  }
1282  unsigned num_read = 0;
1283  while (num_read < len) {
1284  unsigned nr = sock.Read(dst + num_read, len - num_read);
1285  if (nr == 0 && sock.Eof()) {
1286  throw std::runtime_error("Unexpected EOF encountered on socket.");
1287  }
1288  num_read += nr;
1289  }
1290  }
1291 
1292  void RDMAQueue::Connect() {
1293  SocketHandle sock;
1294  while (sock.Closed()) {
1295  kernel->CheckTerminated();
1296  logger.Debug("Connecting Socket");
1297  shared_ptr<Sync::Future<int> > conn;
1298  if (mode == WRITE) {
1299  conn = server->ConnectWriter(GetKey());
1300  } else {
1301  conn = server->ConnectReader(GetKey());
1302  }
1303  if (conn) {
1304  sock.Reset();
1305  sock.FD(conn->Get());
1306  }
1307  }
1308  logger.Debug("Socket Connected, exchanging data (lid: %d, qp_num: %d",
1309  local_info.lid, local_info.qp_num);
1310  ExchangeData(sock, (char*)&local_info, (char*)&remote_info, sizeof(PortInfo));
1311  sock.Close();
1312  logger.Debug("Got data (lid: %d, qp_num: %d) connecting the qp", remote_info.lid, remote_info.qp_num);
1313 
1314  AutoLock<QueueBase> al(*this);
1315  ibv_qp_attr attr;
1316  memset(&attr, 0, sizeof(attr));
1317  attr.qp_state = IBV_QPS_INIT;
1318  attr.pkey_index = 0;
1319  attr.port_num = if_port;
1320  attr.qp_access_flags = IBV_ACCESS_DEFAULT;
1321  if (ibv_modify_qp(qp, &attr,
1322  IBV_QP_STATE | IBV_QP_ACCESS_FLAGS |
1323  IBV_QP_PORT | IBV_QP_PKEY_INDEX))
1324  {
1325  throw ErrnoException();
1326  }
1327  PostReceives();
1328 
1329  logger.Debug("QP now in INIT state");
1330 
1331  memset(&attr, 0, sizeof(attr));
1332  attr.qp_state = IBV_QPS_RTR;
1333  attr.path_mtu = IBV_MTU_512;
1334 
1335  attr.ah_attr.is_global = 0;
1336  attr.ah_attr.dlid = remote_info.lid;
1337  attr.ah_attr.sl = 0;
1338  attr.ah_attr.src_path_bits = 0;
1339  attr.ah_attr.port_num = if_port;
1340 
1341  attr.dest_qp_num = remote_info.qp_num;
1342  attr.rq_psn = 0;
1343  attr.max_dest_rd_atomic = 1;
1344  attr.min_rnr_timer = 12; // Recommended value from documentation
1345 
1346  if (ibv_modify_qp(qp, &attr,
1347  IBV_QP_STATE | IBV_QP_PATH_MTU |
1348  IBV_QP_AV | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
1349  IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER
1350  ))
1351  {
1352  throw ErrnoException();
1353  }
1354 
1355  logger.Debug("QP now in RTR state");
1356 
1357  memset(&attr, 0, sizeof(attr));
1358  attr.qp_state = IBV_QPS_RTS;
1359  attr.timeout = 14; // Recommended from docs
1360  attr.retry_cnt = 7; // Recommended from docs
1361  attr.rnr_retry = 7; // Recommended from docs
1362  attr.sq_psn = 0;
1363  attr.max_rd_atomic = 1;
1364  if (ibv_modify_qp(qp, &attr,
1365  IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
1366  IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC
1367  ))
1368  {
1369  throw ErrnoException();
1370  }
1371 
1372  logger.Debug("QP now in RTS state");
1373 
1374  ibv_req_notify_cq(cq, 0);
1375 
1376  PostUpdateQueue();
1377  logger.Debug("Connection complete");
1378  }
1379 
1380  void RDMAQueue::Cork() {
1381  ++cork;
1382  TRACE("Cork: %u", cork);
1383  }
1384 
1385  void RDMAQueue::Uncork() {
1386  ASSERT(cork > 0);
1387  --cork;
1388  TRACE("Uncork: %u", cork);
1389  if (cork == 0) {
1390  if (mode == WRITE) {
1391  CheckRDMAWrite();
1392  } else {
1393  }
1394  }
1395  Signal();
1396  }
1397 
1398  void RDMAQueue::EnterCriticalSection() {
1399  if (!UnlockedIsNormal()) { throw BrokenQueueException(GetKey()); }
1400  local_want_cs = true;
1401  local_cs_ts = clock;
1402  cs_ok = false;
1403  PostCSRequest();
1404  while ((!cs_ok || local_cs_ts > old_clock) && UnlockedIsNormal()) { Wait(); }
1405  TRACE("%s Entering Critical Section", UnlockedClockStr());
1406  }
1407 
1408  void RDMAQueue::LeaveCriticalSection() {
1409  TRACE("%s Leaving Critical Section", UnlockedClockStr());
1410  if (!UnlockedIsNormal()) { return; }
1411  local_want_cs = false;
1412  if (remote_want_cs) {
1413  PostCSOK();
1414  remote_want_cs = false;
1415  }
1416  }
1417 
1418  void RDMAQueue::InternalGrow() {
1419  logger.Trace("Internal grow, inenqueue: %d, indequeue: %d, grow_length: %u, grow_thresh: %u",
1420  (int)inenqueue, (int)indequeue, (unsigned)grow_length, (unsigned)grow_thresh);
1421  enqueue_use_old = inenqueue;
1422  dequeue_use_old = indequeue;
1423 
1424  // Allocate new buffer
1425  QueueModel new_queue;
1426  new_queue.length = ComputeQueueLength(grow_length, grow_thresh, alpha, mode);
1427  auto_ptr<MirrorBufferSet> new_mbs;
1428  new_mbs.reset(new MirrorBufferSet(new_queue.length, grow_thresh, num_channels));
1429  new_queue.length = new_mbs->BufferSize();
1430  new_queue.base = (char*)(void*)*new_mbs;
1431  new_queue.stride = new_mbs->BufferSize() + new_mbs->MirrorSize();
1432  grow_thresh = new_mbs->MirrorSize();
1433  ibv_mr *new_mbs_mr = ibv_reg_mr(pd, new_queue.base,
1434  (new_queue.length + grow_thresh)*num_channels, IBV_ACCESS_DEFAULT);
1435  new_queue.lkey = new_mbs_mr->lkey;
1436 
1437  // Copy the data over
1438  uint64_t thresh = min((uint64_t)max_threshold, (uint64_t)grow_thresh + 1);
1439  while (new_queue.Count() < local_queue.Count()) {
1440  uint64_t amount = min((uint64_t)thresh, local_queue.Count() - new_queue.Count());
1441  for (unsigned chan = 0; chan < num_channels; ++chan) {
1442  memcpy((new_queue.GetHead() + chan*new_queue.stride),
1443  (local_queue.GetTail(new_queue.Count())
1444  + chan*local_queue.stride),
1445  amount);
1446  }
1447  new_queue.AdvanceHead(amount);
1448  }
1449  max_threshold = std::min(grow_thresh + 1, new_queue.length);
1450 
1451  // Transition over
1452  if ((enqueue_use_old || dequeue_use_old) && !mbs_old.get()) {
1453  logger.Trace("Keeping old queue, old key %llu, new key %llu",
1454  (unsigned long long)local_queue.lkey, (unsigned long long)new_queue.lkey);
1455  mbs_old = mbs;
1456  local_old_queue = local_queue;
1457  mbs_old_mr = mbs_mr;
1458 
1459  mbs = new_mbs;
1460  local_queue = new_queue;
1461  mbs_mr = new_mbs_mr;
1462  } else {
1463  logger.Trace("Replacing old queue, old key %llu, new key %llu",
1464  (unsigned long long)local_queue.lkey, (unsigned long long)new_queue.lkey);
1465  ibv_dereg_mr(mbs_mr);
1466  mbs_mr = new_mbs_mr;
1467  mbs = new_mbs;
1468  local_queue = new_queue;
1469  }
1470  }
1471 
1472  bool RDMAQueue::ActiveTransfers() {
1473  return (num_send_wr != 0 || pending_rdma > 0 || confirmed_rdma > 0);
1474  }
1475 
1476  bool RDMAQueue::IsDead() {
1477  AutoLock<QueueBase> al(*this);
1478  return UnlockedIsDead();
1479  }
1480  bool RDMAQueue::UnlockedIsDead() {
1481  return (state == S_DEAD || state == S_ERROR);
1482  }
1483  bool RDMAQueue::UnlockedIsNormal() {
1484  return (state == S_RUNNING);
1485  }
1486 
1487  const char* RDMAQueue::ClockStr() {
1488  AutoLock<QueueBase> al(*this);
1489  return UnlockedClockStr();
1490  }
1491 
1492  const char* RDMAQueue::UnlockedClockStr() {
1493  std::ostringstream oss;
1494  oss << "<" << clock << ", " << remote_clock << ", " << old_clock << ">";
1495  clock_str = oss.str();
1496  return clock_str.c_str();
1497  }
1498 
1499  void RDMAQueue::ShutdownError() {
1500  AutoLock<QueueBase> al(*this);
1501  readshutdown = true;
1502  writeshutdown = true;
1503  state = S_ERROR;
1504  Signal();
1505  }
1506 
1507 }
1508 #endif // ENABLE_RDMA
1509 
virtual void UnlockedShutdownWriter()
Definition: QueueBase.cc:311
virtual void InternalFlush()
Definition: QueueBase.cc:265
A FileHandle customized with some socket specific functionality and functions.
Definition: SocketHandle.h:34
virtual void WaitForFreespace()
Definition: QueueBase.cc:347
Definition of the queue attributes.
virtual void WaitForData()
Definition: QueueBase.cc:326
uint64_t QueueKey() const
Definition: D4RTag.h:71
shared_ptr< QueueBase > queue
Definition: QueueWriter.h:159
uint64_t Key() const
Definition: D4RTag.h:62
virtual void UnlockedShutdownReader()
Definition: QueueBase.cc:255
An object to hold references to RemoteQueues so they can continue to work after the node has gone awa...
The exceptions specified for the CPN network.
bool Closed() const
Definition: FileHandle.h:128
virtual void InternalReset()
Definition: QueueBase.cc:289
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
virtual void UnlockedSignalReaderTagChanged()
Definition: D4RQueue.cc:144
virtual void UnlockedSignalWriterTagChanged()
Definition: D4RQueue.cc:155
void Reset()
Clear all internal state including the file descriptor! WARNING does not close the file! ...
Definition: FileHandle.cc:138
int FD() const
Definition: FileHandle.h:106
virtual const char * what() const
uint64_t QueueSize() const
Definition: D4RTag.h:66
virtual void LogState()
For debugging ONLY!! Otherwise non deterministic output.
Definition: QueueBase.cc:400
bool Eof() const
Definition: FileHandle.h:115
unsigned Read(void *ptr, unsigned len)
Read data from the file descriptor.
Definition: FileHandle.cc:159
uint64_t Count() const
Definition: D4RTag.h:58
#define ASSERT(exp,...)
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
Definition: FileHandle.cc:225