Commit a4ca4f1d authored by louiz’'s avatar louiz’

implement file transfer reception.

parent 750824da
......@@ -56,7 +56,7 @@ void Game::on_authenticate(const std::string& result)
int res = atoi(result.data());
log_debug("on_authenticate :" << res << "." << ((res > 4) ? "Unknown error" : auth_messages[res]));
if (res == 0)
this->request_file("file.txt");
this->request_file("file.bin");
// TODO some UI stuf, etc.
}
......
#include <network/client.hpp>
#include <boost/algorithm/string.hpp>
Client::Client()
{
......@@ -44,12 +45,44 @@ void Client::connect_handler(boost::function< void(void) > on_success,
{
log_info("Connected.");
this->install_read_handler();
this->install_callbacks();
if (on_success)
on_success();
}
}
void Client::poll(void)
void Client::install_callbacks()
{
this->install_callback("TRANSFER", boost::bind(&Client::transfer_init_callback, this, _1, _2));
}
void Client::transfer_init_callback(const char* carg, int length)
{
std::string arg(carg, length);
std::vector<std::string> args;
boost::split(args, arg, boost::is_any_of("|"));
if (args.size() != 3)
{
log_warning("File transfer information are wrong: " << arg);
return ;
}
log_debug("Starting file transfer reception: " << arg);
TransferReceiver* receiver = new TransferReceiver(this, args[0], args[1], atoi(args[2].data()));
this->receivers.push_back(receiver);
}
void Client::on_transfer_ended(const TransferReceiver* receiver)
{
log_debug("on_transfer_ended");
std::vector<TransferReceiver*>::iterator it;
for (it = this->receivers.begin(); it < this->receivers.end(); ++it)
if (*it == receiver)
this->receivers.erase(it);
delete receiver;
}
void Client::poll(void)
{
this->io_service.poll();
}
......
......@@ -10,8 +10,6 @@
* @class Client
*/
#include <network/command_handler.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
......@@ -21,6 +19,9 @@
#ifndef __CLIENT_HPP__
# define __CLIENT_HPP__
#include <network/command_handler.hpp>
#include <network/transfer_receiver.hpp>
using boost::asio::ip::tcp;
class Client: public CommandHandler
......@@ -37,6 +38,10 @@ public:
const short&,
boost::function< void(void) > on_success = 0,
boost::function< void(void) > on_failure = 0);
/**
* Install all the default callbacks.
*/
void install_callbacks();
/**
* Check for data on the sockets (to send or write), and execute the
* appropriate handlers. Does nothing if there’s nothing to read or write.
......@@ -45,12 +50,22 @@ public:
void poll(void);
virtual void on_connection_closed() {}
/**
* Called when a file transfer is finished.
*/
void on_transfer_ended(const TransferReceiver*);
private:
void connect_handler(boost::function< void(void) >,
boost::function< void(void) >,
const boost::system::error_code&);
/**
* Called when the server initiate a file transfer with us.
* Install the temporary callback to receive each file chunk.
*/
void transfer_init_callback(const char*, int);
std::vector<TransferReceiver*> receivers;
boost::asio::io_service io_service;
};
......
......@@ -5,23 +5,23 @@ CommandHandler::CommandHandler()
}
void CommandHandler::install_callback(const std::string& command,
boost::function< void(std::string) > callback)
t_read_callback callback)
{
log_debug("installing callback for command " << command);
this->callbacks[command] = callback;
}
void CommandHandler::install_callback_once(const std::string& command,
boost::function< void(std::string) > callback)
t_read_callback callback)
{
log_debug("installing ONCE callback for command " << command);
this->callbacks_once[command] = callback;
}
boost::function< void(std::string) > CommandHandler::get_callback(const std::string& command)
t_read_callback CommandHandler::get_callback(const std::string& command)
{
log_debug("get_callback");
std::map<const std::string, boost::function< void(std::string) > >::iterator it;
std::map<const std::string, t_read_callback >::iterator it;
it = this->callbacks.find(command);
if (it != this->callbacks.end())
......@@ -31,7 +31,7 @@ boost::function< void(std::string) > CommandHandler::get_callback(const std::str
if (it != this->callbacks_once.end())
{
log_debug("Removing callback for command " << command);
boost::function< void(std::string) > callback = it->second;
t_read_callback callback = it->second;
this->callbacks_once.erase(it);
return callback;
}
......@@ -72,13 +72,14 @@ void CommandHandler::read_handler(const boost::system::error_code& error, const
size = atoi(std::string(c+pos+1, bytes_transferred-pos-2).data()); // remove the ending :
}
log_debug(command << " . " << size);
delete c;
delete[] c;
// Find out if a callback was registered for that command.
boost::function< void(std::string) > callback = this->get_callback(command);
t_read_callback callback = this->get_callback(command);
// We check what we need to read on the socket to have the rest of the binary datas
const std::size_t length_to_read = this->data.size() >= size ? 0 : size - this->data.size();
log_debug("length to read on the socket: " << length_to_read);
boost::asio::async_read(*this->socket,
this->data,
boost::asio::transfer_at_least(length_to_read),
......@@ -90,18 +91,18 @@ void CommandHandler::read_handler(const boost::system::error_code& error, const
void CommandHandler::binary_read_handler(const boost::system::error_code& error,
std::size_t bytes_transferred,
// std::size_t size,
boost::function<void(std::string)> callback)
t_read_callback callback)
{
log_debug("binary_read_handler" << bytes_transferred);
log_debug("binary_read_handler " << bytes_transferred);
char *c = new char[bytes_transferred+1];
this->data.sgetn(c, bytes_transferred);
c[bytes_transferred] = 0;
log_debug("[" << c << "]");
if (callback)
callback(c);
callback(c, bytes_transferred);
else
log_debug("no callback");
delete c;
delete[] c;
this->install_read_handler();
}
......@@ -119,7 +120,7 @@ void CommandHandler::install_read_handler(void)
// Send a command and add a callback to be called when the answer to
// this command will be received
void CommandHandler::request_answer(const char* command, const char* data,
boost::function< void(const std::string&) > on_answer)
t_read_callback on_answer)
{
std::string msg(command);
......@@ -133,15 +134,17 @@ void CommandHandler::request_answer(const char* command, const char* data,
this->send(msg.data());
}
void CommandHandler::send(const char* msg, boost::function< void(void) > on_sent)
void CommandHandler::send(const char* msg, boost::function< void(void) > on_sent, int length)
{
if (length == 0)
length = strlen(msg);
async_write(*this->socket,
boost::asio::buffer(msg, strlen(msg)),
boost::asio::buffer(msg, length),
boost::bind(&CommandHandler::send_handler, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
on_sent));
log_debug("Sending [" << strlen(msg) << "] bytes");
log_debug("Sending [" << length << "] bytes");
}
void CommandHandler::send_handler(const boost::system::error_code& error,
......@@ -149,6 +152,8 @@ void CommandHandler::send_handler(const boost::system::error_code& error,
boost::function< void(void) > on_sent)
{
// TODO check for error
if (error)
exit(1);
log_debug(bytes_transferred << " bytes sent");
if (on_sent)
on_sent();
......
......@@ -20,8 +20,11 @@
# define __COMMAND_HANDLER_HPP__
#include <network/transfer_sender.hpp>
using boost::asio::ip::tcp;
typedef boost::function<void(const char*, int)> t_read_callback;
class CommandHandler
{
friend void TransferSender::send_next_chunk();
......@@ -40,36 +43,36 @@ public:
* Read the arguments after a command (can read 0 bytes too) and pass that
* to the callback that was associated with this command.
*/
void binary_read_handler(const boost::system::error_code&, std::size_t, boost::function<void(std::string)>);
void binary_read_handler(const boost::system::error_code&, std::size_t, t_read_callback);
/**
* Sends a command, and use install_callback_once to wait for the answer
* and call that callback to handle it.
*/
void request_answer(const char*, const char*, boost::function< void(const std::string&) > on_answer = 0);
protected:
void request_answer(const char*, const char*, t_read_callback on_answer = 0);
/**
* Install a new callback associated with a command. This callback will
* be called upon receiving that command.
*/
void install_callback(const std::string&, boost::function< void(std::string) >);
void install_callback(const std::string&, t_read_callback);
/**
* Install a new callback associated with a command. This callback will
* be called upon receiving that command, but only once. This is used
* for example if you send a command waiting for and answer, you install
* a callback that will handle that answer, and only this one.
*/
void install_callback_once(const std::string&, boost::function< void(std::string) >);
void install_callback_once(const std::string&, t_read_callback);
protected:
/**
* Returns the callback associated with the passed command name.
* Returns 0 if nothing was found, in that case the execution of the
* return value cause a failure.
*/
boost::function< void(std::string) > get_callback(const std::string&);
t_read_callback get_callback(const std::string&);
/**
* Send the given data on the socket.
*/
void send(const char*, boost::function< void(void) > on_send = 0);
void send(const char*, boost::function< void(void) > on_send = 0, int length = 0);
/**
* @todo Check if the data was correctly sent on the socket
*/
......@@ -89,8 +92,8 @@ private:
CommandHandler(const CommandHandler&);
CommandHandler& operator=(const CommandHandler&);
std::map<const std::string, boost::function< void(std::string) > > callbacks;
std::map<const std::string, boost::function< void(std::string) > > callbacks_once;
std::map<const std::string, t_read_callback > callbacks;
std::map<const std::string, t_read_callback > callbacks_once;
};
#endif // __COMMAND_HANDLER_HPP__
......
......@@ -34,12 +34,13 @@ User* RemoteClient::get_user()
void RemoteClient::install_callbacks()
{
this->install_callback("AUTH", boost::bind(&RemoteClient::auth_callback, this, _1));
this->install_callback("TRANSFER", boost::bind(&RemoteClient::transfer_callback, this, _1));
this->install_callback("AUTH", boost::bind(&RemoteClient::auth_callback, this, _1, _2));
this->install_callback("TRANSFER", boost::bind(&RemoteClient::transfer_callback, this, _1, _2));
}
void RemoteClient::auth_callback(const std::string& arg)
void RemoteClient::auth_callback(const char* carg, int)
{
std::string arg(carg);
std::string res("AUTH.");
bool success = false;
log_debug("auth_callback: " << arg);
......@@ -83,9 +84,9 @@ void RemoteClient::auth_callback(const std::string& arg)
this->on_auth_success();
}
void RemoteClient::transfer_callback(const std::string& filename)
void RemoteClient::transfer_callback(const char* filename, int length)
{
this->send_file(filename);
this->send_file(std::string(filename, length));
}
void RemoteClient::on_auth_success()
......@@ -117,7 +118,7 @@ void RemoteClient::on_connection_closed()
this->server->remove_client(this);
}
void RemoteClient::on_transfer_ended(TransferSender* transfer)
void RemoteClient::on_transfer_ended(const TransferSender* transfer)
{
log_debug("on_transfer_ended");
std::vector<TransferSender*>::iterator it;
......
......@@ -56,7 +56,7 @@ public:
* Removes the TransferSender from the list.
* @param transfer The TransferSender to remove from the list.
*/
void on_transfer_ended(TransferSender*);
void on_transfer_ended(const TransferSender*);
/**
* Called when the client is successfully authenticated.
* For example, checks if there are news to send, or offline messages, etc
......@@ -73,8 +73,8 @@ private:
*/
void install_callbacks();
void install_read_handler(void);
void auth_callback(const std::string&);
void transfer_callback(const std::string&);
void auth_callback(const char*, int);
void transfer_callback(const char*, int);
const unsigned long int number;
/**
* A pointer to the server, to call its method when the RemoteClient
......
#include <network/transfer_receiver.hpp>
#include <network/client.hpp>
TransferReceiver::TransferReceiver(Client* client,
const std::string& sid,
const std::string& filename,
int length):
client(client),
id(sid),
filename(filename),
length(length),
received_length(0)
{
std::string command_name("TRANSFER_");
command_name += sid;
this->client->install_callback_once(command_name, boost::bind(&TransferReceiver::get_next_chunk, this, _1, _2));
this->file.open(this->filename.data(), std::ofstream::binary);
}
TransferReceiver::~TransferReceiver()
{
log_debug("Deleting transfer of " << this->filename << ": " << this->id);
this->file.close();
}
void TransferReceiver::get_next_chunk(const char* data, int length)
{
log_debug("coucou");
if (!this->file.good())
{
log_warning("Could not write data to file " << this->filename << ". Stopping transfer");
return ;
}
if (length == 0)
{
// file transfer is over.
this->client->on_transfer_ended(this);
}
else
{
log_debug("Writing " << length << " bytes to file");
this->file.write(data, length);
std::ostringstream sid;
sid << this->id;
std::string command_name("TRANSFER_");
command_name += sid.str();
// transfer is not over yet, add the callback again to get the next chunk.
this->client->install_callback_once(command_name, boost::bind(&TransferReceiver::get_next_chunk, this, _1, _2));
}
}
/** @addtogroup Network
* @{
*/
/**
* Represent an outgoing file transfer.
* @class TransferReceiver
*/
#include <iostream>
#include <fstream>
#include <string>
#ifndef __TRANSFER_RECEIVER_HPP__
# define __TRANSFER_RECEIVER_HPP__
#include <logging/logging.hpp>
class Client;
class TransferReceiver
{
public:
TransferReceiver(Client*, const std::string&, const std::string&, int);
~TransferReceiver();
private:
TransferReceiver(const TransferReceiver&);
TransferReceiver& operator=(const TransferReceiver&);
void get_next_chunk(const char*, int);
Client* client;
std::string id;
const std::string filename;
int length;
int received_length;
std::ofstream file;
};
#endif // __TRANSFER_RECEIVER_HPP__
/**@}*/
#include <network/transfer_sender.hpp>
#include <network/remote_client.hpp>
#define CHUNK_SIZE 65536
unsigned long int TransferSender::current_id = 0;
TransferSender::TransferSender(RemoteClient* client, const std::string& filename):
client(client),
filename(filename),
file(filename.data()),
id(TransferSender::current_id++)
file(filename.data(), std::ofstream::binary)
{
std::ostringstream sid;
sid << TransferSender::current_id++;
this->id = std::string(sid.str());
}
TransferSender::~TransferSender()
......@@ -41,25 +46,25 @@ bool TransferSender::start()
void TransferSender::send_next_chunk()
{
char data[4096+1];
char data[CHUNK_SIZE];
this->file.read(data, 4096);
std::streamsize read_size = this->file.gcount();
std::streamsize read_size = this->file.readsome(data, CHUNK_SIZE);
log_debug("Read from file: " << read_size);
data[read_size] = 0;
std::ostringstream sid;
sid << this->id;
std::ostringstream ssize;
ssize << read_size;
std::string msg = "TRANSFER_" + std::string(sid.str());
msg += std::string(".") + std::string(ssize.str()) + std::string(":") + std::string(data);
std::string msg = "TRANSFER_" + this->id;
msg += std::string(".") + std::string(ssize.str()) + std::string(":");
this->client->send(msg.data());
if (read_size)
// this was not the last chunk, so we'll send an other one when this one is
// successfully sent.
this->client->send(msg.data(), boost::bind(&TransferSender::send_next_chunk, this));
{
// this was not the last chunk, so we'll send an other one when this one is
// successfully sent.
this->client->send(data, boost::bind(&TransferSender::send_next_chunk, this), read_size);
}
else
this->client->send(msg.data(), boost::bind(&RemoteClient::on_transfer_ended, this->client, this));
{
this->client->send(data, boost::bind(&RemoteClient::on_transfer_ended, this->client, this), read_size);
}
}
......@@ -3,7 +3,7 @@
*/
/**
* Represent an outgoing file transfert. The file is sent by chunks, each in a
* Represent an outgoing file transfer. The file is sent by chunks, each in a
* different message. The transfer has an ID that is used in the message's
* command name, that way the receiver just needs to add a callback on that
* ID to receive all the chunks for that specific file. Every chunk is sent as
......@@ -49,7 +49,7 @@ private:
std::ifstream file;
int length;
unsigned long int id;
std::string id;
};
#endif // __TRANSFER_SENDER_HPP__
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment