Source code for silx.gui.plot.tools.profile.ScatterProfileToolBar
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""This module profile tools for scatter plots.
"""
__authors__ = ["T. Vincent"]
__license__ = "MIT"
__date__ = "28/06/2018"
import logging
import threading
import time
import numpy
try:
from scipy.interpolate import LinearNDInterpolator
except ImportError:
LinearNDInterpolator = None
# Fallback using local Delaunay and matplotlib interpolator
from silx.third_party.scipy_spatial import Delaunay
import matplotlib.tri
from ._BaseProfileToolBar import _BaseProfileToolBar
from .... import qt
from ... import items
_logger = logging.getLogger(__name__)
# TODO support log scale
class _InterpolatorInitThread(qt.QThread):
"""Thread building a scatter interpolator
This works in greedy mode in that the signal is only emitted
when no other request is pending
"""
sigInterpolatorReady = qt.Signal(object)
"""Signal emitted whenever an interpolator is ready
It provides a 3-tuple (points, values, interpolator)
"""
_RUNNING_THREADS_TO_DELETE = []
"""Store reference of no more used threads but still running"""
def __init__(self):
super(_InterpolatorInitThread, self).__init__()
self._lock = threading.RLock()
self._pendingData = None
self._firstFallbackRun = True
def discard(self, obj=None):
"""Wait for pending thread to complete and delete then
Connect this to the destroyed signal of widget using this thread
"""
if self.isRunning():
self.cancel()
self._RUNNING_THREADS_TO_DELETE.append(self) # Keep a reference
self.finished.connect(self.__finished)
def __finished(self):
"""Handle finished signal of threads to delete"""
try:
self._RUNNING_THREADS_TO_DELETE.remove(self)
except ValueError:
_logger.warning('Finished thread no longer in reference list')
def request(self, points, values):
"""Request new initialisation of interpolator
:param numpy.ndarray points: Point coordinates (N, D)
:param numpy.ndarray values: Values the N points (1D array)
"""
with self._lock:
# Possibly replace already pending data
self._pendingData = points, values
if not self.isRunning():
self.start()
def cancel(self):
"""Cancel any running/pending requests"""
with self._lock:
self._pendingData = 'cancelled'
def run(self):
"""Run the init of the scatter interpolator"""
if LinearNDInterpolator is None:
self.run_matplotlib()
else:
self.run_scipy()
def run_matplotlib(self):
"""Run the init of the scatter interpolator"""
if self._firstFallbackRun:
self._firstFallbackRun = False
_logger.warning(
"scipy.spatial.LinearNDInterpolator not available: "
"Scatter plot interpolator initialisation can freeze the GUI.")
while True:
with self._lock:
data = self._pendingData
self._pendingData = None
if data in (None, 'cancelled'):
return
points, values = data
startTime = time.time()
try:
delaunay = Delaunay(points)
except:
_logger.warning(
"Cannot triangulate scatter data")
else:
with self._lock:
data = self._pendingData
if data is not None: # Break point
_logger.info('Interpolator discarded after %f s',
time.time() - startTime)
else:
x, y = points.T
triangulation = matplotlib.tri.Triangulation(
x, y, triangles=delaunay.simplices)
interpolator = matplotlib.tri.LinearTriInterpolator(
triangulation, values)
with self._lock:
data = self._pendingData
if data is not None:
_logger.info('Interpolator discarded after %f s',
time.time() - startTime)
else:
# No other processing requested: emit the signal
_logger.info("Interpolator initialised in %f s",
time.time() - startTime)
# Wrap interpolator to have same API as scipy's one
def wrapper(points):
return interpolator(*points.T)
self.sigInterpolatorReady.emit(
(points, values, wrapper))
def run_scipy(self):
"""Run the init of the scatter interpolator"""
while True:
with self._lock:
data = self._pendingData
self._pendingData = None
if data in (None, 'cancelled'):
return
points, values = data
startTime = time.time()
try:
interpolator = LinearNDInterpolator(points, values)
except:
_logger.warning(
"Cannot initialise scatter profile interpolator")
else:
with self._lock:
data = self._pendingData
if data is not None: # Break point
_logger.info('Interpolator discarded after %f s',
time.time() - startTime)
else:
# First call takes a while, do it here
interpolator([(0., 0.)])
with self._lock:
data = self._pendingData
if data is not None:
_logger.info('Interpolator discarded after %f s',
time.time() - startTime)
else:
# No other processing requested: emit the signal
_logger.info("Interpolator initialised in %f s",
time.time() - startTime)
self.sigInterpolatorReady.emit(
(points, values, interpolator))
[docs]class ScatterProfileToolBar(_BaseProfileToolBar):
"""QToolBar providing scatter plot profiling tools
:param parent: See :class:`QToolBar`.
:param plot: :class:`~silx.gui.plot.PlotWidget` on which to operate.
:param str title: See :class:`QToolBar`.
"""
def __init__(self, parent=None, plot=None, title='Scatter Profile'):
super(ScatterProfileToolBar, self).__init__(parent, plot, title)
self.__nPoints = 1024
self.__interpolator = None
self.__interpolatorCache = None # points, values, interpolator
self.__initThread = _InterpolatorInitThread()
self.destroyed.connect(self.__initThread.discard)
self.__initThread.sigInterpolatorReady.connect(
self.__interpolatorReady)
roiManager = self._getRoiManager()
if roiManager is None:
_logger.error(
"Error during scatter profile toolbar initialisation")
else:
roiManager.sigInteractiveModeStarted.connect(
self.__interactionStarted)
roiManager.sigInteractiveModeFinished.connect(
self.__interactionFinished)
if roiManager.isStarted():
self.__interactionStarted(roiManager.getCurrentInteractionModeRoiClass())
def __interactionStarted(self, roiClass):
"""Handle start of ROI interaction"""
plot = self.getPlotWidget()
if plot is None:
return
plot.sigActiveScatterChanged.connect(self.__activeScatterChanged)
scatter = plot._getActiveItem(kind='scatter')
legend = None if scatter is None else scatter.getLegend()
self.__activeScatterChanged(None, legend)
def __interactionFinished(self):
"""Handle end of ROI interaction"""
plot = self.getPlotWidget()
if plot is None:
return
plot.sigActiveScatterChanged.disconnect(self.__activeScatterChanged)
scatter = plot._getActiveItem(kind='scatter')
legend = None if scatter is None else scatter.getLegend()
self.__activeScatterChanged(legend, None)
def __activeScatterChanged(self, previous, legend):
"""Handle change of active scatter
:param Union[str,None] previous:
:param Union[str,None] legend:
"""
self.__initThread.cancel()
# Reset interpolator
self.__interpolator = None
plot = self.getPlotWidget()
if plot is None:
_logger.error("Associated PlotWidget no longer exists")
else:
if previous is not None: # Disconnect signal
scatter = plot.getScatter(previous)
if scatter is not None:
scatter.sigItemChanged.disconnect(
self.__scatterItemChanged)
if legend is not None:
scatter = plot.getScatter(legend)
if scatter is None:
_logger.error("Cannot retrieve active scatter")
else:
scatter.sigItemChanged.connect(self.__scatterItemChanged)
points = numpy.transpose(numpy.array((
scatter.getXData(copy=False),
scatter.getYData(copy=False))))
values = scatter.getValueData(copy=False)
self.__updateInterpolator(points, values)
# Refresh profile
self.updateProfile()
def __scatterItemChanged(self, event):
"""Handle update of active scatter plot item
:param ItemChangedType event:
"""
if event == items.ItemChangedType.DATA:
self.__interpolator = None
scatter = self.sender()
if scatter is None:
_logger.error("Cannot retrieve updated scatter item")
else:
points = numpy.transpose(numpy.array((
scatter.getXData(copy=False),
scatter.getYData(copy=False))))
values = scatter.getValueData(copy=False)
self.__updateInterpolator(points, values)
# Handle interpolator init thread
def __updateInterpolator(self, points, values):
"""Update used interpolator with new data"""
if (self.__interpolatorCache is not None and
len(points) == len(self.__interpolatorCache[0]) and
numpy.all(numpy.equal(self.__interpolatorCache[0], points)) and
numpy.all(numpy.equal(self.__interpolatorCache[1], values))):
# Reuse previous interpolator
_logger.info(
'Scatter changed: Reuse previous interpolator')
self.__interpolator = self.__interpolatorCache[2]
else:
# Interpolator needs update: Start background processing
_logger.info(
'Scatter changed: Rebuild interpolator')
self.__interpolator = None
self.__interpolatorCache = None
self.__initThread.request(points, values)
def __interpolatorReady(self, data):
"""Handle end of init interpolator thread"""
points, values, interpolator = data
self.__interpolator = interpolator
self.__interpolatorCache = None if interpolator is None else data
self.updateProfile()
def hasPendingOperations(self):
return self.__initThread.isRunning()
# Number of points
[docs] def getNPoints(self):
"""Returns the number of points of the profiles
:rtype: int
"""
return self.__nPoints
[docs] def setNPoints(self, npoints):
"""Set the number of points of the profiles
:param int npoints:
"""
npoints = int(npoints)
if npoints < 1:
raise ValueError("Unsupported number of points: %d" % npoints)
else:
self.__nPoints = npoints
# Overridden methods
def computeProfileTitle(self, x0, y0, x1, y1):
"""Compute corresponding plot title
:param float x0: Profile start point X coord
:param float y0: Profile start point Y coord
:param float x1: Profile end point X coord
:param float y1: Profile end point Y coord
:return: Title to use
:rtype: str
"""
if self.hasPendingOperations():
return 'Pre-processing data...'
else:
return super(ScatterProfileToolBar, self).computeProfileTitle(
x0, y0, x1, y1)
def computeProfile(self, x0, y0, x1, y1):
"""Compute corresponding profile
:param float x0: Profile start point X coord
:param float y0: Profile start point Y coord
:param float x1: Profile end point X coord
:param float y1: Profile end point Y coord
:return: (points, values) profile data or None
"""
if self.__interpolator is None:
return None
nPoints = self.getNPoints()
points = numpy.transpose((
numpy.linspace(x0, x1, nPoints, endpoint=True),
numpy.linspace(y0, y1, nPoints, endpoint=True)))
values = self.__interpolator(points)
if not numpy.any(numpy.isfinite(values)):
return None # Profile outside convex hull
return points, values