GNU Radio 3.6.5.1 C++ API
circular_buffer.h
Go to the documentation of this file.
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2009,2010 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio.
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING. If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifndef _CIRCULAR_BUFFER_H_
24 #define _CIRCULAR_BUFFER_H_
25 
26 #include <gruel/thread.h>
27 #include <iostream>
28 #include <stdexcept>
29 
30 #ifndef DO_DEBUG
31 #define DO_DEBUG 0
32 #endif
33 
34 #if DO_DEBUG
35 #define DEBUG(X) do{X} while(0);
36 #else
37 #define DEBUG(X) do{} while(0);
38 #endif
39 
40 template <class T>
42 {
43 private:
44 // the buffer to use
45  T* d_buffer;
46 
47 // the following are in Items (type T)
48  size_t d_bufLen_I, d_readNdx_I, d_writeNdx_I;
49  size_t d_n_avail_write_I, d_n_avail_read_I;
50 
51 // stuff to control access to class internals
52  gruel::mutex* d_internal;
53  gruel::condition_variable* d_readBlock;
54  gruel::condition_variable* d_writeBlock;
55 
56 // booleans to decide how to control reading, writing, and aborting
57  bool d_doWriteBlock, d_doFullRead, d_doAbort;
58 
59  void delete_mutex_cond () {
60  if (d_internal) {
61  delete d_internal;
62  d_internal = NULL;
63  }
64  if (d_readBlock) {
65  delete d_readBlock;
66  d_readBlock = NULL;
67  }
68  if (d_writeBlock) {
69  delete d_writeBlock;
70  d_writeBlock = NULL;
71  }
72  };
73 
74 public:
75  circular_buffer (size_t bufLen_I,
76  bool doWriteBlock = true, bool doFullRead = false) {
77  if (bufLen_I == 0)
78  throw std::runtime_error ("circular_buffer(): "
79  "Number of items to buffer must be > 0.\n");
80  d_bufLen_I = bufLen_I;
81  d_buffer = (T*) new T[d_bufLen_I];
82  d_doWriteBlock = doWriteBlock;
83  d_doFullRead = doFullRead;
84  d_internal = NULL;
85  d_readBlock = d_writeBlock = NULL;
86  reset ();
87  DEBUG (std::cerr << "c_b(): buf len (items) = " << d_bufLen_
88  << ", doWriteBlock = " << (d_doWriteBlock ? "true" : "false")
89  << ", doFullRead = " << (d_doFullRead ? "true" : "false")
90  << std::endl);
91  };
92 
94  delete_mutex_cond ();
95  delete [] d_buffer;
96  };
97 
98  inline size_t n_avail_write_items () {
99  gruel::scoped_lock l (*d_internal);
100  size_t retVal = d_n_avail_write_I;
101  return (retVal);
102  };
103 
104  inline size_t n_avail_read_items () {
105  gruel::scoped_lock l (*d_internal);
106  size_t retVal = d_n_avail_read_I;
107  return (retVal);
108  };
109 
110  inline size_t buffer_length_items () {return (d_bufLen_I);};
111  inline bool do_write_block () {return (d_doWriteBlock);};
112  inline bool do_full_read () {return (d_doFullRead);};
113 
114  void reset () {
115  d_doAbort = false;
116  bzero (d_buffer, d_bufLen_I * sizeof (T));
117  d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
118  d_n_avail_write_I = d_bufLen_I;
119  delete_mutex_cond ();
120  // create a mutex to handle contention of shared resources;
121  // any routine needed access to shared resources uses lock()
122  // before doing anything, then unlock() when finished.
123  d_internal = new gruel::mutex ();
124  // link the internal mutex to the read and write conditions;
125  // when wait() is called, the internal mutex will automatically
126  // be unlock()'ed. Upon return (from a notify_one() to the condition),
127  // the internal mutex will be lock()'ed.
128  d_readBlock = new gruel::condition_variable ();
129  d_writeBlock = new gruel::condition_variable ();
130  };
131 
132 /*
133  * enqueue: add the given buffer of item-length to the queue,
134  * first-in-first-out (FIFO).
135  *
136  * inputs:
137  * buf: a pointer to the buffer holding the data
138  *
139  * bufLen_I: the buffer length in items (of the instantiated type)
140  *
141  * returns:
142  * -1: on overflow (write is not blocking, and data is being
143  * written faster than it is being read)
144  * 0: if nothing to do (0 length buffer)
145  * 1: if success
146  * 2: in the process of aborting, do doing nothing
147  *
148  * will throw runtime errors if inputs are improper:
149  * buffer pointer is NULL
150  * buffer length is larger than the instantiated buffer length
151  */
152 
153  int enqueue (T* buf, size_t bufLen_I) {
154  DEBUG (std::cerr << "enqueue: buf = " << (void*) buf
155  << ", bufLen = " << bufLen_I
156  << ", #av_wr = " << d_n_avail_write_I
157  << ", #av_rd = " << d_n_avail_read_I << std::endl);
158  if (bufLen_I > d_bufLen_I) {
159  std::cerr << "ERROR: cannot add buffer longer ("
160  << bufLen_I << ") than instantiated length ("
161  << d_bufLen_I << ")." << std::endl;
162  throw std::runtime_error ("circular_buffer::enqueue()");
163  }
164 
165  if (bufLen_I == 0)
166  return (0);
167  if (!buf)
168  throw std::runtime_error ("circular_buffer::enqueue(): "
169  "input buffer is NULL.\n");
170  gruel::scoped_lock l (*d_internal);
171  if (d_doAbort) {
172  return (2);
173  }
174  // set the return value to 1: success; change if needed
175  int retval = 1;
176  if (bufLen_I > d_n_avail_write_I) {
177  if (d_doWriteBlock) {
178  while (bufLen_I > d_n_avail_write_I) {
179  DEBUG (std::cerr << "enqueue: #len > #a, waiting." << std::endl);
180  // wait; will automatically unlock() the internal mutex via
181  // the scoped lock
182  d_writeBlock->wait (l);
183  // and auto re-lock() it here.
184  if (d_doAbort) {
185  DEBUG (std::cerr << "enqueue: #len > #a, aborting." << std::endl);
186  return (2);
187  }
188  DEBUG (std::cerr << "enqueue: #len > #a, done waiting." << std::endl);
189  }
190  } else {
191  d_n_avail_read_I = d_bufLen_I - bufLen_I;
192  d_n_avail_write_I = bufLen_I;
193  DEBUG (std::cerr << "circular_buffer::enqueue: overflow" << std::endl);
194  retval = -1;
195  }
196  }
197  size_t n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
198  if (n_now_I > bufLen_I)
199  n_now_I = bufLen_I;
200  else if (n_now_I < bufLen_I)
201  n_start_I = bufLen_I - n_now_I;
202  bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
203  if (n_start_I) {
204  bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
205  d_writeNdx_I = n_start_I;
206  } else
207  d_writeNdx_I += n_now_I;
208  d_n_avail_read_I += bufLen_I;
209  d_n_avail_write_I -= bufLen_I;
210  d_readBlock->notify_one ();
211  return (retval);
212  };
213 
214 /*
215  * dequeue: removes from the queue the number of items requested, or
216  * available, into the given buffer on a FIFO basis.
217  *
218  * inputs:
219  * buf: a pointer to the buffer into which to copy the data
220  *
221  * bufLen_I: pointer to the number of items to remove in items
222  * (of the instantiated type)
223  *
224  * returns:
225  * 0: if nothing to do (0 length buffer)
226  * 1: if success
227  * 2: in the process of aborting, do doing nothing
228  *
229  * will throw runtime errors if inputs are improper:
230  * buffer pointer is NULL
231  * buffer length pointer is NULL
232  * buffer length is larger than the instantiated buffer length
233  */
234 
235  int dequeue (T* buf, size_t* bufLen_I) {
236  DEBUG (std::cerr << "dequeue: buf = " << ((void*) buf)
237  << ", *bufLen = " << (*bufLen_I)
238  << ", #av_wr = " << d_n_avail_write_I
239  << ", #av_rd = " << d_n_avail_read_I << std::endl);
240  if (!bufLen_I)
241  throw std::runtime_error ("circular_buffer::dequeue(): "
242  "input bufLen pointer is NULL.\n");
243  if (!buf)
244  throw std::runtime_error ("circular_buffer::dequeue(): "
245  "input buffer pointer is NULL.\n");
246  size_t l_bufLen_I = *bufLen_I;
247  if (l_bufLen_I == 0)
248  return (0);
249  if (l_bufLen_I > d_bufLen_I) {
250  std::cerr << "ERROR: cannot remove buffer longer ("
251  << l_bufLen_I << ") than instantiated length ("
252  << d_bufLen_I << ")." << std::endl;
253  throw std::runtime_error ("circular_buffer::dequeue()");
254  }
255 
256  gruel::scoped_lock l (*d_internal);
257  if (d_doAbort) {
258  return (2);
259  }
260  if (d_doFullRead) {
261  while (d_n_avail_read_I < l_bufLen_I) {
262  DEBUG (std::cerr << "dequeue: #a < #len, waiting." << std::endl);
263  // wait; will automatically unlock() the internal mutex via
264  // the scoped lock
265  d_readBlock->wait (l);
266  // and re-lock() it here.
267  if (d_doAbort) {
268  DEBUG (std::cerr << "dequeue: #a < #len, aborting." << std::endl);
269  return (2);
270  }
271  DEBUG (std::cerr << "dequeue: #a < #len, done waiting." << std::endl);
272  }
273  } else {
274  while (d_n_avail_read_I == 0) {
275  DEBUG (std::cerr << "dequeue: #a == 0, waiting." << std::endl);
276  // wait; will automatically unlock() the internal mutex via
277  // the scoped lock
278  d_readBlock->wait (l);
279  // and re-lock() it here.
280  if (d_doAbort) {
281  DEBUG (std::cerr << "dequeue: #a == 0, aborting." << std::endl);
282  return (2);
283  }
284  DEBUG (std::cerr << "dequeue: #a == 0, done waiting." << std::endl);
285  }
286  }
287  if (l_bufLen_I > d_n_avail_read_I)
288  l_bufLen_I = d_n_avail_read_I;
289  size_t n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
290  if (n_now_I > l_bufLen_I)
291  n_now_I = l_bufLen_I;
292  else if (n_now_I < l_bufLen_I)
293  n_start_I = l_bufLen_I - n_now_I;
294  bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
295  if (n_start_I) {
296  bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
297  d_readNdx_I = n_start_I;
298  } else
299  d_readNdx_I += n_now_I;
300  *bufLen_I = l_bufLen_I;
301  d_n_avail_read_I -= l_bufLen_I;
302  d_n_avail_write_I += l_bufLen_I;
303  d_writeBlock->notify_one ();
304  return (1);
305  };
306 
307  void abort () {
308  gruel::scoped_lock l (*d_internal);
309  d_doAbort = true;
310  d_writeBlock->notify_one ();
311  d_readBlock->notify_one ();
312  };
313 };
314 
315 #endif /* _CIRCULAR_BUFFER_H_ */