Changeset - 2b47d32a916c
[Not reviewed]
0 2 6
Sarang Bharadwaj - 13 years ago 2012-06-02 16:40:42
sarang.bh@gmail.com
ThreadPool implementation
8 files changed with 323 insertions and 12 deletions:
0 comments (0 inline, 0 general)
backends/twitter/CMakeLists.txt
Show inline comments
 
include_directories (${libtransport_SOURCE_DIR}/backends/twitter/libtwitcurl) 
 
FILE(GLOB SRC *.cpp libtwitcurl/*.cpp)
 
FILE(GLOB SRC *.cpp libtwitcurl/*.cpp Requests/*.cpp)
 
add_executable(spectrum_twitter_backend ${SRC})
 
#add_executable(parser TwitterResponseParser.cpp test.cpp)
 
target_link_libraries(spectrum_twitter_backend curl transport pthread sqlite3 ${Boost_LIBRARIES} ${SWIFTEN_LIBRARY} ${LOG4CXX_LIBRARIES})
backends/twitter/Requests/DirectMessageRequest.cpp
Show inline comments
 
new file 100644
 
#include "DirectMessageRequest.h"
 
DEFINE_LOGGER(logger, "DirectMessageRequest")
 
void DirectMessageRequest::run() 
 
{
 
	if(twitObj.directMessageSend(username, data, false) == false) {
 
		LOG4CXX_ERROR(logger, user << ": Error while sending directed message to " << username );
 
		return;
 
	}
 
	twitObj.getLastWebResponse( replyMsg );
 
}
 

	
 
void DirectMessageRequest::finalize()
 
{
 
	LOG4CXX_INFO(logger, user << ": Sent " << data << " to " << username)
 
	LOG4CXX_INFO(logger, user << ": Twitter reponse - " << replyMsg)
 
}
backends/twitter/Requests/DirectMessageRequest.h
Show inline comments
 
new file 100644
 
#ifndef DIRECT_MESSAGE
 
#define DIRECT_MESSAGE
 

	
 
#include "../ThreadPool.h"
 
#include "../libtwitcurl/twitcurl.h"
 
#include "transport/networkplugin.h"
 
#include "transport/logging.h"
 
#include <string>
 
#include <iostream>
 

	
 
using namespace Transport;
 

	
 
class DirectMessageRequest : public Thread
 
{
 
	twitCurl twitObj;
 
	std::string data;
 
	std::string user;
 
	std::string username;
 
	std::string replyMsg;
 
	NetworkPlugin *np;
 

	
 
	public:
 
	DirectMessageRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string & _username, const std::string &_data) {
 
		twitObj = *obj;
 
		data = _data;
 
		user = _user;
 
		username = _username;
 
		np = _np;
 
	}
 

	
 
	void run();
 
	void finalize();
 
};
 

	
 
#endif
backends/twitter/Requests/StatusUpdateRequest.cpp
Show inline comments
 
new file 100644
 
#include "StatusUpdateRequest.h"
 
DEFINE_LOGGER(logger, "StatusUpdateRequest")
 
void StatusUpdateRequest::run() 
 
{
 
	if( twitObj.statusUpdate( data ) ) {
 
		replyMsg = "";
 
		while(replyMsg.length() == 0) {
 
			twitObj.getLastWebResponse( replyMsg );
 
		}
 
		LOG4CXX_INFO(logger, user << "StatusUpdateRequest response " << replyMsg );
 
	} else {
 
		twitObj.getLastCurlError( replyMsg );
 
		LOG4CXX_ERROR(logger, user << "Error - " << replyMsg );
 
	}
 
}
 

	
 
void StatusUpdateRequest::finalize()
 
{
 
	LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data);
 
	return;
 
}
backends/twitter/Requests/StatusUpdateRequest.h
Show inline comments
 
new file 100644
 
#ifndef STATUS_UPDATE
 
#define STATUS_UPDATE
 

	
 
#include "../ThreadPool.h"
 
#include "../libtwitcurl/twitcurl.h"
 
#include "transport/networkplugin.h"
 
#include "transport/logging.h"
 
#include <string>
 
#include <iostream>
 

	
 
using namespace Transport;
 
class StatusUpdateRequest : public Thread
 
{
 
	twitCurl twitObj;
 
	std::string data;
 
	std::string user;
 
	std::string replyMsg;
 
	NetworkPlugin *np;
 
	public:
 
	StatusUpdateRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) {
 
		twitObj = *obj;
 
		data = _data;
 
		user = _user;
 
		np = _np;
 
	}
 
	void run();
 
	void finalize();
 
};
 

	
 
#endif
backends/twitter/ThreadPool.cpp
Show inline comments
 
new file 100644
 
#include "ThreadPool.h"
 
DEFINE_LOGGER(logger, "ThreadPool")
 
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
 

	
 
void Worker(Thread *t, int wid)
 
{
 
	LOG4CXX_INFO(logger, "Starting thread " << wid)
 
	t->run();
 
	onWorkCompleted(t, wid);
 
}
 

	
 

	
 
ThreadPool::ThreadPool(int maxthreads) : MAX_THREADS(maxthreads)
 
{
 
	activeThreads = 0;
 
	worker = new boost::thread*[MAX_THREADS];
 
	for(int i=0 ; i<MAX_THREADS ; i++) {
 
		worker[i] = NULL;
 
		freeThreads.push(i);
 
	}
 
	onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
 
	onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
 
}
 

	
 
ThreadPool::~ThreadPool()
 
{
 
	for(int i=0; i<MAX_THREADS ; i++) {
 
		if(worker[i]) {
 
			delete worker[i];
 
		}
 
	}
 
	delete worker;
 

	
 
	while(!requestQueue.empty()) {
 
		Thread *t = requestQueue.front(); requestQueue.pop();
 
		delete t;
 
	}
 
}
 

	
 
int ThreadPool::getActiveThreadCount()
 
{	
 
	int res;
 
	count_lock.lock(); 
 
	res = activeThreads;
 
	count_lock.unlock();
 
	return res;
 
}
 

	
 
void ThreadPool::updateActiveThreadCount(int k)
 
{
 
	count_lock.lock();
 
	activeThreads += k;
 
	count_lock.unlock();
 
}
 

	
 
int ThreadPool::getFreeThread()
 
{
 
	int res = -1;
 
	pool_lock.lock();
 
	if(!freeThreads.empty()){
 
		res = freeThreads.front();
 
		freeThreads.pop();
 
		updateActiveThreadCount(-1);
 
	}
 
	pool_lock.unlock();
 
	return res;
 
}
 

	
 
void ThreadPool::releaseThread(int i)
 
{
 
	if(i < 0 || i > MAX_THREADS) return;
 
	pool_lock.lock();
 

	
 
	delete worker[i];
 
	worker[i] = NULL;
 
	freeThreads.push(i);
 
	
 
	updateActiveThreadCount(1);
 
	
 
	pool_lock.unlock();
 
}
 

	
 
void ThreadPool::cleandUp(Thread *t, int wid)
 
{
 
	LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())
 
	t->finalize();
 
	delete t;
 
	releaseThread(wid);
 
	onWorkerAvailable();	
 
}
 

	
 
void ThreadPool::scheduleFromQueue()
 
{
 
	criticalregion.lock();	
 
	while(!requestQueue.empty()) {
 
		int  w = getFreeThread();
 
		if(w == -1) break;
 

	
 
		LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
 
		Thread *t = requestQueue.front(); requestQueue.pop();
 
		t->setThreadID(w);
 
		worker[w] = new boost::thread(Worker, t, w);
 
		updateActiveThreadCount(-1);
 
	}
 
	criticalregion.unlock();
 
}
 

	
 

	
 
void ThreadPool::runAsThread(Thread *t)
 
{
 
	int w;
 
	if((w = getFreeThread()) != -1) {
 
		LOG4CXX_INFO(logger, "Creating thread #" << w)
 
		t->setThreadID(w);
 
		worker[w] = new boost::thread(Worker, t, w);
 
		updateActiveThreadCount(-1);
 
	}
 
	else {
 
		LOG4CXX_INFO(logger, "No workers available! adding to queue.")
 
		requestQueue.push(t);
 
	}
 
}
backends/twitter/ThreadPool.h
Show inline comments
 
new file 100644
 
#ifndef THREAD_POOL
 
#define THREAD_POOL
 

	
 
#include <boost/thread.hpp>
 
#include <boost/thread/mutex.hpp>
 
#include <boost/signals2/signal.hpp>
 
#include <queue>
 
#include <iostream>
 
#include "transport/logging.h"
 

	
 

	
 
/*
 
 * Thread serves as a base class for any code that has to be excuted as a thread
 
 * by the ThreadPool class. The run method defines the code that has to be run
 
 * as a theard. For example, code in run could be sendinga request to a server
 
 * waiting for the response and storing the response. When the thread finishes
 
 * execution, the ThreadPool invokes finalize where one could have the code necessary
 
 * to collect all the responses and release any resources. 
 
 *
 
 * NOTE: The object of the Thread class must be valid (in scope) throughout the 
 
 * execution of the thread.
 
 */
 

	
 
class Thread
 
{
 
	int threadID;
 

	
 
	public:
 
	
 
	Thread() {}
 
	virtual ~Thread() {}
 
	virtual void run() = 0;
 
	virtual void finalize() {}
 
	int getThreadID() {return threadID;}
 
	void setThreadID(int tid) {threadID = tid;}
 
};
 

	
 
/*
 
 * ThreadPool provides the interface to manage a pool of threads. It schedules jobs
 
 * on free threads and when the thread completes it automatically deletes the object
 
 * corresponding to a Thread. If free threads are not available, the requests are 
 
 * added to a queue and scheduled later when threads become available.
 
 */
 

	
 
class ThreadPool
 
{
 
	const int MAX_THREADS;
 
	int activeThreads;
 
	std::queue<int> freeThreads;
 
	
 
	std::queue<Thread*> requestQueue;
 
	boost::thread **worker;
 

	
 
	boost::mutex count_lock;
 
	boost::mutex pool_lock;
 
	boost::mutex criticalregion;
 

	
 
	boost::signals2::signal  < void () > onWorkerAvailable;
 

	
 
	public:
 
	ThreadPool(int maxthreads);
 
	~ThreadPool();
 
	void runAsThread(Thread *t);
 
	int getActiveThreadCount(); 
 
	void updateActiveThreadCount(int k);
 
	void cleandUp(Thread *, int);
 
	void scheduleFromQueue();
 
	int getFreeThread();
 
	void releaseThread(int i);
 
};
 

	
 
#endif
backends/twitter/main.cpp
Show inline comments
 
@@ -5,6 +5,7 @@
 
#include "transport/mysqlbackend.h"
 
#include "transport/pqxxbackend.h"
 
#include "transport/storagebackend.h"
 

	
 
#include "Swiften/Swiften.h"
 
#include "unistd.h"
 
#include "signal.h"
 
@@ -26,7 +27,10 @@
 
#include <queue>
 
#include <set>
 
#include <cstdio>
 
#include "userdb.h"
 

	
 
#include "ThreadPool.h"
 
#include "Requests/StatusUpdateRequest.h"
 
#include "Requests/DirectMessageRequest.h"
 

	
 
using namespace boost::filesystem;
 
using namespace boost::program_options;
 
@@ -66,6 +70,8 @@ class TwitterPlugin : public NetworkPlugin {
 
			m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
 
			m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
 
			onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2));
 

	
 
			tp = new ThreadPool(10);
 
			
 
			activeThreadCount = 0;
 
			MAX_THREADS  = 50;
 
@@ -77,6 +83,7 @@ class TwitterPlugin : public NetworkPlugin {
 
			delete storagebackend;
 
			std::map<std::string, twitCurl*>::iterator it;
 
			for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second;
 
			delete tp;
 
		}
 

	
 
		// Send data to NetworkPlugin server
 
@@ -379,27 +386,34 @@ class TwitterPlugin : public NetworkPlugin {
 

	
 
		void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
 
			
 
			Request r;
 
			/*Request r;
 
			r.from = user;
 
			r.to = legacyName;
 
			r.message = message;
 
			LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r))
 
			onDispatchRequest(r,true);
 
			onDispatchRequest(r,true);*/
 
			//requestDispatcher(r, true);
 

	
 
			/*if(legacyName == "twitter-account") {
 
			if(legacyName == "twitter-account") {
 
				std::string cmd = message.substr(0, message.find(':'));
 
				std::string data = message.substr(message.find(':') + 1);
 
				
 
				handleMessage(user, "twitter-account", cmd + " " + data);
 

	
 
				if(cmd == "#pin") handlePINExchange(user, data);
 
				else if(cmd == "#help") printHelpMessage(user);
 
				else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);}
 
				else if(cmd == "#status") handleStatusUpdate(user, data);
 
				else if(cmd == "#timeline") fetchTimeline(user);
 
				else if(cmd == "#friends") fetchFriends(user);
 
			}*/
 
				//else if(cmd == "#help") printHelpMessage(user);
 
				else if(cmd[0] == '@') {
 
					std::string username = cmd.substr(1); 
 
					tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data));
 
					//handleDirectMessage(user, username, data);
 
				}
 
				else if(cmd == "#status") {
 
					tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data));
 
					//handleStatusUpdate(user, data);
 
				}
 
				//else if(cmd == "#timeline") fetchTimeline(user);
 
				//else if(cmd == "#friends") fetchFriends(user);
 
			}
 
		}
 

	
 
		void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector<std::string> &groups) {
 
@@ -434,6 +448,7 @@ class TwitterPlugin : public NetworkPlugin {
 
		};
 

	
 
		Config *config;
 

	
 
		std::string consumerKey;
 
		std::string consumerSecret;
 
		std::string OAUTH_KEY;
 
@@ -445,6 +460,7 @@ class TwitterPlugin : public NetworkPlugin {
 
		boost::mutex criticalRegion;
 
		boost::mutex threadLock;
 

	
 
		ThreadPool *tp;
 
		std::map<std::string, twitCurl*> sessions;
 
		std::map<std::string, std::queue<Request> > requests;
 
		
 
@@ -453,7 +469,6 @@ class TwitterPlugin : public NetworkPlugin {
 

	
 
		
 
		std::map<std::string, status> connectionState;
 

	
 
		boost::signal< void (Request, bool) > onDispatchRequest;
 
};
 

	
0 comments (0 inline, 0 general)