diff --git a/src/pyFAI/opencl/test/test_collective.py b/src/pyFAI/opencl/test/test_collective.py index 78edffc63..0a4b4fd7e 100644 --- a/src/pyFAI/opencl/test/test_collective.py +++ b/src/pyFAI/opencl/test/test_collective.py @@ -33,7 +33,7 @@ __contact__ = "jerome.kieffer@esrf.eu" __license__ = "MIT" __copyright__ = "2013 European Synchrotron Radiation Facility, Grenoble, France" -__date__ = "30/10/2024" +__date__ = "07/11/2024" import logging import numpy @@ -87,7 +87,9 @@ def setUp(self): self.data = rng.poisson(10, size=self.shape).astype(numpy.int32) self.data_d = pyopencl.array.to_device(self.queue, self.data) self.sum_d = pyopencl.array.zeros_like(self.data_d) - self.program = pyopencl.Program(self.ctx, get_opencl_code("pyfai:openCL/collective/reduction.cl")).build() + self.program = pyopencl.Program(self.ctx, get_opencl_code("pyfai:openCL/collective/reduction.cl")+ + get_opencl_code("pyfai:openCL/collective/scan.cl") + ).build() def tearDown(self): self.img = self.data = None @@ -143,6 +145,91 @@ def test_atomic(self): logger.info("Wg: %s result: atomic good: %s", wg, good) self.assertTrue(good, "calculation is correct for WG=%s" % wg) + @unittest.skipUnless(ocl, "pyopencl is missing") + def test_Hillis_Steele(self): + """ + tests the Hillis_Steele scan function + """ + data_d = pyopencl.array.to_device(self.queue, self.data.astype("float32")) + scan_d = pyopencl.array.empty_like(data_d) + maxi = int(round(numpy.log2(min(self.shape, self.max_valid_wg))))+1 + for i in range(0, maxi): + wg = 1 << i + try: + evt = self.program.test_cumsum(self.queue, (self.shape,), (wg,), + data_d.data, + scan_d.data, + pyopencl.LocalMemory(2*4*wg)) + evt.wait() + except Exception as error: + logger.error("Error %s on WG=%s: Hillis_Steele", error, wg) + break + else: + res = scan_d.get().reshape((-1, wg)) + ref = numpy.array([numpy.cumsum(i) for i in self.data.reshape((-1, wg))]) + good = numpy.allclose(res, ref) + logger.info("Wg: %s result: cumsum good: %s", wg, good) + self.assertTrue(good, "Cumsum calculation is correct for WG=%s" % wg) + + @unittest.skipUnless(ocl, "pyopencl is missing") + def test_Blelloch(self): + """ + tests the Blelloch scan function + """ + data_d = pyopencl.array.to_device(self.queue, self.data.astype("float32")) + scan_d = pyopencl.array.empty_like(data_d) + maxi = int(round(numpy.log2(min(self.shape/2, self.max_valid_wg))))+1 + for i in range(maxi): + wg = 1 << i + try: + evt = self.program.test_blelloch_scan(self.queue, (self.shape//2,), (wg,), + data_d.data, + scan_d.data, + pyopencl.LocalMemory(2*4*wg)) + evt.wait() + except Exception as error: + logger.error("Error %s on WG=%s: Blelloch", error, wg) + break + else: + res = scan_d.get().reshape((-1, 2*wg)) + ref = numpy.array([numpy.cumsum(i) for i in self.data.reshape((-1, 2*wg))]) + good = numpy.allclose(res, ref) + if not good: + print(ref) + print(res) + logger.info("Wg: %s result: cumsum good: %s", wg, good) + self.assertTrue(good, "calculation is correct for WG=%s" % wg) + + @unittest.skipUnless(ocl, "pyopencl is missing") + def test_Blelloch_multipass(self): + """ + tests the Blelloch cumsum using multiple passes ... + """ + data_d = pyopencl.array.to_device(self.queue, self.data.astype("float32")) + scan_d = pyopencl.array.empty_like(data_d) + maxi = int(round(numpy.log2(min(self.shape/2, self.max_valid_wg))))+1 + for i in range(maxi): + wg = 1 << i + try: + evt = self.program.test_blelloch_multi(self.queue, (wg,), (wg,), + data_d.data, + scan_d.data, + numpy.int32(self.shape), + pyopencl.LocalMemory(2*4*wg)) + evt.wait() + except Exception as error: + logger.error("Error %s on WG=%s: Blelloch multi", error, wg) + break + else: + res = scan_d.get() + ref = numpy.cumsum(self.data) + good = numpy.allclose(res, ref) + if not good: + print(ref) + print(res) + logger.info("Wg: %s result: cumsum good: %s", wg, good) + self.assertTrue(good, "calculation is correct for WG=%s" % wg) + def suite(): loader = unittest.defaultTestLoader.loadTestsFromTestCase testSuite = unittest.TestSuite() diff --git a/src/pyFAI/resources/openCL/collective/comb_sort.cl b/src/pyFAI/resources/openCL/collective/comb_sort.cl index e69de29bb..f873c88c7 100644 --- a/src/pyFAI/resources/openCL/collective/comb_sort.cl +++ b/src/pyFAI/resources/openCL/collective/comb_sort.cl @@ -0,0 +1,197 @@ +/************ Management of the initial step size *********************/ + +int inline next_step(int step, float ratio) +{ + return convert_int_rtp((float)step*ratio); +} + +int inline previous_step(int step, float ratio) +{ + return convert_int_rtn((float)step/ratio); +} + +// smallest step smaller than the size ... iterative version. +int inline first_step(int step, int size, float ratio) +{ + while (step=size) + step=previous_step(step, ratio); + // if (get_local_id(0) == 0) printf("- %d %d\n", step, size); + return step; +} + + + +// returns the number of swap performed +int passe(global volatile float* elements, + int size, + int step, + local volatile int* shared) +{ + int wg = get_local_size(0); + int tid = get_local_id(0); + int cnt = 0; + int i, j, k; + barrier(CLK_GLOBAL_MEM_FENCE); + if (2*step>=size) + { + for (i=tid;i=size) + { + for (i=tid;i0; step=previous_step(step, ratio)) + { + cnt = passe(&elements[start], size, step, shared); + } + step = 1; + while (cnt){ + cnt = passe(&elements[start], size, step, shared); + } + + +} + +// workgroup: (wg, 1) grid:(wg, nb_lines), shared wg*sizeof(int) +kernel void test_combsort_float4(global volatile float4* elements, + global int* positions, + local volatile int* shared) +{ + int gid = get_group_id(1); + int step = 11; // magic value + float ratio=1.3f; // magic value + int cnt; + + int start, stop, size; + start = (gid)?positions[gid-1]:0; + stop = positions[gid]; + size = stop-start; + + step = first_step(step, size, ratio); + + for (step=step; step>0; step=previous_step(step, ratio)) + cnt = passe_float4(&elements[start], size, step, shared); + + step = 1; + while (cnt) + cnt = passe_float4(&elements[start], size, step, shared); +} diff --git a/src/pyFAI/resources/openCL/collective/scan.cl b/src/pyFAI/resources/openCL/collective/scan.cl index e69de29bb..a4ff2d7bf 100644 --- a/src/pyFAI/resources/openCL/collective/scan.cl +++ b/src/pyFAI/resources/openCL/collective/scan.cl @@ -0,0 +1,145 @@ + +/* + * Cumsum all elements in a shared memory + * + * Nota: the first wg-size elements, are used. + * The shared buffer needs to be twice this size of the workgroup + * + * Implements Hillis and Steele algorithm + * https://en.wikipedia.org/wiki/Prefix_sum#cite_ref-hs1986_9-0 + * + */ + +#define SWAP_LOCAL_FLOAT(a,b) {__local float *tmp=a;a=b;b=tmp;} + +void static inline cumsum_scan_float(local float* shared) +{ + int wg = get_local_size(0); + int tid = get_local_id(0); + // Split the input buffer in two parts + local float* buf1 = shared; + local float* buf2 = &shared[wg]; + barrier(CLK_LOCAL_MEM_FENCE); + for (int i=1; i=i) + buf2[tid] = buf1[tid] + buf1[tid-i]; + else + buf2[tid] = buf1[tid]; + barrier(CLK_LOCAL_MEM_FENCE); + SWAP_LOCAL_FLOAT(buf1, buf2); + } + // Finally copy all values to both halfs of the + barrier(CLK_LOCAL_MEM_FENCE); + buf2[tid] = buf1[tid]; + barrier(CLK_LOCAL_MEM_FENCE); +} + + +/* + * Kernel to test the cumsum_scan_float collective function + * + * Note the shared buffer needs to be twice the size of the workgroup ! + */ + +kernel void test_cumsum(global float* input, + global float* output, + local float* shared) +{ + int gid = get_global_id(0); + int wg = get_local_size(0); + int tid = get_local_id(0); + + shared[tid] = input[gid]; + cumsum_scan_float(shared); + output[gid] = shared[tid]; +} + +/* + * Exclusive prefix sum in a shared memory + * + * Implements Blelloch algorithm + * https://en.wikipedia.org/wiki/Prefix_sum#cite_ref-offman_10-0 + * + * One workgroup calculates the cumsum in an array of twice its size! + */ + + +void static inline blelloch_scan_float(local float *shared) +{ + int ws = get_local_size(0); + int lid = get_local_id(0); + int dp = 1; + int w; + + for(int s = ws; s > 0; s >>= 1) + { + barrier(CLK_LOCAL_MEM_FENCE); + if(lid < s) + { + int i = dp*(2*lid+1)-1; + int j = i + dp; + shared[j] += shared[i]; + } + dp <<= 1; + } + + dp >>= 1; + for(int s = 1; s < ws; s=((s+1)<<1)-1) + { + w = dp; + dp >>= 1; + + barrier(CLK_LOCAL_MEM_FENCE); + + if(lid < s) { + int i = (lid+1)*w - 1; + int j = i + dp; + shared[j] += shared[i]; + } + } + barrier(CLK_LOCAL_MEM_FENCE); +} + +kernel void test_blelloch_scan(global float *input, + global float *output, + local float *shared) +{ + int gid = get_global_id(0); + int lid = get_local_id(0); + + shared[2*lid] = input[2*gid]; + shared[2*lid+1] = input[2*gid+1]; + + blelloch_scan_float(shared); + + output[2*gid] = shared[2*lid]; + output[2*gid+1] = shared[2*lid+1]; +} + +kernel void test_blelloch_multi(global float *input, + global float *output, + int size, + local float *shared) +{ + int lid = get_local_id(0); + int ws = get_local_size(0); + float sum = 0.0f; + int target = (size + ws - 1) & ~ (ws - 1); + for (int i=lid; i