-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
static_ring_buffer
203 lines (164 loc) · 10.3 KB
/
static_ring_buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/*
Copyright (C) 2018-2024 Geoffrey Daniels. https://gpdaniels.com/
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef GTL_CONTAINER_STATIC_RING_BUFFER_HPP
#define GTL_CONTAINER_STATIC_RING_BUFFER_HPP
// Summary: Statically sized thread-safe multi-producer multi-consumer ring-buffer.
#if defined(_MSC_VER)
# pragma warning(push, 0)
#endif
#include <atomic>
#if defined(_MSC_VER)
# pragma warning(pop)
#endif
namespace gtl {
/// @brief The static_ring_buffer class implements a thread-safe multi-producer multi-consumer ring-buffer.
template <typename data_type, unsigned int static_data_size>
class static_ring_buffer final {
private:
static_assert(static_data_size != 0, "Data size must be greater than 0.");
static_assert(static_data_size <= 0x80000000, "Data size must be less than 2^31.");
public:
/// @brief Make the data type publically accessible.
using type = data_type;
/// @brief Make the data size publically accessible.
constexpr static const unsigned int data_size = static_data_size;
private:
/// @brief Structure that holds a read and write index.
struct index_type final {
unsigned int read;
unsigned int write;
};
private:
/// @brief Reader holds the current write and pending read locations.
std::atomic<index_type> reader;
/// @brief Writer holds the current read and pending write locations.
std::atomic<index_type> writer;
/// @brief The ring buffer data array.
type data[data_size];
public:
/// @brief Defaulted destructor.
~static_ring_buffer() = default;
/// @brief Constructor zeros the atomic reader and writer structures.
static_ring_buffer()
: reader{}
, writer{} {
}
static_ring_buffer(const static_ring_buffer& other) = delete;
static_ring_buffer(static_ring_buffer&& other) = delete;
static_ring_buffer& operator=(const static_ring_buffer& other) = delete;
static_ring_buffer& operator=(static_ring_buffer&& other) = delete;
public:
/// @brief Get a boolean that represents if the ring buffer is empty, ignoring pending reads and writess.
/// @return true if the ring buffer is empty, false otherwise.
bool empty() const {
//Both the reader and writer are used here to get both the current read and current write locations.
const index_type current_reader = this->reader.load();
const index_type current_writer = this->writer.load();
// If the current read is equal to the pending write then the ring buffer is full.
return (current_writer.read == current_reader.write);
}
/// @brief Get a boolean that represents if the ring buffer is full, ignoring pending reads and writes.
/// @return true if the ring buffer is full, false otherwise.
bool full() const {
//Both the reader and writer are used here to get both the current read and current write locations.
const index_type current_reader = this->reader.load();
const index_type current_writer = this->writer.load();
// First calculate the current size, as the locations can be either size of one another use the ternary operator to compare them first.
const unsigned int current_size = (current_writer.read > current_reader.write) ? (current_writer.read - current_reader.write) : (current_reader.write - current_writer.read);
// If the data size is zero we are full, otherwise if the current size is zero we are empty, otherwise if the current size is a multiple of the buffer size we are full.
return ((this->data_size == 0) || ((current_size != 0) && ((current_size % this->data_size) == 0)));
}
public:
/// @brief Get the size of the ring buffer that is filled with elements, ignoring pending reads and writes.
/// @return The number of items pushed into the ring buffer that have not been popped out.
unsigned long long size() const {
//Both the reader and writer are used here to get both the current read and current write locations.
const index_type current_reader = this->writer.load();
const index_type current_writer = this->reader.load();
// First calculate the current size, as the locations can be either size of one another use the ternary operator to compare them first.
const unsigned int current_size = (current_writer.read > current_reader.write) ? (current_writer.read - current_reader.write) : (current_reader.write - current_writer.read);
// If the current size is zero then size is zero, otherwise if current size is a multiple of the data size we are full, otherwise the size is the current size remainder.
return (current_size == 0) ? 0 : (((current_size % this->data_size) == 0) ? this->data_size : current_size % this->data_size);
}
public:
/// @brief Attempt to push a value into the ring buffer.
/// @param value An input parameter to providing the value to push into the ring buffer.
/// @return true if value was successfully stored in the ring buffer, false otherwise.
bool try_push(const type& value) {
// Create a local copy of the writer.
index_type current_writer = this->writer.load();
// Check if the ring buffer is full.
const unsigned int current_size = (current_writer.read > current_writer.write) ? (current_writer.read - current_writer.write) : (current_writer.write - current_writer.read);
if ((current_size != 0) && ((current_size % this->data_size) == 0)) {
return false;
}
// Create a new writer location to hold the assigned/allocated/reserved pending write index.
index_type new_writer = { current_writer.read, (current_writer.write + 1) % (2 * this->data_size) };
// Attempt to assign/allocate/reserve an index for writing using atomic_compare_exchange_weak.
// Remember the writer holds the current read and pending write locations, so this tries to increment the pending write location.
// if (this->writer == current_writer) { this->writer = new_writer; } else { current_writer = this->writer; }
if (!std::atomic_compare_exchange_weak(&this->writer, ¤t_writer, new_writer)) {
return false;
}
// Write the value to the buffer at the pending write index.
this->data[current_writer.write % this->data_size] = value;
// Create a local copy of the reader.
index_type current_reader = this->reader.load();
// Update the reader to publish the pending write using atomic_compare_exchange_weak.
// Remember the reader holds the current write and the pending read locations, so this tries to set the current write location.
// if (this->reader == current_reader) { this->reader = { ... }; } else { current_reader = this->reader; }
do {
// Overwrite the readers current write location to ensure that pushes are finalised in order.
current_reader.write = current_writer.write;
}
while (!std::atomic_compare_exchange_weak(&this->reader, ¤t_reader, { current_reader.read, new_writer.write }));
// Success.
return true;
}
/// @brief Attempt to pop a value from the ring buffer.
/// @param[out] value An output parameter to store the value that is popped out of the ring buffer.
/// @return true if value was successfully recovered from the ring buffer, false otherwise.
bool try_pop(type& value) {
// Create a local copy of the reader.
index_type current_reader = this->reader.load();
// Check if the ring buffer is empty.
if (current_reader.read == current_reader.write) {
return false;
}
// Create a new reader location to hold the assigned/allocated/reserved pending read index.
index_type new_reader = { (current_reader.read + 1) % (2 * this->data_size), current_reader.write };
// Attempt to assign/allocate/reserve an index for reading using atomic_compare_exchange_weak.
// Remember the reader holds the current write and pending read locations, so this tries to increment the pending read location.
// if (this->reader == current_reader) { this->reader = new_reader; } else { current_reader = this->reader; }
if (!std::atomic_compare_exchange_weak(&this->reader, ¤t_reader, new_reader)) {
return false;
}
// Read the value from the buffer at the pending read index.
value = this->data[current_reader.read % this->data_size];
// Create a local copy of the writer.
index_type current_writer = this->writer.load();
// Update the writer to publish the pending read using atomic_compare_exchange_weak.
// Remember the writer holds the current read and the pending write locations, so this tries to set the current read location.
// if (this->writer == current_writer) { this->writer = { ... }; } else { current_writer = this->writer; }
do {
// Overwrite the writers current read location to ensure that pops are finalised in order.
current_writer.read = current_reader.read;
}
while (!std::atomic_compare_exchange_weak(&this->writer, ¤t_writer, { new_reader.read, current_writer.write }));
// Success.
return true;
}
};
}
#endif // GTL_CONTAINER_STATIC_RING_BUFFER_HPP