/** This file is a part of rexy's general purpose library Copyright (C) 2020 rexy712 This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #ifndef REXY_MPMC_QUEUE_HPP #define REXY_MPMC_QUEUE_HPP #include //vector (duh) #include //size_t #include //atomic (duh) #include "rexy.hpp" #ifdef __cpp_lib_hardware_interference_size #include //hardware_destructive_interference_size #endif #ifndef __cpp_aligned_new //TODO: custom aligned allocator #error "Require aligned new allocation" #endif namespace rexy{ //multiproducer, multiconsumer thread safe ring buffer with some attempts to optimize against thread thrashing template class mpmc_queue { public: using value_type = T; using size_type = size_t; using pointer = value_type*; using const_pointer = const value_type*; using reference = value_type&; using rvalue_reference = value_type&&; using const_reference = const value_type&; private: #if defined(__cpp_lib_hardware_interference_size) //libc++ bug // https://bugs.llvm.org/show_bug.cgi?id=41423 #if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION >= 11000 static constexpr size_t cacheline_size = std::hardware_destructive_interference_size; #else static constexpr size_t cacheline_size = 64; #endif #else //Best guess static constexpr size_t cacheline_size = 64; #endif class slot { public: static constexpr size_type active_bit = 1; private: //ensure no false sharing with previous slot in queue alignas(cacheline_size) std::atomic m_turn = {0}; alignas(alignof(value_type)) unsigned char m_data[sizeof(value_type)] = {}; //ensure no false sharing with following data char cachline_padding[cacheline_size - (sizeof(m_data) + sizeof(m_turn))]; public: slot() = default; slot(const slot& s); slot(slot&& s); ~slot(); template void construct(Args&&... args); void destruct(); const_reference get()const&; reference get()&; rvalue_reference get()&&; std::atomic& turn(); const std::atomic& turn()const; }; private: std::vector m_slots; //keep head and tail on separate cache lines to prevent thread thrashing alignas(cacheline_size) std::atomic m_head; alignas(cacheline_size) std::atomic m_tail; public: explicit mpmc_queue(size_type capacity); //copy/move NOT thread safe! requires external locking mechanism! mpmc_queue(const mpmc_queue& m); constexpr mpmc_queue(mpmc_queue&& m); ~mpmc_queue() = default; mpmc_queue& operator=(const mpmc_queue& m); constexpr mpmc_queue& operator=(mpmc_queue&& m); //NOT thread safe! requires external locking mechanism! void resize(size_type newcap); //NOT thread safe! requires external locking mechanism! void clear(); template void emplace(Args&&... args); template bool try_emplace(Args&&... args); void push(const_reference t); void push(rvalue_reference t); bool try_push(const_reference t); bool try_push(rvalue_reference t); void pop(reference t); bool try_pop(reference t); private: constexpr size_type rotation_cnt(size_type t); }; } #include "mpmc_queue.tpp" #endif