Add mpmc_queue classes taken from our_dick project
This commit is contained in:
parent
8873049f9f
commit
b22c525407
@ -34,7 +34,7 @@ if(BUILD_TESTS)
|
||||
add_subdirectory(tests)
|
||||
endif()
|
||||
|
||||
set(LIBREXY_PUBLIC_HEADERS "include/rexy/algorithm.hpp" "include/rexy/utility.hpp" "include/rexy/basic_string_hash.hpp" "include/rexy/hash.hpp" "include/rexy/static_string_hash.hpp" "include/rexy/string_hash.hpp" "include/rexy/traits.hpp" "include/rexy/steal.hpp" "include/rexy/binary.hpp" "include/rexy/expression.hpp" "include/rexy/binary_base.hpp" "include/rexy/binary_base.tpp" "include/rexy/string_base.hpp" "include/rexy/string.hpp" "include/rexy/filerd.hpp" "include/rexy/string_base.tpp" "include/rexy/allocator.hpp")
|
||||
set(LIBREXY_PUBLIC_HEADERS "include/rexy/algorithm.hpp" "include/rexy/utility.hpp" "include/rexy/basic_string_hash.hpp" "include/rexy/hash.hpp" "include/rexy/static_string_hash.hpp" "include/rexy/string_hash.hpp" "include/rexy/mpmc_queue.hpp" "include/rexy/mpmc_queue.tpp" "include/rexy/traits.hpp" "include/rexy/steal.hpp" "include/rexy/binary.hpp" "include/rexy/expression.hpp" "include/rexy/binary_base.hpp" "include/rexy/binary_base.tpp" "include/rexy/string_base.hpp" "include/rexy/string.hpp" "include/rexy/filerd.hpp" "include/rexy/string_base.tpp" "include/rexy/allocator.hpp")
|
||||
target_compile_options(rexy PRIVATE -Wall -Wextra -pedantic -std=c++17)
|
||||
|
||||
install(TARGETS rexy
|
||||
|
||||
124
include/rexy/mpmc_queue.hpp
Normal file
124
include/rexy/mpmc_queue.hpp
Normal file
@ -0,0 +1,124 @@
|
||||
/**
|
||||
This file is a part of rexy's general purpose library
|
||||
Copyright (C) 2020 rexy712
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef REXY_MPMC_QUEUE_HPP
|
||||
#define REXY_MPMC_QUEUE_HPP
|
||||
|
||||
#include <vector> //vector (duh)
|
||||
#include <cstdlib> //size_t
|
||||
#include <atomic> //atomic (duh)
|
||||
|
||||
#ifdef __cpp_lib_hardware_interference_size
|
||||
#include <new> //hardware_destructive_interference_size
|
||||
#endif
|
||||
|
||||
#ifndef __cpp_aligned_new
|
||||
//TODO: custom aligned allocator
|
||||
#error "Require aligned new allocation"
|
||||
#endif
|
||||
|
||||
namespace rexy{
|
||||
|
||||
//multiproducer, multiconsumer thread safe ring buffer with some attempts to optimize against thread thrashing
|
||||
template<class T>
|
||||
class mpmc_queue
|
||||
{
|
||||
public:
|
||||
using value_type = T;
|
||||
using size_type = size_t;
|
||||
using pointer = value_type*;
|
||||
using const_pointer = const value_type*;
|
||||
using reference = value_type&;
|
||||
using rvalue_reference = value_type&&;
|
||||
using const_reference = const value_type&;
|
||||
|
||||
private:
|
||||
#ifdef __cpp_lib_hardware_interference_size
|
||||
static constexpr size_t cacheline_size = std::hardware_destructive_interference_size;
|
||||
#else
|
||||
//Best guess
|
||||
static constexpr size_t cacheline_size = 64;
|
||||
#endif
|
||||
|
||||
class slot
|
||||
{
|
||||
public:
|
||||
static constexpr size_type active_bit = 1;
|
||||
private:
|
||||
//ensure no false sharing with previous slot in queue
|
||||
alignas(cacheline_size) std::atomic<size_type> m_turn = {0};
|
||||
alignas(alignof(value_type)) unsigned char m_data[sizeof(value_type)] = {};
|
||||
//ensure no false sharing with following data
|
||||
char cachline_padding[cacheline_size - (sizeof(m_data) + sizeof(m_turn))];
|
||||
public:
|
||||
slot() = default;
|
||||
slot(const slot& s);
|
||||
slot(slot&& s);
|
||||
~slot();
|
||||
template<class... Args>
|
||||
void construct(Args&&... args);
|
||||
void destruct();
|
||||
|
||||
const_reference get()const&;
|
||||
reference get()&;
|
||||
rvalue_reference get()&&;
|
||||
std::atomic<size_type>& turn();
|
||||
const std::atomic<size_type>& turn()const;
|
||||
};
|
||||
|
||||
private:
|
||||
std::vector<slot> m_slots;
|
||||
//keep head and tail on separate cache lines to prevent thread thrashing
|
||||
alignas(cacheline_size) std::atomic<size_type> m_head;
|
||||
alignas(cacheline_size) std::atomic<size_type> m_tail;
|
||||
|
||||
public:
|
||||
explicit mpmc_queue(size_type capacity);
|
||||
//copy/move NOT thread safe! requires external locking mechanism!
|
||||
mpmc_queue(const mpmc_queue& m);
|
||||
constexpr mpmc_queue(mpmc_queue&& m);
|
||||
~mpmc_queue() = default;
|
||||
mpmc_queue& operator=(const mpmc_queue& m);
|
||||
constexpr mpmc_queue& operator=(mpmc_queue&& m);
|
||||
|
||||
//NOT thread safe! requires external locking mechanism!
|
||||
void resize(size_type newcap);
|
||||
//NOT thread safe! requires external locking mechanism!
|
||||
void clear();
|
||||
|
||||
template<class... Args>
|
||||
void emplace(Args&&... args);
|
||||
template<class... Args>
|
||||
bool try_emplace(Args&&... args);
|
||||
|
||||
void push(const_reference t);
|
||||
void push(rvalue_reference t);
|
||||
bool try_push(const_reference t);
|
||||
bool try_push(rvalue_reference t);
|
||||
|
||||
void pop(reference t);
|
||||
bool try_pop(reference t);
|
||||
private:
|
||||
constexpr size_type rotation_cnt(size_type t);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#include "mpmc_queue.tpp"
|
||||
|
||||
#endif
|
||||
217
include/rexy/mpmc_queue.tpp
Normal file
217
include/rexy/mpmc_queue.tpp
Normal file
@ -0,0 +1,217 @@
|
||||
/**
|
||||
This file is a part of rexy's general purpose library
|
||||
Copyright (C) 2020 rexy712
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef REXY_MPMC_QUEUE_TPP
|
||||
#define REXY_MPMC_QUEUE_TPP
|
||||
|
||||
#include <utility> //forward, move
|
||||
#include <atomic> //memory_order, atomic
|
||||
#include <cstring> //memcpy
|
||||
|
||||
namespace rexy{
|
||||
template<class T>
|
||||
mpmc_queue<T>::slot::slot(const slot& s):
|
||||
m_turn(s.m_turn.load(std::memory_order_acquire))
|
||||
{
|
||||
memcpy(m_data, s.m_data, sizeof(s.m_data));
|
||||
}
|
||||
template<class T>
|
||||
mpmc_queue<T>::slot::slot(slot&& s):
|
||||
m_turn(s.m_turn.load(std::memory_order_acquire))
|
||||
{
|
||||
memcpy(m_data, s.m_data, sizeof(s.m_data));
|
||||
}
|
||||
template<class T>
|
||||
mpmc_queue<T>::slot::~slot(){
|
||||
if(m_turn & active_bit){
|
||||
destruct();
|
||||
}
|
||||
}
|
||||
template<class T>
|
||||
template<class... Args>
|
||||
void mpmc_queue<T>::slot::construct(Args&&... args){
|
||||
new (&m_data) value_type(std::forward<Args>(args)...);
|
||||
}
|
||||
template<class T>
|
||||
void mpmc_queue<T>::slot::destruct(){
|
||||
reinterpret_cast<pointer>(&m_data)->~value_type();
|
||||
}
|
||||
|
||||
template<class T>
|
||||
auto mpmc_queue<T>::slot::get()const& -> const_reference{
|
||||
return reinterpret_cast<const_reference>(m_data);
|
||||
}
|
||||
template<class T>
|
||||
auto mpmc_queue<T>::slot::get()& -> reference{
|
||||
return reinterpret_cast<reference>(m_data);
|
||||
}
|
||||
template<class T>
|
||||
auto mpmc_queue<T>::slot::get()&& -> rvalue_reference{
|
||||
return std::move(reinterpret_cast<reference>(m_data));
|
||||
}
|
||||
template<class T>
|
||||
auto mpmc_queue<T>::slot::turn() -> std::atomic<size_type>&{
|
||||
return m_turn;
|
||||
}
|
||||
template<class T>
|
||||
auto mpmc_queue<T>::slot::turn()const -> const std::atomic<size_type>&{
|
||||
return m_turn;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mpmc_queue<T>::mpmc_queue(size_type capacity):
|
||||
m_slots(capacity),
|
||||
m_head(0),
|
||||
m_tail(0){}
|
||||
|
||||
template<class T>
|
||||
mpmc_queue<T>::mpmc_queue(const mpmc_queue& m):
|
||||
m_slots(m.m_slots),
|
||||
m_head(m.m_head.load()),
|
||||
m_tail(m.m_tail.load()){}
|
||||
template<class T>
|
||||
constexpr mpmc_queue<T>::mpmc_queue(mpmc_queue&& m):
|
||||
m_slots(std::move(m.m_slots)),
|
||||
m_head(m.m_head.load()),
|
||||
m_tail(m.m_tail.load()){}
|
||||
template<class T>
|
||||
mpmc_queue<T>& mpmc_queue<T>::operator=(const mpmc_queue& m){
|
||||
return (*this = mpmc_queue(m));
|
||||
}
|
||||
template<class T>
|
||||
constexpr mpmc_queue<T>& mpmc_queue<T>::operator=(mpmc_queue&& m){
|
||||
std::swap(m_slots, m.m_slots);
|
||||
m_head = m.m_head.load();
|
||||
m_tail = m.m_tail.load();
|
||||
return *this;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void mpmc_queue<T>::resize(size_type newcap){
|
||||
mpmc_queue tmp(newcap);
|
||||
size_type max = (m_head - m_tail) < newcap ? (m_head - m_tail) : newcap;
|
||||
for(size_type i = m_tail, j = 0;j < max;++i, ++j){
|
||||
tmp.m_slots[j].get() = std::move(m_slots[i % m_slots.capacity()].get());
|
||||
tmp.m_slots[j].turn() |= 1; //in-use bit
|
||||
}
|
||||
tmp.m_head = max;
|
||||
tmp.m_tail = 0;
|
||||
*this = std::move(tmp);
|
||||
}
|
||||
template<class T>
|
||||
void mpmc_queue<T>::clear(){
|
||||
size_type head = m_head.load(std::memory_order_acquire);
|
||||
for(size_type i = m_tail;i < head;++i){
|
||||
m_slots[i].destruct();
|
||||
m_slots[i].turn().store(0, std::memory_order_release);
|
||||
}
|
||||
m_head.store(0, std::memory_order_release);
|
||||
m_tail.store(0, std::memory_order_release);
|
||||
}
|
||||
template<class T>
|
||||
template<class... Args>
|
||||
void mpmc_queue<T>::emplace(Args&&... args){
|
||||
const size_type head = m_head.fetch_add(1, std::memory_order_seq_cst);
|
||||
slot& s = m_slots[head % m_slots.capacity()];
|
||||
//lsb is in-use flag. wait for it to be 0
|
||||
while(rotation_cnt(head) << 1 != s.turn().load(std::memory_order_acquire));
|
||||
|
||||
s.construct(std::forward<Args>(args)...);
|
||||
//set in-use flag
|
||||
s.turn().store((rotation_cnt(head) << 1) + 1, std::memory_order_release);
|
||||
}
|
||||
template<class T>
|
||||
template<class... Args>
|
||||
bool mpmc_queue<T>::try_emplace(Args&&... args){
|
||||
size_type head = m_head.load(std::memory_order_acquire);
|
||||
while(1){
|
||||
slot& s = m_slots[head % m_slots.capacity()];
|
||||
if((rotation_cnt(head) << 1) == s.turn().load(std::memory_order_acquire)){
|
||||
if(m_head.compare_exchange_strong(head, head+1, std::memory_order_seq_cst)){
|
||||
s.construct(std::forward<Args>(args)...);
|
||||
s.turn().store((rotation_cnt(head) << 1) + 1, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
}else{
|
||||
const size_type prev_head = head;
|
||||
head = m_head.load(std::memory_order_acquire);
|
||||
if(head == prev_head)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void mpmc_queue<T>::push(const_reference t){
|
||||
emplace(t);
|
||||
}
|
||||
template<class T>
|
||||
bool mpmc_queue<T>::try_push(const_reference t){
|
||||
return try_emplace(t);
|
||||
}
|
||||
template<class T>
|
||||
void mpmc_queue<T>::push(rvalue_reference t){
|
||||
emplace(std::move(t));
|
||||
}
|
||||
template<class T>
|
||||
bool mpmc_queue<T>::try_push(rvalue_reference t){
|
||||
return try_emplace(std::move(t));
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void mpmc_queue<T>::pop(reference t){
|
||||
const size_type tail = m_tail.fetch_add(1, std::memory_order_seq_cst);
|
||||
slot& s = m_slots[tail % m_slots.capacity()];
|
||||
|
||||
//lsb is in-use flag. wait for it to be 1
|
||||
while((rotation_cnt(tail) << 1) + 1 != s.turn().load(std::memory_order_acquire));
|
||||
|
||||
t = std::move(s).get();
|
||||
s.destruct();
|
||||
s.turn().store((rotation_cnt(tail) << 1) + 2, std::memory_order_release);
|
||||
}
|
||||
template<class T>
|
||||
bool mpmc_queue<T>::try_pop(reference t){
|
||||
size_type tail = m_tail.load(std::memory_order_acquire);
|
||||
while(1){
|
||||
slot& s = m_slots[tail % m_slots.capacity()];
|
||||
if((rotation_cnt(tail) << 1) + 1 == s.turn().load(std::memory_order_acquire)){
|
||||
if(m_tail.compare_exchange_strong(tail, tail+1, std::memory_order_seq_cst)){
|
||||
t = std::move(s).get();
|
||||
s.destruct();
|
||||
s.turn().store((rotation_cnt(tail) << 1) + 2, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
}else{
|
||||
//if the tail hasn't moved, then we're still waiting on producer.
|
||||
//if it has moved, another consumer took our data. try again.
|
||||
const size_type prev_tail = tail;
|
||||
tail = m_tail.load(std::memory_order_acquire);
|
||||
if(tail == prev_tail)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
template<class T>
|
||||
constexpr auto mpmc_queue<T>::rotation_cnt(size_type t) -> size_type{
|
||||
return (t / m_slots.capacity());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
@ -20,6 +20,7 @@
|
||||
#define REXY_UTILITY_HPP
|
||||
|
||||
#include <utility> //forward, move
|
||||
#include <cstdlib> //size_t
|
||||
#include <type_traits>
|
||||
|
||||
namespace rexy{
|
||||
|
||||
@ -1,27 +1,28 @@
|
||||
//Never actually used in the project. This just ensures that all syntax is correct during builds.
|
||||
|
||||
#include "rexy/algorithm.hpp"
|
||||
#include "rexy/allocator.hpp"
|
||||
#include "rexy/binary.hpp"
|
||||
#include "rexy/binary_base.hpp"
|
||||
#include "rexy/binary_base.tpp"
|
||||
#include "rexy/expression.hpp"
|
||||
#include "rexy/filerd.hpp"
|
||||
#include "rexy/hash.hpp"
|
||||
#include "rexy/mpmc_queue.hpp"
|
||||
#include "rexy/steal.hpp"
|
||||
#include "rexy/string_base.hpp"
|
||||
#include "rexy/string_base.tpp"
|
||||
#include "rexy/string_hash.hpp"
|
||||
#include "rexy/string.hpp"
|
||||
#include "rexy/traits.hpp"
|
||||
#include "rexy/utility.hpp"
|
||||
|
||||
#include "rexy/detail/binary_string_conv.hpp"
|
||||
#include "rexy/allocator.hpp"
|
||||
#include "rexy/detail/string_appender.hpp"
|
||||
|
||||
#include "rexy/algorithm.hpp"
|
||||
#include "rexy/cx/array.hpp"
|
||||
#include "rexy/hash.hpp"
|
||||
#include "rexy/cx/hashmap.hpp"
|
||||
#include "rexy/string_hash.hpp"
|
||||
#include "rexy/cx/string.hpp"
|
||||
#include "rexy/utility.hpp"
|
||||
#include "rexy/cx/vector.hpp"
|
||||
|
||||
#include "rexy/cx/detail/bool_specialize_base.hpp"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user