Commit 99aba566 authored by louiz’'s avatar louiz’

Connection to servers does not block the process anymore

parent 61ca40fa
...@@ -52,7 +52,7 @@ void Bridge::clean() ...@@ -52,7 +52,7 @@ void Bridge::clean()
while (it != this->irc_clients.end()) while (it != this->irc_clients.end())
{ {
IrcClient* client = it->second.get(); IrcClient* client = it->second.get();
if (!client->is_connected()) if (!client->is_connected() && !client->is_connecting())
it = this->irc_clients.erase(it); it = this->irc_clients.erase(it);
else else
++it; ++it;
...@@ -249,7 +249,7 @@ std::string Bridge::get_own_nick(const Iid& iid) ...@@ -249,7 +249,7 @@ std::string Bridge::get_own_nick(const Iid& iid)
return ""; return "";
} }
size_t Bridge::connected_clients() const size_t Bridge::active_clients() const
{ {
return this->irc_clients.size(); return this->irc_clients.size();
} }
......
...@@ -105,9 +105,9 @@ public: ...@@ -105,9 +105,9 @@ public:
*/ */
std::string get_own_nick(const Iid& iid); std::string get_own_nick(const Iid& iid);
/** /**
* Get the number of server to which this bridge is connected. * Get the number of server to which this bridge is connected or connecting.
*/ */
size_t connected_clients() const; size_t active_clients() const;
private: private:
/** /**
...@@ -125,7 +125,7 @@ private: ...@@ -125,7 +125,7 @@ private:
* The JID of the user associated with this bridge. Messages from/to this * The JID of the user associated with this bridge. Messages from/to this
* JID are only managed by this bridge. * JID are only managed by this bridge.
*/ */
std::string user_jid; const std::string user_jid;
/** /**
* One IrcClient for each IRC server we need to be connected to. * One IrcClient for each IRC server we need to be connected to.
* The pointer is shared by the bridge and the poller. * The pointer is shared by the bridge and the poller.
......
...@@ -29,10 +29,13 @@ void IrcClient::start() ...@@ -29,10 +29,13 @@ void IrcClient::start()
{ {
this->bridge->send_xmpp_message(this->hostname, "", std::string("Connecting to ") + this->bridge->send_xmpp_message(this->hostname, "", std::string("Connecting to ") +
this->hostname + ":" + "6667"); this->hostname + ":" + "6667");
std::pair<bool, std::string> res = this->connect(this->hostname, "6667"); this->connect(this->hostname, "6667");
if (!res.first) }
this->bridge->send_xmpp_message(this->hostname, "",
std::string("Connection failed: ") + res.second); void IrcClient::on_connection_failed(const std::string& reason)
{
this->bridge->send_xmpp_message(this->hostname, "",
std::string("Connection failed: ") + reason);
} }
void IrcClient::on_connected() void IrcClient::on_connected()
......
...@@ -29,6 +29,10 @@ public: ...@@ -29,6 +29,10 @@ public:
* Connect to the IRC server * Connect to the IRC server
*/ */
void start(); void start();
/**
* Called when the connection to the server cannot be established
*/
void on_connection_failed(const std::string& reason) override final;
/** /**
* Called when successfully connected to the server * Called when successfully connected to the server
*/ */
......
...@@ -66,11 +66,6 @@ int main(int ac, char** av) ...@@ -66,11 +66,6 @@ int main(int ac, char** av)
Poller p; Poller p;
p.add_socket_handler(xmpp_component); p.add_socket_handler(xmpp_component);
if (!xmpp_component->start())
{
log_info("Exiting");
return -1;
}
// Install the signals used to exit the process cleanly, or reload the // Install the signals used to exit the process cleanly, or reload the
// config // config
...@@ -93,8 +88,10 @@ int main(int ac, char** av) ...@@ -93,8 +88,10 @@ int main(int ac, char** av)
sigaction(SIGUSR1, &on_sigusr, nullptr); sigaction(SIGUSR1, &on_sigusr, nullptr);
sigaction(SIGUSR2, &on_sigusr, nullptr); sigaction(SIGUSR2, &on_sigusr, nullptr);
xmpp_component->start();
const std::chrono::milliseconds timeout(-1); const std::chrono::milliseconds timeout(-1);
while (p.poll(timeout) != -1 || !exiting) while (p.poll(timeout) != -1)
{ {
// Check for empty irc_clients (not connected, or with no joined // Check for empty irc_clients (not connected, or with no joined
// channel) and remove them // channel) and remove them
...@@ -119,6 +116,8 @@ int main(int ac, char** av) ...@@ -119,6 +116,8 @@ int main(int ac, char** av)
} }
// If the only existing connection is the one to the XMPP component: // If the only existing connection is the one to the XMPP component:
// close the XMPP stream. // close the XMPP stream.
if (exiting && xmpp_component->is_connecting())
xmpp_component->close();
if (exiting && p.size() == 1 && xmpp_component->is_document_open()) if (exiting && p.size() == 1 && xmpp_component->is_document_open())
xmpp_component->close_document(); xmpp_component->close_document();
} }
......
...@@ -155,8 +155,7 @@ int Poller::poll(const std::chrono::milliseconds& timeout) ...@@ -155,8 +155,7 @@ int Poller::poll(const std::chrono::milliseconds& timeout)
else if (this->fds[i].revents & POLLIN) else if (this->fds[i].revents & POLLIN)
{ {
auto socket_handler = this->socket_handlers.at(this->fds[i].fd); auto socket_handler = this->socket_handlers.at(this->fds[i].fd);
if (socket_handler->is_connected()) socket_handler->on_recv();
socket_handler->on_recv();
nb_events--; nb_events--;
} }
else if (this->fds[i].revents & POLLOUT) else if (this->fds[i].revents & POLLOUT)
...@@ -164,6 +163,8 @@ int Poller::poll(const std::chrono::milliseconds& timeout) ...@@ -164,6 +163,8 @@ int Poller::poll(const std::chrono::milliseconds& timeout)
auto socket_handler = this->socket_handlers.at(this->fds[i].fd); auto socket_handler = this->socket_handlers.at(this->fds[i].fd);
if (socket_handler->is_connected()) if (socket_handler->is_connected())
socket_handler->on_send(); socket_handler->on_send();
else
socket_handler->connect();
nb_events--; nb_events--;
} }
} }
...@@ -185,7 +186,12 @@ int Poller::poll(const std::chrono::milliseconds& timeout) ...@@ -185,7 +186,12 @@ int Poller::poll(const std::chrono::milliseconds& timeout)
if (revents[i].events & EPOLLIN) if (revents[i].events & EPOLLIN)
socket_handler->on_recv(); socket_handler->on_recv();
if (revents[i].events & EPOLLOUT) if (revents[i].events & EPOLLOUT)
socket_handler->on_send(); {
if (socket_handler->is_connected())
socket_handler->on_send();
else
socket_handler->connect();
}
} }
return nb_events; return nb_events;
#endif #endif
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include <cstring> #include <cstring>
#include <fcntl.h>
#include <netdb.h> #include <netdb.h>
#include <stdio.h> #include <stdio.h>
...@@ -17,18 +18,38 @@ ...@@ -17,18 +18,38 @@
SocketHandler::SocketHandler(): SocketHandler::SocketHandler():
poller(nullptr), poller(nullptr),
connected(false) connected(false),
connecting(false)
{
this->init_socket();
}
void SocketHandler::init_socket()
{ {
if ((this->socket = ::socket(AF_INET, SOCK_STREAM, 0)) == -1) if ((this->socket = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
throw std::runtime_error("Could not create socket"); throw std::runtime_error("Could not create socket");
int optval = 1; int optval = 1;
if (::setsockopt(this->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) == -1) if (::setsockopt(this->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) == -1)
log_warning("Failed to enable TCP keepalive on socket: " << strerror(errno)); log_warning("Failed to enable TCP keepalive on socket: " << strerror(errno));
// Set the socket on non-blocking mode. This is useful to receive a EAGAIN
// error when connect() would block, to not block the whole process if a
// remote is not responsive.
const int existing_flags = ::fcntl(this->socket, F_GETFL, 0);
if ((existing_flags == -1) ||
(::fcntl(this->socket, F_SETFL, existing_flags | O_NONBLOCK) == -1))
throw std::runtime_error(std::string("Could not initialize socket: ") + strerror(errno));
} }
std::pair<bool, std::string> SocketHandler::connect(const std::string& address, const std::string& port) void SocketHandler::connect(const std::string& address, const std::string& port)
{ {
log_info("Trying to connect to " << address << ":" << port); if (!this->connecting)
{
log_info("Trying to connect to " << address << ":" << port);
}
this->connecting = true;
this->address = address;
this->port = port;
struct addrinfo hints; struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo)); memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = 0; hints.ai_flags = 0;
...@@ -43,7 +64,8 @@ std::pair<bool, std::string> SocketHandler::connect(const std::string& address, ...@@ -43,7 +64,8 @@ std::pair<bool, std::string> SocketHandler::connect(const std::string& address,
{ {
log_warning(std::string("getaddrinfo failed: ") + gai_strerror(res)); log_warning(std::string("getaddrinfo failed: ") + gai_strerror(res));
this->close(); this->close();
return std::make_pair(false, gai_strerror(res)); this->on_connection_failed(gai_strerror(res));
return ;
} }
// Make sure the alloced structure is always freed at the end of the // Make sure the alloced structure is always freed at the end of the
...@@ -56,14 +78,28 @@ std::pair<bool, std::string> SocketHandler::connect(const std::string& address, ...@@ -56,14 +78,28 @@ std::pair<bool, std::string> SocketHandler::connect(const std::string& address,
{ {
log_info("Connection success."); log_info("Connection success.");
this->connected = true; this->connected = true;
this->connecting = false;
this->on_connected(); this->on_connected();
return std::make_pair(true, ""); return ;
}
else if (errno == EINPROGRESS || errno == EALREADY)
{ // retry this process later, when the socket
// is ready to be written on.
log_debug("Need to retry connecting later..." << strerror(errno));
this->poller->watch_send_events(this);
return ;
} }
log_info("Connection failed:" << strerror(errno)); log_info("Connection failed:" << strerror(errno));
} }
log_error("All connection attempts failed."); log_error("All connection attempts failed.");
this->close(); this->close();
return std::make_pair(false, ""); this->on_connection_failed(strerror(errno));
return ;
}
void SocketHandler::connect()
{
this->connect(this->address, this->port);
} }
void SocketHandler::set_poller(Poller* poller) void SocketHandler::set_poller(Poller* poller)
...@@ -114,11 +150,11 @@ void SocketHandler::on_send() ...@@ -114,11 +150,11 @@ void SocketHandler::on_send()
void SocketHandler::close() void SocketHandler::close()
{ {
this->connected = false; this->connected = false;
this->connecting = false;
this->poller->remove_socket_handler(this->get_socket()); this->poller->remove_socket_handler(this->get_socket());
::close(this->socket); ::close(this->socket);
// recreate the socket for a potential future usage // recreate the socket for a potential future usage
if ((this->socket = ::socket(AF_INET, SOCK_STREAM, 0)) == -1) this->init_socket();
throw std::runtime_error("Could not create socket");
} }
socket_t SocketHandler::get_socket() const socket_t SocketHandler::get_socket() const
...@@ -139,3 +175,8 @@ bool SocketHandler::is_connected() const ...@@ -139,3 +175,8 @@ bool SocketHandler::is_connected() const
{ {
return this->connected; return this->connected;
} }
bool SocketHandler::is_connecting() const
{
return this->connecting;
}
...@@ -19,10 +19,15 @@ class SocketHandler ...@@ -19,10 +19,15 @@ class SocketHandler
public: public:
explicit SocketHandler(); explicit SocketHandler();
virtual ~SocketHandler() {} virtual ~SocketHandler() {}
/**
* (re-)Initialize the socket
*/
void init_socket();
/** /**
* Connect to the remote server, and call on_connected() if this succeeds * Connect to the remote server, and call on_connected() if this succeeds
*/ */
std::pair<bool, std::string> connect(const std::string& address, const std::string& port); void connect(const std::string& address, const std::string& port);
void connect();
/** /**
* Set the pointer to the given Poller, to communicate with it. * Set the pointer to the given Poller, to communicate with it.
*/ */
...@@ -53,6 +58,11 @@ public: ...@@ -53,6 +58,11 @@ public:
* Called when the connection is successful. * Called when the connection is successful.
*/ */
virtual void on_connected() = 0; virtual void on_connected() = 0;
/**
* Called when the connection fails. Not when it is closed later, just at
* the connect() call.
*/
virtual void on_connection_failed(const std::string& reason) = 0;
/** /**
* Called when we detect a disconnection from the remote host. * Called when we detect a disconnection from the remote host.
*/ */
...@@ -63,6 +73,7 @@ public: ...@@ -63,6 +73,7 @@ public:
*/ */
virtual void parse_in_buffer() = 0; virtual void parse_in_buffer() = 0;
bool is_connected() const; bool is_connected() const;
bool is_connecting() const;
protected: protected:
socket_t socket; socket_t socket;
...@@ -86,7 +97,17 @@ protected: ...@@ -86,7 +97,17 @@ protected:
* (actually it is sharing our ownership with a Bridge). * (actually it is sharing our ownership with a Bridge).
*/ */
Poller* poller; Poller* poller;
/**
* Hostname we are connected/connecting to
*/
std::string address;
/**
* Port we are connected/connecting to
*/
std::string port;
bool connected; bool connected;
bool connecting;
private: private:
SocketHandler(const SocketHandler&) = delete; SocketHandler(const SocketHandler&) = delete;
......
...@@ -50,9 +50,9 @@ XmppComponent::~XmppComponent() ...@@ -50,9 +50,9 @@ XmppComponent::~XmppComponent()
{ {
} }
bool XmppComponent::start() void XmppComponent::start()
{ {
return this->connect("127.0.0.1", "5347").first; this->connect("127.0.0.1", "5347");
} }
bool XmppComponent::is_document_open() const bool XmppComponent::is_document_open() const
...@@ -67,6 +67,11 @@ void XmppComponent::send_stanza(const Stanza& stanza) ...@@ -67,6 +67,11 @@ void XmppComponent::send_stanza(const Stanza& stanza)
this->send_data(std::move(str)); this->send_data(std::move(str));
} }
void XmppComponent::on_connection_failed(const std::string& reason)
{
log_error("Failed to connect to the XMPP server: " << reason);
}
void XmppComponent::on_connected() void XmppComponent::on_connected()
{ {
log_info("connected to XMPP server"); log_info("connected to XMPP server");
...@@ -103,7 +108,7 @@ void XmppComponent::clean() ...@@ -103,7 +108,7 @@ void XmppComponent::clean()
while (it != this->bridges.end()) while (it != this->bridges.end())
{ {
it->second->clean(); it->second->clean();
if (it->second->connected_clients() == 0) if (it->second->active_clients() == 0)
it = this->bridges.erase(it); it = this->bridges.erase(it);
else else
++it; ++it;
......
...@@ -20,6 +20,7 @@ class XmppComponent: public SocketHandler ...@@ -20,6 +20,7 @@ class XmppComponent: public SocketHandler
public: public:
explicit XmppComponent(const std::string& hostname, const std::string& secret); explicit XmppComponent(const std::string& hostname, const std::string& secret);
~XmppComponent(); ~XmppComponent();
void on_connection_failed(const std::string& reason) override final;
void on_connected() override final; void on_connected() override final;
void on_connection_close() override final; void on_connection_close() override final;
void parse_in_buffer() override final; void parse_in_buffer() override final;
...@@ -38,9 +39,8 @@ public: ...@@ -38,9 +39,8 @@ public:
void clean(); void clean();
/** /**
* Connect to the XMPP server. * Connect to the XMPP server.
* Returns false if we failed to connect
*/ */
bool start(); void start();
/** /**
* Serialize the stanza and add it to the out_buf to be sent to the * Serialize the stanza and add it to the out_buf to be sent to the
* server. * server.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment