Commit ed186120 authored by louiz’'s avatar louiz’

Concept of slave, connected to the master server and spawning game servers

- Master server is now composed of two separate servers, one accepting game
  clients, and one accepting slave servers
- Introduce some way to communicate over IPC (used between the slaves their
  child game servers)
- Child game servers are spawned and managed by the slave
parent 04b8a290
......@@ -10,14 +10,12 @@ int main()
if (!Config::read_conf("./batajelo.conf"))
return 1;
log_debug("Starting master server");
log_debug(Config::get("db_login", "non"));
log_debug(Config::get("db_login", ""));
srand(getpid());
MasterServer s(7878);
s.start();
MasterServer s(7878, 7877);
while (s.is_started())
s.poll(0);
s.run();
log_debug("Exiting...");
return 0;
......
#include <slave/slave.hpp>
#include <logging/logging.hpp>
int main()
{
Slave slave;
slave.run();
log_debug("Slave exiting...");
}
#include <master_server/master_server.hpp>
MasterServer::MasterServer(const short port):
Server<RemoteClient>(port)
MasterServer::MasterServer(const short client_port,
const short slave_port):
to_client_server(this, client_port),
to_slave_server(this, slave_port)
{
}
const RemoteClient* MasterServer::find_client_by_login(const std::string& login) const
{
for (const RemoteClient* client: this->clients)
{
const auto user = client->get_user();
if (user && user->login == login)
return client;
}
return nullptr;
}
Database* MasterServer::get_database()
{
return &this->database;
}
void MasterServer::on_new_client(RemoteClient* client)
{
client->set_server(this);
}
void MasterServer::on_client_left(RemoteClient* client)
void MasterServer::run()
{
log_debug("Client left");
this->to_client_server.start();
this->to_slave_server.start();
while (true)
IoService::get().poll();
}
#ifndef MASTER_SERVER_HPP_INCLUDED
#define MASTER_SERVER_HPP_INCLUDED
#include <network/server.hpp>
#include <master_server/remote_client.hpp>
#include <master_server/master_to_client_server.hpp>
#include <master_server/master_to_slave_server.hpp>
#include <database/database.hpp>
class MasterServer: public Server<RemoteClient>
class MasterServer
{
public:
MasterServer(const short port);
MasterServer(const short client_port, const short slave_port);
~MasterServer() = default;
/**
* Search for a connected client with this login
* @return RemoteClient* can be nullptr if nothing is found
*/
const RemoteClient* find_client_by_login(const std::string& login) const;
void on_new_client(RemoteClient* client) override final;
void on_client_left(RemoteClient* client) override final;
Database* get_database();
void run();
private:
MasterToClientServer to_client_server;
MasterToSlaveServer to_slave_server;
Database database;
MasterServer(const MasterServer&) = delete;
MasterServer(MasterServer&&) = delete;
MasterServer& operator=(const MasterServer&) = delete;
......
#include <master_server/master_to_client_server.hpp>
#include <master_server/master_server.hpp>
MasterToClientServer::MasterToClientServer(MasterServer* master,
const short port):
Server<RemoteClient>(port),
master(master)
{
}
const RemoteClient* MasterToClientServer::find_client_by_login(const std::string& login) const
{
for (const RemoteClient* client: this->clients)
{
const auto user = client->get_user();
if (user && user->login == login)
return client;
}
return nullptr;
}
void MasterToClientServer::on_new_client(RemoteClient* client)
{
log_debug("starting RemoteClient");
client->set_server(this);
}
void MasterToClientServer::on_client_left(RemoteClient* client)
{
log_debug("RemoteClient left");
}
Database* MasterToClientServer::get_database()
{
return this->master->get_database();
}
#ifndef MASTERTOCLIENT_SERVER_HPP_INCLUDED
#define MASTERTOCLIENT_SERVER_HPP_INCLUDED
#include <network/server.hpp>
#include <master_server/remote_client.hpp>
class MasterServer;
class Database;
class MasterToClientServer: public Server<RemoteClient>
{
public:
MasterToClientServer(MasterServer* master,
const short port);
~MasterToClientServer() = default;
/**
* Search for a connected client with this login
* @return RemoteClient* can be nullptr if nothing is found
*/
const RemoteClient* find_client_by_login(const std::string& login) const;
void on_new_client(RemoteClient* client) override final;
void on_client_left(RemoteClient* client) override final;
Database* get_database();
private:
MasterServer* master;
MasterToClientServer(const MasterToClientServer&) = delete;
MasterToClientServer(MasterToClientServer&&) = delete;
MasterToClientServer& operator=(const MasterToClientServer&) = delete;
MasterToClientServer& operator=(MasterToClientServer&&) = delete;
};
#endif /* MASTERTOCLIENT_SERVER_HPP_INCLUDED */
#include <master_server/master_to_slave_server.hpp>
#include <logging/logging.hpp>
MasterToSlaveServer::MasterToSlaveServer(MasterServer* master,
const short port):
Server<RemoteSlaveClient>(port),
master(master)
{
}
void MasterToSlaveServer::on_new_client(RemoteSlaveClient* client)
{
log_debug("Starting RemoteSlaveClient");
client->set_server(this);
}
void MasterToSlaveServer::on_client_left(RemoteSlaveClient* client)
{
log_debug("Ending RemoteSlaveClient");
}
#ifndef MASTERTOSLAVE_SERVER_HPP_INCLUDED
#define MASTERTOSLAVE_SERVER_HPP_INCLUDED
#include <network/server.hpp>
#include <master_server/remote_slave_client.hpp>
class MasterServer;
class MasterToSlaveServer: public Server<RemoteSlaveClient>
{
public:
MasterToSlaveServer(MasterServer* master,
const short port);
~MasterToSlaveServer() = default;
void on_new_client(RemoteSlaveClient* client) override final;
void on_client_left(RemoteSlaveClient* client) override final;
private:
MasterServer* master;
MasterToSlaveServer(const MasterToSlaveServer&) = delete;
MasterToSlaveServer(MasterToSlaveServer&&) = delete;
MasterToSlaveServer& operator=(const MasterToSlaveServer&) = delete;
MasterToSlaveServer& operator=(MasterToSlaveServer&&) = delete;
};
#endif /* MASTERTOSLAVE_SERVER_HPP_INCLUDED */
#include <master_server/remote_client.hpp>
#include <master_server/master_server.hpp>
#include <master_server/master_to_client_server.hpp>
#include <database/database.hpp>
#include "master.pb.h"
RemoteClient::RemoteClient():
......@@ -14,7 +16,7 @@ RemoteClient::~RemoteClient()
log_info("Deleting remote client " << this->id);
}
void RemoteClient::set_server(MasterServer* server)
void RemoteClient::set_server(MasterToClientServer* server)
{
this->server = server;
}
......
......@@ -17,7 +17,7 @@
#include <network/remote_client_base.hpp>
class MasterServer;
class MasterToClientServer;
class Message;
class TransferSender;
......@@ -27,7 +27,7 @@ public:
RemoteClient();
~RemoteClient();
void on_connection_closed() override final;
void set_server(MasterServer* server);
void set_server(MasterToClientServer* server);
/**
* Sends a file to the remote client.
......@@ -75,7 +75,7 @@ private:
/**
* A pointer to the server owning use
*/
MasterServer* server;
MasterToClientServer* server;
/**
* A list of all the current file transfers with the client.
*/
......
#include <master_server/remote_slave_client.hpp>
#include <master_server/master_to_slave_server.hpp>
void RemoteSlaveClient::set_server(MasterToSlaveServer* server)
{
this->server = server;
}
void RemoteSlaveClient::on_connection_closed()
{
this->server->remove_client(this);
}
void RemoteSlaveClient::install_callbacks()
{
this->send_message("START", "");
}
#ifndef REMOTE_SLAVE_CLIENT_HPP_INCLUDED
#define REMOTE_SLAVE_CLIENT_HPP_INCLUDED
#include <network/remote_client_base.hpp>
class MasterToSlaveServer;
class RemoteSlaveClient: public RemoteClientBase
{
public:
RemoteSlaveClient() = default;
~RemoteSlaveClient() = default;
void on_connection_closed() override final;
void set_server(MasterToSlaveServer* server);
private:
MasterToSlaveServer* server;
void install_callbacks() override final;
RemoteSlaveClient(const RemoteSlaveClient&) = delete;
RemoteSlaveClient(RemoteSlaveClient&&) = delete;
RemoteSlaveClient& operator=(const RemoteSlaveClient&) = delete;
RemoteSlaveClient& operator=(RemoteSlaveClient&&) = delete;
};
#endif /* REMOTE_SLAVE_CLIENT_HPP_INCLUDED */
#include <network/ipc_endpoint.hpp>
#include <network/ioservice.hpp>
#include <boost/filesystem.hpp>
#include <boost/asio.hpp>
#include <stdexcept>
#include <errno.h>
#include <logging/logging.hpp>
using namespace std::string_literals;
IPCEndpoint::IPCEndpoint():
must_destroy(true),
mqd(-1),
mq_name{},
stream_descriptor(IoService::get())
{
retry:
{
auto path = boost::filesystem::unique_path();
this->mq_name = "/"s + path.string();
}
log_debug("Trying to create an ipc endpoint : " << this->mq_name);
mqd_t mqd = ::mq_open(this->mq_name.data(),
O_CREAT|O_RDWR|O_EXCL,
S_IRUSR|S_IWUSR,
nullptr);
if (mqd == static_cast<mqd_t>(-1))
{
if (errno == EEXIST)
{
// We're out of luck, we randomly generated a name that we are
// already using. Try generating a new one
goto retry;
}
else
throw std::runtime_error("mq_open failed: "s + strerror(errno));
}
this->mqd = mqd;
this->resize_recv_buffer();
}
IPCEndpoint::IPCEndpoint(const std::string& path):
must_destroy(false),
mqd(-1),
mq_name{path},
stream_descriptor(IoService::get())
{
log_debug("Trying to open an existing ipc endpoint : " << this->mq_name);
this->mqd = ::mq_open(this->mq_name.data(),
O_RDWR);
if (this->mqd == static_cast<mqd_t>(-1))
throw std::runtime_error("mq_open failed: "s + strerror(errno));
this->resize_recv_buffer();
}
IPCEndpoint::~IPCEndpoint()
{
log_debug("~IPCEndpoint");
if (::mq_close(this->mqd) == -1)
log_error("Failed to close message queue: " << strerror(errno));
if (this->must_destroy)
{
log_debug("Destroying message queue: " << this->mq_name);
if (::mq_unlink(this->mq_name.data()) == -1)
log_error("Failed to unlink message queue " << this->mq_name << ": " << strerror(errno));
}
}
std::string IPCEndpoint::get_path() const
{
return this->mq_name;
}
void IPCEndpoint::resize_recv_buffer()
{
mq_attr attr;
auto res = ::mq_getattr(this->mqd, &attr);
if (res == -1)
throw std::runtime_error("mq_getattr failed: "s + strerror(errno));
this->recv_buffer.resize(attr.mq_msgsize);
}
std::size_t IPCEndpoint::get_max_msgsize() const
{
return this->recv_buffer.size();
}
std::string IPCEndpoint::recv()
{
auto size = ::mq_receive(this->mqd, this->recv_buffer.data(),
this->recv_buffer.size(), nullptr);
if (size == -1)
throw std::runtime_error("mq_receive failed: "s + strerror(errno));
return {this->recv_buffer.data(),
static_cast<std::string::size_type>(size)};
}
void IPCEndpoint::send(const std::string& msg)
{
auto res = ::mq_send(this->mqd, msg.data(), msg.size(), 0);
if (res == -1)
throw std::runtime_error("mq_send failed: "s + strerror(errno));
}
void IPCEndpoint::watch_read(std::function<void(const std::string&)>&& cb)
{
this->stream_descriptor.assign(this->mqd);
this->read_cb = std::move(cb);
this->watch_read();
}
void IPCEndpoint::watch_read()
{
// Do not actually read anything using boost because we are using
// null_buffers. Boost will just call our callback when someone is ready
// to be read, and we do it manually ourself in the callback.
auto cb = [this](const boost::system::error_code& error,
std::size_t)
{
if (error)
log_debug("Error polling on the IPC file descriptor");
else
{
log_debug("Ready to read");
auto res = this->recv();
this->read_cb(res);
}
this->watch_read();
};
this->stream_descriptor.async_read_some(boost::asio::null_buffers(),
cb);
}
#ifndef IPC_ENDPOINT_HPP_INCLUDED
#define IPC_ENDPOINT_HPP_INCLUDED
/**
* Uses message queues (man mq_open)
*/
#include <functional>
#include <vector>
#include <string>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <mqueue.h>
class IPCEndpoint
{
public:
/**
* Use the given path to open an existing IPC message queue.
*/
IPCEndpoint(const std::string& path);
/**
* Generate the path and use to create a new IPC message queue.
*/
IPCEndpoint();
~IPCEndpoint();
std::string get_path() const;
std::size_t get_max_msgsize() const;
std::string recv();
void send(const std::string& msg);
void watch_read(std::function<void(const std::string&)>&& cb);
private:
/**
* Whether or not this object is responsible for the destruction of the
* message queue. It's true when we are the one who created it, false
* otherwise.
*/
bool must_destroy;
mqd_t mqd;
std::string mq_name;
boost::asio::posix::stream_descriptor stream_descriptor;
std::vector<char> recv_buffer;
std::function<void(const std::string&)> read_cb;
void resize_recv_buffer();
void watch_read();
IPCEndpoint(const IPCEndpoint&) = delete;
IPCEndpoint(IPCEndpoint&&) = delete;
IPCEndpoint& operator=(const IPCEndpoint&) = delete;
IPCEndpoint& operator=(IPCEndpoint&&) = delete;
};
#endif /* IPC_ENDPOINT_HPP_INCLUDED */
......@@ -30,8 +30,8 @@
class RemoteClientBase: public MessageHandler, public TimedEventHandler, public PingHandler
{
public:
explicit RemoteClientBase(boost::asio::io_service&);
~RemoteClientBase();
RemoteClientBase();
virtual ~RemoteClientBase();
/**
* starts the client (install the read handler, etc)
*/
......@@ -56,6 +56,11 @@ public:
virtual void on_connection_closed() = 0;
boost::asio::ip::tcp::endpoint& get_endpoint()
{
return this->endpoint;
}
protected:
/**
* The client number (aka id).
......@@ -70,10 +75,11 @@ protected:
void install_read_handler(void);
private:
/**
* We keep a reference on the io_service that was passed to us by the
* Server, for convenience.
* A endpoint containing the information of the remote peer. It is set by
* asio, when async_accept succeeds, just before the accept handler is
* called.
*/
boost::asio::io_service& io_service;
boost::asio::ip::tcp::endpoint endpoint;
};
#endif // REMOTE_CLIENT_BASE
......
#include <slave/slave.hpp>
#include <network/ioservice.hpp>
#include <logging/logging.hpp>
short Slave::min_port = 7790;
std::size_t Slave::max_children = 2;
Slave::Slave():
running(false),
signal_set(IoService::get(), SIGINT, SIGTERM),
client(this),
children_handler(this)
{
this->signal_set.async_wait([this](const boost::system::error_code& error,
int)
{
log_debug("Received SIGINT.");
this->running = false;
});
}
void Slave::run()
{
this->running = true;
this->client.start();
while (this->running)
this->client.poll();
}
void Slave::start_game()
{
this->children_handler.start_subprocess();
}
void Slave::fill_info(ser::slave::SlaveInfo& info)
{
info.set_free_slots(Slave::max_children - this->children_handler.size());
}
#ifndef SLAVE_HPP_INCLUDED
#define SLAVE_HPP_INCLUDED
#include <slave/slave_client.hpp>
#include <slave/children_handler.hpp>
#include <boost/asio/signal_set.hpp>
#include "slave.pb.h"
class Slave
{
public:
Slave();
~Slave() = default;
void run();
void start_game();
/**
* Fill the given structure with the information about this slave.
*/
void fill_info(ser::slave::SlaveInfo& info);
static short min_port;
static std::size_t max_children;
private:
bool running;
boost::asio::signal_set signal_set;
SlaveClient client;
ChildrenHandler children_handler;
Slave(const Slave&) = delete;
Slave(Slave&&) = delete;
Slave& operator=(const Slave&) = delete;
Slave& operator=(Slave&&) = delete;
};
#endif /* SLAVE_HPP_INCLUDED */