PonyPlayer
twins_queue.hpp
浏览该文件的文档.
1//
2// Created by ColorsWind on 2022/5/4.
3//
4#pragma once
5#include <queue>
6#include <string>
7#include <mutex>
8#include <condition_variable>
9#include <utility>
10
11//#define DEBUG_PRINT_FUNCTION_CALL
16template<typename T>
18 std::queue<T> m_data;
19 bool m_enable{true};
20 const std::string m_name;
21 const size_t m_prefer;
22
23 TwinsBlockQueue<T> *m_twins;
24 std::mutex *m_mutex = nullptr;
25 std::condition_variable *m_cond = nullptr;
26 bool *m_open = nullptr;
27private:
29 std::string name,
30 size_t prefer,
32 ) : m_name(std::move(name)), m_prefer(prefer), m_twins(twins){
33 if (prefer < 2) { throw std::runtime_error("PreferSize must not less than 2."); }
34 this->m_mutex = twins->m_mutex;
35 this->m_cond = twins->m_cond;
36 this->m_open = twins->m_open;
37 }
38
39 inline bool isOpen() { return *m_open && m_enable; }
40
41public:
42 TwinsBlockQueue(std::string name, size_t prefer) : m_name(std::move(name)), m_prefer(prefer) {
43 if (prefer < 2) { throw std::runtime_error("PreferSize must not less than 2."); }
44 this->m_mutex = new std::mutex;
45 this->m_cond = new std::condition_variable;
46 this->m_open = new bool{true};
47 this->m_twins = this;
48 }
49
51 close();
52 }
53
54 TwinsBlockQueue<T> *twins(const std::string &name, size_t prefer) {
55 std::unique_lock lock(*m_mutex);
56 if (m_twins != this) { throw std::runtime_error("Already generate twins."); }
57 m_twins = new TwinsBlockQueue<T>{name, prefer, this};
58 return m_twins;
59 }
60
61 void setEnable(bool b) {
62 std::unique_lock lock(*m_mutex);
63 m_enable = b;
64 if (!m_enable)
65 m_cond->notify_all();
66 }
67
68 [[nodiscard]] bool isEnable() const {
69 std::unique_lock lock(*m_mutex);
70 return m_enable;
71 }
72
73
74 void close() {
75 std::unique_lock lock(*m_mutex);
76 *m_open = false;
77 m_cond->notify_all();
78#ifdef DEBUG_PRINT_FUNCTION_CALL
79 qDebug() << m_name.c_str() << "Close" << m_data.size();
80#endif
81 }
82
83 void clear(const std::function<void(T)> &freeFunc) {
84 std::unique_lock lock(*m_mutex);
85 while(!m_data.empty()) {
86 freeFunc(m_data.front());
87 m_data.pop();
88 }
89 m_cond->notify_all();
90#ifdef DEBUG_PRINT_FUNCTION_CALL
91 qDebug() << m_name.c_str() << "Clear" << m_data.size();
92#endif
93 }
94
95 void open() {
96 std::unique_lock lock(*m_mutex);
97 *m_open = true;
98 }
99
100
101 bool push(const T item) {
102 std::unique_lock lock(*m_mutex);
103 if (!isOpen()) {
104 return false;
105 }
106 m_cond->wait(lock, [this]{
107 return this->m_data.size() < m_prefer ||
108 (m_twins->m_enable && m_twins->m_data.size() < m_twins->m_prefer) || !isOpen();
109 });
110 m_data.push(item);
111 if (m_data.size() == 1) { m_cond->notify_all(); }
112#ifdef DEBUG_PRINT_FUNCTION_CALL
113 qDebug() << m_name.c_str() << "Push" << m_data.size();
114#endif
115 return true;
116 }
117
118 template<typename R>
119 R viewFront(const std::function<R(T)> &func) {
120 const static T defaultValue = {};
121 std::unique_lock lock(*m_mutex);
122 m_cond->wait(lock, [this]{ return !this->m_data.empty() || !isOpen();});
123 if (m_data.empty()) {
124 return func(defaultValue);
125 } else {
126 return func(m_data.front());
127 }
128 }
129
130 T remove(bool protectNull) {
131 std::unique_lock lock(*m_mutex);
132 m_cond->wait(lock, [this]{ return !this->m_data.empty() || !isOpen();});
133 if (m_data.empty()) {
134 return {};
135 } else {
136 T ret = m_data.front();
137 if (protectNull && !ret) { return {}; }
138 m_data.pop();
139 if (m_data.size() < m_prefer / 2 && isOpen()) { this->m_cond->notify_all(); }
140 return ret;
141 }
142 }
143
144 int skip(const std::function<bool(T)> &predicate, const std::function<void(T)> &freeFunc) {
145 int ret = 0;
146 while(true) {
147 std::unique_lock lock(*m_mutex);
148 m_cond->wait(lock, [this]{ return !this->m_data.empty() || !isOpen();});
149 if (m_data.empty()) { return ret;}
150 T element = m_data.front();
151 if (element && predicate(element)) {
152 m_data.pop();
153 lock.unlock();
154 freeFunc(element);
155 ++ret;
156 } else break;
157 }
158 return ret;
159 }
160
161// /**
162// * 取出队首元素, 若缺少元素, 则阻塞直到有元素.
163// * @return
164// */
165// const T& front() {
166// const static T defaultValue = {};
167// std::unique_lock lock(*m_mutex);
168// m_cond->wait(lock, [this]{ return !this->m_data.empty() || !isOpen();});
169//#ifdef DEBUG_PRINT_FUNCTION_CALL
170// qDebug() << m_name.c_str() << "IsEmpty" << m_data.empty() << "isOpen" << isOpen();
171//#endif
172// if (m_data.empty()) {
173// return defaultValue;
174// } else {
175// return m_data.front();
176// }
177// }
178//
192//
193// /**
194// * 删除队首元素, 需要保证 size >= 1.
195// */
196// bool pop() {
197// std::unique_lock lock(*m_mutex);
198// if (m_data.empty()) { return false; }
199//#ifdef QT_DEBUG
200// // nullptr signals end of file
201// if (!m_data.front()) { throw std::runtime_error("Should not pop nullptr"); }
202//#endif
203// m_data.pop();
204// // avoid bumpy
205// if (m_data.size() < m_prefer / 2) { this->m_cond->notify_all(); }
206//#ifdef DEBUG_PRINT_FUNCTION_CALL
207// qDebug() << m_name.c_str() << "Pop" << m_data.size();
208//#endif
209// return *m_open;
210// }
211//
212
213
214};
215
216template class TwinsBlockQueue<AVFrame *>;
联动队列. 用于单生产者多队列, 单消费者通信.
Definition: twins_queue.hpp:17
void open()
Definition: twins_queue.hpp:95
T remove(bool protectNull)
Definition: twins_queue.hpp:130
bool push(const T item)
Definition: twins_queue.hpp:101
void setEnable(bool b)
Definition: twins_queue.hpp:61
TwinsBlockQueue(std::string name, size_t prefer)
Definition: twins_queue.hpp:42
TwinsBlockQueue< T > * twins(const std::string &name, size_t prefer)
Definition: twins_queue.hpp:54
void close()
Definition: twins_queue.hpp:74
bool isEnable() const
Definition: twins_queue.hpp:68
R viewFront(const std::function< R(T)> &func)
Definition: twins_queue.hpp:119
void clear(const std::function< void(T)> &freeFunc)
Definition: twins_queue.hpp:83
int skip(const std::function< bool(T)> &predicate, const std::function< void(T)> &freeFunc)
Definition: twins_queue.hpp:144
~TwinsBlockQueue()
Definition: twins_queue.hpp:50