Changeset - 2ae5cba932b1
[Not reviewed]
0 3 0
HanzZ - 13 years ago 2012-12-27 15:02:21
hanzz.k@gmail.com
service.login_delay
3 files changed with 52 insertions and 22 deletions:
0 comments (0 inline, 0 general)
include/transport/networkpluginserver.h
Show inline comments
 
@@ -120,47 +120,51 @@ class NetworkPluginServer {
 
		void handleRoomLeft(User *user, const std::string &room);
 
		void handleUserReadyToConnect(User *user);
 
		void handleUserPresenceChanged(User *user, Swift::Presence::ref presence);
 
		void handleUserDestroyed(User *user);
 

	
 
		void handleBuddyUpdated(Buddy *buddy, const Swift::RosterItemPayload &item);
 
		void handleBuddyRemoved(Buddy *buddy);
 
		void handleBuddyAdded(Buddy *buddy, const Swift::RosterItemPayload &item);
 

	
 
		void handleBlockToggled(Buddy *buddy);
 

	
 
		void handleVCardUpdated(User *user, boost::shared_ptr<Swift::VCard> vcard);
 
		void handleVCardRequired(User *user, const std::string &name, unsigned int id);
 

	
 
		void handleFTStateChanged(Swift::FileTransfer::State state, const std::string &userName, const std::string &buddyName, const std::string &fileName, unsigned long size, unsigned long id);
 
		void handleFTAccepted(User *user, const std::string &buddyName, const std::string &fileName, unsigned long size, unsigned long ftID);
 
		void handleFTRejected(User *user, const std::string &buddyName, const std::string &fileName, unsigned long size);
 
		void handleFTDataNeeded(Backend *b, unsigned long ftid);
 

	
 
	private:
 
		void send(boost::shared_ptr<Swift::Connection> &, const std::string &data);
 

	
 
		void pingTimeout();
 
		void sendPing(Backend *c);
 
		Backend *getFreeClient(bool acceptUsers = true, bool longRun = false);
 
		Backend *getFreeClient(bool acceptUsers = true, bool longRun = false, bool check = false);
 
		void connectWaitingUsers();
 
		void loginDelayFinished();
 

	
 
		UserManager *m_userManager;
 
		VCardResponder *m_vcardResponder;
 
		RosterResponder *m_rosterResponder;
 
		BlockResponder *m_blockResponder;
 
		Config *m_config;
 
		boost::shared_ptr<Swift::ConnectionServer> m_server;
 
		std::list<Backend *>  m_clients;
 
		Swift::Timer::ref m_pingTimer;
 
		Swift::Timer::ref m_collectTimer;
 
		Swift::Timer::ref m_loginTimer;
 
		Component *m_component;
 
		std::list<User *> m_waitingUsers;
 
		bool m_isNextLongRun;
 
		std::map<unsigned long, FileTransferManager::Transfer> m_filetransfers;
 
		FileTransferManager *m_ftManager;
 
		std::vector<std::string> m_crashedBackends;
 
		AdminInterface *m_adminInterface;
 
		bool m_startingBackend;
 
		DiscoItemsResponder *m_discoItemsResponder;
 
		time_t m_lastLogin;
 
};
 

	
 
}
src/config.cpp
Show inline comments
 
@@ -77,48 +77,49 @@ bool Config::load(std::istream &ifs, boost::program_options::options_description
 
		("service.port", value<int>()->default_value(0), "Port the server is listening on")
 
		("service.user", value<std::string>()->default_value(""), "The name of user Spectrum runs as.")
 
		("service.group", value<std::string>()->default_value(""), "The name of group Spectrum runs as.")
 
		("service.backend", value<std::string>()->default_value("libpurple_backend"), "Backend")
 
		("service.protocol", value<std::string>()->default_value(""), "Protocol")
 
		("service.pidfile", value<std::string>()->default_value("/var/run/spectrum2/$jid.pid"), "Full path to pid file")
 
		("service.portfile", value<std::string>()->default_value("/var/run/spectrum2/$jid.port"), "File to store backend_port to. It's used by spectrum2_manager.")
 
		("service.working_dir", value<std::string>()->default_value("/var/lib/spectrum2/$jid"), "Working dir")
 
		("service.allowed_servers", value<std::vector<std::string> >()->multitoken(), "Only users from these servers can connect")
 
		("service.server_mode", value<bool>()->default_value(false), "True if Spectrum should behave as server")
 
		("service.users_per_backend", value<int>()->default_value(100), "Number of users per one legacy network backend")
 
		("service.backend_host", value<std::string>()->default_value("localhost"), "Host to bind backend server to")
 
		("service.backend_port", value<std::string>()->default_value("0"), "Port to bind backend server to")
 
		("service.cert", value<std::string>()->default_value(""), "PKCS#12 Certificate.")
 
		("service.cert_password", value<std::string>()->default_value(""), "PKCS#12 Certificate password.")
 
		("service.admin_jid", value<std::vector<std::string> >()->multitoken(), "Administrator jid.")
 
		("service.admin_password", value<std::string>()->default_value(""), "Administrator password.")
 
		("service.reuse_old_backends", value<bool>()->default_value(true), "True if Spectrum should use old backends which were full in the past.")
 
		("service.idle_reconnect_time", value<int>()->default_value(0), "Time in seconds after which idle users are reconnected to let their backend die.")
 
		("service.memory_collector_time", value<int>()->default_value(0), "Time in seconds after which backend with most memory is set to die.")
 
		("service.more_resources", value<bool>()->default_value(false), "Allow more resources to be connected in server mode at the same time.")
 
		("service.enable_privacy_lists", value<bool>()->default_value(true), "")
 
		("service.enable_xhtml", value<bool>()->default_value(true), "")
 
		("service.max_room_list_size", value<int>()->default_value(100), "")
 
		("service.login_delay", value<int>()->default_value(0), "")
 
		("service.jid_escaping", value<bool>()->default_value(true), "")
 
		("service.vip_only", value<bool>()->default_value(false), "")
 
		("service.vip_message", value<std::string>()->default_value(""), "")
 
		("vhosts.vhost", value<std::vector<std::string> >()->multitoken(), "")
 
		("identity.name", value<std::string>()->default_value("Spectrum 2 Transport"), "Name showed in service discovery.")
 
		("identity.category", value<std::string>()->default_value("gateway"), "Disco#info identity category. 'gateway' by default.")
 
		("identity.type", value<std::string>()->default_value(""), "Type of transport ('icq','msn','gg','irc', ...)")
 
		("registration.enable_public_registration", value<bool>()->default_value(true), "True if users should be able to register.")
 
		("registration.language", value<std::string>()->default_value("en"), "Default language for registration form")
 
		("registration.instructions", value<std::string>()->default_value("Enter your legacy network username and password."), "Instructions showed to user in registration form")
 
		("registration.username_label", value<std::string>()->default_value("Legacy network username:"), "Label for username field")
 
		("registration.username_mask", value<std::string>()->default_value(""), "Username mask")
 
		("registration.allowed_usernames", value<std::string>()->default_value(""), "Allowed usernames")
 
		("registration.auto_register", value<bool>()->default_value(false), "Register new user automatically when the presence arrives.")
 
		("registration.encoding", value<std::string>()->default_value("utf8"), "Default encoding in registration form")
 
		("registration.require_local_account", value<bool>()->default_value(false), "True if users have to have a local account to register to this transport from remote servers.")
 
		("registration.notify_jid", value<std::vector<std::string> >()->multitoken(), "Send message to this JID if user registers/unregisters")
 
		("registration.local_username_label", value<std::string>()->default_value("Local username:"), "Label for local usernme field")
 
		("registration.local_account_server", value<std::string>()->default_value("localhost"), "The server on which the local accounts will be checked for validity")
 
		("registration.local_account_server_timeout", value<int>()->default_value(10000), "Timeout when checking local user on local_account_server (msecs)")
 
		("gateway_responder.prompt", value<std::string>()->default_value("Contact ID"), "Value of <prompt> </promt> field")
 
		("gateway_responder.label", value<std::string>()->default_value("Enter legacy network contact ID."), "Label for add contact ID field")
 
		("database.type", value<std::string>()->default_value("none"), "Database type.")
 
		("database.database", value<std::string>()->default_value("/var/lib/spectrum2/$jid/database.sql"), "Database used to store data")
src/networkpluginserver.cpp
Show inline comments
 
@@ -237,57 +237,62 @@ static void handleBuddyPayload(LocalBuddy *buddy, const pbnetwork::Buddy &payloa
 
	// Change groups if it's not empty. The same as above...
 
	std::vector<std::string> groups;
 
	for (int i = 0; i < payload.group_size(); i++) {
 
		std::string group = payload.group(i);
 
		utf8::replace_invalid(payload.group(i).begin(), payload.group(i).end(), group.begin(), '_');
 
		groups.push_back(group);
 
	}
 
	if (!groups.empty()) {
 
		buddy->setGroups(groups);
 
	}
 

	
 
	buddy->setStatus(Swift::StatusShow((Swift::StatusShow::Type) payload.status()), payload.statusmessage());
 
	buddy->setIconHash(payload.iconhash());
 
	buddy->setBlocked(payload.blocked());
 
}
 

	
 
NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, UserManager *userManager, FileTransferManager *ftManager, DiscoItemsResponder *discoItemsResponder) {
 
	m_ftManager = ftManager;
 
	m_userManager = userManager;
 
	m_config = config;
 
	m_component = component;
 
	m_isNextLongRun = false;
 
	m_adminInterface = NULL;
 
	m_startingBackend = false;
 
	m_lastLogin = 0;
 
	m_discoItemsResponder = discoItemsResponder;
 
	m_component->m_factory = new NetworkFactory(this);
 
	m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1));
 
	m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1));
 

	
 
	m_pingTimer = component->getNetworkFactories()->getTimerFactory()->createTimer(20000);
 
	m_pingTimer->onTick.connect(boost::bind(&NetworkPluginServer::pingTimeout, this));
 
	m_pingTimer->start();
 

	
 
	m_loginTimer = component->getNetworkFactories()->getTimerFactory()->createTimer(CONFIG_INT(config, "service.login_delay") * 1000);
 
	m_loginTimer->onTick.connect(boost::bind(&NetworkPluginServer::loginDelayFinished, this));
 
	m_loginTimer->start();
 

	
 
	if (CONFIG_INT(m_config, "service.memory_collector_time") != 0) {
 
		m_collectTimer = component->getNetworkFactories()->getTimerFactory()->createTimer(CONFIG_INT(m_config, "service.memory_collector_time"));
 
		m_collectTimer->onTick.connect(boost::bind(&NetworkPluginServer::collectBackend, this));
 
		m_collectTimer->start();
 
	}
 

	
 
	m_vcardResponder = new VCardResponder(component->getIQRouter(), component->getNetworkFactories(), userManager);
 
	m_vcardResponder->onVCardRequired.connect(boost::bind(&NetworkPluginServer::handleVCardRequired, this, _1, _2, _3));
 
	m_vcardResponder->onVCardUpdated.connect(boost::bind(&NetworkPluginServer::handleVCardUpdated, this, _1, _2));
 
	m_vcardResponder->start();
 

	
 
	m_rosterResponder = new RosterResponder(component->getIQRouter(), userManager);
 
	m_rosterResponder->onBuddyAdded.connect(boost::bind(&NetworkPluginServer::handleBuddyAdded, this, _1, _2));
 
	m_rosterResponder->onBuddyRemoved.connect(boost::bind(&NetworkPluginServer::handleBuddyRemoved, this, _1));
 
	m_rosterResponder->onBuddyUpdated.connect(boost::bind(&NetworkPluginServer::handleBuddyUpdated, this, _1, _2));
 
	m_rosterResponder->start();
 

	
 
	m_blockResponder = new BlockResponder(component->getIQRouter(), userManager);
 
	m_blockResponder->onBlockToggled.connect(boost::bind(&NetworkPluginServer::handleBlockToggled, this, _1));
 
	m_blockResponder->start();
 

	
 
	m_server = component->getNetworkFactories()->getConnectionServerFactory()->createConnectionServer(Swift::HostAddress(CONFIG_STRING(m_config, "service.backend_host")), boost::lexical_cast<int>(CONFIG_STRING(m_config, "service.backend_port")));
 
	m_server->onNewConnection.connect(boost::bind(&NetworkPluginServer::handleNewClientConnection, this, _1));
 
}
 
@@ -334,48 +339,53 @@ void NetworkPluginServer::start() {
 
				if (WEXITSTATUS(status) != 0) {
 
					if (status == 254) {
 
						LOG4CXX_ERROR(logger, "Backend can not be started, because it needs database to store data, but the database backend is not configured.");
 
					}
 
					else {
 
						LOG4CXX_ERROR(logger, "Backend can not be started, exit_code=" << WEXITSTATUS(status) << ", possible error: " << strerror(WEXITSTATUS(status)));
 
					}
 
					LOG4CXX_ERROR(logger, "Check backend log for more details");
 
					continue;
 
				}
 
			}
 
			else {
 
				LOG4CXX_ERROR(logger, "Backend can not be started");
 
				continue;
 
			}
 
		}
 

	
 
		signal(SIGCHLD, SigCatcher);
 
#endif
 
		// quit the while loop
 
		break;
 
	}
 
}
 

	
 
void NetworkPluginServer::loginDelayFinished() {
 
	m_loginTimer->stop();
 
	connectWaitingUsers();
 
}
 

	
 
void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Connection> c) {
 
	// Create new Backend instance
 
	Backend *client = new Backend;
 
	client->pongReceived = -1;
 
	client->connection = c;
 
	client->res = 0;
 
	client->init_res = 0;
 
	client->shared = 0;
 
	// Until we receive first PONG from backend, backend is in willDie state.
 
	client->willDie = true;
 
	// Backend does not accept new clients automatically if it's long-running
 
	client->acceptUsers = !m_isNextLongRun;
 
	client->longRun = m_isNextLongRun;
 

	
 
	m_startingBackend = false;
 

	
 
	LOG4CXX_INFO(logger, "New" + (client->longRun ? std::string(" long-running") : "") +  " backend " << client << " connected. Current backend count=" << (m_clients.size() + 1));
 

	
 
	m_clients.push_front(client);
 

	
 
	c->onDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, client));
 
	c->onDataRead.connect(boost::bind(&NetworkPluginServer::handleDataRead, this, client, _1));
 
	sendPing(client);
 

	
 
@@ -836,79 +846,83 @@ void NetworkPluginServer::handleFTDataPayload(Backend *b, const std::string &dat
 
		f.set_data("");
 

	
 
		std::string message;
 
		f.SerializeToString(&message);
 

	
 
		WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_FT_PAUSE);
 

	
 
		send(b->connection, message);
 
	}
 
}
 

	
 
void NetworkPluginServer::handleFTDataNeeded(Backend *b, unsigned long ftid) {
 
	pbnetwork::FileTransferData f;
 
	f.set_ftid(ftid);
 
	f.set_data("");
 

	
 
	std::string message;
 
	f.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_FT_CONTINUE);
 

	
 
	send(b->connection, message);
 
}
 

	
 
void NetworkPluginServer::connectWaitingUsers() {
 
	// some users are in queue waiting for this backend
 
	while(!m_waitingUsers.empty()) {
 
		// There's no new backend, so stop associating users and wait for new backend,
 
		// which has been already spawned in getFreeClient() call.
 
		if (getFreeClient(true, false, true) == NULL)
 
			break;
 

	
 
		User *u = m_waitingUsers.front();
 
		m_waitingUsers.pop_front();
 

	
 
		LOG4CXX_INFO(logger, "Associating " << u->getJID().toString() << " with this backend");
 

	
 
		// associate backend with user
 
		handleUserCreated(u);
 

	
 
		// connect user if it's ready
 
		if (u->isReadyToConnect()) {
 
			handleUserReadyToConnect(u);
 
		}
 
	}
 
}
 

	
 
void NetworkPluginServer::handlePongReceived(Backend *c) {
 
	// This could be first PONG from the backend
 
	if (c->pongReceived == -1) {
 
		// Backend is fully ready to handle requests
 
		c->willDie = false;
 

	
 
		if (m_clients.size() == 1) {
 
			// first backend connected, start the server, we're ready.
 
			m_component->start();
 
		}
 

	
 
		// some users are in queue waiting for this backend
 
		while(!m_waitingUsers.empty()) {
 
			// There's no new backend, so stop associating users and wait for new backend,
 
			// which has been already spawned in getFreeClient() call.
 
			if (getFreeClient() == NULL)
 
				break;
 

	
 
			User *u = m_waitingUsers.front();
 
			m_waitingUsers.pop_front();
 

	
 
			LOG4CXX_INFO(logger, "Associating " << u->getJID().toString() << " with this backend");
 

	
 
			// associate backend with user
 
			handleUserCreated(u);
 

	
 
			// connect user if it's ready
 
			if (u->isReadyToConnect()) {
 
				handleUserReadyToConnect(u);
 
			}
 
		}
 
		connectWaitingUsers();
 
	}
 

	
 
	c->pongReceived = true;
 
}
 

	
 
void NetworkPluginServer::handleQueryPayload(Backend *b, const std::string &data) {
 
	pbnetwork::BackendConfig payload;
 
	if (payload.ParseFromString(data) == false) {
 
		// TODO: ERROR
 
		return;
 
	}
 

	
 
	if (!m_adminInterface) {
 
		return;
 
	}
 

	
 
	boost::shared_ptr<Swift::Message> msg(new Swift::Message());
 
	msg->setBody(payload.config());
 
	m_adminInterface->handleQuery(msg);
 

	
 
	pbnetwork::BackendConfig response;
 
	response.set_config(msg->getBody());
 

	
 
	std::string message;
 
@@ -1621,51 +1635,62 @@ void NetworkPluginServer::handleFTStateChanged(Swift::FileTransfer::State state,
 
	}
 
	if (state.state == Swift::FileTransfer::State::Transferring) {
 
		handleFTAccepted(user, buddyName, fileName, size, id);
 
	}
 
	else if (state.state == Swift::FileTransfer::State::Canceled) {
 
		handleFTRejected(user, buddyName, fileName, size);
 
	}
 
}
 

	
 
void NetworkPluginServer::sendPing(Backend *c) {
 

	
 
	std::string message;
 
	pbnetwork::WrapperMessage wrap;
 
	wrap.set_type(pbnetwork::WrapperMessage_Type_TYPE_PING);
 
	wrap.SerializeToString(&message);
 

	
 
	if (c->connection) {
 
		LOG4CXX_INFO(logger, "PING to " << c << " (ID=" << c->id << ")");
 
		send(c->connection, message);
 
		c->pongReceived = false;
 
	}
 
// 	LOG4CXX_INFO(logger, "PING to " << c);
 
}
 

	
 
NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun) {
 
NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun, bool check) {
 
	NetworkPluginServer::Backend *c = NULL;
 

	
 
	unsigned long diff = CONFIG_INT(m_config, "service.login_delay");
 
	time_t now = time(NULL);
 
	if (diff && (now - m_lastLogin < diff)) {
 
		m_loginTimer->start();
 
		return NULL;
 
	}
 

	
 
	if (!check) {
 
		m_lastLogin = time(NULL);
 
	}
 

	
 
	// Check all backends and find free one
 
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it)->willDie == false && (*it)->acceptUsers == acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection && (*it)->longRun == longRun) {
 
			c = *it;
 
			// if we're not reusing all backends and backend is full, stop accepting new users on this backend
 
			if (!CONFIG_BOOL(m_config, "service.reuse_old_backends")) {
 
				if (c->users.size() + 1 >= CONFIG_INT(m_config, "service.users_per_backend")) {
 
					c->acceptUsers = false;
 
				}
 
			}
 
			break;
 
		}
 
	}
 

	
 
	// there's no free backend, so spawn one.
 
	if (c == NULL && !m_startingBackend) {
 
		m_isNextLongRun = longRun;
 
		m_startingBackend = true;
 
		exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getCommandLineArgs().c_str());
 
	}
 

	
 
	return c;
 
}
 

	
0 comments (0 inline, 0 general)