Commit 20258e61 authored by louiz’'s avatar louiz’

Move message_handler, client_base and remote_client_base implem into hpp files

parent 1ccbd31d
#include <logging/logging.hpp>
#include <network/client_base.hpp>
#include <boost/algorithm/string.hpp>
ClientBase::ClientBase()
{
}
ClientBase::~ClientBase()
{
if (this->get_socket().is_open())
{
log_debug("Closing connection");
this->get_socket().close();
}
}
// Connect, asyncly, and call one of the given callbacks
void ClientBase::connect(const std::string& host,
const short& port,
std::function<void(void)> on_success,
std::function<void(void)> on_failure)
{
// TODO use resolve and DNS
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host), port);
this->get_socket().async_connect(endpoint,
std::bind(&ClientBase::connect_handler, this,
on_success, on_failure,
std::placeholders::_1));
log_info("Connecting to " << host << ":" << port);
}
void ClientBase::connect_handler(std::function< void(void) > on_success,
std::function< void(void) > on_failure,
const boost::system::error_code& error)
{
if (error)
{
log_info("Connection failed: " << error);
if (on_failure)
on_failure();
}
else
{
log_info("Connected.");
this->install_callback("PING", std::bind(&ClientBase::ping_callback, this, std::placeholders::_1));
this->install_callbacks();
this->install_read_handler();
if (on_success)
on_success();
}
}
void ClientBase::ping_callback(Message*)
{
log_debug("Received PING");
Message* message = new Message;
message->set_name("PONG");
this->send(message);
}
void ClientBase::poll()
{
while (IoService::get().poll() != 0)
;
}
......@@ -10,36 +10,57 @@
* @class ClientBase
*/
#include <functional>
#include <string>
#include <cstdlib>
#ifndef __CLIENT_BASE_HPP__
# define __CLIENT_BASE_HPP__
#include <boost/algorithm/string.hpp>
#include <network/message_handler.hpp>
#include <network/ping_handler.hpp>
#include <network/timed_event_handler.hpp>
#include <network/message.hpp>
#include <network/timed_event.hpp>
#include <logging/logging.hpp>
#include <utils/time.hpp>
class ClientBase:public MessageHandler, public TimedEventHandler,
public PingHandler
#include <functional>
#include <string>
#include <cstdlib>
class ClientBase: public MessageHandler, public TimedEventHandler,
public PingHandler
{
public:
ClientBase();
~ClientBase();
ClientBase() = default;
~ClientBase()
{
if (this->get_socket().is_open())
{
log_debug("Closing connection");
this->get_socket().close();
}
}
/**
* Tries to connect to the remote server. Calls one of the given callbacks, if it
* succeeds or fails.
* @return void
*/
void connect(const std::string&,
const short&,
std::function< void(void) > on_success = 0,
std::function< void(void) > on_failure = 0);
void connect(const std::string& host,
const short& port,
std::function<void(void)> on_success=nullptr,
std::function<void(const boost::system::error_code&)> on_failure=nullptr)
{
// TODO use resolve and DNS
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host), port);
this->get_socket().async_connect(endpoint,
std::bind(&ClientBase::connect_handler, this,
on_success, on_failure,
std::placeholders::_1));
log_info("Connecting to " << host << ":" << port);
}
/**
* Install all the default callbacks.
*/
......@@ -47,18 +68,48 @@ public:
/**
* Checks for network or timed events readiness.
*/
void poll();
void poll()
{
while (IoService::get().poll() != 0)
;
}
virtual void on_connection_closed() = 0;
private:
void connect_handler(std::function< void(void) >,
std::function< void(void) >,
const boost::system::error_code&);
void connect_handler(std::function<void(void)> on_success,
std::function<void(const boost::system::error_code&)> on_failure,
const boost::system::error_code& error)
{
if (error)
{
log_info("Connection failed: " << error);
if (on_failure)
on_failure(error);
}
else
{
log_info("Connected.");
this->install_callback("PING", std::bind(&ClientBase::ping_callback,
this, std::placeholders::_1));
this->install_callbacks();
this->install_read_handler();
if (on_success)
on_success();
}
}
/**
* Called when the server sends us a PING request. Sends a PONG back.
*/
void ping_callback(Message*);
void ping_callback(Message*)
{
log_debug("Received PING");
Message* message = new Message;
message->set_name("PONG");
this->send(message);
}
};
#endif /*__CLIENT_HPP__ */
......
......@@ -22,5 +22,4 @@ private:
IoService& operator=(IoService&&) = delete;
};
#endif /* IOSERVICE_HPP_INCLUDED */
#include <logging/logging.hpp>
#include <network/message_handler.hpp>
#include <network/message.hpp>
#include <utils/scopeguard.hpp>
MessageHandler::MessageHandler():
writing(false)
{
}
MessageHandler::~MessageHandler()
{
}
void MessageHandler::install_callback(const std::string& name,
t_read_callback callback)
{
log_debug("installing callback for message " << name);
this->callbacks.emplace(name, callback);
}
void MessageHandler::install_callback_once(const std::string& name,
t_read_callback callback)
{
log_debug("installing ONCE callback for message " << name);
this->callbacks_once.emplace(name, callback);
}
void MessageHandler::remove_callback(const std::string& name)
{
log_debug("remove_callback: " << name);
auto res = this->callbacks.erase(name);
log_warning("Removed " << res << " callbacks");
}
std::vector<t_read_callback> MessageHandler::get_callbacks(const std::string& message)
{
std::vector<t_read_callback> res;
auto its = this->callbacks.equal_range(message);
for (auto it = std::get<0>(its); it != std::get<1>(its); ++it)
res.push_back(it->second);
its = this->callbacks_once.equal_range(message);
for (auto it = std::get<0>(its); it != std::get<1>(its); ++it)
res.push_back(it->second);
this->callbacks_once.erase(std::get<0>(its), std::get<1>(its));
return res;
}
void MessageHandler::read_handler(const boost::system::error_code& error, const std::size_t bytes_transferred)
{
log_debug("read_handler, size: " << bytes_transferred << " bytes.");
if (error)
{
log_debug("Read error: " << error);
this->on_connection_closed();
return;
}
if (bytes_transferred <= 1)
{
log_warning("Not enough data received for a message header to be valid.");
return ;
}
char *c = new char[bytes_transferred+1];
// Make sure c will ALWAYS be deleted
utils::ScopeGuard guard([c](){ delete[] c;});
// Extract the needed data from the buffer
this->data.sgetn(c, bytes_transferred);
c[bytes_transferred] = 0;
// find the . separator
std::size_t pos = 0;
while (c[pos] && c[pos] != '.')
pos++;
std::string message_name;
std::size_t size;
if (pos == bytes_transferred)
{ // no '.' was found, the message name is assumed to 1 char long.
message_name = std::string(1, c[0]);
size = atoi(std::string(c+1, bytes_transferred-2).data());;
}
else
{
message_name = std::string(c, pos);
size = atoi(std::string(c+pos+1, bytes_transferred-pos-2).data()); // remove the ending :
}
log_debug("Received : " << message_name << " . " << size);
Message* message = new Message;
message->set_name(message_name);
// Find out if a callback was registered for that message.
auto callbacks = this->get_callbacks(message_name);
// We check what we need to read on the socket to have the rest of the binary datas
const std::size_t length_to_read = this->data.size() >= size ? 0 : size - this->data.size();
boost::asio::async_read(this->get_socket(),
this->data,
boost::asio::transfer_at_least(length_to_read),
std::bind(&MessageHandler::binary_read_handler, this,
std::placeholders::_1,
message,
size, callbacks));
}
void MessageHandler::binary_read_handler(const boost::system::error_code& error,
Message* message,
std::size_t bytes_transferred,
std::vector<t_read_callback> callbacks)
{
if (error)
{
log_error("binary_read_handler failed: "<< error);
exit(1);
}
log_debug("binary_read_handler " << bytes_transferred);
message->body = new char[bytes_transferred];
this->data.sgetn(message->body, bytes_transferred);
message->set_body_size(bytes_transferred);
for (const auto& cb: callbacks)
{
try {
cb(message);
}
catch (const SerializationException& error)
{
log_error("Invalid message received.");
}
}
delete message;
this->install_read_handler();
}
void MessageHandler::install_read_handler(void)
{
boost::asio::async_read_until(this->get_socket(),
this->data,
':',
std::bind(&MessageHandler::read_handler, this,
std::placeholders::_1,
std::placeholders::_2));
}
void MessageHandler::request_answer(Message* message, t_read_callback on_answer, std::string name)
{
if (name.size() == 0)
name = message->get_name();
this->install_callback_once(name, on_answer);
this->send(message);
}
void MessageHandler::send(Message* message, std::function< void(void) > on_sent)
{
if (on_sent)
message->callback = on_sent;
log_debug("Sending message: " << message->get_name());
this->messages_to_send.push_front(message);
this->check_messages_to_send();
}
void MessageHandler::send_message(const char* name, const google::protobuf::Message& msg)
{
Message* message = new Message;
message->set_name(name);
message->set_body(msg);
this->send(message);
}
void MessageHandler::send_message(const char* name, const std::string& archive)
{
Message* message = new Message;
message->set_name(name);
message->set_body(archive.data(), archive.length());
this->send(message);
}
bool MessageHandler::check_messages_to_send()
{
if (this->writing || this->messages_to_send.empty())
return false;
this->actually_send(this->messages_to_send.back());
this->messages_to_send.pop_back();
return true;
}
void MessageHandler::actually_send(Message* message)
{
this->writing = true;
message->pack();
std::vector<boost::asio::const_buffer> buffs;
buffs.push_back(boost::asio::buffer(message->header.data(), message->header.length()));
buffs.push_back(boost::asio::buffer(message->body, message->body_size));
async_write(this->get_socket(),
buffs,
std::bind(&MessageHandler::send_handler, this,
std::placeholders::_1,
std::placeholders::_2,
message));
}
void MessageHandler::send_handler(const boost::system::error_code& error,
std::size_t bytes_transferred,
Message* message)
{
this->writing = false;
assert(bytes_transferred == message->header.length() + message->body_size);
if (message->callback)
message->callback();
delete message;
// TODO check for error
if (error)
exit(1);
this->check_messages_to_send();
}
......@@ -21,79 +21,280 @@
#include <network/transfer_sender.hpp>
#include <network/base_socket.hpp>
#include <network/message.hpp>
#include <utils/scopeguard.hpp>
#include <logging/logging.hpp>
typedef std::function<void(Message*)> t_read_callback;
class MessageHandler: public BaseSocket
{
public:
explicit MessageHandler();
virtual ~MessageHandler();
MessageHandler():
writing(false)
{ }
virtual ~MessageHandler() = default;
void install_read_handler()
{
boost::asio::async_read_until(this->get_socket(),
this->data,
':',
std::bind(&MessageHandler::read_handler, this,
std::placeholders::_1,
std::placeholders::_2));
}
void install_read_handler();
/**
* called when there's something to read on the socket. Reads the message
* the size of the arguments, and then calls binary_read_handler to read
* the arguments of the message, if any.
*/
void read_handler(const boost::system::error_code& error, const std::size_t bytes_transferred);
void read_handler(const boost::system::error_code& error, const std::size_t bytes_transferred)
{
log_debug("read_handler, size: " << bytes_transferred << " bytes.");
if (error)
{
log_debug("Read error: " << error);
this->on_connection_closed();
return;
}
if (bytes_transferred <= 1)
{
log_warning("Not enough data received for a message header to be valid.");
return ;
}
char *c = new char[bytes_transferred+1];
// Make sure c will ALWAYS be deleted
utils::ScopeGuard guard([c](){ delete[] c;});
// Extract the needed data from the buffer
this->data.sgetn(c, bytes_transferred);
c[bytes_transferred] = 0;
// find the . separator
std::size_t pos = 0;
while (c[pos] && c[pos] != '.')
pos++;
std::string message_name;
std::size_t size;
if (pos == bytes_transferred)
{ // no '.' was found, the message name is assumed to 1 char long.
message_name = std::string(1, c[0]);
size = atoi(std::string(c+1, bytes_transferred-2).data());;
}
else
{
message_name = std::string(c, pos);
size = atoi(std::string(c+pos+1, bytes_transferred-pos-2).data()); // remove the ending :
}
log_debug("Received : " << message_name << " . " << size);
Message* message = new Message;
message->set_name(message_name);
// Find out if a callback was registered for that message.
auto callbacks = this->get_callbacks(message_name);
// We check what we need to read on the socket to have the rest of the binary datas
const std::size_t length_to_read = this->data.size() >= size ? 0 : size - this->data.size();
boost::asio::async_read(this->get_socket(),
this->data,
boost::asio::transfer_at_least(length_to_read),
std::bind(&MessageHandler::binary_read_handler, this,
std::placeholders::_1,
message,
size, callbacks));
}
/**
* Read the arguments after a message (can read 0 bytes too) and pass that
* to the callback that was associated with this message.
*/
void binary_read_handler(const boost::system::error_code&, Message*, std::size_t, std::vector<t_read_callback> callbacks);
void binary_read_handler(const boost::system::error_code& error,
Message* message, std::size_t bytes_transferred,
std::vector<t_read_callback> callbacks)
{
if (error)
{
log_error("binary_read_handler failed: "<< error);
exit(1);
}
log_debug("binary_read_handler " << bytes_transferred);
message->body = new char[bytes_transferred];
this->data.sgetn(message->body, bytes_transferred);
message->set_body_size(bytes_transferred);
for (const auto& cb: callbacks)
{
try {
cb(message);
}
catch (const SerializationException& error)
{
log_error("Invalid message received.");
}
}
delete message;
this->install_read_handler();
}
/**
* Sends a message, and use install_callback_once to wait for the answer
* and call that callback to handle it.
*/
void request_answer(Message*, t_read_callback on_answer, std::string name = "");
void request_answer(Message* message, t_read_callback on_answer, std::string name="")
{
if (name.size() == 0)
name = message->get_name();
this->install_callback_once(name, on_answer);
this->send(message);
}
/**
* Install a new callback associated with a message. This callback will
* be called upon receiving that message.
*/
void install_callback(const std::string&, t_read_callback);
void install_callback(const std::string& name, t_read_callback callback)
{
log_debug("installing callback for message " << name);
this->callbacks.emplace(name, callback);
}
/**
* Install a new callback associated with a message. This callback will
* be called upon receiving that message, but only once. This is used
* for example if you send a message waiting for and answer, you install
* a callback that will handle that answer, and only this one.
*/
void install_callback_once(const std::string&, t_read_callback);
void install_callback_once(const std::string& name, t_read_callback callback)
{
log_debug("installing ONCE callback for message " << name);
this->callbacks_once.emplace(name, callback);
}
/**
* Remove a callback that has been installed.
*/
void remove_callback(const std::string&);
void remove_callback(const std::string& name)
{
log_debug("remove_callback: " << name);
auto res = this->callbacks.erase(name);
log_warning("Removed " << res << " callbacks");
}
/**
* Add the given message to the messages_to_send queue, then
* calls check_messages_to_send. Which may send the next available
* message, if there's no async_write() call already running.
* It does not necessarily actually send the message on the socket.
*/
void send(Message* message, std::function< void(void) > on_sent = 0);
void send_message(const char* name, const google::protobuf::Message& msg);
void send_message(const char* name, const std::string& archive);
void send(Message* message, std::function<void(void)> on_sent=nullptr)
{
if (on_sent)
message->callback = on_sent;
log_debug("Sending message: " << message->get_name());
this->messages_to_send.push_front(message);
this->check_messages_to_send();
}
void send_message(const char* name, const google::protobuf::Message& msg)
{
Message* message = new Message;
message->set_name(name);
message->set_body(msg);
this->send(message);
}
void send_message(const char* name, const std::string& archive)
{