CPN
Computational Process Networks
ThresholdQueueBase.cc
Go to the documentation of this file.
1 //=============================================================================
2 // $Id: ThresholdQueueBase.cc,v 1.1 2006/06/12 20:30:13 gallen Exp $
3 //-----------------------------------------------------------------------------
4 // A queue class that is optimized for DSP applications.
5 // Specifically, reading and writing is done directly from queue memory,
6 // and data is guaranteed to be contiguous in memory.
7 //-----------------------------------------------------------------------------
8 // The following _must_ hold: 1 <= qMaxThreshold < qLength
9 //=============================================================================
10 
11 #include "common_priv.h"
12 #include "ThresholdQueue.h"
13 #include "MirrorBufferSet.h"
14 #include <stdlib.h>
15 #include <string.h>
16 
17 #define MINIMUM_ALIGNMENT 64
18 
19 //-----------------------------------------------------------------------------
21  ulong maxThresh, ulong numChans)
22 //-----------------------------------------------------------------------------
23 : elementSize(elemSize),
24  queueLength(queueLen),
25  maxThreshold(maxThresh),
26  numChannels(numChans),
27  chanOffset(0), baseOffset(0),
28  base(0), mbs(0)
29 {
30  if (maxThreshold<1)
31  maxThreshold = 1;
34 
35  Reset();
36 
37  bool useMBS = 1;
39 }
40 
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
44 : elementSize(elemSize),
45  queueLength(attr.queueLength),
48  chanOffset(0), baseOffset(0),
49  base(0), mbs(0)
50 {
51  if (maxThreshold<1)
52  maxThreshold = 1;
55 
56  Reset();
57 
58  bool useMBS = attr.useMBS;
59  if (useMBS) {
60  // there is no reason for an offset bigger than the page size
64  }
65 
67 }
68 
69 
70 //-----------------------------------------------------------------------------
71 void ThresholdQueueBase::AllocateBuf(ulong queueLen, ulong maxThresh,
72  ulong numChans, bool useMBS)
// Set up the queue storage for the requested geometry, then call Reset().
//   queueLen  - requested queue length; raised to maxThresh when smaller
//   maxThresh - requested threshold; raised to 1 when it is 0
//   numChans  - stored into numChannels
//   useMBS    - ask for a MirrorBufferSet; only honoured when
//               MirrorBufferSet::Supported() returns true
// NOTE(review): this listing skips original source lines 85, 87, 91, 95-96,
// 98 and 100. Presumably they set queueLength in the MirrorBufferSet branch
// and declare `mod` / allocate `base` in the else branch -- confirm against
// the complete file before relying on the comments below.
73 //-----------------------------------------------------------------------------
74 {
// sanitize the request: at least one element of threshold, and a queue that
// is at least as long as the threshold
75  if (maxThresh<1) maxThresh = 1;
76  if (maxThresh>queueLen) queueLen = maxThresh;
77  numChannels = numChans;
78 
79  if ( useMBS && MirrorBufferSet::Supported() ) {
// sizes handed to MirrorBufferSet are in bytes: the buffer holds queueLen
// elements, the mirror holds maxThresh-1 elements plus the base offset and
// the accumulated per-channel offsets
80  ulong bufSz = queueLen * elementSize;
81  ulong mirSz = maxThresh-1 + baseOffset + (numChannels-1)*chanOffset;
82  mirSz *= elementSize;
83  mbs = new MirrorBufferSet(bufSz, mirSz, numChannels);
84  // MirrorBufferSet may have just resized everything...
// ...so maxThreshold is recomputed from the mirror size actually reported,
// undoing the "-1" and the offset terms that went into mirSz above
86  maxThreshold = mbs->MirrorSize() / elementSize + 1;
88  maxThreshold -= baseOffset + (numChannels-1)*chanOffset;
// base is the start of the MirrorBufferSet memory shifted by baseOffset
// (added to a char*, i.e. a byte offset as written here)
89  base = (char*)((void*)(*mbs)) + baseOffset;
90  // Corner case of queueLength == maxThreshold
92  } else {
// no MirrorBufferSet: take the sanitized request as-is
93  queueLength = queueLen;
94  maxThreshold = maxThresh;
// NOTE(review): `mod` is declared on a line missing from this listing; given
// the MINIMUM_ALIGNMENT macro in this file it is presumably an alignment
// remainder -- TODO confirm
97  if (mod != 0) {
99  }
101  }
// start from an empty queue whichever storage path was taken
102  Reset();
103 }
104 
105 
106 
107 //-----------------------------------------------------------------------------
109 //-----------------------------------------------------------------------------
110 {
111  elementsEnqueued = 0;
112  elementsDequeued = 0;
113 
114  head = 0;
115  tail = 0;
116 }
117 
118 
119 //-----------------------------------------------------------------------------
121 //-----------------------------------------------------------------------------
122 {
123  if (mbs) {
124  delete mbs;
125  mbs = 0;
126  } else {
127  free(base);
128  }
129  base = 0;
130 }
131 
132 
133 //-----------------------------------------------------------------------------
135 // get a pointer for writing data before calling Enqueue
136 //-----------------------------------------------------------------------------
137 {
138  if (thresh>Freespace() || thresh>MaxThreshold()) return 0;
139 // if (chan>=numChannels) return 0;
140 // assert(chan<numChannels);
141  register ulong idx = tail;
142  while (idx>=queueLength) idx -= queueLength;
143  return (char*) base + (chan*channelStride + idx) * elementSize;
144 }
145 
146 
147 //-----------------------------------------------------------------------------
148 const void* ThresholdQueueBase::GetRawDequeuePtr(ulong thresh, ulong chan) const
149 // get the current pointer (to thresh valid samples)
150 //-----------------------------------------------------------------------------
151 {
152  if (thresh>Count() || thresh>MaxThreshold()) return 0;
153 // if (chan>=numChannels) return 0;
154 // assert(chan<numChannels);
155  register ulong idx = head;
156  while (idx>=queueLength) idx -= queueLength;
157  return (char*) base + (chan*channelStride + idx) * elementSize;
158 }
159 
160 
161 //-----------------------------------------------------------------------------
163 // move all of the data to the right place, then update the indices
164 //-----------------------------------------------------------------------------
165 {
166  if (!mbs) { // the mbs maintains circularity
167 
168  // we must mirror by ourselves
169  register ulong idx = tail;
170  while (idx>=queueLength) idx -= queueLength;
171 
172  // elems to copy from queue area to mirror area
173  register ulong countUp = 0;
174  if (idx<maxThreshold-1) countUp = maxThreshold-1-idx; // to mirror's lower edge
175  if (idx+count<maxThreshold-1) countUp = count;
176 
177  // elems to copy from mirror area to queue area
178  register ulong countDn = 0;
179  if (idx+count>queueLength) countDn = idx+count-queueLength;
180 
181  if (countUp || countDn)
182  for (ulong chan=0; chan<numChannels; chan++) {
183  char* chanBase = (char*)base + (chan*channelStride) * elementSize;
184 
185  // move data in the queue (below the threshold) to the mirror area
186  char* dst = chanBase + (idx + queueLength) * elementSize;
187  char* src = chanBase + idx * elementSize;
188  memcpy(dst,src,countUp*elementSize);
189 
190  // move data from the mirror area to the queue (below the threshold)
191  src = chanBase + queueLength * elementSize;
192  memcpy(chanBase,src,countDn*elementSize);
193  }
194  }
195 
196  // update the tail pointer
197  register ulong newTail = tail+count;
198  while (newTail>=2*queueLength) newTail -= 2*queueLength;
199  tail = newTail;
200  elementsEnqueued += count;
201 }
202 
203 
204 //-----------------------------------------------------------------------------
206 // release the buffer just referenced by GetDequeuePtr
207 //-----------------------------------------------------------------------------
208 {
209  register ulong newHead = head+count;
210  while (newHead>=2*queueLength) newHead -= 2*queueLength;
211  head = newHead;
212  elementsDequeued += count;
213 }
214 
215 
216 //-----------------------------------------------------------------------------
218 // the number of elements in the queue
219 //-----------------------------------------------------------------------------
220 {
221  register ulong count = (2*queueLength - head + tail);
222  while (count>=queueLength) count -= queueLength;
223  if (count) return count;
224  return (head==tail) ? 0 : queueLength;
225 }
226 
227 
228 //-----------------------------------------------------------------------------
230 // the number of elements free in the queue
231 //-----------------------------------------------------------------------------
232 {
233  return queueLength - Count();
234 }
235 
236 
237 #if 1
238 #include <assert.h>
239 #include <stdio.h>
240 
241 //-----------------------------------------------------------------------------
242 void ThresholdQueueBase::Grow(ulong queueLen, ulong maxThresh)
243 // Growing is only supposed to occur when a system is:
244 // "deadlocked, trying to write to a full queue"
245 // This is, by definition, an expensive operation. Little effort has been
246 // spent optimizing for this case
247 //-----------------------------------------------------------------------------
248 {
249  fprintf(stderr, "ThresholdQueueBase::Grow(%lu,%lu)\n", queueLen, maxThresh);
250 
251  // handle the default parameter case
252  if (!maxThresh) maxThresh = MaxThreshold();
253 
254  // ignore the do-nothing case
255  if (queueLen <= QueueLength() && maxThresh <= MaxThreshold()) return;
256 
257  // we don't do any shrinking
258  if (maxThresh <= MaxThreshold()) maxThresh = MaxThreshold();
259  if (queueLen <= QueueLength()) queueLen = QueueLength();
260 
261  // keep our old info around
262  ThresholdQueueBase oldQueue(*this); // just duplicate the pointers
263 
264  // allocate a new buffer (or MirrorBufferSet)
265  AllocateBuf(queueLen,maxThresh,numChannels,mbs?1:0);
266 
267  // growth should not affect this member
268  elementsDequeued = oldQueue.ElementsDequeued();
269 
270  // copy all in oldQueue to our new buffer with existing mechanisms
271  ulong count;
272  while ((count = oldQueue.Count()) != 0) {
273  if (count>oldQueue.MaxThreshold())
274  count = oldQueue.MaxThreshold();
275  for (ulong chan=0; chan<numChannels; chan++) {
276  const void* src = oldQueue.GetRawDequeuePtr(count,chan);
277  void* dst = GetRawEnqueuePtr(count,chan);
278  assert(src && dst);
279  memcpy(dst,src,count*elementSize);
280  }
281  Enqueue(count);
282  oldQueue.Dequeue(count);
283  }
284 
285  // growth should not affect this member
286  elementsEnqueued = oldQueue.ElementsEnqueued();
287 
288  // when oldQueue gets destructed, it will deallocate the old buffers
289 }
290 #endif
static int Supported(void)
ulong MirrorSize(void) const
void Dequeue(ulong count)
ulong MaxThreshold(void) const
ulong Count(void) const
const void * GetRawDequeuePtr(ulong dequeueThresh, ulong chan=0) const
#define MINIMUM_ALIGNMENT
void AllocateBuf(ulong queueLen, ulong maxThresh, ulong numChans, bool useMBS)
ulong QueueLength(void) const
ulong ElementsDequeued(void) const
ulong Freespace(void) const
void Enqueue(ulong count)
void Grow(ulong queueLen, ulong maxThresh)
ulong ElementsEnqueued(void) const
static ulong PageSize(void)
ulong BufferSize(void) const
void * GetRawEnqueuePtr(ulong enqueueThresh, ulong chan=0) const
MirrorBufferSet * mbs
ThresholdQueueBase(ulong elemSize, ulong queueLen, ulong maxThresh, ulong numChans=1)