forked from qicosmos/cosmos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SyncQueue.hpp
119 lines (102 loc) · 2.61 KB
/
SyncQueue.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#pragma once
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
using namespace std;
/// Bounded, blocking FIFO queue guarded by one mutex and two condition
/// variables.
///
/// Put() blocks while the queue already holds `maxSize` elements; Take()
/// blocks while it is empty. After Stop() has been called, every blocked or
/// subsequent Put() returns without storing its argument and every Take()
/// returns without modifying its output argument.
///
/// @tparam T element type; only needs to be movable when the rvalue Put()
///           and Take() overloads are the ones used.
template<typename T>
class SyncQueue
{
public:
    /// @param maxSize number of queued elements at which Put() starts to
    ///        block. It is compared as an unsigned value, so a negative
    ///        argument behaves like a very large limit.
    SyncQueue(int maxSize) : m_maxSize(maxSize), m_needStop(false)
    {
    }

    /// Appends a copy of `x`, blocking while the queue is full.
    void Put(const T& x)
    {
        Add(x);
    }

    /// Appends `x` by move, blocking while the queue is full.
    void Put(T&& x)
    {
        // `T&&` is a plain rvalue reference here (T is fixed by the class),
        // so std::move is the right cast rather than std::forward.
        Add(std::move(x));
    }

    /// Blocks until the queue is non-empty (or stopped), then transfers every
    /// queued element into `list`, replacing its previous contents.
    /// `list` is left untouched when the queue has been stopped.
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
        if (m_needStop)
            return;

        list = std::move(m_queue);
        // A moved-from container is only guaranteed to be valid, not empty.
        m_queue.clear();
        // The whole queue was drained, so every blocked producer may proceed.
        m_notFull.notify_all();
    }

    /// Blocks until the queue is non-empty (or stopped), then moves the
    /// oldest element into `t`. `t` is left untouched when the queue has
    /// been stopped.
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
        if (m_needStop)
            return;

        // The node is popped immediately, so moving out of it is safe and
        // avoids a copy (and allows move-only element types).
        t = std::move(m_queue.front());
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    /// Sets the stop flag and wakes every thread blocked in Put() or Take().
    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        // Notify after releasing the lock so woken threads can acquire it
        // right away.
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    /// @return true when no element is queued at the time of the call.
    bool Empty() const
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    /// @return true when the queue has reached its capacity.
    bool Full() const
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() >= Capacity();
    }

    /// @return number of queued elements at the time of the call.
    std::size_t Size() const
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    /// Same value as Size(), narrowed to int.
    int Count() const
    {
        // The lock is required: reading the list size while another thread
        // pushes or pops is a data race.
        std::lock_guard<std::mutex> locker(m_mutex);
        return static_cast<int>(m_queue.size());
    }

private:
    /// Capacity in the container's unsigned size type, making the
    /// signed-to-unsigned conversion explicit.
    std::size_t Capacity() const
    {
        return static_cast<std::size_t>(m_maxSize);
    }

    /// Wait predicate for producers; the caller must hold m_mutex.
    /// Logs to stdout each time it finds the queue full.
    bool NotFull() const
    {
        const bool full = m_queue.size() >= Capacity();
        if (full)
            std::cout << "full, waiting,thread id: " << std::this_thread::get_id() << std::endl;
        return !full;
    }

    /// Wait predicate for consumers; the caller must hold m_mutex.
    /// Logs to stdout each time it finds the queue empty.
    bool NotEmpty() const
    {
        const bool empty = m_queue.empty();
        if (empty)
            std::cout << "empty,waiting,thread id: " << std::this_thread::get_id() << std::endl;
        return !empty;
    }

    /// Shared implementation of both Put() overloads: waits for free space
    /// (or stop), then appends `x`, preserving its value category.
    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] { return m_needStop || NotFull(); });
        if (m_needStop)
            return;

        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue;               // buffered elements, oldest at the front
    mutable std::mutex m_mutex;         // guards m_queue and m_needStop; mutable so const observers can lock
    std::condition_variable m_notEmpty; // signalled when an element is added
    std::condition_variable m_notFull;  // signalled when space is freed
    int m_maxSize;                      // maximum number of queued elements
    bool m_needStop;                    // set by Stop(); makes Put()/Take() return early
};