diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index cd1c4c72b0b..afe607a8888 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -492,6 +492,7 @@ target_link_libraries(cugraph rmm::rmm raft::raft $ + cuda PRIVATE ${COMPILED_RAFT_LIB} cuco::cuco diff --git a/cpp/cmake/thirdparty/get_cccl.cmake b/cpp/cmake/thirdparty/get_cccl.cmake index 72b53d4c833..b5a03dbfbc8 100644 --- a/cpp/cmake/thirdparty/get_cccl.cmake +++ b/cpp/cmake/thirdparty/get_cccl.cmake @@ -15,6 +15,9 @@ # This function finds CCCL and sets any additional necessary environment variables. function(find_and_configure_cccl) include(${rapids-cmake-dir}/cpm/cccl.cmake) + include(${rapids-cmake-dir}/cpm/package_override.cmake) + rapids_cpm_package_override("${CMAKE_CURRENT_FUNCTION_LIST_DIR}/cccl_override.json") + set(CCCL_ENABLE_UNSTABLE ON) rapids_cpm_cccl(BUILD_EXPORT_SET cugraph-exports INSTALL_EXPORT_SET cugraph-exports) endfunction() diff --git a/cpp/src/prims/detail/per_v_transform_reduce_e.cuh b/cpp/src/prims/detail/per_v_transform_reduce_e.cuh index eab2dd7a316..b3ed2242b33 100644 --- a/cpp/src/prims/detail/per_v_transform_reduce_e.cuh +++ b/cpp/src/prims/detail/per_v_transform_reduce_e.cuh @@ -64,6 +64,10 @@ #include #include +#include +#include +using namespace cuda::experimental::stf; + namespace cugraph { namespace detail { @@ -1161,6 +1165,14 @@ void per_v_transform_reduce_e_edge_partition( std::optional> key_segment_offsets, std::optional> const& edge_partition_stream_pool_indices) { + async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource(handle); + stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle); + token output_tokens[4]; + for (size_t i = 0; i < 4; i++) + { + output_tokens[i] = cudastf_ctx.token(); + } + constexpr bool use_input_key = !std::is_same_v; using vertex_t = typename GraphViewType::vertex_type; @@ -1184,10 +1196,12 @@ void per_v_transform_reduce_e_edge_partition( if constexpr (update_major && !use_input_key) { // this is necessary as we don't visit // every vertex in the hypersparse segment - thrust::fill(rmm::exec_policy_nosync(exec_stream), + cudastf_ctx.task(output_tokens[3].write())->*[=](cudaStream_t stream) { + thrust::fill(rmm::exec_policy_nosync(stream), output_buffer + (*key_segment_offsets)[3], output_buffer + (*key_segment_offsets)[4], major_init); + }; } auto segment_size = use_input_key @@ -1197,8 +1211,9 @@ void per_v_transform_reduce_e_edge_partition( raft::grid_1d_thread_t update_grid(segment_size, detail::per_v_transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + size_t token_idx = 0; auto segment_output_buffer = output_buffer; - if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[3]; } + if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[3]; token_idx +=3; } auto segment_key_first = edge_partition_key_first; auto segment_key_last = edge_partition_key_last; if constexpr (use_input_key) { @@ -1209,8 +1224,9 @@ void per_v_transform_reduce_e_edge_partition( assert(segment_key_first == nullptr); assert(segment_key_last == nullptr); } + cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) { detail::per_v_transform_reduce_e_hypersparse - <<>>( + <<>>( edge_partition, segment_key_first, segment_key_last, @@ -1223,6 +1239,7 @@ void per_v_transform_reduce_e_edge_partition( major_init, reduce_op, pred_op); + }; } } if ((*key_segment_offsets)[3] - (*key_segment_offsets)[2]) { @@ -1233,8 +1250,9 @@ void per_v_transform_reduce_e_edge_partition( raft::grid_1d_thread_t update_grid((*key_segment_offsets)[3] - (*key_segment_offsets)[2], detail::per_v_transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + size_t token_idx = 0; auto segment_output_buffer = output_buffer; - if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[2]; } + if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[2]; token_idx += 2; } std::optional segment_key_first{}; // std::optional as thrust::transform_iterator's default constructor // is a deleted function, segment_key_first should always have a value @@ -1244,8 +1262,9 @@ void per_v_transform_reduce_e_edge_partition( segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first()); } *segment_key_first += (*key_segment_offsets)[2]; + cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) { detail::per_v_transform_reduce_e_low_degree - <<>>( + <<>>( edge_partition, *segment_key_first, *segment_key_first + ((*key_segment_offsets)[3] - (*key_segment_offsets)[2]), @@ -1258,6 +1277,7 @@ void per_v_transform_reduce_e_edge_partition( major_init, reduce_op, pred_op); + }; } if ((*key_segment_offsets)[2] - (*key_segment_offsets)[1] > 0) { auto exec_stream = edge_partition_stream_pool_indices @@ -1267,8 +1287,10 @@ void per_v_transform_reduce_e_edge_partition( raft::grid_1d_warp_t update_grid((*key_segment_offsets)[2] - (*key_segment_offsets)[1], detail::per_v_transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + size_t token_idx = 0; auto segment_output_buffer = output_buffer; - if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[1]; } + + if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[1]; token_idx += 1;} std::optional segment_key_first{}; // std::optional as thrust::transform_iterator's default constructor // is a deleted function, segment_key_first should always have a value @@ -1278,8 +1300,9 @@ void per_v_transform_reduce_e_edge_partition( segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first()); } *segment_key_first += (*key_segment_offsets)[1]; + cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) { detail::per_v_transform_reduce_e_mid_degree - <<>>( + <<>>( edge_partition, *segment_key_first, *segment_key_first + ((*key_segment_offsets)[2] - (*key_segment_offsets)[1]), @@ -1293,6 +1316,7 @@ void per_v_transform_reduce_e_edge_partition( major_identity_element, reduce_op, pred_op); + }; } if ((*key_segment_offsets)[1] > 0) { auto exec_stream = edge_partition_stream_pool_indices @@ -1313,8 +1337,9 @@ void per_v_transform_reduce_e_edge_partition( } else { segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first()); } + cudastf_ctx.task(output_tokens[0].rw())->*[=](cudaStream_t stream) { detail::per_v_transform_reduce_e_high_degree - <<>>( + <<>>( edge_partition, *segment_key_first, *segment_key_first + (*key_segment_offsets)[1], @@ -1328,7 +1353,10 @@ void per_v_transform_reduce_e_edge_partition( major_identity_element, reduce_op, pred_op); + }; } + + cudastf_ctx.finalize(); } else { auto exec_stream = edge_partition_stream_pool_indices ? handle.get_stream_from_stream_pool( diff --git a/cpp/src/prims/detail/transform_v_frontier_e.cuh b/cpp/src/prims/detail/transform_v_frontier_e.cuh index 7dbe7e09bc0..ceb8951a6bd 100644 --- a/cpp/src/prims/detail/transform_v_frontier_e.cuh +++ b/cpp/src/prims/detail/transform_v_frontier_e.cuh @@ -27,6 +27,9 @@ #include #include +#include +#include + #include #include @@ -37,6 +40,8 @@ #include +using namespace cuda::experimental::stf; + namespace cugraph { namespace detail { @@ -415,6 +420,9 @@ auto transform_v_frontier_e(raft::handle_t const& handle, auto edge_mask_view = graph_view.edge_mask_view(); + async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource(handle); + stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle); + // 1. update aggregate_local_frontier_local_degree_offsets auto aggregate_local_frontier_local_degree_offsets = @@ -508,9 +516,17 @@ auto transform_v_frontier_e(raft::handle_t const& handle, } auto edge_partition_e_value_input = edge_partition_e_input_device_view_t(edge_value_input, i); + + // CUDASTF logical data buffer for transform reduce phase + std::vector l_tv_buffers(5); + for (size_t segment_i = 0; segment_i < 5; segment_i++) { + l_tv_buffers[segment_i] = cudastf_ctx.token(); + } + auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); if (segment_offsets) { - auto [edge_partition_key_indices, edge_partition_v_frontier_partition_offsets] = + //auto [edge_partition_key_indices, edge_partition_v_frontier_partition_offsets] = + auto res_partition_v_frontier = partition_v_frontier( handle, edge_partition_frontier_major_first, @@ -520,6 +536,10 @@ auto transform_v_frontier_e(raft::handle_t const& handle, edge_partition.major_range_first() + (*segment_offsets)[2], edge_partition.major_range_first() + (*segment_offsets)[3]}); + // We cannot capture structured binding before C++20 so we create these variables manually + auto& edge_partition_key_indices = ::std::get<0>(res_partition_v_frontier); + auto& edge_partition_v_frontier_partition_offsets = ::std::get<1>(res_partition_v_frontier); + // FIXME: we may further improve performance by 1) concurrently running kernels on different // segments; 2) individually tuning block sizes for different segments; and 3) adding one // more segment for very high degree vertices and running segmented reduction @@ -529,8 +549,10 @@ auto transform_v_frontier_e(raft::handle_t const& handle, raft::grid_1d_block_t update_grid(high_size, detail::transform_v_frontier_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + + cudastf_ctx.task(l_tv_buffers[0].write())->*[&](cudaStream_t stream) { detail::transform_v_frontier_e_high_degree - <<>>( + <<>>( edge_partition, edge_partition_frontier_key_first, edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[0], @@ -542,6 +564,7 @@ auto transform_v_frontier_e(raft::handle_t const& handle, edge_partition_frontier_local_degree_offsets, e_op, get_dataframe_buffer_begin(aggregate_value_buffer)); + }; } auto mid_size = edge_partition_v_frontier_partition_offsets[2] - edge_partition_v_frontier_partition_offsets[1]; @@ -549,8 +572,9 @@ auto transform_v_frontier_e(raft::handle_t const& handle, raft::grid_1d_warp_t update_grid(mid_size, detail::transform_v_frontier_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tv_buffers[1].write())->*[&](cudaStream_t stream) { detail::transform_v_frontier_e_mid_degree - <<>>( + <<>>( edge_partition, edge_partition_frontier_key_first, edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[1], @@ -562,6 +586,7 @@ auto transform_v_frontier_e(raft::handle_t const& handle, edge_partition_frontier_local_degree_offsets, e_op, get_dataframe_buffer_begin(aggregate_value_buffer)); + }; } auto low_size = edge_partition_v_frontier_partition_offsets[3] - edge_partition_v_frontier_partition_offsets[2]; @@ -569,8 +594,9 @@ auto transform_v_frontier_e(raft::handle_t const& handle, raft::grid_1d_thread_t update_grid(low_size, detail::transform_v_frontier_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tv_buffers[2].write())->*[&](cudaStream_t stream) { detail::transform_v_frontier_e_hypersparse_or_low_degree - <<>>( + <<>>( edge_partition, edge_partition_frontier_key_first, edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[2], @@ -582,6 +608,7 @@ auto transform_v_frontier_e(raft::handle_t const& handle, edge_partition_frontier_local_degree_offsets, e_op, get_dataframe_buffer_begin(aggregate_value_buffer)); + }; } auto hypersparse_size = edge_partition_v_frontier_partition_offsets[4] - edge_partition_v_frontier_partition_offsets[3]; @@ -589,8 +616,9 @@ auto transform_v_frontier_e(raft::handle_t const& handle, raft::grid_1d_thread_t update_grid(hypersparse_size, detail::transform_v_frontier_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tv_buffers[3].write())->*[&](cudaStream_t stream) { detail::transform_v_frontier_e_hypersparse_or_low_degree - <<>>( + <<>>( edge_partition, edge_partition_frontier_key_first, edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[3], @@ -602,6 +630,7 @@ auto transform_v_frontier_e(raft::handle_t const& handle, edge_partition_frontier_local_degree_offsets, e_op, get_dataframe_buffer_begin(aggregate_value_buffer)); + }; } } else { raft::grid_1d_thread_t update_grid( @@ -609,8 +638,9 @@ auto transform_v_frontier_e(raft::handle_t const& handle, detail::transform_v_frontier_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tv_buffers[4].write())->*[&,i](cudaStream_t stream) { detail::transform_v_frontier_e_hypersparse_or_low_degree - <<>>( + <<>>( edge_partition, edge_partition_frontier_key_first, thrust::make_counting_iterator(size_t{0}), @@ -622,9 +652,12 @@ auto transform_v_frontier_e(raft::handle_t const& handle, edge_partition_frontier_local_degree_offsets, e_op, get_dataframe_buffer_begin(aggregate_value_buffer)); + }; } } + cudastf_ctx.finalize(); + return std::make_tuple(std::move(aggregate_value_buffer), std::move(aggregate_local_frontier_local_degree_offsets)); } diff --git a/cpp/src/prims/transform_reduce_e.cuh b/cpp/src/prims/transform_reduce_e.cuh index 66de9e107c5..ab55ce62fca 100644 --- a/cpp/src/prims/transform_reduce_e.cuh +++ b/cpp/src/prims/transform_reduce_e.cuh @@ -47,6 +47,11 @@ #include #include + +#include +#include +using namespace cuda::experimental::stf; + namespace cugraph { namespace detail { @@ -473,6 +478,9 @@ T transform_reduce_e(raft::handle_t const& handle, // currently, nothing to do } + async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource(handle); + stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle); + property_op edge_property_add{}; auto result_buffer = allocate_dataframe_buffer(1, handle.get_stream()); @@ -507,6 +515,11 @@ T transform_reduce_e(raft::handle_t const& handle, } auto edge_partition_e_value_input = edge_partition_e_input_device_view_t(edge_value_input, i); + // CUDASTF logical data buffer for transform_reduce phase + std::vector l_tr_buffers(5); + for (size_t segment_i = 0; segment_i < 5; segment_i++) { l_tr_buffers[segment_i] = cudastf_ctx.token(); + } + auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); if (segment_offsets) { // FIXME: we may further improve performance by 1) concurrently running kernels on different @@ -517,8 +530,9 @@ T transform_reduce_e(raft::handle_t const& handle, raft::grid_1d_block_t update_grid((*segment_offsets)[1], detail::transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tr_buffers[0].write())->*[&](cudaStream_t stream) { detail::transform_reduce_e_high_degree - <<>>( + <<>>( edge_partition, edge_partition.major_range_first(), edge_partition.major_range_first() + (*segment_offsets)[1], @@ -528,13 +542,15 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); + }; } if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) { raft::grid_1d_warp_t update_grid((*segment_offsets)[2] - (*segment_offsets)[1], detail::transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tr_buffers[1].write())->*[&](cudaStream_t stream) { detail::transform_reduce_e_mid_degree - <<>>( + <<>>( edge_partition, edge_partition.major_range_first() + (*segment_offsets)[1], edge_partition.major_range_first() + (*segment_offsets)[2], @@ -544,13 +560,15 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); + }; } if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) { + cudastf_ctx.task(l_tr_buffers[2].write())->*[&](cudaStream_t stream) { raft::grid_1d_thread_t update_grid((*segment_offsets)[3] - (*segment_offsets)[2], detail::transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); detail::transform_reduce_e_low_degree - <<>>( + <<>>( edge_partition, edge_partition.major_range_first() + (*segment_offsets)[2], edge_partition.major_range_first() + (*segment_offsets)[3], @@ -560,13 +578,15 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); + }; } if (edge_partition.dcs_nzd_vertex_count() && (*(edge_partition.dcs_nzd_vertex_count()) > 0)) { raft::grid_1d_thread_t update_grid(*(edge_partition.dcs_nzd_vertex_count()), detail::transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tr_buffers[3].write())->*[&](cudaStream_t stream) { detail::transform_reduce_e_hypersparse - <<>>( + <<>>( edge_partition, edge_partition_src_value_input, edge_partition_dst_value_input, @@ -574,6 +594,7 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); + }; } } else { if (edge_partition.major_range_size() > 0) { @@ -581,8 +602,9 @@ T transform_reduce_e(raft::handle_t const& handle, detail::transform_reduce_e_kernel_block_size, handle.get_device_properties().maxGridSize[0]); + cudastf_ctx.task(l_tr_buffers[4].write())->*[&](cudaStream_t stream) { detail::transform_reduce_e_low_degree - <<>>( + <<>>( edge_partition, edge_partition.major_range_first(), edge_partition.major_range_last(), @@ -592,10 +614,13 @@ T transform_reduce_e(raft::handle_t const& handle, edge_partition_e_mask, get_dataframe_buffer_begin(result_buffer), e_op); + }; } } } + cudastf_ctx.finalize(); + auto result = thrust::reduce( handle.get_thrust_policy(), get_dataframe_buffer_begin(result_buffer),