#!/usr/bin/env python
#
# Project: Sift implementation in Python + OpenCL
# https://github.com/silx-kit/silx
#
# Copyright (C) 2013-2020 European Synchrotron Radiation Facility, Grenoble, France
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
"""
This module provides a class for CBF byte offset compression/decompression.
"""
__authors__ = ["Jérôme Kieffer"]
__contact__ = "jerome.kieffer@esrf.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "11/10/2018"
__status__ = "production"
import functools
import os
import numpy
from ..common import ocl, pyopencl
from ..processing import BufferDescription, EventDescription, OpenclProcessing
import logging
logger = logging.getLogger(__name__)
if pyopencl:
import pyopencl.version
if pyopencl.version.VERSION < (2016, 0):
from pyopencl.scan import GenericScanKernel, GenericDebugScanKernel
else:
from pyopencl.algorithm import GenericScanKernel
from pyopencl.scan import GenericDebugScanKernel
else:
logger.warning("No PyOpenCL, no byte-offset, please see fabio")
[docs]class ByteOffset(OpenclProcessing):
"""Perform the byte offset compression/decompression on the GPU
See :class:`OpenclProcessing` for optional arguments description.
:param int raw_size:
Size of the raw stream for decompression.
It can be (slightly) larger than the array.
:param int dec_size:
Size of the decompression output array
(mandatory for decompression)
"""
def __init__(self, raw_size=None, dec_size=None,
ctx=None, devicetype="all",
platformid=None, deviceid=None,
block_size=None, profile=False):
OpenclProcessing.__init__(self, ctx=ctx, devicetype=devicetype,
platformid=platformid, deviceid=deviceid,
block_size=block_size, profile=profile)
if self.block_size is None:
self.block_size = self.device.max_work_group_size
wg = self.block_size
buffers = [BufferDescription("counter", 1, numpy.int32, None)]
if raw_size is None:
self.raw_size = -1
self.padded_raw_size = -1
else:
self.raw_size = int(raw_size)
self.padded_raw_size = int((self.raw_size + wg - 1) & ~(wg - 1))
buffers += [
BufferDescription("raw", self.padded_raw_size, numpy.int8, None),
BufferDescription("mask", self.padded_raw_size, numpy.int32, None),
BufferDescription("values", self.padded_raw_size, numpy.int32, None),
BufferDescription("exceptions", self.padded_raw_size, numpy.int32, None)
]
if dec_size is None:
self.dec_size = None
else:
self.dec_size = numpy.int32(dec_size)
buffers += [
BufferDescription("data_float", self.dec_size, numpy.float32, None),
BufferDescription("data_int", self.dec_size, numpy.int32, None)
]
self.allocate_buffers(buffers, use_array=True)
self.compile_kernels([os.path.join("codec", "byte_offset")])
self.kernels.__setattr__("scan", self._init_double_scan())
self.kernels.__setattr__("compression_scan",
self._init_compression_scan())
def _init_double_scan(self):
""""generates a double scan on indexes and values in one operation"""
arguments = "__global int *value", "__global int *index"
int2 = pyopencl.tools.get_or_register_dtype("int2")
input_expr = "index[i]>0 ? (int2)(0, 0) : (int2)(value[i], 1)"
scan_expr = "a+b"
neutral = "(int2)(0,0)"
output_statement = "value[i] = item.s0; index[i+1] = item.s1;"
if self.block_size > 256:
knl = GenericScanKernel(self.ctx,
dtype=int2,
arguments=arguments,
input_expr=input_expr,
scan_expr=scan_expr,
neutral=neutral,
output_statement=output_statement)
else: # MacOS on CPU
knl = GenericDebugScanKernel(self.ctx,
dtype=int2,
arguments=arguments,
input_expr=input_expr,
scan_expr=scan_expr,
neutral=neutral,
output_statement=output_statement)
return knl
[docs] def decode(self, raw, as_float=False, out=None):
"""This function actually performs the decompression by calling the kernels
:param numpy.ndarray raw: The compressed data as a 1D numpy array of char.
:param bool as_float: True to decompress as float32,
False (default) to decompress as int32
:param pyopencl.array out: pyopencl array in which to place the result.
:return: The decompressed image as an pyopencl array.
:rtype: pyopencl.array
"""
assert self.dec_size is not None, \
"dec_size is a mandatory ByteOffset init argument for decompression"
events = []
with self.sem:
len_raw = numpy.int32(len(raw))
if len_raw > self.padded_raw_size:
wg = self.block_size
self.raw_size = int(len(raw))
self.padded_raw_size = (self.raw_size + wg - 1) & ~(wg - 1)
logger.info("increase raw buffer size to %s", self.padded_raw_size)
buffers = {
"raw": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int8),
"mask": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32),
"exceptions": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32),
"values": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32),
}
self.cl_mem.update(buffers)
else:
wg = self.block_size
evt = pyopencl.enqueue_copy(self.queue, self.cl_mem["raw"].data,
raw,
is_blocking=False)
events.append(EventDescription("copy raw H -> D", evt))
evt = self.kernels.fill_int_mem(self.queue, (self.padded_raw_size,), (wg,),
self.cl_mem["mask"].data,
numpy.int32(self.padded_raw_size),
numpy.int32(0),
numpy.int32(0))
events.append(EventDescription("memset mask", evt))
evt = self.kernels.fill_int_mem(self.queue, (1,), (1,),
self.cl_mem["counter"].data,
numpy.int32(1),
numpy.int32(0),
numpy.int32(0))
events.append(EventDescription("memset counter", evt))
evt = self.kernels.mark_exceptions(self.queue, (self.padded_raw_size,), (wg,),
self.cl_mem["raw"].data,
len_raw,
numpy.int32(self.raw_size),
self.cl_mem["mask"].data,
self.cl_mem["values"].data,
self.cl_mem["counter"].data,
self.cl_mem["exceptions"].data)
events.append(EventDescription("mark exceptions", evt))
nb_exceptions = numpy.empty(1, dtype=numpy.int32)
evt = pyopencl.enqueue_copy(self.queue, nb_exceptions, self.cl_mem["counter"].data,
is_blocking=False)
events.append(EventDescription("copy counter D -> H", evt))
evt.wait()
nbexc = int(nb_exceptions[0])
if nbexc == 0:
logger.info("nbexc %i", nbexc)
else:
evt = self.kernels.treat_exceptions(self.queue, (nbexc,), (1,),
self.cl_mem["raw"].data,
len_raw,
self.cl_mem["mask"].data,
self.cl_mem["exceptions"].data,
self.cl_mem["values"].data
)
events.append(EventDescription("treat_exceptions", evt))
#self.cl_mem["copy_values"] = self.cl_mem["values"].copy()
#self.cl_mem["copy_mask"] = self.cl_mem["mask"].copy()
evt = self.kernels.scan(self.cl_mem["values"],
self.cl_mem["mask"],
queue=self.queue,
size=int(len_raw),
wait_for=(evt,))
events.append(EventDescription("double scan", evt))
#evt.wait()
if out is not None:
if out.dtype == numpy.float32:
copy_results = self.kernels.copy_result_float
else:
copy_results = self.kernels.copy_result_int
else:
if as_float:
out = self.cl_mem["data_float"]
copy_results = self.kernels.copy_result_float
else:
out = self.cl_mem["data_int"]
copy_results = self.kernels.copy_result_int
evt = copy_results(self.queue, (self.padded_raw_size,), (wg,),
self.cl_mem["values"].data,
self.cl_mem["mask"].data,
len_raw,
self.dec_size,
out.data
)
events.append(EventDescription("copy_results", evt))
#evt.wait()
if self.profile:
self.events += events
return out
__call__ = decode
def _init_compression_scan(self):
"""Initialize CBF compression scan kernels"""
preamble = """
int compressed_size(int diff) {
int abs_diff = abs(diff);
if (abs_diff < 128) {
return 1;
}
else if (abs_diff < 32768) {
return 3;
}
else {
return 7;
}
}
void write(const int index,
const int diff,
global char *output) {
int abs_diff = abs(diff);
if (abs_diff < 128) {
output[index] = (char) diff;
}
else if (abs_diff < 32768) {
output[index] = -128;
output[index + 1] = (char) (diff >> 0);
output[index + 2] = (char) (diff >> 8);
}
else {
output[index] = -128;
output[index + 1] = 0;
output[index + 2] = -128;
output[index + 3] = (char) (diff >> 0);
output[index + 4] = (char) (diff >> 8);
output[index + 5] = (char) (diff >> 16);
output[index + 6] = (char) (diff >> 24);
}
}
"""
arguments = "__global const int *data, __global char *compressed, __global int *size"
input_expr = "compressed_size((i == 0) ? data[0] : (data[i] - data[i - 1]))"
scan_expr = "a+b"
neutral = "0"
output_statement = """
if (prev_item == 0) { // 1st thread store compressed data size
size[0] = last_item;
}
write(prev_item, (i == 0) ? data[0] : (data[i] - data[i - 1]), compressed);
"""
if self.block_size >= 64:
knl = GenericScanKernel(self.ctx,
dtype=numpy.int32,
preamble=preamble,
arguments=arguments,
input_expr=input_expr,
scan_expr=scan_expr,
neutral=neutral,
output_statement=output_statement)
else: # MacOS on CPU
knl = GenericDebugScanKernel(self.ctx,
dtype=numpy.int32,
preamble=preamble,
arguments=arguments,
input_expr=input_expr,
scan_expr=scan_expr,
neutral=neutral,
output_statement=output_statement)
return knl
[docs] def encode(self, data, out=None):
"""Compress data to CBF.
:param data: The data to compress as a numpy array
(or a pyopencl Array) of int32.
:type data: Union[numpy.ndarray, pyopencl.array.Array]
:param pyopencl.array out:
pyopencl array of int8 in which to store the result.
The array should be large enough to store the compressed data.
:return: The compressed data as a pyopencl array.
If out is provided, this array shares the backing buffer,
but has the exact size of the compressed data and the queue
of the ByteOffset instance.
:rtype: pyopencl.array
:raises ValueError: if out array is not large enough
"""
events = []
with self.sem:
if isinstance(data, pyopencl.array.Array):
d_data = data # Uses provided array
else: # Copy data to device
data = numpy.ascontiguousarray(data, dtype=numpy.int32).ravel()
# Make sure data array exists and is large enough
if ("data_input" not in self.cl_mem or
self.cl_mem["data_input"].size < data.size):
logger.info("increase data input buffer size to %s", data.size)
self.cl_mem.update({
"data_input": pyopencl.array.empty(self.queue,
data.size,
dtype=numpy.int32)})
d_data = self.cl_mem["data_input"]
evt = pyopencl.enqueue_copy(
self.queue, d_data.data, data, is_blocking=False)
events.append(EventDescription("copy data H -> D", evt))
# Make sure compressed array exists and is large enough
compressed_size = d_data.size * 7
if ("compressed" not in self.cl_mem or
self.cl_mem["compressed"].size < compressed_size):
logger.info("increase compressed buffer size to %s", compressed_size)
self.cl_mem.update({
"compressed": pyopencl.array.empty(self.queue,
compressed_size,
dtype=numpy.int8)})
d_compressed = self.cl_mem["compressed"]
d_size = self.cl_mem["counter"] # Shared with decompression
evt = self.kernels.compression_scan(d_data, d_compressed, d_size)
events.append(EventDescription("compression scan", evt))
byte_count = int(d_size.get()[0])
if out is None:
# Create out array from a sub-region of the compressed buffer
out = pyopencl.array.Array(
self.queue,
shape=(byte_count,),
dtype=numpy.int8,
allocator=functools.partial(
d_compressed.base_data.get_sub_region,
d_compressed.offset))
elif out.size < byte_count:
raise ValueError(
"Provided output buffer is not large enough: "
"requires %d bytes, got %d" % (byte_count, out.size))
else: # out.size >= byte_count
# Create an array with a sub-region of out and this class queue
out = pyopencl.array.Array(
self.queue,
shape=(byte_count,),
dtype=numpy.int8,
allocator=functools.partial(out.base_data.get_sub_region,
out.offset))
evt = pyopencl.enqueue_copy(self.queue, out.data, d_compressed.data,
byte_count=byte_count)
events.append(
EventDescription("copy D -> D: internal -> out", evt))
if self.profile:
self.events += events
return out
[docs] def encode_to_bytes(self, data):
"""Compresses data to CBF and returns compressed data as bytes.
Usage:
Provided an image (`image`) stored as a numpy array of int32,
first, create a byte offset compression/decompression object:
>>> from silx.opencl.codec.byte_offset import ByteOffset
>>> byte_offset_codec = ByteOffset()
Then, compress an image into bytes:
>>> compressed = byte_offset_codec.encode_to_bytes(image)
:param data: The data to compress as a numpy array
(or a pyopencl Array) of int32.
:type data: Union[numpy.ndarray, pyopencl.array.Array]
:return: The compressed data as bytes.
:rtype: bytes
"""
compressed_array = self.encode(data)
return compressed_array.get().tobytes()