Changeset - f082fdf50542
[Not reviewed]
0 3 2
Jan Kaluza - 10 years ago 2015-12-16 15:01:18
jkaluza@redhat.com
Test join/leave IRC room and change IRC room topic
5 files changed with 106 insertions and 2 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
*.pb.cc
 
*.pb.h
 
plugin/python/protocol_pb2.py
 
Makefile
 
*.cmake
 
*.so
 
*.so.*
 
*.log
 
libtransport_test
 
CMakeFiles
 
spectrum2
 
transport_config.h
 
Doxyfile
 
moc_*
 
CMakeCache.txt
 
*.patch
 
*.orig
 
spectrum2_manager
 
dfrotz
 
spectrum2_frotz_backend
 
spectrum2_libcommuni_backend
 
spectrum2_libpurple_backend
 
spectrum2_skype_backend
 
spectrum2_smstools3_backend
 
spectrum2_swiften_backend
 
spectrum2_template_backend
 
spectrum2_twitter_backend
 
install_manifest.txt
 
slack.cfg
 
*.sqlite
 
localhost.port
 
*.a
 
core.*
 
*.pyc
 

	
spectrum/src/tests/muc_change_topic.py
Show inline comments
 
new file 100644
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 

	
 
		self.tests = {}
 

	
 
	def muc_message(self, msg):
 
		if msg['body'] == "ready":
 
			self.send_message(mto=self.room, mbody=None, msubject='The new subject', mtype='groupchat')
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_subject", self.muc_subject)
 
		self.finished = False
 

	
 
		self.tests = {}
 
		self.tests["subject_changed"] = ["libcommuni: Change topic", False]
 

	
 
	def muc_subject(self, msg):
 
		if msg['subject'] == "The new subject":
 
			self.tests["subject_changed"][1] = True
 
			self.finished = True
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.send_message(mto=self.room, mbody="ready", mtype='groupchat')
spectrum/src/tests/muc_echo.py
Show inline comments
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 

	
 
		self.tests = {}
 

	
 
	def muc_message(self, msg):
 
		if msg['mucnick'] != self.nick:
 
			self.send_message(mto=msg['from'].bare,
 
							mbody="echo %s" % msg['body'],
 
							mtype='groupchat')
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 
		self.finished = False
 

	
 
		self.tests = {}
 
		self.tests["echo_received"] = ["libcommuni: Send and receive messages", False]
 

	
 
	def muc_message(self, msg):
 
		if msg['mucnick'] != self.nick:
 
			if msg['body'] == "echo abc":
 
				self.tests["echo_received"][1] = True
 
				self.finished = True
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.send_message(mto=self.room, mbody="abc", mtype='groupchat')
spectrum/src/tests/muc_join_leave.py
Show inline comments
 
new file 100644
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("muc::" + room + "::got_online", self.muc_got_online)
 
		self.add_event_handler("muc::" + room + "::got_offline", self.muc_got_offline)
 

	
 
		self.tests = {}
 
		self.tests["online_received"] = ["libcommuni: Received available presence", False]
 
		self.tests["offline_received"] = ["libcommuni: Received unavailable presence", False]
 

	
 
	def muc_got_online(self, presence):
 
		if presence['muc']['nick'] == "client":
 
			self.tests["online_received"][1] = True
 

	
 
	def muc_got_offline(self, presence):
 
		if presence['muc']['nick'] == "client":
 
			self.tests["offline_received"][1] = True
 
			self.finished = True
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.finished = False
 

	
 
		self.tests = {}
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.plugin['xep_0045'].leaveMUC(self.room, self.nick)
spectrum/src/tests/start.py
Show inline comments
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 
import imp
 

	
 
def single_test(Client, Responder):
 
	os.system("../spectrum2 -n ./irc_test.cfg > spectrum2.log &")
 
	os.system("ngircd -f ngircd.conf &")
 
	time.sleep(1)
 
	responder = Responder("responder@localhost", "password", "#channel@localhost", "responder")
 
	responder.register_plugin('xep_0030')  # Service Discovery
 
	responder.register_plugin('xep_0045')  # Multi-User Chat
 
	responder.register_plugin('xep_0199')  # XMPP Ping
 
	responder['feature_mechanisms'].unencrypted_plain = True
 

	
 
	if responder.connect():
 
		responder.process(block=False)
 
	else:
 
		print "connect() failed"
 
		sys.exit(1)
 

	
 
	client = Client("client@localhost", "password", "#channel@localhost", "client")
 
	client.register_plugin('xep_0030')  # Service Discovery
 
	client.register_plugin('xep_0045')  # Multi-User Chat
 
	client.register_plugin('xep_0199')  # XMPP Ping
 
	client['feature_mechanisms'].unencrypted_plain = True
 

	
 
	time.sleep(2)
 

	
 
	if client.connect():
 
		client.process(block=False)
 
	else:
 
		print "connect() failed"
 
		sys.exit(1)
 

	
 
	max_time = 60
 
	while not client.finished and max_time > 0:
 
	while not client.finished and not responder.finished and max_time > 0:
 
		time.sleep(1)
 
		max_time -= 1
 
	client.disconnect()
 
	responder.disconnect()
 

	
 
	os.system("killall spectrum2")
 
	os.system("killall ngircd")
 
	os.system("killall spectrum2_libcommuni_backend 2>/dev/null")
 

	
 
	ret = True
 
	for v in client.tests.values():
 
	tests = []
 
	tests += client.tests.values()
 
	tests += responder.tests.values()
 
	for v in tests:
 
		if v[1]:
 
			print v[0] + ": PASSED"
 
		else:
 
			print v[0] + ": FAILED"
 
			ret = False
 

	
 
	if not ret:
 
		os.system("cat spectrum2.log")
 

	
 
	return ret
 

	
 
exitcode = 0
 
for f in os.listdir("."):
 
	if not f.endswith(".py") or f == "start.py":
 
		continue
 

	
 
	print "Starting " + f + " test ..."
 
	test = imp.load_source('test', './' + f)
 
	ret = single_test(test.Client, test.Responder)
 
	if not ret:
 
		exitcode = -1
 

	
 
sys.exit(exitcode)
 

	
0 comments (0 inline, 0 general)