diff --git a/backends/twitter/main.cpp b/backends/twitter/main.cpp index e1c4b4221037d11259a184ec4487ff729ad99a46..a8b3f6873c93c6a8621ea9c16d341c2cbfd7c9b4 100644 --- a/backends/twitter/main.cpp +++ b/backends/twitter/main.cpp @@ -33,6 +33,7 @@ #include "Requests/DirectMessageRequest.h" #include "Requests/TimelineRequest.h" #include "Requests/FetchFriends.h" +#include "Requests/HelpMessageRequest.h" using namespace boost::filesystem; using namespace boost::program_options; @@ -71,13 +72,9 @@ class TwitterPlugin : public NetworkPlugin { m_conn = m_factories->getConnectionFactory()->createConnection(); m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1)); m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port)); - onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2)); tp = new ThreadPool(10); - - activeThreadCount = 0; - MAX_THREADS = 50; - + LOG4CXX_INFO(logger, "Starting the plugin."); } @@ -209,193 +206,9 @@ class TwitterPlugin : public NetworkPlugin { LOG4CXX_INFO(logger, user << ": Sent PIN " << data << " and obtained Access Token"); } - void printHelpMessage(const std::string &user) { - std::string helpMsg = ""; - helpMsg = helpMsg - + "\nHELP\n" - + "#status: - Update your status\n" - + "#timeline - Retrieve your timeline\n" - + "@: - Send a directed message to the user \n" - + "#help - print this help message\n"; - - handleMessage(user, "twitter-account", helpMsg); - } - - void handleDirectMessage(const std::string &user, std::string &username, std::string &data) { - if(sessions[user]->directMessageSend(username, data, false) == false) { - LOG4CXX_ERROR(logger, user << ": Error while sending directed message to user " << username ); - return; - } - - LOG4CXX_INFO(logger, user << ": Sending " << data << " to " << username) - - std::string replyMsg; - sessions[user]->getLastWebResponse( replyMsg ); - LOG4CXX_INFO(logger, replyMsg); - } - - void handleStatusUpdate(const std::string &user, std::string &data) { - if(connectionState[user] != CONNECTED) { - LOG4CXX_ERROR(logger, "Trying to update status for " << user << " when not connected!"); - return; - } - - std::string replyMsg; - if( sessions[user]->statusUpdate( data ) ) { - replyMsg = ""; - while(replyMsg.length() == 0) { - sessions[user]->getLastWebResponse( replyMsg ); - } - LOG4CXX_INFO(logger, user << ": twitCurl:statusUpdate web response: " << replyMsg ); - } else { - sessions[user]->getLastCurlError( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::statusUpdate error: " << replyMsg ); - } - LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data); - } - - void fetchTimeline(const std::string &user) { - if(connectionState[user] != CONNECTED) { - LOG4CXX_ERROR(logger, "Trying to fetch timeline for " << user << " when not connected!"); - return; - } - - std::string replyMsg = ""; - if( sessions[user]->timelineHomeGet()) { - - while(replyMsg.length() == 0) { - sessions[user]->getLastWebResponse( replyMsg ); - } - - LOG4CXX_INFO(logger, user << ": twitCurl::timeline web response: " << replyMsg.length() << " " << replyMsg << "\n" ); - - std::vector tweets = getTimeline(replyMsg); - std::string timeline = "\n"; - for(int i=0 ; igetLastCurlError( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::timeline error: " << replyMsg ); - } - } - - void fetchFriends(const std::string &user) { - if(connectionState[user] != CONNECTED) { - LOG4CXX_ERROR(logger, "Trying to fetch friends of " << user << " when not connected!"); - return; - } - - std::string replyMsg = ""; - if( sessions[user]->friendsIdsGet(sessions[user]->getTwitterUsername())) { - - while(replyMsg.length() == 0) { - sessions[user]->getLastWebResponse( replyMsg ); - } - - LOG4CXX_INFO(logger, user << ": twitCurl::friendsIdsGet web response: " << replyMsg.length() << " " << replyMsg << "\n" ); - - std::vector IDs = getIDs( replyMsg ); - /*for(int i=0 ; iuserLookup(IDs, true); - sessions[user]->getLastWebResponse( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::UserLookUp web response: " << replyMsg.length() << " " << replyMsg << "\n" ); - - std::vector users = getUsers( replyMsg ); - - std::string userlist = "\n***************USER LIST****************\n"; - for(int i=0 ; i < users.size() ; i++) { - userlist += "*)" + users[i].getUserName() + " (" + users[i].getScreenName() + ")\n"; - } - userlist += "***************************************\n"; - handleMessage(user, "twitter-account", userlist); - - } else { - sessions[user]->getLastCurlError( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::friendsIdsGet error: " << replyMsg ); - } - - } - - void spawnThreadForRequest(Request r) { - std::string &user = r.from; - std::string &legacyName = r.to; - std::string &message = r.message; - - LOG4CXX_INFO(logger, "Sending message from " << user << " to " << legacyName << "."); - - if(legacyName == "twitter-account") { - std::string cmd = message.substr(0, message.find(':')); - std::string data = message.substr(message.find(':') + 1); - - handleMessage(user, "twitter-account", cmd + " " + data); - - if(cmd == "#pin") handlePINExchange(user, data); - else if(cmd == "#help") printHelpMessage(user); - else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);} - else if(cmd == "#status") handleStatusUpdate(user, data); - else if(cmd == "#timeline") fetchTimeline(user); - else if(cmd == "#friends") fetchFriends(user); - } - updateActiveThreadCount(-1); - onDispatchRequest(r, false); - } - - /* - * usersBeingServed - set of users being served at present - * usersToServe - queue of users who have requests pending in their request queue and are yet to be served; Each user appears only once here. - */ - void requestDispatcher(Request r, bool incoming) { - - criticalRegion.lock(); - - if(incoming) { - std::string user = r.from; - if(getActiveThreadCount() < MAX_THREADS && usersBeingServed.count(user) == false) { - updateActiveThreadCount(1); - boost::thread(&TwitterPlugin::spawnThreadForRequest, this, r); - usersBeingServed.insert(user); - LOG4CXX_INFO(logger, user << ": Sending request " << STR(r) << " to twitter") - } else { - requests[user].push(r); - LOG4CXX_INFO(logger, user << " is already being served! Adding " << STR(r) << " to request queue"); - if (!usersBeingServed.count(user)) { - usersToServe.push(user); - } - } - } else { - usersBeingServed.erase(r.from); - if(requests[r.from].size()) usersToServe.push(r.from); - while(getActiveThreadCount() < MAX_THREADS && !usersToServe.empty()) { - std::string user = usersToServe.front(); usersToServe.pop(); - Request s = requests[user].front(); requests[user].pop(); - updateActiveThreadCount(1); - boost::thread(&TwitterPlugin::spawnThreadForRequest, this, s); - usersBeingServed.insert(user); - LOG4CXX_INFO(logger, user << ": Sending request " << STR(s) << " to twitter") - } - } - - criticalRegion.unlock(); - } - void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { - /*Request r; - r.from = user; - r.to = legacyName; - r.message = message; - LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r)) - onDispatchRequest(r,true);*/ - //requestDispatcher(r, true); - if(legacyName == "twitter-account") { std::string cmd = message.substr(0, message.find(':')); std::string data = message.substr(message.find(':') + 1); @@ -403,7 +216,9 @@ class TwitterPlugin : public NetworkPlugin { handleMessage(user, "twitter-account", cmd + " " + data); if(cmd == "#pin") handlePINExchange(user, data); - //else if(cmd == "#help") printHelpMessage(user); + else if(cmd == "#help") { + tp->runAsThread(new HelpMessageRequest(np, user)); + } else if(cmd[0] == '@') { std::string username = cmd.substr(1); tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); @@ -413,6 +228,7 @@ class TwitterPlugin : public NetworkPlugin { } else if(cmd == "#timeline") { tp->runAsThread(new TimelineRequest(np, sessions[user], user)); + //fetchTimeline(user); } else if(cmd == "#friends") { tp->runAsThread(new FetchFriends(np, sessions[user], user)); @@ -429,28 +245,9 @@ class TwitterPlugin : public NetworkPlugin { } - int getActiveThreadCount() { - int res; - threadLock.lock(); - res = activeThreadCount; - threadLock.unlock(); - return res; - } - - void updateActiveThreadCount(int k) { - threadLock.lock(); - activeThreadCount+=k; - threadLock.unlock(); - } - private: enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED}; - struct Request { - std::string from; - std::string to; - std::string message; - }; - + Config *config; std::string consumerKey; @@ -458,22 +255,9 @@ class TwitterPlugin : public NetworkPlugin { std::string OAUTH_KEY; std::string OAUTH_SECRET; - int activeThreadCount; - int MAX_THREADS; - - boost::mutex criticalRegion; - boost::mutex threadLock; - ThreadPool *tp; - std::map sessions; - std::map > requests; - - std::queue usersToServe; - std::set usersBeingServed; - - + std::map sessions; std::map connectionState; - boost::signal< void (Request, bool) > onDispatchRequest; }; static void spectrum_sigchld_handler(int sig)