Files @ daeb6dfc9a9f
Branch filter:

Location: libtransport.git/libtransport/ThreadPool.cpp - annotation

Conrad Kostecki
Enable support for Qt5

Since Qt4 is EOL, we should also support libcommuni build with Qt5.
In order not to break Fedora docker builds, -DENABLE_QT4 is introduced.

When set to 'ON', support for Qt4 is being enabled,
otherwise support for Qt5 is enabled.

Signed-off-by: Conrad Kostecki <conrad@kostecki.com>
78e71f9345c7
78e71f9345c7
78e71f9345c7
6d2f8c192761
6d2f8c192761
5adb3d1f9733
5adb3d1f9733
2b47d32a916c
2b47d32a916c
5e19186950b9
2b47d32a916c
5e19186950b9
2b47d32a916c
7500ab6c4c30
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
7500ab6c4c30
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
f328e80a974a
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
f328e80a974a
2b47d32a916c
2b47d32a916c
2b47d32a916c
d93cc2ce66eb
2b47d32a916c
2b47d32a916c
2b47d32a916c
5adb3d1f9733
f328e80a974a
d93cc2ce66eb
f328e80a974a
f328e80a974a
f328e80a974a
f328e80a974a
5adb3d1f9733
#include "transport/ThreadPool.h"
#include "transport/Logging.h"

#include "Swiften/SwiftenCompat.h"

namespace Transport {

// Logger object referenced as `logger` by the LOG4CXX_INFO calls in this file;
// it is registered under the name "ThreadPool".
DEFINE_LOGGER(logger, "ThreadPool")

// Builds a pool with `maxthreads` worker slots bound to the event loop `loop`.
// Every slot starts out empty (NULL) and is listed in `freeThreads`; the
// onWorkerAvailable signal is wired to scheduleFromQueue() so queued tasks are
// picked up whenever the signal fires.
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
	this->loop = loop;
	activeThreads = 0;

	// Raw array of thread pointers, one entry per slot; released with free()
	// in the destructor.
	worker = static_cast<boost::thread **>(malloc(sizeof(boost::thread *) * MAX_THREADS));

	int slot = 0;
	while(slot < MAX_THREADS) {
		worker[slot] = NULL;
		freeThreads.push(slot);
		++slot;
	}

	onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}

// Destroys the thread objects still stored in the slot array, frees the array
// itself and deletes every task that is still waiting in the request queue.
// NOTE(review): the thread objects are deleted without an explicit join here.
ThreadPool::~ThreadPool()
{
	int slot = 0;
	while(slot < MAX_THREADS) {
		// delete on a NULL slot is a no-op, so no explicit check is needed
		delete worker[slot];
		++slot;
	}
	free(worker);

	// Tasks that never got a worker are owned by the queue at this point.
	for(; !requestQueue.empty(); ) {
		Thread *pending = requestQueue.front();
		requestQueue.pop();
		delete pending;
	}
}

// Returns the current value of the `activeThreads` counter, read while
// holding `count_lock`.
int ThreadPool::getActiveThreadCount()
{
	count_lock.lock();
	const int snapshot = activeThreads;
	count_lock.unlock();

	return snapshot;
}

// Adds `k` (which may be negative) to the `activeThreads` counter while
// holding `count_lock`.
void ThreadPool::updateActiveThreadCount(int k)
{
	count_lock.lock();
	activeThreads = activeThreads + k;
	count_lock.unlock();
}

// Takes the next free worker slot out of `freeThreads`.
// Returns the slot index, or -1 when no slot is free. On success the
// activeThreads counter is adjusted by -1. All of this happens under
// `pool_lock`.
int ThreadPool::getFreeThread()
{
	pool_lock.lock();

	if(freeThreads.empty()) {
		pool_lock.unlock();
		return -1;
	}

	const int slot = freeThreads.front();
	freeThreads.pop();
	updateActiveThreadCount(-1);

	pool_lock.unlock();
	return slot;
}

// Returns worker slot `i` to the pool: destroys the thread object stored in
// the slot, clears the slot, puts the index back into `freeThreads` and
// adjusts the activeThreads counter by +1. Indices outside the slot array
// are ignored.
void ThreadPool::releaseThread(int i)
{
	// The slot array holds exactly MAX_THREADS entries, so the valid indices
	// are 0 .. MAX_THREADS-1; i == MAX_THREADS would already be out of bounds.
	if(i < 0 || i >= MAX_THREADS) return;
	pool_lock.lock();

	delete worker[i];
	worker[i] = NULL;
	freeThreads.push(i);

	updateActiveThreadCount(1);

	pool_lock.unlock();
}

// Completion handler for a finished task; workerBody() posts it to the event
// loop once t->run() has returned.
//   t   - the task that finished; it is finalized and then deleted here
//   wid - index of the worker slot that executed the task
void ThreadPool::cleandUp(Thread *t, int wid)
{
	// Log before the task is deleted, since the message reads its thread id.
	LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID());
	t->finalize();
	delete t;
	// Free the slot first, then fire the signal so scheduleFromQueue() can
	// hand the slot to a queued task.
	releaseThread(wid);
	onWorkerAvailable();
}

void ThreadPool::scheduleFromQueue()
{
	criticalregion.lock();
	while(!requestQueue.empty()) {
		int  w = getFreeThread();
		if(w == -1) break;

		LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w);
		Thread *t = requestQueue.front(); requestQueue.pop();
		t->setThreadID(w);
		worker[w] = new boost::thread(boost::bind(&ThreadPool::workerBody, this, _1, _2), t, w, loop);
		updateActiveThreadCount(-1);
	}
	criticalregion.unlock();
}


// Runs task `t` on a worker thread if a slot is free; otherwise appends it to
// the request queue, from which scheduleFromQueue() starts it later.
//
// The "no free slot" decision and the enqueue are done under `criticalregion`,
// the same lock scheduleFromQueue() holds while it reads the queue. This keeps
// the queue from being modified concurrently and ensures a worker released in
// the meantime cannot run scheduleFromQueue() between the failed slot lookup
// and the push (which would leave the task waiting in the queue).
//
// NOTE(review): getFreeThread() already adjusts the activeThreads counter by
// -1, so together with the call below the counter moves by -2 per started
// task - confirm this is intended.
void ThreadPool::runAsThread(Thread *t)
{
	criticalregion.lock();
	int w = getFreeThread();
	if(w == -1) {
		LOG4CXX_INFO(logger, "No workers available! adding to queue.");
		requestQueue.push(t);
		criticalregion.unlock();
		return;
	}
	// Slot `w` now belongs to this call; the thread is created outside the
	// lock so a failure while creating it cannot leave `criticalregion` held.
	criticalregion.unlock();

	LOG4CXX_INFO(logger, "Creating thread #" << w);
	t->setThreadID(w);
	worker[w] = new boost::thread(boost::bind(&ThreadPool::workerBody, this, _1, _2), t, w, loop);
	updateActiveThreadCount(-1);
}

// Entry point of a worker thread (bound in runAsThread()/scheduleFromQueue()).
// Executes the task and afterwards posts cleandUp(t, wid) to the event loop,
// so the task is finalized and its slot released from there.
void ThreadPool::workerBody(Thread *t, int wid) {
	LOG4CXX_INFO(logger, "Starting thread " << wid);

	t->run();

	// The event is posted with an empty (null) owner pointer.
	SWIFTEN_SHRPTR_NAMESPACE::shared_ptr<Swift::EventOwner> noOwner;
	loop->postEvent(boost::bind(&ThreadPool::cleandUp, this, t, wid), noOwner);
}

}