plugin_e2ee.py 9.52 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2019 Maxime “pep” Buquet <pep@bouah.net>
#
# Distributed under terms of the zlib license. See COPYING file.

"""
    Interface for E2EE (End-to-end Encryption) plugins.
"""

13
from typing import Callable, Dict, Optional, Union
14 15

from slixmpp import InvalidJID, JID, Message
16
from slixmpp.xmlstream import StanzaBase
17 18 19 20 21 22 23 24 25 26 27 28 29
from poezio.tabs import ConversationTab, DynamicConversationTab, PrivateTab, MucTab
from poezio.plugin import BasePlugin

import logging
log = logging.getLogger(__name__)


# Tab types handed to the `decrypt` / `encrypt` hooks of E2EE plugins.
ChatTabs = Union[
    MucTab,
    DynamicConversationTab,
    PrivateTab,
]

# Explicit Message Encryption (XEP-0380) namespace and element name.
EME_NS = 'urn:xmpp:eme:0'
EME_TAG = 'encryption'

# Namespace of regular client stanzas (e.g. <body/>), and namespace of the
# message processing hints (<store/>, <no-copy/>, ...).
JCLIENT_NS = 'jabber:client'
HINTS_NS = 'urn:xmpp:hints'

36 37

class E2EEPlugin(BasePlugin):
    """Interface for E2EE plugins.

        This is a wrapper built on top of BasePlugin. It provides a base for
        End-to-end Encryption mechanisms in poezio.

        Plugin developers are expected to implement the `decrypt` and
        `encrypt` function, provide an encryption name (and/or short name),
        and an eme namespace.

        Once loaded, the plugin will attempt to decrypt any message that
        contains an EME message that matches the one set.

        The plugin will also register a command (using the short name) to
        enable encryption per tab. It is only possible to have one encryption
        mechanism per tab, even if multiple e2ee plugins are loaded.

        The encryption status will be displayed in the status bar, using the
        plugin short name, alongside the JID, nickname etc.
    """

    #: Specifies that the encryption mechanism does more than encrypting
    #: <body/>.
    stanza_encryption = False

    #: Whitelist applied to outgoing messages once they have been encrypted.
    #: Every direct child of the message whose tag is not listed here is
    #: removed before the stanza is sent.
    tag_whitelist = [
        '{%s}%s' % (namespace, tag)
        for namespace, tag in [
            (JCLIENT_NS, 'body'),
            (EME_NS, EME_TAG),
            (HINTS_NS, 'store'),
            (HINTS_NS, 'no-copy'),
            (HINTS_NS, 'no-store'),
            (HINTS_NS, 'no-permanent-store'),
            # TODO: Add other encryption mechanisms tags here
        ]
    ]

    #: Replaces body with `eme <https://xmpp.org/extensions/xep-0380.html>`_
    #: if set. Should be suitable for most plugins except those using <body/>
    #: directly as their encryption container, like OTR, or the example base64
    #: plugin in poezio.
    replace_body_with_eme = True

    #: Encryption name, used in command descriptions, and logs. At least one
    #: of `encryption_name` and `encryption_short_name` must be set.
    encryption_name = None  # type: Optional[str]

    #: Encryption short name, used as command name, and also to display
    #: encryption status in a tab. At least one of `encryption_name` and
    #: `encryption_short_name` must be set.
    encryption_short_name = None  # type: Optional[str]

    #: Required.
    eme_ns = None  # type: Optional[str]

    # Static map, to be able to limit to one encryption mechanism per tab at a
    # time. Shared by every E2EE plugin: maps a tab JID to the `encrypt`
    # method of the plugin currently enabled for it.
    _enabled_tabs = {}  # type: Dict[JID, Callable]

    def init(self) -> None:
        """Validate the plugin attributes and hook the plugin into poezio.

        Registers the decryption handlers, the outgoing stream filter, the
        per-tab toggle command and the infobar status element.

        :raises NotImplementedError: if neither `encryption_name` nor
            `encryption_short_name` is set, or if `eme_ns` is not set.
        """
        if self.encryption_name is None and self.encryption_short_name is None:
            raise NotImplementedError

        if self.eme_ns is None:
            raise NotImplementedError

        # Only one of the two names is mandatory; the other defaults to it.
        if self.encryption_name is None:
            self.encryption_name = self.encryption_short_name
        if self.encryption_short_name is None:
            self.encryption_short_name = self.encryption_name

        # Ensure decryption is done before everything, so that other handlers
        # don't have to know about the encryption mechanism.
        self.api.add_event_handler('muc_msg', self._decrypt, priority=0)
        self.api.add_event_handler('conversation_msg', self._decrypt, priority=0)
        self.api.add_event_handler('private_msg', self._decrypt, priority=0)

        # Ensure encryption is done after everything, so that whatever can be
        # encrypted is encrypted, and no plain element slips in.
        # Using a stream filter might be a bit too much, but at least we're
        # sure poezio is not sneaking anything past us.
        self.core.xmpp.add_filter('out', self._encrypt)

        for tab_t in (DynamicConversationTab, PrivateTab, MucTab):
            self.api.add_tab_command(
                tab_t,
                self.encryption_short_name,
                self._toggle_tab,
                usage='',
                short='Toggle {} encryption for tab.'.format(self.encryption_name),
                help='Toggle automatic {} encryption for tab.'.format(self.encryption_name),
            )

        for tab_t in (ConversationTab, MucTab, PrivateTab):
            tab_t.add_information_element(
                self.encryption_short_name,
                self._display_encryption_status,
            )

    def cleanup(self) -> None:
        """Undo what `init` registered outside of the plugin API."""
        for tab_t in (ConversationTab, MucTab, PrivateTab):
            tab_t.remove_information_element(self.encryption_short_name)

        # init() installs this filter directly on the XML stream; remove it so
        # that an unloaded plugin stops intercepting outgoing stanzas.
        try:
            self.core.xmpp.del_filter('out', self._encrypt)
        except ValueError:
            # The filter was never registered (e.g. init() raised early).
            pass

        # Release the tabs this plugin was enabled for, so the shared map
        # doesn't keep a reference to an unloaded plugin.
        stale = [
            jid for jid, encrypt in self._enabled_tabs.items()
            if encrypt == self.encrypt
        ]
        for jid in stale:
            del self._enabled_tabs[jid]

    def _display_encryption_status(self, jid_s: str) -> str:
        """
            Return information to display in the infobar if encryption is
            enabled for the JID.

            :param jid_s: String form of the JID to look up.
            :returns: The short name prefixed with a space when this plugin
                is enabled for the JID, an empty string otherwise (including
                when `jid_s` is not a valid JID).
        """

        try:
            jid = JID(jid_s)
        except InvalidJID:
            return ""

        if self._encryption_enabled(jid):
            return " " + self.encryption_short_name
        return ""

    def _toggle_tab(self, _input: str) -> None:
        """Command handler: toggle this plugin's encryption for the current tab.

        :param _input: Command arguments, unused.
        """
        jid = self.api.current_tab().jid  # type: JID

        if self._encryption_enabled(jid):
            del self._enabled_tabs[jid]
            self.api.information(
                '{} encryption disabled for {}'.format(self.encryption_name, jid),
                'Info',
            )
        else:
            # Overwrites whatever mechanism was previously enabled for this
            # tab, keeping a single mechanism per tab.
            self._enabled_tabs[jid] = self.encrypt
            self.api.information(
                '{} encryption enabled for {}'.format(self.encryption_name, jid),
                'Info',
            )

    def _encryption_enabled(self, jid: JID) -> bool:
        """Tell whether *this* plugin is the one enabled for `jid`."""
        return jid in self._enabled_tabs and self._enabled_tabs[jid] == self.encrypt

    def _decrypt(self, message: Message, tab: ChatTabs) -> None:
        """Event handler: decrypt incoming messages tagged with our EME namespace.

        Messages without an EME element, or whose EME namespace is not
        `eme_ns`, are left untouched.

        :param message: Incoming message, edited in place by `decrypt`.
        :param tab: Tab the message is coming from.
        """
        if message.xml.find('{%s}%s' % (EME_NS, EME_TAG)) is None:
            return None

        if message['eme']['namespace'] != self.eme_ns:
            return None

        log.debug('Received %s message: %r', self.encryption_name, message['body'])

        self.decrypt(message, tab)

        log.debug('Decrypted %s message: %r', self.encryption_name, message['body'])
        return None

    def _encrypt(self, stanza: StanzaBase) -> Optional[StanzaBase]:
        """Outgoing stream filter: encrypt messages sent from an enabled tab.

        :param stanza: Any outgoing stanza.
        :returns: The stanza, untouched when it is not a chat/groupchat
            message or when encryption is not enabled; the encrypted and
            whitelist-filtered message otherwise; None when the message is
            dropped.
        """
        if not isinstance(stanza, Message) or stanza['type'] not in ('chat', 'groupchat'):
            return stanza
        message = stanza

        # NOTE(review): the JID is taken from the currently focused tab, not
        # from the stanza recipient. A message sent while another tab is
        # focused would be matched against the wrong tab -- confirm whether
        # this should be derived from message['to'] instead.
        tab = self.api.current_tab()
        jid = tab.jid
        if not self._encryption_enabled(jid):
            return message

        log.debug('Sending %s message: %r', self.encryption_name, message)

        has_body = message.xml.find('{%s}%s' % (JCLIENT_NS, 'body')) is not None

        # Drop all messages that don't contain a body if the plugin doesn't do
        # Stanza Encryption
        if not self.stanza_encryption and not has_body:
            log.debug(
                '%s plugin: Dropping message as it contains no body, and the '
                'plugin doesn\'t do stanza encryption: %r',
                self.encryption_name,
                message,
            )
            return None

        # Call the enabled encrypt method
        self._enabled_tabs[jid](message, tab)

        if has_body:
            # Only add EME tag if the message has a body.
            # Per discussion in jdev@:
            # The receiving client needs to know the message contains
            # meaningful information or not to display notifications to the
            # user, and not display anything when it's e.g., a chatstate.
            # This does leak the fact that the encrypted payload contains a
            # message.
            message['eme']['namespace'] = self.eme_ns
            message['eme']['name'] = self.encryption_name

            if self.replace_body_with_eme:
                self.core.xmpp['xep_0380'].replace_body_with_eme(message)

        # Filter stanza with the whitelist. Plugins doing stanza encryption
        # will have to include these in their encrypted container beforehand.
        # Iterate over a copy, as children are removed while looping.
        for elem in message.xml[:]:
            if elem.tag not in self.tag_whitelist:
                message.xml.remove(elem)

        log.debug('Encrypted %s message: %r', self.encryption_name, message)
        return message

    def decrypt(self, _message: Message, tab: ChatTabs) -> None:
        """Decryption method

        This is a method the plugin must implement.  It is expected that this
        method will edit the received message and return nothing.

        :param _message: Message to be decrypted.
        :param tab: Tab the message is coming from.

        :returns: None
        :raises NotImplementedError: when not overridden by the plugin.
        """

        raise NotImplementedError

    def encrypt(self, _message: Message, tab: ChatTabs) -> None:
        """Encryption method

        This is a method the plugin must implement.  It is expected that this
        method will edit the received message and return nothing.

        :param _message: Message to be encrypted.
        :param tab: Tab the message is going to.

        :returns: None
        :raises NotImplementedError: when not overridden by the plugin.
        """

        raise NotImplementedError