Skip to content

Commit

Permalink
Stateful allocator support for concurrent_queue and concurrent_bounde…
Browse files Browse the repository at this point in the history
…d_queue
  • Loading branch information
YexuanXiao committed Sep 23, 2024
1 parent 498a0c7 commit 18188f0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 39 deletions.
82 changes: 47 additions & 35 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,29 @@ class concurrent_queue {
}

concurrent_queue& operator=( const concurrent_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation != other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = other.my_allocator;
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
return *this;
}

concurrent_queue& operator=( concurrent_queue&& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation == other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = std::move(other.my_allocator);
internal_swap(other);
} else {
if (my_allocator == other.my_allocator) {
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
my_queue_representation->assign(*other.my_queue_representation, my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
}
}
return *this;
Expand All @@ -178,8 +182,12 @@ class concurrent_queue {
}

void swap ( concurrent_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_swap
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
if (queue_allocator_traits::propagate_on_container_swap::value) {
using std::swap;
swap(my_allocator, other.my_allocator);
} else {
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
}
internal_swap(other);
}

Expand Down Expand Up @@ -253,15 +261,13 @@ class concurrent_queue {
template <typename Container, typename Value, typename A>
friend class concurrent_queue_iterator;

static void copy_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for copy construction
new (location) value_type(*static_cast<const value_type*>(src));
// queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));

static void copy_construct_item(queue_allocator_type& allocator, T* location, const void* src) {
queue_allocator_traits::construct(allocator, location, *static_cast<const T*>(src));
}

static void move_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for move construction
new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
static void move_construct_item(queue_allocator_type& allocator, T* location, const void* src) {
queue_allocator_traits::construct(allocator, location, std::move(*static_cast<value_type*>(const_cast<void*>(src))));
}

queue_allocator_type my_allocator;
Expand Down Expand Up @@ -416,25 +422,29 @@ class concurrent_bounded_queue {
}

concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation != other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = other.my_allocator;
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
return *this;
}

concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation == other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = std::move(other.my_allocator);
internal_swap(other);
} else {
if (my_allocator == other.my_allocator) {
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
my_queue_representation->assign(*other.my_queue_representation, my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
}
}
return *this;
Expand All @@ -457,8 +467,12 @@ class concurrent_bounded_queue {
}

void swap ( concurrent_bounded_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_swap
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
if (queue_allocator_traits::propagate_on_container_swap::value) {
using std::swap;
swap(my_allocator, other.my_allocator);
} else {
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
}
internal_swap(other);
}

Expand Down Expand Up @@ -641,14 +655,12 @@ class concurrent_bounded_queue {
r1::abort_bounded_queue_monitors(my_monitors);
}

static void copy_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for copy construction
new (location) value_type(*static_cast<const value_type*>(src));
static void copy_construct_item(queue_allocator_type& ator, T* location, const void* src) {
queue_allocator_traits::construct(ator, location, *static_cast<const T*>(src));
}

static void move_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for move construction
new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
static void move_construct_item(queue_allocator_type& ator, T* location, const void* src) {
queue_allocator_traits::construct(ator, location, std::move(*static_cast<value_type*>(const_cast<void*>(src))));
}

template <typename Container, typename Value, typename A>
Expand Down
8 changes: 4 additions & 4 deletions include/oneapi/tbb/detail/_concurrent_queue_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class micro_queue {
using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
using item_constructor_type = void (*)(value_type* location, const void* src);
using item_constructor_type = void (*)(queue_allocator_type&, value_type* location, const void* src);
micro_queue() = default;
micro_queue( const micro_queue& ) = delete;
micro_queue& operator=( const micro_queue& ) = delete;
Expand Down Expand Up @@ -254,7 +254,7 @@ class micro_queue {
new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
copy_item(allocator, *new_page, begin_in_page, *src_page, begin_in_page, construct_item);
}
}
return new_page;
Expand Down Expand Up @@ -324,11 +324,11 @@ class micro_queue {
~destroyer() {my_value.~T();}
}; // class destroyer

void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
void copy_item( queue_allocator_type& allocator, padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
item_constructor_type construct_item )
{
auto& src_item = src[sindex];
construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
construct_item( allocator, &dst[dindex], static_cast<const void*>(&src_item) );
}

void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
Expand Down

0 comments on commit 18188f0

Please sign in to comment.