8#include <condition_variable>
20 const std::string m_name;
21 const size_t m_prefer;
24 std::mutex *m_mutex =
nullptr;
25 std::condition_variable *m_cond =
nullptr;
26 bool *m_open =
nullptr;
32 ) : m_name(std::move(name)), m_prefer(prefer), m_twins(
twins){
33 if (prefer < 2) {
throw std::runtime_error(
"PreferSize must not less than 2."); }
34 this->m_mutex =
twins->m_mutex;
35 this->m_cond =
twins->m_cond;
36 this->m_open =
twins->m_open;
39 inline bool isOpen() {
return *m_open && m_enable; }
42 TwinsBlockQueue(std::string name,
size_t prefer) : m_name(std::move(name)), m_prefer(prefer) {
43 if (prefer < 2) {
throw std::runtime_error(
"PreferSize must not less than 2."); }
44 this->m_mutex =
new std::mutex;
45 this->m_cond =
new std::condition_variable;
46 this->m_open =
new bool{
true};
55 std::unique_lock lock(*m_mutex);
56 if (m_twins !=
this) {
throw std::runtime_error(
"Already generate twins."); }
62 std::unique_lock lock(*m_mutex);
69 std::unique_lock lock(*m_mutex);
75 std::unique_lock lock(*m_mutex);
78#ifdef DEBUG_PRINT_FUNCTION_CALL
79 qDebug() << m_name.c_str() <<
"Close" << m_data.size();
83 void clear(
const std::function<
void(T)> &freeFunc) {
84 std::unique_lock lock(*m_mutex);
85 while(!m_data.empty()) {
86 freeFunc(m_data.front());
90#ifdef DEBUG_PRINT_FUNCTION_CALL
91 qDebug() << m_name.c_str() <<
"Clear" << m_data.size();
96 std::unique_lock lock(*m_mutex);
102 std::unique_lock lock(*m_mutex);
106 m_cond->wait(lock, [
this]{
107 return this->m_data.size() < m_prefer ||
108 (m_twins->m_enable && m_twins->m_data.size() < m_twins->m_prefer) || !isOpen();
111 if (m_data.size() == 1) { m_cond->notify_all(); }
112#ifdef DEBUG_PRINT_FUNCTION_CALL
113 qDebug() << m_name.c_str() <<
"Push" << m_data.size();
120 const static T defaultValue = {};
121 std::unique_lock lock(*m_mutex);
122 m_cond->wait(lock, [
this]{
return !this->m_data.empty() || !isOpen();});
123 if (m_data.empty()) {
124 return func(defaultValue);
126 return func(m_data.front());
131 std::unique_lock lock(*m_mutex);
132 m_cond->wait(lock, [
this]{
return !this->m_data.empty() || !isOpen();});
133 if (m_data.empty()) {
136 T ret = m_data.front();
137 if (protectNull && !ret) {
return {}; }
139 if (m_data.size() < m_prefer / 2 && isOpen()) { this->m_cond->notify_all(); }
144 int skip(
const std::function<
bool(T)> &predicate,
const std::function<
void(T)> &freeFunc) {
147 std::unique_lock lock(*m_mutex);
148 m_cond->wait(lock, [
this]{
return !this->m_data.empty() || !isOpen();});
149 if (m_data.empty()) {
return ret;}
150 T element = m_data.front();
151 if (element && predicate(element)) {
联动队列. 用于单生产者多队列, 单消费者通信.
Definition: twins_queue.hpp:17
void open()
Definition: twins_queue.hpp:95
T remove(bool protectNull)
Definition: twins_queue.hpp:130
bool push(const T item)
Definition: twins_queue.hpp:101
void setEnable(bool b)
Definition: twins_queue.hpp:61
TwinsBlockQueue(std::string name, size_t prefer)
Definition: twins_queue.hpp:42
TwinsBlockQueue< T > * twins(const std::string &name, size_t prefer)
Definition: twins_queue.hpp:54
void close()
Definition: twins_queue.hpp:74
bool isEnable() const
Definition: twins_queue.hpp:68
R viewFront(const std::function< R(T)> &func)
Definition: twins_queue.hpp:119
void clear(const std::function< void(T)> &freeFunc)
Definition: twins_queue.hpp:83
int skip(const std::function< bool(T)> &predicate, const std::function< void(T)> &freeFunc)
Definition: twins_queue.hpp:144
~TwinsBlockQueue()
Definition: twins_queue.hpp:50