r71166 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r71165‎ | r71166 | r71167 >
Date:20:28, 16 August 2010
Author:daniel
Status:deferred
Tags:
Comment:
error recovery and debug options
Modified paths:
  • /trunk/extensions/XMLRC/bridge/udp2xmpp.py (modified) (history)

Diff [purge]

Index: trunk/extensions/XMLRC/bridge/udp2xmpp.py
@@ -21,7 +21,7 @@
2222 # along with this program. If not, see <http://www.gnu.org/licenses/>.
2323 ##############################################################################
2424
25 -import sys, os, os.path, traceback
 25+import sys, os, os.path, traceback, random
2626 import ConfigParser, optparse
2727 import select, socket, urllib
2828 import xmpp, xmpp.simplexml # using the xmpppy library <http://xmpppy.sourceforge.net/>, GPL
@@ -84,8 +84,13 @@
8585 self.loglevel = LOG_VERBOSE
8686 self.wiki_info = wiki_info;
8787
88 - def warn(self, message):
 88+ def warn(self, message, error_type = None, error_value = None, trbk = None):
8989 if self.loglevel >= LOG_QUIET:
 90+ if trbk and not error_type:
 91+ message = message + "\n" + " ".join( traceback.format_tb( trbk ) )
 92+ elif error_type:
 93+ message = message + " * " + " ".join( traceback.format_exception( error_type, error_value, trbk ) )
 94+
9095 sys.stderr.write( "WARNING: %s\n" % ( message.encode( self.console_encoding ) ) )
9196
9297 def info(self, message):
@@ -284,34 +289,103 @@
285290 if m is None or m == False:
286291 self.warn( "insufficient information in RC packet (rcid: %s), discarding message" % rc.getAttr('rcid') )
287292 else:
 293+ self.debug( "relying RC message: %s" % m )
288294 return t.send_message( m, rc )
289295
 296+ def select_connections( self, connection_sockets, broken, timeout = 1 ):
 297+ waiting = []
 298+
 299+ try:
 300+ (in_socks , out_socks, err_socks) = select.select(connection_sockets.keys(),[],connection_sockets.keys(),1)
 301+
 302+ for sock in err_socks:
 303+ con = connection_sockets[ sock ]
 304+ self.warn("exception in socket %s, connection %s" % (repr(sock), repr(con)));
 305+
 306+ broken.append( con )
 307+ del connection_sockets[ sock ]
 308+
 309+ for sock in in_socks:
 310+ con = connection_sockets[ sock ]
 311+ waiting.append( con )
 312+
 313+ except socket.error, e:
 314+ error_type, error_value, trbk = sys.exc_info()
 315+ found = False
 316+
 317+ for sock, conn in connection_sockets.items():
 318+ if not conn.test_connection():
 319+ self.warn("test_connection for connection %s failed after exception" % repr(con), error_type, error_value, trbk);
 320+ found = True
 321+
 322+ broken.append(conn)
 323+ del connection_sockets[ sock ]
 324+
 325+ if not found:
 326+ self.warn("exception ocurred, but all connections seem valid!", error_type, error_value, trbk);
 327+
 328+ except IOError, e:
 329+ error_type, error_value, trbk = sys.exc_info()
 330+ found = False
 331+
 332+ for sock, conn in connection_sockets.items():
 333+ if not conn.test_connection():
 334+ self.warn("test_connection for connection %s failed after exception" % repr(con), error_type, error_value, trbk);
 335+ found = True
 336+
 337+ broken.append(conn)
 338+ del connection_sockets[ sock ]
 339+
 340+ if not found:
 341+ self.warn("exception ocurred, but all connections seem valid!", error_type, error_value, trbk);
 342+
 343+ return waiting
 344+
290345 def service_loop( self, *connections ):
291 - socketlist = {}
292 - for con in connections:
293 - socketlist[ con.get_socket() ] = con
294 -
295346 self.online = 1
296347
 348+ connection_sockets = {}
 349+ for conn in connections:
 350+ connection_sockets[ conn.get_socket() ] = conn
 351+
 352+ broken = set()
 353+
297354 try:
298355 while self.online:
299 - (in_socks , out_socks, err_socks) = select.select(socketlist.keys(),[],socketlist.keys(),1)
 356+ waiting = self.select_connections( connection_sockets, broken, timeout = 1 )
300357
301 - for sock in in_socks:
302 - con = socketlist[ sock ]
 358+ if broken:
 359+ self.debug( "currently broken: %s" % repr(broken) )
303360
304 - if con:
305 - try:
306 - con.process()
307 - except Exception, e:
308 - error_type, error_value, trbk = sys.exc_info()
309 - self.warn( "Error while processing! %s" % " ".join( traceback.format_exception( error_type, error_value, trbk ) ) )
310 - # TODO: detect when we should kill the loop because a connection failed
311 - else:
312 - raise Exception( "Unknown socket: %s" % repr(sock) )
 361+ for conn in broken:
 362+ if conn.reconnect_backoff():
 363+ self.debug( "skipping attempt to reconnected for %s!" % repr(conn) )
 364+ continue
313365
314 - for sock in err_socks:
315 - raise Exception( "Error in socket: %s" % repr(sock) )
 366+ try:
 367+ conn.reconnect()
 368+
 369+ broken.remove( conn )
 370+ connection_sockets[ conn.get_socket() ] = conn
 371+ self.info( "reconnected %s!" % repr(conn) )
 372+
 373+ except Exception, e:
 374+ error_type, error_value, trbk = sys.exc_info()
 375+ self.warn( "Error during reconnect for connection %s!" % repr(conn), error_type, error_value, trbk )
 376+
 377+ for conn in waiting:
 378+ try:
 379+ conn.process()
 380+ except Exception, e:
 381+ error_type, error_value, trbk = sys.exc_info()
 382+
 383+ if not conn.test_connection():
 384+ self.warn("test_connection for connection %s failed after exception in process()" % repr(conn), error_type, error_value, trbk);
 385+ broken.append(conn)
 386+ del connection_sockets[ conn.get_socket() ]
 387+ else:
 388+ self.info("connection %s seems to be valid after exception in process()" % repr(conn), error_type, error_value, trbk);
 389+
316390 except KeyboardInterrupt:
317391 self.online= 0
318392
@@ -337,21 +411,29 @@
338412 self.relay.debug( message )
339413
340414 class XmppConnection (Connection):
341 - def __init__( self, relay, message_encoding = 'utf-8' ):
 415+ def __init__( self, relay, message_encoding = 'utf-8', backoff_factor = 5, backoff_max_tock = 6, xmpp_debug = [], connection_security = None ):
342416 super( XmppConnection, self ).__init__( relay )
343417 self.message_encoding = message_encoding
344418 self.jid = None
345419
 420+ if connection_security == '0' or connection_security == 0 or connection_security == 'off':
 421+ self.connection_security = 0
 422+ elif connection_security == '1' or connection_security == 1 or connection_security == 'on':
 423+ self.connection_security = 1
 424+ else:
 425+ self.connection_security = None
 426+
 427+ self.xmpp_debug = xmpp_debug
 428+ self.last_iq = None
 429+
 430+ self.backoff_tick = 0
 431+ self.backoff_tock = 0
 432+ self.backoff_factor = backoff_factor
 433+ self.backoff_max_tock = backoff_max_tock
 434+
346435 def process( self ):
347436 self.jabber.Process(1)
348437
349 - if not self.jabber.isConnected():
350 - self.warn("connection lost, reconnecting...")
351 -
352 - if self.jabber.reconnectAndReauth():
353 - self.warn("re-connect successful.")
354 - self.on_connect()
355 -
356438 def close( self ):
357439 # self.jabber.disconnect() #wha??
358440 # XXX: leave chat rooms, etc?
@@ -380,8 +462,13 @@
381463 if (message.getError()):
382464 self.warn("received %s error from <%s>: %s" % (message.getType(), message.getError(), message.getFrom() ))
383465 elif message.getBody():
384 - self.debug("discarding %s message from <%s>: %s" % (message.getType(), message.getFrom(), message.getBody().strip() ))
 466+ if message.getFrom().getResource() != self.jid.getNode(): #FIXME: this inly works if no different nick was specified when joining the channel
 467+ self.debug("discarding %s message from <%s>: %s" % (message.getType(), message.getFrom(), message.getBody().strip() ))
385468
 469+ def process_iq(self, con, iq):
 470+ self.debug("received iq: %s" % repr(iq))
 471+ self.last_iq = iq
 472+
386473 def guess_local_resource(self):
387474 resource = "%s-%d" % ( socket.gethostname(), os.getpid() )
388475
@@ -393,10 +480,10 @@
394481 jid = xmpp.protocol.JID( jid )
395482
396483 if jid.getResource() is None:
397 - jid = xmpp.protocol.JID( host= jid.getHost(), node= jid.getNode(), resource = self.guess_local_resource() )
 484+ jid = xmpp.protocol.JID( host= jid.getDomain(), node= jid.getNode(), resource = self.guess_local_resource() )
398485
399 - self.jabber = xmpp.Client(jid.getDomain(),debug=[])
400 - con= self.jabber.connect()
 486+ self.jabber = xmpp.Client(jid.getDomain(),debug=self.xmpp_debug)
 487+ con= self.jabber.connect( secure = self.connection_security )
401488
402489 if not con:
403490 self.warn( 'could not connect to %s!' % jid.getDomain() )
@@ -413,27 +500,109 @@
414501 self.debug('authenticated using %s as %s' % ( auth, jid ) )
415502
416503 self.jabber.RegisterHandler( 'message', self.process_message )
 504+ self.jabber.RegisterHandler( 'iq', self.process_iq )
417505
418506 self.jid = jid;
419507 self.info( 'connected as %s' % ( jid ) )
420508
 509+ if self.ping():
 510+ self.info( 'ping ok!' )
 511+ else:
 512+ self.warn( 'ping failed!' )
 513+
421514 self.on_connect()
422515
423516 return con
424517
425518 def on_connect( self ):
426519 self.jabber.sendInitPresence(self)
 520+
 521+ self.backoff_tick = 0
 522+ self.backoff_tock = 0
 523+
427524 self.roster = self.jabber.getRoster()
428525
429 - self.relay.join_channels()
 526+ self.relay.join_channels() #FIXME: this re-joins *all* channels. not just the ones for this connection!
430527
431528 def get_socket( self ):
432529 return self.jabber.Connection._sock
433530
 531+ def is_connected( self ):
 532+ return self.jabber.isConnected()
 533+
 534+ def test_connection( self ):
 535+ if not self.is_connected():
 536+ return False
 537+
 538+ ok = True
 539+
 540+ try:
 541+ ok = self.get_socket().fileno() >= 0
 542+ except:
 543+ ok = False
 544+
 545+ if not ok: return False
 546+
 547+ try:
 548+ ok = self.ping()
 549+ except:
 550+ ok = False
 551+
 552+ self.jabber.connected = None #XXX: ugly
 553+ return ok
 554+
 555+ def ping( self ):
 556+ ping_id = "ping-%s" % random.randint(1000000, 9999999)
 557+
 558+ ping = xmpp.Iq( typ='get', attrs={ 'id': ping_id }, to= self.jid.getDomain(), frm= self.jid.getStripped() )
 559+ ping.addChild( name= "ping", namespace = "urn:xmpp:ping" )
 560+ self.jabber.send( ping )
 561+ self.debug('XMPP ping sent')
 562+
 563+ ping_stanzas = 10
 564+
 565+ while self.last_iq is None or self.last_iq.getAttr('id') != ping_id:
 566+ n = self.jabber.Process(1)
 567+ if n == 0 or n is None or not self.jabber.isConnected():
 568+ raise IOError("connection lost!")
 569+
 570+ self.debug('waiting for XMPP pong, received %s bytes of other data; %s tries remaining' % (n, ping_stanzas) )
 571+
 572+ ping_stanzas -= 1
 573+ if ping_stanzas <= 0:
 574+ raise IOError("ping got no response in time!")
 575+
 576+ if not self.jabber.isConnected():
 577+ raise IOError("connection lost!")
 578+
 579+ return self.last_iq
 580+
 581+ def reconnect_backoff( self ):
 582+ self.debug( "reconnect_backoff: tick = %d, tock = %d" )
 583+
 584+ if self.backoff_tick <= 0:
 585+ self.backoff_tock = min( self.backoff_tock + 1, self.backoff_max_tock )
 586+ self.backoff_tick = self.backoff_tock * self.backoff_factor
 587+ return False
 588+ else:
 589+ self.backoff_tick -= 1
 590+ return True
 591+
 592+ def reconnect( self ):
 593+ try:
 594+ if self.jabber:
 595+ self.close()
 596+ except:
 597+ pass
 598+
 599+ self.jabber.reconnectAndReauth(self)
 600+ self.on_connect()
 601+
434602 class CommandConnection (Connection):
435603 def __init__( self, relay, socket ):
436604 super( CommandConnection, self ).__init__( relay )
437605 self.socket = socket
 606+ self.connected = None
438607
439608 def close( self ):
440609 if self.socket != sys.stdin:
@@ -453,14 +622,38 @@
454623 def process_command(self, command):
455624 self.relay.process_command( command )
456625
 626+ def is_connected( self ):
 627+ if self.connected is None:
 628+ self.connected = self.test_connection()
 629+
 630+ return self.connected
 631+
457632 def get_socket( self ):
458633 return self.socket
459634
 635+ def test_connection( self ):
 636+ try:
 637+ self.socket.fileno()
 638+ self.connected = True
 639+ except:
 640+ self.connected = False
 641+
 642+ return self.connected
 643+
 644+ def reconnect_backoff( self ):
 645+ return False
 646+
 647+ def reconnect( self ):
 648+ raise IOException("can't reconnect command socket!")
 649+
 650+
460651 class UdpConnection (Connection):
461652 def __init__( self, relay, buffer_size = 8192 ):
462653 super( UdpConnection, self ).__init__( relay )
463654 self.buffer_size = buffer_size
464655 self.socket = None
 656+ self.address = None
 657+ self.connected = None
465658
466659 def close( self ):
467660 self.socket.close()
@@ -478,7 +671,6 @@
479672 def process_rc_packet(self, data):
480673 try:
481674 dom = xmpp.simplexml.XML2Node( data )
482 - self.debug( "parsed rc packet" )
483675
484676 if dom.getName() != "rc":
485677 self.warn( "expected <rc> element, found <%s>; sklipping unknown XML" % dom.getName() )
@@ -503,12 +695,45 @@
504696 self.warn( "failed to bind to UDP %s:%d" % (interface, port) )
505697 return False
506698
 699+ self.address = (interface, port)
 700+
507701 self.info( "listening to UDP %s:%d" % (interface, port) )
 702+ self.connected = True
 703+
508704 return True
509705
510706 def get_socket( self ):
511707 return self.socket
512708
 709+ def is_connected( self ):
 710+ return self.connected
 711+
 712+ def test_connection( self ):
 713+ try:
 714+ self.socket.fileno()
 715+ #TODO: try more stuff!
 716+ return True
 717+ except:
 718+ pass
 719+
 720+ self.connected = False
 721+ return False
 722+
 723+ def reconnect_backoff( self ):
 724+ return False
 725+
 726+ def reconnect( self ):
 727+ try:
 728+ if self.socket:
 729+ self.close()
 730+ except:
 731+ pass
 732+
 733+ if self.address:
 734+ return self.connect( self.address[1], self.address[0] )
 735+ else:
 736+ return None
 737+
513738 ##################################################################################
514739
515740 class Channel(object):
@@ -551,6 +776,10 @@
552777 return message
553778
554779 def send_message( self, message, xml = None, mtype = None ):
 780+ if not self.connection.is_connected():
 781+ self.connection.warn( "not connected XMPP server, discarding message %s" % message )
 782+ return False
 783+
555784 message = self.compose_message( message, mtype = mtype, xml = xml )
556785
557786 return self.connection.jabber.send( message )
@@ -597,12 +826,18 @@
598827 option_parser.add_option("--wiki-info", dest="wiki_info_file",
599828 help="read wiki info from FILE", metavar="FILE")
600829
 830+ option_parser.add_option("--xmpp-security", dest="xmpp_security",
 831+ help="SSL/TSL security. 'on', 'off' or 'auto'.")
 832+
601833 option_parser.add_option("--quiet", action="store_const", dest="loglevel", const=LOG_QUIET, default=LOG_VERBOSE,
602834 help="suppress informational messages, only print warnings and errors")
603835
604836 option_parser.add_option("--debug", action="store_const", dest="loglevel", const=LOG_DEBUG,
605837 help="print debug messages")
606838
 839+ option_parser.add_option("--xmpp-debug", action="store_true", dest="xmpp_debug",
 840+ help="""enable debugging in the xmpppy library. Flags are set in the [XMPP] section configuration, using the key 'debug-flags'. Flags are separated by pipe characters.""")
 841+
607842 (options, args) = option_parser.parse_args()
608843
609844 # find config file........
@@ -627,6 +862,8 @@
628863
629864 config.add_section( 'XMPP' )
630865 config.set( 'XMPP', 'message-encoding', 'utf-8' )
 866+ config.set( 'XMPP', 'debug-flags', 'client|component|got' )
 867+ config.set( 'XMPP', 'security', 'auto' )
631868
632869 # read config file........
633870 if not config.read( cfg ):
@@ -684,10 +921,19 @@
685922
686923 relay.loglevel = options.loglevel
687924
 925+ xmpp_debug = []
 926+ if options.xmpp_debug:
 927+ xmpp_debug = config.get( 'XMPP', 'debug-flags' ).split("|")
 928+
 929+ if options.xmpp_security:
 930+ connection_security = options.xmpp_security
 931+ else:
 932+ connection_security = config.get( 'XMPP', 'security' )
 933+
688934 # create connections............
689935 commands_con = CommandConnection( relay, sys.stdin )
690936 udp_con = UdpConnection( relay, buffer_size = config.getint( 'UDP', 'buffer-size' ) )
691 - xmpp_con = XmppConnection( relay, message_encoding = config.get( 'XMPP', 'message-encoding' ) )
 937+ xmpp_con = XmppConnection( relay, message_encoding = config.get( 'XMPP', 'message-encoding' ), xmpp_debug = xmpp_debug, connection_security = connection_security )
692938
693939 # -- DO STUFF -----------------------------------------------------------------------------------
694940

Status & tagging log