/**
This file is a part of rexy's general purpose library
Copyright (C) 2020 rexy712
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef REXY_MPMC_QUEUE_TPP
#define REXY_MPMC_QUEUE_TPP
#include //forward, move
#include //memory_order, atomic
#include //memcpy
namespace rexy{
//Copy constructor. Snapshots the source slot's turn counter and, when that
//counter has the active (in-use) bit set, copy-constructs the held value
//into this slot's own storage.
//A raw byte copy is deliberately avoided: it would leave two slots owning
//the same object, and both destructors would then destroy it.
//When the source holds no value, this slot's storage is left untouched.
//Not safe to run concurrently with producers/consumers working on `s`.
template
mpmc_queue::slot::slot(const slot& s):
	m_turn(s.m_turn.load(std::memory_order_acquire))
{
	if(m_turn & active_bit){
		construct(s.get());
	}
}
//Move constructor. Snapshots the source slot's turn counter and, when that
//counter has the active (in-use) bit set, move-constructs the held value
//into this slot's own storage.
//The source keeps its active bit, so its destructor still destroys the
//moved-from object it holds; a raw byte copy would instead make both slots
//destroy the very same object.
//Not safe to run concurrently with producers/consumers working on `s`.
template
mpmc_queue::slot::slot(slot&& s):
	m_turn(s.m_turn.load(std::memory_order_acquire))
{
	if(m_turn & active_bit){
		construct(std::move(s).get());
	}
}
//Destructor. Destroys the held value, but only when the turn counter has
//the active (in-use) bit set; otherwise the storage holds no live object.
template
mpmc_queue::slot::~slot(){
if(m_turn & active_bit){
destruct();
}
}
//Constructs a value_type in place inside this slot's m_data storage,
//forwarding `args` to its constructor.
//Does not read or modify the turn counter; callers publish the in-use
//state themselves after construction.
template
template
void mpmc_queue::slot::construct(Args&&... args){
new (&m_data) value_type(std::forward(args)...);
}
//Explicitly runs value_type's destructor on the object living in m_data.
//Does not check or modify the turn counter; the caller must know that a
//value is currently held.
template
void mpmc_queue::slot::destruct(){
reinterpret_cast(&m_data)->~value_type();
}
//Read-only access to the held value (const lvalue overload).
//Reinterprets the raw m_data storage as the value; no check is made that
//a value has actually been constructed there.
template
auto mpmc_queue::slot::get()const& -> const_reference{
return reinterpret_cast(m_data);
}
//Mutable access to the held value (lvalue overload).
//Reinterprets the raw m_data storage as the value; no check is made that
//a value has actually been constructed there.
template
auto mpmc_queue::slot::get()& -> reference{
return reinterpret_cast(m_data);
}
//Rvalue overload: yields the held value as an rvalue so the caller can
//move from it. The object itself stays in the slot and still has to be
//destroyed afterwards.
template
auto mpmc_queue::slot::get()&& -> rvalue_reference{
return std::move(reinterpret_cast(m_data));
}
//Returns a mutable reference to this slot's atomic turn counter.
template
auto mpmc_queue::slot::turn() -> std::atomic&{
return m_turn;
}
//Returns a read-only reference to this slot's atomic turn counter.
template
auto mpmc_queue::slot::turn()const -> const std::atomic&{
return m_turn;
}
//Constructs an empty queue: m_slots is built from `capacity` and both the
//head (producer) and tail (consumer) tickets start at 0.
template
mpmc_queue::mpmc_queue(size_type capacity):
m_slots(capacity),
m_head(0),
m_tail(0){}
//Copy constructor: copies the slot storage and takes a snapshot of the
//source's head and tail tickets.
//The three members are read one after another, so the copy is only
//consistent if `m` is not being modified concurrently.
template
mpmc_queue::mpmc_queue(const mpmc_queue& m):
m_slots(m.m_slots),
m_head(m.m_head.load()),
m_tail(m.m_tail.load()){}
//Move constructor: takes over the source's slot storage and copies its
//head and tail tickets.
//NOTE(review): the source's head/tail are not reset here, so the moved-from
//queue keeps its old tickets alongside moved-from slot storage.
template
constexpr mpmc_queue::mpmc_queue(mpmc_queue&& m):
m_slots(std::move(m.m_slots)),
m_head(m.m_head.load()),
m_tail(m.m_tail.load()){}
//Copy assignment: builds a temporary copy of `m` and move-assigns it into
//this queue. Returns *this.
template
mpmc_queue& mpmc_queue::operator=(const mpmc_queue& m){
return (*this = mpmc_queue(m));
}
//Move assignment, implemented as a complete swap with `m`.
//The slot storage is swapped, so the head/tail tickets must be swapped too:
//merely copying them would leave `m` holding this queue's old slots paired
//with unrelated tickets. After the call `m` owns this queue's previous
//contents and releases them when it is destroyed.
//Not safe to run concurrently with any other operation on either queue.
//Returns *this.
template
constexpr mpmc_queue& mpmc_queue::operator=(mpmc_queue&& m){
	std::swap(m_slots, m.m_slots);
	//atomics cannot be swapped directly; exchange the tickets by value
	const size_type old_head = m_head.load();
	const size_type old_tail = m_tail.load();
	m_head = m.m_head.load();
	m_tail = m.m_tail.load();
	m.m_head = old_head;
	m.m_tail = old_tail;
	return *this;
}
//Rebuilds the queue with `newcap` slots, carrying over the queued elements
//in FIFO order starting at the current tail.
//If more elements are queued than `newcap` can hold, only the oldest
//`newcap` are kept; the rest are dropped together with the old storage.
//Not safe to call while other threads are pushing or popping.
template
void mpmc_queue::resize(size_type newcap){
	mpmc_queue tmp(newcap);
	const size_type tail = m_tail.load();
	const size_type queued = m_head.load() - tail;
	const size_type max = queued < newcap ? queued : newcap;
	const size_type oldcap = m_slots.capacity();
	for(size_type i = tail, j = 0;j < max;++i, ++j){
		//the destination storage holds no object yet, so the element has to
		//be constructed there; assigning to it would act on a non-existent
		//object
		tmp.m_slots[j].construct(std::move(m_slots[i % oldcap]).get());
		tmp.m_slots[j].turn() |= 1; //in-use bit
	}
	tmp.m_head = max;
	tmp.m_tail = 0;
	//the old slots (still flagged in-use, now holding moved-from objects)
	//end up in tmp and are destroyed with it
	*this = std::move(tmp);
}
//Destroys every queued element and returns the queue to its initial empty
//state (all turn counters 0, head and tail tickets 0).
//Every slot is visited, not just [tail, head): a slot that has already been
//popped carries a non-zero turn, and with head/tail back at 0 a later
//emplace would wait forever for that turn to become 0.
//Slots are addressed by position, never by raw ticket, since tickets grow
//past the slot count once the queue has wrapped.
//Not safe to call while other threads are pushing or popping.
template
void mpmc_queue::clear(){
	//capacity() is used as the slot count, as in the rest of this file
	const size_type slot_count = m_slots.capacity();
	for(size_type i = 0;i < slot_count;++i){
		slot& s = m_slots[i];
		//lsb is the in-use flag: set means the slot holds a live value
		if(s.turn().load(std::memory_order_acquire) & 1){
			s.destruct();
		}
		s.turn().store(0, std::memory_order_release);
	}
	m_head.store(0, std::memory_order_release);
	m_tail.store(0, std::memory_order_release);
}
//Blocking enqueue: constructs a new element in place from `args`.
//Takes a ticket by atomically incrementing m_head, maps it to a slot, then
//busy-waits until that slot's turn equals (rotation << 1), i.e. the slot is
//free for this pass around the buffer. Spins for as long as the slot is
//still occupied.
template
template
void mpmc_queue::emplace(Args&&... args){
//claim a unique ticket; the increment happens before the slot is ready
const size_type head = m_head.fetch_add(1, std::memory_order_seq_cst);
slot& s = m_slots[head % m_slots.capacity()];
const size_type rot_count = rotation_cnt(head);
//lsb is the in-use flag. spin until the turn is exactly (rot_count << 1):
//flag clear and rotation matching this ticket
while(rot_count << 1 != s.turn().load(std::memory_order_acquire));
s.construct(std::forward(args)...);
//publish the value: same rotation with the in-use flag set
s.turn().store((rot_count << 1) + 1, std::memory_order_release);
}
//Non-blocking enqueue: constructs a new element in place from `args` if the
//slot for the current head ticket is free.
//Returns true when the element was stored, false when the slot is not ready
//and the head ticket did not change between two reads.
template
template
bool mpmc_queue::try_emplace(Args&&... args){
size_type head = m_head.load(std::memory_order_acquire);
while(1){
slot& s = m_slots[head % m_slots.capacity()];
//slot is free for this ticket when its turn is (rotation << 1)
if((rotation_cnt(head) << 1) == s.turn().load(std::memory_order_acquire)){
//claim the ticket; on failure `head` is refreshed with the current
//value and the loop retries with the new ticket
if(m_head.compare_exchange_strong(head, head+1, std::memory_order_seq_cst)){
s.construct(std::forward(args)...);
//publish the value: same rotation with the in-use flag set
s.turn().store((rotation_cnt(head) << 1) + 1, std::memory_order_release);
return true;
}
}else{
//if the head hasn't moved, the slot is still occupied: give up.
//if it has moved, another producer advanced it: try the new ticket.
const size_type prev_head = head;
head = m_head.load(std::memory_order_acquire);
if(head == prev_head)
return false;
}
}
}
//Blocking enqueue of a copy of `t`; forwards to emplace().
template
void mpmc_queue::push(const_reference t){
emplace(t);
}
//Non-blocking enqueue of a copy of `t`; forwards to try_emplace().
//Returns whatever try_emplace() returns (true when the element was stored).
template
bool mpmc_queue::try_push(const_reference t){
return try_emplace(t);
}
//Blocking enqueue that moves `t` into the queue; forwards to emplace().
template
void mpmc_queue::push(rvalue_reference t){
emplace(std::move(t));
}
//Non-blocking enqueue that moves `t` into the queue; forwards to
//try_emplace(). Returns whatever try_emplace() returns.
template
bool mpmc_queue::try_push(rvalue_reference t){
return try_emplace(std::move(t));
}
//Blocking dequeue: moves the oldest element into `t`.
//Takes a ticket by atomically incrementing m_tail, maps it to a slot, then
//busy-waits until a producer has published a value for that ticket.
//Spins for as long as no value is available.
template
void mpmc_queue::pop(reference t){
//claim a unique ticket; the increment happens before the value is ready
const size_type tail = m_tail.fetch_add(1, std::memory_order_seq_cst);
slot& s = m_slots[tail % m_slots.capacity()];
//lsb is the in-use flag. spin until the turn is (rotation << 1) + 1:
//flag set and rotation matching this ticket
while((rotation_cnt(tail) << 1) + 1 != s.turn().load(std::memory_order_acquire));
t = std::move(s).get();
s.destruct();
//release the slot for the next rotation: flag clear, rotation + 1
s.turn().store((rotation_cnt(tail) << 1) + 2, std::memory_order_release);
}
//Non-blocking dequeue: moves the oldest element into `t` if one has been
//published for the current tail ticket.
//Returns true when `t` was assigned, false when no value is ready and the
//tail ticket did not change between two reads. `t` is left untouched on
//failure.
template
bool mpmc_queue::try_pop(reference t){
size_type tail = m_tail.load(std::memory_order_acquire);
while(1){
slot& s = m_slots[tail % m_slots.capacity()];
//a value is ready for this ticket when the turn is (rotation << 1) + 1
if((rotation_cnt(tail) << 1) + 1 == s.turn().load(std::memory_order_acquire)){
//claim the ticket; on failure `tail` is refreshed with the current
//value and the loop retries with the new ticket
if(m_tail.compare_exchange_strong(tail, tail+1, std::memory_order_seq_cst)){
t = std::move(s).get();
s.destruct();
//release the slot for the next rotation: flag clear, rotation + 1
s.turn().store((rotation_cnt(tail) << 1) + 2, std::memory_order_release);
return true;
}
}else{
//if the tail hasn't moved, then we're still waiting on producer.
//if it has moved, another consumer took our data. try again.
const size_type prev_tail = tail;
tail = m_tail.load(std::memory_order_acquire);
if(tail == prev_tail)
return false;
}
}
}
//Returns m_slots.size().
//NOTE(review): this is the size of the slot storage, not the number of
//elements currently queued (head - tail) - confirm that is the intent.
template
auto mpmc_queue::size(void)const -> size_type{
return m_slots.size();
}
//Returns how many complete passes around the slot buffer ticket `t`
//represents: t divided by m_slots.capacity() (integer division).
//The producers and consumers shift this value left by one to form the
//expected turn of a slot, leaving the lsb for the in-use flag.
template
constexpr auto mpmc_queue::rotation_cnt(size_type t) -> size_type{
return (t / m_slots.capacity());
}
}
#endif