Changeset - 7500ab6c4c30
[Not reviewed]
0 3 0
Jan Kaluza - 9 years ago 2016-02-24 08:35:10
jkaluza@redhat.com
Libtransport: Fix memory leak in HTTPRequest, initialize CURL in the same thread as we use for its cleanup
3 files changed with 22 insertions and 7 deletions:
0 comments (0 inline, 0 general)
libtransport/HTTPRequest.cpp
Show inline comments
 
#include "transport/HTTPRequest.h"
 

	
 
namespace Transport {
 

	
 
DEFINE_LOGGER(logger, "HTTPRequest")
 

	
 
HTTPRequest::HTTPRequest(ThreadPool *tp, Type type, const std::string &url, Callback callback) {
 
	m_type = type;
 
	m_url = url;
 
	m_tp = tp;
 
	m_callback = callback;
 

	
 
	init();
 
	curlhandle = NULL;
 
}
 

	
 
HTTPRequest::HTTPRequest(Type type, const std::string &url) {
 
	m_type = type;
 
	m_url = url;
 
	m_tp = NULL;
 

	
 
	init();
 
	curlhandle = NULL;
 
}
 

	
 
HTTPRequest::~HTTPRequest() {
 
	if (curlhandle) {
 
		LOG4CXX_INFO(logger, "Cleaning up CURL handle");
 
		curl_easy_cleanup(curlhandle);
 
		curlhandle = NULL;
 
	}
 
}
 

	
 
bool HTTPRequest::init() {
 
	curlhandle = curl_easy_init();
 
	if (curlhandle) {
 
		return true;
 
	}
 

	
 
	curlhandle = curl_easy_init();
 
	if (curlhandle) {
 
		curl_easy_setopt(curlhandle, CURLOPT_PROXY, NULL);
 
		curl_easy_setopt(curlhandle, CURLOPT_PROXYUSERPWD, NULL);
 
		curl_easy_setopt(curlhandle, CURLOPT_PROXYAUTH, (long)CURLAUTH_ANY);
 
		return true;
 
	}
 

	
 
	LOG4CXX_ERROR(logger, "Couldn't Initialize curl!")
 
	return false;
 
}
 

	
 
void HTTPRequest::setProxy(std::string IP, std::string port, std::string username, std::string password) {
 
	if (curlhandle) {
 
@@ -104,44 +105,53 @@ bool HTTPRequest::GET(std::string url, rapidjson::Document &json) {
 

	
 
	if(json.Parse<0>(m_data.c_str()).HasParseError()) {
 
		LOG4CXX_ERROR(logger, "Error while parsing JSON");
 
        LOG4CXX_ERROR(logger, m_data);
 
		strcpy(curl_errorbuffer, "Error while parsing JSON");
 
		return false;
 
	}
 

	
 
	return true;
 
}
 

	
 
void HTTPRequest::run() {
 
	if (!init()) {
 
		m_ok =  false;
 
		return;
 
	}
 

	
 
	switch (m_type) {
 
		case Get:
 
			m_ok = GET(m_url, m_json);
 
			break;
 
	}
 

	
 
	curl_easy_cleanup(curlhandle);
 
	curlhandle = NULL;
 
}
 

	
 
void HTTPRequest::finalize() {
 
	m_callback(this, m_ok, m_json, m_data);
 
	onRequestFinished();
 
}
 

	
 
bool HTTPRequest::execute() {
 
	if (!m_tp) {
 
		return false;
 
	}
 

	
 
	m_tp->runAsThread(this);
 
	return true;
 
}
 

	
 
bool HTTPRequest::execute(rapidjson::Document &json) {
 
	init();
 
	switch (m_type) {
 
		case Get:
 
			m_ok = GET(m_url, json);
 
			break;
 
	}
 

	
 
	return m_ok;
 
}
 

	
 
}
libtransport/ThreadPool.cpp
Show inline comments
 
@@ -9,41 +9,41 @@ boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
 
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
 
{
 
	LOG4CXX_INFO(logger, "Starting thread " << wid)
 
	t->run();
 
	loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), boost::shared_ptr<Swift::EventOwner>());
 
}
 

	
 

	
 
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
 
{
 
	this->loop = loop;
 
	activeThreads = 0;
 
	worker = new boost::thread*[MAX_THREADS];
 
	worker = (boost::thread **) malloc(sizeof(boost::thread *) * MAX_THREADS);
 
	for(int i=0 ; i<MAX_THREADS ; i++) {
 
		worker[i] = NULL;
 
		freeThreads.push(i);
 
	}
 
	onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
 
	onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
 
}
 

	
 
ThreadPool::~ThreadPool()
 
{
 
	for(int i=0; i<MAX_THREADS ; i++) {
 
		if(worker[i]) {
 
			delete worker[i];
 
		}
 
	}
 
	delete worker;
 
	free(worker);
 

	
 
	while(!requestQueue.empty()) {
 
		Thread *t = requestQueue.front(); requestQueue.pop();
 
		delete t;
 
	}
 
}
 

	
 
int ThreadPool::getActiveThreadCount()
 
{	
 
	int res;
 
	count_lock.lock(); 
 
	res = activeThreads;
tests/libtransport/main.cpp
Show inline comments
 
@@ -3,24 +3,25 @@
 
#include <cppunit/extensions/TestFactoryRegistry.h>
 
#include <cppunit/TestResult.h>
 
#include <cppunit/TestResultCollector.h>
 
#include <cppunit/TestRunner.h>
 
#include <cppunit/BriefTestProgressListener.h>
 
#ifdef WITH_LOG4CXX
 
#include "log4cxx/logger.h"
 
#include "log4cxx/fileappender.h"
 
#include "log4cxx/patternlayout.h"
 
#include "log4cxx/propertyconfigurator.h"
 

	
 
#include "transport/protocol.pb.h"
 
#include "transport/HTTPRequest.h"
 

	
 
using namespace log4cxx;
 
#endif
 

	
 

	
 
int main (int argc, char* argv[])
 
{
 
#ifdef WITH_LOG4CXX
 
	LoggerPtr root = Logger::getRootLogger();
 
#ifndef _MSC_VER
 
	root->addAppender(new FileAppender(new PatternLayout("%d %-5p %c: %m%n"), "libtransport_test.log", false));
 
#else
 
@@ -29,46 +30,50 @@ int main (int argc, char* argv[])
 
#endif
 

	
 
	std::vector<std::string> testsToRun;
 
	for (int i = 1; i < argc; ++i) {
 
		std::string param(argv[i]);
 
		testsToRun.push_back(param);
 
	}
 

	
 
	if (testsToRun.empty()) {
 
		testsToRun.push_back("");
 
	}
 

	
 
	Transport::HTTPRequest::globalInit();
 

	
 
	// informs test-listener about testresults
 
	CPPUNIT_NS :: TestResult testresult;
 

	
 
	// register listener for collecting the test-results
 
	CPPUNIT_NS :: TestResultCollector collectedresults;
 
	testresult.addListener (&collectedresults);
 

	
 
	// register listener for per-test progress output
 
	CPPUNIT_NS :: BriefTestProgressListener progress;
 
	testresult.addListener (&progress);
 

	
 
	// insert test-suite at test-runner by registry
 
	CPPUNIT_NS :: TestRunner testrunner;
 
	testrunner.addTest (CPPUNIT_NS :: TestFactoryRegistry :: getRegistry ().makeTest ());
 
	for (std::vector<std::string>::const_iterator i = testsToRun.begin(); i != testsToRun.end(); ++i) {
 
		try {
 
			testrunner.run(testresult, *i);
 
		}
 
		catch (const std::exception& e) {
 
			google::protobuf::ShutdownProtobufLibrary();
 
			Transport::HTTPRequest::globalCleanup();
 
			std::cerr << "Error: " << e.what() << std::endl;
 
			return -1;
 
		}
 
	}
 

	
 
	// output results in compiler-format
 
	CPPUNIT_NS :: CompilerOutputter compileroutputter (&collectedresults, std::cerr);
 
	compileroutputter.write ();
 

	
 
	google::protobuf::ShutdownProtobufLibrary();
 
	Transport::HTTPRequest::globalCleanup();
 

	
 
	// return 0 if tests were successful
 
	return collectedresults.wasSuccessful () ? 0 : 1;
 
}
0 comments (0 inline, 0 general)