CPN
Computational Process Networks
CPNThresholdQueue.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
24 #include "common_priv.h"
25 #include "CPNThresholdQueue.h"
26 #include <cpn/QueueAttr.h>
28 #include <cstring>
29 
30 namespace CPN {
31 
33  : QueueBase(k, attr), queue(0), oldqueue(0), enqueueUseOld(false), dequeueUseOld(false)
34  {
35  ThresholdQueueAttr qattr(attr.GetLength(), attr.GetMaxThreshold(),
36  attr.GetNumChannels(), usembs);
37  queue = new TQImpl(qattr);
38  }
39 
41  unsigned length, bool usembs)
42  : QueueBase(k, attr), queue(0), oldqueue(0), enqueueUseOld(false), dequeueUseOld(false)
43  {
44  ThresholdQueueAttr qattr(length, attr.GetMaxThreshold(),
45  attr.GetNumChannels(), usembs);
46  queue = new TQImpl(qattr);
47  }
48 
49 
51  delete queue;
52  queue = 0;
53  delete oldqueue;
54  oldqueue = 0;
55  }
56 
57  void *ThresholdQueue::InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan) {
58  void *ret = 0;
59  if (enqueueUseOld) {
61  ret = oldqueue->GetRawEnqueuePtr(thresh, chan);
62  ASSERT(ret);
63  } else {
64  ret = queue->GetRawEnqueuePtr(thresh, chan);
65  }
66  return ret;
67  }
68 
69  void ThresholdQueue::InternalEnqueue(unsigned count) {
70  if (enqueueUseOld) {
72  oldqueue->Enqueue(count);
73 
74  unsigned count;
75  while ( (count = oldqueue->Count()) != 0 ) {
76  if (count > oldqueue->MaxThreshold()) {
77  count = oldqueue->MaxThreshold();
78  }
79  for (unsigned chan = 0; chan < queue->NumChannels(); chan++) {
80  const void* src = oldqueue->GetRawDequeuePtr(count, chan);
81  void* dst = queue->GetRawEnqueuePtr(count, chan);
82  ASSERT(src && dst);
83  memcpy(dst,src,count);
84  }
85 
86  queue->Enqueue(count);
87  oldqueue->Dequeue(count);
88  }
89 
90  enqueueUseOld = false;
91  delete oldqueue;
92  oldqueue = 0;
93  } else {
94  queue->Enqueue(count);
95  }
96  }
97 
99  return queue->NumChannels();
100  }
101 
103  if (enqueueUseOld) {
104  return oldqueue->ChannelStride();
105  } else {
106  return queue->ChannelStride();
107  }
108  }
109 
111  if (dequeueUseOld) {
112  return oldqueue->ChannelStride();
113  } else {
114  return queue->ChannelStride();
115  }
116  }
117 
119  return queue->Freespace();
120  }
121 
123  return queue->Full();
124  }
125 
126  const void *ThresholdQueue::InternalGetRawDequeuePtr(unsigned thresh, unsigned chan) {
127  const void *ret = 0;
128  if (dequeueUseOld) {
129  // The ONLY reason this code path should be followed is if the node made a getdequeueptr
130  // then called getdequeueptr again before dequeue when a grow happens inbetween
131  ASSERT(indequeue);
132  ret = oldqueue->GetRawDequeuePtr(thresh, chan);
133  ASSERT(ret);
134  } else {
135  ret = queue->GetRawDequeuePtr(thresh, chan);
136  }
137  return ret;
138  }
139 
140  void ThresholdQueue::InternalDequeue(unsigned count) {
141  if (dequeueUseOld) {
142  dequeueUseOld = false;
143  delete oldqueue;
144  oldqueue = 0;
145  }
146  queue->Dequeue(count);
147  }
148 
149  unsigned ThresholdQueue::UnlockedCount() const {
150  return queue->Count();
151  }
152 
154  return queue->Empty();
155  }
156 
158  return queue->MaxThreshold();
159  }
160 
162  return queue->QueueLength();
163  }
164 
166  return queue->ElementsEnqueued();
167  }
168 
170  return queue->ElementsDequeued();
171  }
172 
174  if (dequeueUseOld || enqueueUseOld) {
175  dequeueUseOld = false;
176  enqueueUseOld = false;
177  delete oldqueue;
178  oldqueue = 0;
179  }
180  queue->Reset();
182  }
183 
184  void ThresholdQueue::UnlockedGrow(unsigned queueLen, unsigned maxThresh) {
185  if (queueLen <= queue->QueueLength() && maxThresh <= queue->MaxThreshold()) return;
186  ASSERT(!(inenqueue && indequeue), "Unhandled grow case of having an outstanding dequeue and enqueue");
187  if (oldqueue) {
188  // If the old queue is still around we have to still be in the same state
191  queue->Grow(queueLen, maxThresh, false);
192  } else if (!inenqueue && !indequeue) {
193  queue->Grow(queueLen, maxThresh, false);
194  } else {
197  // this should make a duplicate
198  oldqueue = queue->Grow(queueLen, maxThresh, true);
199  if (!oldqueue) {
200  enqueueUseOld = false;
201  enqueueUseOld = false;
202  }
203  }
204  }
205 
206  ThresholdQueue::TQImpl::TQImpl(unsigned length, unsigned maxthresh, unsigned numchan)
207  : ThresholdQueueBase(1, length, maxthresh, numchan)
208  {
209  }
210 
212  : ThresholdQueueBase(1, attr)
213  {
214  }
215 
216  ThresholdQueue::TQImpl *ThresholdQueue::TQImpl::Grow(unsigned queueLen, unsigned maxThresh, bool copy) {
217  // ignore the do-nothing case
218  if (queueLen <= QueueLength() && maxThresh <= MaxThreshold()) return 0;
219 
220  // we don't do any shrinking
221  if (maxThresh <= MaxThreshold()) maxThresh = MaxThreshold();
222  if (queueLen <= QueueLength()) queueLen = QueueLength();
223 
224  // keep our old info around
225  auto_ptr<TQImpl> oldQueue = auto_ptr<TQImpl>(new TQImpl(*this)); // just duplicate the pointers
226  // Save the head and tail, these are the only member variables
227  // that we care about that will be changed by dequeueing all the data.
228  ulong oldhead = head;
229  ulong oldtail = tail;
230 
231  // allocate a new buffer (or MirrorBufferSet)
232  AllocateBuf(queueLen,maxThresh,numChannels,mbs?1:0);
233 
234  // growth should not affect this member
235  elementsDequeued = oldQueue->ElementsDequeued();
236 
237  // copy all in oldQueue to our new buffer with existing mechanisms
238  ulong count;
239  while ( (count = oldQueue->Count()) != 0 ) {
240  if (count > oldQueue->MaxThreshold()) {
241  count = oldQueue->MaxThreshold();
242  }
243  for (ulong chan = 0; chan < numChannels; chan++) {
244  const void* src = oldQueue->GetRawDequeuePtr(count, chan);
245  void* dst = GetRawEnqueuePtr(count, chan);
246  ASSERT(src && dst);
247  memcpy(dst, src, count*elementSize);
248  }
249  Enqueue(count);
250  oldQueue->Dequeue(count);
251  }
252 
253  // growth should not affect this member
254  elementsEnqueued = oldQueue->ElementsEnqueued();
255  // reset things inside oldQueue to where they where before we copied
256  oldQueue->head = oldhead;
257  oldQueue->tail = oldtail;
258 
259  if (copy) {
260  return oldQueue.release();
261  }
262  return 0;
263  }
264 }
265 
unsigned GetLength() const
Definition: QueueAttr.h:325
ThresholdQueue(KernelBase *k, const SimpleQueueAttr &attr, bool usembs)
virtual void UnlockedGrow(unsigned queueLen, unsigned maxThresh)
unsigned QueueLength() const
Definition: QueueBase.cc:230
virtual void InternalDequeue(unsigned count)
virtual unsigned UnlockedMaxThreshold() const
unsigned GetNumChannels() const
Definition: QueueAttr.h:327
void * GetRawEnqueuePtr(unsigned thresh, unsigned chan)
Definition: QueueBase.cc:121
virtual unsigned UnlockedDequeueChannelStride() const
Definition of the queue attributes.
virtual void * InternalGetRawEnqueuePtr(unsigned thresh, unsigned chan)
ThresholdQueueBase::ulong ulong
void Dequeue(ulong count)
ulong MaxThreshold(void) const
virtual unsigned UnlockedQueueLength() const
ulong Count(void) const
const void * GetRawDequeuePtr(ulong dequeueThresh, ulong chan=0) const
The base class for all queues in the CPN library.
Definition: QueueBase.h:39
ulong NumChannels(void) const
unsigned MaxThreshold() const
Definition: QueueBase.cc:225
virtual void InternalReset()
Definition: QueueBase.cc:289
ulong QueueLength(void) const
TQImpl(unsigned length, unsigned maxthres, unsigned numchan)
unsigned GetMaxThreshold() const
Definition: QueueAttr.h:326
ulong ElementsDequeued(void) const
unsigned UnlockedNumDequeued() const
ulong Freespace(void) const
void Enqueue(unsigned count)
Definition: QueueBase.cc:160
ulong ChannelStride(void) const
void Enqueue(ulong count)
virtual bool UnlockedFull() const
ulong ElementsEnqueued(void) const
virtual void InternalEnqueue(unsigned count)
virtual unsigned UnlockedNumChannels() const
virtual unsigned UnlockedFreespace() const
Default threshold queue implementation.
virtual const void * InternalGetRawDequeuePtr(unsigned thresh, unsigned chan)
virtual unsigned UnlockedEnqueueChannelStride() const
bool Full(void) const
virtual unsigned UnlockedCount() const
TQImpl * Grow(unsigned queueLen, unsigned maxThresh, bool copy)
bool Empty(void) const
virtual void InternalReset()
void * GetRawEnqueuePtr(ulong enqueueThresh, ulong chan=0) const
unsigned UnlockedNumEnqueued() const
virtual bool UnlockedEmpty() const
This is a simplified internal representation of the queue attributes needed to create a queue...
Definition: QueueAttr.h:237
#define ASSERT(exp,...)