Dahu Package¶
You can have a look at the man pages of the CLI tools: dahu-reprocess and dahu_server, the Tango-device server.
This is the public API of dahu.
dahu.job¶
Data Analysis RPC server over Tango:
Contains the Job class which handles jobs. A static part of the class keeps statistics for all jobs.
- 
class Job(name='plugin.Plugin', input_data={})[source]¶
- Bases: - threading.Thread- Class Job: each instance represents a single job 
- The constructor takes input data and generates the JobId 
- Each instance has a “getOutput” method with an optional join 
- A “join” method waits for the job to finish 
- Each instance has an “execute” method returning the JobId 
- Each instance has a “setCallBack” method that stores the name of the external callback 
- Provides the status of a job 
- Each instance has an “abort” method which can be used to stop processing (or a server) 
 - Static part:
- keeps track of the status of all jobs
- leaves jobs the time to initialize
- retrieves a job instance, its status, a small log, …
- does not manage the workload of the computer; this should be managed at the ExecPlugin level
- used for the Tango binding

== class variables ==
- dictPluginStatus[pluginName] = [“uninitialized” | “running” | “executed” | “failed”]
- dictJobs[JobId] = Job.Instance

== static methods ==
- getJob(JobId)

RESERVED keywords from Thread: start, run, join, name, ident, is_alive, daemon. start is overridden with a call to the factory to instantiate the plugin.
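The lifecycle described above can be sketched with a minimal thread-based job. This is a simplified stand-in, not the real dahu.job.Job: the names MiniJob, get_job and the registry layout are illustrative assumptions.

```python
import itertools
import threading

class MiniJob(threading.Thread):
    """Simplified sketch of the Job lifecycle: each instance receives a
    unique id at construction and is tracked in a static registry."""
    _id_counter = itertools.count(1)   # generates the JobId
    _jobs = {}                         # static part: JobId -> instance
    _lock = threading.Lock()

    def __init__(self, target, input_data):
        super().__init__()
        self.input_data = input_data
        self.output_data = None
        self.status = "uninitialized"
        self._target = target
        with MiniJob._lock:
            self.job_id = next(MiniJob._id_counter)
            MiniJob._jobs[self.job_id] = self

    def run(self):
        self.status = "running"
        try:
            self.output_data = self._target(self.input_data)
            self.status = "success"
        except Exception:
            self.status = "failure"

    @classmethod
    def get_job(cls, job_id):
        """Static part: retrieve a job instance from its id."""
        return cls._jobs.get(job_id)

job = MiniJob(lambda d: {"sum": sum(d["values"])}, {"values": [1, 2, 3]})
job.start()
job.join()                       # wait for the job to finish
print(job.status, job.output_data)   # success {'sum': 6}
```

Retrieving the instance afterwards goes through the static registry, mirroring `getJob(JobId)`.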
STATE_INEXISTANT= 'inexistant'¶
 - 
STATE_UNINITIALIZED= 'uninitialized'¶
 - 
STATE_STARTING= 'starting'¶
 - 
STATE_RUNNING= 'running'¶
 - 
STATE_SUCCESS= 'success'¶
 - 
STATE_FAILURE= 'failure'¶
 - 
STATE_ABORTED= 'aborted'¶
 - 
STATE= ['uninitialized', 'starting', 'running', 'success', 'failure', 'aborted']¶
 - 
join(timeout=None)[source]¶
- Wait until the thread terminates. - This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs. - When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out. - When the timeout argument is not present or None, the operation will block until the thread terminates. - A thread can be join()ed many times. - join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception. 
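As this docstring stresses, join() always returns None, so a timeout can only be detected by calling is_alive() afterwards. A small illustration with a plain threading.Thread:

```python
import threading
import time

def slow():
    time.sleep(0.5)

t = threading.Thread(target=slow)
t.start()
t.join(timeout=0.1)          # returns None whether or not it timed out
timed_out = t.is_alive()     # True: the thread is still running
t.join()                     # block until the thread really finishes
print(timed_out, t.is_alive())   # True False
```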
 - 
run()[source]¶
- Defines the sequence of execution of the plugin: 1) sets the state to “running” 2) sets the input data of the plugin 3) runs the set-up 4) runs the process 5) runs the tear-down (the tear-down always runs!) 6) runs the call-backs 
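The guarantee that the tear-down always runs, even when the process step raises, is naturally expressed with try/finally. A hypothetical sketch of this sequence — execute, Demo and the exact state handling are illustrative, not the actual dahu code:

```python
def execute(plugin, input_data, callbacks=()):
    status = "running"                  # 1) state -> running
    plugin.input = input_data           # 2) hand the input to the plugin
    try:
        plugin.setup()                  # 3) set-up
        plugin.process()                # 4) process
        status = "success"
    except Exception:
        status = "failure"
    finally:
        plugin.teardown()               # 5) tear-down always runs
    for cb in callbacks:                # 6) call-backs
        cb(status)
    return status

class Demo:
    """Toy plugin recording which steps were executed."""
    def __init__(self, fail=False):
        self.fail = fail
        self.calls = []
    def setup(self):
        self.calls.append("setup")
    def process(self):
        self.calls.append("process")
        if self.fail:
            raise RuntimeError("boom")
    def teardown(self):
        self.calls.append("teardown")

print(execute(Demo(), {}))                 # success
bad = Demo(fail=True)
print(execute(bad, {}), bad.calls[-1])     # failure teardown
```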
 - 
clean(force=False, wait=True)[source]¶
- Frees the memory associated with the plugin - Parameters
- force – Force garbage collection after clean-up 
- wait – wait for job to be finished 
 
 
 - 
synchronize(timeout=None)¶
- Wait until the thread terminates. - This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs. - When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out. - When the timeout argument is not present or None, the operation will block until the thread terminates. - A thread can be join()ed many times. - join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception. 
 - 
property id¶
- Returns
- JobId 
 - Return type: integer 
 - 
property plugin¶
- Returns
- the processing instance 
 - Return type: Python object 
 - 
property status¶
- Returns
- status of the Job 
 - Return type: string 
 - 
property input_data¶
- Returns the job input data 
 - 
property output_data¶
- Returns the job output data - Parameters
- _bWait (boolean) – whether to wait for the plugin to finish before retrieving the output data (default: True) 
 - 
property name¶
- A string used for identification purposes only. - It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor. 
 - 
classmethod synchronize_job(jobId, timeout=None)[source]¶
- Wait for a specific job to finish. - Parameters
- jobId (int) – identifier of the job 
- timeout – timeout in seconds to wait 
 
- Returns
- status of the job 
 
 - 
classmethod getStatusFromID(jobId)[source]¶
- Retrieve the job (hence the plugin) status - Parameters
- jobId (int) – the Job identification number 
- Returns
- the Job status 
 - Return type: string 
 - 
classmethod getStatusFromId(jobId)¶
- Retrieve the job (hence the plugin) status - Parameters
- jobId (int) – the Job identification number 
- Returns
- the Job status 
 - Return type: string 
 - 
classmethod getJobFromID(jobId)[source]¶
- Retrieve the job (hence the plugin) - Parameters
- jobId – the Job identification number 
- Returns
- the “Job instance”, which contains the plugin and the status 
 - Return type: a Python object, instance of Job 
 - 
classmethod getJobFromId(jobId)¶
- Retrieve the job (hence the plugin) - Parameters
- jobId – the Job identification number 
- Returns
- the “Job instance”, which contains the plugin and the status 
 - Return type: a Python object, instance of Job 
 - 
classmethod cleanJobfromId(jobId, forceGC=True)[source]¶
- Frees the memory associated with the top level plugin - Parameters
- jobId (int) – the Job identification number 
- forceGC (boolean) – Force garbage collection after clean-up 
 
 
 - 
classmethod cleanJobfromID(jobId, forceGC=True)¶
- Frees the memory associated with the top level plugin - Parameters
- jobId (int) – the Job identification number 
- forceGC (boolean) – Force garbage collection after clean-up 
 
 
 - 
classmethod getDataOutputFromId(jobId, as_JSON=False)[source]¶
- Returns the Plugin Output Data :param jobId: job identifier :type jobId: int :return: Job.DataOutput JSON string 
 - 
classmethod getDataOutputFromID(jobId, as_JSON=False)¶
- Returns the Plugin Output Data :param jobId: job identifier :type jobId: int :return: Job.DataOutput JSON string 
 - 
classmethod getDataInputFromId(jobId, as_JSON=False)[source]¶
- Returns the Plugin Input Data :param jobId: job identifier :type jobId: int :return: Job.DataInput JSON string 
 - 
classmethod getDataInputFromID(jobId, as_JSON=False)¶
- Returns the Plugin Input Data :param jobId: job identifier :type jobId: int :return: Job.DataInput JSON string 
 - 
classmethod getErrorFromId(jobId)[source]¶
- Returns the error messages from the plugin :param jobId: job identifier :type jobId: int :return: error message as a string 
 - 
classmethod getErrorFromID(jobId)¶
- Returns the error messages from the plugin :param jobId: job identifier :type jobId: int :return: error message as a string 
 
dahu.plugin¶
Data Analysis RPC server over Tango:
Definition of plugins
- 
class Plugin[source]¶
- Bases: - object- A plugin is instantiated - Gets its input parameters as a dictionary from the setup method 
- Performs some work in the process method 
- Sets the result in the output attribute, which should be a dictionary 
- The process can be an infinite loop or a server, which can be aborted using the abort method 
 - 
DEFAULT_SET_UP= 'setup'¶
 - 
DEFAULT_PROCESS= 'process'¶
 - 
DEFAULT_TEAR_DOWN= 'teardown'¶
 - 
DEFAULT_ABORT= 'abort'¶
 - 
REPROCESS_IGNORE= []¶
 - 
setup(kwargs=None)[source]¶
- This is the second constructor, used to set up input variables and possibly initialize some objects 
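The contract above can be illustrated with a self-contained stand-in for dahu.plugin.Plugin. PluginSketch and Doubler are hypothetical names used only for this sketch; the real base class offers more (abort handling, tear-down, …):

```python
class PluginSketch:
    """Stand-in for dahu.plugin.Plugin illustrating the contract:
    setup() receives a dict of input parameters, process() does the
    work, and the result lands in the `output` dictionary."""
    def __init__(self):
        self.input = {}
        self.output = {}

    def setup(self, kwargs=None):
        # second constructor: store the input parameters
        if kwargs is not None:
            self.input.update(kwargs)

    def process(self):
        pass

    def teardown(self):
        pass

class Doubler(PluginSketch):
    def process(self):
        self.output["doubled"] = [2 * v for v in self.input.get("values", [])]

p = Doubler()
p.setup({"values": [1, 2, 3]})
p.process()
p.teardown()
print(p.output)   # {'doubled': [2, 4, 6]}
```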
 
- 
class PluginFromFunction[source]¶
- Bases: - dahu.plugin.Plugin- Template class to build a plugin from a function 
dahu.factory¶
Data Analysis RPC server over Tango:
Factory for the loading of plugins
- 
class Factory(workdir=None, plugin_path=None)[source]¶
- Bases: - object- This is a factory: it instantiates a plugin from its name - 
registry= {}¶
 - 
modules= {}¶
 - 
plugin_dirs= {'/users/kieffer/workspace-400/dahu/build/lib/dahu/plugins': ['__init__', 'id15v2', 'id02', 'pyfai', 'id02', 'id31', 'example', 'id15', 'sandbox', 'bm29', 'focus']}¶
 - 
reg_sem= <threading.Semaphore object>¶
 
- 
- 
register(klass, fqn=None)¶
- Register a class as a plugin which can be instantiated. - This can be used as a decorator - @plugin_factory.register - @param klass: class to be registered as a plugin @param fqn: fully qualified name @return: klass 
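The decorator usage can be sketched with a minimal registry. MiniFactory is a hypothetical stand-in, not the real dahu.factory.Factory:

```python
import threading

class MiniFactory:
    """Sketch of a plugin factory: a registry mapping (lower-cased)
    fully qualified names to classes, guarded by a lock."""
    registry = {}
    _lock = threading.Lock()

    @classmethod
    def register(cls, klass, fqn=None):
        with cls._lock:
            cls.registry[(fqn or klass.__name__).lower()] = klass
        return klass          # returning klass lets this work as a decorator

@MiniFactory.register
class ExamplePlugin:
    pass

plugin = MiniFactory.registry["exampleplugin"]()
print(type(plugin).__name__)   # ExamplePlugin
```

Because register returns the class unchanged, the decorated class stays usable exactly as before.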
dahu.server¶
- 
class DahuDS(cl, name)[source]¶
- Bases: - tango.device_server.LatestDeviceImpl- Tango device server launcher for Dahu server. - 
always_executed_hook(self) → None[source]¶
- Hook method. Default method to implement an action necessary on a device before any command is executed. This method can be redefined in sub-classes in case the default behaviour does not fulfil the needs - Parameters : None Return : None - Throws : DevFailed This method does not throw an exception, but a redefined method can. 
 - 
read_attr_hardware(self, attr_list) → None[source]¶
- Read the hardware to return attribute value(s). Default method to implement an action necessary on a device to read the hardware involved in a read attribute CORBA call. This method must be redefined in sub-classes in order to support attribute reading - Parameters :
- attr_list (sequence<int>) – list of indices in the device object attribute vector of the attributes to be read. 
 
 - Return : None - Throws : DevFailed This method does not throw an exception, but a redefined method can. 
 - 
startJob(argin)[source]¶
- Starts a job - @param argin: 2-list [<Dahu plugin to execute>, <JSON serialized dict>] @return: jobID which is an int (-1 for error) 
 - 
finished_processing(job)[source]¶
- callback: when processing is done - @param job: instance of dahu.job.Job 
 - 
collectStatistics()[source]¶
- Retrieve some statistics on all Dahu-Jobs @return: a page of information about Dahu-jobs 
 - 
getJobOutput(jobId)[source]¶
- Retrieve XML output from a job @param jobId: identifier of the job @return: output from a job 
 - 
getJobInput(jobId)[source]¶
- Retrieve input from a job as JSON string @param jobId: identifier of the job (int) @return: JSON serialized input from a job 
 
- 
- 
class DahuDSClass(name)[source]¶
- Bases: - tango._tango.DeviceClass- 
class_property_list= {}¶
 - 
device_property_list= {'plugins_directory': [tango._tango.CmdArgType.DevString, 'Dahu plugins directory', []]}¶
 - 
cmd_list= {'abort': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevBoolean, '']], 'cleanJob': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'Message']], 'collectStatistics': [[tango._tango.CmdArgType.DevVoid, 'nothing needed'], [tango._tango.CmdArgType.DevVoid, 'Collect some statistics about jobs within Dahu']], 'getJobError': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'Error message']], 'getJobInput': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, '<JSON serialized dict>']], 'getJobOutput': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, '<JSON serialized dict>']], 'getJobState': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'job state']], 'getStatistics': [[tango._tango.CmdArgType.DevVoid, 'nothing needed'], [tango._tango.CmdArgType.DevString, 'Retrieve statistics about Dahu-jobs']], 'initPlugin': [[tango._tango.CmdArgType.DevString, 'plugin name'], [tango._tango.CmdArgType.DevString, 'Message']], 'listPlugins': [[tango._tango.CmdArgType.DevVoid, 'nothing needed'], [tango._tango.CmdArgType.DevString, 'prints the list of all plugin classes currently loaded']], 'startJob': [[tango._tango.CmdArgType.DevVarStringArray, '[<Dahu plugin to execute>, <JSON serialized dict>]'], [tango._tango.CmdArgType.DevLong, 'job id']], 'waitJob': [[tango._tango.CmdArgType.DevLong, 'job id'], [tango._tango.CmdArgType.DevString, 'job state']]}¶
 - 
attr_list= {'jobFailure': [[tango._tango.CmdArgType.DevLong, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ]], 'jobSuccess': [[tango._tango.CmdArgType.DevLong, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ]], 'serialize': [[tango._tango.CmdArgType.DevBoolean, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ_WRITE]], 'statisticsCollected': [[tango._tango.CmdArgType.DevString, tango._tango.AttrDataFormat.SCALAR, tango._tango.AttrWriteType.READ]]}¶
 
- 
dahu.cache¶
Data Analysis RPC server over Tango:
Class Cache for storing the data in a Borg
- 
class DataCache(max_size=10)[source]¶
- Bases: - dict- This class is a Borg: it always returns the same values regardless of the instance. It is used as data storage for images … with a limit on the number of images kept in memory. - 
has_key(key)¶
- D.__contains__(k) -> True if D has a key k, else False 
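The Borg idea is that all instances share one state, so any instance sees the same cached data. A sketch combining it with a size bound follows; note that the real DataCache subclasses dict, while this stand-in keeps the data in a shared attribute to make the sharing explicit:

```python
class BorgCache:
    """Sketch of a Borg cache: every instance shares the same state,
    and the oldest entry is evicted once max_size is exceeded."""
    _shared_state = {}

    def __init__(self, max_size=10):
        self.__dict__ = self._shared_state     # all instances share attributes
        if "data" not in self.__dict__:        # initialize only once
            self.data = {}
            self.order = []                    # insertion order, for eviction
            self.max_size = max_size

    def __setitem__(self, key, value):
        if key not in self.data:
            self.order.append(key)
            if len(self.order) > self.max_size:
                del self.data[self.order.pop(0)]   # drop the oldest entry
        self.data[key] = value

    def __getitem__(self, key):
        return self.data[key]

    def __contains__(self, key):
        return key in self.data

a = BorgCache(max_size=2)
a["img1"] = 1
b = BorgCache()        # a distinct instance, same shared state
b["img2"] = 2
b["img3"] = 3          # evicts "img1"
print("img1" in a, a["img3"])   # False 3
```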
 