corosync  2.4.4
cs_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2002-2004 MontaVista Software, Inc.
3  * Copyright (c) 2006-2011 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 #ifndef CS_QUEUE_H_DEFINED
36 #define CS_QUEUE_H_DEFINED
37 
38 #include <string.h>
39 #include <stdlib.h>
40 #include <pthread.h>
41 #include <errno.h>
42 #include "assert.h"
43 
/**
 * Fixed-size circular queue of equally sized items.
 *
 * cs_queue_is_full() reports the queue as full once (size - 1) items are
 * stored, so one slot always stays unused.
 */
struct cs_queue {
	int head;			/* slot the next added item is copied into */
	int tail;			/* slot immediately before the oldest item */
	int used;			/* number of items currently queued */
	int usedhw;			/* highest value "used" reached since init/reinit */
	int size;			/* number of slots in "items" */
	void *items;			/* storage for size * size_per_item bytes */
	int size_per_item;		/* size of one item in bytes */
	int iterator;			/* cursor of the cs_queue_item_iterator_* functions */
	pthread_mutex_t mutex;		/* taken by every operation in threaded mode */
	int threaded_mode_enabled;	/* non-zero: operations lock "mutex" */
};
56 
57 static inline int cs_queue_init (struct cs_queue *cs_queue, int cs_queue_items, int size_per_item, int threaded_mode_enabled) {
58  cs_queue->head = 0;
59  cs_queue->tail = cs_queue_items - 1;
60  cs_queue->used = 0;
61  cs_queue->usedhw = 0;
62  cs_queue->size = cs_queue_items;
63  cs_queue->size_per_item = size_per_item;
65 
66  cs_queue->items = malloc (cs_queue_items * size_per_item);
67  if (cs_queue->items == 0) {
68  return (-ENOMEM);
69  }
70  memset (cs_queue->items, 0, cs_queue_items * size_per_item);
71  if (cs_queue->threaded_mode_enabled) {
72  pthread_mutex_init (&cs_queue->mutex, NULL);
73  }
74  return (0);
75 }
76 
77 static inline int cs_queue_reinit (struct cs_queue *cs_queue)
78 {
79  if (cs_queue->threaded_mode_enabled) {
80  pthread_mutex_lock (&cs_queue->mutex);
81  }
82  cs_queue->head = 0;
83  cs_queue->tail = cs_queue->size - 1;
84  cs_queue->used = 0;
85  cs_queue->usedhw = 0;
86 
87  memset (cs_queue->items, 0, cs_queue->size * cs_queue->size_per_item);
88  if (cs_queue->threaded_mode_enabled) {
89  pthread_mutex_unlock (&cs_queue->mutex);
90  }
91  return (0);
92 }
93 
94 static inline void cs_queue_free (struct cs_queue *cs_queue) {
95  if (cs_queue->threaded_mode_enabled) {
96  pthread_mutex_destroy (&cs_queue->mutex);
97  }
98  free (cs_queue->items);
99 }
100 
101 static inline int cs_queue_is_full (struct cs_queue *cs_queue) {
102  int full;
103 
104  if (cs_queue->threaded_mode_enabled) {
105  pthread_mutex_lock (&cs_queue->mutex);
106  }
107  full = ((cs_queue->size - 1) == cs_queue->used);
108  if (cs_queue->threaded_mode_enabled) {
109  pthread_mutex_unlock (&cs_queue->mutex);
110  }
111  return (full);
112 }
113 
114 static inline int cs_queue_is_empty (struct cs_queue *cs_queue) {
115  int empty;
116 
117  if (cs_queue->threaded_mode_enabled) {
118  pthread_mutex_lock (&cs_queue->mutex);
119  }
120  empty = (cs_queue->used == 0);
121  if (cs_queue->threaded_mode_enabled) {
122  pthread_mutex_unlock (&cs_queue->mutex);
123  }
124  return (empty);
125 }
126 
127 static inline void cs_queue_item_add (struct cs_queue *cs_queue, void *item)
128 {
129  char *cs_queue_item;
130  int cs_queue_position;
131 
132  if (cs_queue->threaded_mode_enabled) {
133  pthread_mutex_lock (&cs_queue->mutex);
134  }
135  cs_queue_position = cs_queue->head;
136  cs_queue_item = cs_queue->items;
137  cs_queue_item += cs_queue_position * cs_queue->size_per_item;
138  memcpy (cs_queue_item, item, cs_queue->size_per_item);
139 
140  assert (cs_queue->tail != cs_queue->head);
141 
142  cs_queue->head = (cs_queue->head + 1) % cs_queue->size;
143  cs_queue->used++;
144  if (cs_queue->used > cs_queue->usedhw) {
145  cs_queue->usedhw = cs_queue->used;
146  }
147  if (cs_queue->threaded_mode_enabled) {
148  pthread_mutex_unlock (&cs_queue->mutex);
149  }
150 }
151 
152 static inline void *cs_queue_item_get (struct cs_queue *cs_queue)
153 {
154  char *cs_queue_item;
155  int cs_queue_position;
156 
157  if (cs_queue->threaded_mode_enabled) {
158  pthread_mutex_lock (&cs_queue->mutex);
159  }
160  cs_queue_position = (cs_queue->tail + 1) % cs_queue->size;
161  cs_queue_item = cs_queue->items;
162  cs_queue_item += cs_queue_position * cs_queue->size_per_item;
163  if (cs_queue->threaded_mode_enabled) {
164  pthread_mutex_unlock (&cs_queue->mutex);
165  }
166  return ((void *)cs_queue_item);
167 }
168 
169 static inline void cs_queue_item_remove (struct cs_queue *cs_queue) {
170  if (cs_queue->threaded_mode_enabled) {
171  pthread_mutex_lock (&cs_queue->mutex);
172  }
173  cs_queue->tail = (cs_queue->tail + 1) % cs_queue->size;
174 
175  assert (cs_queue->tail != cs_queue->head);
176 
177  cs_queue->used--;
178  assert (cs_queue->used >= 0);
179  if (cs_queue->threaded_mode_enabled) {
180  pthread_mutex_unlock (&cs_queue->mutex);
181  }
182 }
183 
184 static inline void cs_queue_items_remove (struct cs_queue *cs_queue, int rel_count)
185 {
186  if (cs_queue->threaded_mode_enabled) {
187  pthread_mutex_lock (&cs_queue->mutex);
188  }
189  cs_queue->tail = (cs_queue->tail + rel_count) % cs_queue->size;
190 
191  assert (cs_queue->tail != cs_queue->head);
192 
193  cs_queue->used -= rel_count;
194  if (cs_queue->threaded_mode_enabled) {
195  pthread_mutex_unlock (&cs_queue->mutex);
196  }
197 }
198 
199 
200 static inline void cs_queue_item_iterator_init (struct cs_queue *cs_queue)
201 {
202  if (cs_queue->threaded_mode_enabled) {
203  pthread_mutex_lock (&cs_queue->mutex);
204  }
205  cs_queue->iterator = (cs_queue->tail + 1) % cs_queue->size;
206  if (cs_queue->threaded_mode_enabled) {
207  pthread_mutex_unlock (&cs_queue->mutex);
208  }
209 }
210 
211 static inline void *cs_queue_item_iterator_get (struct cs_queue *cs_queue)
212 {
213  char *cs_queue_item;
214  int cs_queue_position;
215 
216  if (cs_queue->threaded_mode_enabled) {
217  pthread_mutex_lock (&cs_queue->mutex);
218  }
219  cs_queue_position = (cs_queue->iterator) % cs_queue->size;
220  if (cs_queue->iterator == cs_queue->head) {
221  if (cs_queue->threaded_mode_enabled) {
222  pthread_mutex_unlock (&cs_queue->mutex);
223  }
224  return (0);
225  }
226  cs_queue_item = cs_queue->items;
227  cs_queue_item += cs_queue_position * cs_queue->size_per_item;
228  if (cs_queue->threaded_mode_enabled) {
229  pthread_mutex_unlock (&cs_queue->mutex);
230  }
231  return ((void *)cs_queue_item);
232 }
233 
234 static inline int cs_queue_item_iterator_next (struct cs_queue *cs_queue)
235 {
236  int next_res;
237 
238  if (cs_queue->threaded_mode_enabled) {
239  pthread_mutex_lock (&cs_queue->mutex);
240  }
241  cs_queue->iterator = (cs_queue->iterator + 1) % cs_queue->size;
242 
243  next_res = cs_queue->iterator == cs_queue->head;
244  if (cs_queue->threaded_mode_enabled) {
245  pthread_mutex_unlock (&cs_queue->mutex);
246  }
247  return (next_res);
248 }
249 
250 static inline void cs_queue_avail (struct cs_queue *cs_queue, int *avail)
251 {
252  if (cs_queue->threaded_mode_enabled) {
253  pthread_mutex_lock (&cs_queue->mutex);
254  }
255  *avail = cs_queue->size - cs_queue->used - 2;
256  assert (*avail >= 0);
257  if (cs_queue->threaded_mode_enabled) {
258  pthread_mutex_unlock (&cs_queue->mutex);
259  }
260 }
261 
262 static inline int cs_queue_used (struct cs_queue *cs_queue) {
263  int used;
264 
265  if (cs_queue->threaded_mode_enabled) {
266  pthread_mutex_lock (&cs_queue->mutex);
267  }
268  used = cs_queue->used;
269  if (cs_queue->threaded_mode_enabled) {
270  pthread_mutex_unlock (&cs_queue->mutex);
271  }
272 
273  return (used);
274 }
275 
276 static inline int cs_queue_usedhw (struct cs_queue *cs_queue) {
277  int usedhw;
278 
279  if (cs_queue->threaded_mode_enabled) {
280  pthread_mutex_lock (&cs_queue->mutex);
281  }
282 
283  usedhw = cs_queue->usedhw;
284 
285  if (cs_queue->threaded_mode_enabled) {
286  pthread_mutex_unlock (&cs_queue->mutex);
287  }
288 
289  return (usedhw);
290 }
291 
292 #endif /* CS_QUEUE_H_DEFINED */
int iterator
Definition: cs_queue.h:52
int head
Definition: cs_queue.h:45
void * items
Definition: cs_queue.h:50
int used
Definition: cs_queue.h:47
int usedhw
Definition: cs_queue.h:48
int size
Definition: cs_queue.h:49
pthread_mutex_t mutex
Definition: cs_queue.h:53
int tail
Definition: cs_queue.h:46
int size_per_item
Definition: cs_queue.h:51
int threaded_mode_enabled
Definition: cs_queue.h:54