23 #ifndef CPN_RDMA_QUEUE_H
24 #define CPN_RDMA_QUEUE_H
26 #include "common_priv.h"
33 #include <infiniband/verbs.h>
42 QueueModel() : base(0), length(0), head(0), tail(0), stride(0), lkey(0) {}
43 QueueModel(
char *b, uint64_t s)
44 : base(b), length(s), head(0), tail(0), stride(0), lkey(0)
46 char *GetTail(uint64_t offset = 0) {
return &base[(tail+offset)%length]; }
47 char *GetHead(uint64_t offset = 0) {
return &base[(head+offset)%length]; }
48 void AdvanceTail(uint64_t count) {
50 while (tail > length) {
55 void AdvanceHead(uint64_t count) { head += count; }
56 uint64_t Count()
const {
return head - tail; }
57 bool Empty()
const {
return head == tail; }
58 bool Full()
const {
return Count() == Size(); }
59 uint64_t Size()
const {
return length; }
60 uint64_t Freespace()
const {
return length - Count(); }
63 uint64_t length, head, tail;
84 uint64_t clock, remote_clock;
88 uint64_t length, head, tail, stride;
101 uint64_t length, threshold;
104 uint64_t count, key, qsize, qkey;
106 struct CriticalSectionInfo {
110 uint64_t num_enqueued;
115 struct CommandQueue {
117 CommandQueue(uint64_t size);
119 Command *NewCommand();
121 Command *GetCommand();
125 uint64_t Count()
const {
return queue.Count()/
sizeof(Command); }
126 bool Empty()
const {
return queue.Empty(); }
127 bool Full()
const {
return Freespace() == 0; }
128 uint64_t Size()
const {
return queue.Size()/
sizeof(Command); }
129 uint64_t Freespace()
const {
return queue.Freespace()/
sizeof(Command) - outstanding_commands; }
130 uint64_t Outstanding()
const {
return outstanding_commands; }
132 char *GetBase() {
return queue.base; }
133 void SetKey(uint32_t k) { queue.lkey = k; }
134 uint32_t GetKey()
const {
return queue.lkey; }
136 uint64_t outstanding_commands;
151 class RDMAQueue :
public QueueBase,
public RemoteQueueBase {
157 RDMAQueue(KernelBase *k, Mode m,
158 ConnectionServer *s, RemoteQueueHolder *h,
const SimpleQueueAttr &attr);
160 virtual ~RDMAQueue();
164 Key_t GetKey()
const {
return mode == READ ? readerkey : writerkey; }
168 virtual void WaitForData();
169 virtual void WaitForFreespace();
171 virtual const void *InternalGetRawDequeuePtr(
unsigned thresh,
unsigned chan);
172 virtual void InternalDequeue(
unsigned count);
173 virtual void *InternalGetRawEnqueuePtr(
unsigned thresh,
unsigned chan);
174 virtual void InternalEnqueue(
unsigned count);
176 virtual void InternalFlush();
177 virtual void InternalReset();
178 virtual unsigned UnlockedNumChannels()
const {
return num_channels; }
179 virtual unsigned UnlockedCount()
const {
return local_queue.Count() + remote_queue.Count(); }
180 virtual bool UnlockedEmpty()
const {
return local_queue.Empty() && remote_queue.Empty(); }
181 virtual unsigned UnlockedFreespace()
const {
return local_queue.Freespace(); }
182 virtual bool UnlockedFull()
const {
return local_queue.Full(); }
183 virtual unsigned UnlockedMaxThreshold()
const {
return max_threshold; }
184 virtual unsigned UnlockedQueueLength()
const {
return local_queue.Size() + remote_queue.Size(); }
185 virtual unsigned UnlockedEnqueueChannelStride()
const;
186 virtual unsigned UnlockedDequeueChannelStride()
const;
187 virtual void UnlockedGrow(
unsigned queueLen,
unsigned maxThresh);
188 virtual void UnlockedShutdownReader();
189 virtual void UnlockedShutdownWriter();
190 virtual unsigned UnlockedNumEnqueued()
const {
return num_enqueued; }
191 virtual unsigned UnlockedNumDequeued()
const {
return num_dequeued; }
193 virtual void UnlockedSignalReaderTagChanged();
194 virtual void UnlockedSignalWriterTagChanged();
204 void PostDequeue(
unsigned count);
205 void PostEnqueue(
unsigned count);
206 void PostBlock(
unsigned count);
210 void PostCSRequest();
214 void CheckRDMAWrite();
216 void PostUpdateQueue();
217 Internal::Command *NewCommand(Internal::CommandType type);
218 void PostCommand(Internal::Command *command);
220 void HandleWorkCompletion(ibv_wc *wc);
221 void ProcessEvents();
222 void ProcessCommand(Internal::Command *command);
230 void EnterCriticalSection();
231 void LeaveCriticalSection();
235 bool ActiveTransfers();
238 bool UnlockedIsDead();
239 bool UnlockedIsNormal();
241 const char* ClockStr();
242 const char* UnlockedClockStr();
244 void ShutdownError();
247 Internal::State state;
249 auto_ptr<MirrorBufferSet> mbs;
250 auto_ptr<MirrorBufferSet> mbs_old;
251 auto_ptr<Pthread> event_thread;
253 Internal::QueueModel local_queue;
254 Internal::QueueModel local_old_queue;
255 Internal::QueueModel remote_queue;
257 Internal::CommandQueue read_command_queue;
258 Internal::CommandQueue write_command_queue;
260 unsigned max_threshold;
261 unsigned num_channels;
262 uint64_t num_enqueued;
263 uint64_t num_dequeued;
265 unsigned max_send_wr;
266 unsigned max_recv_wr;
267 unsigned num_send_wr;
268 unsigned pending_dequeue;
269 unsigned pending_rdma;
270 unsigned confirmed_rdma;
278 uint64_t remote_clock;
280 std::string clock_str;
282 uint64_t local_cs_ts;
290 bool enqueue_use_old;
291 bool dequeue_use_old;
292 uint64_t grow_length;
293 uint64_t grow_thresh;
295 ConnectionServer *
const server;
296 RemoteQueueHolder *
const holder;
297 shared_ptr<D4R::Node> mocknode;
300 PortInfo remote_info;
304 ibv_comp_channel *cc;
309 ibv_mr *read_command_queue_mr;
310 ibv_mr *write_command_queue_mr;
314 #endif // ENABLE_RDMA
Top Representations of generic queues for the CPN library.