rexylib/include/rexy/mpmc_queue.hpp

134 lines
3.8 KiB
C++

/**
This file is a part of rexy's general purpose library
Copyright (C) 2020 rexy712
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef REXY_MPMC_QUEUE_HPP
#define REXY_MPMC_QUEUE_HPP
#include <vector> //vector (duh)
#include <cstdlib> //size_t
#include <atomic> //atomic (duh)
#include "rexy.hpp"
#include "utility.hpp" //min
#ifdef __cpp_lib_hardware_interference_size
#include <new> //hardware_destructive_interference_size
#endif
#ifndef __cpp_aligned_new
//TODO: custom aligned allocator
#error "Require aligned new allocation"
#endif
namespace rexy{
//multiproducer, multiconsumer thread safe ring buffer with some attempts to optimize against thread thrashing
template<class T>
class mpmc_queue
{
public:
using value_type = T;
using size_type = size_t;
using pointer = value_type*;
using const_pointer = const value_type*;
using reference = value_type&;
using rvalue_reference = value_type&&;
using const_reference = const value_type&;
private:
#if defined(__cpp_lib_hardware_interference_size)
//libc++ bug
// https://bugs.llvm.org/show_bug.cgi?id=41423
#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION >= 11000
static constexpr size_t cacheline_size = std::hardware_destructive_interference_size;
#else
static constexpr size_t cacheline_size = 64;
#endif
#else
//Best guess
static constexpr size_t cacheline_size = 64;
#endif
class slot
{
public:
static constexpr size_type active_bit = 1;
private:
//ensure no false sharing with previous slot in queue
alignas(cacheline_size) std::atomic<size_type> m_turn = {0};
alignas(alignof(value_type)) unsigned char m_data[sizeof(value_type)] = {};
//ensure no false sharing with following data
char cachline_padding[cacheline_size - ((sizeof(m_data) + sizeof(m_turn)) % cacheline_size)];
public:
slot() = default;
slot(const slot& s);
slot(slot&& s);
~slot();
template<class... Args>
void construct(Args&&... args);
void destruct();
const_reference get()const&;
reference get()&;
rvalue_reference get()&&;
std::atomic<size_type>& turn();
const std::atomic<size_type>& turn()const;
};
private:
std::vector<slot> m_slots;
//keep head and tail on separate cache lines to prevent thread thrashing
alignas(cacheline_size) std::atomic<size_type> m_head;
alignas(cacheline_size) std::atomic<size_type> m_tail;
public:
explicit mpmc_queue(size_type capacity);
//copy/move NOT thread safe! requires external locking mechanism!
mpmc_queue(const mpmc_queue& m);
constexpr mpmc_queue(mpmc_queue&& m);
~mpmc_queue() = default;
mpmc_queue& operator=(const mpmc_queue& m);
constexpr mpmc_queue& operator=(mpmc_queue&& m);
//NOT thread safe! requires external locking mechanism!
void resize(size_type newcap);
//NOT thread safe! requires external locking mechanism!
void clear();
template<class... Args>
void emplace(Args&&... args);
template<class... Args>
bool try_emplace(Args&&... args);
void push(const_reference t);
void push(rvalue_reference t);
bool try_push(const_reference t);
bool try_push(rvalue_reference t);
void pop(reference t);
bool try_pop(reference t);
private:
constexpr size_type rotation_cnt(size_type t);
};
}
#include "mpmc_queue.tpp"
#endif