Commit c2f6f077 authored by louiz’'s avatar louiz’

Make xmlstream use an asyncio loop

Scheduled events, connection, TLS handshake (with STARTTLS), read and write
on the socket are all done using only asyncio.

A lot of threads, and thread-related (and thus useless) things still remain.
This is only a first step.
parent 5ab77c74
......@@ -213,37 +213,12 @@ class BaseXMPP(XMLStream):
log.warning('Legacy XMPP 0.9 protocol detected.')
self.event('legacy_protocol')
def process(self, *args, **kwargs):
"""Initialize plugins and begin processing the XML stream.
The number of threads used for processing stream events is determined
by :data:`HANDLER_THREADS`.
:param bool block: If ``False``, then event dispatcher will run
in a separate thread, allowing for the stream to be
used in the background for another application.
Otherwise, ``process(block=True)`` blocks the current
thread. Defaults to ``False``.
:param bool threaded: **DEPRECATED**
If ``True``, then event dispatcher will run
in a separate thread, allowing for the stream to be
used in the background for another application.
Defaults to ``True``. This does **not** mean that no
threads are used at all if ``threaded=False``.
Regardless of these threading options, these threads will
always exist:
- The event queue processor
- The send queue processor
- The scheduler
"""
def init_plugins(self, *args, **kwargs):
for name in self.plugin:
if not hasattr(self.plugin[name], 'post_inited'):
if hasattr(self.plugin[name], 'post_init'):
self.plugin[name].post_init()
self.plugin[name].post_inited = True
return XMLStream.process(self, *args, **kwargs)
def register_plugin(self, plugin, pconfig={}, module=None):
"""Register and configure a plugin for use in this stream.
......
......@@ -128,8 +128,8 @@ class ClientXMPP(BaseXMPP):
def password(self, value):
self.credentials['password'] = value
def connect(self, address=tuple(), reattempt=True,
use_tls=True, use_ssl=False):
def connect(self, address=tuple(), use_ssl=False,
force_starttls=True, disable_starttls=False):
"""Connect to the XMPP server.
When no address is given, a SRV lookup for the server will
......@@ -155,9 +155,8 @@ class ClientXMPP(BaseXMPP):
address = (self.boundjid.host, 5222)
self.dns_service = 'xmpp-client'
return XMLStream.connect(self, address[0], address[1],
use_tls=use_tls, use_ssl=use_ssl,
reattempt=reattempt)
return XMLStream.connect(self, address[0], address[1], use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls)
def register_feature(self, name, handler, restart=False, order=5000):
"""Register a stream feature handler.
......
......@@ -42,13 +42,16 @@ class FeatureBind(BasePlugin):
features -- The stream features stanza.
"""
log.debug("Requesting resource: %s", self.xmpp.requested_jid.resource)
self.features = features
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq.enable('bind')
if self.xmpp.requested_jid.resource:
iq['bind']['resource'] = self.xmpp.requested_jid.resource
response = iq.send(now=True)
iq.send(block=False, callback=self._on_bind_response)
def _on_bind_response(self, response):
self.xmpp.boundjid = JID(response['bind']['jid'], cache_lock=True)
self.xmpp.bound = True
self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True)
......@@ -58,7 +61,7 @@ class FeatureBind(BasePlugin):
log.info("JID set to: %s", self.xmpp.boundjid.full)
if 'session' not in features['features']:
if 'session' not in self.features['features']:
log.debug("Established Session")
self.xmpp.sessionstarted = True
self.xmpp.session_started_event.set()
......
......@@ -233,7 +233,9 @@ class FeatureMechanisms(BasePlugin):
self.xmpp.authenticated = True
self.xmpp.features.add('mechanisms')
self.xmpp.event('auth_success', stanza, direct=True)
raise RestartStream()
# Restart the stream
self.xmpp.init_parser()
self.xmpp.send_raw(self.xmpp.stream_header)
def _handle_fail(self, stanza):
"""SASL authentication failed. Disconnect and shutdown."""
......
......@@ -44,8 +44,9 @@ class FeatureSession(BasePlugin):
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq.enable('session')
iq.send(now=True)
iq.send(block=False, callback=self._on_start_session_response)
def _on_start_session_response(self, response):
self.xmpp.features.add('session')
log.debug("Established Session")
......
......@@ -52,7 +52,7 @@ class FeatureSTARTTLS(BasePlugin):
# We have already negotiated TLS, but the server is
# offering it again, against spec.
return False
elif not self.xmpp.use_tls:
elif self.xmpp.disable_starttls:
return False
else:
self.xmpp.send(features['starttls'], now=True)
......@@ -63,4 +63,3 @@ class FeatureSTARTTLS(BasePlugin):
log.debug("Starting TLS")
if self.xmpp.start_tls():
self.xmpp.features.add('starttls')
raise RestartStream()
......@@ -7,13 +7,12 @@
"""
from slixmpp.jid import JID
from slixmpp.xmlstream.scheduler import Scheduler
from slixmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET
from slixmpp.xmlstream.stanzabase import register_stanza_plugin
from slixmpp.xmlstream.tostring import tostring
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
from slixmpp.xmlstream.xmlstream import RestartStream
__all__ = ['JID', 'Scheduler', 'StanzaBase', 'ElementBase',
__all__ = ['JID', 'StanzaBase', 'ElementBase',
'ET', 'StateMachine', 'tostring', 'XMLStream',
'RESPONSE_TIMEOUT', 'RestartStream']
# -*- coding: utf-8 -*-
"""
slixmpp.xmlstream.scheduler
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module provides a task scheduler that works better
with Slixmpp's threading usage than the stock version.
Part of Slixmpp: The Slick XMPP Library
:copyright: (c) 2011 Nathanael C. Fritz
:license: MIT, see LICENSE for more details
"""
import time
import threading
import logging
import itertools
from slixmpp.util import Queue, QueueEmpty
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
WAIT_TIMEOUT = 1.0
log = logging.getLogger(__name__)
class Task(object):
"""
A scheduled task that will be executed by the scheduler
after a given time interval has passed.
:param string name: The name of the task.
:param int seconds: The number of seconds to wait before executing.
:param callback: The function to execute.
:param tuple args: The arguments to pass to the callback.
:param dict kwargs: The keyword arguments to pass to the callback.
:param bool repeat: Indicates if the task should repeat.
Defaults to ``False``.
:param pointer: A pointer to an event queue for queuing callback
execution instead of executing immediately.
"""
def __init__(self, name, seconds, callback, args=None,
kwargs=None, repeat=False, qpointer=None):
#: The name of the task.
self.name = name
#: The number of seconds to wait before executing.
self.seconds = seconds
#: The function to execute once enough time has passed.
self.callback = callback
#: The arguments to pass to :attr:`callback`.
self.args = args or tuple()
#: The keyword arguments to pass to :attr:`callback`.
self.kwargs = kwargs or {}
#: Indicates if the task should repeat after executing,
#: using the same :attr:`seconds` delay.
self.repeat = repeat
#: The time when the task should execute next.
self.next = time.time() + self.seconds
#: The main event queue, which allows for callbacks to
#: be queued for execution instead of executing immediately.
self.qpointer = qpointer
def run(self):
"""Execute the task's callback.
If an event queue was supplied, place the callback in the queue;
otherwise, execute the callback immediately.
"""
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback,
self.args, self.kwargs, self.name))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
return self.repeat
def reset(self):
"""Reset the task's timer so that it will repeat."""
self.next = time.time() + self.seconds
class Scheduler(object):
"""
A threaded scheduler that allows for updates mid-execution unlike the
scheduler in the standard library.
Based on: http://docs.python.org/library/sched.html#module-sched
:param parentstop: An :class:`~threading.Event` to signal stopping
the scheduler.
"""
def __init__(self, parentstop=None):
#: A queue for storing tasks
self.addq = Queue()
#: A list of tasks in order of execution time.
self.schedule = []
#: If running in threaded mode, this will be the thread processing
#: the schedule.
self.thread = None
#: A flag indicating that the scheduler is running.
self.run = False
#: An :class:`~threading.Event` instance for signalling to stop
#: the scheduler.
self.stop = parentstop
#: Lock for accessing the task queue.
self.schedule_lock = threading.RLock()
#: The time in seconds to wait for events from the event queue,
#: and also the time between checks for the process stop signal.
self.wait_timeout = WAIT_TIMEOUT
def process(self, threaded=True, daemon=False):
"""Begin accepting and processing scheduled tasks.
:param bool threaded: Indicates if the scheduler should execute
in its own thread. Defaults to ``True``.
"""
if threaded:
self.thread = threading.Thread(name='scheduler_process',
target=self._process)
self.thread.daemon = daemon
self.thread.start()
else:
self._process()
def _process(self):
"""Process scheduled tasks."""
self.run = True
try:
while self.run and not self.stop.is_set():
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
else:
wait = self.wait_timeout
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = None
while self.run and \
not self.stop.is_set() and \
newtask is None and \
wait > 0:
try:
newtask = self.addq.get(True, min(wait, self.wait_timeout))
except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
wait -= self.wait_timeout
except QueueEmpty: # Time to run some tasks, and no new tasks to add.
self.schedule_lock.acquire()
# select only those tasks which are to be executed now
relevant = itertools.takewhile(
lambda task: time.time() >= task.next, self.schedule)
# run the tasks and keep the return value in a tuple
status = map(lambda task: (task, task.run()), relevant)
# remove non-repeating tasks
for task, doRepeat in status:
if not doRepeat:
try:
self.schedule.remove(task)
except ValueError:
pass
else:
# only need to resort tasks if a repeated task has
# been kept in the list.
updated = True
else: # Add new task
self.schedule_lock.acquire()
if newtask is not None:
self.schedule.append(newtask)
updated = True
finally:
if updated:
self.schedule.sort(key=lambda task: task.next)
self.schedule_lock.release()
except KeyboardInterrupt:
self.run = False
except SystemExit:
self.run = False
log.debug("Quitting Scheduler thread")
def add(self, name, seconds, callback, args=None,
kwargs=None, repeat=False, qpointer=None):
"""Schedule a new task.
:param string name: The name of the task.
:param int seconds: The number of seconds to wait before executing.
:param callback: The function to execute.
:param tuple args: The arguments to pass to the callback.
:param dict kwargs: The keyword arguments to pass to the callback.
:param bool repeat: Indicates if the task should repeat.
Defaults to ``False``.
:param pointer: A pointer to an event queue for queuing callback
execution instead of executing immediately.
"""
try:
self.schedule_lock.acquire()
for task in self.schedule:
if task.name == name:
raise ValueError("Key %s already exists" % name)
self.addq.put(Task(name, seconds, callback, args,
kwargs, repeat, qpointer))
except:
raise
finally:
self.schedule_lock.release()
def remove(self, name):
"""Remove a scheduled task ahead of schedule, and without
executing it.
:param string name: The name of the task to remove.
"""
try:
self.schedule_lock.acquire()
the_task = None
for task in self.schedule:
if task.name == name:
the_task = task
if the_task is not None:
self.schedule.remove(the_task)
except:
raise
finally:
self.schedule_lock.release()
def quit(self):
"""Shutdown the scheduler."""
self.run = False
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment