diff --git a/spectrum_manager/src/methods.cpp b/spectrum_manager/src/methods.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c3785c214a594b84ff31dbfa232c7e76267ffc00 --- /dev/null +++ b/spectrum_manager/src/methods.cpp @@ -0,0 +1,492 @@ +#include "methods.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include "signal.h" +#include "sys/wait.h" + +#define WRAP(MESSAGE, TYPE) pbnetwork::WrapperMessage wrap; \ + wrap.set_type(TYPE); \ + wrap.set_payload(MESSAGE); \ + wrap.SerializeToString(&MESSAGE); + + +using namespace Transport; + +using namespace boost::filesystem; + +using namespace boost; + +std::string _data; +static std::string response; + +std::string get_response() { + return response; +} + +std::string searchForBinary(const std::string &binary) { + std::vector path_list; + char * env_path = getenv("PATH"); + + if (env_path != NULL) { + std::string buffer = ""; + for (int s = 0; s < strlen(env_path); s++) { + if (env_path[s] == ':') { + path_list.insert(path_list.end(), std::string(buffer)); + buffer = ""; + } + else { + buffer += env_path[s]; + } + } + + if (buffer != "") { + path_list.insert(path_list.end(), std::string(buffer)); + buffer = ""; + } + + for (std::vector::iterator dit = path_list.begin(); dit < path_list.end(); dit++) { + std::string bpath = *dit; + bpath += "/"; + bpath += binary; + path p(bpath); + if (exists(p) && !is_directory(p)) { + return bpath; + } + } + } + return ""; +} + +// Executes new backend +unsigned long exec_(std::string path, std::string config, std::string jid) { + // fork and exec + pid_t pid = fork(); + if ( pid == 0 ) { + // child process + if (jid.empty()) { + exit(execl(path.c_str(), path.c_str(), config.c_str(), NULL)); + } + else { + exit(execl(path.c_str(), path.c_str(), "-j", jid.c_str(), config.c_str(), NULL)); + } + } else if ( pid < 0 ) { + // fork failed + } + else { + waitpid(pid, 0, 0); + } + + return (unsigned long) pid; +} + +int getPort(const std::string &portfile) { + path p(portfile); + if (!exists(p) || is_directory(p)) { + return 0; + } + + std::ifstream f(p.string().c_str(), std::ios_base::in); + std::string port; + f >> port; + + if (port.empty()) + return 0; + + return boost::lexical_cast(port); +} + +int isRunning(const std::string &pidfile) { + path p(pidfile); + if (!exists(p) || is_directory(p)) { + return 0; + } + + std::ifstream f(p.string().c_str(), std::ios_base::in); + std::string pid; + f >> pid; + + if (pid.empty()) + return 0; + + if (kill(boost::lexical_cast(pid), 0) != 0) + return 0; + + return boost::lexical_cast(pid); +} + +void start_instances(ManagerConfig *config, const std::string &_jid) { + path p(CONFIG_STRING(config, "service.config_directory")); + + try { + if (!exists(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(6); + } + + if (!is_directory(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(7); + } + + std::string spectrum2_binary = searchForBinary("spectrum2"); + if (spectrum2_binary.empty()) { + std::cerr << "spectrum2 binary not found in PATH\n"; + exit(8); + } + + directory_iterator end_itr; + for (directory_iterator itr(p); itr != end_itr; ++itr) { + if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { + Config cfg; + if (cfg.load(itr->path().string()) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + continue; + } + + + std::vector vhosts; + if (CONFIG_HAS_KEY(&cfg, "vhosts.vhost")) + vhosts = CONFIG_VECTOR(&cfg, "vhosts.vhost"); + vhosts.push_back(CONFIG_STRING(&cfg, "service.jid")); + + BOOST_FOREACH(std::string &vhost, vhosts) { + Config vhostCfg; + if (vhostCfg.load(itr->path().string(), vhost) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + continue; + } + + if (!_jid.empty() && _jid != vhost) { + continue; + } + + int pid = isRunning(CONFIG_STRING(&vhostCfg, "service.pidfile")); + if (pid == 0) { + std::cout << "Starting " << itr->path() << ": OK\n"; + exec_(spectrum2_binary, itr->path().string(), vhost); + } + else { + std::cout << "Starting " << itr->path() << ": Already started (PID=" << pid << ")\n"; + } + } + } + } + } + catch (const filesystem_error& ex) { + std::cerr << "boost filesystem error\n"; + exit(5); + } +} + +void stop_instances(ManagerConfig *config, const std::string &_jid) { + path p(CONFIG_STRING(config, "service.config_directory")); + + try { + if (!exists(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(6); + } + + if (!is_directory(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(7); + } + + directory_iterator end_itr; + for (directory_iterator itr(p); itr != end_itr; ++itr) { + if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { + Config cfg; + if (cfg.load(itr->path().string()) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + } + + std::vector vhosts; + if (CONFIG_HAS_KEY(&cfg, "vhosts.vhost")) + vhosts = CONFIG_VECTOR(&cfg, "vhosts.vhost"); + vhosts.push_back(CONFIG_STRING(&cfg, "service.jid")); + + BOOST_FOREACH(std::string &vhost, vhosts) { + Config vhostCfg; + if (vhostCfg.load(itr->path().string(), vhost) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + continue; + } + + if (!_jid.empty() && _jid != vhost) { + continue; + } + + int pid = isRunning(CONFIG_STRING(&vhostCfg, "service.pidfile")); + if (pid) { + std::cout << "Stopping " << itr->path() << ": "; + kill(pid, SIGTERM); + + sleep(1); + int count = 20; + while (kill(pid, 0) == 0 && count != 0) { + std::cout << "."; + sleep(1); + count--; + } + if (count == 0) { + std::cout << " ERROR (timeout)\n"; + } + else { + std::cout << " OK\n"; + } + } + else { + std::cout << "Stopping " << itr->path() << ": Not running\n"; + } + } + } + } + } + catch (const filesystem_error& ex) { + std::cerr << "boost filesystem error\n"; + exit(5); + } +} + +int show_status(ManagerConfig *config) { + int ret = 0; + path p(CONFIG_STRING(config, "service.config_directory")); + + try { + if (!exists(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(6); + } + + if (!is_directory(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(7); + } + + directory_iterator end_itr; + for (directory_iterator itr(p); itr != end_itr; ++itr) { + if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { + Config cfg; + if (cfg.load(itr->path().string()) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + } + + std::vector vhosts; + if (CONFIG_HAS_KEY(&cfg, "vhosts.vhost")) + vhosts = CONFIG_VECTOR(&cfg, "vhosts.vhost"); + vhosts.push_back(CONFIG_STRING(&cfg, "service.jid")); + + BOOST_FOREACH(std::string &vhost, vhosts) { + Config vhostCfg; + if (vhostCfg.load(itr->path().string(), vhost) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + continue; + } + + int pid = isRunning(CONFIG_STRING(&vhostCfg, "service.pidfile")); + if (pid) { + std::cout << itr->path() << ": " << vhost << " Running\n"; + } + else { + ret = 3; + std::cout << itr->path() << ": " << vhost << " Stopped\n"; + } + } + } + } + } + catch (const filesystem_error& ex) { + std::cerr << "boost filesystem error\n"; + exit(5); + } + return ret; +} + +static void handleDataRead(boost::shared_ptr m_conn, boost::shared_ptr data) { + _data += std::string(data->begin(), data->end()); + + // Parse data while there are some + while (_data.size() != 0) { + // expected_size of wrapper message + unsigned int expected_size; + + // if data is >= 4, we have whole header and we can + // read expected_size. + if (_data.size() >= 4) { + expected_size = *((unsigned int*) &_data[0]); + expected_size = ntohl(expected_size); + // If we don't have whole wrapper message, wait for next + // handleDataRead call. + if (_data.size() - 4 < expected_size) + return; + } + else { + return; + } + + // Parse wrapper message and erase it from buffer. + pbnetwork::WrapperMessage wrapper; + if (wrapper.ParseFromArray(&_data[4], expected_size) == false) { + std::cout << "PARSING ERROR " << expected_size << "\n"; + _data.erase(_data.begin(), _data.begin() + 4 + expected_size); + continue; + } + _data.erase(_data.begin(), _data.begin() + 4 + expected_size); + + if (wrapper.type() == pbnetwork::WrapperMessage_Type_TYPE_QUERY) { + pbnetwork::BackendConfig payload; + if (payload.ParseFromString(wrapper.payload()) == false) { + std::cout << "PARSING ERROR\n"; + // TODO: ERROR + continue; + } + m_conn->onDataRead.disconnect(boost::bind(&handleDataRead, m_conn, _1)); + response = payload.config(); + std::cout << payload.config() << "\n"; +// exit(0); + } + } +} + +static void handleConnected(boost::shared_ptr m_conn, const std::string &msg, bool error) { + m_conn->onConnectFinished.disconnect(boost::bind(&handleConnected, m_conn, msg, _1)); + if (error) { + std::cerr << "Can't connect the server\n"; + response = "Can't connect the server\n"; + m_conn->onDataRead.disconnect(boost::bind(&handleDataRead, m_conn, _1)); + +// exit(50); + } + else { + pbnetwork::BackendConfig m; + m.set_config(msg); + + std::string message; + m.SerializeToString(&message); + + WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_QUERY); + + uint32_t size = htonl(message.size()); + char *header = (char *) &size; + + + // send header together with wrapper message + m_conn->write(Swift::createSafeByteArray(std::string(header, 4) + message)); + } +} + +void ask_local_server(ManagerConfig *config, Swift::BoostNetworkFactories &networkFactories, const std::string &jid, const std::string &message) { + path p(CONFIG_STRING(config, "service.config_directory")); + + try { + if (!exists(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(6); + } + + if (!is_directory(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(7); + } + + bool found = false; + directory_iterator end_itr; + for (directory_iterator itr(p); itr != end_itr; ++itr) { + if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { + Config cfg; + if (cfg.load(itr->path().string()) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + continue; + } + + if (CONFIG_STRING(&cfg, "service.jid") != jid) { + continue; + } + + found = true; + + boost::shared_ptr m_conn; + m_conn = networkFactories.getConnectionFactory()->createConnection(); + m_conn->onDataRead.connect(boost::bind(&handleDataRead, m_conn, _1)); + m_conn->onConnectFinished.connect(boost::bind(&handleConnected, m_conn, message, _1)); + m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(CONFIG_STRING(&cfg, "service.backend_host")), getPort(CONFIG_STRING(&cfg, "service.portfile")))); + +// finished++; +// Swift::Client *client = new Swift::Client(CONFIG_VECTOR(&cfg, "service.admin_jid")[0], CONFIG_STRING(&cfg, "service.admin_password"), &networkFactories); +// client->setAlwaysTrustCertificates(); +// client->onConnected.connect(boost::bind(&handleConnected, client, CONFIG_STRING(&cfg, "service.jid"))); +// client->onDisconnected.connect(bind(&handleDisconnected, client, _1, CONFIG_STRING(&cfg, "service.jid"))); +// client->onMessageReceived.connect(bind(&handleMessageReceived, client, _1, CONFIG_STRING(&cfg, "service.jid"))); +// Swift::ClientOptions opt; +// opt.allowPLAINWithoutTLS = true; +// client->connect(opt); + } + } + + if (!found) { + std::cerr << "Config file for Spectrum instance with this JID was not found\n"; + exit(20); + } + } + catch (const filesystem_error& ex) { + std::cerr << "boost filesystem error\n"; + exit(5); + } +} + +std::vector show_list(ManagerConfig *config, bool show) { + path p(CONFIG_STRING(config, "service.config_directory")); + std::vector list; + + try { + if (!exists(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(6); + } + + if (!is_directory(p)) { + std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; + exit(7); + } + + bool found = false; + directory_iterator end_itr; + for (directory_iterator itr(p); itr != end_itr; ++itr) { + if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { + Config cfg; + if (cfg.load(itr->path().string()) == false) { + std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; + continue; + } + + if (show) { + std::cout << CONFIG_STRING(&cfg, "service.jid") << "\n"; + } + list.push_back(CONFIG_STRING(&cfg, "service.jid")); + } + } + } + catch (const filesystem_error& ex) { + std::cerr << "boost filesystem error\n"; + } + return list; +} + + + + +