Files
@ f004677d447a
Branch filter:
Location: libtransport.git/spectrum/src/tests/irc_test.py - annotation
f004677d447a
2.6 KiB
text/x-python
Another try to use sleekxmpp in travis...
e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 e3b210bbdd09 | import optparse
import sys
import time
import subprocess
import os
import sleekxmpp
class Responder(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, room, nick):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.nick = nick
self.add_event_handler("session_start", self.start)
self.add_event_handler("groupchat_message", self.muc_message)
def muc_message(self, msg):
if msg['mucnick'] != self.nick:
self.send_message(mto=msg['from'].bare,
mbody="echo %s" % msg['body'],
mtype='groupchat')
def start(self, event):
self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
#time.sleep(1)
#self.send_message(mto=self.room, mbody=self.message, mtype='groupchat')
#time.sleep(10)
#self.disconnect()
class Client(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, room, nick):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.nick = nick
self.echo_received = False
self.add_event_handler("session_start", self.start)
self.add_event_handler("groupchat_message", self.muc_message)
def muc_message(self, msg):
if msg['mucnick'] != self.nick:
if msg['body'] == "echo abc":
self.echo_received = True
def start(self, event):
self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
self.send_message(mto=self.room, mbody="abc", mtype='groupchat')
#time.sleep(10)
#self.disconnect()
if __name__ == '__main__':
os.system("../spectrum2 -n ./irc_test.cfg &")
os.system("ngircd -f ngircd.conf &")
time.sleep(1)
responder = Responder("responder@localhost", "password", "#channel@localhost", "responder")
responder.register_plugin('xep_0030') # Service Discovery
responder.register_plugin('xep_0045') # Multi-User Chat
responder.register_plugin('xep_0199') # XMPP Ping
responder['feature_mechanisms'].unencrypted_plain = True
if responder.connect():
responder.process(block=False)
else:
print "connect() failed"
sys.exit(1)
client = Client("client@localhost", "password", "#channel@localhost", "client")
client.register_plugin('xep_0030') # Service Discovery
client.register_plugin('xep_0045') # Multi-User Chat
client.register_plugin('xep_0199') # XMPP Ping
client['feature_mechanisms'].unencrypted_plain = True
time.sleep(2)
if client.connect():
client.process(block=False)
else:
print "connect() failed"
sys.exit(1)
time.sleep(10)
client.disconnect()
responder.disconnect()
os.system("killall spectrum2")
os.system("killall ngircd")
os.system("killall spectrum2_libcommuni_backend")
if client.echo_received == False:
print "ERROR: 'echo abc' not received."
sys.exit(1)
print "ALL TESTS PASSED"
|