Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

again cache aware queue ¯\_(ツ)_/¯ #100

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
423 changes: 423 additions & 0 deletions cds/container/ca_segmented_queue.h

Large diffs are not rendered by default.

749 changes: 749 additions & 0 deletions cds/intrusive/ca_segmented_queue.h

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions test/stress/queue/intrusive_push_pop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,16 @@ namespace {
//CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_padding )
CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_stat )

CDSSTRESS_QUEUE_F( CASegmentedQueue_HP_spin )
CDSSTRESS_QUEUE_F( CASegmentedQueue_HP_spin_stat )
CDSSTRESS_QUEUE_F( CASegmentedQueue_HP_mutex )
CDSSTRESS_QUEUE_F( CASegmentedQueue_HP_mutex_stat )

CDSSTRESS_QUEUE_F( CASegmentedQueue_DHP_spin )
CDSSTRESS_QUEUE_F( CASegmentedQueue_DHP_spin_stat )
CDSSTRESS_QUEUE_F( CASegmentedQueue_DHP_mutex )
CDSSTRESS_QUEUE_F( CASegmentedQueue_DHP_mutex_stat )


#ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
Expand Down
45 changes: 45 additions & 0 deletions test/stress/queue/intrusive_queue_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <cds/intrusive/basket_queue.h>
#include <cds/intrusive/fcqueue.h>
#include <cds/intrusive/segmented_queue.h>
#include <cds/intrusive/ca_segmented_queue.h>

#include <cds/gc/hp.h>
#include <cds/gc/dhp.h>
Expand Down Expand Up @@ -469,6 +470,50 @@ namespace queue {
typedef cds::intrusive::SegmentedQueue< cds::gc::DHP, T, traits_SegmentedQueue_mutex_padding > SegmentedQueue_DHP_mutex_padding;
typedef cds::intrusive::SegmentedQueue< cds::gc::DHP, T, traits_SegmentedQueue_mutex_stat > SegmentedQueue_DHP_mutex_stat;

// CASegmentedQueue
class traits_CASegmentedQueue_spin_stat:
public cds::intrusive::ca_segmented_queue::make_traits<
cds::opt::stat< cds::intrusive::ca_segmented_queue::stat<> >
>::type
{};
class traits_CASegmentedQueue_spin_padding :
public cds::intrusive::ca_segmented_queue::make_traits<
cds::opt::padding< cds::opt::cache_line_padding >
>::type
{};
class traits_CASegmentedQueue_mutex_stat :
public cds::intrusive::ca_segmented_queue::make_traits<
cds::opt::stat< cds::intrusive::ca_segmented_queue::stat<> >
,cds::opt::lock_type< std::mutex >
>::type
{};
class traits_CASegmentedQueue_mutex:
public cds::intrusive::ca_segmented_queue::make_traits<
cds::opt::lock_type< std::mutex >
>::type
{};
class traits_CASegmentedQueue_mutex_padding:
public cds::intrusive::ca_segmented_queue::make_traits<
cds::opt::lock_type< std::mutex >
,cds::opt::padding< cds::opt::cache_line_padding >
>::type
{};

typedef cds::intrusive::CASegmentedQueue< cds::gc::HP, T > CASegmentedQueue_HP_spin;
typedef cds::intrusive::CASegmentedQueue< cds::gc::HP, T, traits_CASegmentedQueue_spin_padding > CASegmentedQueue_HP_spin_padding;
typedef cds::intrusive::CASegmentedQueue< cds::gc::HP, T, traits_CASegmentedQueue_spin_stat > CASegmentedQueue_HP_spin_stat;
typedef cds::intrusive::CASegmentedQueue< cds::gc::HP, T, traits_CASegmentedQueue_mutex > CASegmentedQueue_HP_mutex;
typedef cds::intrusive::CASegmentedQueue< cds::gc::HP, T, traits_CASegmentedQueue_mutex_padding > CASegmentedQueue_HP_mutex_padding;
typedef cds::intrusive::CASegmentedQueue< cds::gc::HP, T, traits_CASegmentedQueue_mutex_stat > CASegmentedQueue_HP_mutex_stat;

typedef cds::intrusive::CASegmentedQueue< cds::gc::DHP, T > CASegmentedQueue_DHP_spin;
typedef cds::intrusive::CASegmentedQueue< cds::gc::DHP, T, traits_CASegmentedQueue_spin_padding > CASegmentedQueue_DHP_spin_padding;
typedef cds::intrusive::CASegmentedQueue< cds::gc::DHP, T, traits_CASegmentedQueue_spin_stat > CASegmentedQueue_DHP_spin_stat;
typedef cds::intrusive::CASegmentedQueue< cds::gc::DHP, T, traits_CASegmentedQueue_mutex > CASegmentedQueue_DHP_mutex;
typedef cds::intrusive::CASegmentedQueue< cds::gc::DHP, T, traits_CASegmentedQueue_mutex_padding > CASegmentedQueue_DHP_mutex_padding;
typedef cds::intrusive::CASegmentedQueue< cds::gc::DHP, T, traits_CASegmentedQueue_mutex_stat > CASegmentedQueue_DHP_mutex_stat;


// Boost SList
typedef details::BoostSList< T, std::mutex > BoostSList_mutex;
typedef details::BoostSList< T, cds::sync::spin > BoostSList_spin;
Expand Down
1 change: 1 addition & 0 deletions test/stress/queue/pop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ namespace {
}

CDSSTRESS_SegmentedQueue( segmented_queue_pop )
CDSSTRESS_CASegmentedQueue( segmented_queue_pop )

#ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
Expand Down
20 changes: 20 additions & 0 deletions test/stress/queue/print_stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ namespace cds_test {
return o;
}

static inline property_stream& operator <<( property_stream& o, cds::intrusive::ca_segmented_queue::stat<> const& s )
{
return o
<< CDSSTRESS_STAT_OUT( s, m_nPush )
<< CDSSTRESS_STAT_OUT( s, m_nPushPopulated )
<< CDSSTRESS_STAT_OUT( s, m_nPushContended )
<< CDSSTRESS_STAT_OUT( s, m_nPop )
<< CDSSTRESS_STAT_OUT( s, m_nPopEmpty )
<< CDSSTRESS_STAT_OUT( s, m_nPopContended )
<< CDSSTRESS_STAT_OUT( s, m_nCreateSegmentReq )
<< CDSSTRESS_STAT_OUT( s, m_nDeleteSegmentReq )
<< CDSSTRESS_STAT_OUT( s, m_nSegmentCreated )
<< CDSSTRESS_STAT_OUT( s, m_nSegmentDeleted );
}

static inline property_stream& operator <<( property_stream& o, cds::intrusive::ca_segmented_queue::empty_stat const& /*s*/ )
{
return o;
}

} // namespace cds_test

#endif // CDSSTRESS_QUEUE_PRINT_STAT_H
1 change: 1 addition & 0 deletions test/stress/queue/push.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ namespace {
}

CDSSTRESS_SegmentedQueue( segmented_queue_push )
CDSSTRESS_CASegmentedQueue( segmented_queue_push )

#ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
Expand Down
1 change: 1 addition & 0 deletions test/stress/queue/push_pop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ namespace {
}

CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
CDSSTRESS_CASegmentedQueue( segmented_queue_push_pop )

#ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
Expand Down
59 changes: 59 additions & 0 deletions test/stress/queue/queue_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include <cds/container/fcdeque.h>
#include <cds/container/segmented_queue.h>
#include <cds/container/weak_ringbuffer.h>
#include <cds/container/ca_segmented_queue.h>


#include <cds/gc/hp.h>
#include <cds/gc/dhp.h>
Expand Down Expand Up @@ -582,6 +584,50 @@ namespace fc_details{
typedef cds::container::SegmentedQueue< cds::gc::DHP, Value, traits_SegmentedQueue_mutex > SegmentedQueue_DHP_mutex;
typedef cds::container::SegmentedQueue< cds::gc::DHP, Value, traits_SegmentedQueue_mutex_padding > SegmentedQueue_DHP_mutex_padding;
typedef cds::container::SegmentedQueue< cds::gc::DHP, Value, traits_SegmentedQueue_mutex_stat > SegmentedQueue_DHP_mutex_stat;

// CASegmentedQueue
class traits_CASegmentedQueue_spin_stat:
public cds::container::ca_segmented_queue::make_traits<
cds::opt::stat< cds::intrusive::ca_segmented_queue::stat<> >
>::type
{};
class traits_CASegmentedQueue_spin_padding:
public cds::container::ca_segmented_queue::make_traits<
cds::opt::padding< cds::opt::cache_line_padding >
>::type
{};
class traits_CASegmentedQueue_mutex_stat:
public cds::container::ca_segmented_queue::make_traits<
cds::opt::stat< cds::intrusive::ca_segmented_queue::stat<> >
,cds::opt::lock_type< std::mutex >
>::type
{};
class traits_CASegmentedQueue_mutex:
public cds::container::ca_segmented_queue::make_traits<
cds::opt::lock_type< std::mutex >
>::type
{};
class traits_CASegmentedQueue_mutex_padding:
public cds::container::ca_segmented_queue::make_traits<
cds::opt::lock_type< std::mutex >
, cds::opt::padding< cds::opt::cache_line_padding >
>::type
{};

typedef cds::container::CASegmentedQueue< cds::gc::HP, Value > CASegmentedQueue_HP_spin;
typedef cds::container::CASegmentedQueue< cds::gc::HP, Value, traits_CASegmentedQueue_spin_padding > CASegmentedQueue_HP_spin_padding;
typedef cds::container::CASegmentedQueue< cds::gc::HP, Value, traits_CASegmentedQueue_spin_stat > CASegmentedQueue_HP_spin_stat;
typedef cds::container::CASegmentedQueue< cds::gc::HP, Value, traits_CASegmentedQueue_mutex > CASegmentedQueue_HP_mutex;
typedef cds::container::CASegmentedQueue< cds::gc::HP, Value, traits_CASegmentedQueue_mutex_padding > CASegmentedQueue_HP_mutex_padding;
typedef cds::container::CASegmentedQueue< cds::gc::HP, Value, traits_CASegmentedQueue_mutex_stat > CASegmentedQueue_HP_mutex_stat;

typedef cds::container::CASegmentedQueue< cds::gc::DHP, Value > CASegmentedQueue_DHP_spin;
typedef cds::container::CASegmentedQueue< cds::gc::DHP, Value, traits_CASegmentedQueue_spin_padding > CASegmentedQueue_DHP_spin_padding;
typedef cds::container::CASegmentedQueue< cds::gc::DHP, Value, traits_CASegmentedQueue_spin_stat > CASegmentedQueue_DHP_spin_stat;
typedef cds::container::CASegmentedQueue< cds::gc::DHP, Value, traits_CASegmentedQueue_mutex > CASegmentedQueue_DHP_mutex;
typedef cds::container::CASegmentedQueue< cds::gc::DHP, Value, traits_CASegmentedQueue_mutex_padding > CASegmentedQueue_DHP_mutex_padding;
typedef cds::container::CASegmentedQueue< cds::gc::DHP, Value, traits_CASegmentedQueue_mutex_stat > CASegmentedQueue_DHP_mutex_stat;

};

template <typename Value>
Expand Down Expand Up @@ -786,6 +832,7 @@ namespace cds_test {
# define CDSSTRESS_FCDeque_HeavyValue_1( test_fixture )
# define CDSSTRESS_RWQueue_1( test_fixture )
# define CDSSTRESS_SegmentedQueue_1( test_fixture )
# define CDSSTRESS_CASegmentedQueue_1( test_fixture )
# define CDSSTRESS_StdQueue_1( test_fixture )
#endif

Expand Down Expand Up @@ -873,6 +920,18 @@ namespace cds_test {
CDSSTRESS_Queue_F( test_fixture, SegmentedQueue_DHP_mutex_stat ) \
CDSSTRESS_SegmentedQueue_1( test_fixture )

#define CDSSTRESS_CASegmentedQueue( test_fixture ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_HP_spin ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_HP_spin_padding ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_HP_spin_stat ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_HP_mutex ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_HP_mutex_stat ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_DHP_spin ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_DHP_spin_stat ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_DHP_mutex ) \
CDSSTRESS_Queue_F( test_fixture, CASegmentedQueue_DHP_mutex_stat ) \
CDSSTRESS_CASegmentedQueue_1( test_fixture )

#define CDSSTRESS_VyukovQueue( test_fixture ) \
CDSSTRESS_Queue_F( test_fixture, VyukovMPMCCycleQueue_dyn ) \
CDSSTRESS_Queue_F( test_fixture, VyukovMPMCCycleQueue_dyn_ic )
Expand Down
1 change: 1 addition & 0 deletions test/stress/queue/random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ namespace {
}

CDSSTRESS_SegmentedQueue( segmented_queue_random )
CDSSTRESS_CASegmentedQueue( segmented_queue_random )

#ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
Expand Down
7 changes: 6 additions & 1 deletion test/unit/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ set(CDSGTEST_QUEUE_SOURCES
intrusive_segmented_queue_hp.cpp
intrusive_segmented_queue_dhp.cpp
intrusive_vyukov_queue.cpp
intrusive_ca_segmented_queue_hp.cpp
intrusive_ca_segmented_queue_dhp.cpp
ca_segmented_queue_hp.cpp
ca_segmented_queue_dhp.cpp
)

include_directories(
Expand All @@ -37,4 +41,5 @@ include_directories(
add_executable(${PACKAGE_NAME} ${CDSGTEST_QUEUE_SOURCES})
target_link_libraries(${PACKAGE_NAME} ${CDS_TEST_LIBRARIES})

add_test(NAME ${PACKAGE_NAME} COMMAND ${PACKAGE_NAME} WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
add_test(NAME ${PACKAGE_NAME} COMMAND ${PACKAGE_NAME} WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})

120 changes: 120 additions & 0 deletions test/unit/queue/ca_segmented_queue_dhp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
This file is a part of libcds - Concurrent Data Structures library

(C) Copyright Maxim Khizhinsky ([email protected]) 2006-2017

Source code repo: http://github.com/khizmax/libcds/
Download: http://sourceforge.net/projects/libcds/files/

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "test_segmented_queue.h"

#include <cds/gc/dhp.h>
#include <cds/container/ca_segmented_queue.h>

namespace {
namespace cc = cds::container;
typedef cds::gc::DHP gc_type;


class CASegmentedQueue_DHP : public cds_test::segmented_queue
{
protected:
static const size_t c_QuasiFactor = 15;
void SetUp()
{
typedef cc::CASegmentedQueue< gc_type, int > queue_type;

cds::gc::dhp::smr::construct( queue_type::c_nHazardPtrCount );
cds::threading::Manager::attachThread();
}

void TearDown()
{
cds::threading::Manager::detachThread();
cds::gc::dhp::smr::destruct();
}
};

TEST_F( CASegmentedQueue_DHP, defaulted )
{
typedef cds::container::CASegmentedQueue< gc_type, int > test_queue;

test_queue q( c_QuasiFactor );
ASSERT_EQ( q.quasi_factor(), cds::beans::ceil2( c_QuasiFactor ));
test(q);
}

TEST_F( CASegmentedQueue_DHP, mutex )
{
struct traits : public cds::container::ca_segmented_queue::traits
{
typedef cds::atomicity::item_counter item_counter;
};
typedef cds::container::CASegmentedQueue< gc_type, int, traits > test_queue;

test_queue q( c_QuasiFactor );
ASSERT_EQ( q.quasi_factor(), cds::beans::ceil2( c_QuasiFactor ));
test( q );
}

TEST_F( CASegmentedQueue_DHP, stat )
{
struct traits : public
cds::container::ca_segmented_queue::make_traits <
cds::opt::item_counter< cds::atomicity::item_counter >
, cds::opt::stat < cds::container::ca_segmented_queue::stat<> >
> ::type
{};
typedef cds::container::CASegmentedQueue< gc_type, int, traits > test_queue;

test_queue q( c_QuasiFactor );
ASSERT_EQ( q.quasi_factor(), cds::beans::ceil2( c_QuasiFactor ));
test( q );
}

TEST_F( CASegmentedQueue_DHP, move )
{
typedef cds::container::CASegmentedQueue< gc_type, std::string > test_queue;

test_queue q( c_QuasiFactor );
ASSERT_EQ( q.quasi_factor(), cds::beans::ceil2( c_QuasiFactor ));
test_string( q );
}

TEST_F( CASegmentedQueue_DHP, move_item_counting )
{
struct traits : public cds::container::ca_segmented_queue::traits
{
typedef cds::atomicity::item_counter item_counter;
};
typedef cds::container::CASegmentedQueue< gc_type, std::string, traits > test_queue;

test_queue q( c_QuasiFactor );
ASSERT_EQ( q.quasi_factor(), cds::beans::ceil2( c_QuasiFactor ));
test_string( q );
}

} // namespace

Loading