Changeset - 98687145b02a
[Not reviewed]
0 5 0
Jan Kaluza - 14 years ago 2011-05-26 08:53:05
hanzz.k@gmail.com
first more or less working version of multiple-backend support
5 files changed with 92 insertions and 47 deletions:
0 comments (0 inline, 0 general)
include/transport/networkplugin.h
Show inline comments
 
@@ -65,24 +65,26 @@ class NetworkPlugin {
 
		void connect();
 
		void handleLoginPayload(const std::string &payload);
 
		void handleLogoutPayload(const std::string &payload);
 
		void handleConvMessagePayload(const std::string &payload);
 
		void handleJoinRoomPayload(const std::string &payload);
 
		void handleLeaveRoomPayload(const std::string &payload);
 
		void handleDataRead(const Swift::ByteArray&);
 
		void handleConnected(bool error);
 
		void handleDisconnected();
 

	
 
		void send(const std::string &data);
 
		void sendPong();
 
		void pingTimeout();
 

	
 
		std::string m_data;
 
		std::string m_host;
 
		int m_port;
 
		Swift::BoostNetworkFactories *m_factories;
 
		Swift::BoostIOServiceThread m_boostIOServiceThread;
 
		boost::shared_ptr<Swift::Connection> m_conn;
 
		Swift::Timer::ref m_reconnectTimer;
 
		Swift::Timer::ref m_pingTimer;
 
		bool m_pingReceived;
 

	
 
};
 

	
 
}
include/transport/networkpluginserver.h
Show inline comments
 
@@ -31,24 +31,30 @@
 
namespace Transport {
 

	
 
class UserManager;
 
class User;
 
class Component;
 
class Buddy;
 
class LocalBuddy;
 
class Config;
 
class NetworkConversation;
 

	
 
class NetworkPluginServer {
 
	public:
 
		struct Client {
 
			bool pongReceived;
 
			std::list<User *> users;
 
			std::string data;
 
		};
 

	
 
		NetworkPluginServer(Component *component, Config *config, UserManager *userManager);
 

	
 
		virtual ~NetworkPluginServer();
 

	
 
		void handleMessageReceived(NetworkConversation *conv, boost::shared_ptr<Swift::Message> &message);
 

	
 
	private:
 
		void handleNewClientConnection(boost::shared_ptr<Swift::Connection> c);
 
		void handleSessionFinished(boost::shared_ptr<Swift::Connection>);
 
		void handleDataRead(boost::shared_ptr<Swift::Connection>, const Swift::ByteArray&);
 

	
 
		void handleConnectedPayload(const std::string &payload);
 
@@ -58,25 +64,23 @@ class NetworkPluginServer {
 
		void handleParticipantChangedPayload(const std::string &payload);
 
		void handleRoomChangedPayload(const std::string &payload);
 

	
 
		void handleUserCreated(User *user);
 
		void handleRoomJoined(User *user, const std::string &room, const std::string &nickname, const std::string &password);
 
		void handleRoomLeft(User *user, const std::string &room);
 
		void handleUserReadyToConnect(User *user);
 
		void handleUserDestroyed(User *user);
 

	
 
		void send(boost::shared_ptr<Swift::Connection> &, const std::string &data);
 

	
 
		void pingTimeout();
 
		void sendPing();
 
		void sendPing(boost::shared_ptr<Swift::Connection> c);
 
		boost::shared_ptr<Swift::Connection> getFreeClient();
 

	
 
		std::string m_command;
 
		std::string m_data;
 
		UserManager *m_userManager;
 
		Config *m_config;
 
		boost::shared_ptr<Swift::ConnectionServer> m_server;
 
		boost::shared_ptr<Swift::Connection> m_client;
 
		bool m_pongReceived;
 
		std::map<boost::shared_ptr<Swift::Connection>, Client >  m_clients;
 
		Swift::Timer::ref m_pingTimer;
 
};
 

	
 
}
include/transport/user.h
Show inline comments
 
@@ -87,15 +87,17 @@ class User {
 
		Swift::JID m_jid;
 
		Component *m_component;
 
		RosterManager *m_rosterManager;
 
		UserManager *m_userManager;
 
		ConversationManager *m_conversationManager;
 
		Swift::EntityCapsManager *m_entityCapsManager;
 
		Swift::PresenceOracle *m_presenceOracle;
 
		UserInfo m_userInfo;
 
		void *m_data;
 
		bool m_connected;
 
		bool m_readyForConnect;
 
		Swift::Timer::ref m_reconnectTimer;
 
		boost::shared_ptr<Swift::Connection> connection;
 
		friend class NetworkPluginServer;
 
};
 

	
 
}
src/networkplugin.cpp
Show inline comments
 
@@ -32,31 +32,32 @@
 

	
 
namespace Transport {
 

	
 
#define WRAP(MESSAGE, TYPE) 	pbnetwork::WrapperMessage wrap; \
 
	wrap.set_type(TYPE); \
 
	wrap.set_payload(MESSAGE); \
 
	wrap.SerializeToString(&MESSAGE);
 

	
 
NetworkPlugin::NetworkPlugin(Swift::EventLoop *loop, const std::string &host, int port) {
 
	m_factories = new Swift::BoostNetworkFactories(loop);
 
	m_host = host;
 
	m_port = port;
 
	m_pingReceived = false;
 
	m_conn = m_factories->getConnectionFactory()->createConnection();
 
	m_conn->onDataRead.connect(boost::bind(&NetworkPlugin::handleDataRead, this, _1));
 
	m_conn->onConnectFinished.connect(boost::bind(&NetworkPlugin::handleConnected, this, _1));
 
	m_conn->onDisconnected.connect(boost::bind(&NetworkPlugin::handleDisconnected, this));
 

	
 
	m_reconnectTimer = m_factories->getTimerFactory()->createTimer(1000);
 
	m_reconnectTimer->onTick.connect(boost::bind(&NetworkPlugin::connect, this)); 
 
	m_pingTimer = m_factories->getTimerFactory()->createTimer(30000);
 
	m_pingTimer->onTick.connect(boost::bind(&NetworkPlugin::pingTimeout, this)); 
 
	connect();
 
}
 

	
 
NetworkPlugin::~NetworkPlugin() {
 
	delete m_factories;
 
}
 

	
 
void NetworkPlugin::handleMessage(const std::string &user, const std::string &legacyName, const std::string &msg, const std::string &nickname) {
 
	pbnetwork::ConversationMessage m;
 
	m.set_username(user);
 
	m.set_buddyname(legacyName);
 
	m.set_message(msg);
 
@@ -147,44 +148,43 @@ void NetworkPlugin::handleRoomChanged(const std::string &user, const std::string
 
	room.set_password("");
 

	
 
	std::string message;
 
	room.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_ROOM_NICKNAME_CHANGED);
 
 
 
	send(message);
 
}
 

	
 
void NetworkPlugin::handleConnected(bool error) {
 
	if (error) {
 
		std::cout << "Connecting error\n";
 
// 		m_reconnectTimer->start();
 
		std::cerr << "Connecting error\n";
 
		m_pingTimer->stop();
 
		exit(1);
 
	}
 
	else {
 
		std::cout << "Connected\n";
 
		m_reconnectTimer->stop();
 
		m_pingTimer->start();
 
	}
 
}
 

	
 
void NetworkPlugin::handleDisconnected() {
 
	std::cout << "Disconnected\n";
 
// 	m_reconnectTimer->start();
 
	std::cerr << "Disconnected\n";
 
	m_pingTimer->stop();
 
	exit(1);
 
}
 

	
 
void NetworkPlugin::connect() {
 
	std::cout << "Trying to connect the server\n";
 
	m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(m_host), m_port));
 
	m_reconnectTimer->stop();
 
}
 

	
 
void NetworkPlugin::handleLoginPayload(const std::string &data) {
 
	pbnetwork::Login payload;
 
	if (payload.ParseFromString(data) == false) {
 
		// TODO: ERROR
 
		return;
 
	}
 
	handleLoginRequest(payload.user(), payload.legacyname(), payload.password());
 
}
 

	
 
void NetworkPlugin::handleLogoutPayload(const std::string &data) {
 
@@ -279,22 +279,30 @@ void NetworkPlugin::send(const std::string &data) {
 
	std::string header("    ");
 
// 	std::cout << data.size() << "\n";
 
	boost::int32_t size = data.size();
 
	for (int i = 0; i != 4; ++i) {
 
		header.at(i) = static_cast<char>(size >> (8 * (3 - i)));
 
// 		std::cout << std::hex << (int) header.at(i) << "\n";
 
	}
 

	
 
	m_conn->write(Swift::ByteArray(header + data));
 
}
 

	
 
void NetworkPlugin::sendPong() {
 
	m_pingReceived = true;
 
	std::string message;
 
	pbnetwork::WrapperMessage wrap;
 
	wrap.set_type(pbnetwork::WrapperMessage_Type_TYPE_PONG);
 
	wrap.SerializeToString(&message);
 

	
 
	send(message);
 
// 	std::cout << "SENDING PONG\n";
 
}
 

	
 
void NetworkPlugin::pingTimeout() {
 
	if (m_pingReceived == false) {
 
		exit(1);
 
	}
 
	m_pingReceived = false;
 
}
 

	
 
}
src/networkpluginserver.cpp
Show inline comments
 
@@ -97,63 +97,68 @@ static void handleBuddyPayload(LocalBuddy *buddy, const pbnetwork::Buddy &payloa
 
	buddy->setAlias(payload.alias());
 
	std::vector<std::string> groups;
 
	groups.push_back(payload.groups());
 
	buddy->setGroups(groups);
 
	buddy->setStatus(Swift::StatusShow((Swift::StatusShow::Type) payload.status()), payload.statusmessage());
 
	buddy->setIconHash(payload.iconhash());
 
}
 

	
 
NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, UserManager *userManager) {
 
	m_userManager = userManager;
 
	m_config = config;
 
	component->m_factory = new NetworkFactory(this);
 
	m_pongReceived = false;
 
	m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1));
 
	m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1));
 

	
 
	m_pingTimer = component->getFactories()->getTimerFactory()->createTimer(10000);
 
	m_pingTimer->onTick.connect(boost::bind(&NetworkPluginServer::pingTimeout, this)); 
 

	
 
	m_server = component->getFactories()->getConnectionFactory()->createConnectionServer(10000);
 
	m_server->onNewConnection.connect(boost::bind(&NetworkPluginServer::handleNewClientConnection, this, _1));
 
	m_server->start();
 

	
 
	signal(SIGCHLD, SigCatcher);
 

	
 
	exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
 
}
 

	
 
NetworkPluginServer::~NetworkPluginServer() {
 
	m_pingTimer->stop();
 
}
 

	
 
void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Connection> c) {
 
	if (m_client) {
 
		c->disconnect();
 
	}
 
	m_client = c;
 
	m_pongReceived = false;
 
	
 
	Client client;
 
	client.pongReceived = true;
 

	
 
	m_clients[c] = client;
 

	
 
	c->onDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, c));
 
	c->onDataRead.connect(boost::bind(&NetworkPluginServer::handleDataRead, this, c, _1));
 
	sendPing();
 
	sendPing(c);
 
	m_pingTimer->start();
 
}
 

	
 
void NetworkPluginServer::handleSessionFinished(boost::shared_ptr<Swift::Connection> c) {
 
	if (c == m_client) {
 
		m_client.reset();
 
	for (std::list<User *>::const_iterator it = m_clients[c].users.begin(); it != m_clients[c].users.end(); it++) {
 
		(*it)->handleDisconnected("Internal Server Error, please reconnect.");
 
	}
 

	
 
	m_clients.erase(c);
 

	
 
	// Execute new session only if there's no free one after this crash/disconnection
 
	for (std::map<boost::shared_ptr<Swift::Connection>, Client>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it).second.users.size() < 1) {
 
			return;
 
		}
 
	}
 
	m_pingTimer->stop();
 
	exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
 
}
 

	
 
void NetworkPluginServer::handleConnectedPayload(const std::string &data) {
 
	pbnetwork::Connected payload;
 
	if (payload.ParseFromString(data) == false) {
 
		// TODO: ERROR
 
		return;
 
	}
 
// 	std::cout << payload.name() << "\n";
 
}
 

	
 
@@ -265,196 +270,220 @@ void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool
 

	
 
	NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
 
	if (!conv) {
 
		conv = new NetworkConversation(user->getConversationManager(), payload.buddyname());
 
		conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
 
	}
 

	
 
	conv->handleMessage(msg, payload.nickname());
 
}
 

	
 
void NetworkPluginServer::handleDataRead(boost::shared_ptr<Swift::Connection> c, const Swift::ByteArray &data) {
 
	long expected_size = 0;
 
	m_data += data.toString();
 
	m_clients[c].data += data.toString();
 
// 	std::cout << "received data; size = " << m_data.size() << "\n";
 
	while (m_data.size() != 0) {
 
		if (m_data.size() >= 4) {
 
			unsigned char * head = (unsigned char*) m_data.c_str();
 
	while (m_clients[c].data.size() != 0) {
 
		if (m_clients[c].data.size() >= 4) {
 
			unsigned char * head = (unsigned char*) m_clients[c].data.c_str();
 
			expected_size = (((((*head << 8) | *(head + 1)) << 8) | *(head + 2)) << 8) | *(head + 3);
 
			//expected_size = m_data[0];
 
// 			std::cout << "expected_size=" << expected_size << "\n";
 
			if (m_data.size() - 4 < expected_size)
 
			if (m_clients[c].data.size() - 4 < expected_size)
 
				return;
 
		}
 
		else {
 
			return;
 
		}
 

	
 
		std::string msg = m_data.substr(4, expected_size);
 
		m_data.erase(0, 4 + expected_size);
 
		std::string msg = m_clients[c].data.substr(4, expected_size);
 
		m_clients[c].data.erase(0, 4 + expected_size);
 

	
 
		pbnetwork::WrapperMessage wrapper;
 
		if (wrapper.ParseFromString(msg) == false) {
 
			// TODO: ERROR
 
			return;
 
		}
 

	
 
		switch(wrapper.type()) {
 
			case pbnetwork::WrapperMessage_Type_TYPE_CONNECTED:
 
				handleConnectedPayload(wrapper.payload());
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_DISCONNECTED:
 
				handleDisconnectedPayload(wrapper.payload());
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED:
 
				handleBuddyChangedPayload(wrapper.payload());
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_CONV_MESSAGE:
 
				handleConvMessagePayload(wrapper.payload());
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_ROOM_SUBJECT_CHANGED:
 
				handleConvMessagePayload(wrapper.payload(), true);
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_PONG:
 
				m_pongReceived = true;
 
				m_clients[c].pongReceived = true;
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_PARTICIPANT_CHANGED:
 
				handleParticipantChangedPayload(wrapper.payload());
 
				break;
 
			case pbnetwork::WrapperMessage_Type_TYPE_ROOM_NICKNAME_CHANGED:
 
				handleRoomChangedPayload(wrapper.payload());
 
				break;
 
			default:
 
				return;
 
		}
 
	}
 
}
 

	
 
void NetworkPluginServer::send(boost::shared_ptr<Swift::Connection> &c, const std::string &data) {
 
	std::string header("    ");
 
	for (int i = 0; i != 4; ++i)
 
		header.at(i) = static_cast<char>(data.size() >> (8 * (3 - i)));
 
	
 
	c->write(Swift::ByteArray(header + data));
 
}
 

	
 
void NetworkPluginServer::pingTimeout() {
 
	std::cout << "pingtimeout\n";
 
	if (m_pongReceived) {
 
		sendPing();
 
		m_pingTimer->start();
 
	}
 
	else {
 
		exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
 
	for (std::map<boost::shared_ptr<Swift::Connection>, Client>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it).second.pongReceived) {
 
			sendPing((*it).first);
 
			m_pingTimer->start();
 
		}
 
		else {
 
			exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
 
		}
 
	}
 
}
 

	
 
void NetworkPluginServer::handleUserCreated(User *user) {
 
// 	UserInfo userInfo = user->getUserInfo();
 
	user->onReadyToConnect.connect(boost::bind(&NetworkPluginServer::handleUserReadyToConnect, this, user));
 
	user->onRoomJoined.connect(boost::bind(&NetworkPluginServer::handleRoomJoined, this, user, _1, _2, _3));
 
	user->onRoomLeft.connect(boost::bind(&NetworkPluginServer::handleRoomLeft, this, user, _1));
 
}
 

	
 
void NetworkPluginServer::handleUserReadyToConnect(User *user) {
 
	UserInfo userInfo = user->getUserInfo();
 

	
 
	pbnetwork::Login login;
 
	login.set_user(user->getJID().toBare());
 
	login.set_legacyname(userInfo.uin);
 
	login.set_password(userInfo.password);
 

	
 
	std::string message;
 
	login.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_LOGIN);
 
	user->connection = getFreeClient();
 
	m_clients[user->connection].users.push_back(user);
 

	
 
	send(m_client, message);
 
	send(user->connection, message);
 
}
 

	
 
void NetworkPluginServer::handleRoomJoined(User *user, const std::string &r, const std::string &nickname, const std::string &password) {
 
	UserInfo userInfo = user->getUserInfo();
 

	
 
	pbnetwork::Room room;
 
	room.set_username(user->getJID().toBare());
 
	room.set_nickname(nickname);
 
	room.set_room(r);
 
	room.set_password(password);
 

	
 
	std::string message;
 
	room.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_JOIN_ROOM);
 
 
 
	send(m_client, message);
 
	send(user->connection, message);
 

	
 
	NetworkConversation *conv = new NetworkConversation(user->getConversationManager(), r, true);
 
	conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
 
	conv->setNickname(nickname);
 
}
 

	
 
void NetworkPluginServer::handleRoomLeft(User *user, const std::string &r) {
 
	UserInfo userInfo = user->getUserInfo();
 

	
 
	pbnetwork::Room room;
 
	room.set_username(user->getJID().toBare());
 
	room.set_nickname("");
 
	room.set_room(r);
 
	room.set_password("");
 

	
 
	std::string message;
 
	room.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_LEAVE_ROOM);
 
 
 
	send(m_client, message);
 
	send(user->connection, message);
 

	
 
	NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(r);
 
	if (!conv) {
 
		return;
 
	}
 

	
 
	delete conv;
 
}
 

	
 
void NetworkPluginServer::handleUserDestroyed(User *user) {
 
	UserInfo userInfo = user->getUserInfo();
 

	
 
	pbnetwork::Logout logout;
 
	logout.set_user(user->getJID().toBare());
 
	logout.set_legacyname(userInfo.uin);
 

	
 
	std::string message;
 
	logout.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_LOGOUT);
 
 
 
	send(m_client, message);
 
	send(user->connection, message);
 

	
 
	m_clients[user->connection].users.remove(user);
 
	if (m_clients[user->connection].users.size() == 0) {
 
		std::cout << "DISCONNECTING\n";
 
		user->connection->disconnect();
 
		user->connection.reset();
 
		m_clients.erase(user->connection);
 
	}
 
}
 

	
 
void NetworkPluginServer::handleMessageReceived(NetworkConversation *conv, boost::shared_ptr<Swift::Message> &msg) {
 
	pbnetwork::ConversationMessage m;
 
	m.set_username(conv->getConversationManager()->getUser()->getJID().toBare());
 
	m.set_buddyname(conv->getLegacyName());
 
	m.set_message(msg->getBody());
 

	
 
	std::string message;
 
	m.SerializeToString(&message);
 

	
 
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_CONV_MESSAGE);
 

	
 
	send(m_client, message);
 
	send(conv->getConversationManager()->getUser()->connection, message);
 
}
 

	
 
void NetworkPluginServer::sendPing() {
 
void NetworkPluginServer::sendPing(boost::shared_ptr<Swift::Connection> c) {
 

	
 
	std::string message;
 
	pbnetwork::WrapperMessage wrap;
 
	wrap.set_type(pbnetwork::WrapperMessage_Type_TYPE_PING);
 
	wrap.SerializeToString(&message);
 

	
 
	send(m_client, message);
 
	m_pongReceived = false;
 
	send(c, message);
 
	m_clients[c].pongReceived = false;
 
	std::cout << "SENDING PING\n";
 
}
 

	
 
boost::shared_ptr<Swift::Connection> NetworkPluginServer::getFreeClient() {
 
	for (std::map<boost::shared_ptr<Swift::Connection>, Client>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
 
		if ((*it).second.users.size() < 1) {
 
			if ((*it).second.users.size() + 1 == 1) {
 
				exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
 
			}
 
			return (*it).first;
 
		}
 
	}
 
	return boost::shared_ptr<Swift::Connection>(); 
 
}
 

	
 
}
0 comments (0 inline, 0 general)