Skip to content

Commit

Permalink
[oneDPL] Fix an error in the oneDPL __future implementation (#1747)
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyKopienko authored Aug 14, 2024
1 parent 7597d5c commit 120fce5
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 42 deletions.
7 changes: 7 additions & 0 deletions documentation/library_guide/macros.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ Macro Description
---------------------------------- ------------------------------
``ONEDPL_ALLOW_DEFERRED_WAITING`` This macro allows waiting for completion of certain algorithms executed with
device policies to be deferred. (Disabled by default.)

When the macro evaluates to non-zero, a call to a oneDPL algorithm with
a device policy might return before the computation completes on the device.

.. Warning:: Before accessing data produced or modified by the call, waiting
for completion of all tasks in the corresponding SYCL queue is required;
otherwise, the program behavior is undefined.
---------------------------------- ------------------------------
``ONEDPL_FPGA_DEVICE`` Use this macro to build your code containing |onedpl_short| parallel
algorithms for FPGA devices. (Disabled by default.)
Expand Down
6 changes: 1 addition & 5 deletions include/oneapi/dpl/internal/async_impl/async_impl_hetero.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ __pattern_walk1_async(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _For
return __future_obj;
}

template <typename _IsSync = ::std::false_type,
__par_backend_hetero::access_mode __acc_mode1 = __par_backend_hetero::access_mode::read,
template <__par_backend_hetero::access_mode __acc_mode1 = __par_backend_hetero::access_mode::read,
__par_backend_hetero::access_mode __acc_mode2 = __par_backend_hetero::access_mode::write,
typename _BackendTag, typename _ExecutionPolicy, typename _ForwardIterator1, typename _ForwardIterator2,
typename _Function>
Expand All @@ -70,9 +69,6 @@ __pattern_walk2_async(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _For
_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::walk_n<_ExecutionPolicy, _Function>{__f}, __n, __buf1.all_view(), __buf2.all_view());

if constexpr (_IsSync::value)
__future.wait();

return __future.__make_future(__first2 + __n);
}

Expand Down
6 changes: 3 additions & 3 deletions include/oneapi/dpl/internal/binary_search_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ lower_bound_impl(__internal::__hetero_tag<_BackendTag>, Policy&& policy, InputIt
_BackendTag{}, ::std::forward<decltype(policy)>(policy),
custom_brick<StrictWeakOrdering, decltype(size), search_algorithm::lower_bound>{comp, size, use_32bit_indexing},
value_size, zip_vw)
.wait();
.__deferrable_wait();
return result + value_size;
}

Expand Down Expand Up @@ -183,7 +183,7 @@ upper_bound_impl(__internal::__hetero_tag<_BackendTag>, Policy&& policy, InputIt
_BackendTag{}, std::forward<decltype(policy)>(policy),
custom_brick<StrictWeakOrdering, decltype(size), search_algorithm::upper_bound>{comp, size, use_32bit_indexing},
value_size, zip_vw)
.wait();
.__deferrable_wait();
return result + value_size;
}

Expand Down Expand Up @@ -215,7 +215,7 @@ binary_search_impl(__internal::__hetero_tag<_BackendTag>, Policy&& policy, Input
custom_brick<StrictWeakOrdering, decltype(size), search_algorithm::binary_search>{
comp, size, use_32bit_indexing},
value_size, zip_vw)
.wait();
.__deferrable_wait();
return result + value_size;
}

Expand Down
53 changes: 28 additions & 25 deletions include/oneapi/dpl/pstl/hetero/algorithm_impl_hetero.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ __pattern_walk1(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _ForwardIt

oneapi::dpl::__par_backend_hetero::__parallel_for(
_BackendTag{}, __exec, unseq_backend::walk_n<_ExecutionPolicy, _Function>{__f}, __n, __buf.all_view())
.wait();
.__deferrable_wait();
}

//------------------------------------------------------------------------
Expand All @@ -75,10 +75,10 @@ __pattern_walk1_n(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _F
// walk2
//------------------------------------------------------------------------

// TODO: A tag _IsSync is used for provide a patterns call pipeline, where the last one should be synchronous
// Probably it should be re-designed by a pipeline approach, when a pattern returns some sync obejects
// TODO: The _WaitMode tag is used to build a pipeline of pattern calls in which the last call should be synchronous.
// It should probably be re-designed using a pipeline approach, where each pattern returns some sync objects
// which are then combined into a "pipeline" (probably like a Range pipeline)
template <typename _IsSync = ::std::true_type,
template <typename _WaitMode = __par_backend_hetero::__deferrable_mode,
__par_backend_hetero::access_mode __acc_mode1 = __par_backend_hetero::access_mode::read,
__par_backend_hetero::access_mode __acc_mode2 = __par_backend_hetero::access_mode::write,
typename _BackendTag, typename _ExecutionPolicy, typename _ForwardIterator1, typename _ForwardIterator2,
Expand All @@ -97,12 +97,12 @@ __pattern_walk2(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _ForwardIt
auto __keep2 = oneapi::dpl::__ranges::__get_sycl_range<__acc_mode2, _ForwardIterator2>();
auto __buf2 = __keep2(__first2, __first2 + __n);

auto __future_obj = oneapi::dpl::__par_backend_hetero::__parallel_for(
auto __future = oneapi::dpl::__par_backend_hetero::__parallel_for(
_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::walk_n<_ExecutionPolicy, _Function>{__f}, __n, __buf1.all_view(), __buf2.all_view());

if constexpr (_IsSync())
__future_obj.wait();
// Call no wait, wait or deferrable wait depending on _WaitMode
__future.wait(_WaitMode{});

return __first2 + __n;
}
Expand All @@ -126,7 +126,8 @@ _ForwardIterator2
__pattern_swap(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _ForwardIterator1 __first1,
_ForwardIterator1 __last1, _ForwardIterator2 __first2, _Function __f)
{
return __pattern_walk2</*_IsSync=*/::std::true_type, __par_backend_hetero::access_mode::read_write,
return __pattern_walk2</*_WaitMode*/ __par_backend_hetero::__deferrable_mode,
__par_backend_hetero::access_mode::read_write,
__par_backend_hetero::access_mode::read_write>(
__tag, ::std::forward<_ExecutionPolicy>(__exec), __first1, __last1, __first2, __f);
}
Expand Down Expand Up @@ -158,7 +159,7 @@ __pattern_walk3(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _ForwardIt
oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::walk_n<_ExecutionPolicy, _Function>{__f}, __n,
__buf1.all_view(), __buf2.all_view(), __buf3.all_view())
.wait();
.__deferrable_wait();

return __first3 + __n;
}
Expand Down Expand Up @@ -249,8 +250,8 @@ __pattern_walk2_transform_if(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&&
{
// Require `read_write` access mode for output sequence to force a copy in for host iterators to capture incoming
// values of the output sequence for elements where the predicate is false.
return __pattern_walk2</*_IsSync=*/::std::true_type, __par_backend_hetero::access_mode::read,
__par_backend_hetero::access_mode::read_write>(
return __pattern_walk2</*_WaitMode*/ __par_backend_hetero::__deferrable_mode,
__par_backend_hetero::access_mode::read, __par_backend_hetero::access_mode::read_write>(
__tag,
__par_backend_hetero::make_wrapped_policy<__walk2_transform_if_wrapper>(
::std::forward<_ExecutionPolicy>(__exec)),
Expand Down Expand Up @@ -1038,7 +1039,8 @@ __pattern_unique(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _It

// The temporary buffer is constructed from a range, so its destructor will not block; therefore
// we must call __pattern_walk2 in a way which provides blocking synchronization for this pattern.
return __pattern_walk2</*_IsSync=*/std::true_type, __par_backend_hetero::access_mode::read_write,
return __pattern_walk2</*_WaitMode*/ __par_backend_hetero::__deferrable_mode,
__par_backend_hetero::access_mode::read_write,
__par_backend_hetero::access_mode::read_write>(
__tag, __par_backend_hetero::make_wrapped_policy<copy_back_wrapper>(::std::forward<_ExecutionPolicy>(__exec)),
__copy_first, __copy_last, __first, __brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{});
Expand Down Expand Up @@ -1185,7 +1187,7 @@ __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _Ite

__par_backend_hetero::__parallel_merge(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
__buf1.all_view(), __buf2.all_view(), __buf3.all_view(), __comp)
.wait();
.__deferrable_wait();
}
return __d_first + __n;
}
Expand Down Expand Up @@ -1243,7 +1245,7 @@ __stable_sort_with_projection(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __ex

__par_backend_hetero::__parallel_stable_sort(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
__buf.all_view(), __comp, __proj)
.wait();
.__deferrable_wait();
}

template <typename _BackendTag, typename _ExecutionPolicy, typename _Iterator, typename _Compare>
Expand Down Expand Up @@ -1429,7 +1431,7 @@ __pattern_partial_sort(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _It
__par_backend_hetero::make_iter_mode<__par_backend_hetero::access_mode::read_write>(__first),
__par_backend_hetero::make_iter_mode<__par_backend_hetero::access_mode::read_write>(__mid),
__par_backend_hetero::make_iter_mode<__par_backend_hetero::access_mode::read_write>(__last), __comp)
.wait();
.__deferrable_wait();
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -1482,9 +1484,9 @@ __pattern_partial_sort_copy(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&&
{
// If our output buffer is larger than the input buffer, simply copy elements to the output and use
// full sort on them.
auto __out_end =
__pattern_walk2(__tag, __par_backend_hetero::make_wrapped_policy<__initial_copy_1>(__exec), __first, __last,
__out_first, __brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{});
auto __out_end = __pattern_walk2<__par_backend_hetero::__sync_mode>(
__tag, __par_backend_hetero::make_wrapped_policy<__initial_copy_1>(__exec), __first, __last, __out_first,
__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{});

// TODO: __pattern_walk2 is a blocking call here, so there is a synchronization between the patterns.
// But, when the input iterators are a kind of hetero iterator on top of sycl::buffer, SYCL
Expand All @@ -1510,7 +1512,7 @@ __pattern_partial_sort_copy(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&&

auto __buf_first = __buf.get();

auto __buf_last = __pattern_walk2</*_IsSync=*/::std::false_type>(
auto __buf_last = __pattern_walk2<__par_backend_hetero::__async_mode>(
__tag, __par_backend_hetero::make_wrapped_policy<__initial_copy_2>(__exec), __first, __last, __buf_first,
__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{});

Expand Down Expand Up @@ -1571,7 +1573,7 @@ __pattern_reverse(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Iterato
_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::__reverse_functor<typename ::std::iterator_traits<_Iterator>::difference_type>{__n}, __n / 2,
__buf.all_view())
.wait();
.__deferrable_wait();
}

//------------------------------------------------------------------------
Expand All @@ -1597,7 +1599,7 @@ __pattern_reverse_copy(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Bi
_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::__reverse_copy<typename ::std::iterator_traits<_BidirectionalIterator>::difference_type>{__n},
__n, __buf1.all_view(), __buf2.all_view())
.wait();
.__deferrable_wait();

return __result + __n;
}
Expand Down Expand Up @@ -1648,7 +1650,8 @@ __pattern_rotate(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Iterator
auto __temp_rng_rw =
oneapi::dpl::__ranges::all_view<_Tp, __par_backend_hetero::access_mode::read_write>(__temp_buf.get_buffer());
oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), __brick,
__n, __temp_rng_rw, __buf.all_view()).wait();
__n, __temp_rng_rw, __buf.all_view())
.__deferrable_wait();

// The temporary buffer is constructed from a range, so its destructor will not block; therefore
// we must call __parallel_for with wait() to provide the blocking synchronization for this pattern.
Expand Down Expand Up @@ -1683,7 +1686,7 @@ __pattern_rotate_copy(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Bid
unseq_backend::__rotate_copy<typename ::std::iterator_traits<_BidirectionalIterator>::difference_type>{__n,
__shift},
__n, __buf1.all_view(), __buf2.all_view())
.wait();
.__deferrable_wait();

return __result + __n;
}
Expand Down Expand Up @@ -1983,7 +1986,7 @@ __pattern_shift_left(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Rang

oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
__brick, __size_res, __src, __dst)
.wait();
.__deferrable_wait();
}
else //2. n < size/2; 'n' parallel copying
{
Expand All @@ -1993,7 +1996,7 @@ __pattern_shift_left(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Rang
oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__shift_left_right>(
::std::forward<_ExecutionPolicy>(__exec)),
__brick, __n, __rng)
.wait();
.__deferrable_wait();
}

return __size_res;
Expand Down
8 changes: 4 additions & 4 deletions include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ __pattern_walk_n(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Function
oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::walk_n<_ExecutionPolicy, _Function>{__f}, __n,
::std::forward<_Ranges>(__rngs)...)
.wait();
.__deferrable_wait();
}
}

Expand Down Expand Up @@ -516,7 +516,7 @@ __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _Ran
__par_backend_hetero::__parallel_merge(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
::std::forward<_Range1>(__rng1), ::std::forward<_Range2>(__rng2),
::std::forward<_Range3>(__rng3), __comp)
.wait();
.__deferrable_wait();
}

return __n;
Expand All @@ -533,7 +533,7 @@ __pattern_sort(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Range&& __
if (__rng.size() >= 2)
__par_backend_hetero::__parallel_stable_sort(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
::std::forward<_Range>(__rng), __comp, __proj)
.wait();
.__deferrable_wait();
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -783,7 +783,7 @@ __pattern_reduce_by_segment(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&&
__result_end,
oneapi::dpl::__ranges::take_view_simple(experimental::ranges::views::all_read(__idx), __result_end),
experimental::ranges::views::all_read(__tmp_out_values), ::std::forward<_Range4>(__out_values))
.wait();
.__deferrable_wait();

return __result_end;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,20 @@ struct __result_and_scratch_storage
}
};

// __async_mode: tag describing a pattern call mode which should be executed asynchronously.
struct __async_mode {};

// __sync_mode: tag describing a pattern call mode which should be executed synchronously.
struct __sync_mode {};

// __deferrable_mode: tag describing a pattern call mode which is executed either synchronously
// or asynchronously, depending on the state of the ONEDPL_ALLOW_DEFERRED_WAITING macro.
struct __deferrable_mode {};

//A contract for future class: <sycl::event or other event, a value, sycl::buffers..., or __usm_host_or_buffer_storage>
//Impl details: inheritance (private) instead of aggregation for enabling the empty base optimization.
template <typename _Event, typename... _Args>
Expand Down Expand Up @@ -709,8 +723,23 @@ class __future : private std::tuple<_Args...>
void
wait()
{
#if !ONEDPL_ALLOW_DEFERRED_WAITING
__my_event.wait_and_throw();
}
template <typename _WaitModeTag>
void
wait(_WaitModeTag)
{
if constexpr (std::is_same_v<_WaitModeTag, __sync_mode>)
wait();
else if constexpr (std::is_same_v<_WaitModeTag, __deferrable_mode>)
__deferrable_wait();
}

void
__deferrable_wait()
{
#if !ONEDPL_ALLOW_DEFERRED_WAITING
wait();
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion include/oneapi/dpl/pstl/hetero/histogram_impl_hetero.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ __pattern_histogram(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Rando

__parallel_histogram(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), __init_event,
__input_buf.all_view(), ::std::move(__bins), __binhash_manager)
.wait();
.__deferrable_wait();
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions include/oneapi/dpl/pstl/hetero/numeric_impl_hetero.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ __pattern_transform_scan_base(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&
oneapi::dpl::__par_backend_hetero::__parallel_transform_scan(
_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), __buf1.all_view(), __buf2.all_view(), __n,
__unary_op, __init, __binary_op, _Inclusive{})
.wait();
.__deferrable_wait();
}
else
{
Expand Down Expand Up @@ -267,7 +267,7 @@ __pattern_adjacent_difference(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&

oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, __exec, _Function{__fn}, __n,
__buf1.all_view(), __buf2.all_view())
.wait();
.__deferrable_wait();
}

return __d_last;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ __pattern_transform_scan_base(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __ex
__binary_op, _NoOpFunctor{}, __no_assign_op, __assign_op, __get_data_op},
// global scan
unseq_backend::__global_scan_functor<_Inclusive, _BinaryOperation, _InitType>{__binary_op, __init})
.wait();
.__deferrable_wait();
return __rng1_size;
}

Expand Down

0 comments on commit 120fce5

Please sign in to comment.