diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..69c0fc748613e477eb8487bb4c7273ab733b049c --- /dev/null +++ b/src/ThreadPool.cpp @@ -0,0 +1,125 @@ +#include "transport/ThreadPool.h" +#include "transport/Logging.h" + +DEFINE_LOGGER(logger, "ThreadPool") +boost::signals2::signal< void (Thread*, int) > onWorkCompleted; + +static void Worker(Thread *t, int wid, Swift::EventLoop *loop) +{ + LOG4CXX_INFO(logger, "Starting thread " << wid) + t->run(); + loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), boost::shared_ptr()); +} + + +ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads) +{ + this->loop = loop; + activeThreads = 0; + worker = new boost::thread*[MAX_THREADS]; + for(int i=0 ; i MAX_THREADS) return; + pool_lock.lock(); + + delete worker[i]; + worker[i] = NULL; + freeThreads.push(i); + + updateActiveThreadCount(1); + + pool_lock.unlock(); +} + +void ThreadPool::cleandUp(Thread *t, int wid) +{ + LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID()) + t->finalize(); + delete t; + releaseThread(wid); + onWorkerAvailable(); +} + +void ThreadPool::scheduleFromQueue() +{ + criticalregion.lock(); + while(!requestQueue.empty()) { + int w = getFreeThread(); + if(w == -1) break; + + LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w) + Thread *t = requestQueue.front(); requestQueue.pop(); + t->setThreadID(w); + worker[w] = new boost::thread(Worker, t, w, loop); + updateActiveThreadCount(-1); + } + criticalregion.unlock(); +} + + +void ThreadPool::runAsThread(Thread *t) +{ + int w; + if((w = getFreeThread()) != -1) { + LOG4CXX_INFO(logger, "Creating thread #" << w) + t->setThreadID(w); + worker[w] = new boost::thread(Worker, t, w, loop); + updateActiveThreadCount(-1); + } + else { + LOG4CXX_INFO(logger, "No workers available! adding to queue.") + requestQueue.push(t); + } +}