From 4d9059cb75dfe0454970ce336c2a898096874df3 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Wed, 30 Oct 2024 10:10:40 +0100 Subject: [PATCH] Collective functions for reduction close #2310 --- src/pyFAI/opencl/test/meson.build | 3 +- src/pyFAI/opencl/test/test_collective.py | 154 ++++++++++++++++++ src/pyFAI/resources/openCL/bitonic.cl | 2 +- .../resources/openCL/collective/reduction.cl | 66 ++++++++ src/pyFAI/resources/openCL/meson.build | 2 + 5 files changed, 225 insertions(+), 2 deletions(-) create mode 100644 src/pyFAI/opencl/test/test_collective.py diff --git a/src/pyFAI/opencl/test/meson.build b/src/pyFAI/opencl/test/meson.build index 94e68a580..8bc353746 100644 --- a/src/pyFAI/opencl/test/meson.build +++ b/src/pyFAI/opencl/test/meson.build @@ -7,7 +7,8 @@ py.install_sources( 'test_ocl_sort.py', 'test_openCL.py', 'test_peak_finder.py', - 'test_preproc.py'], + 'test_preproc.py', + 'test_collective.py'], pure: false, # Will be installed next to binaries subdir: 'pyFAI/opencl/test' # Folder relative to site-packages to install to ) diff --git a/src/pyFAI/opencl/test/test_collective.py b/src/pyFAI/opencl/test/test_collective.py new file mode 100644 index 000000000..78edffc63 --- /dev/null +++ b/src/pyFAI/opencl/test/test_collective.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +# coding: utf-8 +# +# Project: Basic OpenCL test +# https://github.com/silx-kit/silx +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +""" +Simple test for collective functions +""" + +__authors__ = ["Jérôme Kieffer"] +__contact__ = "jerome.kieffer@esrf.eu" +__license__ = "MIT" +__copyright__ = "2013 European Synchrotron Radiation Facility, Grenoble, France" +__date__ = "30/10/2024" + +import logging +import numpy +import platform +import unittest +from .. import ocl +if ocl: + import pyopencl.array +from ...test.utilstest import UtilsTest +from silx.opencl.common import _measure_workgroup_size +from silx.opencl.utils import get_opencl_code + +logger = logging.getLogger(__name__) + + +@unittest.skipIf(UtilsTest.opencl is False, "User request to skip OpenCL tests") +@unittest.skipUnless(ocl, "PyOpenCl is missing") +class TestReduction(unittest.TestCase): + + @classmethod + def setUpClass(cls): + super(TestReduction, cls).setUpClass() + + if ocl: + cls.ctx = ocl.create_context() + if logger.getEffectiveLevel() <= logging.INFO: + cls.PROFILE = True + properties = pyopencl.command_queue_properties.PROFILING_ENABLE + cls.queue = pyopencl.CommandQueue(cls.ctx, properties=properties) + else: + cls.PROFILE = False + cls.queue = pyopencl.CommandQueue(cls.ctx) + cls.max_valid_wg = cls.ctx.devices[0].max_work_group_size + if (platform.machine().startswith("ppc") and + cls.ctx.devices[0].platform.name.startswith("Portable") + and cls.ctx.devices[0].type == pyopencl.device_type.GPU): + raise unittest.SkipTest("Skip test on Power9 GPU with PoCL driver") + + @classmethod + def tearDownClass(cls): + super(TestReduction, cls).tearDownClass() + print("Maximum valid workgroup size %s on device %s" % (cls.max_valid_wg, cls.ctx.devices[0])) + cls.ctx = None + cls.queue = None + + def setUp(self): + if ocl is None: + return + self.shape = 4096 + rng = UtilsTest.get_rng() + self.data = rng.poisson(10, size=self.shape).astype(numpy.int32) + self.data_d = pyopencl.array.to_device(self.queue, self.data) + self.sum_d = pyopencl.array.zeros_like(self.data_d) + self.program = pyopencl.Program(self.ctx, get_opencl_code("pyfai:openCL/collective/reduction.cl")).build() + + def tearDown(self): + self.img = self.data = None + self.data_d = self.sum_d = self.program = None + + @unittest.skipUnless(ocl, "pyopencl is missing") + def test_reduction(self): + """ + tests the sum_int_reduction function + """ + # rec_workgroup = self.program.test_sum_int_reduction.get_work_group_info(pyopencl.kernel_work_group_info.WORK_GROUP_SIZE, self.ctx.devices[0]) + maxi = int(round(numpy.log2(min(self.shape,self.max_valid_wg))))+1 + for i in range(maxi): + wg = 1 << i + try: + evt = self.program.test_sum_int_reduction(self.queue, (self.shape,), (wg,), + self.data_d.data, + self.sum_d.data, + pyopencl.LocalMemory(4*wg)) + evt.wait() + except Exception as error: + logger.error("Error %s on WG=%s: test_reduction", error, wg) + break + else: + res = self.sum_d.get() + ref = numpy.outer(self.data.reshape((-1, wg)).sum(axis=-1),numpy.ones(wg)).ravel() + good = numpy.allclose(res, ref) + logger.info("Wg: %s result: reduction OK %s", wg, good) + self.assertTrue(good, "calculation is correct for WG=%s" % wg) + + @unittest.skipUnless(ocl, "pyopencl is missing") + def test_atomic(self): + """ + tests the sum_int_atomic function + """ + + maxi = int(round(numpy.log2(min(self.shape, self.max_valid_wg))))+1 + for i in range(maxi): + wg = 1 << i + try: + evt = self.program.test_sum_int_atomic(self.queue, (self.shape,), (wg,), + self.data_d.data, + self.sum_d.data, + pyopencl.LocalMemory(4*wg)) + evt.wait() + except Exception as error: + logger.error("Error %s on WG=%s: test_atomic", error, wg) + break + else: + res = self.sum_d.get() + ref = numpy.outer(self.data.reshape((-1, wg)).sum(axis=-1),numpy.ones(wg)).ravel() + good = numpy.allclose(res, ref) + logger.info("Wg: %s result: atomic good: %s", wg, good) + self.assertTrue(good, "calculation is correct for WG=%s" % wg) + +def suite(): + loader = unittest.defaultTestLoader.loadTestsFromTestCase + testSuite = unittest.TestSuite() + testSuite.addTest(loader(TestReduction)) + return testSuite + + +if __name__ == '__main__': + unittest.main(defaultTest="suite") diff --git a/src/pyFAI/resources/openCL/bitonic.cl b/src/pyFAI/resources/openCL/bitonic.cl index 9f32796d5..20fbfb817 100644 --- a/src/pyFAI/resources/openCL/bitonic.cl +++ b/src/pyFAI/resources/openCL/bitonic.cl @@ -292,7 +292,7 @@ __kernel void bsort_all(__global float4 *g_data, // dim0 = y: wg=1 // dim1 = x: wg=number_of_element/8 __kernel void bsort_horizontal(__global float *g_data, - __local float4 *l_data) { + __local float4 *l_data) { float8 input, output; uint id, global_start, offset; diff --git a/src/pyFAI/resources/openCL/collective/reduction.cl b/src/pyFAI/resources/openCL/collective/reduction.cl index e69de29bb..04030988f 100644 --- a/src/pyFAI/resources/openCL/collective/reduction.cl +++ b/src/pyFAI/resources/openCL/collective/reduction.cl @@ -0,0 +1,66 @@ + +/* sum all elements in a shared memory, same size as the workgroup size 0 + * + * Return the same sum-value in all threads. + */ + +int inline sum_int_reduction(local int* shared) +{ + int wg = get_local_size(0); + int tid = get_local_id(0); + + // local reduction based implementation + for (int stride=wg>>1; stride>0; stride>>=1) + { + barrier(CLK_LOCAL_MEM_FENCE); + if ((tid