/** This file is a part of rexy's general purpose library Copyright (C) 2020 rexy712 This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #ifndef REXY_MPMC_QUEUE_TPP #define REXY_MPMC_QUEUE_TPP #include //forward, move #include //memory_order, atomic #include //memcpy namespace rexy{ template mpmc_queue::slot::slot(const slot& s): m_turn(s.m_turn.load(std::memory_order_acquire)) { memcpy(m_data, s.m_data, sizeof(s.m_data)); } template mpmc_queue::slot::slot(slot&& s): m_turn(s.m_turn.load(std::memory_order_acquire)) { memcpy(m_data, s.m_data, sizeof(s.m_data)); } template mpmc_queue::slot::~slot(){ if(m_turn & active_bit){ destruct(); } } template template void mpmc_queue::slot::construct(Args&&... args){ new (&m_data) value_type(std::forward(args)...); } template void mpmc_queue::slot::destruct(){ reinterpret_cast(&m_data)->~value_type(); } template auto mpmc_queue::slot::get()const& -> const_reference{ return reinterpret_cast(m_data); } template auto mpmc_queue::slot::get()& -> reference{ return reinterpret_cast(m_data); } template auto mpmc_queue::slot::get()&& -> rvalue_reference{ return std::move(reinterpret_cast(m_data)); } template auto mpmc_queue::slot::turn() -> std::atomic&{ return m_turn; } template auto mpmc_queue::slot::turn()const -> const std::atomic&{ return m_turn; } template mpmc_queue::mpmc_queue(size_type capacity): m_slots(capacity), m_head(0), m_tail(0){} template mpmc_queue::mpmc_queue(const mpmc_queue& m): m_slots(m.m_slots), m_head(m.m_head.load()), m_tail(m.m_tail.load()){} template constexpr mpmc_queue::mpmc_queue(mpmc_queue&& m): m_slots(std::move(m.m_slots)), m_head(m.m_head.load()), m_tail(m.m_tail.load()){} template mpmc_queue& mpmc_queue::operator=(const mpmc_queue& m){ return (*this = mpmc_queue(m)); } template constexpr mpmc_queue& mpmc_queue::operator=(mpmc_queue&& m){ std::swap(m_slots, m.m_slots); m_head = m.m_head.load(); m_tail = m.m_tail.load(); return *this; } template void mpmc_queue::resize(size_type newcap){ mpmc_queue tmp(newcap); size_type max = (m_head - m_tail) < newcap ? (m_head - m_tail) : newcap; for(size_type i = m_tail, j = 0;j < max;++i, ++j){ tmp.m_slots[j].get() = std::move(m_slots[i % m_slots.capacity()].get()); tmp.m_slots[j].turn() |= 1; //in-use bit } tmp.m_head = max; tmp.m_tail = 0; *this = std::move(tmp); } template void mpmc_queue::clear(){ size_type head = m_head.load(std::memory_order_acquire); for(size_type i = m_tail;i < head;++i){ m_slots[i].destruct(); m_slots[i].turn().store(0, std::memory_order_release); } m_head.store(0, std::memory_order_release); m_tail.store(0, std::memory_order_release); } template template void mpmc_queue::emplace(Args&&... args){ const size_type head = m_head.fetch_add(1, std::memory_order_seq_cst); slot& s = m_slots[head % m_slots.capacity()]; //lsb is in-use flag. wait for it to be 0 while(rotation_cnt(head) << 1 != s.turn().load(std::memory_order_acquire)); s.construct(std::forward(args)...); //set in-use flag s.turn().store((rotation_cnt(head) << 1) + 1, std::memory_order_release); } template template bool mpmc_queue::try_emplace(Args&&... args){ size_type head = m_head.load(std::memory_order_acquire); while(1){ slot& s = m_slots[head % m_slots.capacity()]; if((rotation_cnt(head) << 1) == s.turn().load(std::memory_order_acquire)){ if(m_head.compare_exchange_strong(head, head+1, std::memory_order_seq_cst)){ s.construct(std::forward(args)...); s.turn().store((rotation_cnt(head) << 1) + 1, std::memory_order_release); return true; } }else{ const size_type prev_head = head; head = m_head.load(std::memory_order_acquire); if(head == prev_head) return false; } } } template void mpmc_queue::push(const_reference t){ emplace(t); } template bool mpmc_queue::try_push(const_reference t){ return try_emplace(t); } template void mpmc_queue::push(rvalue_reference t){ emplace(std::move(t)); } template bool mpmc_queue::try_push(rvalue_reference t){ return try_emplace(std::move(t)); } template void mpmc_queue::pop(reference t){ const size_type tail = m_tail.fetch_add(1, std::memory_order_seq_cst); slot& s = m_slots[tail % m_slots.capacity()]; //lsb is in-use flag. wait for it to be 1 while((rotation_cnt(tail) << 1) + 1 != s.turn().load(std::memory_order_acquire)); t = std::move(s).get(); s.destruct(); s.turn().store((rotation_cnt(tail) << 1) + 2, std::memory_order_release); } template bool mpmc_queue::try_pop(reference t){ size_type tail = m_tail.load(std::memory_order_acquire); while(1){ slot& s = m_slots[tail % m_slots.capacity()]; if((rotation_cnt(tail) << 1) + 1 == s.turn().load(std::memory_order_acquire)){ if(m_tail.compare_exchange_strong(tail, tail+1, std::memory_order_seq_cst)){ t = std::move(s).get(); s.destruct(); s.turn().store((rotation_cnt(tail) << 1) + 2, std::memory_order_release); return true; } }else{ //if the tail hasn't moved, then we're still waiting on producer. //if it has moved, another consumer took our data. try again. const size_type prev_tail = tail; tail = m_tail.load(std::memory_order_acquire); if(tail == prev_tail) return false; } } } template constexpr auto mpmc_queue::rotation_cnt(size_type t) -> size_type{ return (t / m_slots.capacity()); } } #endif