Dahu Package¶
You can have a look at the man pages of the CLI tools: dahu-reprocess and dahu_server, which is the Tango-device server.
This is the public API of dahu.
dahu.job¶
Data Analysis RPC server over Tango:
Contains the Job class which handles jobs. A static part of the class holds statistics about all jobs.
- class Job(name='plugin.Plugin', input_data={})[source]¶
Bases: threading.Thread
Class Job
- Each instance will be a job
- Constructor takes input data and generates the JobId
- Each instance will have a “getOutput” method with optional join
- there could be a “join” method, waiting for the job to finish
- Each instance will have an “execute” method returning a JobId
- Each instance will have a “setCallBack” method that stores the name of the external callback
- provides the status of a job
- Each instance has an abort method which can be used to stop processing (or a server)
Static part:
- keeps track of the status of all jobs
- leaves the job time to initialize
- static class methods retrieve the job instance, status, small-log …
- does not manage the computer's workload; this should be managed at the ExecPlugin level
Used for the Tango binding
== class variables ==
dictPluginStatus[pluginName] = [“uninitialized” | ”running” | ”executed” | ”failed”]
dictJobs[JobId] = Job.Instance
== static methods ==
getJob(JobId)
RESERVED keywords from Thread: start, run, join, name, ident, is_alive, daemon
start is overridden with a call to the factory to instantiate the plugin
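The lifecycle above (JobId generation, overridden start, output retrieval with optional join, a shared job registry) can be sketched with a self-contained stand-in built on threading.Thread. All names here (ToyJob, get_output, get_job) are illustrative analogues, not dahu's actual API.

```python
import threading

class ToyJob(threading.Thread):
    """Toy analogue of dahu.job.Job: one instance per job."""
    _id_lock = threading.Lock()
    _last_id = 0
    _jobs = {}  # mirrors dictJobs[JobId] = Job.Instance

    def __init__(self, input_data):
        super().__init__()
        with ToyJob._id_lock:
            ToyJob._last_id += 1
            self.job_id = ToyJob._last_id      # constructor generates the JobId
        ToyJob._jobs[self.job_id] = self
        self.input_data = input_data
        self.output_data = None
        self.status = "uninitialized"

    def start(self):
        # dahu overrides start() to instantiate the plugin via the factory;
        # here we only flip the state before launching the thread.
        self.status = "starting"
        super().start()

    def run(self):
        self.status = "running"
        self.output_data = {"result": self.input_data.get("x", 0) * 2}
        self.status = "success"

    def get_output(self, join=True, timeout=None):
        """Return the output, optionally joining the thread first."""
        if join:
            self.join(timeout)
        return self.output_data

    @classmethod
    def get_job(cls, job_id):
        """Static part: retrieve a job instance from its id."""
        return cls._jobs.get(job_id)

job = ToyJob({"x": 21})
job.start()
result = job.get_output()   # joins, then returns {'result': 42}
```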
- STATE_INEXISTANT = 'inexistant'¶
- STATE_UNINITIALIZED = 'uninitialized'¶
- STATE_STARTING = 'starting'¶
- STATE_RUNNING = 'running'¶
- STATE_SUCCESS = 'success'¶
- STATE_FAILURE = 'failure'¶
- STATE_ABORTED = 'aborted'¶
- STATE = ['uninitialized', 'starting', 'running', 'success', 'failure', 'aborted']¶
- join(timeout=None)[source]¶
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
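The timeout behaviour described above can be demonstrated with the standard library alone: since join() always returns None, is_alive() is the only way to tell whether the timeout expired.

```python
import threading

done = threading.Event()
worker = threading.Thread(target=done.wait)  # blocks until the event is set

worker.start()
worker.join(timeout=0.05)       # too short: the worker is still waiting
timed_out = worker.is_alive()   # True means the join() call timed out

done.set()                      # let the worker finish
worker.join()                   # no timeout: blocks until termination
finished = not worker.is_alive()
```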
- run()[source]¶
Defines the sequence of execution of the plugin:
1) sets the state to “running”
2) sets the input data to the plugin
3) runs the set-up
4) runs the process
5) runs the tear-down: the tear-down always runs!
6) runs the call-backs
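The guarantee that tear-down always runs is naturally expressed with try/finally. The sketch below follows the step order described for run(); the ToyPlugin class and run_job helper are illustrative, not dahu's implementation.

```python
class ToyPlugin:
    """Minimal plugin with the setup/process/teardown lifecycle."""
    def __init__(self):
        self.trace = []
        self.input = None
        self.output = {}

    def setup(self, kwargs=None):
        self.trace.append("setup")
        self.input = kwargs or {}

    def process(self):
        self.trace.append("process")
        self.output["doubled"] = self.input.get("x", 0) * 2

    def teardown(self):
        self.trace.append("teardown")

def run_job(plugin, input_data):
    state = "running"             # 1) set the state
    plugin.setup(input_data)      # 2)+3) input data and set-up
    try:
        plugin.process()          # 4) process
        state = "success"
    except Exception:
        state = "failure"
    finally:
        plugin.teardown()         # 5) tear-down always runs
    return state                  # 6) call-backs would fire here

p = ToyPlugin()
state = run_job(p, {"x": 5})
```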
- clean(force=False, wait=True)[source]¶
Frees the memory associated with the plugin
Parameters:
- force – Force garbage collection after clean-up
- wait – wait for the job to finish
- synchronize(timeout=None)¶
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
- id¶
Returns: JobId
@rtype: integer
- plugin¶
Returns: the processing instance
@rtype: python object
- status¶
Returns: status of the Job
@rtype: string
- input_data¶
Returns the job input data
- output_data¶
Returns the job output data
:param _bWait: shall we wait for the plugin to finish before retrieving the output data? Yes by default.
:type _bWait: boolean
- name¶
- classmethod synchronize_job(jobId, timeout=None)[source]¶
Wait for a specific job to finish.
Parameters:
- jobId (int) – identifier of the job
- timeout – timeout in seconds to wait
Returns: status of the job
- classmethod getStatusFromID(jobId)[source]¶
Retrieve the job (hence the plugin) status
Parameters: jobId (int) – the Job identification number
Returns: the Job status
@rtype: string
- classmethod getStatusFromId(jobId)¶
Retrieve the job (hence the plugin) status
Parameters: jobId (int) – the Job identification number
Returns: the Job status
@rtype: string
- classmethod getJobFromID(jobId)[source]¶
Retrieve the job (hence the plugin)
Parameters: jobId – the Job identification number
Returns: the “Job instance”, which contains the plugin and the status
@rtype: a Python object, instance of Job
- classmethod getJobFromId(jobId)¶
Retrieve the job (hence the plugin)
Parameters: jobId – the Job identification number
Returns: the “Job instance”, which contains the plugin and the status
@rtype: a Python object, instance of Job
- classmethod cleanJobfromId(jobId, forceGC=True)[source]¶
Frees the memory associated with the top-level plugin
Parameters:
- jobId (int) – the Job identification number
- forceGC (boolean) – Force garbage collection after clean-up
- classmethod cleanJobfromID(jobId, forceGC=True)¶
Frees the memory associated with the top-level plugin
Parameters:
- jobId (int) – the Job identification number
- forceGC (boolean) – Force garbage collection after clean-up
- classmethod getDataOutputFromId(jobId, as_JSON=False)[source]¶
Returns the Plugin Output Data
:param jobId: job identifier
:type jobId: int
:return: Job.DataOutput JSON string
- classmethod getDataOutputFromID(jobId, as_JSON=False)¶
Returns the Plugin Output Data
:param jobId: job identifier
:type jobId: int
:return: Job.DataOutput JSON string
- classmethod getDataInputFromId(jobId, as_JSON=False)[source]¶
Returns the Plugin Input Data
:param jobId: job identifier
:type jobId: int
:return: Job.DataInput JSON string
- classmethod getDataInputFromID(jobId, as_JSON=False)¶
Returns the Plugin Input Data
:param jobId: job identifier
:type jobId: int
:return: Job.DataInput JSON string
- classmethod getErrorFromId(jobId)[source]¶
Returns the error messages from the plugin
:param jobId: job identifier
:type jobId: int
:return: error message as a string
- classmethod getErrorFromID(jobId)¶
Returns the error messages from the plugin
:param jobId: job identifier
:type jobId: int
:return: error message as a string
dahu.plugin¶
Data Analysis RPC server over Tango:
Definition of plugins
- class Plugin[source]¶
Bases: object
A plugin is instantiated:
- Gets its input parameters as a dictionary from the setup method
- Performs some work in the process method
- Sets the result as the output attribute, which should be a dictionary
- The process can be an infinite loop or a server, which can be aborted using the abort method
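The abort mechanism described above can be sketched with a threading.Event: process() may loop indefinitely, and abort() asks it to stop at the next iteration. LoopingPlugin and its max_iter parameter are illustrative assumptions, not part of dahu.

```python
import threading

class LoopingPlugin:
    """Toy plugin whose process() loops until told to stop."""
    def __init__(self):
        self._abort = threading.Event()
        self.input = {}
        self.iterations = 0
        self.output = {}

    def setup(self, kwargs=None):
        self.input = kwargs or {}

    def process(self):
        # loop until aborted or until an (optional) iteration cap is hit
        while not self._abort.is_set():
            self.iterations += 1
            if self.iterations >= self.input.get("max_iter", 1000):
                break
        self.output = {"iterations": self.iterations}

    def abort(self):
        self._abort.set()   # cooperative stop, checked once per iteration

plugin = LoopingPlugin()
plugin.setup({"max_iter": 10})
plugin.process()                     # stops by itself after 10 iterations

server = LoopingPlugin()
server.setup({"max_iter": 10**9})    # effectively an endless loop
t = threading.Thread(target=server.process)
t.start()
server.abort()                       # ask the running loop to stop
t.join(timeout=5)
```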
- DEFAULT_SET_UP = 'setup'¶
- DEFAULT_PROCESS = 'process'¶
- DEFAULT_TEAR_DOWN = 'teardown'¶
- DEFAULT_ABORT = 'abort'¶
- REPROCESS_IGNORE = []¶
- setup(kwargs=None)[source]¶
This is the second constructor, used to set up input variables and possibly initialize some objects
- class PluginFromFunction[source]¶
Bases: dahu.plugin.Plugin
Template class to build a plugin from a function
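One plausible reading of such a template is that the wrapped function receives the input dictionary as keyword arguments and its return value becomes the output. The sketch below is an analogue under that assumption; it is not dahu's actual implementation, and ToyPluginFromFunction is a hypothetical name.

```python
class ToyPluginFromFunction:
    """Toy template: build a plugin around a plain function."""
    def __init__(self, function):
        self.function = function
        self.input = {}
        self.output = {}

    def setup(self, kwargs=None):
        self.input = kwargs or {}

    def process(self):
        # the input dictionary becomes keyword arguments;
        # the return value becomes the output dictionary
        self.output = {"result": self.function(**self.input)}

def add(a, b):
    return a + b

plug = ToyPluginFromFunction(add)
plug.setup({"a": 2, "b": 3})
plug.process()
```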
dahu.factory¶
Data Analysis RPC server over Tango:
Factory for the loading of plugins
- class Factory(workdir=None, plugin_path=None)[source]¶
Bases: object
This is a factory; it instantiates a plugin from its name
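As the attributes below (registry, reg_sem) suggest, such a factory can keep a name-to-class registry guarded by a semaphore. This is a self-contained sketch of that pattern; the method names register and instantiate are illustrative, not dahu's API.

```python
import threading

class ToyFactory:
    """Toy plugin factory: instantiate a plugin from its name."""
    registry = {}                      # name -> plugin class
    reg_sem = threading.Semaphore()    # guards registry updates

    @classmethod
    def register(cls, klass, name=None):
        with cls.reg_sem:
            cls.registry[name or klass.__name__] = klass

    @classmethod
    def instantiate(cls, name):
        # look the class up by name and return a fresh instance
        return cls.registry[name]()

class ExamplePlugin:
    def process(self):
        return "done"

ToyFactory.register(ExamplePlugin, "example.ExamplePlugin")
plugin = ToyFactory.instantiate("example.ExamplePlugin")
```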
- registry = {}¶
- modules = {}¶
- plugin_dirs = {'/home/tester/dahu/build/lib/dahu/plugins': ['id15v2', 'id02', 'bm29', 'focus', 'id15', 'id31', 'example', 'pyfai']}¶
- reg_sem = <threading.Semaphore object>¶
dahu.server¶
dahu.cache¶
Data Analysis RPC server over Tango:
Class Cache for storing the data in a Borg
- class DataCache(max_size=10)[source]¶
Bases: dict
This class is a Borg: it always returns the same values regardless of the instance of the object. It is used as data storage for images … with a limit on the number of images to keep in memory.
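The Borg pattern gives every instance the same shared state (unlike a singleton, instances are distinct objects). A minimal sketch combining it with a size limit, assuming oldest-first eviction; note that DataCache itself subclasses dict, while this BorgCache is only an illustrative analogue.

```python
from collections import OrderedDict

class BorgCache:
    """Toy Borg-style cache with a bound on the number of entries."""
    _shared_state = {}   # every instance shares this attribute dict

    def __init__(self, max_size=10):
        self.__dict__ = self._shared_state   # the Borg trick
        if "data" not in self.__dict__:      # first instance initializes
            self.data = OrderedDict()
            self.max_size = max_size

    def __setitem__(self, key, value):
        self.data[key] = value
        while len(self.data) > self.max_size:
            self.data.popitem(last=False)    # evict the oldest entry

    def __getitem__(self, key):
        return self.data[key]

    def __contains__(self, key):
        return key in self.data

a = BorgCache(max_size=2)
b = BorgCache()          # distinct instance, same shared state
a["img1"] = "data1"
a["img2"] = "data2"
a["img3"] = "data3"      # exceeds max_size: img1 is evicted
```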
- has_key(key)¶
D.__contains__(k) -> True if D has a key k, else False