Commit 6477f57d authored by louiz’'s avatar louiz’
Browse files

Finally (and hopefuly…) fix all corrupted data sent with async_write.

Since doing more than one simultaneous async_write on one single socket
is NOT safe, we use a deque<Command*> to keep the list of all the command
to send, and we send them properly one by one.
parent c149ab23
...@@ -65,7 +65,6 @@ void Game::on_authenticate(Command* received_command) ...@@ -65,7 +65,6 @@ void Game::on_authenticate(Command* received_command)
log_debug("on_authenticate :" << res << "." << ((res > 4) ? "Unknown error" : auth_messages[res])); log_debug("on_authenticate :" << res << "." << ((res > 4) ? "Unknown error" : auth_messages[res]));
if (res == 0) if (res == 0)
this->request_file("file.bin"); this->request_file("file.bin");
// TODO some UI stuf, etc.
} }
void Game::run() void Game::run()
......
...@@ -2,7 +2,8 @@ ...@@ -2,7 +2,8 @@
Command::Command(): Command::Command():
body(0), body(0),
body_size(0) body_size(0),
callback(0)
{ {
} }
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <boost/function.hpp>
#ifndef __COMMAND_HPP__ #ifndef __COMMAND_HPP__
# define __COMMAND_HPP__ # define __COMMAND_HPP__
...@@ -66,6 +68,10 @@ public: ...@@ -66,6 +68,10 @@ public:
std::string header; std::string header;
std::string name; std::string name;
size_t body_size; size_t body_size;
/**
* If not 0, will be called from the send_handler.
*/
boost::function< void(void) > callback;
private: private:
Command(const Command&); Command(const Command&);
......
#include <network/command_handler.hpp> #include <network/command_handler.hpp>
CommandHandler::CommandHandler() CommandHandler::CommandHandler():
writing(false)
{ {
} }
...@@ -135,8 +136,6 @@ void CommandHandler::install_read_handler(void) ...@@ -135,8 +136,6 @@ void CommandHandler::install_read_handler(void)
boost::asio::placeholders::bytes_transferred)); boost::asio::placeholders::bytes_transferred));
} }
// Send a command and add a callback to be called when the answer to
// this command will be received
void CommandHandler::request_answer(Command* command, t_read_callback on_answer) void CommandHandler::request_answer(Command* command, t_read_callback on_answer)
{ {
// We may want to send a command that do not require an answer. // We may want to send a command that do not require an answer.
...@@ -147,6 +146,29 @@ void CommandHandler::request_answer(Command* command, t_read_callback on_answer) ...@@ -147,6 +146,29 @@ void CommandHandler::request_answer(Command* command, t_read_callback on_answer)
void CommandHandler::send(Command* command, boost::function< void(void) > on_sent) void CommandHandler::send(Command* command, boost::function< void(void) > on_sent)
{ {
if (on_sent)
command->callback = on_sent;
this->commands_to_send.push_front(command);
this->check_commands_to_send();
}
bool CommandHandler::check_commands_to_send()
{
log_debug("Length of the queue: " << this->commands_to_send.size());
// log_debug("check_commands_to_send");
if (this->writing || this->commands_to_send.empty())
{
// log_debug("not sending: this->writing=" << this->writing << " queue empty" << this->commands_to_send.empty());
return false;
}
this->actually_send(this->commands_to_send.back());
this->commands_to_send.pop_back();
return true;
}
void CommandHandler::actually_send(Command* command)
{
this->writing = true;
command->pack(); command->pack();
std::vector<boost::asio::const_buffer> buffs; std::vector<boost::asio::const_buffer> buffs;
buffs.push_back(boost::asio::buffer(command->header.data(), command->header.length())); buffs.push_back(boost::asio::buffer(command->header.data(), command->header.length()));
...@@ -156,19 +178,23 @@ void CommandHandler::send(Command* command, boost::function< void(void) > on_sen ...@@ -156,19 +178,23 @@ void CommandHandler::send(Command* command, boost::function< void(void) > on_sen
boost::bind(&CommandHandler::send_handler, this, boost::bind(&CommandHandler::send_handler, this,
boost::asio::placeholders::error, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred, boost::asio::placeholders::bytes_transferred,
on_sent, command)); command));
} }
void CommandHandler::send_handler(const boost::system::error_code& error, void CommandHandler::send_handler(const boost::system::error_code& error,
std::size_t bytes_transferred, std::size_t bytes_transferred,
boost::function< void(void) > on_sent, Command* command) Command* command)
{ {
this->writing = false;
assert(bytes_transferred == command->header.length() + command->body_size); assert(bytes_transferred == command->header.length() + command->body_size);
if (command->callback)
command->callback();
delete command; delete command;
// TODO check for error // TODO check for error
if (error) if (error)
exit(1); exit(1);
if (on_sent)
on_sent(); this->check_commands_to_send();
} }
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
typedef boost::function<void(Command*)> t_read_callback; typedef boost::function<void(Command*)> t_read_callback;
typedef std::deque<Command*> command_queue;
class CommandHandler class CommandHandler
{ {
...@@ -61,13 +62,15 @@ public: ...@@ -61,13 +62,15 @@ public:
* a callback that will handle that answer, and only this one. * a callback that will handle that answer, and only this one.
*/ */
void install_callback_once(const std::string&, t_read_callback); void install_callback_once(const std::string&, t_read_callback);
/** /**
* Remove a callback that has been installed. * Remove a callback that has been installed.
*/ */
void remove_callback(const std::string&); void remove_callback(const std::string&);
/** /**
* Send the given command on the socket. * Add the given command to the commands_to_send queue, then
* calls check_commands_to_send. Which may send the next available
* command, if there's no async_write() call already running.
* It does not necessarily actually send the command on the socket.
*/ */
void send(Command* command, boost::function< void(void) > on_sent = 0); void send(Command* command, boost::function< void(void) > on_sent = 0);
...@@ -81,16 +84,27 @@ protected: ...@@ -81,16 +84,27 @@ protected:
/** /**
* @todo Check if the data was correctly sent on the socket * @todo Check if the data was correctly sent on the socket
*/ */
void send_handler(const boost::system::error_code&, std::size_t, boost::function< void(void) >, Command*); void send_handler(const boost::system::error_code&, std::size_t, Command*);
/**
* Actually sends the command on the socket, calling async_write.
*/
void actually_send(Command*);
/**
* Checks if there's something to send on the socket. We first check if
* the writing boolean is false, and then we pop the next available command
* from the queue (if any) and we send it using async_write.
* @returns true if a command was sent, false otherwise.
*/
bool check_commands_to_send();
/** /**
* A buffer keeping the data that is read on the socket. * A buffer keeping the data that is read on the socket.
*/ */
boost::asio::streambuf data;
/** /**
* What happens when a read error occurs. * What happens when a read error occurs.
*/ */
virtual void on_connection_closed() = 0; virtual void on_connection_closed() = 0;
boost::asio::streambuf data;
tcp::socket* socket; tcp::socket* socket;
private: private:
...@@ -99,6 +113,18 @@ private: ...@@ -99,6 +113,18 @@ private:
std::map<const std::string, t_read_callback > callbacks; std::map<const std::string, t_read_callback > callbacks;
std::map<const std::string, t_read_callback > callbacks_once; std::map<const std::string, t_read_callback > callbacks_once;
/**
* A queue of messages. If there's not async_write running, we pop one
* from it and we send it.
*/
command_queue commands_to_send;
/**
* Tells us if we are waiting for an async_write to finish or note.
* This must be set to true when calling async_write(), to false
* in the write handler. It is used by check_commands_to_send.
*/
bool writing;
}; };
#endif // __COMMAND_HANDLER_HPP__ #endif // __COMMAND_HANDLER_HPP__
......
...@@ -19,6 +19,8 @@ BOOST_AUTO_TEST_CASE(command_consistency_test) ...@@ -19,6 +19,8 @@ BOOST_AUTO_TEST_CASE(command_consistency_test)
BOOST_REQUIRE(command->header == std::string("FAKE_COMMAND.15:")); BOOST_REQUIRE(command->header == std::string("FAKE_COMMAND.15:"));
BOOST_REQUIRE(command->body_size == 15); BOOST_REQUIRE(command->body_size == 15);
BOOST_REQUIRE(strncmp(command->body, "coucou les amis", 15) == 0); BOOST_REQUIRE(strncmp(command->body, "coucou les amis", 15) == 0);
delete command;
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment