Changeset - 5e19186950b9
[Not reviewed]
0 4 0
Sarang Bharadwaj - 13 years ago 2012-06-03 14:33:39
sarang.bh@gmail.com
Calling finalize in main thread
4 files changed with 11 insertions and 8 deletions:
0 comments (0 inline, 0 general)
backends/twitter/ThreadPool.cpp
Show inline comments
 
#include "ThreadPool.h"
 
DEFINE_LOGGER(logger, "ThreadPool")
 
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
 

	
 
void Worker(Thread *t, int wid)
 
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
 
{
 
	LOG4CXX_INFO(logger, "Starting thread " << wid)
 
	t->run();
 
	onWorkCompleted(t, wid);
 
	loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), boost::shared_ptr<Swift::EventOwner>());
 
}
 

	
 

	
 
ThreadPool::ThreadPool(int maxthreads) : MAX_THREADS(maxthreads)
 
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
 
{
 
	this->loop = loop;
 
	activeThreads = 0;
 
	worker = new boost::thread*[MAX_THREADS];
 
	for(int i=0 ; i<MAX_THREADS ; i++) {
 
		worker[i] = NULL;
 
		freeThreads.push(i);
 
	}
 
@@ -96,26 +97,26 @@ void ThreadPool::scheduleFromQueue()
 
		int  w = getFreeThread();
 
		if(w == -1) break;
 

	
 
		LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
 
		Thread *t = requestQueue.front(); requestQueue.pop();
 
		t->setThreadID(w);
 
		worker[w] = new boost::thread(Worker, t, w);
 
		worker[w] = new boost::thread(Worker, t, w, loop);
 
		updateActiveThreadCount(-1);
 
	}
 
	criticalregion.unlock();
 
}
 

	
 

	
 
void ThreadPool::runAsThread(Thread *t)
 
{
 
	int w;
 
	if((w = getFreeThread()) != -1) {
 
		LOG4CXX_INFO(logger, "Creating thread #" << w)
 
		t->setThreadID(w);
 
		worker[w] = new boost::thread(Worker, t, w);
 
		worker[w] = new boost::thread(Worker, t, w, loop);
 
		updateActiveThreadCount(-1);
 
	}
 
	else {
 
		LOG4CXX_INFO(logger, "No workers available! adding to queue.")
 
		requestQueue.push(t);
 
	}
backends/twitter/ThreadPool.h
Show inline comments
 
@@ -4,12 +4,13 @@
 
#include <boost/thread.hpp>
 
#include <boost/thread/mutex.hpp>
 
#include <boost/signals2/signal.hpp>
 
#include <queue>
 
#include <iostream>
 
#include "transport/logging.h"
 
#include "Swiften/Swiften.h"
 

	
 

	
 
/*
 
 * Thread serves as a base class for any code that has to be excuted as a thread
 
 * by the ThreadPool class. The run method defines the code that has to be run
 
 * as a theard. For example, code in run could be sendinga request to a server
 
@@ -51,17 +52,18 @@ class ThreadPool
 
	std::queue<Thread*> requestQueue;
 
	boost::thread **worker;
 

	
 
	boost::mutex count_lock;
 
	boost::mutex pool_lock;
 
	boost::mutex criticalregion;
 
	Swift::EventLoop *loop;
 

	
 
	boost::signals2::signal  < void () > onWorkerAvailable;
 

	
 
	public:
 
	ThreadPool(int maxthreads);
 
	ThreadPool(Swift::EventLoop *loop, int maxthreads);
 
	~ThreadPool();
 
	void runAsThread(Thread *t);
 
	int getActiveThreadCount(); 
 
	void updateActiveThreadCount(int k);
 
	void cleandUp(Thread *, int);
 
	void scheduleFromQueue();
backends/twitter/libtwitcurl/twitcurl.cpp
Show inline comments
 
@@ -600,13 +600,13 @@ bool twitCurl::userLookup( std::vector<std::string> &userInfo, bool isUserId )
 
{
 
    bool retVal = false;
 
    if( userInfo.size() )
 
    {
 
		std::string userIds = isUserId?twitCurlDefaults::TWITCURL_USERID : twitCurlDefaults::TWITCURL_SCREENNAME;
 
		std::string sep = "";
 
		for(int i=0 ; i<std::min(100U, userInfo.size()) ; i++, sep = ",")
 
		for(int i=0 ; i<std::min(100U,(unsigned int) userInfo.size()) ; i++, sep = ",")
 
			userIds += sep + userInfo[i];
 
 
        /* Set URL */
 
        std::string buildUrl = twitterDefaults::TWITCURL_LOOKUPUSERS_URL + twitCurlDefaults::TWITCURL_EXTENSIONFORMATS[m_eApiFormatType];
 
 
		std::cerr << buildUrl << " " << userIds << std::endl;
backends/twitter/main.cpp
Show inline comments
 
@@ -70,13 +70,13 @@ class TwitterPlugin : public NetworkPlugin {
 

	
 
			m_factories = new Swift::BoostNetworkFactories(loop);
 
			m_conn = m_factories->getConnectionFactory()->createConnection();
 
			m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
 
			m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
 

	
 
			tp = new ThreadPool(10);
 
			tp = new ThreadPool(loop_, 10);
 
				
 
			LOG4CXX_INFO(logger, "Starting the plugin.");
 
		}
 

	
 
		~TwitterPlugin() {
 
			delete storagebackend;
0 comments (0 inline, 0 general)