PonyPlayer
readerwriterqueue.h
浏览该文件的文档.
1// ©2013-2020 Cameron Desrochers.
2// Distributed under the simplified BSD license (see the license file that
3// should have come with this header).
4
5#pragma once
6
7#include "atomicops.h"
8#include <new>
9#include <type_traits>
10#include <utility>
11#include <cassert>
12#include <stdexcept>
13#include <new>
14#include <cstdint>
15#include <cstdlib> // For malloc/free/abort & size_t
16#include <memory>
17#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
18#include <chrono>
19#endif
20
21
22// A lock-free queue for a single-consumer, single-producer architecture.
23// The queue is also wait-free in the common path (except if more memory
24// needs to be allocated, in which case malloc is called).
25// Allocates memory sparingly, and only once if the original maximum size
26// estimate is never exceeded.
27// Tested on x86/x64 processors, but semantics should be correct for all
28// architectures (given the right implementations in atomicops.h), provided
29// that aligned integer and pointer accesses are naturally atomic.
30// Note that there should only be one consumer thread and producer thread;
31// Switching roles of the threads, or using multiple consecutive threads for
32// one role, is not safe unless properly synchronized.
33// Using the queue exclusively from one thread is fine, though a bit silly.
34
35#ifndef MOODYCAMEL_CACHE_LINE_SIZE
36#define MOODYCAMEL_CACHE_LINE_SIZE 64
37#endif
38
39#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
40#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
41#define MOODYCAMEL_EXCEPTIONS_ENABLED
42#endif
43#endif
44
45#ifndef MOODYCAMEL_HAS_EMPLACE
46#if !defined(_MSC_VER) || _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
47#define MOODYCAMEL_HAS_EMPLACE 1
48#endif
49#endif
50
51#ifndef MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
52#if defined (__APPLE__) && defined (__MACH__) && __cplusplus >= 201703L
53// This is required to find out what deployment target we are using
54#include <CoreFoundation/CoreFoundation.h>
55#if !defined(MAC_OS_X_VERSION_MIN_REQUIRED) || MAC_OS_X_VERSION_MIN_REQUIRED < MAC_OS_X_VERSION_10_14
56// C++17 new(size_t, align_val_t) is not backwards-compatible with older versions of macOS, so we can't support over-alignment in this case
57#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
58#endif
59#endif
60#endif
61
62#ifndef MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
63#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE AE_ALIGN(MOODYCAMEL_CACHE_LINE_SIZE)
64#endif
65
66#ifdef AE_VCPP
67#pragma warning(push)
68#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
69#pragma warning(disable: 4820) // padding was added
70#pragma warning(disable: 4127) // conditional expression is constant
71#endif
72
73namespace moodycamel {
74
75template<typename T, size_t MAX_BLOCK_SIZE = 512>
77{
78 // Design: Based on a queue-of-queues. The low-level queues are just
79 // circular buffers with front and tail indices indicating where the
80 // next element to dequeue is and where the next element can be enqueued,
81 // respectively. Each low-level queue is called a "block". Each block
82 // wastes exactly one element's worth of space to keep the design simple
83 // (if front == tail then the queue is empty, and can't be full).
84 // The high-level queue is a circular linked list of blocks; again there
85 // is a front and tail, but this time they are pointers to the blocks.
86 // The front block is where the next element to be dequeued is, provided
87 // the block is not empty. The back block is where elements are to be
88 // enqueued, provided the block is not full.
89 // The producer thread owns all the tail indices/pointers. The consumer
90 // thread owns all the front indices/pointers. Both threads read each
91 // other's variables, but only the owning thread updates them. E.g. After
92 // the consumer reads the producer's tail, the tail may change before the
93 // consumer is done dequeuing an object, but the consumer knows the tail
94 // will never go backwards, only forwards.
95 // If there is no room to enqueue an object, an additional block (of
96 // equal size to the last block) is added. Blocks are never removed.
97
98public:
99 typedef T value_type;
100
101 // Constructs a queue that can hold at least `size` elements without further
102 // allocations. If more than MAX_BLOCK_SIZE elements are requested,
103 // then several blocks of MAX_BLOCK_SIZE each are reserved (including
104 // at least one extra buffer block).
105 AE_NO_TSAN explicit ReaderWriterQueue(size_t size = 15)
106#ifndef NDEBUG
107 : enqueuing(false)
108 ,dequeuing(false)
109#endif
110 {
111 assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
112 assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
113
114 Block* firstBlock = nullptr;
115
116 largestBlockSize = ceilToPow2(size + 1); // We need a spare slot to fit size elements in the block
117 if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
118 // We need a spare block in case the producer is writing to a different block the consumer is reading from, and
119 // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity
120 // between front == tail meaning "empty" and "full".
121 // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
122 // number of blocks - 1. Solving for size and applying a ceiling to the division gives us (after simplifying):
123 size_t initialBlockCount = (size + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
124 largestBlockSize = MAX_BLOCK_SIZE;
125 Block* lastBlock = nullptr;
126 for (size_t i = 0; i != initialBlockCount; ++i) {
127 auto block = make_block(largestBlockSize);
128 if (block == nullptr) {
129#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
130 throw std::bad_alloc();
131#else
132 abort();
133#endif
134 }
135 if (firstBlock == nullptr) {
136 firstBlock = block;
137 }
138 else {
139 lastBlock->next = block;
140 }
141 lastBlock = block;
142 block->next = firstBlock;
143 }
144 }
145 else {
146 firstBlock = make_block(largestBlockSize);
147 if (firstBlock == nullptr) {
148#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
149 throw std::bad_alloc();
150#else
151 abort();
152#endif
153 }
154 firstBlock->next = firstBlock;
155 }
156 frontBlock = firstBlock;
157 tailBlock = firstBlock;
158
159 // Make sure the reader/writer threads will have the initialized memory setup above:
161 }
162
163 // Note: The queue should not be accessed concurrently while it's
164 // being moved. It's up to the user to synchronize this.
166 : frontBlock(other.frontBlock.load()),
167 tailBlock(other.tailBlock.load()),
168 largestBlockSize(other.largestBlockSize)
169#ifndef NDEBUG
170 ,enqueuing(false)
171 ,dequeuing(false)
172#endif
173 {
174 other.largestBlockSize = 32;
175 Block* b = other.make_block(other.largestBlockSize);
176 if (b == nullptr) {
177#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
178 throw std::bad_alloc();
179#else
180 abort();
181#endif
182 }
183 b->next = b;
184 other.frontBlock = b;
185 other.tailBlock = b;
186 }
187
188 // Note: The queue should not be accessed concurrently while it's
189 // being moved. It's up to the user to synchronize this.
191 {
192 Block* b = frontBlock.load();
193 frontBlock = other.frontBlock.load();
194 other.frontBlock = b;
195 b = tailBlock.load();
196 tailBlock = other.tailBlock.load();
197 other.tailBlock = b;
198 std::swap(largestBlockSize, other.largestBlockSize);
199 return *this;
200 }
201
202 // Note: The queue should not be accessed concurrently while it's
203 // being deleted. It's up to the user to synchronize this.
205 {
206 // Make sure we get the latest version of all variables from other CPUs:
208
209 // Destroy any remaining objects in queue and free memory
210 Block* frontBlock_ = frontBlock;
211 Block* block = frontBlock_;
212 do {
213 Block* nextBlock = block->next;
214 size_t blockFront = block->front;
215 size_t blockTail = block->tail;
216
217 for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
218 auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
219 element->~T();
220 (void)element;
221 }
222
223 auto rawBlock = block->rawThis;
224 block->~Block();
225 std::free(rawBlock);
226 block = nextBlock;
227 } while (block != frontBlock_);
228 }
229
230
231 // Enqueues a copy of element if there is room in the queue.
232 // Returns true if the element was enqueued, false otherwise.
233 // Does not allocate memory.
234 AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN
235 {
236 return inner_enqueue<CannotAlloc>(element);
237 }
238
239 // Enqueues a moved copy of element if there is room in the queue.
240 // Returns true if the element was enqueued, false otherwise.
241 // Does not allocate memory.
243 {
244 return inner_enqueue<CannotAlloc>(std::forward<T>(element));
245 }
246
247#if MOODYCAMEL_HAS_EMPLACE
248 // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
249 template<typename... Args>
251 {
252 return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
253 }
254#endif
255
256 // Enqueues a copy of element on the queue.
257 // Allocates an additional block of memory if needed.
258 // Only fails (returns false) if memory allocation fails.
259 AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN
260 {
261 return inner_enqueue<CanAlloc>(element);
262 }
263
264 // Enqueues a moved copy of element on the queue.
265 // Allocates an additional block of memory if needed.
266 // Only fails (returns false) if memory allocation fails.
268 {
269 return inner_enqueue<CanAlloc>(std::forward<T>(element));
270 }
271
272#if MOODYCAMEL_HAS_EMPLACE
273 // Like enqueue() but with emplace semantics (i.e. construct-in-place).
274 template<typename... Args>
276 {
277 return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
278 }
279#endif
280
281 // Attempts to dequeue an element; if the queue is empty,
282 // returns false instead. If the queue has at least one element,
283 // moves front to result using operator=, then returns true.
284 template<typename U>
285 bool try_dequeue(U& result) AE_NO_TSAN
286 {
287#ifndef NDEBUG
288 ReentrantGuard guard(this->dequeuing);
289#endif
290
291 // High-level pseudocode:
292 // Remember where the tail block is
293 // If the front block has an element in it, dequeue it
294 // Else
295 // If front block was the tail block when we entered the function, return false
296 // Else advance to next block and dequeue the item there
297
298 // Note that we have to use the value of the tail block from before we check if the front
299 // block is full or not, in case the front block is empty and then, before we check if the
300 // tail block is at the front block or not, the producer fills up the front block *and
301 // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
302 // reproducible in practice.
303 // In order to avoid overhead in the common case, though, we do a double-checked pattern
304 // where we have the fast path if the front block is not empty, then read the tail block,
305 // then re-read the front block and check if it's not empty again, then check if the tail
306 // block has advanced.
307
308 Block* frontBlock_ = frontBlock.load();
309 size_t blockTail = frontBlock_->localTail;
310 size_t blockFront = frontBlock_->front.load();
311
312 if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
314
315 non_empty_front_block:
316 // Front block not empty, dequeue from here
317 auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
318 result = std::move(*element);
319 element->~T();
320
321 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
322
324 frontBlock_->front = blockFront;
325 }
326 else if (frontBlock_ != tailBlock.load()) {
328
329 frontBlock_ = frontBlock.load();
330 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
331 blockFront = frontBlock_->front.load();
333
334 if (blockFront != blockTail) {
335 // Oh look, the front block isn't empty after all
336 goto non_empty_front_block;
337 }
338
339 // Front block is empty but there's another block ahead, advance to it
340 Block* nextBlock = frontBlock_->next;
341 // Don't need an acquire fence here since next can only ever be set on the tailBlock,
342 // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
343 // ensures next is up-to-date on this CPU in case we recently were at tailBlock.
344
345 size_t nextBlockFront = nextBlock->front.load();
346 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
348
349 // Since the tailBlock is only ever advanced after being written to,
350 // we know there's for sure an element to dequeue on it
351 assert(nextBlockFront != nextBlockTail);
352 AE_UNUSED(nextBlockTail);
353
354 // We're done with this block, let the producer use it if it needs
355 fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
356 frontBlock = frontBlock_ = nextBlock;
357
358 compiler_fence(memory_order_release); // Not strictly needed
359
360 auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
361
362 result = std::move(*element);
363 element->~T();
364
365 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
366
368 frontBlock_->front = nextBlockFront;
369 }
370 else {
371 // No elements in current block and no other block to advance to
372 return false;
373 }
374
375 return true;
376 }
377
378
379 // Returns a pointer to the front element in the queue (the one that
380 // would be removed next by a call to `try_dequeue` or `pop`). If the
381 // queue appears empty at the time the method is called, nullptr is
382 // returned instead.
383 // Must be called only from the consumer thread.
384 T* peek() const AE_NO_TSAN
385 {
386#ifndef NDEBUG
387 ReentrantGuard guard(this->dequeuing);
388#endif
389 // See try_dequeue() for reasoning
390
391 Block* frontBlock_ = frontBlock.load();
392 size_t blockTail = frontBlock_->localTail;
393 size_t blockFront = frontBlock_->front.load();
394
395 if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
397 non_empty_front_block:
398 return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
399 }
400 else if (frontBlock_ != tailBlock.load()) {
402 frontBlock_ = frontBlock.load();
403 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
404 blockFront = frontBlock_->front.load();
406
407 if (blockFront != blockTail) {
408 goto non_empty_front_block;
409 }
410
411 Block* nextBlock = frontBlock_->next;
412
413 size_t nextBlockFront = nextBlock->front.load();
415
416 assert(nextBlockFront != nextBlock->tail.load());
417 return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
418 }
419
420 return nullptr;
421 }
422
423 // Removes the front element from the queue, if any, without returning it.
424 // Returns true on success, or false if the queue appeared empty at the time
425 // `pop` was called.
427 {
428#ifndef NDEBUG
429 ReentrantGuard guard(this->dequeuing);
430#endif
431 // See try_dequeue() for reasoning
432
433 Block* frontBlock_ = frontBlock.load();
434 size_t blockTail = frontBlock_->localTail;
435 size_t blockFront = frontBlock_->front.load();
436
437 if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
439
440 non_empty_front_block:
441 auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
442 element->~T();
443
444 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
445
447 frontBlock_->front = blockFront;
448 }
449 else if (frontBlock_ != tailBlock.load()) {
451 frontBlock_ = frontBlock.load();
452 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
453 blockFront = frontBlock_->front.load();
455
456 if (blockFront != blockTail) {
457 goto non_empty_front_block;
458 }
459
460 // Front block is empty but there's another block ahead, advance to it
461 Block* nextBlock = frontBlock_->next;
462
463 size_t nextBlockFront = nextBlock->front.load();
464 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
466
467 assert(nextBlockFront != nextBlockTail);
468 AE_UNUSED(nextBlockTail);
469
471 frontBlock = frontBlock_ = nextBlock;
472
474
475 auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
476 element->~T();
477
478 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
479
481 frontBlock_->front = nextBlockFront;
482 }
483 else {
484 // No elements in current block and no other block to advance to
485 return false;
486 }
487
488 return true;
489 }
490
491 // Returns the approximate number of items currently in the queue.
492 // Safe to call from both the producer and consumer threads.
493 inline size_t size_approx() const AE_NO_TSAN
494 {
495 size_t result = 0;
496 Block* frontBlock_ = frontBlock.load();
497 Block* block = frontBlock_;
498 do {
500 size_t blockFront = block->front.load();
501 size_t blockTail = block->tail.load();
502 result += (blockTail - blockFront) & block->sizeMask;
503 block = block->next.load();
504 } while (block != frontBlock_);
505 return result;
506 }
507
508 // Returns the total number of items that could be enqueued without incurring
509 // an allocation when this queue is empty.
510 // Safe to call from both the producer and consumer threads.
511 //
512 // NOTE: The actual capacity during usage may be different depending on the consumer.
513 // If the consumer is removing elements concurrently, the producer cannot add to
514 // the block the consumer is removing from until it's completely empty, except in
515 // the case where the producer was writing to the same block the consumer was
516 // reading from the whole time.
517 inline size_t max_capacity() const {
518 size_t result = 0;
519 Block* frontBlock_ = frontBlock.load();
520 Block* block = frontBlock_;
521 do {
523 result += block->sizeMask;
524 block = block->next.load();
525 } while (block != frontBlock_);
526 return result;
527 }
528
529
530private:
531 enum AllocationMode { CanAlloc, CannotAlloc };
532
533#if MOODYCAMEL_HAS_EMPLACE
534 template<AllocationMode canAlloc, typename... Args>
535 bool inner_enqueue(Args&&... args) AE_NO_TSAN
536#else
537 template<AllocationMode canAlloc, typename U>
538 bool inner_enqueue(U&& element) AE_NO_TSAN
539#endif
540 {
541#ifndef NDEBUG
542 ReentrantGuard guard(this->enqueuing);
543#endif
544
545 // High-level pseudocode (assuming we're allowed to alloc a new block):
546 // If room in tail block, add to tail
547 // Else check next block
548 // If next block is not the head block, enqueue on next block
549 // Else create a new block and enqueue there
550 // Advance tail to the block we just enqueued to
551
552 Block* tailBlock_ = tailBlock.load();
553 size_t blockFront = tailBlock_->localFront;
554 size_t blockTail = tailBlock_->tail.load();
555
556 size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
557 if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
559 // This block has room for at least one more element
560 char* location = tailBlock_->data + blockTail * sizeof(T);
561#if MOODYCAMEL_HAS_EMPLACE
562 new (location) T(std::forward<Args>(args)...);
563#else
564 new (location) T(std::forward<U>(element));
565#endif
566
568 tailBlock_->tail = nextBlockTail;
569 }
570 else {
572 if (tailBlock_->next.load() != frontBlock) {
573 // Note that the reason we can't advance to the frontBlock and start adding new entries there
574 // is because if we did, then dequeue would stay in that block, eventually reading the new values,
575 // instead of advancing to the next full block (whose values were enqueued first and so should be
576 // consumed first).
577
578 fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock
579
580 // tailBlock is full, but there's a free block ahead, use it
581 Block* tailBlockNext = tailBlock_->next.load();
582 size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
583 nextBlockTail = tailBlockNext->tail.load();
585
586 // This block must be empty since it's not the head block and we
587 // go through the blocks in a circle
588 assert(nextBlockFront == nextBlockTail);
589 tailBlockNext->localFront = nextBlockFront;
590
591 char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
592#if MOODYCAMEL_HAS_EMPLACE
593 new (location) T(std::forward<Args>(args)...);
594#else
595 new (location) T(std::forward<U>(element));
596#endif
597
598 tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
599
601 tailBlock = tailBlockNext;
602 }
603 else if (canAlloc == CanAlloc) {
604 // tailBlock is full and there's no free block ahead; create a new block
605 auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
606 auto newBlock = make_block(newBlockSize);
607 if (newBlock == nullptr) {
608 // Could not allocate a block!
609 return false;
610 }
611 largestBlockSize = newBlockSize;
612
613#if MOODYCAMEL_HAS_EMPLACE
614 new (newBlock->data) T(std::forward<Args>(args)...);
615#else
616 new (newBlock->data) T(std::forward<U>(element));
617#endif
618 assert(newBlock->front == 0);
619 newBlock->tail = newBlock->localTail = 1;
620
621 newBlock->next = tailBlock_->next.load();
622 tailBlock_->next = newBlock;
623
624 // Might be possible for the dequeue thread to see the new tailBlock->next
625 // *without* seeing the new tailBlock value, but this is OK since it can't
626 // advance to the next block until tailBlock is set anyway (because the only
627 // case where it could try to read the next is if it's already at the tailBlock,
628 // and it won't advance past tailBlock in any circumstance).
629
631 tailBlock = newBlock;
632 }
633 else if (canAlloc == CannotAlloc) {
634 // Would have had to allocate a new block to enqueue, but not allowed
635 return false;
636 }
637 else {
638 assert(false && "Should be unreachable code");
639 return false;
640 }
641 }
642
643 return true;
644 }
645
646
647 // Disable copying
648 ReaderWriterQueue(ReaderWriterQueue const&) { }
649
650 // Disable assignment
651 ReaderWriterQueue& operator=(ReaderWriterQueue const&) { }
652
653
// Rounds `x` up to the nearest power of two; a value that already is a
// power of two comes back unchanged. Because the arithmetic is unsigned
// and wraps, an input of 0 yields 0.
// Technique: http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
AE_FORCEINLINE static size_t ceilToPow2(size_t x)
{
    // Work on x - 1 so that exact powers of two are not bumped to the next one.
    size_t smeared = x - 1;

    // Propagate the highest set bit downwards within each byte (shifts of 1, 2, 4)...
    for (size_t bitShift = 1; bitShift < 8; bitShift *= 2) {
        smeared |= smeared >> bitShift;
    }
    // ...then across bytes (shifts of 8, 16, ...), however wide size_t happens to be.
    for (size_t byteShift = 1; byteShift < sizeof(size_t); byteShift *= 2) {
        smeared |= smeared >> (byteShift * 8);
    }

    // Every bit below the original top bit is now set; adding one carries
    // into the next power of two.
    return smeared + 1;
}
667
// Returns the first address at or after `ptr` that satisfies U's alignment
// requirement. A pointer that is already suitably aligned is returned as-is.
template<typename U>
static AE_FORCEINLINE char* align_for(char* ptr) AE_NO_TSAN
{
    const std::size_t boundary = std::alignment_of<U>::value;
    const std::size_t pastBoundary = reinterpret_cast<std::uintptr_t>(ptr) % boundary;
    if (pastBoundary == 0) {
        return ptr;
    }
    // Skip forward by whatever is left until the next multiple of the alignment.
    return ptr + (boundary - pastBoundary);
}
674private:
675#ifndef NDEBUG
676 struct ReentrantGuard
677 {
678 AE_NO_TSAN ReentrantGuard(weak_atomic<bool>& _inSection)
679 : inSection(_inSection)
680 {
681 assert(!inSection && "Concurrent (or re-entrant) enqueue or dequeue operation detected (only one thread at a time may hold the producer or consumer role)");
682 inSection = true;
683 }
684
685 AE_NO_TSAN ~ReentrantGuard() { inSection = false; }
686
687 private:
688 ReentrantGuard& operator=(ReentrantGuard const&);
689
690 private:
691 weak_atomic<bool>& inSection;
692 };
693#endif
694
695 struct Block
696 {
697 // Avoid false-sharing by putting highly contended variables on their own cache lines
698 weak_atomic<size_t> front; // (Atomic) Elements are read from here
699 size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
700
701 char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
702 weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
703 size_t localFront;
704
705 char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
706 weak_atomic<Block*> next; // (Atomic)
707
708 char* data; // Contents (on heap) are aligned to T's alignment
709
710 const size_t sizeMask;
711
712
713 // size must be a power of two (and greater than 0)
714 AE_NO_TSAN Block(size_t const& _size, char* _rawThis, char* _data)
715 : front(0UL), localTail(0), tail(0UL), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
716 {
717 }
718
719 private:
720 // C4512 - Assignment operator could not be generated
721 Block& operator=(Block const&);
722
723 public:
724 char* rawThis;
725 };
726
727
728 static Block* make_block(size_t capacity) AE_NO_TSAN
729 {
730 // Allocate enough memory for the block itself, as well as all the elements it will contain
731 auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
732 size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
733 auto newBlockRaw = static_cast<char*>(std::malloc(size));
734 if (newBlockRaw == nullptr) {
735 return nullptr;
736 }
737
738 auto newBlockAligned = align_for<Block>(newBlockRaw);
739 auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
740 return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
741 }
742
743private:
744 weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block
745
746 char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
747 weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block
748
749 size_t largestBlockSize;
750
751#ifndef NDEBUG
752 weak_atomic<bool> enqueuing;
753 mutable weak_atomic<bool> dequeuing;
754#endif
755};
756
757 // Like ReaderWriterQueue, but also provides blocking operations
758template<typename T, size_t MAX_BLOCK_SIZE = 512>
760{
761private:
762 typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
763
764public:
765 explicit BlockingReaderWriterQueue(size_t size = 15) AE_NO_TSAN
766 : inner(size), sema(new spsc_sema::LightweightSemaphore())
767 { }
768
770 : inner(std::move(other.inner)), sema(std::move(other.sema))
771 { }
772
774 {
775 std::swap(sema, other.sema);
776 std::swap(inner, other.inner);
777 return *this;
778 }
779
780
781 // Enqueues a copy of element if there is room in the queue.
782 // Returns true if the element was enqueued, false otherwise.
783 // Does not allocate memory.
784 AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN
785 {
786 if (inner.try_enqueue(element)) {
787 sema->signal();
788 return true;
789 }
790 return false;
791 }
792
793 // Enqueues a moved copy of element if there is room in the queue.
794 // Returns true if the element was enqueued, false otherwise.
795 // Does not allocate memory.
797 {
798 if (inner.try_enqueue(std::forward<T>(element))) {
799 sema->signal();
800 return true;
801 }
802 return false;
803 }
804
805#if MOODYCAMEL_HAS_EMPLACE
806 // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
807 template<typename... Args>
809 {
810 if (inner.try_emplace(std::forward<Args>(args)...)) {
811 sema->signal();
812 return true;
813 }
814 return false;
815 }
816#endif
817
818
819 // Enqueues a copy of element on the queue.
820 // Allocates an additional block of memory if needed.
821 // Only fails (returns false) if memory allocation fails.
822 AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN
823 {
824 if (inner.enqueue(element)) {
825 sema->signal();
826 return true;
827 }
828 return false;
829 }
830
831 // Enqueues a moved copy of element on the queue.
832 // Allocates an additional block of memory if needed.
833 // Only fails (returns false) if memory allocation fails.
835 {
836 if (inner.enqueue(std::forward<T>(element))) {
837 sema->signal();
838 return true;
839 }
840 return false;
841 }
842
843#if MOODYCAMEL_HAS_EMPLACE
844 // Like enqueue() but with emplace semantics (i.e. construct-in-place).
845 template<typename... Args>
847 {
848 if (inner.emplace(std::forward<Args>(args)...)) {
849 sema->signal();
850 return true;
851 }
852 return false;
853 }
854#endif
855
856
857 // Attempts to dequeue an element; if the queue is empty,
858 // returns false instead. If the queue has at least one element,
859 // moves front to result using operator=, then returns true.
860 template<typename U>
861 bool try_dequeue(U& result) AE_NO_TSAN
862 {
863 if (sema->tryWait()) {
864 bool success = inner.try_dequeue(result);
865 assert(success);
866 AE_UNUSED(success);
867 return true;
868 }
869 return false;
870 }
871
872
873 // Attempts to dequeue an element; if the queue is empty,
874 // waits until an element is available, then dequeues it.
875 template<typename U>
876 void wait_dequeue(U& result) AE_NO_TSAN
877 {
878 while (!sema->wait());
879 bool success = inner.try_dequeue(result);
880 AE_UNUSED(result);
881 assert(success);
882 AE_UNUSED(success);
883 }
884
885
886 // Attempts to dequeue an element; if the queue is empty,
887 // waits until an element is available up to the specified timeout,
888 // then dequeues it and returns true, or returns false if the timeout
889 // expires before an element can be dequeued.
890 // Using a negative timeout indicates an indefinite timeout,
891 // and is thus functionally equivalent to calling wait_dequeue.
892 template<typename U>
893 bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) AE_NO_TSAN
894 {
895 if (!sema->wait(timeout_usecs)) {
896 return false;
897 }
898 bool success = inner.try_dequeue(result);
899 AE_UNUSED(result);
900 assert(success);
901 AE_UNUSED(success);
902 return true;
903 }
904
905
906#if __cplusplus > 199711L || _MSC_VER >= 1700
907 // Attempts to dequeue an element; if the queue is empty,
908 // waits until an element is available up to the specified timeout,
909 // then dequeues it and returns true, or returns false if the timeout
910 // expires before an element can be dequeued.
911 // Using a negative timeout indicates an indefinite timeout,
912 // and is thus functionally equivalent to calling wait_dequeue.
913 template<typename U, typename Rep, typename Period>
914 inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout) AE_NO_TSAN
915 {
916 return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
917 }
918#endif
919
920
921 // Returns a pointer to the front element in the queue (the one that
922 // would be removed next by a call to `try_dequeue` or `pop`). If the
923 // queue appears empty at the time the method is called, nullptr is
924 // returned instead.
925 // Must be called only from the consumer thread.
927 {
928 return inner.peek();
929 }
930
931 // Removes the front element from the queue, if any, without returning it.
932 // Returns true on success, or false if the queue appeared empty at the time
933 // `pop` was called.
935 {
936 if (sema->tryWait()) {
937 bool result = inner.pop();
938 assert(result);
939 AE_UNUSED(result);
940 return true;
941 }
942 return false;
943 }
944
945 // Returns the approximate number of items currently in the queue.
946 // Safe to call from both the producer and consumer threads.
948 {
949 return sema->availableApprox();
950 }
951
952 // Returns the total number of items that could be enqueued without incurring
953 // an allocation when this queue is empty.
954 // Safe to call from both the producer and consumer threads.
955 //
956 // NOTE: The actual capacity during usage may be different depending on the consumer.
957 // If the consumer is removing elements concurrently, the producer cannot add to
958 // the block the consumer is removing from until it's completely empty, except in
959 // the case where the producer was writing to the same block the consumer was
960 // reading from the whole time.
962 return inner.max_capacity();
963 }
964
965private:
966 // Disable copying & assignment
969
970private:
971 ReaderWriterQueue inner;
972 std::unique_ptr<spsc_sema::LightweightSemaphore> sema;
973};
974
975} // end namespace moodycamel
976
977#ifdef AE_VCPP
978#pragma warning(pop)
979#endif
#define AE_UNUSED(x)
Definition: atomicops.h:44
#define AE_FORCEINLINE
Definition: atomicops.h:76
#define AE_NO_TSAN
Definition: atomicops.h:61
Definition: readerwriterqueue.h:760
AE_FORCEINLINE bool try_enqueue(T &&element) AE_NO_TSAN
Definition: readerwriterqueue.h:796
AE_FORCEINLINE bool try_emplace(Args &&... args) AE_NO_TSAN
Definition: readerwriterqueue.h:808
AE_FORCEINLINE bool emplace(Args &&... args) AE_NO_TSAN
Definition: readerwriterqueue.h:846
AE_FORCEINLINE bool enqueue(T const &element) AE_NO_TSAN
Definition: readerwriterqueue.h:822
bool wait_dequeue_timed(U &result, std::int64_t timeout_usecs) AE_NO_TSAN
Definition: readerwriterqueue.h:893
AE_FORCEINLINE size_t max_capacity() const
Definition: readerwriterqueue.h:961
AE_FORCEINLINE bool enqueue(T &&element) AE_NO_TSAN
Definition: readerwriterqueue.h:834
void wait_dequeue(U &result) AE_NO_TSAN
Definition: readerwriterqueue.h:876
AE_FORCEINLINE bool pop() AE_NO_TSAN
Definition: readerwriterqueue.h:934
BlockingReaderWriterQueue(BlockingReaderWriterQueue &&other) AE_NO_TSAN
Definition: readerwriterqueue.h:769
bool try_dequeue(U &result) AE_NO_TSAN
Definition: readerwriterqueue.h:861
AE_FORCEINLINE T * peek() const AE_NO_TSAN
Definition: readerwriterqueue.h:926
BlockingReaderWriterQueue(size_t size=15) AE_NO_TSAN
Definition: readerwriterqueue.h:765
BlockingReaderWriterQueue & operator=(BlockingReaderWriterQueue &&other) AE_NO_TSAN
Definition: readerwriterqueue.h:773
AE_FORCEINLINE bool try_enqueue(T const &element) AE_NO_TSAN
Definition: readerwriterqueue.h:784
AE_FORCEINLINE size_t size_approx() const AE_NO_TSAN
Definition: readerwriterqueue.h:947
Definition: readerwriterqueue.h:77
AE_FORCEINLINE bool enqueue(T &&element) AE_NO_TSAN
Definition: readerwriterqueue.h:267
T * peek() const AE_NO_TSAN
Definition: readerwriterqueue.h:384
AE_FORCEINLINE bool emplace(Args &&... args) AE_NO_TSAN
Definition: readerwriterqueue.h:275
size_t size_approx() const AE_NO_TSAN
Definition: readerwriterqueue.h:493
bool try_dequeue(U &result) AE_NO_TSAN
Definition: readerwriterqueue.h:285
ReaderWriterQueue & operator=(ReaderWriterQueue &&other) AE_NO_TSAN
Definition: readerwriterqueue.h:190
AE_FORCEINLINE bool enqueue(T const &element) AE_NO_TSAN
Definition: readerwriterqueue.h:259
T value_type
Definition: readerwriterqueue.h:99
AE_NO_TSAN ~ReaderWriterQueue()
Definition: readerwriterqueue.h:204
AE_FORCEINLINE bool try_enqueue(T &&element) AE_NO_TSAN
Definition: readerwriterqueue.h:242
bool pop() AE_NO_TSAN
Definition: readerwriterqueue.h:426
AE_NO_TSAN ReaderWriterQueue(size_t size=15)
Definition: readerwriterqueue.h:105
size_t max_capacity() const
Definition: readerwriterqueue.h:517
AE_NO_TSAN ReaderWriterQueue(ReaderWriterQueue &&other)
Definition: readerwriterqueue.h:165
AE_FORCEINLINE bool try_enqueue(T const &element) AE_NO_TSAN
Definition: readerwriterqueue.h:234
AE_FORCEINLINE bool try_emplace(Args &&... args) AE_NO_TSAN
Definition: readerwriterqueue.h:250
args
Definition: build.py:22
Definition: atomicops.h:93
AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN
Definition: atomicops.h:206
@ memory_order_acquire
Definition: atomicops.h:97
@ memory_order_sync
Definition: atomicops.h:104
@ memory_order_release
Definition: atomicops.h:98
AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN
Definition: atomicops.h:218
#define MOODYCAMEL_CACHE_LINE_SIZE
Definition: readerwriterqueue.h:36
#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
Definition: readerwriterqueue.h:63