Files @ 36b444536cb0
Branch filter:

Location: libtransport.git/spectrum/src/tests/irc_test.py

Jan Kaluza
Fix regression when 'from' attribute has not been sent in some situations in server mode.
import optparse
import sys
import time
import subprocess
import os

import sleekxmpp


class Responder(sleekxmpp.ClientXMPP):
	"""XMPP client that joins a MUC room and echoes other occupants' messages.

	Every groupchat message whose ``mucnick`` differs from this client's own
	nick is answered in the same room with the body prefixed by ``"echo "``.
	"""

	def __init__(self, jid, password, room, nick):
		"""Store the room and nick to use and hook up the event handlers.

		jid, password -- credentials forwarded to ``sleekxmpp.ClientXMPP``.
		room -- JID of the room joined when the session starts.
		nick -- nickname used in the room; also used to skip our own messages.
		"""
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
		self.nick = nick
		self.room = room
		handlers = (
			("session_start", self.start),
			("groupchat_message", self.muc_message),
		)
		for event_name, callback in handlers:
			self.add_event_handler(event_name, callback)

	def muc_message(self, msg):
		"""Reply to a groupchat message with ``"echo <body>"``."""
		# Messages carrying our own nick are ignored, so the responder
		# never answers its own echoes.
		if msg['mucnick'] == self.nick:
			return
		room_jid = msg['from'].bare
		reply = "echo %s" % msg['body']
		self.send_message(mto=room_jid, mbody=reply, mtype='groupchat')

	def start(self, event):
		"""On session start, join the configured room under our nick."""
		muc = self.plugin['xep_0045']
		muc.joinMUC(self.room, self.nick, wait=True)

class Client(sleekxmpp.ClientXMPP):
	"""XMPP client that sends "abc" to a MUC room and watches for its echo.

	``echo_received`` starts as False and becomes True once a groupchat
	message with body "echo abc" arrives from a nick other than our own.
	"""

	def __init__(self, jid, password, room, nick):
		"""Store the room and nick to use and hook up the event handlers.

		jid, password -- credentials forwarded to ``sleekxmpp.ClientXMPP``.
		room -- JID of the room joined (and written to) on session start.
		nick -- nickname used in the room; also used to skip our own messages.
		"""
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
		self.echo_received = False
		self.nick = nick
		self.room = room
		handlers = (
			("session_start", self.start),
			("groupchat_message", self.muc_message),
		)
		for event_name, callback in handlers:
			self.add_event_handler(event_name, callback)

	def muc_message(self, msg):
		"""Record that the expected echo arrived from another occupant."""
		from_other_occupant = msg['mucnick'] != self.nick
		if from_other_occupant and msg['body'] == "echo abc":
			self.echo_received = True

	def start(self, event):
		"""On session start, fetch the roster, send presence, join the room
		and post the "abc" test message to it."""
		self.getRoster()
		self.sendPresence()
		muc = self.plugin['xep_0045']
		muc.joinMUC(self.room, self.nick, wait=True)
		self.send_message(mto=self.room, mbody="abc", mtype='groupchat')

def _register_test_plugins(xmpp):
	"""Register the plugins used by this test and set ``unencrypted_plain``
	on the connection's ``feature_mechanisms`` plugin."""
	xmpp.register_plugin('xep_0030')  # Service Discovery
	xmpp.register_plugin('xep_0045')  # Multi-User Chat
	xmpp.register_plugin('xep_0199')  # XMPP Ping
	xmpp['feature_mechanisms'].unencrypted_plain = True


def _connect_or_exit(xmpp):
	"""Connect ``xmpp`` and start non-blocking processing; exit(1) on failure."""
	if not xmpp.connect():
		print("connect() failed")
		sys.exit(1)
	xmpp.process(block=False)


def _stop_servers():
	"""Kill the spectrum2 gateway, the ngircd server and the IRC backend."""
	os.system("killall spectrum2")
	os.system("killall ngircd")
	os.system("killall spectrum2_libcommuni_backend")


if __name__ == '__main__':
	# Launch the gateway and the IRC server in the background and give
	# them a moment to come up before the first XMPP connection.
	os.system("../spectrum2 -n ./irc_test.cfg &")
	os.system("ngircd -f ngircd.conf &")
	time.sleep(1)

	# Connections that were successfully opened and must be closed again.
	connected = []
	try:
		responder = Responder("responder@localhost", "password", "#channel@localhost", "responder")
		_register_test_plugins(responder)
		_connect_or_exit(responder)
		connected.append(responder)

		client = Client("client@localhost", "password", "#channel@localhost", "client")
		_register_test_plugins(client)

		# Pause before connecting the client, as the original script did
		# (presumably to let the responder join the room first).
		time.sleep(2)

		_connect_or_exit(client)
		connected.append(client)

		# Leave time for the "abc" message and its echo to round-trip.
		time.sleep(10)
	finally:
		# Always tear down, including on the early sys.exit(1) paths, so a
		# failed run does not leave servers or connections behind.
		try:
			# Disconnect in reverse order: client first, then responder.
			for xmpp in reversed(connected):
				xmpp.disconnect()
		finally:
			_stop_servers()

	if not client.echo_received:
		print("ERROR: 'echo abc' not received.")
		sys.exit(1)

	print("ALL TESTS PASSED")