75 #include "common_priv.h"
90 #include <sys/select.h>
95 #define TRACE(fmt, ...) logger.Trace(fmt, ## __VA_ARGS__)
97 #define TRACE(fmt, ...)
100 #define IBV_ACCESS_DEFAULT (IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE\
101 | IBV_ACCESS_REMOTE_READ)
// Builds a command ring sized for `size` Command records.
// Visible work: outstanding_commands starts at 0, queue.base is obtained with
// malloc(size * sizeof(Command)) and queue.length records the same byte count.
// NOTE(review): no check of the malloc result is visible in this excerpt — confirm one exists.
109 CommandQueue::CommandQueue(uint64_t size)
110 : outstanding_commands(0)
112 queue.base = (
char*)malloc(size*
sizeof(Command));
114 queue.length = size*
sizeof(Command);
// Destructor; its body is not visible in this excerpt.
// NOTE(review): the constructor mallocs queue.base — confirm it is released here.
117 CommandQueue::~CommandQueue() {
// Reserves the next unused Command slot without publishing it.
// Returns 0 when the queue reports Full(). Otherwise the slot located
// outstanding_commands records past the head is zero-filled and the count of
// reserved-but-unpublished commands is incremented.
122 Command *CommandQueue::NewCommand() {
123 if (Full())
return 0;
124 Command *ptr = (Command*)queue.GetHead(outstanding_commands*
sizeof(Command));
125 memset(ptr, 0,
sizeof(Command));
126 outstanding_commands++;
// Publishes one previously reserved command: moves the head forward by one
// Command record and decrements the count of outstanding reservations.
130 void CommandQueue::Enqueue() {
131 queue.AdvanceHead(
sizeof(Command));
132 --outstanding_commands;
// Returns the command at the tail of the queue, or 0 when Empty().
// The command is not removed; see Dequeue().
135 Command *CommandQueue::GetCommand() {
136 if (Empty())
return 0;
137 return (Command*)queue.GetTail();
// Drops the oldest command by advancing the tail one Command record.
// No emptiness check is performed in the visible lines.
140 void CommandQueue::Dequeue() {
141 queue.AdvanceTail(
sizeof(Command));
// Resets the reservation counter to zero. Any further reset work (e.g. on
// head/tail) is not visible in this excerpt.
144 void CommandQueue::Reset() {
145 outstanding_commands = 0;
// Translates a State enumerator into a fixed, human-readable label for logs.
// The enclosing switch statement and any fallback return are not visible here.
150 const char* StateStr(State s) {
152 case S_INIT:
return "INIT";
153 case S_RUNNING:
return "RUNNING";
154 case S_FLUSHING:
return "FLUSHING";
155 case S_DIEING:
return "DIEING";
156 case S_DEAD:
return "DEAD";
157 case S_ERROR:
return "ERROR";
// Translates a CommandType enumerator into a fixed label for log messages.
// The enclosing switch statement and any fallback return are not visible here.
162 const char* CommandTypeStr(CommandType ct) {
164 case CT_UPDATE:
return "UPDATE";
165 case CT_ENQUEUE:
return "ENQUEUE";
166 case CT_DEQUEUE:
return "DEQUEUE";
167 case CT_BLOCK:
return "BLOCK";
168 case CT_SHUTDOWN:
return "SHUTDOWN";
169 case CT_GROW:
return "GROW";
170 case CT_D4R:
return "D4R";
171 case CT_CS_REQ:
return "CS_REQ";
172 case CT_CS_OK:
return "CS_OK";
173 case CT_FLUSH:
return "FLUSH";
174 case CT_RESET:
return "RESET";
180 using namespace Internal;
// Splits a requested total queue length between the writer and reader sides.
//   length    - requested total length
//   maxthresh - lower bound applied to the result
//   alpha     - fraction of `length` given to the writer side
//   mode      - which side this endpoint is
// The writer share is unsigned(length * alpha). READ mode returns
// max(length - writerlen, maxthresh); any other mode returns
// max(writerlen, maxthresh).
// NOTE(review): if alpha > 1 then writerlen > length and the unsigned
// subtraction wraps — confirm alpha is constrained to [0, 1] by the caller.
182 unsigned ComputeQueueLength(
unsigned length,
unsigned maxthresh,
double alpha, RDMAQueue::Mode mode) {
183 unsigned writerlen = unsigned(((
double)length)*alpha);
184 if (mode == RDMAQueue::READ) {
185 return std::max<unsigned>(length - writerlen, maxthresh);
187 return std::max<unsigned>(writerlen, maxthresh);
// Scans `qhint` character by character and stores key/value string pairs in
// `result`. Only fragments are visible: an iterator walk from begin to end, an
// expect_key flag that starts true, characters appended to `value`, and
// completed pairs inserted into the map.
// NOTE(review): the delimiter syntax is not visible in this excerpt. Because
// std::map::insert is used, a repeated key keeps its first value.
191 void ParseQueueHint(
const std::string &qhint, std::map<std::string, std::string> &result) {
192 typedef std::string::const_iterator stritr;
193 stritr cur = qhint.begin();
194 stritr end = qhint.end();
197 bool expect_key =
true;
210 result.insert(std::make_pair(key, value));
217 value.push_back(*cur);
// Constructs one endpoint of an RDMA-backed queue.
//   k, attr - forwarded to QueueBase
//   m       - endpoint mode; selects which node key the mock D4R node uses
//   s, h    - connection server and remote-queue holder (their storage is not
//             visible in this excerpt)
// Both command rings and both work-request limits are sized to
// max(2 * channels, 100). Options "if_name" and "if_port" are read from the
// attribute hint after skipping its first four characters.
// The data buffer is a MirrorBufferSet; the local queue length, base, stride
// and the effective max_threshold are all derived from what the buffer set
// actually provides. The data buffer and both command rings are then
// registered as memory regions.
224 RDMAQueue::RDMAQueue(KernelBase *k, Mode m,
225 ConnectionServer *s, RemoteQueueHolder *h,
const SimpleQueueAttr &attr)
226 : QueueBase(k, attr),
229 read_command_queue(std::max(attr.GetNumChannels()*2, 100u)),
230 write_command_queue(std::max(attr.GetNumChannels()*2, 100u)),
231 max_threshold(attr.GetMaxThreshold()),
232 num_channels(attr.GetNumChannels()),
235 alpha(attr.GetAlpha()),
236 max_send_wr(std::max(attr.GetNumChannels()*2, 100u)),
237 max_recv_wr(std::max(attr.GetNumChannels()*2, 100u)),
243 queue_updated(false),
246 pending_flush(false),
247 clock(0), remote_clock(0), old_clock(0),
249 local_want_cs(false),
250 remote_want_cs(false),
254 enqueue_use_old(false),
255 dequeue_use_old(false),
256 grow_length(0), grow_thresh(0),
259 mocknode(new D4R::Node(m == READ ? attr.GetWriterNodeKey() : attr.GetReaderNodeKey())),
260 ctx(0), pd(0), cc(0), cq(0), qp(0), mbs_mr(0), mbs_old_mr(0),
261 read_command_queue_mr(0), write_command_queue_mr(0)
// Optional interface selection parsed from the hint string.
263 if (attr.GetHint().size() > 4) {
264 std::map<std::string, std::string> options;
265 ParseQueueHint(attr.GetHint().substr(4), options);
266 if (options.find(
"if_name") != options.end()) {
267 if_name = options[
"if_name"];
269 if (options.find(
"if_port") != options.end()) {
270 std::istringstream iss(options[
"if_port"]);
// Name the logger after the mode and the reader/writer keys.
278 std::ostringstream oss;
279 oss <<
"RDMAQueue(m:";
280 if (mode == READ) { oss <<
"r"; }
282 oss <<
", r:" << readerkey <<
", w:" << writerkey <<
")";
283 logger.Name(oss.str());
285 SetWriterNode(mocknode);
287 SetReaderNode(mocknode);
// Size the local half of the queue, then adopt the buffer set's real geometry.
289 local_queue.length = ComputeQueueLength(attr.GetLength(), max_threshold, alpha, mode);
290 remote_queue.length = 0;
291 mbs.reset(
new MirrorBufferSet(local_queue.length, max_threshold, num_channels));
292 local_queue.length = mbs->BufferSize();
293 local_queue.base = (
char*)(
void*)*mbs;
294 local_queue.stride = mbs->BufferSize() + mbs->MirrorSize();
295 max_threshold = std::min(mbs->MirrorSize() + 1, local_queue.length);
// NOTE(review): mbs_mr is dereferenced below; a check of the ibv_reg_mr
// result is not visible in this excerpt — confirm one exists.
299 mbs_mr = ibv_reg_mr(pd, local_queue.base, local_queue.stride*num_channels,
302 local_queue.lkey = mbs_mr->lkey;
303 logger.Trace(
"mbs constructed: base: %p, length: %llu, lkey: %llu",
304 local_queue.base, (
unsigned long long)local_queue.length,
305 (
unsigned long long)local_queue.lkey);
// The receive ring needs local write access; the send ring is registered with
// no extra access flags.
307 read_command_queue_mr = ibv_reg_mr(pd, read_command_queue.GetBase(),
308 read_command_queue.Size()*
sizeof(Command), IBV_ACCESS_LOCAL_WRITE);
309 ASSERT(read_command_queue_mr);
310 read_command_queue.SetKey(read_command_queue_mr->lkey);
311 write_command_queue_mr = ibv_reg_mr(pd, write_command_queue.GetBase(),
312 write_command_queue.Size()*
sizeof(Command), 0);
313 ASSERT(write_command_queue_mr);
314 write_command_queue.SetKey(write_command_queue_mr->lkey);
319 logger.Trace(
"Constructed");
// Destructor: waits for the event thread to finish, then logs the final state
// and clock values. Any other teardown is not visible in this excerpt.
322 RDMAQueue::~RDMAQueue() {
324 event_thread->Join();
325 logger.Trace(
"%s %s Destructed", StateStr(state), UnlockedClockStr());
// Launches the background event thread.
329 void RDMAQueue::Start() {
330 event_thread->Start();
// Requests shutdown of this endpoint. Logs the current state, does nothing
// further if the queue is already dead, and otherwise invokes the writer-
// and/or reader-side shutdown.
// NOTE(review): the condition selecting between the two calls is not visible
// — presumably it depends on mode; confirm.
333 void RDMAQueue::Shutdown() {
335 logger.Trace(
"%s %s Shutdown", StateStr(state), UnlockedClockStr());
336 if (UnlockedIsDead())
return;
338 UnlockedShutdownWriter();
340 UnlockedShutdownReader();
// Debug dump: writes the state, clocks, local/remote queue occupancy,
// work-request counters, grow/flush flags and critical-section flags to the
// logger at Error level.
344 void RDMAQueue::LogState() {
346 logger.Error(
"State: %s, Clock: %s", StateStr(state), UnlockedClockStr());
347 logger.Error(
"local_queue: size: %llu, count: %llu, remote_queue: size: %llu, count: %llu, lstride: %llu",
348 (
unsigned long long)local_queue.Size(),
349 (
unsigned long long)local_queue.Count(),
350 (
unsigned long long)remote_queue.Size(),
351 (
unsigned long long)remote_queue.Count(),
352 (
unsigned long long)local_queue.stride
354 logger.Error(
"num_send_wr: %u, pending_rdma: %u, confirmed_rdma: %u",
355 num_send_wr, pending_rdma, confirmed_rdma);
356 logger.Error(
"cork: %u, queue_updated: %d, pending_grow: %d, grow_ack: %d"
357 ", pending_flush: %d",
358 cork, (
int)queue_updated, (
int)pending_grow, (
int)grow_ack, (
int)pending_flush);
359 logger.Error(
"local_cs_ts: %llu, local_want_cs: %d, remote_want_cs: %d, cs_ok: %d",
360 (
unsigned long long)local_cs_ts, (
int)local_want_cs, (
int)remote_want_cs,
// Blocks the reader until data may be available.
// While still in S_INIT it waits for the state to change, a shutdown flag, or
// the read to unblock. Otherwise it waits for in-flight transfers to drain,
// returns early if the read is no longer blocked, and if still blocked posts a
// BLOCK command carrying readrequest and waits for the peer to catch up with
// the clock value sampled after posting.
// A BrokenQueueException raised along the way is caught and ignored.
364 void RDMAQueue::WaitForData() {
367 if (state == S_INIT) {
368 while (state == S_INIT && !readshutdown && !writeshutdown && ReadBlocked()) { Wait(); }
372 while (ActiveTransfers() && ReadBlocked()) { Wait(); }
373 if (!ReadBlocked())
return;
374 PostBlock(readrequest);
375 uint64_t sync_clock = clock;
// NOTE(review): this loop tests WriteBlocked() although every other wait in
// this function tests ReadBlocked(); WaitForFreespace has the identical line.
// Looks like a copy/paste slip — confirm whether ReadBlocked() was intended.
376 while ((ActiveTransfers() || sync_clock > old_clock) && WriteBlocked()) { Wait(); }
378 }
catch (
const BrokenQueueException &e) {}
// Blocks the writer until free space may be available.
// While still in S_INIT it waits for the state to change, a shutdown flag, or
// the write to unblock. Otherwise it waits for in-flight transfers to drain,
// returns early if the write is no longer blocked, and if still blocked posts
// a BLOCK command carrying writerequest and waits until transfers are idle and
// the peer has caught up with the clock value sampled after posting.
// A BrokenQueueException raised along the way is caught and ignored.
381 void RDMAQueue::WaitForFreespace() {
384 if (state == S_INIT) {
385 while (state == S_INIT && !readshutdown && !writeshutdown && WriteBlocked()) { Wait(); }
389 while (ActiveTransfers() && WriteBlocked()) { Wait(); }
390 if (!WriteBlocked())
return;
391 PostBlock(writerequest);
392 uint64_t sync_clock = clock;
393 while ((ActiveTransfers() || sync_clock > old_clock) && WriteBlocked()) { Wait(); }
395 }
catch (
const BrokenQueueException &e) {}
// Returns a read pointer for channel `chan`, or 0 if `thresh` elements cannot
// be provided (thresh exceeds max_threshold or the current element count).
// While dequeue_use_old is set, the pointer comes from the pre-grow queue
// instead; the threshold checks in that branch are not visible in this excerpt.
// The channel offset is chan * UnlockedDequeueChannelStride().
398 const void *RDMAQueue::InternalGetRawDequeuePtr(
unsigned thresh,
unsigned chan) {
400 ASSERT(chan < num_channels);
401 if (dequeue_use_old) {
403 return (
const void*)(local_old_queue.GetTail() + chan*UnlockedDequeueChannelStride());
405 if (thresh > max_threshold)
return 0;
406 if (thresh > local_queue.Count())
return 0;
407 return (
const void*)(local_queue.GetTail() + chan*UnlockedDequeueChannelStride());
// Consumes `count` elements from the local queue.
// If the caller was reading from the pre-grow buffer, that mode ends here and
// the old memory region is deregistered. The tail is then advanced and
// `count` is added to both pending_dequeue and num_dequeued.
// NOTE(review): pending_dequeue is presumably reported to the peer later by
// CheckDequeue — confirm.
411 void RDMAQueue::InternalDequeue(
unsigned count) {
413 ASSERT(count <= local_queue.Count());
414 if (dequeue_use_old) {
415 dequeue_use_old =
false;
416 ibv_dereg_mr(mbs_old_mr);
420 local_queue.AdvanceTail(count);
421 pending_dequeue += count;
423 num_dequeued += count;
// Returns a write pointer for channel `chan`, or 0 if `thresh` elements of
// space cannot be provided (thresh exceeds max_threshold or current freespace).
// While enqueue_use_old is set, the pointer comes from the pre-grow queue
// instead; the threshold checks in that branch are not visible in this excerpt.
// The channel offset is chan * UnlockedEnqueueChannelStride().
426 void *RDMAQueue::InternalGetRawEnqueuePtr(
unsigned thresh,
unsigned chan) {
428 ASSERT(chan < num_channels);
429 if (enqueue_use_old) {
431 return (
void*)(local_old_queue.GetHead() + chan*UnlockedEnqueueChannelStride());
433 if (thresh > max_threshold)
return 0;
434 if (thresh > local_queue.Freespace())
return 0;
435 return (
void*)(local_queue.GetHead() + chan*UnlockedEnqueueChannelStride());
// Commits `count` elements written by the caller.
// If the caller wrote into the pre-grow buffer, the data is first copied
// channel by channel from the old queue's head to the new queue's head, the
// old-buffer mode is ended and the old memory region is deregistered. The
// head is then advanced and num_enqueued increased by `count`.
// NOTE(review): the memcpy length argument is not visible in this excerpt.
439 void RDMAQueue::InternalEnqueue(
unsigned count) {
441 ASSERT(count <= local_queue.Freespace());
442 if (enqueue_use_old) {
443 for (
unsigned chan = 0; chan < num_channels; ++chan) {
444 memcpy((local_queue.GetHead() + chan*local_queue.stride),
445 (local_old_queue.GetHead() + chan*local_old_queue.stride),
448 enqueue_use_old =
false;
449 ibv_dereg_mr(mbs_old_mr);
453 local_queue.AdvanceHead(count);
455 num_enqueued += count;
// Flush request. When the queue is RUNNING, idle and locally empty the flush
// is handled in a branch whose body is not visible here; pending_flush is
// also set to true.
// NOTE(review): presumably the assignment is the else branch, deferring the
// flush to ProcessEvents — confirm.
458 void RDMAQueue::InternalFlush() {
461 if (state == S_RUNNING && !ActiveTransfers() && local_queue.Empty()) {
464 pending_flush =
true;
// Reset path visible here: waits until the queue is RUNNING (giving up if it
// dies or either side shuts down), then discards local data until
// num_dequeued has caught up with num_enqueued, waiting for more data to
// arrive whenever the local queue is momentarily empty.
468 void RDMAQueue::InternalReset() {
478 while (state != S_RUNNING && !UnlockedIsDead() && !writeshutdown && !readshutdown) { Wait(); }
479 if (UnlockedIsDead() || writeshutdown || readshutdown)
return;
480 ASSERT(num_enqueued >= num_dequeued);
481 while (num_enqueued > num_dequeued) {
482 while (local_queue.Count() == 0 && !(UnlockedIsDead() || writeshutdown || readshutdown)) { Wait(); }
483 if (UnlockedIsDead() || writeshutdown || readshutdown)
return;
484 InternalDequeue(local_queue.Count());
// Byte distance between consecutive channels for the enqueue side: the old
// queue's stride while enqueue_use_old is set, else the current queue's.
491 unsigned RDMAQueue::UnlockedEnqueueChannelStride()
const {
492 if (enqueue_use_old) {
493 return local_old_queue.stride;
495 return local_queue.stride;
// Byte distance between consecutive channels for the dequeue side: the old
// queue's stride while dequeue_use_old is set, else the current queue's.
499 unsigned RDMAQueue::UnlockedDequeueChannelStride()
const {
500 if (dequeue_use_old) {
501 return local_old_queue.stride;
503 return local_queue.stride;
// Grows the queue to at least `queueLen` elements and `maxThresh` threshold.
// queueLen is first raised to at least maxThresh. The call returns early when
// both values are below the current ones, or when the queue dies or shuts
// down while waiting to reach RUNNING. The remaining work runs between
// EnterCriticalSection and LeaveCriticalSection: the sizes are re-checked,
// grow_length/grow_thresh are raised, in-flight transfers are drained, and the
// code waits on queue_updated until the peer's view of the clock (old_clock)
// reaches the sampled sync_clock.
// NOTE(review): sync_clock is declared twice (original lines 532 and 541),
// presumably in separate branches whose conditions are not visible here.
507 void RDMAQueue::UnlockedGrow(
unsigned queueLen,
unsigned maxThresh) {
508 queueLen = max(queueLen, maxThresh);
509 if (queueLen < UnlockedQueueLength() && maxThresh < UnlockedMaxThreshold())
return;
510 while (state != S_RUNNING && !UnlockedIsDead() && !writeshutdown && !readshutdown) { Wait(); }
511 if (UnlockedIsDead() || writeshutdown || readshutdown)
return;
512 EnterCriticalSection();
// Re-check the sizes now that the critical section is held.
513 if (queueLen <= UnlockedQueueLength() && maxThresh <= UnlockedMaxThreshold()) {
514 LeaveCriticalSection();
518 logger.Trace(
"Growing %s %s from %llu %llu to %llu %llu", UnlockedClockStr(), StateStr(state),
519 (
unsigned long long)UnlockedQueueLength(),
520 (
unsigned long long)UnlockedMaxThreshold(),
521 (
unsigned long long)queueLen,
522 (
unsigned long long)maxThresh
524 grow_length = max(grow_length, (uint64_t)queueLen);
525 grow_thresh = max(grow_thresh, (uint64_t)maxThresh);
528 while (ActiveTransfers() && UnlockedIsNormal()) { Wait(); }
// NOTE(review): this early return shows no LeaveCriticalSection() in the
// visible lines; LeaveCriticalSection() itself returns without clearing
// anything when the queue is not normal, so the omission appears harmless —
// confirm.
529 if (!UnlockedIsNormal()) {
return; }
531 queue_updated =
false;
532 uint64_t sync_clock = clock;
535 while (sync_clock > old_clock && UnlockedIsNormal()) {
536 while (!queue_updated && UnlockedIsNormal()) { Wait(); }
537 queue_updated =
false;
541 uint64_t sync_clock = clock;
542 while (sync_clock > old_clock && UnlockedIsNormal()) {
543 queue_updated =
false;
544 while (!queue_updated && UnlockedIsNormal()) { Wait(); }
546 if (!UnlockedIsNormal()) {
return; }
552 LeaveCriticalSection();
555 LeaveCriticalSection();
// Reader-side shutdown. Acts only while the queue is in its normal (RUNNING)
// state and logs the state transition it performs. The statements that change
// the state are not visible in this excerpt.
558 void RDMAQueue::UnlockedShutdownReader() {
560 if (UnlockedIsNormal()) {
562 State oldstate = state;
568 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
569 StateStr(oldstate), StateStr(state));
// Writer-side shutdown. Acts only while the queue is in its normal (RUNNING)
// state and logs a first transition. A second transition is logged when the
// local queue is already empty and nothing is in flight. The statements that
// change the state are not visible in this excerpt.
573 void RDMAQueue::UnlockedShutdownWriter() {
575 if (UnlockedIsNormal()) {
576 State oldstate = state;
578 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
579 StateStr(oldstate), StateStr(state));
580 if (local_queue.Empty() && !ActiveTransfers()) {
584 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
585 StateStr(oldstate), StateStr(state));
// Notification that the reader's D4R tag changed; acts only in the normal
// state. The action taken is not visible in this excerpt.
590 void RDMAQueue::UnlockedSignalReaderTagChanged() {
592 if (UnlockedIsNormal()) {
// Notification that the writer's D4R tag changed; acts only in the normal
// state. The action taken is not visible in this excerpt.
598 void RDMAQueue::UnlockedSignalWriterTagChanged() {
600 if (UnlockedIsNormal()) {
// Body of the background thread that services the completion queue.
// It sleeps in select() on the completion-channel fd and the holder's wake-up
// fd. When the completion channel is readable it fetches the CQ event, re-arms
// notification, and polls in batches of up to max_send_wr + max_recv_wr work
// completions. Failed completions are logged with a description of the failing
// operation; completions are passed to HandleWorkCompletion. Polling stops
// once a batch comes back short, after which the CQ event is acknowledged.
// A ShutdownException marks the queue dead if it is not already; the holder is
// asked to clean up this queue on exit.
606 void *RDMAQueue::EventThread() {
609 std::vector<ibv_wc> wcs(max_send_wr + max_recv_wr);
610 int wake_fd = holder->GetWakeup()->FD();
612 kernel->CheckTerminated();
615 int maxfd = max(cc->fd, wake_fd) + 1;
616 FD_SET(cc->fd, &fdsr);
617 FD_SET(wake_fd, &fdsr);
// NOTE(review): select()'s return value is discarded, so an error such as
// EINTR falls through to the FD_ISSET test below — confirm this is acceptable.
618 select(maxfd, &fdsr, 0, 0, 0);
619 if (FD_ISSET(cc->fd, &fdsr)) {
622 if (ibv_get_cq_event(cc, &l_cq, &context)) {
// Re-arm before polling.
625 ibv_req_notify_cq(cq, 0);
627 int num = ibv_poll_cq(cq, wcs.size(), &wcs[0]);
628 for (
int i = 0; i < num; ++i) {
629 ibv_wc *wc = &wcs[i];
630 if (wc->status != IBV_WC_SUCCESS) {
631 logger.Error(
"%s A work request did not complete, aborting!!!\n %s",
632 ClockStr(), ibv_wc_status_str(wc->status));
633 switch (wc->opcode) {
635 if (write_command_queue.GetCommand()) {
636 logger.Error(
"Error with sending a %s command",
637 CommandTypeStr(write_command_queue.GetCommand()->type));
639 logger.Error(
"No commands in write command queue?!?!?");
642 case IBV_WC_RDMA_WRITE:
643 logger.Error(
"Error with RDMA write");
646 logger.Error(
"Error on command receive");
649 logger.Error(
"opcode: %d", (
int)wc->opcode);
654 HandleWorkCompletion(wc);
// A short batch means the completion queue has been drained.
658 if (num < (
int)wcs.size())
break;
660 ibv_ack_cq_events(l_cq, 1);
663 }
catch (
const ShutdownException &e) {
666 if (!UnlockedIsDead()) { state = S_DEAD; }
669 logger.Debug(
"%s Forced Shutdown threw exception", ClockStr());
671 logger.Error(
"%s: %s", ClockStr(), e.
what());
675 holder->CleanupQueue(GetKey());
678 holder->CleanupQueue(GetKey());
// Sends a CT_DEQUEUE command telling the peer that `count` elements were
// consumed locally. Requires the queue to be in its normal state.
683 void RDMAQueue::PostDequeue(
unsigned count) {
685 ASSERT(UnlockedIsNormal());
686 Command *command = NewCommand(CT_DEQUEUE);
687 command->data.dequeue.amount = count;
688 PostCommand(command);
689 TRACE(
"%s Posting dequeue %u", UnlockedClockStr(), count);
// Sends a CT_ENQUEUE command announcing `count` newly written elements to the
// peer. Allowed in the normal state and while flushing.
692 void RDMAQueue::PostEnqueue(
unsigned count) {
694 ASSERT(UnlockedIsNormal() || state == S_FLUSHING);
695 Command *command = NewCommand(CT_ENQUEUE);
696 command->data.enqueue.amount = count;
697 PostCommand(command);
698 TRACE(
"%s Posting enqueue %u", UnlockedClockStr(), count);
// Sends a CT_BLOCK command telling the peer this side is blocked waiting for
// `count` elements. Requires the normal state.
701 void RDMAQueue::PostBlock(
unsigned count) {
702 ASSERT(UnlockedIsNormal());
703 Command *command = NewCommand(CT_BLOCK);
704 command->data.block.amount = count;
705 PostCommand(command);
706 TRACE(
"%s Posting block %u", UnlockedClockStr(), count);
// Sends a payload-free CT_SHUTDOWN command to the peer.
709 void RDMAQueue::PostShutdown() {
710 Command *command = NewCommand(CT_SHUTDOWN);
711 PostCommand(command);
712 TRACE(
"%s Posting shutdown", UnlockedClockStr());
// Sends a CT_GROW command carrying the requested grow_length and grow_thresh.
// NOTE(review): elsewhere in this file grow_length/grow_thresh are combined
// with uint64_t values and cast to unsigned before being logged with %u; if
// they are 64-bit, the %u conversions in this TRACE are mismatched — confirm.
715 void RDMAQueue::PostGrow() {
716 Command *command = NewCommand(CT_GROW);
717 command->data.grow.length = grow_length;
718 command->data.grow.threshold = grow_thresh;
719 PostCommand(command);
720 TRACE(
"%s Posted grow len: %u, thresh: %u", UnlockedClockStr(), grow_length, grow_thresh);
// Sends a CT_D4R command carrying the local node's public D4R tag (count, key,
// queue size, queue key). The tag is read from either `reader` or `writer`;
// the selecting condition is not visible in this excerpt. Requires the normal
// state.
723 void RDMAQueue::PostD4RTag() {
724 ASSERT(UnlockedIsNormal());
725 Command *command = NewCommand(CT_D4R);
728 tag = reader->GetPublicTag();
730 tag = writer->GetPublicTag();
732 command->data.d4r.count = tag.
Count();
733 command->data.d4r.key = tag.
Key();
734 command->data.d4r.qsize = tag.
QueueSize();
735 command->data.d4r.qkey = tag.
QueueKey();
736 PostCommand(command);
737 TRACE(
"%s Posting D4R tag count: %llu, key: %llu, qsize: %llu, qkey: %llu",
739 (
unsigned long long)tag.
Count(),
740 (
unsigned long long)tag.
Key(),
742 (
unsigned long long)tag.
QueueKey());
// Sends a CT_CS_REQ command asking the peer for the critical section, stamped
// with local_cs_ts.
745 void RDMAQueue::PostCSRequest() {
746 Command *command = NewCommand(CT_CS_REQ);
747 command->data.cs_info.timestamp = local_cs_ts;
748 PostCommand(command);
749 TRACE(
"%s Posting CS_REQ with ts: %llu", UnlockedClockStr(), (
unsigned long long)local_cs_ts);
// Sends a payload-free CT_CS_OK command to the peer.
752 void RDMAQueue::PostCSOK() {
753 Command *command = NewCommand(CT_CS_OK);
754 PostCommand(command);
755 TRACE(
"%s Posting CS_OK", UnlockedClockStr());
// Sends a CT_FLUSH command carrying the local num_enqueued total.
758 void RDMAQueue::PostFlush() {
759 Command *command = NewCommand(CT_FLUSH);
760 command->data.flush.num_enqueued = num_enqueued;
761 PostCommand(command);
762 TRACE(
"%s Posting FLUSH", UnlockedClockStr());
// Sends a payload-free CT_RESET command to the peer.
765 void RDMAQueue::PostReset() {
766 Command *command = NewCommand(CT_RESET);
767 PostCommand(command);
768 TRACE(
"%s Posting RESET", UnlockedClockStr());
// Pushes locally queued data to the peer with RDMA writes.
// Nothing happens if the reader has shut down, the state is neither RUNNING
// nor FLUSHING, or the queue is corked. Otherwise, for as long as unsent data
// remains and fewer than half of the send work requests are in use, it posts
// one chained work request per channel. Each batch is limited by remote
// freespace, local unsent data and max_threshold. The element count rides in
// wr_id of the last work request of the chain so the completion handler can
// account for it.
771 void RDMAQueue::CheckRDMAWrite() {
773 if (readshutdown)
return;
774 if (state != S_RUNNING && state != S_FLUSHING)
return;
775 if (cork > 0)
return;
776 unsigned old_pending_rdma = pending_rdma;
// NOTE(review): if Count() and pending_rdma are unsigned, "<= 0" can only be
// true on equality and pending_rdma > Count() would wrap — confirm the
// invariant pending_rdma <= Count().
778 if (local_queue.Count() - pending_rdma <= 0)
break;
779 if (num_send_wr >= max_send_wr/2)
break;
780 unsigned count = std::min(remote_queue.Freespace() - pending_rdma,
781 local_queue.Count() - pending_rdma);
782 count = std::min(count, max_threshold);
783 if (count == 0)
return;
784 std::vector<ibv_send_wr> wr(num_channels);
785 memset(&wr[0], 0, num_channels*
sizeof(ibv_send_wr));
786 std::vector<ibv_sge> sge(num_channels);
787 memset(&sge[0], 0, num_channels*
sizeof(ibv_sge));
// Source and destination are offset by pending_rdma so that batches already
// in flight are not sent again.
788 for (
unsigned chan = 0; chan < num_channels; ++chan) {
789 sge[chan].addr = (uint64_t)local_queue.GetTail(pending_rdma)
790 + chan*local_queue.stride;
791 sge[chan].length = count;
792 sge[chan].lkey = local_queue.lkey;
794 wr[chan].sg_list = &sge[chan];
795 wr[chan].num_sge = 1;
796 wr[chan].opcode = IBV_WR_RDMA_WRITE;
797 wr[chan].wr.rdma.remote_addr = (uint64_t)remote_queue.GetHead(pending_rdma)
798 + chan*remote_queue.stride;
799 wr[chan].wr.rdma.rkey = remote_queue.lkey;
800 if (chan+1 < num_channels) {
801 wr[chan].next = &wr[chan+1];
804 wr[num_channels-1].wr_id = count;
806 if (ibv_post_send(qp, &wr[0], &bad)) {
809 num_send_wr += num_channels;
810 pending_rdma += count;
812 if (pending_rdma > old_pending_rdma) {
813 TRACE(
"%s Posted RDMA write, pending_rdma: %llu, to key: %llu", UnlockedClockStr(),
814 (
unsigned long long)pending_rdma, (
unsigned long long)remote_queue.lkey);
// Reports accumulated local dequeues to the peer when there is something to
// report, a send slot is free, the state is normal and the writer has not
// shut down. What happens to pending_dequeue afterwards is not visible in
// this excerpt.
818 void RDMAQueue::CheckDequeue() {
819 if (pending_dequeue > 0 && num_send_wr < max_send_wr && UnlockedIsNormal() && !writeshutdown) {
820 PostDequeue(pending_dequeue);
// Sends a CT_UPDATE command describing the local buffer to the peer: base
// address, length, head, tail, channel stride and memory key.
825 void RDMAQueue::PostUpdateQueue() {
826 Command *command = NewCommand(CT_UPDATE);
827 command->data.update.base = local_queue.base;
828 command->data.update.length = local_queue.length;
829 command->data.update.head = local_queue.head;
830 command->data.update.tail = local_queue.tail;
831 command->data.update.stride = local_queue.stride;
832 command->data.update.lkey = local_queue.lkey;
833 PostCommand(command);
834 TRACE(
"%s Posting update queue", UnlockedClockStr());
// Allocates and pre-fills an outgoing command of the given type.
// Waits while all send work requests are in use (asserting that the caller is
// not the event thread) and throws BrokenQueueException if the queue is dead.
// The new command is zeroed and stamped with the type and the current local
// and remote clock values.
// NOTE(review): CommandQueue::NewCommand() returns 0 when the ring is full and
// no null check is visible before the memset. The ring and max_send_wr are
// both sized max(2 * channels, 100) in the constructor, which presumably keeps
// this from happening — confirm.
837 Command *RDMAQueue::NewCommand(CommandType type) {
838 while (num_send_wr >= max_send_wr && !UnlockedIsDead()) {
839 ASSERT(!pthread_equal(pthread_self(), (pthread_t)*event_thread),
840 "Blocking in event thread");
843 if (UnlockedIsDead())
throw BrokenQueueException(GetKey());
844 Command *command = write_command_queue.NewCommand();
845 memset(command, 0,
sizeof(Command));
846 command->type = type;
848 command->clock = clock;
849 command->remote_clock = remote_clock;
// Transmits a command built by NewCommand() as an IBV_WR_SEND work request
// whose single scatter/gather entry covers the Command record in the
// registered write command ring. On the path shown, the command is then
// published in the ring.
// A branch taken when the peer side has already shut down precedes the send;
// its body is not visible in this excerpt.
853 void RDMAQueue::PostCommand(Command *command) {
854 if ((mode == READ ? writeshutdown : readshutdown)) {
858 ASSERT(num_send_wr < max_send_wr);
859 ibv_send_wr wr, *bad;
861 memset(&sge, 0,
sizeof(sge));
862 sge.addr = (uint64_t)(
void*)command;
863 sge.length =
sizeof(Command);
864 sge.lkey = write_command_queue.GetKey();
865 memset(&wr, 0,
sizeof(wr));
868 wr.opcode = IBV_WR_SEND;
869 if (ibv_post_send(qp, &wr, &bad)) {
873 write_command_queue.Enqueue();
// Re-arms the receive side: reserves a Command slot in the read command ring
// for every free entry and posts them all as one chain of receive work
// requests, each with a single scatter/gather entry of sizeof(Command).
// Returns immediately when the ring has no free entries.
876 void RDMAQueue::PostReceives() {
878 int num = read_command_queue.Freespace();
879 if (num <= 0)
return;
880 std::vector<ibv_recv_wr> wr(num);
881 memset(&wr[0], 0, num*
sizeof(ibv_recv_wr));
882 std::vector<ibv_sge> sge(num);
883 memset(&sge[0], 0, num*
sizeof(ibv_sge));
884 for (
int i = 0; i < num; ++i) {
885 addr = (uint64_t)(
void*)read_command_queue.NewCommand();
887 sge[i].length =
sizeof(Command);
888 sge[i].lkey = read_command_queue.GetKey();
890 wr[i].sg_list = &sge[i];
// The last request ends the chain with a null next pointer.
892 wr[i].next = (i + 1 < num ? &wr[i+1] : 0);
894 ibv_recv_wr *bad = 0;
895 if (ibv_post_recv(qp, &wr[0], &bad)) {
// Bookkeeping for one successful work completion, dispatched on wc->opcode.
// Three outcomes are visible: the oldest entry of the write command ring is
// released; for an RDMA write, `count` elements move from the local tail to
// the remote head and from pending_rdma to confirmed_rdma; and a received
// command is published in the read command ring. Any other opcode trips an
// assertion.
// NOTE(review): the case labels for the first and third outcomes, and the
// source of `count`, are not visible in this excerpt (CheckRDMAWrite stores
// the count in wr_id).
900 void RDMAQueue::HandleWorkCompletion(ibv_wc *wc) {
903 switch (wc->opcode) {
905 write_command_queue.Dequeue();
909 case IBV_WC_RDMA_WRITE:
912 local_queue.AdvanceTail(count);
913 remote_queue.AdvanceHead(count);
914 pending_rdma -= count;
915 confirmed_rdma += count;
920 read_command_queue.Enqueue();
923 ASSERT(
false,
"Unknown work completion type: %d", wc->opcode);
// Drains and acts on everything that arrived since the last call.
// 1. Every received command is handed to ProcessCommand and removed.
// 2. In WRITE mode with a live reader: confirmed RDMA data is announced via
//    PostEnqueue; a pending grow is handled once no RDMA is in flight; a
//    pending flush is handled once the queue is idle and empty. All of these
//    are subject to send slots being available.
// 3. When either side has shut down, the state machine is advanced and each
//    transition is logged.
// NOTE(review): the statements that change `state` and the READ-mode branch
// body are not visible in this excerpt.
928 void RDMAQueue::ProcessEvents() {
930 while (!read_command_queue.Empty()) {
931 ProcessCommand(read_command_queue.GetCommand());
932 read_command_queue.Dequeue();
935 if (mode == WRITE && !readshutdown) {
937 if (confirmed_rdma > 0 && num_send_wr < max_send_wr) {
938 PostEnqueue(confirmed_rdma);
941 if (pending_grow && pending_rdma == 0 && num_send_wr < max_send_wr) {
944 pending_grow =
false;
947 if (pending_flush && !ActiveTransfers() && local_queue.Empty()) {
949 pending_flush =
false;
951 }
else if (mode == READ && !writeshutdown) {
958 if (writeshutdown || readshutdown) {
959 logger.Trace(
"Shutdown? writeshutdown: %d, readshutdown: %d",
960 (
int)writeshutdown, (
int)readshutdown);
962 if (num_send_wr == max_send_wr) {
break; }
965 State oldstate = state;
967 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
968 StateStr(oldstate), StateStr(state));
974 State oldstate = state;
977 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
978 StateStr(oldstate), StateStr(state));
979 }
else if (local_queue.Empty() && !ActiveTransfers()) {
982 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
983 StateStr(oldstate), StateStr(state));
986 if (writeshutdown || !ActiveTransfers()) {
987 State oldstate = state;
989 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
990 StateStr(oldstate), StateStr(state));
996 if (!ActiveTransfers()) {
997 State oldstate = state;
999 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
1000 StateStr(oldstate), StateStr(state));
// Applies one command received from the peer.
// The clocks are updated first: the local clock becomes
// max(local, sender) + 1, remote_clock records the sender's clock and
// old_clock records the sender's view of our clock. The command is then
// dispatched on its type.
// NOTE(review): the case labels are not visible in this excerpt; the section
// comments below are inferred from the log strings.
1010 void RDMAQueue::ProcessCommand(Command *command) {
1011 clock = std::max(clock, command->clock) + 1;
1012 remote_clock = command->clock;
1013 old_clock = command->remote_clock;
1014 State oldstate = state;
1015 switch (command->type) {
// Update: adopt the peer's buffer description. Head and tail are taken only
// in WRITE mode.
1017 remote_queue.base = command->data.update.base;
1018 remote_queue.length = command->data.update.length;
1019 if (mode == WRITE) {
1020 remote_queue.head = command->data.update.head;
1021 remote_queue.tail = command->data.update.tail;
1023 remote_queue.stride = command->data.update.stride;
1024 remote_queue.lkey = command->data.update.lkey;
1025 queue_updated =
true;
1026 logger.Trace(
"%s Receved update command: base %p, length: %llu, "
1027 "head: %llu, tail: %llu, stride: %llu, lkey: %llu", UnlockedClockStr(),
1028 remote_queue.base, (
unsigned long long)remote_queue.length,
1029 (
unsigned long long)remote_queue.head,
1030 (
unsigned long long)remote_queue.tail,
1031 (
unsigned long long)remote_queue.stride,
1032 (
unsigned long long)remote_queue.lkey);
1033 if (state == S_INIT) {
1035 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
1036 StateStr(oldstate), StateStr(state));
1038 if (grow_ack && mode == WRITE) {
// Enqueue: the peer wrote `amount` elements into our buffer.
1046 TRACE(
"%s %s Received enqueue command: amount: %llu, free: %llu", StateStr(state),
1047 UnlockedClockStr(), (
unsigned long long)command->data.enqueue.amount,
1048 (
unsigned long long)local_queue.Freespace());
1049 ASSERT(local_queue.Freespace() >= command->data.enqueue.amount,
"Freespace: %llu"
1050 "\n%s\n%llu", (
unsigned long long)local_queue.Freespace(), UnlockedClockStr(),
1051 (
unsigned long long)GetKey());
1052 local_queue.AdvanceHead(command->data.enqueue.amount);
// Dequeue: the peer consumed `amount` elements from its buffer.
1058 TRACE(
"%s %s Received dequeue command: amount: %llu count: %llu", StateStr(state),
1059 UnlockedClockStr(), (
unsigned long long)command->data.dequeue.amount,
1060 (
unsigned long long)remote_queue.Count());
1061 ASSERT(remote_queue.Count() >= command->data.dequeue.amount);
1062 remote_queue.AdvanceTail(command->data.dequeue.amount);
// Block: record the peer's request size if we really are full or empty.
1066 TRACE(
"%s %s Received block command: amount: %llu, count: %llu",
1067 StateStr(state), UnlockedClockStr(),
1068 (
unsigned long long)command->data.block.amount,
1069 (
unsigned long long)local_queue.Count());
1071 if (local_queue.Full()) {
1072 writerequest = command->data.block.amount;
1075 if (local_queue.Empty()) {
1076 readrequest = command->data.block.amount;
1079 if (useD4R && UnlockedIsNormal()) { PostD4RTag(); }
// Shutdown.
1082 logger.Trace(
"%s %s Received shutdown command", StateStr(state), UnlockedClockStr());
1090 logger.Trace(
"%s Transition from %s to %s state", UnlockedClockStr(),
1091 StateStr(oldstate), StateStr(state));
// Grow: merge the requested sizes with any outstanding local request.
1094 logger.Trace(
"%s %s Received grow command len: %llu thresh: %llu",
1095 StateStr(state), UnlockedClockStr(),
1096 (
unsigned long long)command->data.grow.length,
1097 (
unsigned long long)command->data.grow.threshold);
1098 grow_length = max(grow_length, command->data.grow.length);
1099 grow_thresh = max(grow_thresh, command->data.grow.threshold);
1100 if (mode == WRITE) {
1102 pending_grow =
true;
// D4R tag: copy the peer's tag onto the mock node.
1111 TRACE(
"%s %s Received D4R tag", StateStr(state), UnlockedClockStr());
1114 tag.
Count(command->data.d4r.count);
1115 tag.
Key(command->data.d4r.key);
1117 tag.
QueueKey(command->data.d4r.qkey);
1118 mocknode->SetPublicTag(tag);
1120 if (mode == WRITE) {
// Critical-section request: compare timestamps; an exact tie is decided by
// mode == READ.
1128 uint64_t timestamp = command->data.cs_info.timestamp;
1129 TRACE(
"%s %s Receive CS_REQ, ts: %llu", StateStr(state), UnlockedClockStr(),
1130 (
unsigned long long)timestamp);
1133 local_cs_ts > timestamp ||
1134 (local_cs_ts == timestamp && mode == READ)
1138 remote_want_cs =
true;
1143 TRACE(
"%s %s Receive CS_OK", StateStr(state), UnlockedClockStr());
// Flush: adopt the sender's enqueue total.
1148 TRACE(
"%s %s Receive FLUSH", StateStr(state), UnlockedClockStr());
1149 num_enqueued = command->data.flush.num_enqueued;
1154 TRACE(
"%s %s Receive RESET", StateStr(state), UnlockedClockStr());
// Opens the verbs device and creates the objects the queue needs.
// The device is the first one listed when if_name is empty, otherwise the one
// whose name matches if_name. On that device it creates a protection domain,
// a completion channel, a completion queue sized for all send and receive
// work requests, and a reliable-connected queue pair with one scatter/gather
// entry per request and every send signalled.
// It records local_info.qp_num and local_info.lid. The LID comes from either
// the first active port found by scanning, or the configured if_port, which
// must be active.
// Throws std::runtime_error when no device, no matching device or no active
// port is found, and ErrnoException when the device query fails.
// NOTE(review): no ibv_free_device_list() call is visible in this excerpt —
// confirm the device list is released.
1164 void RDMAQueue::Initialize() {
1165 int num_devices = 0;
1166 ibv_device **devices = ibv_get_device_list(&num_devices);
1167 ibv_device *device = 0;
1168 if (num_devices < 1) {
1169 throw std::runtime_error(
"Error getting devices, no devices");
1171 for (
int i = 0; i < num_devices; ++i) {
1172 if (if_name.empty()) {
1173 device = devices[i];
1174 if_name = ibv_get_device_name(devices[i]);
1176 }
else if (if_name == ibv_get_device_name(devices[i])) {
1177 device = devices[i];
1182 throw std::runtime_error(
"Error getting device, no matching device.");
1184 ctx = ibv_open_device(device);
1186 pd = ibv_alloc_pd(ctx);
1188 cc = ibv_create_comp_channel(ctx);
1190 cq = ibv_create_cq(ctx, max_send_wr + max_recv_wr, 0, cc, 0);
1192 ibv_device_attr device_attr;
1193 memset(&device_attr, 0,
sizeof(device_attr));
1194 if (ibv_query_device(ctx, &device_attr)) {
throw ErrnoException(); }
1195 ibv_qp_init_attr qp_attr;
1196 memset(&qp_attr, 0,
sizeof(qp_attr));
1197 qp_attr.qp_context = 0;
1198 qp_attr.send_cq = cq;
1199 qp_attr.recv_cq = cq;
1200 qp_attr.sq_sig_all = 1;
1201 qp_attr.cap.max_send_wr = max_send_wr;
1202 qp_attr.cap.max_recv_wr = max_recv_wr;
1203 qp_attr.cap.max_send_sge = 1;
1204 qp_attr.cap.max_recv_sge = 1;
1205 qp_attr.qp_type = IBV_QPT_RC;
1206 qp = ibv_create_qp(pd, &qp_attr);
1208 local_info.qp_num = qp->qp_num;
1209 ibv_port_attr port_attr;
1210 memset(&port_attr, 0,
sizeof(port_attr));
// Scan from port 1 up to phys_port_cnt for an active port.
1213 for (i = 1; i <= device_attr.phys_port_cnt; ++i) {
1214 if (ibv_query_port(ctx, i, &port_attr)) {
1217 if (port_attr.state == IBV_PORT_ACTIVE) {
1219 local_info.lid = port_attr.lid;
1223 if (i > device_attr.phys_port_cnt) {
1224 throw std::runtime_error(
"Unable to find an active port");
// Explicit port: it must be active.
1227 if (ibv_query_port(ctx, if_port, &port_attr)) {
1230 if (port_attr.state == IBV_PORT_ACTIVE) {
1231 local_info.lid = port_attr.lid;
1233 throw std::runtime_error(
"Port is not active");
// Releases the verbs resources: deregisters both command-ring memory regions
// (guarded and nulled so repeated calls are safe for them), deregisters the
// data-buffer regions, destroys the completion channel and closes the device.
// NOTE(review): the guards around the remaining calls, and the destruction of
// the QP, CQ and PD, are not visible in this excerpt.
1238 void RDMAQueue::Cleanup() {
1239 if (write_command_queue_mr) {
1240 ibv_dereg_mr(write_command_queue_mr);
1241 write_command_queue_mr = 0;
1243 if (read_command_queue_mr) {
1244 ibv_dereg_mr(read_command_queue_mr);
1245 read_command_queue_mr = 0;
1248 ibv_dereg_mr(mbs_mr);
1252 ibv_dereg_mr(mbs_old_mr);
1264 ibv_destroy_comp_channel(cc);
1272 ibv_close_device(ctx);
// Symmetric exchange over a socket: sends `len` bytes from `src`, then reads
// `len` bytes into `dst`, looping over short writes and reads.
// Throws std::runtime_error if the socket reports EOF on a zero-length read
// before `len` bytes arrive.
// NOTE(review): the write loop does not detect a Write() that keeps returning
// 0 — confirm SocketHandle::Write blocks or throws in that situation.
1277 static void ExchangeData(
SocketHandle &sock,
char *src,
char *dst,
unsigned len) {
1278 unsigned num_written = 0;
1279 while (num_written < len) {
1280 num_written += sock.
Write(src + num_written, len - num_written);
1282 unsigned num_read = 0;
1283 while (num_read < len) {
1284 unsigned nr = sock.
Read(dst + num_read, len - num_read);
1285 if (nr == 0 && sock.
Eof()) {
1286 throw std::runtime_error(
"Unexpected EOF encountered on socket.");
// Establishes the RDMA connection with the peer.
// 1. Obtains a connected socket from the connection server (as writer or as
//    reader, depending on mode) and exchanges PortInfo (lid, qp_num) with the
//    peer over it.
// 2. Moves the queue pair through INIT, RTR and RTS with ibv_modify_qp, using
//    the peer's lid and qp_num as the destination.
// 3. Arms completion-queue notification.
// NOTE(review): the path MTU is hard-coded to IBV_MTU_512.
1292 void RDMAQueue::Connect() {
1295 kernel->CheckTerminated();
1296 logger.Debug(
"Connecting Socket");
1297 shared_ptr<Sync::Future<int> > conn;
1298 if (mode == WRITE) {
1299 conn = server->ConnectWriter(GetKey());
1301 conn = server->ConnectReader(GetKey());
1305 sock.
FD(conn->Get());
// NOTE(review): this log string has an unbalanced "(" — cosmetic only.
1308 logger.Debug(
"Socket Connected, exchanging data (lid: %d, qp_num: %d",
1309 local_info.lid, local_info.qp_num);
1310 ExchangeData(sock, (
char*)&local_info, (
char*)&remote_info,
sizeof(PortInfo));
1312 logger.Debug(
"Got data (lid: %d, qp_num: %d) connecting the qp", remote_info.lid, remote_info.qp_num);
// RESET -> INIT
1316 memset(&attr, 0,
sizeof(attr));
1317 attr.qp_state = IBV_QPS_INIT;
1318 attr.pkey_index = 0;
1319 attr.port_num = if_port;
1320 attr.qp_access_flags = IBV_ACCESS_DEFAULT;
1321 if (ibv_modify_qp(qp, &attr,
1322 IBV_QP_STATE | IBV_QP_ACCESS_FLAGS |
1323 IBV_QP_PORT | IBV_QP_PKEY_INDEX))
1329 logger.Debug(
"QP now in INIT state");
// INIT -> RTR (ready to receive)
1331 memset(&attr, 0,
sizeof(attr));
1332 attr.qp_state = IBV_QPS_RTR;
1333 attr.path_mtu = IBV_MTU_512;
1335 attr.ah_attr.is_global = 0;
1336 attr.ah_attr.dlid = remote_info.lid;
1337 attr.ah_attr.sl = 0;
1338 attr.ah_attr.src_path_bits = 0;
1339 attr.ah_attr.port_num = if_port;
1341 attr.dest_qp_num = remote_info.qp_num;
1343 attr.max_dest_rd_atomic = 1;
1344 attr.min_rnr_timer = 12;
1346 if (ibv_modify_qp(qp, &attr,
1347 IBV_QP_STATE | IBV_QP_PATH_MTU |
1348 IBV_QP_AV | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
1349 IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER
1355 logger.Debug(
"QP now in RTR state");
// RTR -> RTS (ready to send)
1357 memset(&attr, 0,
sizeof(attr));
1358 attr.qp_state = IBV_QPS_RTS;
1363 attr.max_rd_atomic = 1;
1364 if (ibv_modify_qp(qp, &attr,
1365 IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
1366 IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC
1372 logger.Debug(
"QP now in RTS state");
1374 ibv_req_notify_cq(cq, 0);
1377 logger.Debug(
"Connection complete");
// Cork entry point; traces the current cork counter. The statement that
// changes `cork` is not visible in this excerpt. CheckRDMAWrite() posts
// nothing while cork > 0.
1380 void RDMAQueue::Cork() {
1382 TRACE(
"Cork: %u", cork);
// Uncork entry point; traces the current cork counter and then does
// WRITE-mode specific work. Both the counter update and the WRITE-mode body
// are not visible in this excerpt.
1385 void RDMAQueue::Uncork() {
1388 TRACE(
"Uncork: %u", cork);
1390 if (mode == WRITE) {
// Acquires the critical section shared with the peer.
// Throws BrokenQueueException unless the queue is in its normal state. It
// records the request (local_want_cs, with the current clock as its
// timestamp) and waits until cs_ok is set and the peer's view of our clock has
// reached the request timestamp, or until the queue leaves the normal state.
// NOTE(review): the wait loop also exits when the queue stops being normal, so
// callers must re-check the state afterwards (UnlockedGrow does).
1398 void RDMAQueue::EnterCriticalSection() {
1399 if (!UnlockedIsNormal()) {
throw BrokenQueueException(GetKey()); }
1400 local_want_cs =
true;
1401 local_cs_ts = clock;
1404 while ((!cs_ok || local_cs_ts > old_clock) && UnlockedIsNormal()) { Wait(); }
1405 TRACE(
"%s Entering Critical Section", UnlockedClockStr());
// Releases the critical section. Does nothing beyond tracing when the queue
// is no longer in its normal state. Otherwise it clears local_want_cs and, if
// the peer asked for the section in the meantime, clears remote_want_cs.
// NOTE(review): the statement that answers the peer is not visible —
// presumably PostCSOK(); confirm.
1408 void RDMAQueue::LeaveCriticalSection() {
1409 TRACE(
"%s Leaving Critical Section", UnlockedClockStr());
1410 if (!UnlockedIsNormal()) {
return; }
1411 local_want_cs =
false;
1412 if (remote_want_cs) {
1414 remote_want_cs =
false;
// Replaces the local buffer with a larger one sized from grow_length and
// grow_thresh.
// It allocates a new MirrorBufferSet, registers it as a memory region, and
// copies the existing contents across in chunks of at most
// min(max_threshold, grow_thresh + 1) elements per channel. max_threshold is
// then updated from the new buffer.
// If a caller currently holds an enqueue or dequeue pointer into the old
// buffer (inenqueue / indequeue) and no old buffer is already retained, the
// old queue and its memory region are kept as local_old_queue / mbs_old_mr.
// Otherwise the old region is deregistered immediately.
// NOTE(review): auto_ptr is deprecated (removed in C++17).
1418 void RDMAQueue::InternalGrow() {
1419 logger.Trace(
"Internal grow, inenqueue: %d, indequeue: %d, grow_length: %u, grow_thresh: %u",
1420 (
int)inenqueue, (
int)indequeue, (
unsigned)grow_length, (
unsigned)grow_thresh);
1421 enqueue_use_old = inenqueue;
1422 dequeue_use_old = indequeue;
1425 QueueModel new_queue;
1426 new_queue.length = ComputeQueueLength(grow_length, grow_thresh, alpha, mode);
1427 auto_ptr<MirrorBufferSet> new_mbs;
1428 new_mbs.reset(
new MirrorBufferSet(new_queue.length, grow_thresh, num_channels));
1429 new_queue.length = new_mbs->BufferSize();
1430 new_queue.base = (
char*)(
void*)*new_mbs;
1431 new_queue.stride = new_mbs->BufferSize() + new_mbs->MirrorSize();
1432 grow_thresh = new_mbs->MirrorSize();
// NOTE(review): ibv_reg_mr() returns NULL on failure and the result is
// dereferenced on the very next line without a check; the constructor asserts
// on the equivalent command-ring registrations.
1433 ibv_mr *new_mbs_mr = ibv_reg_mr(pd, new_queue.base,
1434 (new_queue.length + grow_thresh)*num_channels, IBV_ACCESS_DEFAULT);
1435 new_queue.lkey = new_mbs_mr->lkey;
// Copy the existing data in chunks so that no copy exceeds either buffer's
// threshold.
1438 uint64_t thresh = min((uint64_t)max_threshold, (uint64_t)grow_thresh + 1);
1439 while (new_queue.Count() < local_queue.Count()) {
1440 uint64_t amount = min((uint64_t)thresh, local_queue.Count() - new_queue.Count());
1441 for (
unsigned chan = 0; chan < num_channels; ++chan) {
1442 memcpy((new_queue.GetHead() + chan*new_queue.stride),
1443 (local_queue.GetTail(new_queue.Count())
1444 + chan*local_queue.stride),
1447 new_queue.AdvanceHead(amount);
1449 max_threshold = std::min(grow_thresh + 1, new_queue.length);
1452 if ((enqueue_use_old || dequeue_use_old) && !mbs_old.get()) {
1453 logger.Trace(
"Keeping old queue, old key %llu, new key %llu",
1454 (
unsigned long long)local_queue.lkey, (
unsigned long long)new_queue.lkey);
1456 local_old_queue = local_queue;
1457 mbs_old_mr = mbs_mr;
1460 local_queue = new_queue;
1461 mbs_mr = new_mbs_mr;
1463 logger.Trace(
"Replacing old queue, old key %llu, new key %llu",
1464 (
unsigned long long)local_queue.lkey, (
unsigned long long)new_queue.lkey);
1465 ibv_dereg_mr(mbs_mr);
1466 mbs_mr = new_mbs_mr;
1468 local_queue = new_queue;
// True while any send work request is outstanding or any RDMA-written data is
// still pending or confirmed but not yet announced.
1472 bool RDMAQueue::ActiveTransfers() {
1473 return (num_send_wr != 0 || pending_rdma > 0 || confirmed_rdma > 0);
// Public wrapper around UnlockedIsDead().
// NOTE(review): the original line between the signature and the return is not
// visible — presumably it takes the queue lock; confirm.
1476 bool RDMAQueue::IsDead() {
1478 return UnlockedIsDead();
// True when the state is S_DEAD or S_ERROR.
1480 bool RDMAQueue::UnlockedIsDead() {
1481 return (state == S_DEAD || state == S_ERROR);
// True only in the S_RUNNING state.
1483 bool RDMAQueue::UnlockedIsNormal() {
1484 return (state == S_RUNNING);
// Public wrapper around UnlockedClockStr().
// NOTE(review): the original line between the signature and the return is not
// visible — presumably it takes the queue lock; confirm.
1487 const char* RDMAQueue::ClockStr() {
1489 return UnlockedClockStr();
// Formats the three clocks as "<clock, remote_clock, old_clock>" for logging.
// The returned pointer refers to the member string clock_str, so it is
// overwritten by the next call and must not be kept across calls.
1492 const char* RDMAQueue::UnlockedClockStr() {
1493 std::ostringstream oss;
1494 oss <<
"<" << clock <<
", " << remote_clock <<
", " << old_clock <<
">";
1495 clock_str = oss.str();
1496 return clock_str.c_str();
// Error-path shutdown: marks both the reader and the writer side as shut
// down. Any state change or wake-up accompanying this is not visible in this
// excerpt.
1499 void RDMAQueue::ShutdownError() {
1501 readshutdown =
true;
1502 writeshutdown =
true;
1508 #endif // ENABLE_RDMA
virtual void UnlockedShutdownWriter()
virtual void InternalFlush()
A FileHandle customized with some socket specific functionality and functions.
virtual void WaitForFreespace()
Definition of the queue attributes.
virtual void WaitForData()
uint64_t QueueKey() const
shared_ptr< QueueBase > queue
virtual void UnlockedShutdownReader()
An object to hold references to RemoteQueues so they can continue to work after the node has gone awa...
The exceptions specified for the CPN network.
virtual void InternalReset()
void Close()
Close the file and reset the internal state.
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
virtual void UnlockedSignalReaderTagChanged()
virtual void UnlockedSignalWriterTagChanged()
void Reset()
Clear all internal state including the file descriptor! WARNING does not close the file! ...
virtual const char * what() const
uint64_t QueueSize() const
virtual void LogState()
For debugging ONLY!! Otherwise non deterministic output.
unsigned Read(void *ptr, unsigned len)
Read data from the file descriptor.
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.