Changeset - f082fdf50542
[Not reviewed]
0 3 2
Jan Kaluza - 10 years ago 2015-12-16 15:01:18
jkaluza@redhat.com
Test join/leave IRC room and change IRC room topic
5 files changed with 106 insertions and 2 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
@@ -22,13 +22,14 @@ spectrum2_libcommuni_backend
 
spectrum2_libpurple_backend
 
spectrum2_skype_backend
 
spectrum2_smstools3_backend
 
spectrum2_swiften_backend
 
spectrum2_template_backend
 
spectrum2_twitter_backend
 
install_manifest.txt
 
slack.cfg
 
*.sqlite
 
localhost.port
 
*.a
 
core.*
 
*.pyc
 

	
spectrum/src/tests/muc_change_topic.py
Show inline comments
 
new file 100644
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 

	
 
		self.tests = {}
 

	
 
	def muc_message(self, msg):
 
		if msg['body'] == "ready":
 
			self.send_message(mto=self.room, mbody=None, msubject='The new subject', mtype='groupchat')
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_subject", self.muc_subject)
 
		self.finished = False
 

	
 
		self.tests = {}
 
		self.tests["subject_changed"] = ["libcommuni: Change topic", False]
 

	
 
	def muc_subject(self, msg):
 
		if msg['subject'] == "The new subject":
 
			self.tests["subject_changed"][1] = True
 
			self.finished = True
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.send_message(mto=self.room, mbody="ready", mtype='groupchat')
spectrum/src/tests/muc_echo.py
Show inline comments
 
@@ -3,24 +3,25 @@ import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 

	
 
		self.tests = {}
 

	
 
	def muc_message(self, msg):
 
		if msg['mucnick'] != self.nick:
 
			self.send_message(mto=msg['from'].bare,
 
							mbody="echo %s" % msg['body'],
 
							mtype='groupchat')
 

	
 
	def start(self, event):
spectrum/src/tests/muc_join_leave.py
Show inline comments
 
new file 100644
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("muc::" + room + "::got_online", self.muc_got_online)
 
		self.add_event_handler("muc::" + room + "::got_offline", self.muc_got_offline)
 

	
 
		self.tests = {}
 
		self.tests["online_received"] = ["libcommuni: Received available presence", False]
 
		self.tests["offline_received"] = ["libcommuni: Received unavailable presence", False]
 

	
 
	def muc_got_online(self, presence):
 
		if presence['muc']['nick'] == "client":
 
			self.tests["online_received"][1] = True
 

	
 
	def muc_got_offline(self, presence):
 
		if presence['muc']['nick'] == "client":
 
			self.tests["offline_received"][1] = True
 
			self.finished = True
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.finished = False
 

	
 
		self.tests = {}
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.plugin['xep_0045'].leaveMUC(self.room, self.nick)
spectrum/src/tests/start.py
Show inline comments
 
@@ -29,36 +29,39 @@ def single_test(Client, Responder):
 
	client.register_plugin('xep_0199')  # XMPP Ping
 
	client['feature_mechanisms'].unencrypted_plain = True
 

	
 
	time.sleep(2)
 

	
 
	if client.connect():
 
		client.process(block=False)
 
	else:
 
		print "connect() failed"
 
		sys.exit(1)
 

	
 
	max_time = 60
 
	while not client.finished and max_time > 0:
 
	while not client.finished and not responder.finished and max_time > 0:
 
		time.sleep(1)
 
		max_time -= 1
 
	client.disconnect()
 
	responder.disconnect()
 

	
 
	os.system("killall spectrum2")
 
	os.system("killall ngircd")
 
	os.system("killall spectrum2_libcommuni_backend 2>/dev/null")
 

	
 
	ret = True
 
	for v in client.tests.values():
 
	tests = []
 
	tests += client.tests.values()
 
	tests += responder.tests.values()
 
	for v in tests:
 
		if v[1]:
 
			print v[0] + ": PASSED"
 
		else:
 
			print v[0] + ": FAILED"
 
			ret = False
 

	
 
	if not ret:
 
		os.system("cat spectrum2.log")
 

	
 
	return ret
 

	
 
exitcode = 0
0 comments (0 inline, 0 general)