1 """SpecWaitObject module
2
3 This module defines the classes for helper objects
4 designed for waiting specific events from Spec
5
6 Classes:
7 SpecWaitObject -- base class for Wait objects
8
9 Functions:
10 waitChannel -- wait for a channel update
11 waitReply -- wait for a reply
12 waitConnection -- wait for a connection
13 """
14
15 __author__ = 'Matias Guijarro'
16 __version__ = '1.0'
17
18 import weakref
19 import time
20 import types
21
22 import SpecEventsDispatcher
23 from SpecClient.SpecClientError import SpecClientError, SpecClientTimeoutError
24 import SpecConnectionsManager
25
26
28 """Waiting function
29
30 Arguments:
31 timeout -- waiting time in milliseconds
32 """
33 time.sleep(timeout/1000.0)
34
35
37 """Helper class for waiting specific events from Spec"""
54
55
57 """Callback triggered by a 'connected' event."""
58 self.isdisconnected = False
59
60
62 """Callback triggered by a 'disconnected' event."""
63 self.isdisconnected = True
64
65
def waitReply(self, command, argsTuple, timeout = None):
    """Execute a command on the connection and wait for the reply from Spec

    Arguments:
    command -- name of the attribute to look up on the connection object
               (a method returning a replyID to be executed on the connection)
    argsTuple -- tuple of positional arguments to be passed to the command
    timeout -- optional timeout handed over to wait() (defaults to None)

    Nothing is done when self.connection() returns None, or when the
    connection object has no attribute named 'command'.
    """
    connection = self.connection()
    if connection is None:
        return

    try:
        func = getattr(connection, command)
    except AttributeError:
        # Unknown command: there is nothing to execute and nothing to wait
        # for. Only the missing-attribute case is silenced, so interrupts
        # (KeyboardInterrupt, SystemExit) and unrelated errors still surface.
        return

    if callable(func):
        func(*argsTuple)

    # The wait is started even when the attribute exists but is not callable.
    self.wait(timeout = timeout)
86
87
89 """Wait for a channel update
90
91 Arguments:
92 chanName -- channel name
93 waitValue -- particular value to wait (defaults to None, meaning any value)
94 timeout -- optional timeout (defaults to None)
95 """
96 connection = self.connection()
97
98 if connection is not None:
99 self.channelWasUnregistered = False
100 channel = connection.getChannel(chanName)
101
102 if not channel.registered:
103 self.channelWasUnregistered = True
104 connection.registerChannel(chanName, self.channelUpdated)
105 else:
106 SpecEventsDispatcher.connect(channel, 'valueChanged', self.channelUpdated)
107
108 self.wait(waitValue = waitValue, timeout = timeout)
109
110 if self.channelWasUnregistered:
111 connection.unregisterChannel(chanName)
112
113
115 """Wait for the connection to Spec being established
116
117 Arguments:
118 timeout -- optional timeout (defaults to None)
119
120 Exceptions:
121 timeout -- raise a timeout exception on timeout
122 """
123 connection = self.connection()
124
125 if connection is not None:
126 t = 0
127
128 while self.isdisconnected:
129 SpecEventsDispatcher.dispatch()
130
131 t0 = time.time()
132 waitFunc(10)
133 t += (time.time() - t0)*1000
134
135 if timeout is not None and t >= timeout:
136 raise SpecClientTimeoutError
137
138
def wait(self, waitValue = None, timeout = None):
    """Block until the object's internal value gets updated

    Events are dispatched in a loop until self.value is set (to waitValue,
    if one is given). The loop also ends, without any error, as soon as
    self.isdisconnected is true.

    Arguments:
    waitValue -- particular value to wait (defaults to None, meaning any value)
    timeout -- optional timeout in milliseconds (defaults to None, meaning
               wait forever)

    Exceptions:
    timeout -- raise a SpecClientTimeoutError exception on timeout
    """
    # Measure the elapsed time from the moment we start waiting, so that the
    # time spent dispatching events and polling connections counts against
    # the timeout too (not only the time spent sleeping).
    clock = getattr(time, 'monotonic', time.time)
    started = clock()

    while not self.isdisconnected:
        SpecEventsDispatcher.dispatch()

        if self.value is not None:
            if waitValue is None or waitValue == self.value:
                return

            # Not the value we are waiting for: discard it and keep waiting.
            self.value = None

        # Nothing usable arrived yet: pause before looking again.
        waitFunc(10)

        if timeout is not None and (clock() - started)*1000 >= timeout:
            raise SpecClientTimeoutError

        # Poll the connections manager, if it offers a 'poll' method.
        try:
            poll = getattr(SpecConnectionsManager.SpecConnectionsManager(), "poll")
        except AttributeError:
            pass
        else:
            poll()
176
177
179 """Callback triggered by a reply from Spec."""
180 self.value = reply.getValue()
181
182 if reply.error:
183 raise SpecClientError('Server request did not complete: %s' % self.value, reply.error_code)
184
185
187 """Callback triggered by a channel update
188
189 If channel was unregistered, we skip the first update,
190 else we update our internal value
191 """
192 if self.channelWasUnregistered == True:
193
194
195
196 self.channelWasUnregistered = 2
197 else:
198 self.value = channelValue
199
200
215
216
218 """Wait for a channel to be updated
219
220 Arguments:
221 chanName -- channel name (e.g 'var/toto')
222 connection -- a 'host:port' string
223 waitValue -- value to wait (defaults to None)
224 timeout -- optional timeout (defaults to None)
225 """
226 if type(connection) in (types.UnicodeType, types.StringType):
227 connection = str(connection)
228 from SpecClient.SpecConnectionsManager import SpecConnectionsManager
229 connection = SpecConnectionsManager().getConnection(connection)
230 waitConnection(connection, timeout = timeout)
231
232 w = SpecWaitObject(connection)
233 w.waitChannelUpdate(chanName, waitValue = waitValue, timeout = timeout)
234
235 return w.value
236
237
def waitReply(connection, command, argsTuple, timeout = None):
    """Wait for a reply from a remote Spec server

    Arguments:
    connection -- a 'host:port' string; any other object is handed over to
                  SpecWaitObject unchanged
    command -- command to execute
    argsTuple -- tuple of arguments for the command
    timeout -- optional timeout (defaults to None)

    Returns the value held by the wait object once waiting is over.
    """
    # isinstance also accepts subclasses of the string types; the fallback
    # covers interpreters where the types module has no StringTypes.
    stringTypes = getattr(types, 'StringTypes', (str,))

    if isinstance(connection, stringTypes):
        connection = str(connection)
        from SpecClient.SpecConnectionsManager import SpecConnectionsManager
        connection = SpecConnectionsManager().getConnection(connection)
        # The connection was looked up by name: wait for it to be established.
        waitConnection(connection, timeout = timeout)

    w = SpecWaitObject(connection)
    w.waitReply(command, argsTuple, timeout=timeout)

    return w.value
257