Files
@ be3af42a21a7
Branch filter:
Location: libtransport.git/libtransport/ThreadPool.cpp - annotation
be3af42a21a7
2.7 KiB
text/x-c++hdr
MySQLBackend: disable autoreconnect, control CR_SERVER_LOST manually
* correctly recreate prepared statements
* do not close connection while reconnecting
* signed/unsigned comparison fix
* correctly recreate prepared statements
* do not close connection while reconnecting
* signed/unsigned comparison fix
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 | 78e71f9345c7 78e71f9345c7 78e71f9345c7 6d2f8c192761 6d2f8c192761 5adb3d1f9733 5adb3d1f9733 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 5e19186950b9 2b47d32a916c 7500ab6c4c30 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 7500ab6c4c30 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c f328e80a974a 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c f328e80a974a 2b47d32a916c 2b47d32a916c 2b47d32a916c d93cc2ce66eb 2b47d32a916c 2b47d32a916c 2b47d32a916c 5adb3d1f9733 f328e80a974a d93cc2ce66eb f328e80a974a f328e80a974a f328e80a974a f328e80a974a 5adb3d1f9733 | #include "transport/ThreadPool.h"
#include "transport/Logging.h"
#include "Swiften/SwiftenCompat.h"
namespace Transport {
DEFINE_LOGGER(logger, "ThreadPool")
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
this->loop = loop;
activeThreads = 0;
worker = (boost::thread **) malloc(sizeof(boost::thread *) * MAX_THREADS);
for(int i=0 ; i<MAX_THREADS ; i++) {
worker[i] = NULL;
freeThreads.push(i);
}
onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}
ThreadPool::~ThreadPool()
{
for(int i=0; i<MAX_THREADS ; i++) {
if(worker[i]) {
delete worker[i];
}
}
free(worker);
while(!requestQueue.empty()) {
Thread *t = requestQueue.front(); requestQueue.pop();
delete t;
}
}
int ThreadPool::getActiveThreadCount()
{
int res;
count_lock.lock();
res = activeThreads;
count_lock.unlock();
return res;
}
void ThreadPool::updateActiveThreadCount(int k)
{
count_lock.lock();
activeThreads += k;
count_lock.unlock();
}
int ThreadPool::getFreeThread()
{
int res = -1;
pool_lock.lock();
if(!freeThreads.empty()){
res = freeThreads.front();
freeThreads.pop();
updateActiveThreadCount(-1);
}
pool_lock.unlock();
return res;
}
void ThreadPool::releaseThread(int i)
{
if(i < 0 || i > MAX_THREADS) return;
pool_lock.lock();
delete worker[i];
worker[i] = NULL;
freeThreads.push(i);
updateActiveThreadCount(1);
pool_lock.unlock();
}
void ThreadPool::cleandUp(Thread *t, int wid)
{
LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID());
t->finalize();
delete t;
releaseThread(wid);
onWorkerAvailable();
}
void ThreadPool::scheduleFromQueue()
{
criticalregion.lock();
while(!requestQueue.empty()) {
int w = getFreeThread();
if(w == -1) break;
LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w);
Thread *t = requestQueue.front(); requestQueue.pop();
t->setThreadID(w);
worker[w] = new boost::thread(boost::bind(&ThreadPool::workerBody, this, _1, _2), t, w, loop);
updateActiveThreadCount(-1);
}
criticalregion.unlock();
}
void ThreadPool::runAsThread(Thread *t)
{
int w;
if((w = getFreeThread()) != -1) {
LOG4CXX_INFO(logger, "Creating thread #" << w);
t->setThreadID(w);
worker[w] = new boost::thread(boost::bind(&ThreadPool::workerBody, this, _1, _2), t, w, loop);
updateActiveThreadCount(-1);
}
else {
LOG4CXX_INFO(logger, "No workers available! adding to queue.");
requestQueue.push(t);
}
}
void ThreadPool::workerBody(Thread *t, int wid) {
LOG4CXX_INFO(logger, "Starting thread " << wid);
t->run();
loop->postEvent(boost::bind(&ThreadPool::cleandUp, this, t, wid), SWIFTEN_SHRPTR_NAMESPACE::shared_ptr<Swift::EventOwner>());
}
}
|