{ "cells": [ { "cell_type": "markdown", "id": "c38240c3-7646-497b-a3ac-d9cfa6c974f2", "metadata": {}, "source": [ "# Parallel processing of a stack of data stored in HDF5 with multi-threading\n", "\n", "This tutorial explains how to process in parallel a large HDF5 dataset that does not fit into the computer's memory.\n", "\n", "![Typical workflow](workflow.png)\n", "\n", "For this tutorial, a recent version of pyFAI is needed (>=0.22, summer 2022).\n", "\n", "This tutorial explains how to benefit from multi-threading. This framework is not very popular in the Python world due to the [Global Interpreter Lock (GIL)](https://wiki.python.org/moin/GlobalInterpreterLock), but properly written C code that releases the GIL can be very fast, sometimes as fast as GPU code (on large computers).\n", "\n", "**Credits:**\n", "\n", "* Thomas Vincent (ESRF) for the parallel decompression of HDF5 chunks and the Jupyter-slurm\n", "* Pierre Paleo (ESRF) for struggling with this kind of stuff with GPUs\n", "* Jon Wright (ESRF) for the CSC integrator which, while implemented serially, is multithreading-friendly, and for the HDF5 investigation\n", "* The French-CRG for providing a manycore computer (2 x 32-core AMD EPYC 75F3)\n", "\n", "**Note:** No GPU is needed for this tutorial!\n", "\n", "**Important:** the `bitshuffle` module needs to be compiled without OpenMP, since the tutorial aims at demonstrating that Python threads can be almost as efficient as OpenMP. If in doubt about OpenMP, keep the `OMP_NUM_THREADS=1` environment variable set in the second code cell. This will unfortunately bias the performance measurement of pyFAI with the CSR sparse-matrix multiplication.\n", "\n", "## 1. Description of the computer.\n", "\n", "The results vary a lot as a function of the computer and its topology. This section describes the hardware of the computer used." 
] }, { "cell_type": "code", "execution_count": 1, "id": "5eb5f112-bf32-4413-85ea-67eb88bf7ee9", "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "# use `widget` for better user experience; `inline` is for documentation generation" ] }, { "cell_type": "code", "execution_count": 2, "id": "638e4966-b05e-47e2-a4b0-5843b1b5ff93", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Working on computer hpc6-04.\n" ] } ], "source": [ "import sys, os, collections, struct, time, socket\n", "# Ensure OpenMP is disabled\n", "os.environ[\"OMP_NUM_THREADS\"] = \"1\"\n", "import numpy, pyFAI\n", "import h5py, hdf5plugin\n", "from queue import Queue\n", "import threading\n", "import bitshuffle\n", "from matplotlib.pyplot import subplots\n", "start_time = time.time()\n", "Item = collections.namedtuple(\"Item\", \"index data\")\n", "print(f\"Working on computer {socket.gethostname()}.\")" ] }, { "cell_type": "code", "execution_count": 3, "id": "210e4de4-184b-4c64-872e-5c1aaf080501", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Working with 64 threads. Mind OpenMP needs to be disabled in the bitshuffle code !\n" ] } ], "source": [ "nbthreads = len(os.sched_getaffinity(0))\n", "print(f\"Working with {nbthreads} threads. 
Mind OpenMP needs to be disabled in the bitshuffle code !\")" ] }, { "cell_type": "code", "execution_count": 4, "id": "8e649b79-f024-4635-ac8f-fc68827c60aa", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Architecture: x86_64\n", "CPU op-mode(s): 32-bit, 64-bit\n", "Byte Order: Little Endian\n", "Address sizes: 48 bits physical, 48 bits virtual\n", "CPU(s): 64\n", "On-line CPU(s) list: 0-63\n", "Thread(s) per core: 1\n", "Core(s) per socket: 32\n", "Socket(s): 2\n", "NUMA node(s): 2\n", "Vendor ID: AuthenticAMD\n", "CPU family: 25\n", "Model: 1\n", "Model name: AMD EPYC 75F3 32-Core Processor\n", "Stepping: 1\n", "Frequency boost: enabled\n", "CPU MHz: 1493.870\n", "CPU max MHz: 2950.0000\n", "CPU min MHz: 1500.0000\n", "BogoMIPS: 5888.66\n", "Virtualization: AMD-V\n", "L1d cache: 2 MiB\n", "L1i cache: 2 MiB\n", "L2 cache: 32 MiB\n", "L3 cache: 512 MiB\n", "NUMA node0 CPU(s): 0-31\n", "NUMA node1 CPU(s): 32-63\n", "Vulnerability Itlb multihit: Not affected\n", "Vulnerability L1tf: Not affected\n", "Vulnerability Mds: Not affected\n", "Vulnerability Meltdown: Not affected\n", "Vulnerability Mmio stale data: Not affected\n", "Vulnerability Retbleed: Not affected\n", "Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled v\n", " ia prctl and seccomp\n", "Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user\n", " pointer sanitization\n", "Vulnerability Spectre v2: Mitigation; Retpolines, IBPB conditional, IBRS_\n", " FW, STIBP disabled, RSB filling, PBRSB-eIBRS No\n", " t affected\n", "Vulnerability Srbds: Not affected\n", "Vulnerability Tsx async abort: Not affected\n", "Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtr\n", " r pge mca cmov pat pse36 clflush mmx fxsr sse s\n", " se2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtsc\n", " p lm constant_tsc rep_good nopl nonstop_tsc cpu\n", " id extd_apicid aperfmperf pni pclmulqdq monitor\n", " ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic 
movbe\n", " popcnt aes xsave avx f16c rdrand lahf_lm cmp_l\n", " egacy svm extapic cr8_legacy abm sse4a misalign\n", " sse 3dnowprefetch osvw ibs skinit wdt tce topoe\n", " xt perfctr_core perfctr_nb bpext perfctr_llc mw\n", " aitx cpb cat_l3 cdp_l3 invpcid_single hw_pstate\n", " ssbd mba ibrs ibpb stibp vmmcall fsgsbase bmi1\n", " avx2 smep bmi2 invpcid cqm rdt_a rdseed adx sm\n", " ap clflushopt clwb sha_ni xsaveopt xsavec xgetb\n", " v1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total c\n", " qm_mbm_local clzero irperf xsaveerptr wbnoinvd \n", " arat npt lbrv svm_lock nrip_save tsc_scale vmcb\n", " _clean flushbyasid decodeassists pausefilter pf\n", " threshold v_vmsave_vmload vgif umip pku ospke v\n", " aes vpclmulqdq rdpid overflow_recov succor smca\n" ] } ], "source": [ "!lscpu" ] }, { "cell_type": "code", "execution_count": 5, "id": "97da69b1-f59d-4bf7-99ff-19b426cdccce", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "available: 2 nodes (0-1)\n", "node 0 cpus: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31\n", "node 0 size: 257524 MB\n", "node 0 free: 244301 MB\n", "node 1 cpus: 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63\n", "node 1 size: 257995 MB\n", "node 1 free: 211831 MB\n", "node distances:\n", "node 0 1 \n", " 0: 10 32 \n", " 1: 32 10 \n" ] } ], "source": [ "!numactl --hardware" ] }, { "cell_type": "code", "execution_count": 6, "id": "345903ed-e336-4b37-b520-34790b95252d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Machine (503GB total)\n", " Package L#0\n", " NUMANode L#0 (P#0 251GB)\n", " L3 L#0 (32MB)\n", " L2 L#0 (512KB) + L1d L#0 (32KB) + L1i L#0 (32KB) + Core L#0 + PU L#0 (P#0)\n", " L2 L#1 (512KB) + L1d L#1 (32KB) + L1i L#1 (32KB) + Core L#1 + PU L#1 (P#1)\n", " L2 L#2 (512KB) + L1d L#2 (32KB) + L1i L#2 (32KB) + Core L#2 + PU L#2 (P#2)\n", " L2 L#3 (512KB) + L1d L#3 (32KB) + L1i 
L#3 (32KB) + Core L#3 + PU L#3 (P#3)\n", " L3 L#1 (32MB)\n", " L2 L#4 (512KB) + L1d L#4 (32KB) + L1i L#4 (32KB) + Core L#4 + PU L#4 (P#4)\n", " L2 L#5 (512KB) + L1d L#5 (32KB) + L1i L#5 (32KB) + Core L#5 + PU L#5 (P#5)\n", " L2 L#6 (512KB) + L1d L#6 (32KB) + L1i L#6 (32KB) + Core L#6 + PU L#6 (P#6)\n", " L2 L#7 (512KB) + L1d L#7 (32KB) + L1i L#7 (32KB) + Core L#7 + PU L#7 (P#7)\n", " L3 L#2 (32MB)\n", " L2 L#8 (512KB) + L1d L#8 (32KB) + L1i L#8 (32KB) + Core L#8 + PU L#8 (P#8)\n", " L2 L#9 (512KB) + L1d L#9 (32KB) + L1i L#9 (32KB) + Core L#9 + PU L#9 (P#9)\n", " L2 L#10 (512KB) + L1d L#10 (32KB) + L1i L#10 (32KB) + Core L#10 + PU L#10 (P#10)\n", " L2 L#11 (512KB) + L1d L#11 (32KB) + L1i L#11 (32KB) + Core L#11 + PU L#11 (P#11)\n", " L3 L#3 (32MB)\n", " L2 L#12 (512KB) + L1d L#12 (32KB) + L1i L#12 (32KB) + Core L#12 + PU L#12 (P#12)\n", " L2 L#13 (512KB) + L1d L#13 (32KB) + L1i L#13 (32KB) + Core L#13 + PU L#13 (P#13)\n", " L2 L#14 (512KB) + L1d L#14 (32KB) + L1i L#14 (32KB) + Core L#14 + PU L#14 (P#14)\n", " L2 L#15 (512KB) + L1d L#15 (32KB) + L1i L#15 (32KB) + Core L#15 + PU L#15 (P#15)\n", " L3 L#4 (32MB)\n", " L2 L#16 (512KB) + L1d L#16 (32KB) + L1i L#16 (32KB) + Core L#16 + PU L#16 (P#16)\n", " L2 L#17 (512KB) + L1d L#17 (32KB) + L1i L#17 (32KB) + Core L#17 + PU L#17 (P#17)\n", " L2 L#18 (512KB) + L1d L#18 (32KB) + L1i L#18 (32KB) + Core L#18 + PU L#18 (P#18)\n", " L2 L#19 (512KB) + L1d L#19 (32KB) + L1i L#19 (32KB) + Core L#19 + PU L#19 (P#19)\n", " L3 L#5 (32MB)\n", " L2 L#20 (512KB) + L1d L#20 (32KB) + L1i L#20 (32KB) + Core L#20 + PU L#20 (P#20)\n", " L2 L#21 (512KB) + L1d L#21 (32KB) + L1i L#21 (32KB) + Core L#21 + PU L#21 (P#21)\n", " L2 L#22 (512KB) + L1d L#22 (32KB) + L1i L#22 (32KB) + Core L#22 + PU L#22 (P#22)\n", " L2 L#23 (512KB) + L1d L#23 (32KB) + L1i L#23 (32KB) + Core L#23 + PU L#23 (P#23)\n", " L3 L#6 (32MB)\n", " L2 L#24 (512KB) + L1d L#24 (32KB) + L1i L#24 (32KB) + Core L#24 + PU L#24 (P#24)\n", " L2 L#25 (512KB) + L1d L#25 (32KB) + L1i L#25 
(32KB) + Core L#25 + PU L#25 (P#25)\n", " L2 L#26 (512KB) + L1d L#26 (32KB) + L1i L#26 (32KB) + Core L#26 + PU L#26 (P#26)\n", " L2 L#27 (512KB) + L1d L#27 (32KB) + L1i L#27 (32KB) + Core L#27 + PU L#27 (P#27)\n", " L3 L#7 (32MB)\n", " L2 L#28 (512KB) + L1d L#28 (32KB) + L1i L#28 (32KB) + Core L#28 + PU L#28 (P#28)\n", " L2 L#29 (512KB) + L1d L#29 (32KB) + L1i L#29 (32KB) + Core L#29 + PU L#29 (P#29)\n", " L2 L#30 (512KB) + L1d L#30 (32KB) + L1i L#30 (32KB) + Core L#30 + PU L#30 (P#30)\n", " L2 L#31 (512KB) + L1d L#31 (32KB) + L1i L#31 (32KB) + Core L#31 + PU L#31 (P#31)\n", " HostBridge\n", " PCIBridge\n", " PCI 01:00.0 (RAID)\n", " Block(Disk) \"sdb\"\n", " Block(Disk) \"sda\"\n", " HostBridge\n", " PCIBridge\n", " PCI 63:00.0 (Ethernet)\n", " Net \"eno12399np0\"\n", " PCI 63:00.1 (Ethernet)\n", " Net \"eno12409np1\"\n", " PCIBridge\n", " PCIBridge\n", " PCI 62:00.0 (VGA)\n", " Package L#1\n", " NUMANode L#1 (P#1 252GB)\n", " L3 L#8 (32MB)\n", " L2 L#32 (512KB) + L1d L#32 (32KB) + L1i L#32 (32KB) + Core L#32 + PU L#32 (P#32)\n", " L2 L#33 (512KB) + L1d L#33 (32KB) + L1i L#33 (32KB) + Core L#33 + PU L#33 (P#33)\n", " L2 L#34 (512KB) + L1d L#34 (32KB) + L1i L#34 (32KB) + Core L#34 + PU L#34 (P#34)\n", " L2 L#35 (512KB) + L1d L#35 (32KB) + L1i L#35 (32KB) + Core L#35 + PU L#35 (P#35)\n", " L3 L#9 (32MB)\n", " L2 L#36 (512KB) + L1d L#36 (32KB) + L1i L#36 (32KB) + Core L#36 + PU L#36 (P#36)\n", " L2 L#37 (512KB) + L1d L#37 (32KB) + L1i L#37 (32KB) + Core L#37 + PU L#37 (P#37)\n", " L2 L#38 (512KB) + L1d L#38 (32KB) + L1i L#38 (32KB) + Core L#38 + PU L#38 (P#38)\n", " L2 L#39 (512KB) + L1d L#39 (32KB) + L1i L#39 (32KB) + Core L#39 + PU L#39 (P#39)\n", " L3 L#10 (32MB)\n", " L2 L#40 (512KB) + L1d L#40 (32KB) + L1i L#40 (32KB) + Core L#40 + PU L#40 (P#40)\n", " L2 L#41 (512KB) + L1d L#41 (32KB) + L1i L#41 (32KB) + Core L#41 + PU L#41 (P#41)\n", " L2 L#42 (512KB) + L1d L#42 (32KB) + L1i L#42 (32KB) + Core L#42 + PU L#42 (P#42)\n", " L2 L#43 (512KB) + L1d L#43 (32KB) + L1i 
L#43 (32KB) + Core L#43 + PU L#43 (P#43)\n", " L3 L#11 (32MB)\n", " L2 L#44 (512KB) + L1d L#44 (32KB) + L1i L#44 (32KB) + Core L#44 + PU L#44 (P#44)\n", " L2 L#45 (512KB) + L1d L#45 (32KB) + L1i L#45 (32KB) + Core L#45 + PU L#45 (P#45)\n", " L2 L#46 (512KB) + L1d L#46 (32KB) + L1i L#46 (32KB) + Core L#46 + PU L#46 (P#46)\n", " L2 L#47 (512KB) + L1d L#47 (32KB) + L1i L#47 (32KB) + Core L#47 + PU L#47 (P#47)\n", " L3 L#12 (32MB)\n", " L2 L#48 (512KB) + L1d L#48 (32KB) + L1i L#48 (32KB) + Core L#48 + PU L#48 (P#48)\n", " L2 L#49 (512KB) + L1d L#49 (32KB) + L1i L#49 (32KB) + Core L#49 + PU L#49 (P#49)\n", " L2 L#50 (512KB) + L1d L#50 (32KB) + L1i L#50 (32KB) + Core L#50 + PU L#50 (P#50)\n", " L2 L#51 (512KB) + L1d L#51 (32KB) + L1i L#51 (32KB) + Core L#51 + PU L#51 (P#51)\n", " L3 L#13 (32MB)\n", " L2 L#52 (512KB) + L1d L#52 (32KB) + L1i L#52 (32KB) + Core L#52 + PU L#52 (P#52)\n", " L2 L#53 (512KB) + L1d L#53 (32KB) + L1i L#53 (32KB) + Core L#53 + PU L#53 (P#53)\n", " L2 L#54 (512KB) + L1d L#54 (32KB) + L1i L#54 (32KB) + Core L#54 + PU L#54 (P#54)\n", " L2 L#55 (512KB) + L1d L#55 (32KB) + L1i L#55 (32KB) + Core L#55 + PU L#55 (P#55)\n", " L3 L#14 (32MB)\n", " L2 L#56 (512KB) + L1d L#56 (32KB) + L1i L#56 (32KB) + Core L#56 + PU L#56 (P#56)\n", " L2 L#57 (512KB) + L1d L#57 (32KB) + L1i L#57 (32KB) + Core L#57 + PU L#57 (P#57)\n", " L2 L#58 (512KB) + L1d L#58 (32KB) + L1i L#58 (32KB) + Core L#58 + PU L#58 (P#58)\n", " L2 L#59 (512KB) + L1d L#59 (32KB) + L1i L#59 (32KB) + Core L#59 + PU L#59 (P#59)\n", " L3 L#15 (32MB)\n", " L2 L#60 (512KB) + L1d L#60 (32KB) + L1i L#60 (32KB) + Core L#60 + PU L#60 (P#60)\n", " L2 L#61 (512KB) + L1d L#61 (32KB) + L1i L#61 (32KB) + Core L#61 + PU L#61 (P#61)\n", " L2 L#62 (512KB) + L1d L#62 (32KB) + L1i L#62 (32KB) + Core L#62 + PU L#62 (P#62)\n", " L2 L#63 (512KB) + L1d L#63 (32KB) + L1i L#63 (32KB) + Core L#63 + PU L#63 (P#63)\n", " HostBridge\n", " PCIBridge\n", " PCI c3:00.0 (SATA)\n", " HostBridge\n", " PCIBridge\n", " PCI e1:00.0 
(Ethernet)\n", " Net \"eno8303\"\n", " PCI e1:00.1 (Ethernet)\n", " Net \"eno8403\"\n" ] } ], "source": [ "!lstopo --of console" ] }, { "cell_type": "markdown", "id": "d2c54705-4b6c-444b-b9f1-b763f6a8d915", "metadata": {}, "source": [ "## 2. Set up the environment:\n", "\n", "This is a purely virtual experiment: the tutorial tries to be representative of the processing at Jon Wright's beamline, ESRF-ID11, which is why we use an Eiger 4M detector with data integrated over 1000 bins. These parameters can be tuned.\n", "\n", "Random data are generated to mimic the scattering of a liquid, with Poisson noise. The input file is fairly small, since such data compress nicely. The speed of the drive used for temporary storage is likely to have a huge impact, especially if the data do not fit in memory!" ] }, { "cell_type": "code", "execution_count": 7, "id": "1cd22d82-d4fb-4d28-9960-0442989ca18c", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "HDF5PluginBuildConfig(openmp=True, native=True, bmi2=True, sse2=True, avx2=True, avx512=False, cpp11=True, cpp14=True, ipp=False, filter_file_extension='.so', embedded_filters=('blosc', 'blosc2', 'bshuf', 'bzip2', 'fcidecomp', 'lz4', 'sz', 'sz3', 'zfp', 'zstd'))" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "det = pyFAI.detector_factory(\"eiger_4M\")\n", "shape = det.shape\n", "dtype = numpy.dtype(\"uint32\")\n", "filename = \"/tmp/big.h5\"\n", "h5path = \"data\"\n", "nbins = 1000\n", "cmp = hdf5plugin.Bitshuffle()\n", "hdf5plugin.get_config().build_config" ] }, { "cell_type": "code", "execution_count": 8, "id": "daf4eec8-e364-43b6-b5af-8f279c6aed38", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of frames the computer can host in memory: 30127.005\n", "Number of frames the computer can host in memory with SLURM restrictions: 3829.928\n" ] } ], "source": [ "mem_bytes = os.sysconf('SC_PAGE_SIZE') * 
os.sysconf('SC_PHYS_PAGES')\n", "print(f\"Number of frames the computer can host in memory: {mem_bytes/(numpy.prod(shape)*dtype.itemsize):.3f}\")\n", "if os.environ.get('SLURM_MEM_PER_NODE'):\n", " print(f\"Number of frames the computer can host in memory with SLURM restrictions: {int(os.environ['SLURM_MEM_PER_NODE'])*(1<<20)/(numpy.prod(shape)*dtype.itemsize):.3f}\")" ] }, { "cell_type": "code", "execution_count": 9, "id": "bd038dc8-1e6d-4bd6-95b3-303f2d0f250b", "metadata": {}, "outputs": [], "source": [ "# With the SLURM allocation limited to 64 GB of RAM, only about 3800 frames can be held in memory.\n", "nbframes = 4096 # slightly more than what fits in memory: such a dataset cannot be held in RAM." ] }, { "cell_type": "code", "execution_count": 10, "id": "a32bd8fb-9b64-4564-9a69-67a759bf3427", "metadata": {}, "outputs": [], "source": [ "# Prepare a frame with low counts so that it compresses well\n", "geo = {\"detector\": det, \n", " \"wavelength\": 1e-10}\n", "ai = pyFAI.load(geo)\n", "q = numpy.arange(15)\n", "img = ai.calcfrom1d(q, 100/(1+q*q))\n", "frame = numpy.random.poisson(img).astype(dtype)" ] }, { "cell_type": "code", "execution_count": 11, "id": "53b49350-6b0c-4c3b-ba5b-ece50cd6fe98", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# display the image\n", "fig,ax = subplots()\n", "ax.imshow(frame)" ] }, { "cell_type": "code", "execution_count": 12, "id": "17e3d7e2-de22-4d40-8230-39295f04e239", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Performances of the different algorithms for azimuthal integration of Eiger 4M image\n", "Using algorithm histogram : 420 ms ± 6.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "Using algorithm csc : 38.1 ms ± 1.21 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "Using algorithm csr : 46.8 ms ± 676 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "print(\"Performances of the different algorithms for azimuthal integration of Eiger 4M image\")\n", "for algo in (\"histogram\", \"csc\", \"csr\"):\n", " print(f\"Using algorithm {algo:10s}:\", end=\" \")\n", " %timeit ai.integrate1d(img, nbins, method=(\"full\", algo, \"cython\"))" ] }, { "cell_type": "markdown", "id": "22969c56-1d10-4950-aebe-699da628858e", "metadata": {}, "source": [ "**Note:** The full pixel splitting is time-consuming and handicaps the histogram algorithm, while both sparse-matrix methods are much faster since they cache this calculation in the sparse matrix.\n", "\n", "The relative performance of the two sparse-matrix methods is rather surprising: the CSC algorithm, single-threaded, is faster than the CSR one, which runs in parallel over 2x32 cores.\n", "This result is the combination of two factors:\n", "\n", "1. The computer is built with two processors (sockets), each controlling its own memory. Such a computer is called **Non-Uniform Memory Access** (NUMA), which can be checked with `numactl --hardware`. 
The CSR matrix multiplication dispatches work to both processors and thus needs to transfer part of the image from one NUMA node (socket) to the other, which is slow (3.2x slower than a local access, according to the node distances reported by `numactl`).\n", "\n", "2. The very large cache of this processor: 512 MB are reported by `lscpu`, but a more precise tool, `lstopo`, describes it as 16 blocks of 32 MB of L3 cache, each shared between 4 cores. This very large cache allows the complete frame and the sparse matrix to be pre-fetched, which is a great advantage for the CSC algorithm.\n", "\n", "Running the very same benchmark on an Intel 2-socket server would remove point 2, while running on a single-socket Intel workstation would remove both points, and one would then expect CSR to be faster than CSC. On this computer, the best performance of the CSR algorithm should be obtained when using 4 cores (sharing the same L3 cache) out of 64. This can be done by setting the environment variable **OMP_NUM_THREADS**. Unfortunately, this requires restarting the process, so it cannot easily be demonstrated in this notebook. 
\n", "\n", "**The first take-home message is that high-performance computing is not possible without knowledge of the actual computer.**" ] }, { "cell_type": "code", "execution_count": 13, "id": "a98654ed-372a-4886-9292-32b31ae4ec2a", "metadata": {}, "outputs": [], "source": [ "# Does not work unless the process is restarted\n", "\n", "# print(\"Performances of the different algorithms for azimuthal integration of Eiger 4M image when using only 4 cores\")\n", "# mask = os.sched_getaffinity(0)\n", "# os.sched_setaffinity(0, [0,1,2,3])\n", "# for algo in (\"histogram\", \"csc\", \"csr\"):\n", "#     print(f\"Using algorithm {algo}:\", end=\" \")\n", "#     %timeit ai.integrate1d(img, nbins, method=(\"full\", algo, \"cython\"))\n", "# os.sched_setaffinity(0, mask)" ] }, { "cell_type": "markdown", "id": "63768a00-0b9e-4c55-a55e-efe6a8a17358", "metadata": {}, "source": [ "## 3. Writing the test dataset on disk." ] }, { "cell_type": "code", "execution_count": 14, "id": "32d5ebc6-3473-49a2-93e7-76e7bb8f17f1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "# Saving an HDF5 file with many frames...\n", "if not os.path.exists(filename):\n", "    with h5py.File(filename, \"w\") as h:\n", "        ds = h.create_dataset(h5path, shape=(nbframes,)+shape, chunks=(1,)+shape, dtype=dtype, **cmp)\n", "        for i in range(nbframes):\n", "            ds[i] = frame + i%500 # Each frame has a different value to prevent caching effects" ] }, { "cell_type": "code", "execution_count": 15, "id": "f82b2daf-77c2-4720-9848-6ca9c273707f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "File size 9.214 GB with a compression ratio of 7.429x\n", "Write speed: 1748.975 MB/s of uncompressed data, or 97.475 fps.\n" ] } ], "source": [ "timing_write = _\n", "size=os.stat(filename).st_size\n", "print(f\"File size {size/(1024**3):.3f} GB with a compression ratio 
of {nbframes*numpy.prod(shape)*dtype.itemsize/size:.3f}x\")\n", "print(f\"Write speed: {nbframes*numpy.prod(shape)*dtype.itemsize/(1e6*timing_write.best):.3f} MB/s of uncompressed data, or {nbframes/timing_write.best:.3f} fps.\")" ] }, { "cell_type": "markdown", "id": "dc086333-96da-47eb-8477-9a0a5f60bc20", "metadata": {}, "source": [ "No optimisation is done for writing: this tutorial is focused on reading & processing speed.\n", "We nevertheless keep those figures for reference.\n", "\n", "## 4. Reading the dataset using the h5py/HDF5 library:\n", "### 4.1 Using the `h5py` API in a natural way\n", "We start with the simplest way to read back all those data:" ] }, { "cell_type": "code", "execution_count": 16, "id": "889b740b-2ba4-4677-a8d6-346608d90158", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "# Reading all frames and decompressing them, the natural way\n", "with h5py.File(filename, \"r\") as h:\n", "    ds = h[h5path]\n", "    for i in range(nbframes):\n", "        frame = ds[i][...]" ] }, { "cell_type": "code", "execution_count": 17, "id": "c748fcbe-128d-46ab-9aa7-149c9a5b31fd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read speed: 2935.324 MB/s of uncompressed data, or 163.594 fps.\n" ] } ], "source": [ "timing_read0 = _\n", "print(f\"Read speed: {nbframes*numpy.prod(shape)*dtype.itemsize/(1e6*timing_read0.best):.3f} MB/s of uncompressed data,\\\n", " or {nbframes/timing_read0.best:.3f} fps.\")" ] }, { "cell_type": "markdown", "id": "4f95dca1-0301-4a04-b397-a9358816ee50", "metadata": {}, "source": [ "Reading all data back from the HDF5 file (about 164 fps) is only moderately faster than writing it (about 97 fps). 
\n", "This is mostly due to the decompression and to the many memory allocations performed.\n", "\n", "### 4.2 Pre-allocate the output buffer (for `h5py`)\n", "\n", "Now, we can try to pre-allocate the output buffer and check if it helps:" ] }, { "cell_type": "code", "execution_count": 18, "id": "a643e63d-1709-45b1-aaad-a542792b5a62", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "# Reading all frames and decompressing them into a pre-allocated buffer\n", "buffer = numpy.zeros(shape, dtype=dtype)\n", "with h5py.File(filename, \"r\") as h:\n", "    ds = h[h5path]\n", "    for i in range(nbframes):\n", "        ds.read_direct(buffer, numpy.s_[i,:,:], numpy.s_[:,:])" ] }, { "cell_type": "code", "execution_count": 19, "id": "847240c6-f9df-4626-b1a3-2197f535897c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read speed: 3247.430 MB/s of uncompressed data, or 180.988 fps.\n" ] } ], "source": [ "timing_read1 = _\n", "print(f\"Read speed: {nbframes*numpy.prod(shape)*dtype.itemsize/(1e6*timing_read1.best):.3f} MB/s of uncompressed data,\\\n", " or {nbframes/timing_read1.best:.3f} fps.\")" ] }, { "cell_type": "code", "execution_count": 20, "id": "8613536f-6bd7-42ff-b401-f8ed80cdd8da", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " Speed-up: 10.6 %\n" ] } ], "source": [ "print(f\" Speed-up: {(timing_read0.best/timing_read1.best-1)*100:.1f} %\")" ] }, { "cell_type": "markdown", "id": "4f9267a3-d056-4f91-a183-3dcfa0053885", "metadata": {}, "source": [ "The gain exists, but it is not huge (about 10%).\n", "\n", "## 5. Decouple HDF5 chunk reading from decompression.\n", "\n", "We will benchmark separately the file reading (i.e. 
reading chunks one by one) and the decompression, to determine the maximum achievable read speed.\n", "\n", "### 5.1 Benchmarking of the chunk reading using `read_direct_chunk` from `h5py`" ] }, { "cell_type": "code", "execution_count": 21, "id": "3492bfe6-c71e-40a2-98c4-7fd2cd7d7560", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "# Reading all frames without decompressing them\n", "with h5py.File(filename, \"r\") as h:\n", "    ds = h[h5path]\n", "    for i in range(ds.id.get_num_chunks()):\n", "        filter_mask, chunk = ds.id.read_direct_chunk(ds.id.get_chunk_info(i).chunk_offset)" ] }, { "cell_type": "code", "execution_count": 22, "id": "95cccf4c-f63f-4ecb-9155-41e2a1821380", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read speed: 10900.036 MB/s of compressed data.\n", "HDF5 direct chunk read speed: 4512.810 fps (without decompression).\n" ] } ], "source": [ "timing_read2 = _\n", "print(f\"Read speed: {size/(1e6*timing_read2.best):.3f} MB/s of compressed data.\")\n", "print(f\"HDF5 direct chunk read speed: {nbframes/timing_read2.best:.3f} fps (without decompression).\")" ] }, { "cell_type": "markdown", "id": "8e1a4654-45d3-4ade-bda5-1d33ebd125a0", "metadata": {}, "source": [ "Reading the raw (compressed) chunks is really fast: it is apparently limited mostly by the disk speed, or by the memory bandwidth if the compressed dataset stays in memory. 
" ] }, { "cell_type": "markdown", "id": "8969804e-77af-464e-8f9e-11e0e35541bd", "metadata": {}, "source": [ "### 5.2 Benchmarking of the decompression (single-threaded)\n", "\n", "The function `decompress_bslz4_chunk` can be used to decompress one chunk.\n", "We benchmark it on a single chunk:" ] }, { "cell_type": "code", "execution_count": 23, "id": "341d4b27-bbc3-4010-a577-820d20f89d72", "metadata": {}, "outputs": [], "source": [ "def decompress_bslz4_chunk(payload, dtype, chunk_shape):\n", " \"\"\"This function decompresses ONE chunk with bitshuffle-LZ4. \n", " The library needs to be compiled without OpenMP when using threads!\n", " \n", " :param payload: bytes with the compressed data as read by h5py.\n", " :param dtype: data type of the stored content\n", " :param chunk_shape: shape of one chunk\n", " :return: decompressed chunk\"\"\"\n", " # 12-byte header: uncompressed size (uint64) and block size in bytes (uint32), both big-endian\n", " total_nbytes, block_nbytes = struct.unpack(\">QI\", payload[:12])\n", " block_size = block_nbytes // dtype.itemsize\n", "\n", " arr = numpy.frombuffer(payload, dtype=numpy.uint8, offset=12) # No copy here\n", " chunk_data = bitshuffle.decompress_lz4(arr, chunk_shape, dtype, block_size)\n", " return chunk_data" ] }, { "cell_type": "code", "execution_count": 24, "id": "e836bd52-7b5f-4535-81d4-2bfc1a5dfaf9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read chunk #123 which is 2605242 bytes long.\n", "The decompressed frame is 17942760 bytes long\n", "This frame is compressed with a ratio of 6.9 x.\n", "Benchmarking the decompression: 3.98 ms ± 5.97 µs per loop (mean ± std. dev. 
of 7 runs, 100 loops each)\n", "Decompression speed (single threaded): 251.329 fps\n", "Maximum read+decompression speed (single threaded): 238.070 fps\n", "Maximum read+decompression speed (64-threads): 3524.092 fps\n" ] } ], "source": [ "frame_id = 123\n", "with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " filter_mask, chunk = ds.id.read_direct_chunk(ds.id.get_chunk_info(frame_id).chunk_offset)\n", " \n", "print(f\"Read chunk #{frame_id} which is {len(chunk)} bytes long.\")\n", "frame = decompress_bslz4_chunk(chunk, dtype, shape)\n", "print(f\"The decompressed frame is {frame.nbytes} bytes long\")\n", "print(f\"This frame is compressed with a ratio of {frame.nbytes/len(chunk):.1f} x.\")\n", "print(\"Benchmarking the decompression: \", end=\"\")\n", "timing_decompression = %timeit -o decompress_bslz4_chunk(chunk, dtype, shape)\n", "print(f\"Decompression speed (single threaded): {1/timing_decompression.best:.3f} fps\")\n", "print(f\"Maximum read+decompression speed (single threaded): {1/(timing_decompression.best+timing_read2.best/nbframes):.3f} fps\")\n", "print(f\"Maximum read+decompression speed ({nbthreads}-threads): {1/(timing_decompression.best/nbthreads+timing_read2.best/nbframes):.3f} fps\")" ] }, { "cell_type": "markdown", "id": "ec96510d-1648-4e59-8d34-cdf37997e711", "metadata": {}, "source": [ "At this stage, it is interesting to compare the maximum achievable speed in parallel (about 3.5 kfps with 64 threads) with the raw read speed (about 4.5 kfps).\n", "\n", "The gap is explained by [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law), which states that the performance of a parallel program becomes limited by its serial part as the number of threads increases. In other words, to get performance, find all the serial sections and squeeze them.\n", "\n", "Some parts of the code are made serial to prevent data corruption. A typical example is the seek+read sequence on a file. 
If several threads do this at the same time, the file pointer is likely to be moved by another thread between the seek and the read, which then returns the wrong data. Serializing this section, for example using locks, mutexes, semaphores, etc., is a simple way to prevent such issues. Let's list some of the locks involved in this example:\n", "\n", "- `h5py` has a lock called `phil` which serializes the access to the HDF5 library\n", "- `HDF5` has a global lock preventing files from being modified from different processes\n", "- `Python` has a global interpreter lock (`GIL`) which ensures that only one thread executes Python bytecode at a time.\n", "\n", "The latter is widely discussed, and an urban legend says it prevents multithreading in Python.\n", "You will see at the end of the tutorial how true this is (or not).\n" ] }, { "cell_type": "markdown", "id": "bc5939d1-5c25-4f35-9b46-e9684110dfc9", "metadata": {}, "source": [ "### 5.3 Benchmark the analysis of the HDF5 file\n", "\n", "Coming back to parallel reading: the different locks in `h5py` and `HDF5` prevent parallel access to the data.\n", "Can we dive deeper into the `HDF5` file and retrieve the position and the size of the different chunks? 
\n", "If so, it would be possible to read chunks without the `h5py`/`HDF5` library, working around their different locks.\n", "\n", "Let's check the parsing of the HDF5 structure of the dataset:" ] }, { "cell_type": "code", "execution_count": 25, "id": "b301837a-9f6c-43d7-b1cc-f3dc64998bad", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Each chunk descriptor is an object like: \n", "StoreInfo(chunk_offset=(0, 0, 0), filter_mask=0, byte_offset=4536, size=1972400)\n", "It represents a very small amount of data: 72 bytes.\n", "All 4096 frames, weighting 9893.291 MB, can be represented by 327.960 kB\n" ] } ], "source": [ "with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " res = [ds.id.get_chunk_info(i) for i in range(ds.id.get_num_chunks())]\n", "print(f\"Each chunk descriptor is an object like: \\n{res[0]}\")\n", "print(f\"It represents a very small amount of data: {sys.getsizeof(res[0])} bytes.\")\n", "print(f\"All {nbframes} frames, weighting {size/1e6:.3f} MB, can be represented by {(sys.getsizeof(res)+sys.getsizeof(res[0])*nbframes)/1000:.3f} kB\")" ] }, { "cell_type": "code", "execution_count": 26, "id": "5201a9dd-fd9e-4e87-a2bf-f045b8029c08", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "#Parsing speed\n", "with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " res = [ds.id.get_chunk_info(i) for i in range(ds.id.get_num_chunks())]" ] }, { "cell_type": "code", "execution_count": 27, "id": "27a4d511-7aa0-491a-8f01-37125c9e2fe5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Parse speed: 41444.687 MB/s of compressed data.\n", "HDF5 parse speed (without reading): 17158.845 fps.\n" ] } ], "source": [ "timing_parse = _\n", "print(f\"Parse speed: {size/(1e6*timing_parse.best):.3f} MB/s of compressed data.\")\n", "print(f\"HDF5 parse speed 
(without reading): {nbframes/timing_parse.best:.3f} fps.\")" ] }, { "cell_type": "code", "execution_count": 28, "id": "e112c3a7-060b-4cce-a6e1-6bb060bb21b8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "db3ee77199ed8aef5aac34b91f86b3deac7bea66 using HDF5\n", "db3ee77199ed8aef5aac34b91f86b3deac7bea66 using direct file access\n" ] } ], "source": [ "# Validation that the data read by HDF5 and via the file interface match\n", "import hashlib\n", "idx = 10\n", "with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " indexes = [ds.id.get_chunk_info(i) for i in range(ds.id.get_num_chunks())]\n", " filter_mask, ref = ds.id.read_direct_chunk(indexes[idx].chunk_offset)\n", "# and validate the indexes\n", "with open(filename, \"rb\") as f:\n", " item = indexes[idx]\n", " f.seek(item.byte_offset)\n", " res = f.read(item.size)\n", "print(f\"{hashlib.sha1(ref).hexdigest()} using HDF5\\n{hashlib.sha1(res).hexdigest()} using direct file access\")" ] }, { "cell_type": "markdown", "id": "cebd4b8d-f11c-4f65-9a8e-6f270ce0a9f4", "metadata": {}, "source": [ "The HDF5 chunk parsing is thus the only part of the code that needs to run serially, and its maximum achievable speed is very high: about 17 kfps, as measured above.\n", "\n", "While Amdahl's law provides the upper performance limit, reaching it requires optimizing all the code that runs in parallel. \n", "\n", "Here are two ways to read the different chunks, using either the Python `file` interface or `numpy.memmap`. 
\n", "Their performance is expected to be similar to what the `HDF5` direct chunk read provides; the idea is to use them to bypass the locks in `HDF5`.\n", "\n", "### 5.4 Benchmark the chunk reading using the `h5py` direct chunk read" ] }, { "cell_type": "code", "execution_count": 29, "id": "4f5a31a7-b5ab-4d68-b55f-76f3159f3cb7", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "#Reading all frames without decompressing them\n", "with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " for chunk_descr in indexes:\n", " filter_mask, chunk = ds.id.read_direct_chunk(chunk_descr.chunk_offset)" ] }, { "cell_type": "code", "execution_count": 30, "id": "64a1861b-e09b-4753-afe0-16db72131408", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read speed (h5py direct chunk read): 15994.548 MB/s of compressed data.\n", "Chunk read (from h5py) speed (without decompression): 6622.030 fps.\n" ] } ], "source": [ "timing_read2a = _\n", "print(f\"Read speed (h5py direct chunk read): {size/(1e6*timing_read2a.best):.3f} MB/s of compressed data.\")\n", "print(f\"Chunk read (from h5py) speed (without decompression): {nbframes/timing_read2a.best:.3f} fps.\")" ] }, { "cell_type": "markdown", "id": "177bd11f-1a31-4965-9ae1-23985a2ed084", "metadata": {}, "source": [ "### 5.5 Benchmark the chunk reading using the Python file interface" ] }, { "cell_type": "code", "execution_count": 31, "id": "e06a3939-fe2a-485f-b774-229c75565334", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "#Reading all frames without using the HDF5 library (nor decompressing them)\n", "with open(filename, \"rb\") as f:\n", " for chunk_descr in indexes:\n", " f.seek(chunk_descr.byte_offset)\n", " chunk = 
f.read(chunk_descr.size)" ] }, { "cell_type": "code", "execution_count": 32, "id": "ae59eaeb-0b0b-4f36-a4f7-990b7c61100f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read speed (Python file): 16127.053 MB/s of compressed data.\n", "File read (from Python) speed (without decompression): 6676.890 fps.\n", "Pure reading using the Python file interface is 0.8 % faster than the HDF5 direct chunk read.\n", "But it removes the file-locking issue from HDF5!\n" ] } ], "source": [ "timing_read3 = _\n", "print(f\"Read speed (Python file): {size/(1e6*timing_read3.best):.3f} MB/s of compressed data.\")\n", "print(f\"File read (from Python) speed (without decompression): {nbframes/timing_read3.best:.3f} fps.\")\n", "print(f\"Pure reading using the Python file interface is {100*timing_read2a.best/(timing_read3.best)-100:.1f} % faster than the HDF5 direct chunk read.\")\n", "print(\"But it removes the file-locking issue from HDF5!\")" ] }, { "cell_type": "markdown", "id": "47590db3-de63-4179-9921-1d88d6a0a64b", "metadata": {}, "source": [ "### 5.6 Benchmark the chunk reading using `numpy.memmap`" ] }, { "cell_type": "code", "execution_count": 33, "id": "59e6da20-bc46-42dd-830c-6f9b46c6e367", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -r1 -n1 -o -q\n", "#Chunk positions come from HDF5, but chunks are read via numpy.memmap\n", "f = numpy.memmap(filename, mode=\"r\")\n", "for chunk_descr in indexes:\n", " chunk = numpy.array(f[chunk_descr.byte_offset:chunk_descr.byte_offset+chunk_descr.size])\n", "del f" ] }, { "cell_type": "code", "execution_count": 34, "id": "06209e35-1813-44f4-b340-b3e60b359a8d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Read speed (numpy.memmap): 13048.649 MB/s of compressed data.\n", "File read (numpy.memmap) speed (without decompression): 5402.375 fps.\n", "Pure 
reading using the numpy.memmap is -18.4 % faster than using the h5py/HDF5 interface\n", "This removes the file-locking issue from HDF5!\n" ] } ], "source": [ "timing_read4 = _\n", "print(f\"Read speed (numpy.memmap): {size/(1e6*timing_read4.best):.3f} MB/s of compressed data.\")\n", "print(f\"File read (numpy.memmap) speed (without decompression): {nbframes/timing_read4.best:.3f} fps.\")\n", "print(f\"Pure reading using the numpy.memmap is {100*timing_read2a.best/(timing_read4.best)-100:.1f} % faster than using the h5py/HDF5 interface\")\n", "print(\"This removes the file-locking issue from HDF5!\")" ] }, { "cell_type": "markdown", "id": "3cdcc158-260c-445c-acbb-8da51e3be6ca", "metadata": {}, "source": [ "NumPy's `memmap` appears to be noticeably slower than the equivalent Python file read.\n", "\n", "We found that reading the data, which initially took on the order of 1 minute, can be decomposed into:\n", "\n", " * 0.3 s for reading the chunk descriptions\n", " * 1 s for reading the chunks themselves\n", " * 1 minute for decompressing the data.\n", "\n", "Two parallelization schemes clearly emerge:\n", "1. read chunks in serial mode with h5py and decompress+integrate in parallel.\n", "2. read chunk descriptors in serial mode with h5py and parallelize the reading, decompression and integration.\n", "\n", "But before investigating those two routes, we first need to establish a baseline for the complete serial processing: read, decompress, integrate.\n", "\n", "## 6. Azimuthal integration\n", "\n", "### 6.1 Serial workflow\n" ] }, { "cell_type": "markdown", "id": "9fccb178-94db-4bb4-a5e1-9cab60af0aeb", "metadata": {}, "source": [ "#### 6.1.1 Prepare the azimuthal integrator\n", "To allow different integrators to run fully in parallel, one must limit the number of Python calls performed; this is why we extract the Cython integrator from the `AzimuthalIntegator`. 
The integrator used here performs a sparse-matrix multiplication using a CSC representation and is single-threaded. This engine is usually not the fastest, but it is multithreading-friendly.\n", "\n", "The figures obtained should be similar to the ones obtained in chapter 2, since the overhead from the azimuthal integrator has been tuned to be minimal." ] }, { "cell_type": "code", "execution_count": 35, "id": "0814ccb4-2906-44e6-9061-bb36c5139b3e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Timing for the direct azimuthal integration: 37.1 ms ± 44.1 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)\n", "The maximum achievable integration speed on a single core is 27.041 fps which does not look fancy.\n", "But parallelized over 64 threads, it could reach: 1730.592 fps!\n" ] } ], "source": [ "geo = {\"detector\": det, \n", " \"wavelength\": 1e-10}\n", "ai = pyFAI.load(geo)\n", "omega = ai.solidAngleArray()\n", "res0 = ai.integrate1d(frame, nbins, method=(\"full\", \"csc\", \"cython\"))\n", "engine = ai.engines[res0.method].engine\n", "#This is how the engine works:\n", "res1 = engine.integrate_ng(frame, solidangle=omega)\n", "assert numpy.allclose(res0.intensity, res1.intensity) # validates the equivalence of both approaches\n", "print(\"Timing for the direct azimuthal integration: \", end=\"\")\n", "timing_integration = %timeit -o engine.integrate_ng(frame, solidangle=omega)\n", "print(f\"The maximum achievable integration speed on a single core is {1/timing_integration.best:.3f} fps which does not look fancy.\")\n", "print(f\"But parallelized over {nbthreads} threads, it could reach: {nbthreads/timing_integration.best:.3f} fps!\")" ] }, { "cell_type": "markdown", "id": "6178d4b3-86f0-4428-a147-90cff651ebd3", "metadata": {}, "source": [ "#### 6.1.2 Benchmarking of the serial workflow\n", "\n", "This code tries to be simple and elegant. 
\n", "It provides the reference values on the one hand and the baseline performance on the other." ] }, { "cell_type": "code", "execution_count": 36, "id": "59e5bc41-f960-47a3-a9d2-133c2131e60d", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%timeit -o -r1 -n1 -q\n", "#Naive implementation ... read+integrate\n", "result0 = numpy.empty((nbframes, nbins), dtype=numpy.float32)\n", "method = (\"full\", \"csc\", \"cython\")\n", "\n", "with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " for i, frame in enumerate(ds):\n", " result0[i] = ai.integrate1d(frame, nbins, method=method).intensity" ] }, { "cell_type": "code", "execution_count": 37, "id": "ce61a7ec-577e-4e6a-bf59-e0cc8f60478d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "A naive implementation provides only 22.895 fps.\n" ] } ], "source": [ "timing_naive = _\n", "# print(f\"The maximum achievable decompression+integration speed is {1/(timing_decompress.best+timing_integration.best):.3f} fps in serial \\n\\\n", "# and {nbthreads*1/(timing_decompress.best+timing_integration.best):.3f} fps in parallel on {nbthreads} threads\\n\\\n", "print(f\"A naive implementation provides only {nbframes/(timing_naive.best):.3f} fps.\")" ] }, { "cell_type": "markdown", "id": "0131a856-0a8b-4845-91a8-287abc0edf01", "metadata": {}, "source": [ "### 6.2 Pool of threads and queues\n", "\n", "Unlike processes, threads share the same memory space (with the GIL preventing read/write collisions).\n", "Threads allow multiple flows of execution to occur in parallel, but they come with a cost.\n", "Thus it is wasteful to have as many threads as tasks to perform. 
\n", "It is better to have a limited number of threads, on the order of the number of cores, and have each of them process several frames.\n", "\n", "We define a pool of threads: a list of threads, started and ready to crunch some data.\n", "Communication between threads is done via `Queue` objects.\n", "Each worker waits on the input queue (`qin`) for something to process and puts the result into the output queue (`qout`).\n", "Since we want the processing to tidy up at the end, a worker receiving `None` knows it is time to end its thread. \n", "This is sometimes called a \"kill-pill\". \n", "The `end_pool` function distributes as many \"kill-pills\" as needed to end all threads in the pool. \n", "\n", "In this section we define some tools to create and stop a pool of workers, as well as a `dummy_worker` which, by default, simply passes items through:" ] }, { "cell_type": "code", "execution_count": 38, "id": "def90b10-3d93-4051-98c4-6de6e7db3e37", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "None\n", "None\n", "None\n", "None\n" ] } ], "source": [ "# a few utility functions\n", "def dummy_worker(qin, qout, funct=lambda item: item):\n", " \"\"\"Dummy worker which takes something in qin, applies funct on it and puts the result in qout\"\"\"\n", " while True:\n", " item = qin.get()\n", " if item is None:\n", " qout.put(None)\n", " qin.task_done()\n", " return\n", " qout.put(funct(item))\n", " qin.task_done()\n", "\n", "def build_pool(nbthreads, qin, qout, worker=None, funct=None):\n", " \"\"\"Builds a pool of threads with workers, and starts them\"\"\"\n", " pool = []\n", " for i in range(nbthreads):\n", " if funct is not None:\n", " worker = dummy_worker\n", " thread = threading.Thread(target=worker, name=f\"{worker.__name__}_{i:02d}\", args=(qin, qout, funct))\n", " elif worker is None:\n", " worker = dummy_worker\n", " thread = threading.Thread(target=worker, name=f\"{worker.__name__}_{i:02d}\", args=(qin, qout))\n", " else:\n", " # custom workers receive the (global) filename as third argument\n", " thread = 
threading.Thread(target=worker, name=f\"{worker.__name__}_{i:02d}\", args=(qin, qout, filename))\n", " thread.qin = qin # remember the input queue, so that end_pool() sends the kill-pills to the right queue\n", " thread.start()\n", " pool.append(thread)\n", " return pool\n", "\n", "def end_pool(pool):\n", " \"\"\"Ends all threads of a pool by sending each of them a \"kill-pill\" (None) through its own input queue\"\"\"\n", " for thread in pool:\n", " thread.qin.put(None)\n", "\n", "\n", "#Small validation to check it works: \n", "qin = Queue()\n", "qout = Queue()\n", "pool=build_pool(4, qin, qout, dummy_worker)\n", "end_pool(pool)\n", "qin.join()\n", "while not qout.empty():\n", " print(qout.get())\n", " qout.task_done()\n", "qout.join()" ] }, { "cell_type": "markdown", "id": "dc93cd8b-3f93-4f21-b8a9-97e4bd08ca93", "metadata": {}, "source": [ "### 6.3 Parallelize decompression + processing\n", "\n", "In this example, all chunks are read by the HDF5 library and put in a queue for processing.\n", "As a consequence, all chunks could end up being held in memory at the same time, which amounts to the size of the compressed HDF5 file: 10 GB. \n", "This could be a problem on many computers, so we limit the number of chunks in memory to about 10 times the number of threads.\n", "This slow-down mechanism is implemented via the maximum size of the input queue (into which the reader puts chunks).\n" ] }, { "cell_type": "code", "execution_count": 39, "id": "b0cffcf6-ede0-4cf0-8e38-f247e8c15352", "metadata": {}, "outputs": [], "source": [ "def reader_chunks(filename, h5path, queue):\n", " \"\"\"Function reading the HDF5 file and enqueuing raw-chunks into the queue.\n", " \n", " :param filename: name of the HDF5 file\n", " :param h5path: path to the dataset within the HDF5 file\n", " :param queue: queue where to put read chunks\n", " :return: number of chunks\"\"\"\n", " with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " for i in range(ds.id.get_num_chunks()):\n", " filter_mask, chunk = ds.id.read_direct_chunk(ds.id.get_chunk_info(i).chunk_offset)\n", " if filter_mask==0:\n", " while 
queue.full():\n", " # slow down to prevent filling up memory\n", " os.sched_yield()\n", " queue.put(Item(i, chunk))\n", " return i+1" ] }, { "cell_type": "code", "execution_count": 40, "id": "098b0fc0-4c60-4e73-80a6-3a1e3f5ed867", "metadata": {}, "outputs": [], "source": [ "def decompress_integrate_funct(item):\n", " \"function to be used within a dummy_worker: takes an item and returns an item\"\n", " frame = decompress_bslz4_chunk(item.data, dtype, shape)\n", " return Item(item.index, engine.integrate_ng(frame, solidangle=omega).intensity)\n" ] }, { "cell_type": "code", "execution_count": 41, "id": "bf81e2e9-a2d0-45fb-8108-446549a27ba0", "metadata": {}, "outputs": [], "source": [ "def parallel_decompress_integrate(filename, h5path, nbthreads):\n", " qin = Queue(nbthreads*10)\n", " qout = Queue()\n", " pool = build_pool(nbthreads, qin, qout, funct=decompress_integrate_funct)\n", " nchunks = reader_chunks(filename, h5path, qin)\n", " output = numpy.empty((nchunks, nbins), numpy.float32)\n", " end_pool(pool)\n", " qin.join()\n", " while not qout.empty():\n", " item = qout.get()\n", " if item is not None:\n", " output[item.index] = item.data\n", " qout.task_done()\n", " qout.join()\n", " return output\n", " \n", "# parallel_decompress_integrate(filename, h5path, nbthreads)" ] }, { "cell_type": "code", "execution_count": 42, "id": "3b510f2f-49b3-41a2-8b71-81a41f725ec0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Timing of serial read (h5py direct) and 64x(decompression+integration): \n", "8.79 s ± 0 ns per loop (mean ± std. dev. 
of 1 run, 1 loop each)\n", "Direct read + parallel integration reaches 466.170 fps.\n", "The speed-up is 20.361x for a computer with 64 threads.\n" ] } ], "source": [ "print(f\"Timing of serial read (h5py direct) and {nbthreads}x(decompression+integration): \")\n", "timing_dcr = %timeit -o -r1 -n1 parallel_decompress_integrate(filename, h5path, nbthreads)\n", "print(f\"Direct read + parallel integration reaches {nbframes/(timing_dcr.best):.3f} fps.\")\n", "print(f\"The speed-up is {timing_naive.best/timing_dcr.best:.3f}x for a computer with {nbthreads} threads.\")" ] }, { "cell_type": "markdown", "id": "8cdf7557-f6fe-46d7-bdca-770e2e62c194", "metadata": {}, "source": [ "### 6.4 Parallelize read + decompression + processing\n", "We now investigate the case where even the reading is performed in the worker threads.\n", "One advantage is that all chunk descriptors fit in memory (a few hundred kilobytes), so there is no risk of the memory filling up with raw data.\n", "\n", "Here is the reader for this type of processing:" ] }, { "cell_type": "code", "execution_count": 43, "id": "82e58870-ebb0-45ff-aec2-f203f25a9f44", "metadata": {}, "outputs": [], "source": [ "def reader_descr(filename, h5path, queue):\n", " \"\"\"Function reading the HDF5 file and enqueuing chunk-descriptors into the queue.\n", " \n", " :param filename: name of the HDF5 file\n", " :param h5path: path to the dataset within the HDF5 file\n", " :param queue: queue where to put the chunk descriptors\n", " :return: number of chunks\"\"\"\n", " with h5py.File(filename, \"r\") as h:\n", " ds = h[h5path]\n", " for i in range(ds.id.get_num_chunks()):\n", " queue.put(Item(i, ds.id.get_chunk_info(i)))\n", " return i+1" ] }, { "cell_type": "code", "execution_count": 44, "id": "24158f69-2577-42c6-9cc1-8e9a1ad74581", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The reader provides performance close to that benchmarked in section 5.3:\n", "290 ms ± 32.4 ms per loop (mean ± std. 
dev. of 7 runs, 1 loop each)\n", "It is 12.108 % slower.\n", "The reader is able to reach 15305.633 fps\n" ] } ], "source": [ "print(\"The reader provides performance close to that benchmarked in section 5.3:\")\n", "timing_reader_descr = %timeit -o reader_descr(filename, h5path, Queue())\n", "print(f\"It is {100*(timing_reader_descr.best/timing_parse.best-1):.3f} % slower.\")\n", "print(f\"The reader is able to reach {nbframes/timing_reader_descr.best:.3f} fps\")" ] }, { "cell_type": "markdown", "id": "4fee001e-da99-4e3d-b266-0dd8fd18e08f", "metadata": {}, "source": [ "#### 6.4.1 Parallelize read + decompression + processing using the Python file interface" ] }, { "cell_type": "code", "execution_count": 45, "id": "13b0dd04-9f28-412f-98d6-1511eb163698", "metadata": {}, "outputs": [], "source": [ "def worker_python(qin, qout, filename):\n", " with open(filename, \"rb\") as f:\n", " while True:\n", " item = qin.get()\n", " if item is None:\n", " qin.task_done()\n", " return\n", " idx, chunk_descr = item\n", " f.seek(chunk_descr.byte_offset)\n", " chunk = f.read(chunk_descr.size)\n", " frame = decompress_bslz4_chunk(chunk, dtype, shape)\n", " qout.put(Item(idx, engine.integrate_ng(frame, solidangle=omega).intensity))\n", " del chunk, frame\n", " qin.task_done() # only once the result is in qout, so that qin.join() waits for the processing" ] }, { "cell_type": "code", "execution_count": 46, "id": "1147ec52-99be-474e-b7c8-1c456c05e091", "metadata": {}, "outputs": [], "source": [ "def parallel_read_decompress_integrate(filename, h5path, nbthreads, worker):\n", " qin = Queue()\n", " qout = Queue()\n", " pool = build_pool(nbthreads, qin, qout, worker=worker)\n", " nchunks = reader_descr(filename, h5path, qin)\n", " output = numpy.empty((nchunks, nbins), numpy.float32)\n", " end_pool(pool)\n", " qin.join()\n", " while not qout.empty():\n", " item = qout.get()\n", " if item is not None:\n", " output[item.index] = item.data\n", " qout.task_done()\n", " qout.join()\n", " return output" ] }, { "cell_type": "code", 
"execution_count": 47, "id": 
"72ee1bc4-5d8a-4eaf-99e8-a9323b0d1c93", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Timing of serial descriptor read and 64x(read+decompression+integration): \n", "9.01 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Parallel read+integration reaches 454.561 fps.\n", "The speed-up is 19.854x for a computer with 64 threads.\n" ] } ], "source": [ "print(f\"Timing of serial descriptor read and {nbthreads}x(read+decompression+integration): \")\n", "timing_python_file = %timeit -o -r1 -n1 parallel_read_decompress_integrate(filename, h5path, nbthreads, worker_python)\n", "print(f\"Parallel read+integration reaches {nbframes/(timing_python_file.best):.3f} fps.\")\n", "print(f\"The speed-up is {timing_naive.best/timing_python_file.best:.3f}x for a computer with {nbthreads} threads.\")" ] }, { "cell_type": "markdown", "id": "aa63fdcd-1d58-4ca1-b51f-6d0ab3bbaf0a", "metadata": {}, "source": [ "#### 6.4.2 Parallelize read + decompression + processing using the `numpy.memmap` interface" ] }, { "cell_type": "code", "execution_count": 48, "id": "8001b38d-1837-45a5-838c-15e12abd0995", "metadata": {}, "outputs": [], "source": [ "def worker_numpy(qin, qout, filename):\n", " f = numpy.memmap(filename, mode=\"r\")\n", " while True:\n", " item = qin.get()\n", " if item is None:\n", " del f\n", " qin.task_done()\n", " return\n", " idx, chunk_descr = item\n", " chunk = f[chunk_descr.byte_offset:chunk_descr.byte_offset+chunk_descr.size]\n", " frame = decompress_bslz4_chunk(chunk, dtype, shape)\n", " qout.put(Item(idx, engine.integrate_ng(frame, solidangle=omega).intensity))\n", " del chunk, frame\n", " qin.task_done() # only once the result is in qout, so that qin.join() waits for the processing" ] }, { "cell_type": "code", "execution_count": 49, "id": "a0906da8-4b10-452b-b4a5-9fa0f49159a9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Timing of serial descriptor read and 64x(read+decompression+integration): \n", "7.57 s ± 0 ns per loop (mean ± std. dev. 
of 1 run, 1 loop each)\n", "Parallel read+integration reaches 541.320 fps.\n", "The speed-up is 23.644x for a computer with 64 threads.\n" ] } ], "source": [ "print(f\"Timing of serial descriptor read and {nbthreads}x(read+decompression+integration): \")\n", "timing_numpy_file = %timeit -o -r1 -n1 parallel_read_decompress_integrate(filename, h5path, nbthreads, worker_numpy)\n", "print(f\"Parallel read+integration reaches {nbframes/(timing_numpy_file.best):.3f} fps.\")\n", "print(f\"The speed-up is {timing_naive.best/timing_numpy_file.best:.3f}x for a computer with {nbthreads} threads.\")" ] }, { "cell_type": "markdown", "id": "c314a9d6-4878-480a-8625-ba6b295bf96f", "metadata": {}, "source": [ "An effective multithreaded implementation consists of:\n", "* One reader which reads the dataset chunk-by-chunk or descriptor-by-descriptor and makes the items available via an input queue, called `qin`.\n", "* A pool of workers, of the size of the number of cores. Each worker reads one chunk (if needed), decompresses it into one frame and performs the azimuthal integration of that frame. The integrated result is put into an output queue, called `qout`.\n", "* 2 queues: `qin` and `qout`; the former may need to be limited in size to prevent memory from filling up when complete chunks are put into it.\n", "* The gathering of the data, performed in the main thread (as is the reading). Each piece of data is associated with its index in the dataset using the `Item` named-tuple.\n", "\n", "Nota: I had a hard time performing both reading and writing with HDF5 (even in different files). This is why the result is reconstructed in memory and saved at the very end. This could be a bug in h5py." ] }, { "cell_type": "markdown", "id": "769fd274-e668-4065-ba8f-681a56906838", "metadata": {}, "source": [ "## 7. Display some results\n", "Since the input data were all synthetic and similar, no great science is expected from this... 
but one can check that each frame differs slightly from its neighbors, following a pattern of 500 frames. " ] }, { "cell_type": "code", "execution_count": 50, "id": "478cf30b-a1ca-4c62-8031-c364c2543816", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 4min 59s, sys: 13.1 s, total: 5min 12s\n", "Wall time: 7.84 s\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "%time result = parallel_read_decompress_integrate(filename, h5path, nbthreads, worker_numpy)\n", "fig,ax = subplots(figsize=(8,8))\n", "ax.imshow(result)" ] }, { "cell_type": "markdown", "id": "d52a13fe-3265-4e74-866e-8056e105dce5", "metadata": {}, "source": [ "## 8. Evolution of the performance with the number of threads" ] }, { "cell_type": "code", "execution_count": 51, "id": "a89e908a-d950-48d6-8ff2-2b39cdda629e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Using 64 threads\n", "8.58 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.64 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "7.38 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 56 threads\n", "8.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.62 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "7.15 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 48 threads\n", "6.54 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "7.93 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "7.49 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 40 threads\n", "7.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.08 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "7.48 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 36 threads\n", "7.48 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.34 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 32 threads\n", "9.05 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.38 s ± 0 ns per loop (mean ± std. dev. 
of 1 run, 1 loop each)\n", "Using 28 threads\n", "9.91 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "9.14 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "8.05 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 24 threads\n", "11.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "10.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "9.56 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 20 threads\n", "13.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "11.7 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "10.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 16 threads\n", "17.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "13.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "13.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 12 threads\n", "24.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "17.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "17.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 8 threads\n", "38 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "25.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "23.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 4 threads\n", "1min 19s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "48.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "45.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 2 threads\n", "2min 44s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "1min 35s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "1min 32s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "Using 1 threads\n", "5min 46s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n", "3min 5s ± 0 ns per loop (mean ± std. dev. 
of 1 run, 1 loop each)\n", "3min ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n" ] } ], "source": [ "performances_h5py = {}\n", "performances_file = {}\n", "performances_memmap = {}\n", "for i in (64, 56, 48, 40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 2, 1):\n", " print(f\"Using {i} threads\")\n", " \n", " t = %timeit -r1 -n1 -o parallel_decompress_integrate(filename, h5path, i)\n", " performances_h5py[i] = nbframes/t.best\n", "\n", " t = %timeit -r1 -n1 -o parallel_read_decompress_integrate(filename, h5path, i, worker_python)\n", " performances_file[i] = nbframes/t.best\n", "\n", " t = %timeit -r1 -n1 -o parallel_read_decompress_integrate(filename, h5path, i, worker_numpy)\n", " performances_memmap[i] = nbframes/t.best\n" ] }, { "cell_type": "code", "execution_count": 56, "id": "1d6c9040-6de2-47b4-b598-528b96833de3", "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "fig, ax = subplots(figsize=(10,8))\n", "ax.plot(list(performances_h5py.keys()),list(performances_h5py.values()), \"o-\", label=\"h5py direct chunk\")\n", "ax.plot(list(performances_file.keys()),list(performances_file.values()), \"o-\", label=\"python file\")\n", "ax.plot(list(performances_memmap.keys()),list(performances_memmap.values()), \"o-\", label=\"numpy memmap\")\n", "ax.legend()\n", "ax.set_xlabel(\"Number of threads\")\n", "ax.set_ylabel(\"Number of frames processed per second (fps)\")\n", "ax.set_title(\"Performance of HDF5 reading + azimuthal integration of 4Mpix images\")\n", "pass" ] }, { "cell_type": "markdown", "id": "094b673d-f373-431b-92ed-4c9ef9ff3c79", "metadata": {}, "source": [ "## 9. Conclusion\n", "\n", "Reading Bitshuffle-LZ4 data can be parallelized using multi-threading in Python. \n", "\n", "The procedure is a bit tedious but not out of reach for a Python programmer: a few threads and a couple of queues. \n", "This burden is worth it when coupling decompression with azimuthal integration, which reduces the amount of data stored in memory.\n", "\n", "The performance obtained on a 64-core computer, ~500 fps, is close to what can be obtained from a GPU.\n", "The speed-up obtained with this procedure is about 20-27x on a 64-core computer versus the naive single-threaded implementation, which demonstrates that multithreading is worth the effort.\n", "\n", "One notices that the effort of working around the different locks of `HDF5` and `h5py` did not bring much additional performance. This could be linked to a common limiting factor: the `GIL`. \n", "This demonstration shows that multithreaded Python is possible, but the number of effectively parallel threads is limited to around 40-48 threads on the 2x32-core computer. \n", "Other methods exist to use more simultaneous cores: multiprocessing, but also GPU processing, which are presented in other notebooks."
] }, { "cell_type": "markdown", "id": "0d2b8105-71d9-4440-93f9-0a30f8f507cc", "metadata": {}, "source": [ "Thanks again to the French-CRG for the computer." ] }, { "cell_type": "code", "execution_count": 53, "id": "b90f5458-9fd4-4d83-bfbd-017e3a9365dd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Total processing time: 1988.758 s\n" ] } ], "source": [ "print(f\"Total processing time: {time.time()-start_time:.3f} s\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 5 }