import optparse import sys import time import subprocess import os import sleekxmpp class Responder(sleekxmpp.ClientXMPP): def __init__(self, jid, password, room, nick): sleekxmpp.ClientXMPP.__init__(self, jid, password) self.room = room self.nick = nick self.add_event_handler("session_start", self.start) self.add_event_handler("groupchat_message", self.muc_message) def muc_message(self, msg): if msg['mucnick'] != self.nick: self.send_message(mto=msg['from'].bare, mbody="echo %s" % msg['body'], mtype='groupchat') def start(self, event): self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True) #time.sleep(1) #self.send_message(mto=self.room, mbody=self.message, mtype='groupchat') #time.sleep(10) #self.disconnect() class Client(sleekxmpp.ClientXMPP): def __init__(self, jid, password, room, nick): sleekxmpp.ClientXMPP.__init__(self, jid, password) self.room = room self.nick = nick self.echo_received = False self.add_event_handler("session_start", self.start) self.add_event_handler("groupchat_message", self.muc_message) def muc_message(self, msg): if msg['mucnick'] != self.nick: if msg['body'] == "echo abc": self.echo_received = True def start(self, event): self.getRoster() self.sendPresence() self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True) self.send_message(mto=self.room, mbody="abc", mtype='groupchat') #time.sleep(10) #self.disconnect() if __name__ == '__main__': os.system("../spectrum2 -n ./irc_test.cfg &") os.system("ngircd -f ngircd.conf &") time.sleep(1) responder = Responder("responder@localhost", "password", "#channel@localhost", "responder") responder.register_plugin('xep_0030') # Service Discovery responder.register_plugin('xep_0045') # Multi-User Chat responder.register_plugin('xep_0199') # XMPP Ping responder['feature_mechanisms'].unencrypted_plain = True if responder.connect(): responder.process(block=False) else: print "connect() failed" sys.exit(1) client = Client("client@localhost", "password", "#channel@localhost", "client") client.register_plugin('xep_0030') # Service Discovery client.register_plugin('xep_0045') # Multi-User Chat client.register_plugin('xep_0199') # XMPP Ping client['feature_mechanisms'].unencrypted_plain = True time.sleep(2) if client.connect(): client.process(block=False) else: print "connect() failed" sys.exit(1) time.sleep(10) client.disconnect() responder.disconnect() os.system("killall spectrum2") os.system("killall ngircd") os.system("killall spectrum2_libcommuni_backend") if client.echo_received == False: print "ERROR: 'echo abc' not received." sys.exit(1) print "ALL TESTS PASSED"