Changeset - f082fdf50542
[Not reviewed]
0 3 2
Jan Kaluza - 10 years ago 2015-12-16 15:01:18
jkaluza@redhat.com
Test join/leave IRC room and change IRC room topic
5 files changed with 106 insertions and 2 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
@@ -28,7 +28,8 @@ spectrum2_twitter_backend
 
install_manifest.txt
 
slack.cfg
 
*.sqlite
 
localhost.port
 
*.a
 
core.*
 
*.pyc
 

	
spectrum/src/tests/muc_change_topic.py
Show inline comments
 
new file 100644
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 

	
 
		self.tests = {}
 

	
 
	def muc_message(self, msg):
 
		if msg['body'] == "ready":
 
			self.send_message(mto=self.room, mbody=None, msubject='The new subject', mtype='groupchat')
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_subject", self.muc_subject)
 
		self.finished = False
 

	
 
		self.tests = {}
 
		self.tests["subject_changed"] = ["libcommuni: Change topic", False]
 

	
 
	def muc_subject(self, msg):
 
		if msg['subject'] == "The new subject":
 
			self.tests["subject_changed"][1] = True
 
			self.finished = True
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.send_message(mto=self.room, mbody="ready", mtype='groupchat')
spectrum/src/tests/muc_echo.py
Show inline comments
 
@@ -9,12 +9,13 @@ import sleekxmpp
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("groupchat_message", self.muc_message)
 

	
 
		self.tests = {}
 

	
 
	def muc_message(self, msg):
spectrum/src/tests/muc_join_leave.py
Show inline comments
 
new file 100644
 
import optparse
 
import sys
 
import time
 
import subprocess
 
import os
 

	
 
import sleekxmpp
 

	
 

	
 
class Responder(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.finished = False
 
		self.add_event_handler("session_start", self.start)
 
		self.add_event_handler("muc::" + room + "::got_online", self.muc_got_online)
 
		self.add_event_handler("muc::" + room + "::got_offline", self.muc_got_offline)
 

	
 
		self.tests = {}
 
		self.tests["online_received"] = ["libcommuni: Received available presence", False]
 
		self.tests["offline_received"] = ["libcommuni: Received unavailable presence", False]
 

	
 
	def muc_got_online(self, presence):
 
		if presence['muc']['nick'] == "client":
 
			self.tests["online_received"][1] = True
 

	
 
	def muc_got_offline(self, presence):
 
		if presence['muc']['nick'] == "client":
 
			self.tests["offline_received"][1] = True
 
			self.finished = True
 

	
 
	def start(self, event):
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 

	
 
class Client(sleekxmpp.ClientXMPP):
 
	def __init__(self, jid, password, room, nick):
 
		sleekxmpp.ClientXMPP.__init__(self, jid, password)
 
		self.room = room
 
		self.nick = nick
 
		self.add_event_handler("session_start", self.start)
 
		self.finished = False
 

	
 
		self.tests = {}
 

	
 
	def start(self, event):
 
		self.getRoster()
 
		self.sendPresence()
 
		self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
 
		self.plugin['xep_0045'].leaveMUC(self.room, self.nick)
spectrum/src/tests/start.py
Show inline comments
 
@@ -35,24 +35,27 @@ def single_test(Client, Responder):
 
		client.process(block=False)
 
	else:
 
		print "connect() failed"
 
		sys.exit(1)
 

	
 
	max_time = 60
 
	while not client.finished and max_time > 0:
 
	while not client.finished and not responder.finished and max_time > 0:
 
		time.sleep(1)
 
		max_time -= 1
 
	client.disconnect()
 
	responder.disconnect()
 

	
 
	os.system("killall spectrum2")
 
	os.system("killall ngircd")
 
	os.system("killall spectrum2_libcommuni_backend 2>/dev/null")
 

	
 
	ret = True
 
	for v in client.tests.values():
 
	tests = []
 
	tests += client.tests.values()
 
	tests += responder.tests.values()
 
	for v in tests:
 
		if v[1]:
 
			print v[0] + ": PASSED"
 
		else:
 
			print v[0] + ": FAILED"
 
			ret = False
 

	
0 comments (0 inline, 0 general)