tube.h 11.3 KB
Newer Older
Philippe Gerum's avatar
Philippe Gerum committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * SPDX-License-Identifier: MIT
 *
 * Copyright (C) 2019 Philippe Gerum  <rpm@xenomai.org>
 *
 * The tube: a lightweight, lockless multi-reader/multi-writer FIFO
 * with a base-offset addressing variant which can work over a memory
 * segment shared between processes (*_rel form).
 */

#ifndef _EVL_TUBE_H
#define _EVL_TUBE_H

#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include <evl/compiler.h>
#include <evl/atomic.h>

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
 * The tricky one: pulling a canister from the tube. The noticeable
 * part is how we deal with the end-of-queue situation, temporarily
 * snatching the final element - which is not usable for carrying
 * payload and cannot be returned to the caller. We have two
 * requirements:
 *
 * - first, a queue must contain at least one (final) element, so
 * that tube_push_item() can always store a new message there, adding
 * a free canister at the end of the same queue to compensate for the
 * consumed slot. Therefore, consuming the final element with a NULL
 * ->next pointer is not allowed, which means that observing
 * __desc->head->next == NULL denotes an empty queue.
 *
 * - second, we must not interpret or dereference the value of
 * __next_dq until the CAS advancing the head pointer has taken
 * place, so that we don't consume such information which might have
 * been updated in parallel by a concurrent thread which won the CAS
 * race on advancing the same head pointer (i.e. __desc->head =
 * __head_dq->next). We solve this by allowing the head pointer to
 * go NULL for a short while, reverting this move only after the CAS
 * has succeeded, which guarantees that nobody else was granted the
 * same item (__head_dq). Another concurrent pull might fetch a NULL
 * head in the meantime, which also denotes an empty queue (like
 * seeing __head_dq->next == NULL).
 *
 * The only drawback of this scheme is as follows:
 *
 * A: tube_pull() sets head = NULL        B: tube_push() stores to final element
 *                                        B: tube_pull() observes empty queue (head == NULL)
 * A: tube_pull() restores head = &final
 *
 * IOW, when a puller reaches the non-removable final element of a
 * queue, a concurrent pusher might still observe an empty queue after
 * a successful push, until the puller eventually restores the head
 * pointer, revealing the new message. Although not perfect, that
 * seems acceptable from the standpoint of the pusher knowing that
 * concurrent pullers are present.
 *
 * @__desc: pointer to a canister queue (head/tail/first layout, see
 *          DECLARE_CANISTER_QUEUE()). It is expanded several times,
 *          so it must be free of side effects.
 *
 * Evaluates to the dequeued canister, or NULL if the queue was seen
 * empty.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_pull(__desc)						\
	({								\
		/* __next_dq is only meaningful once ->next was read. */ \
		__typeof__((__desc)->head) __next_dq = NULL, __head_dq;	\
		do {							\
			__head_dq = atomic_load(&(__desc)->head);	\
			if (__head_dq == NULL)				\
				break;					\
			__next_dq = atomic_load(&(__head_dq)->next);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->head, __head_dq, __next_dq)); \
		/* We grabbed the final element: put it back, report empty. */ \
		if (__head_dq && __next_dq == NULL) {			\
			atomic_store(&(__desc)->head, __head_dq);	\
			__head_dq = NULL;				\
		}							\
		__head_dq;						\
	})

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
 * The push operation moves the tail to a free canister, which serves
 * as the new final element, returning the former tail element which
 * will convey the payload (CAS ensures that multiple callers cannot
 * get the same element).
 *
 * FIXME: we still have a queue connectivity issue there if preempted
 * indefinitely between the prep and finish ops. Revisit.
 *
 * @__desc: pointer to a canister queue; expanded several times.
 * @__free: pointer to the canister which becomes the new final
 *          element; its ->next link is reset to NULL here.
 *
 * Evaluates to the former tail canister. The caller is expected to
 * link it to @__free with tube_push_finish() afterwards.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_push_prepare(__desc, __free)				\
	({								\
		__typeof__((__desc)->tail) __old_qp;			\
		/* Terminate the list at the new final element first. */ \
		atomic_store(&(__free)->next, NULL);			\
		smp_mb();						\
		/* Retry until we are the one swapping the tail. */	\
		do {							\
			__old_qp = atomic_load(&(__desc)->tail);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->tail, __old_qp, __free));	\
		__old_qp;						\
	})

Philippe Gerum's avatar
Philippe Gerum committed
98
99
100
101
102
103
104
105
106
107
/*
 * We may directly compete with tube_push_prepare() on the tail
 * pointer, and indirectly with tube_pull() which may be walking the
 * canister list through the ->next link. A NULL link denotes the end
 * canister, tube_pull() cannot walk past that one, and never dequeues
 * it. Since __new was the previous end canister referred to by the
 * tail pointer, __new->next is NULL until sending is completed.
 *
 * @__desc: queue descriptor; not used by this macro, kept so that
 *          the prepare/finish pair share the same leading argument.
 * @__new:  canister returned by tube_push_prepare().
 * @__free: canister which was handed to tube_push_prepare().
 *
 * Storing @__free into @__new->next is what makes @__new reachable
 * by tube_pull().
 */
#define tube_push_finish(__desc, __new, __free)		\
	do {						\
		smp_mb();				\
		atomic_store(&(__new)->next, __free);	\
	} while (0)

/*
 * Push @__free as the new final element of the queue at @__desc
 * without storing any payload: tube_push_prepare() immediately
 * followed by tube_push_finish(). The canister which was formerly at
 * the tail becomes reachable by tube_pull().
 *
 * compiler_barrier() comes from <evl/compiler.h>; presumably it
 * keeps the compiler from moving earlier accesses to the canister
 * past the push - confirm against that header.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_push(__desc, __free)				\
	do {							\
		__typeof__((__desc)->tail) __new_q;		\
		compiler_barrier();				\
		__new_q = tube_push_prepare(__desc, __free);	\
		tube_push_finish(__desc, __new_q, __free);	\
	} while (0)

/*
 * Push a message to the queue at @__desc.
 *
 * @__desc: pointer to a canister queue; expanded several times.
 * @__item: value copied into the ->payload of the former tail
 *          canister.
 * @__free: spare canister which becomes the new final element.
 *
 * The payload is written between the prepare and finish steps, i.e.
 * before the canister carrying it is linked to @__free and thereby
 * made reachable by tube_pull().
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_push_item(__desc, __item, __free)			\
	do {							\
		__typeof__((__desc)->tail) __new_qi;		\
		__new_qi = tube_push_prepare(__desc, __free);	\
		__new_qi->payload = (__item);			\
		tube_push_finish(__desc, __new_qi, __free);	\
	} while (0)

128
/*
 * Declare struct @__can_struct, the element type of a tube: a ->next
 * link followed by the payload.
 *
 * @__can_struct: tag of the structure to declare.
 * @__payload:    a type name, or an expression whose type is used
 *                for the ->payload member (it is handed over to
 *                __typeof__).
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define DECLARE_EVL_CANISTER(__can_struct, __payload)		\
	struct __can_struct {					\
		struct __can_struct *next;			\
		__typeof__(__payload) payload;			\
	}

/*
 * Expand to an anonymous structure type describing a queue of
 * struct @__can_struct canisters: the head and tail pointers, plus
 * one canister embedded in the descriptor (first).
 * CANISTER_QUEUE_INITIALIZER() makes both pointers refer to that
 * embedded canister, which gives the queue its initial final
 * element.
 */
#define DECLARE_CANISTER_QUEUE(__can_struct)	\
	struct {				\
		struct __can_struct *head;	\
		struct __can_struct *tail;	\
		struct __can_struct first;	\
	}

/*
 * Initializer for a queue declared with DECLARE_CANISTER_QUEUE().
 *
 * @__name: lvalue designating the queue being initialized.
 *
 * Head and tail both refer to the canister embedded in the queue,
 * whose ->next link is NULL. A NULL link at the head is what
 * tube_pull() reports as an empty queue.
 */
#define CANISTER_QUEUE_INITIALIZER(__name)	\
	{					\
		.head = &(__name).first,	\
		.tail = &(__name).first,	\
		.first = {			\
			.next = NULL,		\
		},				\
	}

/*
 * Declare struct @__name, a tube of struct @__can_struct canisters.
 * It is made of two canister queues - pending, which evl_send_tube()
 * pushes to and evl_receive_tube() pulls from, and free, which feeds
 * evl_send_tube() with spare canisters - and of max_items, which
 * evl_init_tube() sets to the number of canisters it was given.
 */
#define DECLARE_EVL_TUBE(__name, __can_struct)			\
	struct __name {						\
		DECLARE_CANISTER_QUEUE(__can_struct) pending;	\
		DECLARE_CANISTER_QUEUE(__can_struct) free;	\
		long max_items;					\
	}

/*
 * Initializer for a tube declared with DECLARE_EVL_TUBE().
 *
 * @__name: lvalue designating the tube being initialized.
 *
 * Both queues start with only their embedded canister, and
 * max_items is zero: no message can be sent until spare canisters
 * are pushed to the free queue, which evl_init_tube() does.
 */
#define TUBE_INITIALIZER(__name)					\
	{								\
		.pending = CANISTER_QUEUE_INITIALIZER((__name).pending), \
		.free = CANISTER_QUEUE_INITIALIZER((__name).free),	\
		.max_items = 0,						\
	}

/*
 * Number of bytes needed for a struct @__name tube followed by
 * @__count spare canisters.
 *
 * @__name:  structure tag of the tube (no "struct" keyword).
 * @__count: number of canisters; may be any integer expression, it
 *           is parenthesized in the expansion.
 *
 * This is an integer constant expression of type size_t whenever
 * @__count is one. The null pointer only appears as a sizeof
 * operand, so it is never evaluated.
 */
#define evl_get_tube_size(__name, __count)				\
	(sizeof(struct __name) +					\
	 sizeof(((struct __name *)0)->free.first) * (__count))

/*
 * Initialize a tube and hand it its spare canisters.
 *
 * @__tube:    pointer to the tube; expanded several times.
 * @__freevec: address of an array of @__count canisters, converted
 *             to the tube's canister pointer type.
 * @__count:   number of canisters in @__freevec.
 *
 * The tube is reset with TUBE_INITIALIZER(), then every canister of
 * the array is pushed to the free queue, and max_items is set to
 * @__count.
 *
 * The loop counter is a long like max_items, so that a count which
 * does not fit an int cannot overflow it. __typeof__ is used instead
 * of typeof so that the header also builds in strict ISO modes.
 */
#define evl_init_tube(__tube, __freevec, __count)			\
	do {								\
		__typeof__((__tube)->pending.tail) __i;			\
		long __n;						\
		*(__tube) = (__typeof__(*(__tube)))			\
			TUBE_INITIALIZER(*(__tube));			\
		for (__n = 0, __i = (__typeof__(__i))(__freevec);	\
		     __n < (__count); __n++, __i++)			\
			tube_push(&(__tube)->free, __i);		\
		(__tube)->max_items = (__count);			\
	} while (0)

/*
 * Send @__item through the tube.
 *
 * @__tube: pointer to the tube; expanded several times.
 * @__item: value to copy into a canister's payload.
 *
 * A spare canister is first pulled from the free queue. If there is
 * none, nothing is sent and the macro evaluates to false. Otherwise
 * @__item is pushed to the pending queue - the spare canister
 * becoming the new final element of that queue - and the macro
 * evaluates to true.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define evl_send_tube(__tube, __item)				\
	({							\
		bool __ret = false;				\
		__typeof__((__tube)->pending.tail) __new;	\
		__new = tube_pull(&(__tube)->free);		\
		if (__new) {					\
			tube_push_item(&(__tube)->pending,	\
				__item, __new);			\
			__ret = true;				\
		}						\
		__ret;						\
	})

/*
 * Receive the next message from the tube.
 *
 * @__tube: pointer to the tube; expanded several times.
 * @__item: lvalue which receives the payload.
 *
 * Evaluates to true if a message was pulled from the pending queue,
 * in which case its payload is copied to @__item and the canister
 * is returned to the free queue. Evaluates to false, leaving
 * @__item untouched, if the pending queue was seen empty.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define evl_receive_tube(__tube, __item)			\
	({							\
		bool __ret = false;				\
		__typeof__((__tube)->pending.tail) __next;	\
		__next = tube_pull(&(__tube)->pending);		\
		if (__next) {					\
			/* Copy out before recycling the canister. */ \
			(__item) = __next->payload;		\
			tube_push(&(__tube)->free, __next);	\
			__ret = true;				\
		}						\
		__ret;						\
	})

/*
 * Position-independent variant.
 */

/*
 * Base-offset counterpart of tube_push_prepare(): the tail and the
 * ->next links hold offsets instead of pointers, zero standing for
 * NULL.
 *
 * @__base: address the offsets are relative to.
 * @__desc: pointer to a canister queue declared with
 *          DECLARE_CANISTER_QUEUE_REL(); expanded several times.
 * @__free: pointer to the canister which becomes the new final
 *          element; its ->next link is reset to zero here.
 *
 * Evaluates to __memptr(@__base, <former tail offset>). __memoff()
 * and __memptr() come from <evl/compiler.h>; presumably they convert
 * a pointer to an offset from @__base and back - confirm against
 * that header.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_push_prepare_rel(__base, __desc, __free)			\
	({								\
		__typeof__((__desc)->tail) __old_qp;			\
		uintptr_t __off_qp = __memoff(__base, __free);		\
		atomic_store(&(__free)->next, 0);			\
		smp_mb();						\
		/* Retry until we are the one swapping the tail. */	\
		do {							\
			__old_qp = atomic_load(&(__desc)->tail);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->tail,			\
				__old_qp, __off_qp));			\
		__memptr(__base, __old_qp);				\
	})

227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
/*
 * Base-offset counterpart of tube_pull(), which see for details.
 * The head and the ->next links hold offsets, zero standing for
 * NULL.
 *
 * @__base: address the offsets are relative to.
 * @__desc: pointer to a canister queue declared with
 *          DECLARE_CANISTER_QUEUE_REL(); expanded several times.
 *
 * Evaluates to a pointer to the dequeued canister, or NULL if the
 * queue was seen empty.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_pull_rel(__base, __desc)					\
	({								\
		/* __next_dq is only meaningful once ->next was read. */ \
		__typeof__((__desc)->head) __next_dq = 0, __head_dq;	\
		__typeof__((__desc)->first[0]) *__head_dqptr;		\
		do {							\
			__head_dq = atomic_load(&(__desc)->head);	\
			if (__head_dq == 0) {				\
				__head_dqptr = NULL;			\
				break;					\
			}						\
			__head_dqptr = (__typeof__(__head_dqptr))	\
				__memptr(__base, __head_dq);		\
			__next_dq = atomic_load(&(__head_dqptr)->next);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->head, __head_dq, __next_dq)); \
		/* We grabbed the final element: put it back, report empty. */ \
		if (__head_dq && __next_dq == 0) {			\
			atomic_store(&(__desc)->head, __head_dq);	\
			__head_dqptr = NULL;				\
		}							\
		__head_dqptr;						\
	})

250
251
/*
 * Base-offset counterpart of tube_push_finish(): store the offset of
 * @__free, as computed by __memoff(@__base, @__free), into
 * @__new->next.
 *
 * @__base: address the offsets are relative to.
 * @__desc: queue descriptor; not used by this macro.
 * @__new:  canister obtained from tube_push_prepare_rel().
 * @__free: canister which was handed to tube_push_prepare_rel().
 */
#define tube_push_finish_rel(__base, __desc, __new, __free)		\
	do {								\
		smp_mb();						\
		atomic_store(&(__new)->next, __memoff(__base, __free));	\
	} while (0)

/*
 * Base-offset counterpart of tube_push(): push @__free as the new
 * final element of the queue at @__desc without storing any
 * payload.
 *
 * @__base: address the offsets are relative to.
 * @__desc: pointer to a canister queue declared with
 *          DECLARE_CANISTER_QUEUE_REL(); expanded several times.
 * @__free: pointer to the canister to push.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_push_rel(__base, __desc, __free)				\
	do {								\
		__typeof__((__desc)->first[0]) *__new_q;		\
		compiler_barrier();					\
		__new_q = (__typeof__(__new_q))				\
			tube_push_prepare_rel(__base, __desc, __free);	\
		tube_push_finish_rel(__base, __desc, __new_q, __free);	\
	} while (0)

/*
 * Base-offset counterpart of tube_push_item().
 *
 * @__base: address the offsets are relative to.
 * @__desc: pointer to a canister queue declared with
 *          DECLARE_CANISTER_QUEUE_REL(); expanded several times.
 * @__item: value copied into the ->payload of the former tail
 *          canister.
 * @__free: spare canister which becomes the new final element.
 *
 * The payload is written between the prepare and finish steps, i.e.
 * before the canister carrying it is linked to @__free.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define tube_push_item_rel(__base, __desc, __item, __free)		\
	do {								\
		__typeof__((__desc)->first[0]) *__new_qi;		\
		__new_qi = (__typeof__(__new_qi))			\
			tube_push_prepare_rel(__base, __desc, __free);	\
		__new_qi->payload = (__item);				\
		tube_push_finish_rel(__base, __desc, __new_qi, __free); \
	} while (0)

274
/*
 * Declare struct @__can_struct, the element type of a base-offset
 * tube: ->next holds an offset (zero standing for NULL) instead of a
 * pointer, and is followed by the payload.
 *
 * @__can_struct: tag of the structure to declare.
 * @__payload:    a type name, or an expression whose type is used
 *                for the ->payload member (it is handed over to
 *                __typeof__).
 *
 * The closing brace deliberately has no trailing line continuation,
 * so that whatever follows the macro definition is never spliced
 * into it.
 */
#define DECLARE_EVL_CANISTER_REL(__can_struct, __payload)	\
	struct __can_struct {					\
		uintptr_t next;					\
		__typeof__(__payload) payload;			\
	}

280
281
282
283
284
285
286
287
/*
 * Base-offset counterpart of DECLARE_CANISTER_QUEUE(): head and tail
 * hold offsets instead of pointers. Since tube_pull_rel() reads a
 * zero head as an empty queue, the embedded canister must never sit
 * at offset zero, which is why it comes after head and tail.
 */
#define DECLARE_CANISTER_QUEUE_REL(__can_struct)		\
	struct {						\
		uintptr_t head;					\
		uintptr_t tail;					\
		/* Must NOT be first (non-zero offset) */	\
		struct __can_struct first[1];			\
	}

Philippe Gerum's avatar
Philippe Gerum committed
288
289
290
291
292
293
294
/*
 * Declare struct @__name, a base-offset tube of struct @__can_struct
 * canisters: a pending queue, a free queue, and max_items.
 *
 * NOTE(review): @__payload is not referenced by the expansion; it
 * looks like it is only kept for the sake of the callers - confirm
 * before dropping it.
 */
#define DECLARE_EVL_TUBE_REL(__name, __can_struct, __payload)		\
	struct __name {							\
		DECLARE_CANISTER_QUEUE_REL(__can_struct) pending;	\
		DECLARE_CANISTER_QUEUE_REL(__can_struct) free;		\
		long max_items;						\
	}

295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
/*
 * Offset of @__member within a base-offset canister queue, shifted
 * by @__baseoff.
 *
 * @__baseoff:    offset of the queue itself; may be any integer
 *                expression, it is parenthesized in the expansion.
 * @__can_struct: tag of the canister structure.
 * @__member:     member of DECLARE_CANISTER_QUEUE_REL() to locate.
 *
 * A throw-away variable of the anonymous queue type is declared
 * only to have a type to hand over to offsetof().
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define canq_offsetof(__baseoff, __can_struct, __member)		\
	({								\
		DECLARE_CANISTER_QUEUE_REL(__can_struct) __tmp;		\
		(void)(__tmp);	/* Pedantic indirection for C++ */	\
		offsetof(__typeof__(__tmp), __member) + (__baseoff);	\
	})

/*
 * Initializer for a queue declared with
 * DECLARE_CANISTER_QUEUE_REL().
 *
 * @__baseoff:    offset of the queue being initialized.
 * @__can_struct: tag of the canister structure.
 *
 * Head and tail both hold the offset of the embedded canister, whose
 * ->next link is zero, i.e. the queue starts empty.
 */
#define CANISTER_QUEUE_INITIALIZER_REL(__baseoff, __can_struct)		\
	{								\
		.head = canq_offsetof(__baseoff, __can_struct, first),	\
		.tail = canq_offsetof(__baseoff, __can_struct, first),	\
		/* Single-element array: initialize its only slot. */	\
		.first = { { .next = 0 } },				\
	}

Philippe Gerum's avatar
Philippe Gerum committed
313
314
315
316
317
318
319
320
321
322
323
/*
 * Initializer for a tube declared with DECLARE_EVL_TUBE_REL().
 *
 * @__name:       lvalue designating the tube being initialized.
 * @__can_struct: tag of the canister structure.
 *
 * Each queue is given __memoff(&tube, &tube.<queue>) as its base
 * offset, so the offsets stored in the queues are relative to the
 * tube itself. max_items starts at zero.
 */
#define TUBE_INITIALIZER_REL(__name, __can_struct)			\
	{								\
		.pending = CANISTER_QUEUE_INITIALIZER_REL(__memoff(&(__name), \
				&(__name).pending), __can_struct), 	\
		.free = CANISTER_QUEUE_INITIALIZER_REL(__memoff(&(__name), \
				&(__name).free), __can_struct),		\
		.max_items = 0,						\
	}

/*
 * Lay out a base-offset tube over a memory area.
 *
 * @__name:       structure tag of the tube (no "struct" keyword).
 * @__can_struct: tag of the canister structure.
 * @__mem:        start of the memory area; the tube structure is
 *                placed there, the spare canisters right after it.
 * @__size:       size of the area in bytes, see
 *                evl_get_tube_size_rel().
 *
 * The tube is reset with TUBE_INITIALIZER_REL(), then every canister
 * which fits entirely between the end of the tube structure and the
 * end of the area is pushed to the free queue; max_items is set to
 * the number of canisters pushed. Evaluates to the tube pointer.
 *
 * Only whole canisters are counted, so that an area whose size is
 * not an exact multiple of the canister size never leads to pushing
 * a canister which would extend past the end of the area. The area
 * must be large enough for the tube structure itself.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define evl_init_tube_rel(__name, __can_struct, __mem, __size)		\
	({								\
		struct __name *__tube = (__typeof__(__tube))(__mem);	\
		__typeof__(__tube->free.first[0]) *__i;			\
		size_t __room = (size_t)(__size), __nr, __n;		\
		*__tube = (__typeof__(*__tube))				\
			TUBE_INITIALIZER_REL(*__tube, __can_struct);	\
		/* Bytes left for the canisters past the tube itself. */ \
		__room = __room > sizeof(*__tube) ?			\
			__room - sizeof(*__tube) : 0;			\
		__nr = __room / sizeof(*__i);				\
		__i = (__typeof__(__i))(__tube + 1);			\
		for (__n = 0; __n < __nr; __n++, __i++) {		\
			tube_push_rel(__tube, &(__tube)->free, __i);	\
		}							\
		__tube->max_items = (long)__nr;				\
		__tube;							\
	})

336
337
338
/*
 * Number of bytes needed for a base-offset struct @__name tube
 * followed by @__count spare canisters. This simply forwards to
 * evl_get_tube_size(), both tube variants sharing the free.first
 * member the size is derived from.
 */
#define evl_get_tube_size_rel(__name, __count)	\
	evl_get_tube_size(__name, __count)

Philippe Gerum's avatar
Philippe Gerum committed
339
340
341
342
/*
 * Base-offset counterpart of evl_send_tube(). The tube address
 * itself is used as the base of all offsets.
 *
 * @__tube: pointer to the tube; expanded several times.
 * @__item: value to copy into a canister's payload.
 *
 * Evaluates to true if @__item was pushed to the pending queue,
 * false if no spare canister could be pulled from the free queue.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define evl_send_tube_rel(__tube, __item)			\
	({							\
		bool __ret = false;				\
		__typeof__((__tube)->pending.first[0]) *__new;	\
		__new = (__typeof__(__new))			\
			tube_pull_rel(__tube, &(__tube)->free); \
		if (__new) {					\
			tube_push_item_rel(__tube,		\
					&(__tube)->pending,	\
					__item, __new);		\
			__ret = true;				\
		}						\
		__ret;						\
	})

/*
 * Base-offset counterpart of evl_receive_tube(). The tube address
 * itself is used as the base of all offsets.
 *
 * @__tube: pointer to the tube; expanded several times.
 * @__item: lvalue which receives the payload.
 *
 * Evaluates to true if a message was pulled from the pending queue,
 * in which case its payload is copied to @__item and the canister
 * is returned to the free queue. Evaluates to false, leaving
 * @__item untouched, if the pending queue was seen empty.
 *
 * __typeof__ is used instead of typeof so that the header also
 * builds in strict ISO modes (e.g. -std=c11).
 */
#define evl_receive_tube_rel(__tube, __item)				\
	({								\
		bool __ret = false;					\
		__typeof__((__tube)->pending.first[0]) *__next;		\
		__next = tube_pull_rel(__tube, &(__tube)->pending);	\
		if (__next) {						\
			/* Copy out before recycling the canister. */	\
			(__item) = __next->payload;			\
			tube_push_rel(__tube, &(__tube)->free, __next); \
			__ret = true;					\
		}							\
		__ret;							\
	})

#endif /* _EVL_TUBE_H */