#!/usr/bin/env python
#
# Project: S I L X project
# https://github.com/silx-kit/silx
#
# Copyright (C) 2012-2023 European Synchrotron Radiation Facility, Grenoble, France
#
# Principal author: Jérôme Kieffer (Jerome.Kieffer@ESRF.eu)
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
"""
Common OpenCL abstract base class for different kinds of processing
"""
__author__ = "Jerome Kieffer"
__contact__ = "Jerome.Kieffer@ESRF.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "09/11/2022"
__status__ = "stable"
import sys
import os
import logging
import gc
from collections import namedtuple
import numpy
import threading
from .common import (
ocl,
pyopencl,
release_cl_buffers,
query_kernel_info,
allocate_texture,
check_textures_availability,
)
from .utils import concatenate_cl_kernel
import platform
BufferDescription = namedtuple("BufferDescription", ["name", "size", "dtype", "flags"])
EventDescription = namedtuple(
"EventDescription", ["name", "event"]
) # Deprecated, please use ProfileDescription
ProfileDescription = namedtuple("ProfileDescription", ["name", "start", "stop"])
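# Example (sketch, not part of the original module): how these descriptions are
# typically filled in. The buffer name and the nanosecond timestamps below are
# hypothetical; start/stop would come from OpenCL event profiling.
# buf = BufferDescription("output", 10, numpy.float32, None)
# prof = ProfileDescription("memset output", 1000, 4500)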
logger = logging.getLogger(__name__)
class KernelContainer(object):
"""This object holds a copy of all kernels, accessible as attributes"""
def __init__(self, program):
"""Constructor of the class
:param program: the OpenCL program as generated by PyOpenCL
"""
self._program = program
for kernel in program.all_kernels():
self.__setattr__(kernel.function_name, kernel)
def get_kernels(self):
"return the dictionary with all kernels"
return dict(
item for item in self.__dict__.items() if not item[0].startswith("_")
)
def get_kernel(self, name):
"get a kernel from its name"
logger.debug("KernelContainer.get_kernel(%s)", name)
return self.__dict__.get(name)
def max_workgroup_size(self, kernel_name):
"Retrieve the compile time WORK_GROUP_SIZE for a given kernel"
if isinstance(kernel_name, pyopencl.Kernel):
kernel = kernel_name
else:
kernel = self.get_kernel(kernel_name)
return query_kernel_info(self._program, kernel, "WORK_GROUP_SIZE")
def min_workgroup_size(self, kernel_name):
"Retrieve the compile time PREFERRED_WORK_GROUP_SIZE_MULTIPLE for a given kernel"
if isinstance(kernel_name, pyopencl.Kernel):
kernel = kernel_name
else:
kernel = self.get_kernel(kernel_name)
return query_kernel_info(
self._program, kernel, "PREFERRED_WORK_GROUP_SIZE_MULTIPLE"
)
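# Minimal usage sketch for KernelContainer ("ctx", "queue", "src", "out_buffer"
# and the kernel name "memset_out" are hypothetical, not defined in this module):
# program = pyopencl.Program(ctx, src).build()
# kernels = KernelContainer(program)
# wg_max = kernels.max_workgroup_size("memset_out")  # compile-time limit
# kernels.memset_out(queue, (1024,), None, out_buffer)  # kernels are attributes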
class OpenclProcessing(object):
"""Abstract class for different types of OpenCL processing.
This class provides:
* Generation of the context, queues, profiling mode
* Additional functions to allocate/free all buffers declared as static attributes of the class
* Functions to compile kernels, cache them and clean them
* Helper functions to clone the object
"""
# Example of how to create an output buffer of 10 floats
buffers = [
BufferDescription("output", 10, numpy.float32, None),
]
# list of kernel source files to be concatenated before compilation of the program
kernel_files = []
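# A concrete subclass would typically override the two class attributes above,
# for instance (sketch; the buffer names and the .cl file name are hypothetical):
# buffers = [
#     BufferDescription("input", 1024, numpy.float32, None),
#     BufferDescription("output", 1024, numpy.float32, None),
# ]
# kernel_files = ["my_kernels.cl"]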
def __init__(
self,
ctx=None,
devicetype="all",
platformid=None,
deviceid=None,
block_size=None,
memory=None,
profile=False,
):
"""Constructor of the abstract OpenCL processing class
:param ctx: actual working context, left to None for automatic
initialization from device type or platformid/deviceid
:param devicetype: type of device, can be "CPU", "GPU", "ACC" or "ALL"
:param platformid: integer with the platform_identifier, as given by clinfo
:param deviceid: Integer with the device identifier, as given by clinfo
:param block_size: preferred workgroup size, may vary depending on the
outcome of the compilation
:param memory: minimum memory available on device
:param profile: switch on profiling to be able to profile at the kernel
level, store profiling elements (makes code slightly slower)
"""
self.sem = threading.Semaphore()
self._X87_VOLATILE = None
self.profile = None
self.events = [] # List of EventDescription, kept for profiling
self.cl_mem = {} # dict with all buffers allocated
self.cl_program = None # The actual OpenCL program
self.cl_kernel_args = {} # dict with all kernel arguments
self.queue = None
if ctx:
self.ctx = ctx
else:
self.ctx = ocl.create_context(
devicetype=devicetype,
platformid=platformid,
deviceid=deviceid,
memory=memory,
)
device_name = self.ctx.devices[0].name.strip()
platform_name = self.ctx.devices[0].platform.name.strip()
platform = ocl.get_platform(platform_name)
self.device = platform.get_device(device_name)
self.cl_kernel_args = {} # dict with all kernel arguments
self.set_profiling(profile)
self.block_size = block_size
self.program = None
self.kernels = None
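# Instantiation sketch for a hypothetical concrete subclass
# MyProcessing(OpenclProcessing): pick a GPU if available and enable profiling,
# or address one device explicitly via platform/device identifiers.
# proc = MyProcessing(devicetype="GPU", profile=True)
# proc = MyProcessing(platformid=0, deviceid=0)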
def check_textures_availability(self):
return check_textures_availability(self.ctx)
def __del__(self):
"""Destructor: release all buffers and programs"""
try:
self.reset_log()
self.free_kernels()
self.free_buffers()
if self.queue is not None:
self.queue.finish()
except Exception as err:
logger.warning("%s: %s", type(err), err)
self.queue = None
self.device = None
self.ctx = None
gc.collect()
def allocate_buffers(self, buffers=None, use_array=False):
"""
Allocate OpenCL buffers required for a specific configuration
:param buffers: a list of BufferDescriptions, leave to None for
parametrized buffers.
:param use_array: allocate memory as pyopencl.array.Array
instead of pyopencl.Buffer
Note that an OpenCL context also requires some memory, as well
as Event and other OpenCL functionalities which cannot and are
not taken into account here. The memory required by a context
varies depending on the device: typically 65MB for a GTX580 but
~15MB for a 9300M. In addition, a GPU will always have at
least 3-5MB of memory in use. Unfortunately, OpenCL does NOT
have a built-in way to check the actual free memory on a
device, only the total memory.
"""
if buffers is None:
buffers = self.buffers
with self.sem:
mem = {}
# check if enough memory is available on the device
ualloc = 0
for buf in buffers:
ualloc += numpy.dtype(buf.dtype).itemsize * numpy.prod(buf.size)
logger.info(
"%.3fMB are needed on device: %s, which has %.3fMB",
ualloc / 1.0e6,
self.device,
self.device.memory / 1.0e6,
)
if ualloc >= self.device.memory:
raise MemoryError(
"Fatal error in allocate_buffers. Not enough "
" device memory for buffers (%lu requested, %lu available)"
% (ualloc, self.device.memory)
)
# do the allocation
try:
if use_array:
for buf in buffers:
mem[buf.name] = pyopencl.array.empty(
self.queue, buf.size, buf.dtype
)
else:
for buf in buffers:
size = numpy.dtype(buf.dtype).itemsize * numpy.prod(buf.size)
mem[buf.name] = pyopencl.Buffer(self.ctx, buf.flags, int(size))
except pyopencl.MemoryError as error:
release_cl_buffers(mem)
raise MemoryError(error)
self.cl_mem.update(mem)
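# Usage sketch (the buffer name "tmp" is hypothetical): allocate the buffers
# declared in self.buffers as pyopencl.array.Array objects, or pass an explicit list:
# self.allocate_buffers(use_array=True)
# self.allocate_buffers([BufferDescription("tmp", 2048, numpy.float32,
#                                           pyopencl.mem_flags.READ_WRITE)])
# tmp_buffer = self.cl_mem["tmp"]  # a plain pyopencl.Buffer in this second case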
def add_to_cl_mem(self, parrays):
"""
Add pyopencl arrays, already allocated by pyopencl, to self.cl_mem.
This should be used before calling allocate_buffers().
:param parrays: a dictionary of `pyopencl.array.Array` or `pyopencl.Buffer`
"""
mem = self.cl_mem
for name, parr in parrays.items():
mem[name] = parr
self.cl_mem.update(mem)
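# Sketch: register an array allocated outside of this class (the name "image"
# is hypothetical) so that free_buffers() and set_profiling() manage it too:
# d_image = pyopencl.array.zeros(self.queue, (2048, 2048), numpy.float32)
# self.add_to_cl_mem({"image": d_image})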
def check_workgroup_size(self, kernel_name):
"Calculate the maximum workgroup size for the given kernel after compilation"
return self.kernels.max_workgroup_size(kernel_name)
def free_buffers(self):
"""Free all device memory allocated on the device"""
with self.sem:
for key, buf in list(self.cl_mem.items()):
if buf is not None:
if isinstance(buf, pyopencl.array.Array):
try:
buf.data.release()
except pyopencl.LogicError:
logger.error("Error while freeing buffer %s", key)
else:
try:
buf.release()
except pyopencl.LogicError:
logger.error("Error while freeing buffer %s", key)
self.cl_mem[key] = None
def compile_kernels(self, kernel_files=None, compile_options=None):
"""Call the OpenCL compiler
:param kernel_files: list of paths to the kernel files
(by default, use the ones declared in the class)
:param compile_options: string of compile options
"""
# concatenate all needed source files into a single openCL module
kernel_files = kernel_files or self.kernel_files
kernel_src = concatenate_cl_kernel(kernel_files)
compile_options = compile_options or self.get_compiler_options()
logger.info("Compiling file %s with options %s", kernel_files, compile_options)
try:
self.program = pyopencl.Program(self.ctx, kernel_src).build(
options=compile_options
)
except (pyopencl.MemoryError, pyopencl.LogicError) as error:
raise MemoryError(error)
else:
self.kernels = KernelContainer(self.program)
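# Compilation sketch (the file name, macro and kernel name are hypothetical):
# concatenate the listed sources, build them and expose the kernels via self.kernels:
# self.compile_kernels(["my_kernels.cl"], compile_options="-DWG=128")
# wg_max = self.kernels.max_workgroup_size("my_kernel")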
def free_kernels(self):
"""Free all kernels"""
for kernel in self.cl_kernel_args:
self.cl_kernel_args[kernel] = []
self.kernels = None
self.program = None
# Methods about Profiling
def set_profiling(self, value=True):
"""Switch On/Off the profiling flag of the command queue to allow debugging
:param value: set to True to enable profiling, or to False to disable it.
Without profiling, the processing is marginally faster
Profiling information can then be retrieved with the 'log_profile' method
"""
if bool(value) != self.profile:
with self.sem:
self.profile = bool(value)
if self.queue is not None:
self.queue.finish()
if self.profile:
self.queue = pyopencl.CommandQueue(
self.ctx,
properties=pyopencl.command_queue_properties.PROFILING_ENABLE,
)
else:
self.queue = pyopencl.CommandQueue(self.ctx)
# Update all memory-objects with the new queue:
for obj, cl_obj in list(self.cl_mem.items()):
if isinstance(cl_obj, pyopencl.array.Array):
self.cl_mem[obj] = cl_obj.with_queue(self.queue)
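# Sketch: profiling can be toggled at runtime; the command queue is rebuilt and
# every registered pyopencl array is migrated onto the new queue:
# self.set_profiling(True)   # subsequent events can be recorded via profile_add()
# # ... enqueue some kernels ...
# self.set_profiling(False)  # back to a plain command queue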
def profile_add(self, event, desc):
"""
Add an OpenCL event to the events list, if profiling is enabled.
:param event: pyopencl.NannyEvent.
:param desc: event description
"""
if self.profile:
try:
profile = event.profile
self.events.append(ProfileDescription(desc, profile.start, profile.end))
except Exception:
# Probably the driver does not support profiling
pass
def profile_multi(self, event_lists):
"""
Extract profiling info from several OpenCL events, if profiling is enabled.
:param event_lists: list of ("desc", pyopencl.NannyEvent) tuples.
"""
if self.profile:
for event_desc in event_lists:
if isinstance(event_desc, ProfileDescription):
self.events.append(event_desc)
else:
if (
isinstance(event_desc, EventDescription)
or "__len__" in dir(event_desc)
and len(event_desc) == 2
):
desc, event = event_desc
else:
desc = "?"
event = event_desc
try:
profile = event.profile
start = profile.start
end = profile.end
except Exception:
# probably an unfinished job ... use old-style.
self.events.append(event_desc)
else:
self.events.append(ProfileDescription(desc, start, end))
def log_profile(self, stats=False):
"""If we are in profiling mode, print out the timing of every single OpenCL call
:param stats: if True, print statistics for each kernel instead of all execution timings
:return: list of lines to print
"""
total_time = 0.0
out = [""]
if stats:
stats = {}
out.append(
f"OpenCL kernel profiling statistics in milliseconds for: {self.__class__.__name__}"
)
out.append(
f"{'Kernel name':>50} (count): min median max mean std"
)
else:
stats = None
out.append(f"Profiling info for OpenCL: {self.__class__.__name__}")
if self.profile:
for e in self.events:
if isinstance(e, ProfileDescription):
name = e[0]
t0 = e[1]
t1 = e[2]
elif (
isinstance(e, EventDescription)
or "__len__" in dir(e)
and len(e) == 2
):
name = e[0]
pr = e[1].profile
t0 = pr.start
t1 = pr.end
else:
name = "?"
t0 = e.profile.start
t1 = e.profile.end
et = 1e-6 * (t1 - t0)
total_time += et
if stats is None:
out.append(f"{name:>50} : {et:.3f}ms")
else:
if name in stats:
stats[name].append(et)
else:
stats[name] = [et]
if stats is not None:
for k, v in stats.items():
n = numpy.array(v)
out.append(
f"{k:>50} ({len(v):5}): {n.min():8.3f} {numpy.median(n):8.3f} {n.max():8.3f} {n.mean():8.3f} {n.std():8.3f}"
)
out.append("_" * 80)
out.append(
f"{'Total OpenCL execution time':>50} : {total_time:.3f}ms"
)
logger.info(os.linesep.join(out))
return out
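# Reporting sketch: with profiling enabled, dump either every recorded event or
# per-kernel statistics, then clear the collected events:
# lines = self.log_profile()            # one line per recorded event
# lines = self.log_profile(stats=True)  # min/median/max/mean/std per kernel
# self.reset_log()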
def reset_log(self):
"""
Resets the profiling timers
"""
with self.sem:
self.events = []
# Methods about textures
def allocate_texture(self, shape, hostbuf=None, support_1D=False):
return allocate_texture(self.ctx, shape, hostbuf=hostbuf, support_1D=support_1D)
def transfer_to_texture(self, arr, tex_ref):
"""
Transfer an array to a texture.
:param arr: Input array. Can be a numpy array or a pyopencl array.
:param tex_ref: texture reference (pyopencl._cl.Image).
"""
copy_args = [self.queue, tex_ref, arr]
shp = arr.shape
ndim = arr.ndim
if ndim == 1:
# pyopencl and OpenCL < 1.2 do not support image1d_t
# force 2D with one row in this case
# ~ ndim = 2
shp = (1,) + shp
copy_kwargs = {"origin": (0,) * ndim, "region": shp[::-1]}
if not (isinstance(arr, numpy.ndarray)): # assuming pyopencl.array.Array
# D->D copy
copy_args[2] = arr.data
copy_kwargs["offset"] = 0
ev = pyopencl.enqueue_copy(*copy_args, **copy_kwargs)
self.profile_add(ev, "Transfer to texture")
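# Texture sketch (the 2D array "img" is hypothetical): allocate an image
# matching the array shape, then copy the data into it; works for numpy arrays
# as well as pyopencl arrays (device-to-device copy in the latter case):
# if self.check_textures_availability():
#     tex = self.allocate_texture(img.shape)
#     self.transfer_to_texture(img, tex)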
@property
def x87_volatile_option(self):
# this is needed when running 32-bit OpenCL with POCL
if self._X87_VOLATILE is None:
if (
platform.machine() in ("i386", "i686", "x86_64", "AMD64")
and (tuple.__itemsize__ == 4)
and self.ctx.devices[0].platform.name == "Portable Computing Language"
):
self._X87_VOLATILE = "-DX87_VOLATILE=volatile"
else:
self._X87_VOLATILE = ""
return self._X87_VOLATILE
def get_compiler_options(self, x87_volatile=False):
"""Provide the default OpenCL compiler options
:param x87_volatile: needed for Kahan summation
:return: string with compiler option
"""
option_list = []
if x87_volatile:
option_list.append(self.x87_volatile_option)
return " ".join(i for i in option_list if i)
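# Sketch: a subclass needing strict x87 accumulation (e.g. for Kahan summation
# on 32-bit POCL) would build its kernels with:
# self.compile_kernels(compile_options=self.get_compiler_options(x87_volatile=True))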
# This should be implemented by concrete class
# def __copy__(self):
# """Shallow copy of the object
#
# :return: copy of the object
# """
# return self.__class__((self._data, self._indices, self._indptr),
# self.size, block_size=self.BLOCK_SIZE,
# platformid=self.platform.id,
# deviceid=self.device.id,
# checksum=self.on_device.get("data"),
# profile=self.profile, empty=self.empty)
#
# def __deepcopy__(self, memo=None):
# """deep copy of the object
#
# :return: deepcopy of the object
# """
# if memo is None:
# memo = {}
# new_csr = self._data.copy(), self._indices.copy(), self._indptr.copy()
# memo[id(self._data)] = new_csr[0]
# memo[id(self._indices)] = new_csr[1]
# memo[id(self._indptr)] = new_csr[2]
# new_obj = self.__class__(new_csr, self.size,
# block_size=self.BLOCK_SIZE,
# platformid=self.platform.id,
# deviceid=self.device.id,
# checksum=self.on_device.get("data"),
# profile=self.profile, empty=self.empty)
# memo[id(self)] = new_obj
# return new_obj
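# End-to-end sketch of a minimal concrete subclass. Everything below is
# hypothetical (class name, kernel file and kernel name) and only illustrates
# how the pieces above fit together:
#
# class Memset(OpenclProcessing):
#     buffers = [BufferDescription("output", 1024, numpy.float32, None)]
#     kernel_files = ["memset.cl"]  # assumed to define a kernel "memset_out"
#
#     def __init__(self, **kwargs):
#         OpenclProcessing.__init__(self, **kwargs)
#         self.compile_kernels()
#         self.allocate_buffers(use_array=True)
#
#     def run(self, value):
#         ary = self.cl_mem["output"]
#         evt = self.kernels.memset_out(self.queue, ary.shape, None,
#                                       ary.data, numpy.float32(value))
#         self.profile_add(evt, "memset_out")
#         return ary.get()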