PonyPlayer
readerwritercircularbuffer.h
浏览该文件的文档.
1// ©2020 Cameron Desrochers.
2// Distributed under the simplified BSD license (see the license file that
3// should have come with this header).
4
5// Provides a C++11 implementation of a single-producer, single-consumer wait-free concurrent
6// circular buffer (fixed-size queue).
7
8#pragma once
9
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>
16
17// Note that this implementation is fully modern C++11 (not compatible with old MSVC versions)
18// but we still include atomicops.h for its LightweightSemaphore implementation.
19#include "atomicops.h"
20
21#ifndef MOODYCAMEL_CACHE_LINE_SIZE
22#define MOODYCAMEL_CACHE_LINE_SIZE 64
23#endif
24
25namespace moodycamel {
26
27template<typename T>
29{
30public:
31 typedef T value_type;
32
33public:
34 explicit BlockingReaderWriterCircularBuffer(std::size_t capacity)
35 : maxcap(capacity), mask(), rawData(), data(),
36 slots_(new spsc_sema::LightweightSemaphore(static_cast<spsc_sema::LightweightSemaphore::ssize_t>(capacity))),
37 items(new spsc_sema::LightweightSemaphore(0)),
38 nextSlot(0), nextItem(0)
39 {
40 // Round capacity up to power of two to compute modulo mask.
41 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
42 --capacity;
43 capacity |= capacity >> 1;
44 capacity |= capacity >> 2;
45 capacity |= capacity >> 4;
46 for (std::size_t i = 1; i < sizeof(std::size_t); i <<= 1)
47 capacity |= capacity >> (i << 3);
48 mask = capacity++;
49 rawData = static_cast<char*>(std::malloc(capacity * sizeof(T) + std::alignment_of<T>::value - 1));
50 data = align_for<T>(rawData);
51 }
52
// Initialiser list and body of the move constructor taking
// `BlockingReaderWriterCircularBuffer&& other` (the signature line itself is
// absent from this listing).
// The new object starts out as an empty buffer -- zero capacity, zero mask,
// no storage, two fresh zero-count semaphores, zeroed cursors -- and then
// exchanges its entire state with `other`, so `other` ends up holding that
// empty state.
54 : maxcap(0), mask(0), rawData(nullptr), data(nullptr),
55 slots_(new spsc_sema::LightweightSemaphore(0)),
56 items(new spsc_sema::LightweightSemaphore(0)),
57 nextSlot(), nextItem()
58 {
59 swap(other);
60 }
61
63
64 // Note: The queue should not be accessed concurrently while it's
65 // being deleted. It's up to the user to synchronize this.
// Destructor body (the signature line is absent from this listing).
// Destroys every element that is still enqueued, then releases the raw
// allocation.
67 {
// Walk forward from the consumer cursor for as many entries as the `items`
// semaphore reports; `& mask` wraps each logical index into the storage.
68 for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i)
69 reinterpret_cast<T*>(data)[(nextItem + i) & mask].~T();
// Free the pointer originally returned by the allocator (rawData), not the
// aligned alias (data).
70 std::free(rawData);
71 }
72
// Body of the move-assignment operator taking
// `BlockingReaderWriterCircularBuffer&& other` (the signature line is absent
// from this listing).
// Implemented as a swap: this object takes over `other`'s state and `other`
// receives this object's previous state. Nothing is destroyed here.
74 {
75 swap(other);
76 return *this;
77 }
78
80
81 // Swaps the contents of this buffer with the contents of another.
82 // Not thread-safe.
// Body of swap(BlockingReaderWriterCircularBuffer& other) (the signature line
// is absent from this listing).
// Exchanges every data member with `other`, one by one: capacity and mask,
// both storage pointers, both semaphores, and both cursors.
84 {
85 std::swap(maxcap, other.maxcap);
86 std::swap(mask, other.mask);
87 std::swap(rawData, other.rawData);
88 std::swap(data, other.data);
// The semaphores are held through unique_ptr, so only ownership changes hands.
89 std::swap(slots_, other.slots_);
90 std::swap(items, other.items);
91 std::swap(nextSlot, other.nextSlot);
92 std::swap(nextItem, other.nextItem);
93 }
94
95 // Enqueues a single item (by copying it).
96 // Fails if not enough room to enqueue.
97 // Thread-safe when called by producer thread.
98 // No exception guarantee (state will be corrupted) if constructor of T throws.
99 bool try_enqueue(T const& item)
100 {
101 if (!slots_->tryWait())
102 return false;
103 inner_enqueue(item);
104 return true;
105 }
106
107 // Enqueues a single item (by moving it, if possible).
108 // Fails if not enough room to enqueue.
109 // Thread-safe when called by producer thread.
110 // No exception guarantee (state will be corrupted) if constructor of T throws.
111 bool try_enqueue(T&& item)
112 {
113 if (!slots_->tryWait())
114 return false;
115 inner_enqueue(std::move(item));
116 return true;
117 }
118
119 // Blocks the current thread until there's enough space to enqueue the given item,
120 // then enqueues it (via copy).
121 // Thread-safe when called by producer thread.
122 // No exception guarantee (state will be corrupted) if constructor of T throws.
123 void wait_enqueue(T const& item)
124 {
125 while (!slots_->wait());
126 inner_enqueue(item);
127 }
128
129 // Blocks the current thread until there's enough space to enqueue the given item,
130 // then enqueues it (via move, if possible).
131 // Thread-safe when called by producer thread.
132 // No exception guarantee (state will be corrupted) if constructor of T throws.
133 void wait_enqueue(T&& item)
134 {
135 while (!slots_->wait());
136 inner_enqueue(std::move(item));
137 }
138
139 // Blocks the current thread until there's enough space to enqueue the given item,
140 // or the timeout expires. Returns false without enqueueing the item if the timeout
141 // expires, otherwise enqueues the item (via copy) and returns true.
142 // Thread-safe when called by producer thread.
143 // No exception guarantee (state will be corrupted) if constructor of T throws.
144 bool wait_enqueue_timed(T const& item, std::int64_t timeout_usecs)
145 {
146 if (!slots_->wait(timeout_usecs))
147 return false;
148 inner_enqueue(item);
149 return true;
150 }
151
152 // Blocks the current thread until there's enough space to enqueue the given item,
153 // or the timeout expires. Returns false without enqueueing the item if the timeout
154 // expires, otherwise enqueues the item (via move, if possible) and returns true.
155 // Thread-safe when called by producer thread.
156 // No exception guarantee (state will be corrupted) if constructor of T throws.
157 bool wait_enqueue_timed(T&& item, std::int64_t timeout_usecs)
158 {
159 if (!slots_->wait(timeout_usecs))
160 return false;
161 inner_enqueue(std::move(item));
162 return true;
163 }
164
165 // Blocks the current thread until there's enough space to enqueue the given item,
166 // or the timeout expires. Returns false without enqueueing the item if the timeout
167 // expires, otherwise enqueues the item (via copy) and returns true.
168 // Thread-safe when called by producer thread.
169 // No exception guarantee (state will be corrupted) if constructor of T throws.
170 template<typename Rep, typename Period>
171 inline bool wait_enqueue_timed(T const& item, std::chrono::duration<Rep, Period> const& timeout)
172 {
173 return wait_enqueue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
174 }
175
176 // Blocks the current thread until there's enough space to enqueue the given item,
177 // or the timeout expires. Returns false without enqueueing the item if the timeout
178 // expires, otherwise enqueues the item (via move, if possible) and returns true.
179 // Thread-safe when called by producer thread.
180 // No exception guarantee (state will be corrupted) if constructor of T throws.
181 template<typename Rep, typename Period>
182 inline bool wait_enqueue_timed(T&& item, std::chrono::duration<Rep, Period> const& timeout)
183 {
184 return wait_enqueue_timed(std::move(item), std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
185 }
186
187 // Attempts to dequeue a single item.
188 // Returns false if the buffer is empty.
189 // Thread-safe when called by consumer thread.
190 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
191 template<typename U>
192 bool try_dequeue(U& item)
193 {
194 if (!items->tryWait())
195 return false;
196 inner_dequeue(item);
197 return true;
198 }
199
200 // Blocks the current thread until there's something to dequeue, then dequeues it.
201 // Thread-safe when called by consumer thread.
202 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
203 template<typename U>
204 void wait_dequeue(U& item)
205 {
206 while (!items->wait());
207 inner_dequeue(item);
208 }
209
210 // Blocks the current thread until either there's something to dequeue
211 // or the timeout expires. Returns false without setting `item` if the
212 // timeout expires, otherwise assigns to `item` and returns true.
213 // Thread-safe when called by consumer thread.
214 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
215 template<typename U>
216 bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
217 {
218 if (!items->wait(timeout_usecs))
219 return false;
220 inner_dequeue(item);
221 return true;
222 }
223
// std::chrono convenience overload of the timed dequeue.
// Converts `timeout` to whole microseconds and forwards to the
// microsecond-count overload; returns that overload's result (true if `item`
// was assigned, false if the timeout expired first).
// Thread-safe when called by consumer thread.
// No exception guarantee (state will be corrupted) if assignment operator of U throws.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
{
    const auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
    return wait_dequeue_timed(item, usecs.count());
}
234
235 // Returns a (possibly outdated) snapshot of the total number of elements currently in the buffer.
236 // Thread-safe.
237 inline std::size_t size_approx() const
238 {
239 return items->availableApprox();
240 }
241
242 // Returns the maximum number of elements that this circular buffer can hold at once.
243 // Thread-safe.
244 inline std::size_t max_capacity() const
245 {
246 return maxcap;
247 }
248
249private:
250 template<typename U>
251 void inner_enqueue(U&& item)
252 {
253 std::size_t i = nextSlot++;
254 new (reinterpret_cast<T*>(data) + (i & mask)) T(std::forward<U>(item));
255 items->signal();
256 }
257
258 template<typename U>
259 void inner_dequeue(U& item)
260 {
261 std::size_t i = nextItem++;
262 T& element = reinterpret_cast<T*>(data)[i & mask];
263 item = std::move(element);
264 element.~T();
265 slots_->signal();
266 }
267
// Returns the first address at or after `ptr` that satisfies the alignment
// requirement of U. `ptr` is returned unchanged when it is already aligned;
// otherwise it is advanced by fewer than alignof(U) bytes.
template<typename U>
static inline char* align_for(char* ptr)
{
    const std::size_t alignment = std::alignment_of<U>::value;
    const std::size_t misalignment = reinterpret_cast<std::uintptr_t>(ptr) % alignment;
    const std::size_t padding = (misalignment == 0) ? 0 : alignment - misalignment;
    return ptr + padding;
}
274
275private:
// Configuration, storage and semaphores. The sizes of these six members are
// exactly what cachelineFiller0 below subtracts from the cache-line size.
276 std::size_t maxcap; // actual (non-power-of-two) capacity
277 std::size_t mask; // circular buffer capacity mask (for cheap modulo)
278 char* rawData; // raw circular buffer memory
279 char* data; // circular buffer memory aligned to element alignment
280 std::unique_ptr<spsc_sema::LightweightSemaphore> slots_; // number of slots currently free (named with underscore to accommodate Qt's 'slots' macro)
281 std::unique_ptr<spsc_sema::LightweightSemaphore> items; // number of elements currently enqueued
// Padding sized so the members above plus this filler add up to
// MOODYCAMEL_CACHE_LINE_SIZE bytes.
// NOTE(review): presumably this keeps the producer cursor off the cache line
// holding the members above, to avoid false sharing -- confirm.
282 char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(char*) * 2 - sizeof(std::size_t) * 2 - sizeof(std::unique_ptr<spsc_sema::LightweightSemaphore>) * 2];
// Producer cursor: incremented in inner_enqueue and never wrapped there;
// `& mask` is applied at the point of use.
283 std::size_t nextSlot; // index of next free slot to enqueue into
// Padding sized so nextSlot plus this filler add up to
// MOODYCAMEL_CACHE_LINE_SIZE bytes, which places nextItem that many bytes
// after nextSlot.
284 char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(std::size_t)];
// Consumer cursor: incremented in inner_dequeue, likewise masked at the
// point of use.
285 std::size_t nextItem; // index of next element to dequeue from
286};
287
288}
Definition: readerwritercircularbuffer.h:29
bool wait_enqueue_timed(T &&item, std::chrono::duration< Rep, Period > const &timeout)
Definition: readerwritercircularbuffer.h:182
BlockingReaderWriterCircularBuffer(std::size_t capacity)
Definition: readerwritercircularbuffer.h:34
void wait_enqueue(T const &item)
Definition: readerwritercircularbuffer.h:123
bool wait_enqueue_timed(T const &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: readerwritercircularbuffer.h:171
bool wait_enqueue_timed(T &&item, std::int64_t timeout_usecs)
Definition: readerwritercircularbuffer.h:157
std::size_t max_capacity() const
Definition: readerwritercircularbuffer.h:244
bool wait_dequeue_timed(U &item, std::int64_t timeout_usecs)
Definition: readerwritercircularbuffer.h:216
BlockingReaderWriterCircularBuffer & operator=(BlockingReaderWriterCircularBuffer const &)=delete
T value_type
Definition: readerwritercircularbuffer.h:31
~BlockingReaderWriterCircularBuffer()
Definition: readerwritercircularbuffer.h:66
bool try_enqueue(T const &item)
Definition: readerwritercircularbuffer.h:99
bool try_dequeue(U &item)
Definition: readerwritercircularbuffer.h:192
void wait_enqueue(T &&item)
Definition: readerwritercircularbuffer.h:133
bool wait_enqueue_timed(T const &item, std::int64_t timeout_usecs)
Definition: readerwritercircularbuffer.h:144
bool wait_dequeue_timed(U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: readerwritercircularbuffer.h:230
void wait_dequeue(U &item)
Definition: readerwritercircularbuffer.h:204
BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer const &)=delete
BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer &&other)
Definition: readerwritercircularbuffer.h:53
bool try_enqueue(T &&item)
Definition: readerwritercircularbuffer.h:111
void swap(BlockingReaderWriterCircularBuffer &other) noexcept
Definition: readerwritercircularbuffer.h:83
std::size_t size_approx() const
Definition: readerwritercircularbuffer.h:237
BlockingReaderWriterCircularBuffer & operator=(BlockingReaderWriterCircularBuffer &&other) noexcept
Definition: readerwritercircularbuffer.h:73
Definition: atomicops.h:93
#define MOODYCAMEL_CACHE_LINE_SIZE
Definition: readerwritercircularbuffer.h:22