Package SpecClient :: Module SpecConnection
[hide private]
[frames] | no frames]

Source Code for Module SpecClient.SpecConnection

  1  #$Id: SpecConnection.py,v 1.11 2005/12/09 10:32:24 guijarro Exp $ 
  2  """SpecConnection module 
  3   
  4  Low-level module for communicating with a 
  5  remote Spec server 
  6   
  7  Classes : 
  8  SpecClientNotConnectedError -- exception class 
  9  SpecConnection 
 10  SpecConnectionDispatcher 
 11  """ 
 12   
 13  __author__ = 'Matias Guijarro' 
 14  __version__ = '1.0' 
 15   
 16  import asyncore 
 17  import socket 
 18  import weakref 
 19  import string 
 20  import logging 
 21  import time 
 22  from SpecClient.SpecClientError import SpecClientNotConnectedError 
 23  import SpecEventsDispatcher 
 24  import SpecChannel 
 25  import SpecMessage 
 26  import SpecReply 
 27   
 28  asyncore.dispatcher.ac_in_buffer_size = 32768 #32 ko input buffer 
 29   
 30  (DISCONNECTED, PORTSCANNING, WAITINGFORHELLO, CONNECTED) = (1,2,3,4) 
 31  (MIN_PORT, MAX_PORT) = (6510, 6530) 
 32   
class SpecConnection:
    """Represent a connection to a remote Spec

    Thin facade over a SpecConnectionDispatcher object: unknown attributes
    are looked up on the dispatcher, and the dispatcher 'connected',
    'disconnected' and 'error' events are re-emitted with this object as
    the sender.

    Note that 'connected', 'disconnected' and 'error' are methods of this
    class, so they hide the dispatcher attributes of the same name.

    Signals:
    connected() -- emitted when the required Spec version gets connected
    disconnected() -- emitted when the required Spec version gets disconnected
    error(error code) -- emitted when an error event is received from the remote Spec

    The 'replyFromSpec' event is not propagated by this class (that
    forwarding is disabled).
    """
    def __init__(self, *args):
        """Constructor

        Arguments:
        *args -- passed unchanged to the SpecConnectionDispatcher constructor
        """
        self.dispatcher = SpecConnectionDispatcher(*args)

        SpecEventsDispatcher.connect(self.dispatcher, 'connected', self.connected)
        SpecEventsDispatcher.connect(self.dispatcher, 'disconnected', self.disconnected)
        SpecEventsDispatcher.connect(self.dispatcher, 'error', self.error)


    def __str__(self):
        """Return the string representation of the underlying dispatcher"""
        return str(self.dispatcher)


    def __getattr__(self, attr):
        """Delegate access to the underlying SpecConnectionDispatcher object

        Names starting with a double underscore are never delegated.

        Raises:
        AttributeError -- for double-underscore names, and for 'dispatcher'
                          itself when it has not been set yet
        """
        # __getattr__ only runs when the normal lookup failed. If 'dispatcher'
        # itself is missing (constructor failed, instance created without
        # __init__), reading self.dispatcher below would re-enter this method
        # forever, so it has to be rejected explicitly.
        if attr == 'dispatcher' or attr.startswith('__'):
            raise AttributeError(attr)

        return getattr(self.dispatcher, attr)


    def connected(self):
        """Propagate 'connection' event"""
        SpecEventsDispatcher.emit(self, 'connected', ())


    def disconnected(self):
        """Propagate 'disconnection' event"""
        SpecEventsDispatcher.emit(self, 'disconnected', ())


    def error(self, error):
        """Propagate 'error' event

        Arguments:
        error -- error value received from the dispatcher, re-emitted as is
        """
        SpecEventsDispatcher.emit(self, 'error', (error, ))
82 83
class SpecConnectionDispatcher(asyncore.dispatcher):
    """SpecConnection class

    asyncore dispatcher holding the socket to a remote Spec: it opens the
    connection (optionally scanning a port range), sends a HELLO message,
    queues outgoing messages and turns incoming data into replies and
    channel updates.

    Signals:
    connected() -- emitted when the required Spec version gets connected
    disconnected() -- emitted when the required Spec version gets disconnected
    error(error code) -- emitted when an error event is received from the remote Spec

    Replies are not emitted by the dispatcher itself: an incoming reply is
    handed to the reply object registered under its id, and the
    'replyFromSpec' signal of that object is connected to the 'replyArrived'
    method of the calling object when it has one.
    """
    def __init__(self, specVersion):
        """Constructor

        Arguments:
        specVersion -- a 'host:port' string ; when the part after the colon
                       is not a number it is taken as the name of the Spec
                       version and port scanning mode is enabled. Without
                       colon, port 6789 is used.
        """
        asyncore.dispatcher.__init__(self)

        self.state = DISCONNECTED
        self.connected = False
        self.receivedStrings = []
        self.message = None
        self.serverVersion = None
        self.scanport = False
        self.scanname = ''
        self.registeredChannels = {}
        self.registeredReplies = {}
        self.sendq = []
        self.outputStrings = []
        self.simulationMode = False
        self.valid_socket = False

        # some shortcuts
        self.macro = self.send_msg_cmd_with_return
        self.macro_noret = self.send_msg_cmd
        self.abort = self.send_msg_abort

        tmp = str(specVersion).split(':')
        self.host = tmp[0]

        if len(tmp) > 1:
            self.port = tmp[1]
        else:
            self.port = 6789

        try:
            self.port = int(self.port)
        except (TypeError, ValueError):
            # not a port number: remember it as the Spec version name
            # and look for it by scanning ports
            self.scanname = self.port
            self.port = None
            self.scanport = True

        #
        # register 'service' channels
        #
        self.registerChannel('error', self.error, dispatchMode = SpecEventsDispatcher.FIREEVENT)
        self.registerChannel('status/simulate', self.simulationStatusChanged)


    def __str__(self):
        return '<connection to Spec, host=%s, port=%s>' % (self.host, self.port or self.scanname)


    def set_socket(self, s):
        """Attach a socket and mark the dispatcher as readable/writable."""
        self.valid_socket = True
        asyncore.dispatcher.set_socket(self, s)


    def makeConnection(self):
        """Establish a connection to Spec

        If the connection is already established, do nothing.
        Otherwise, create a socket object and try to connect.
        If we are in port scanning mode, try to connect using
        a port defined in the range from MIN_PORT to MAX_PORT

        A new socket is used for every attempt and closed when the attempt
        fails, so no descriptor is leaked and no socket is reused after a
        failed connect.
        """
        if self.connected:
            return

        if self.scanport:
            if self.port is None or self.port > MAX_PORT:
                self.port = MIN_PORT
            else:
                self.port += 1

        # NOTE(review): with '<' the port MAX_PORT itself is never tried ;
        # kept as is, confirm whether the range is meant to be inclusive.
        while not self.scanport or self.port < MAX_PORT:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(0.2)
            try:
                connectionOk = s.connect_ex( (self.host, self.port) ) == 0
            except socket.error:
                #exception could be 'host not found' for example, we ignore it
                connectionOk = False

            if connectionOk:
                self.set_socket(s)
                self.handle_connect()
                break

            s.close()

            if self.scanport:
                self.port += 1
            else:
                break


    def registerChannel(self, chanName, receiverSlot, registrationFlag = SpecChannel.DOREG, dispatchMode = SpecEventsDispatcher.UPDATEVALUE):
        """Register a channel

        Tell the remote Spec we are interested in receiving channel update events.
        If the channel is not already registered, create a new SpecChannel object,
        and connect the channel 'valueChanged' signal to the receiver slot. If the
        channel is already registered, simply add a connection to the receiver
        slot.

        Arguments:
        chanName -- a string representing the channel name, i.e. 'var/toto'
        receiverSlot -- any callable object in Python

        Keywords arguments:
        registrationFlag -- internal flag
        dispatchMode -- can be SpecEventsDispatcher.UPDATEVALUE (default) or SpecEventsDispatcher.FIREEVENT,
        depending on how the receiver slot will be called. UPDATEVALUE means we don't mind skipping some
        channel update events as long as we got the last one (for example, a motor position). FIREEVENT means
        we want to call the receiver slot for every event. If None, nothing is registered.
        """
        if dispatchMode is None:
            return

        chanName = str(chanName)

        if chanName not in self.registeredChannels:
            self.registeredChannels[chanName] = SpecChannel.SpecChannel(self, chanName, registrationFlag)

        channel = self.registeredChannels[chanName]

        SpecEventsDispatcher.connect(channel, 'valueChanged', receiverSlot, dispatchMode)

        channelValue = channel.value
        if channelValue is not None:
            # we received a value, so emit an update signal
            channel.update(channelValue)


    def unregisterChannel(self, chanName):
        """Unregister a channel

        Arguments:
        chanName -- a string representing the channel to unregister, i.e. 'var/toto'
        """
        chanName = str(chanName)

        if chanName in self.registeredChannels:
            self.registeredChannels[chanName].unregister()
            del self.registeredChannels[chanName]


    def getChannel(self, chanName):
        """Return a channel object

        If the required channel is already registered, return it.
        Otherwise, return a new 'temporary' unregistered SpecChannel object ;
        reference should be kept in the caller or the object will get dereferenced.

        Arguments:
        chanName -- a string representing the channel name, i.e. 'var/toto'
        """
        # same normalisation as registerChannel/unregisterChannel, so that
        # a non-str name finds the channel registered under its str() form
        chanName = str(chanName)

        if chanName not in self.registeredChannels:
            # return a newly created temporary SpecChannel object, without registering
            return SpecChannel.SpecChannel(self, chanName, SpecChannel.DONTREG)

        return self.registeredChannels[chanName]


    def error(self, error):
        """Emit the 'error' signal when the remote Spec version signals an error."""
        logging.getLogger('SpecClient').error('Error from Spec: %s', error)

        SpecEventsDispatcher.emit(self, 'error', (error, ))


    def simulationStatusChanged(self, simulationMode):
        """Store the value received on the 'status/simulate' channel."""
        self.simulationMode = simulationMode


    def isSpecConnected(self):
        """Return True if the remote Spec version is connected."""
        return self.state == CONNECTED


    def specConnected(self):
        """Emit the 'connected' signal when the remote Spec version is connected."""
        old_state = self.state
        self.state = CONNECTED
        if old_state != CONNECTED:
            logging.getLogger('SpecClient').info('Connected to %s:%s', self.host, (self.scanport and self.scanname) or self.port)

            SpecEventsDispatcher.emit(self, 'connected', ())


    def specDisconnected(self):
        """Emit the 'disconnected' signal when the remote Spec version is disconnected."""
        SpecEventsDispatcher.dispatch()

        old_state = self.state
        self.state = DISCONNECTED
        if old_state == CONNECTED:
            logging.getLogger('SpecClient').info('Disconnected from %s:%s', self.host, (self.scanport and self.scanname) or self.port)

            SpecEventsDispatcher.emit(self, 'disconnected', ())


    def handle_close(self):
        """Handle 'close' event on socket."""
        self.connected = False
        self.serverVersion = None
        if self.socket:
            self.close()
        self.valid_socket = False
        self.specDisconnected()


    def disconnect(self):
        """Disconnect from the remote Spec version."""
        self.handle_close()


    def handle_error(self):
        """Handle an uncaught error.

        The connection is left untouched ; the error is only logged so
        that it does not disappear silently.
        """
        logging.getLogger('SpecClient').exception('Uncaught error on %s', self)


    def handle_read(self):
        """Handle 'read' events on socket

        Messages are built from the read calls on the socket. Every complete
        message is dispatched ; an error raised while dispatching one message
        is logged and does not prevent the consumed bytes from being dropped
        nor the following messages from being processed.
        """
        self.receivedStrings.append(self.recv(32768)) #read at most all the input buffer
        s = ''.join(self.receivedStrings)
        offset = 0

        while offset < len(s):
            if self.message is None:
                self.message = SpecMessage.message(version = self.serverVersion)

            consumedBytes = self.message.readFromStream(s[offset:])

            if consumedBytes == 0:
                break

            offset += consumedBytes

            if self.message.isComplete():
                # reset the current message before dispatching, so that
                # a failing receiver cannot leave a stale message behind
                message = self.message
                self.message = None

                try:
                    self.__dispatch_message(message)
                except Exception:
                    logging.getLogger("SpecClient").exception("Unexpected error while receiving a message from server")

        self.receivedStrings = [ s[offset:] ]


    def __dispatch_message(self, message):
        """Route one complete incoming message (reply, event or hello reply)."""
        if message.cmd == SpecMessage.REPLY:
            replyID = message.sn

            if replyID > 0:
                try:
                    reply = self.registeredReplies.pop(replyID)
                except KeyError:
                    logging.getLogger("SpecClient").exception("Unexpected error while receiving a message from server")
                else:
                    reply.update(message.data, message.type == SpecMessage.ERROR, message.err)
        elif message.cmd == SpecMessage.EVENT:
            isDeleted = message.flags == SpecMessage.DELETED

            for name in SpecChannel.SpecChannel.channel_aliases[message.name]:
                # an alias may name a channel this connection no longer
                # (or never) registered ; skip it instead of failing
                channel = self.registeredChannels.get(name)
                if channel is not None:
                    channel.update(message.data, deleted=isDeleted)
        elif message.cmd == SpecMessage.HELLO_REPLY:
            if self.checkourversion(message.name):
                self.serverVersion = message.vers #header version
                self.specConnected()
            else:
                # not the Spec version we are looking for: drop the socket
                self.serverVersion = None
                self.connected = False
                self.close()
                self.valid_socket = False # consistent with handle_close
                self.state = DISCONNECTED


    def checkourversion(self, name):
        """Check remote Spec version

        If we are in port scanning mode, check if the name from
        Spec corresponds to our required Spec version.
        Outside port scanning mode any name is accepted.
        """
        if not self.scanport:
            return True

        return name == self.scanname


    def readable(self):
        """Return True once a socket has been attached and not closed."""
        return self.valid_socket


    def writable(self):
        """Return True if socket should be written."""
        if not self.readable():
            return False

        return len(self.sendq) > 0 or sum(map(len, self.outputStrings)) > 0


    def handle_connect(self):
        """Handle 'connect' event on socket

        Send a HELLO message.
        """
        self.connected = True

        self.state = WAITINGFORHELLO
        self.send_msg_hello()


    def handle_write(self):
        """Handle 'write' events on socket

        Send all the messages from the queue.
        """
        # messages are inserted at the head of the queue, so popping from
        # the tail sends them in the order they were queued
        while len(self.sendq) > 0:
            self.outputStrings.append(self.sendq.pop().sendingString())

        outputBuffer = ''.join(self.outputStrings)

        sent = self.send(outputBuffer)

        # keep what could not be sent for the next 'write' event
        self.outputStrings = [ outputBuffer[sent:] ]


    def __caller_object(self):
        """Return the 'self' of the code calling a send_msg_* method, or None.

        Must be called directly from the send_msg_* method: frame 0 is this
        helper, frame 1 the send_msg_* method and frame 2 its caller.
        """
        import sys
        try:
            return sys._getframe(2).f_locals['self']
        except KeyError:
            return None


    def send_msg_cmd_with_return(self, cmd):
        """Send a command message to the remote Spec server, and return the reply id.

        Arguments:
        cmd -- command string, i.e. '1+1'

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        caller = self.__caller_object()

        return self.__send_msg_with_reply(replyReceiverObject = caller, *SpecMessage.msg_cmd_with_return(cmd, version = self.serverVersion))


    def send_msg_func_with_return(self, cmd):
        """Send a command message to the remote Spec server using the new 'func' feature, and return the reply id.

        If the server version is lower than 3 an error is logged, nothing
        is sent and None is returned.

        Arguments:
        cmd -- command string

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        # check the connection first: serverVersion is None while
        # disconnected and must not be compared with a version number
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        if self.serverVersion < 3:
            logging.getLogger('SpecClient').error('Cannot execute command in Spec : feature is available since Spec server v3 only')
            return None

        caller = self.__caller_object()

        message = SpecMessage.msg_func_with_return(cmd, version = self.serverVersion)
        return self.__send_msg_with_reply(replyReceiverObject = caller, *message)


    def send_msg_cmd(self, cmd):
        """Send a command message to the remote Spec server.

        Arguments:
        cmd -- command string, i.e. 'mv psvo 1.2'

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        self.__send_msg_no_reply(SpecMessage.msg_cmd(cmd, version = self.serverVersion))


    def send_msg_func(self, cmd):
        """Send a command message to the remote Spec server using the new 'func' feature

        If the server version is lower than 3 an error is logged and
        nothing is sent.

        Arguments:
        cmd -- command string

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        # check the connection first: serverVersion is None while
        # disconnected and must not be compared with a version number
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        if self.serverVersion < 3:
            logging.getLogger('SpecClient').error('Cannot execute command in Spec : feature is available since Spec server v3 only')
            return

        self.__send_msg_no_reply(SpecMessage.msg_func(cmd, version = self.serverVersion))


    def send_msg_chan_read(self, chanName):
        """Send a channel read message, and return the reply id.

        Arguments:
        chanName -- a string representing the channel name, i.e. 'var/toto'

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        caller = self.__caller_object()

        return self.__send_msg_with_reply(replyReceiverObject = caller, *SpecMessage.msg_chan_read(chanName, version = self.serverVersion))


    def send_msg_chan_send(self, chanName, value):
        """Send a channel write message.

        Arguments:
        chanName -- a string representing the channel name, i.e. 'var/toto'
        value -- channel value

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        self.__send_msg_no_reply(SpecMessage.msg_chan_send(chanName, value, version = self.serverVersion))


    def send_msg_register(self, chanName):
        """Send a channel register message.

        Arguments:
        chanName -- a string representing the channel name, i.e. 'var/toto'

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        self.__send_msg_no_reply(SpecMessage.msg_register(chanName, version = self.serverVersion))


    def send_msg_unregister(self, chanName):
        """Send a channel unregister message.

        Arguments:
        chanName -- a string representing the channel name, i.e. 'var/toto'

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        self.__send_msg_no_reply(SpecMessage.msg_unregister(chanName, version = self.serverVersion))


    def send_msg_close(self):
        """Send a close message.

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        self.__send_msg_no_reply(SpecMessage.msg_close(version = self.serverVersion))


    def send_msg_abort(self):
        """Send an abort message.

        Raises:
        SpecClientNotConnectedError -- if the remote Spec is not connected
        """
        if not self.isSpecConnected():
            raise SpecClientNotConnectedError

        self.__send_msg_no_reply(SpecMessage.msg_abort(version = self.serverVersion))


    def send_msg_hello(self):
        """Send a hello message."""
        self.__send_msg_no_reply(SpecMessage.msg_hello())


    def __send_msg_with_reply(self, reply, message, replyReceiverObject = None):
        """Send a message to the remote Spec, and return the reply id.

        The reply object is added to the registeredReplies dictionary,
        with its reply id as the key. The reply id permits then to
        register for the reply using the 'registerReply' method.

        Arguments:
        reply -- SpecReply object which will receive the reply
        message -- SpecMessage object defining the message to send

        Keywords arguments:
        replyReceiverObject -- object whose 'replyArrived' method, if any, is
                               connected to the reply 'replyFromSpec' signal
        """
        replyID = reply.id
        self.registeredReplies[replyID] = reply

        if hasattr(replyReceiverObject, 'replyArrived'):
            SpecEventsDispatcher.connect(reply, 'replyFromSpec', replyReceiverObject.replyArrived)

        self.sendq.insert(0, message)

        return replyID


    def __send_msg_no_reply(self, message):
        """Send a message to the remote Spec.

        If a reply is sent depends only on the message, and not on the
        method to send the message. Using this method, any reply is
        lost.
        """
        self.sendq.insert(0, message)
583