Changeset - d2ed4065362c
[Not reviewed]
0 1 0
Sarang Bharadwaj - 13 years ago 2012-06-01 19:22:01
sarang.bh@gmail.com
Multi-threaded twitter requests
1 file changed with 122 insertions and 8 deletions:
0 comments (0 inline, 0 general)
backends/twitter/main.cpp
Show inline comments
 
@@ -10,7 +10,12 @@
 
#include "signal.h"
 
#include "sys/wait.h"
 
#include "sys/signal.h"
 

	
 
#include <boost/algorithm/string.hpp>
 
#include <boost/signal.hpp>
 
#include <boost/thread.hpp>
 
#include <boost/thread/mutex.hpp>
 

	
 
#include "twitcurl.h"
 
#include "TwitterResponseParser.h"
 

	
 
@@ -18,6 +23,8 @@
 
#include <sstream>
 
#include <map>
 
#include <vector>
 
#include <queue>
 
#include <set>
 
#include <cstdio>
 
#include "userdb.h"
 

	
 
@@ -31,7 +38,11 @@ class TwitterPlugin; // The plugin
 
TwitterPlugin * np = NULL;
 
StorageBackend *storagebackend;
 

	
 
#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")")
 

	
 
class TwitterPlugin : public NetworkPlugin {
 
	private:
 
		struct Request;
 
	public:
 
		Swift::BoostNetworkFactories *m_factories;
 
		Swift::BoostIOServiceThread m_boostIOServiceThread;
 
@@ -54,15 +65,15 @@ class TwitterPlugin : public NetworkPlugin {
 
			m_conn = m_factories->getConnectionFactory()->createConnection();
 
			m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
 
			m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
 
			onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2));
 
			
 
			//db = new UserDB(std::string("user.db"));
 
			//registeredUsers = db->getRegisteredUsers();
 
			activeThreadCount = 0;
 
			MAX_THREADS  = 50;
 
			
 
			LOG4CXX_INFO(logger, "Starting the plugin.");
 
		}
 

	
 
		~TwitterPlugin() {
 
			//delete db;
 
			delete storagebackend;
 
			std::map<std::string, twitCurl*>::iterator it;
 
			for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second;
 
@@ -303,9 +314,13 @@ class TwitterPlugin : public NetworkPlugin {
 
			
 
		}
 

	
 
		void spawnThreadForRequest(Request r) {
 
			std::string &user = r.from;
 
			std::string &legacyName = r.to;
 
		    std::string &message = r.message;
 

	
 
		void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
 
			LOG4CXX_INFO(logger, "Sending message from " << user << " to " << legacyName << ".");
 

	
 
			if(legacyName == "twitter-account") {
 
				std::string cmd = message.substr(0, message.find(':'));
 
				std::string data = message.substr(message.find(':') + 1);
 
@@ -319,6 +334,72 @@ class TwitterPlugin : public NetworkPlugin {
 
				else if(cmd == "#timeline") fetchTimeline(user);
 
				else if(cmd == "#friends") fetchFriends(user);
 
			}
 
			updateActiveThreadCount(-1);
 
			onDispatchRequest(r, false);	
 
		}
 
		
 
		/*
 
		 * usersBeingServed - set of users being served at present
 
		 * usersToServe - queue of users who have requests pending in their request queue and are yet to be served; Each user appears only once here.
 
		 */
 
		void requestDispatcher(Request r, bool incoming) {
 

	
 
			criticalRegion.lock();
 

	
 
			if(incoming) {
 
				std::string user = r.from;
 
				if(getActiveThreadCount() < MAX_THREADS && usersBeingServed.count(user) == false) {
 
					updateActiveThreadCount(1);
 
					boost::thread(&TwitterPlugin::spawnThreadForRequest, this, r);
 
					usersBeingServed.insert(user);
 
					LOG4CXX_INFO(logger, user << ": Sending request " << STR(r) << " to twitter")
 
				} else {
 
					requests[user].push(r);
 
					LOG4CXX_INFO(logger, user << " is already being served! Adding " << STR(r) << " to request queue");
 
					if (!usersBeingServed.count(user)) {
 
						usersToServe.push(user);
 
					}
 
				}
 
			} else {
 
				usersBeingServed.erase(r.from);
 
				if(requests[r.from].size()) usersToServe.push(r.from);
 
				while(getActiveThreadCount() < MAX_THREADS && !usersToServe.empty()) {
 
					std::string user = usersToServe.front(); usersToServe.pop();
 
					Request s = requests[user].front(); requests[user].pop();
 
					updateActiveThreadCount(1);
 
					boost::thread(&TwitterPlugin::spawnThreadForRequest, this, s);
 
					usersBeingServed.insert(user);
 
					LOG4CXX_INFO(logger, user << ": Sending request " << STR(s) << " to twitter")
 
				} 
 
			}
 

	
 
			criticalRegion.unlock();
 
		}
 

	
 

	
 
		void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
 
			
 
			Request r;
 
			r.from = user;
 
			r.to = legacyName;
 
			r.message = message;
 
			LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r))
 
			onDispatchRequest(r,true);
 
			//requestDispatcher(r, true);
 

	
 
			/*if(legacyName == "twitter-account") {
 
				std::string cmd = message.substr(0, message.find(':'));
 
				std::string data = message.substr(message.find(':') + 1);
 
				
 
				handleMessage(user, "twitter-account", cmd + " " + data);
 

	
 
				if(cmd == "#pin") handlePINExchange(user, data);
 
				else if(cmd == "#help") printHelpMessage(user);
 
				else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);}
 
				else if(cmd == "#status") handleStatusUpdate(user, data);
 
				else if(cmd == "#timeline") fetchTimeline(user);
 
				else if(cmd == "#friends") fetchFriends(user);
 
			}*/
 
		}
 

	
 
		void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector<std::string> &groups) {
 
@@ -330,17 +411,50 @@ class TwitterPlugin : public NetworkPlugin {
 

	
 
		}
 

	
 
		int getActiveThreadCount() {
 
			int res;
 
			threadLock.lock();
 
			res = activeThreadCount;
 
			threadLock.unlock();
 
			return res;
 
		}
 

	
 
		void updateActiveThreadCount(int k) {
 
			threadLock.lock();
 
			activeThreadCount+=k;
 
			threadLock.unlock();
 
		}
 

	
 
	private:
 
		enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED};
 
		struct Request {
 
			std::string from;
 
			std::string to;
 
			std::string message;
 
		};
 

	
 
		Config *config;
 
		//UserDB *db;
 
		std::string consumerKey;
 
		std::string consumerSecret;
 
		//std::set<std::string> registeredUsers;
 
		std::map<std::string, twitCurl*> sessions;
 
		std::map<std::string, status> connectionState;
 
		std::string OAUTH_KEY;
 
		std::string OAUTH_SECRET;
 

	
 
		int activeThreadCount;
 
		int MAX_THREADS;
 
		
 
		boost::mutex criticalRegion;
 
		boost::mutex threadLock;
 

	
 
		std::map<std::string, twitCurl*> sessions;
 
		std::map<std::string, std::queue<Request> > requests;
 
		
 
		std::queue<std::string> usersToServe;
 
		std::set<std::string> usersBeingServed;
 

	
 
		
 
		std::map<std::string, status> connectionState;
 

	
 
		boost::signal< void (Request, bool) > onDispatchRequest;
 
};
 

	
 
static void spectrum_sigchld_handler(int sig)
0 comments (0 inline, 0 general)