Changeset - c68479f86d5e
[Not reviewed]
0 6 0
Jan Kaluza - 14 years ago 2011-08-03 13:08:48
hanzz.k@gmail.com
[service] idle_reconnect_time
6 files changed with 112 insertions and 12 deletions:
0 comments (0 inline, 0 general)
include/transport/networkpluginserver.h
Show inline comments
 
@@ -49,12 +49,13 @@ class NetworkPluginServer {
 
			Swift::SafeByteArray data;
 
			boost::shared_ptr<Swift::Connection> connection;
 
			unsigned long res;
 
			unsigned long init_res;
 
			unsigned long shared;
 
			bool acceptUsers;
 
			bool longRun;
 
		};
 

	
 
		NetworkPluginServer(Component *component, Config *config, UserManager *userManager);
 

	
 
		virtual ~NetworkPluginServer();
 

	
 
@@ -65,12 +66,14 @@ class NetworkPluginServer {
 
		const std::list<Backend *> &getBackends() {
 
			return m_clients;
 
		}
 

	
 
		void collectBackend();
 

	
 
		void moveToLongRunBackend(User *user);
 

	
 
		void handleMessageReceived(NetworkConversation *conv, boost::shared_ptr<Swift::Message> &message);
 

	
 
	private:
 
		void handleNewClientConnection(boost::shared_ptr<Swift::Connection> c);
 
		void handleSessionFinished(Backend *c);
 
		void handleDataRead(Backend *c, const Swift::SafeByteArray&);
 
@@ -104,22 +107,23 @@ class NetworkPluginServer {
 
		void handleVCardRequired(User *user, const std::string &name, unsigned int id);
 

	
 
		void send(boost::shared_ptr<Swift::Connection> &, const std::string &data);
 

	
 
		void pingTimeout();
 
		void sendPing(Backend *c);
 
		Backend *getFreeClient();
 
		Backend *getFreeClient(bool acceptUsers = true, bool longRun = false);
 

	
 
		UserManager *m_userManager;
 
		VCardResponder *m_vcardResponder;
 
		RosterResponder *m_rosterResponder;
 
		BlockResponder *m_blockResponder;
 
		Config *m_config;
 
		boost::shared_ptr<Swift::BoostConnectionServer> m_server;
 
		std::list<Backend *>  m_clients;
 
		Swift::Timer::ref m_pingTimer;
 
		Swift::Timer::ref m_collectTimer;
 
		Component *m_component;
 
		std::list<User *> m_waitingUsers;
 
		bool m_isNextLongRun;
 
};
 

	
 
}
include/transport/user.h
Show inline comments
 
@@ -72,26 +72,38 @@ class User {
 
		/// Handles presence from XMPP JID associated with this user.
 
		/// \param presence Swift::Presence.
 
		void handlePresence(Swift::Presence::ref presence);
 

	
 
		void handleSubscription(Swift::Presence::ref presence);
 

	
 
		time_t &getLastActivity() {
 
			return m_lastActivity;
 
		}
 

	
 
		void updateLastActivity() {
 
			m_lastActivity = time(NULL);
 
		}
 

	
 
		/// Returns language.
 
		/// \return language
 
		const char *getLang() { return "en"; }
 

	
 
		void handleDisconnected(const std::string &error);
 

	
 
		bool isReadyToConnect() {
 
			return m_readyForConnect;
 
		}
 

	
 
		void setConnected(bool connected) {
 
			m_connected = connected;
 
			setIgnoreDisconnect(false);
 
			updateLastActivity();
 
		}
 

	
 
		void setIgnoreDisconnect(bool ignoreDisconnect);
 

	
 
		bool isConnected() {
 
			return m_connected;
 
		}
 

	
 
		boost::signal<void ()> onReadyToConnect;
 
		boost::signal<void (Swift::Presence::ref presence)> onPresenceChanged;
 
@@ -110,11 +122,13 @@ class User {
 
		Swift::EntityCapsManager *m_entityCapsManager;
 
		Swift::PresenceOracle *m_presenceOracle;
 
		UserInfo m_userInfo;
 
		void *m_data;
 
		bool m_connected;
 
		bool m_readyForConnect;
 
		bool m_ignoreDisconnect;
 
		Swift::Timer::ref m_reconnectTimer;
 
		boost::shared_ptr<Swift::Connection> connection;
 
		time_t m_lastActivity;
 
};
 

	
 
}
spectrum/src/sample.cfg
Show inline comments
 
@@ -5,12 +5,13 @@ server = 127.0.0.1
 
port = 5222
 
server_mode = 1
 
backend_host=localhost # < this option doesn't work yet
 
backend_port=10001
 
admin_username=admin
 
admin_password=test
 
#idle_reconnect_time=10
 
#cert= #patch to PKCS#12 certificate
 
#cert_password= #password to that certificate if any
 
users_per_backend=10
 
backend=../../backends/libpurple/spectrum_libpurple_backend
 
#backend=../../backends/libircclient-qt/spectrum_libircclient-qt_backend
 
#protocol=prpl-msn
src/config.cpp
Show inline comments
 
@@ -44,12 +44,13 @@ bool Config::load(const std::string &configfile, boost::program_options::options
 
		("service.backend_port", value<std::string>()->default_value("10000"), "Port to bind backend server to")
 
		("service.cert", value<std::string>()->default_value(""), "PKCS#12 Certificate.")
 
		("service.cert_password", value<std::string>()->default_value(""), "PKCS#12 Certificate password.")
 
		("service.admin_username", value<std::string>()->default_value(""), "Administrator username.")
 
		("service.admin_password", value<std::string>()->default_value(""), "Administrator password.")
 
		("service.reuse_old_backends", value<bool>()->default_value(true), "True if Spectrum should use old backends which were full in the past.")
 
		("service.idle_reconnect_time", value<int>()->default_value(4*3600), "Time in seconds after which idle users are reconnected to let their backend die.")
 
		("identity.name", value<std::string>()->default_value("Spectrum 2 Transport"), "Name showed in service discovery.")
 
		("identity.category", value<std::string>()->default_value("gateway"), "Disco#info identity category. 'gateway' by default.")
 
		("identity.type", value<std::string>()->default_value(""), "Type of transport ('icq','msn','gg','irc', ...)")
 
		("registration.enable_public_registration", value<bool>()->default_value(true), "True if users should be able to register.")
 
		("registration.language", value<std::string>()->default_value("en"), "Default language for registration form")
 
		("registration.instructions", value<std::string>()->default_value(""), "Instructions showed to user in registration form")
src/networkpluginserver.cpp
Show inline comments
 
@@ -152,12 +152,13 @@ static void handleBuddyPayload(LocalBuddy *buddy, const pbnetwork::Buddy &payloa
 
}
 

	
 
NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, UserManager *userManager) {
 
	m_userManager = userManager;
 
	m_config = config;
 
	m_component = component;
 
	m_isNextLongRun = false;
 
	m_component->m_factory = new NetworkFactory(this);
 
	m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1));
 
	m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1));
 

	
 
	m_pingTimer = component->getNetworkFactories()->getTimerFactory()->createTimer(20000);
 
	m_pingTimer->onTick.connect(boost::bind(&NetworkPluginServer::pingTimeout, this));
 
@@ -207,15 +208,16 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Con
 
	Backend *client = new Backend;
 
	client->pongReceived = -1;
 
	client->connection = c;
 
	client->res = 0;
 
	client->init_res = 0;
 
	client->shared = 0;
 
	client->acceptUsers = true;
 
	client->acceptUsers = !m_isNextLongRun;
 
	client->longRun = m_isNextLongRun;
 

	
 
	LOG4CXX_INFO(logger, "New backend " << client << " connected. Current backend count=" << (m_clients.size() + 1));
 
	LOG4CXX_INFO(logger, "New" + (client->longRun ? std::string(" long-running") : "") +  " backend " << client << " connected. Current backend count=" << (m_clients.size() + 1));
 

	
 
	if (m_clients.size() == 0) {
 
		// first backend connected, start the server, we're ready.
 
		m_component->start();
 
	}
 

	
 
@@ -261,18 +263,18 @@ void NetworkPluginServer::handleSessionFinished(Backend *c) {
 
// 	c->connection->onDataRead.disconnect_all_slots();
 

	
 
	m_clients.remove(c);
 
	delete c;
 

	
 
	// Execute new session only if there's no free one after this crash/disconnection
 
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend")) {
 
			return;
 
		}
 
	}
 
	exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());
 
// 	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
// 		if ((*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend")) {
 
// 			return;
 
// 		}
 
// 	}
 
// 	exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());
 
}
 

	
 
void NetworkPluginServer::handleConnectedPayload(const std::string &data) {
 
	pbnetwork::Connected payload;
 
	if (payload.ParseFromString(data) == false) {
 
		// TODO: ERROR
 
@@ -449,12 +451,14 @@ void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool
 
	}
 
// 	std::cout << "payload 2...\n";
 
	User *user = m_userManager->getUser(payload.username());
 
	if (!user)
 
		return;
 

	
 
	user->updateLastActivity();
 

	
 
	boost::shared_ptr<Swift::Message> msg(new Swift::Message());
 
	if (subject) {
 
		msg->setSubject(payload.message());
 
	}
 
	else {
 
		msg->setBody(payload.message());
 
@@ -587,24 +591,51 @@ void NetworkPluginServer::send(boost::shared_ptr<Swift::Connection> &c, const st
 
	char header[4];
 
	*((int*)(header)) = htonl(data.size());
 
	c->write(Swift::createSafeByteArray(std::string(header, 4) + data));
 
}
 

	
 
void NetworkPluginServer::pingTimeout() {
 
	// TODO: move to separate timer, those 2 loops could be expensive
 
	time_t now = time(NULL);
 
	std::vector<User *> usersToMove;
 
	unsigned long diff = CONFIG_INT(m_config, "service.idle_reconnect_time");
 
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it)->longRun) {
 
			continue;
 
		}
 

	
 
		BOOST_FOREACH(User *u, (*it)->users) {
 
			if (now - u->getLastActivity() > diff) {
 
				usersToMove.push_back(u);
 
			}
 
		}
 
	}
 

	
 
	BOOST_FOREACH(User *u, usersToMove) {
 
		LOG4CXX_INFO(logger, "Moving user " << u->getJID().toString() << " to long-running backend");
 
		moveToLongRunBackend(u);
 
	}
 
	
 

	
 
	// check ping responses
 
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it)->pongReceived || (*it)->pongReceived == -1) {
 
			sendPing((*it));
 
		}
 
		else {
 
			LOG4CXX_INFO(logger, "Disconnecting backend " << (*it) << ". PING response not received.");
 
			(*it)->connection->disconnect();
 
			(*it)->connection.reset();
 
// 			handleSessionFinished((*it));
 
		}
 

	
 
		if ((*it)->users.size() == 0) {
 
			LOG4CXX_INFO(logger, "Disconnecting backend " << (*it) << ". There are no users.");
 
			(*it)->connection->disconnect();
 
			(*it)->connection.reset();
 
		}
 
	}
 
	m_pingTimer->start();
 
}
 

	
 
void NetworkPluginServer::collectBackend() {
 
	LOG4CXX_INFO(logger, "Collect backend called, finding backend which will be set to die");
 
@@ -621,12 +652,49 @@ void NetworkPluginServer::collectBackend() {
 
		m_collectTimer->start();
 
		LOG4CXX_INFO(logger, "Backend " << backend << "is set to die");
 
		backend->acceptUsers = false;
 
	}
 
}
 

	
 
void NetworkPluginServer::moveToLongRunBackend(User *user) {
 
	// Check if user has already some backend
 
	Backend *old = (Backend *) user->getData();
 
	if (!old) {
 
		LOG4CXX_INFO(logger, "User " << user->getJID().toString() << " does not have old backend. Not moving.");
 
		return;
 
	}
 

	
 
	// if he's already on long run, do nothing
 
	if (old->longRun) {
 
		LOG4CXX_INFO(logger, "User " << user->getJID().toString() << " is already on long-running backend. Not moving.");
 
		return;
 
	}
 

	
 
	// Get free longrun backend, if there's no longrun backend, create one and wait
 
	// for its connection
 
	Backend *backend = getFreeClient(false, true);
 
	if (!backend) {
 
		LOG4CXX_INFO(logger, "No free long-running backend for user " << user->getJID().toString() << ". Will try later");
 
		return;
 
	}
 

	
 
	// old backend will trigger disconnection which has to be ignored to keep user online
 
	user->setIgnoreDisconnect(true);
 

	
 
	// remove user from the old backend
 
	// If backend is empty, it will be collected by pingTimeout
 
	old->users.remove(user);
 

	
 
	// switch to new backend and connect
 
	user->setData(backend);
 
	backend->users.push_back(user);
 

	
 
	// connect him
 
	handleUserReadyToConnect(user);
 
}
 

	
 
void NetworkPluginServer::handleUserCreated(User *user) {
 
	Backend *c = getFreeClient();
 

	
 
	if (!c) {
 
		LOG4CXX_INFO(logger, "There is no backend to handle user " << user->getJID().toString() << ". Adding him to queue.");
 
		m_waitingUsers.push_back(user);
 
@@ -768,13 +836,13 @@ void NetworkPluginServer::handleUserDestroyed(User *user) {
 
// 		handleSessionFinished(c);
 
// 		m_clients.erase(user->connection);
 
	}
 
}
 

	
 
void NetworkPluginServer::handleMessageReceived(NetworkConversation *conv, boost::shared_ptr<Swift::Message> &msg) {
 

	
 
	conv->getConversationManager()->getUser()->updateLastActivity();
 
	boost::shared_ptr<Swift::ChatState> statePayload = msg->getPayload<Swift::ChatState>();
 
	if (statePayload) {
 
		pbnetwork::WrapperMessage_Type type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED;
 
		switch (statePayload->getChatState()) {
 
			case Swift::ChatState::Active:
 
				type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_STOPPED_TYPING;
 
@@ -975,29 +1043,30 @@ void NetworkPluginServer::sendPing(Backend *c) {
 
		send(c->connection, message);
 
		c->pongReceived = false;
 
	}
 
// 	LOG4CXX_INFO(logger, "PING to " << c);
 
}
 

	
 
NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient() {
 
NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun) {
 
	NetworkPluginServer::Backend *c = NULL;
 
// 	bool spawnNew = false;
 
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		// This backend is free.
 
		if ((*it)->acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection) {
 
		if ((*it)->acceptUsers == acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection && (*it)->longRun == longRun) {
 
			c = *it;
 
			if (!CONFIG_BOOL(m_config, "service.reuse_old_backends")) {
 
				if (c->users.size() + 1 >= CONFIG_INT(m_config, "service.users_per_backend")) {
 
					c->acceptUsers = false;
 
				}
 
			}
 
			break;
 
		}
 
	}
 

	
 
	if (c == NULL) {
 
		m_isNextLongRun = longRun;
 
		exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());
 
	}
 

	
 
	return c;
 
}
 

	
src/user.cpp
Show inline comments
 
@@ -48,19 +48,21 @@ User::User(const Swift::JID &jid, UserInfo &userInfo, Component *component, User
 
	m_presenceOracle = component->m_presenceOracle;
 
	m_entityCapsManager = component->m_entityCapsManager;
 
	m_userManager = userManager;
 
	m_userInfo = userInfo;
 
	m_connected = false;
 
	m_readyForConnect = false;
 
	m_ignoreDisconnect = false;
 

	
 
	m_reconnectTimer = m_component->getNetworkFactories()->getTimerFactory()->createTimer(10000);
 
	m_reconnectTimer->onTick.connect(boost::bind(&User::onConnectingTimeout, this)); 
 

	
 
	m_rosterManager = new RosterManager(this, m_component);
 
	m_conversationManager = new ConversationManager(this, m_component);
 
	LOG4CXX_INFO(logger, m_jid.toString() << ": Created");
 
	updateLastActivity();
 
}
 

	
 
User::~User(){
 
	LOG4CXX_INFO(logger, m_jid.toString() << ": Destroying");
 
	if (m_component->inServerMode()) {
 
		dynamic_cast<Swift::ServerStanzaChannel *>(m_component->getStanzaChannel())->finishSession(m_jid, boost::shared_ptr<Swift::Element>());
 
@@ -172,13 +174,22 @@ void User::onConnectingTimeout() {
 
		return;
 
	m_reconnectTimer->stop();
 
	m_readyForConnect = true;
 
	onReadyToConnect();
 
}
 

	
 
void User::setIgnoreDisconnect(bool ignoreDisconnect) {
 
	m_ignoreDisconnect = ignoreDisconnect;
 
	LOG4CXX_INFO(logger, m_jid.toString() << ": Setting ignoreDisconnect=" << m_ignoreDisconnect);
 
}
 

	
 
void User::handleDisconnected(const std::string &error) {
 
	if (m_ignoreDisconnect) {
 
		LOG4CXX_INFO(logger, m_jid.toString() << ": Disconnecting from legacy network ignored (probably moving between backends)");
 
		return;
 
	}
 

	
 
	if (error.empty()) {
 
		LOG4CXX_INFO(logger, m_jid.toString() << ": Disconnected from legacy network");
 
	}
 
	else {
 
		LOG4CXX_INFO(logger, m_jid.toString() << ": Disconnected from legacy network with error " << error);
0 comments (0 inline, 0 general)