message_handler.cpp 6.82 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#include <logging/logging.hpp>
#include <network/message_handler.hpp>
#include <network/message.hpp>

#include <cstdlib>
#include <string>

// Builds a handler whose BaseSocket part is constructed from `io_service`.
// The `writing` flag starts out false: no asynchronous write is in flight yet.
MessageHandler::MessageHandler(boost::asio::io_service& io_service)
  : BaseSocket(io_service),
    writing(false)
{
}

// Frees the messages that are still waiting in the send queue.
//
// send() stores raw Message pointers in messages_to_send and the only place
// that deletes them is send_handler(), once the write completed. A message
// that is still queued when the handler is destroyed would therefore never be
// released. The message being written is not in the queue (it is popped by
// check_messages_to_send()), so nothing is freed twice here.
MessageHandler::~MessageHandler()
{
  for (Message* pending : this->messages_to_send)
    delete pending;
  this->messages_to_send.clear();
}

void MessageHandler::install_callback(const std::string& message,
                                      t_read_callback callback)
{
  log_debug("installing callback for message " << message);
19
  this->callbacks.emplace(message, callback);
20 21 22 23 24 25
}

void MessageHandler::install_callback_once(const std::string& message,
                                           t_read_callback callback)
{
  log_debug("installing ONCE callback for message " << message);
26
  this->callbacks_once.emplace(message, callback);
27 28 29 30
}

void MessageHandler::remove_callback(const std::string& message)
{
31 32 33
  log_debug("remove_callback: " << message);
  auto res = this->callbacks.erase(message);
  log_warning("Removed " << res << " callbacks");
34 35
}

36
std::vector<t_read_callback> MessageHandler::get_callbacks(const std::string& message)
37
{
38
  std::vector<t_read_callback> res;
39

40 41 42 43 44 45 46 47 48
  auto its = this->callbacks.equal_range(message);
  for (auto it = std::get<0>(its); it != std::get<1>(its); ++it)
    res.push_back(it->second);

  its = this->callbacks_once.equal_range(message);
  for (auto it = std::get<0>(its); it != std::get<1>(its); ++it)
    res.push_back(it->second);
  this->callbacks_once.erase(std::get<0>(its), std::get<1>(its));
  return res;
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
}

// Completion handler for the header read started by install_read_handler().
//
// The header is everything up to and including the ':' delimiter and is parsed
// as "<name>.<size>:". When no '.' is present, the name is assumed to be the
// first character and the remaining characters (minus the ':') are the size.
//
// Once the header is parsed, a second asynchronous read is started so that at
// least <size> bytes of body are available in `data`; binary_read_handler()
// then receives the new Message, the announced size and the callbacks that
// were registered for <name>.
//
// error:             result of the read; any error is treated as a closed
//                    connection.
// bytes_transferred: length of the header, ':' included.
void MessageHandler::read_handler(const boost::system::error_code& error, const std::size_t bytes_transferred)
{
  log_debug("read_handler, size: " << bytes_transferred << " bytes.");
  if (error)
    {
      log_debug("Read error: " << error);
      this->on_connection_closed();
      return;
    }
  if (bytes_transferred <= 1)
    {
      // NOTE(review): this early return neither consumes the bytes from `data`
      // nor calls install_read_handler(), so no further header read is started
      // from here -- confirm that this is intended.
      log_warning("Not enough data received for a message header to be valid.");
      return ;
    }

  // Extract the header from the buffer. A std::string owns the bytes, so
  // nothing has to be released by hand on any path.
  std::string header(bytes_transferred, '\0');
  this->data.sgetn(&header[0], bytes_transferred);

  // Split the header around the '.' separator. The last character is the ':'
  // delimiter and is left out of the size field.
  std::string message_name;
  std::string size_field;
  const std::string::size_type dot = header.find('.');
  if (dot == std::string::npos)
    {  // no '.' was found, the message name is assumed to be 1 char long.
      message_name = header.substr(0, 1);
      size_field = header.substr(1, bytes_transferred - 2);
    }
  else
    {
      message_name = header.substr(0, dot);
      size_field = header.substr(dot + 1, bytes_transferred - dot - 2);
    }

  // The size comes from the network: strtol has a defined result on overflow,
  // and a negative value must not be turned into a huge unsigned size.
  const long announced_size = std::strtol(size_field.c_str(), nullptr, 10);
  std::size_t size = 0;
  if (announced_size < 0)
    {
      log_warning("Negative body size received for message " << message_name << ", assuming an empty body.");
    }
  else
    {
      size = static_cast<std::size_t>(announced_size);
    }
  log_debug("Received : " << message_name << " . " << size);

  // Find out if a callback was registered for that message.
  auto callbacks = this->get_callbacks(message_name);

  // Ownership of the message goes to binary_read_handler(), which deletes it.
  Message* message = new Message;
  message->set_name(message_name);

  // We check what we need to read on the socket to have the rest of the binary data
  const std::size_t length_to_read = this->data.size() >= size ? 0 : size - this->data.size();

  // Only the error code is forwarded (_1); the handler's size argument is the
  // announced body size, not the number of bytes read by this operation.
  boost::asio::async_read(this->socket,
                          this->data,
                          boost::asio::transfer_at_least(length_to_read),
                          std::bind(&MessageHandler::binary_read_handler, this,
                                    std::placeholders::_1,
                                    message,
                                    size, callbacks));
}

// Completion handler for the body read started by read_handler().
//
// Copies the body out of `data` into the message, passes the message to every
// callback, deletes it and starts the read of the next header.
//
// error:             result of the read. A failure is handled the same way as
//                    in read_handler(): the connection is considered closed.
// message:           message created by read_handler(); it is deleted here on
//                    every path.
// bytes_transferred: number of body bytes to take from `data` (read_handler()
//                    binds the size announced in the header to this argument).
// callbacks:         callbacks to invoke with the complete message.
void MessageHandler::binary_read_handler(const boost::system::error_code& error,
                                         Message* message,
                                         std::size_t bytes_transferred,
                                         std::vector<t_read_callback> callbacks)
{
  if (error)
    {
      log_error("binary_read_handler failed: "<< error);
      // A failed read must not terminate the whole process: release the
      // pending message and report the closed connection instead.
      delete message;
      this->on_connection_closed();
      return;
    }
  log_debug("binary_read_handler " << bytes_transferred);
  message->body = new char[bytes_transferred];
  this->data.sgetn(message->body, bytes_transferred);
  message->set_body_size(bytes_transferred);

  if (callbacks.empty())
    {
      log_debug("no callback");
    }
  else
    {
      for (const auto& cb: callbacks)
        {
          try {
            cb(message);
          }
          // A callback that cannot deserialize the body must not prevent the
          // remaining callbacks from running.
          catch (const SerializationException&)
            {
              log_error("Invalid message received.");
            }
        }
    }
  delete message;
  this->install_read_handler();
}

void MessageHandler::install_read_handler(void)
{
  boost::asio::async_read_until(this->socket,
                    this->data,
                    ':',
                    std::bind(&MessageHandler::read_handler, this,
                                          std::placeholders::_1,
                                          std::placeholders::_2));
}

// Sends `message` and registers `on_answer` as a one-shot callback for the
// answer.
//
// name: message name under which the callback is registered; when empty, the
//       name of `message` itself is used.
void MessageHandler::request_answer(Message* message, t_read_callback on_answer, std::string name)
{
  if (name.empty())
    {
      name = message->get_name();
    }
  install_callback_once(name, on_answer);
  send(message);
}

// Puts `message` at the front of the send queue and tries to start sending.
//
// on_sent: when it holds a callable, it is stored in message->callback; an
//          empty function leaves the callback already set on the message
//          untouched.
void MessageHandler::send(Message* message, std::function< void(void) > on_sent)
{
  if (on_sent)
    {
      message->callback = on_sent;
    }
  log_debug("Sending message: " << message->get_name());
  messages_to_send.push_front(message);
  check_messages_to_send();
}

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
// Creates a Message called `name` whose body is set from the protobuf message
// `msg`, then passes it to send().
void MessageHandler::send_message(const char* name, const google::protobuf::Message& msg)
{
  Message* outgoing = new Message;
  outgoing->set_name(name);
  outgoing->set_body(msg);
  send(outgoing);
}

void MessageHandler::send_message(const char* name, const std::string& archive)
{
  Message* message = new Message;
  message->set_name(name);
  message->set_body(archive.data(), archive.length());
  this->send(message);
}

187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
bool MessageHandler::check_messages_to_send()
{
  log_debug("Length of the queue: " << this->messages_to_send.size());
  if (this->writing || this->messages_to_send.empty())
    return false;
  this->actually_send(this->messages_to_send.back());
  this->messages_to_send.pop_back();
  return true;
}

// Packs `message` and starts an asynchronous write of its header followed by
// its body. The `writing` flag is raised until send_handler() runs.
//
// The buffers only reference the memory held by `message`; the message is
// passed along to send_handler(), which deletes it.
void MessageHandler::actually_send(Message* message)
{
  this->writing = true;
  message->pack();

  std::vector<boost::asio::const_buffer> parts;
  parts.push_back(boost::asio::buffer(message->header.data(),
                                      message->header.length()));
  parts.push_back(boost::asio::buffer(message->body,
                                      message->body_size));

  boost::asio::async_write(this->socket,
                           parts,
                           std::bind(&MessageHandler::send_handler, this,
                                     std::placeholders::_1,
                                     std::placeholders::_2,
                                     message));
}

// Completion handler for the write started by actually_send().
//
// Clears the `writing` flag, runs the message's callback (if any), deletes the
// message and starts sending the next queued message.
//
// error:             result of the write. It is checked before anything else:
//                    on failure the byte count is not expected to match and
//                    the message was not sent, so neither the assertion nor
//                    the callback applies.
// bytes_transferred: number of bytes written; on success it must cover the
//                    whole header and body.
// message:           message that was written; it is deleted here.
void MessageHandler::send_handler(const boost::system::error_code& error,
                      std::size_t bytes_transferred,
                      Message* message)
{
  this->writing = false;

  // TODO handle write errors without terminating the process
  if (error)
    {
      log_error("send_handler failed: " << error);
      delete message;
      exit(1);
    }
  assert(bytes_transferred == message->header.length() + message->body_size);

  if (message->callback)
    message->callback();
  delete message;

  this->check_messages_to_send();
}