21#ifndef MOODYCAMEL_CACHE_LINE_SIZE
22#define MOODYCAMEL_CACHE_LINE_SIZE 64
35 : maxcap(capacity), mask(), rawData(), data(),
36 slots_(new spsc_sema::LightweightSemaphore(static_cast<spsc_sema::LightweightSemaphore::ssize_t>(capacity))),
37 items(new spsc_sema::LightweightSemaphore(0)),
38 nextSlot(0), nextItem(0)
43 capacity |= capacity >> 1;
44 capacity |= capacity >> 2;
45 capacity |= capacity >> 4;
46 for (std::size_t i = 1; i <
sizeof(std::size_t); i <<= 1)
47 capacity |= capacity >> (i << 3);
49 rawData =
static_cast<char*
>(std::malloc(capacity *
sizeof(T) + std::alignment_of<T>::value - 1));
50 data = align_for<T>(rawData);
54 : maxcap(0), mask(0), rawData(nullptr), data(nullptr),
55 slots_(new spsc_sema::LightweightSemaphore(0)),
56 items(new spsc_sema::LightweightSemaphore(0)),
57 nextSlot(), nextItem()
68 for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i)
69 reinterpret_cast<T*
>(data)[(nextItem + i) & mask].~T();
85 std::swap(maxcap, other.maxcap);
86 std::swap(mask, other.mask);
87 std::swap(rawData, other.rawData);
88 std::swap(data, other.data);
89 std::swap(slots_, other.slots_);
90 std::swap(items, other.items);
91 std::swap(nextSlot, other.nextSlot);
92 std::swap(nextItem, other.nextItem);
101 if (!slots_->tryWait())
113 if (!slots_->tryWait())
115 inner_enqueue(std::move(item));
125 while (!slots_->wait());
135 while (!slots_->wait());
136 inner_enqueue(std::move(item));
146 if (!slots_->wait(timeout_usecs))
159 if (!slots_->wait(timeout_usecs))
161 inner_enqueue(std::move(item));
170 template<
typename Rep,
typename Period>
173 return wait_enqueue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
181 template<
typename Rep,
typename Period>
184 return wait_enqueue_timed(std::move(item), std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
194 if (!items->tryWait())
206 while (!items->wait());
218 if (!items->wait(timeout_usecs))
229 template<
typename U,
typename Rep,
typename Period>
232 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
239 return items->availableApprox();
251 void inner_enqueue(U&& item)
253 std::size_t i = nextSlot++;
254 new (
reinterpret_cast<T*
>(data) + (i & mask)) T(std::forward<U>(item));
259 void inner_dequeue(U& item)
261 std::size_t i = nextItem++;
262 T& element =
reinterpret_cast<T*
>(data)[i & mask];
263 item = std::move(element);
269 static inline char* align_for(
char* ptr)
271 const std::size_t alignment = std::alignment_of<U>::value;
272 return ptr + (alignment - (
reinterpret_cast<std::uintptr_t
>(ptr) % alignment)) % alignment;
280 std::unique_ptr<spsc_sema::LightweightSemaphore> slots_;
281 std::unique_ptr<spsc_sema::LightweightSemaphore> items;
282 char cachelineFiller0[
MOODYCAMEL_CACHE_LINE_SIZE -
sizeof(
char*) * 2 -
sizeof(std::size_t) * 2 -
sizeof(std::unique_ptr<spsc_sema::LightweightSemaphore>) * 2];
283 std::size_t nextSlot;
285 std::size_t nextItem;
Definition: readerwritercircularbuffer.h:29
bool wait_enqueue_timed(T &&item, std::chrono::duration< Rep, Period > const &timeout)
Definition: readerwritercircularbuffer.h:182
BlockingReaderWriterCircularBuffer(std::size_t capacity)
Definition: readerwritercircularbuffer.h:34
void wait_enqueue(T const &item)
Definition: readerwritercircularbuffer.h:123
bool wait_enqueue_timed(T const &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: readerwritercircularbuffer.h:171
bool wait_enqueue_timed(T &&item, std::int64_t timeout_usecs)
Definition: readerwritercircularbuffer.h:157
std::size_t max_capacity() const
Definition: readerwritercircularbuffer.h:244
bool wait_dequeue_timed(U &item, std::int64_t timeout_usecs)
Definition: readerwritercircularbuffer.h:216
BlockingReaderWriterCircularBuffer & operator=(BlockingReaderWriterCircularBuffer const &)=delete
T value_type
Definition: readerwritercircularbuffer.h:31
~BlockingReaderWriterCircularBuffer()
Definition: readerwritercircularbuffer.h:66
bool try_enqueue(T const &item)
Definition: readerwritercircularbuffer.h:99
bool try_dequeue(U &item)
Definition: readerwritercircularbuffer.h:192
void wait_enqueue(T &&item)
Definition: readerwritercircularbuffer.h:133
bool wait_enqueue_timed(T const &item, std::int64_t timeout_usecs)
Definition: readerwritercircularbuffer.h:144
bool wait_dequeue_timed(U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: readerwritercircularbuffer.h:230
void wait_dequeue(U &item)
Definition: readerwritercircularbuffer.h:204
BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer const &)=delete
BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer &&other)
Definition: readerwritercircularbuffer.h:53
bool try_enqueue(T &&item)
Definition: readerwritercircularbuffer.h:111
void swap(BlockingReaderWriterCircularBuffer &other) noexcept
Definition: readerwritercircularbuffer.h:83
std::size_t size_approx() const
Definition: readerwritercircularbuffer.h:237
BlockingReaderWriterCircularBuffer & operator=(BlockingReaderWriterCircularBuffer &&other) noexcept
Definition: readerwritercircularbuffer.h:73
Definition: atomicops.h:93
#define MOODYCAMEL_CACHE_LINE_SIZE
Definition: readerwritercircularbuffer.h:22