Commit 62e66e7d authored by mathieui's avatar mathieui

stanzabase: types

parent 79f71ec0
# slixmpp.xmlstream.stanzabase
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# module implements a wrapper layer for XML objects
......@@ -11,13 +10,34 @@ from __future__ import annotations
import copy
import logging
import weakref
from typing import Optional
from typing import (
cast,
Any,
Callable,
ClassVar,
Coroutine,
Dict,
List,
Iterable,
Optional,
Set,
Tuple,
Type,
TYPE_CHECKING,
Union,
)
from weakref import ReferenceType
from xml.etree import ElementTree as ET
from slixmpp.types import JidStr
from slixmpp.xmlstream import JID
from slixmpp.xmlstream.tostring import tostring
if TYPE_CHECKING:
from slixmpp.xmlstream import XMLStream
log = logging.getLogger(__name__)
......@@ -28,7 +48,8 @@ XML_TYPE = type(ET.Element('xml'))
XML_NS = 'http://www.w3.org/XML/1998/namespace'
def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False):
def register_stanza_plugin(stanza: Type[ElementBase], plugin: Type[ElementBase],
iterable: bool = False, overrides: bool = False) -> None:
"""
Associate a stanza object as a plugin for another stanza.
......@@ -85,15 +106,15 @@ def register_stanza_plugin(stanza, plugin, iterable=False, overrides=False):
stanza.plugin_overrides[interface] = plugin.plugin_attrib
def multifactory(stanza, plugin_attrib):
def multifactory(stanza: Type[ElementBase], plugin_attrib: str) -> Type[ElementBase]:
"""
Returns a ElementBase class for handling reoccuring child stanzas
"""
def plugin_filter(self):
def plugin_filter(self: Multi) -> Callable[..., bool]:
return lambda x: isinstance(x, self._multistanza)
def plugin_lang_filter(self, lang):
def plugin_lang_filter(self: Multi, lang: Optional[str]) -> Callable[..., bool]:
return lambda x: isinstance(x, self._multistanza) and \
x['lang'] == lang
......@@ -101,31 +122,41 @@ def multifactory(stanza, plugin_attrib):
"""
Template class for multifactory
"""
def setup(self, xml=None):
_multistanza: Type[ElementBase]
def setup(self, xml: Optional[ET.Element] = None) -> bool:
self.xml = ET.Element('')
return False
def get_multi(self, lang=None):
parent = self.parent()
def get_multi(self: Multi, lang: Optional[str] = None) -> List[ElementBase]:
parent = fail_without_parent(self)
if not lang or lang == '*':
res = filter(plugin_filter(self), parent)
else:
res = filter(plugin_filter(self, lang), parent)
res = filter(plugin_lang_filter(self, lang), parent)
return list(res)
def set_multi(self, val, lang=None):
parent = self.parent()
def set_multi(self: Multi, val: Iterable[ElementBase], lang: Optional[str] = None) -> None:
parent = fail_without_parent(self)
del_multi = getattr(self, 'del_%s' % plugin_attrib)
del_multi(lang)
for sub in val:
parent.append(sub)
def del_multi(self, lang=None):
parent = self.parent()
def fail_without_parent(self: Multi) -> ElementBase:
parent = None
if self.parent:
parent = self.parent()
if not parent:
raise ValueError('No stanza parent for multifactory')
return parent
def del_multi(self: Multi, lang: Optional[str] = None) -> None:
parent = fail_without_parent(self)
if not lang or lang == '*':
res = filter(plugin_filter(self), parent)
res = list(filter(plugin_filter(self), parent))
else:
res = filter(plugin_filter(self, lang), parent)
res = list(res)
res = list(filter(plugin_lang_filter(self, lang), parent))
if not res:
del parent.plugins[(plugin_attrib, None)]
parent.loaded_plugins.remove(plugin_attrib)
......@@ -149,7 +180,8 @@ def multifactory(stanza, plugin_attrib):
return Multi
def fix_ns(xpath, split=False, propagate_ns=True, default_ns=''):
def fix_ns(xpath: str, split: bool = False, propagate_ns: bool = True,
default_ns: str = '') -> Union[str, List[str]]:
"""Apply the stanza's namespace to elements in an XPath expression.
:param string xpath: The XPath expression to fix with namespaces.
......@@ -275,12 +307,12 @@ class ElementBase(object):
#: The XML tag name of the element, not including any namespace
#: prefixes. For example, an :class:`ElementBase` object for
#: ``<message />`` would use ``name = 'message'``.
name = 'stanza'
name: ClassVar[str] = 'stanza'
#: The XML namespace for the element. Given ``<foo xmlns="bar" />``,
#: then ``namespace = "bar"`` should be used. The default namespace
#: is ``jabber:client`` since this is being used in an XMPP library.
namespace = 'jabber:client'
namespace: str = 'jabber:client'
#: For :class:`ElementBase` subclasses which are intended to be used
#: as plugins, the ``plugin_attrib`` value defines the plugin name.
......@@ -290,7 +322,7 @@ class ElementBase(object):
#: register_stanza_plugin(Message, FooPlugin)
#: msg = Message()
#: msg['foo']['an_interface_from_the_foo_plugin']
plugin_attrib = 'plugin'
plugin_attrib: ClassVar[str] = 'plugin'
#: For :class:`ElementBase` subclasses that are intended to be an
#: iterable group of items, the ``plugin_multi_attrib`` value defines
......@@ -300,29 +332,29 @@ class ElementBase(object):
#: # Given stanza class Foo, with plugin_multi_attrib = 'foos'
#: parent['foos']
#: filter(isinstance(item, Foo), parent['substanzas'])
plugin_multi_attrib = ''
plugin_multi_attrib: ClassVar[str] = ''
#: The set of keys that the stanza provides for accessing and
#: manipulating the underlying XML object. This set may be augmented
#: with the :attr:`plugin_attrib` value of any registered
#: stanza plugins.
interfaces = {'type', 'to', 'from', 'id', 'payload'}
interfaces: ClassVar[Set[str]] = {'type', 'to', 'from', 'id', 'payload'}
#: A subset of :attr:`interfaces` which maps interfaces to direct
#: subelements of the underlying XML object. Using this set, the text
#: of these subelements may be set, retrieved, or removed without
#: needing to define custom methods.
sub_interfaces = set()
sub_interfaces: ClassVar[Set[str]] = set()
#: A subset of :attr:`interfaces` which maps the presence of
#: subelements to boolean values. Using this set allows for quickly
#: checking for the existence of empty subelements like ``<required />``.
#:
#: .. versionadded:: 1.1
bool_interfaces = set()
bool_interfaces: ClassVar[Set[str]] = set()
#: .. versionadded:: 1.1.2
lang_interfaces = set()
lang_interfaces: ClassVar[Set[str]] = set()
#: In some cases you may wish to override the behaviour of one of the
#: parent stanza's interfaces. The ``overrides`` list specifies the
......@@ -336,7 +368,7 @@ class ElementBase(object):
#: be affected.
#:
#: .. versionadded:: 1.0-Beta5
overrides = []
overrides: ClassVar[List[str]] = []
#: If you need to add a new interface to an existing stanza, you
#: can create a plugin and set ``is_extension = True``. Be sure
......@@ -346,7 +378,7 @@ class ElementBase(object):
#: parent stanza will be passed to the plugin directly.
#:
#: .. versionadded:: 1.0-Beta5
is_extension = False
is_extension: ClassVar[bool] = False
#: A map of interface operations to the overriding functions.
#: For example, after overriding the ``set`` operation for
......@@ -355,15 +387,15 @@ class ElementBase(object):
#: {'set_body': <some function>}
#:
#: .. versionadded: 1.0-Beta5
plugin_overrides = {}
plugin_overrides: ClassVar[Dict[str, str]] = {}
#: A mapping of the :attr:`plugin_attrib` values of registered
#: plugins to their respective classes.
plugin_attrib_map = {}
plugin_attrib_map: ClassVar[Dict[str, Type[ElementBase]]] = {}
#: A mapping of root element tag names (in ``'{namespace}elementname'``
#: format) to the plugin classes responsible for them.
plugin_tag_map = {}
plugin_tag_map: ClassVar[Dict[str, Type[ElementBase]]] = {}
#: The set of stanza classes that can be iterated over using
#: the 'substanzas' interface. Classes are added to this set
......@@ -372,17 +404,26 @@ class ElementBase(object):
#: register_stanza_plugin(DiscoInfo, DiscoItem, iterable=True)
#:
#: .. versionadded:: 1.0-Beta5
plugin_iterables = set()
plugin_iterables: ClassVar[Set[Type[ElementBase]]] = set()
#: The default XML namespace: ``http://www.w3.org/XML/1998/namespace``.
xml_ns = XML_NS
def __init__(self, xml=None, parent=None):
xml_ns: ClassVar[str] = XML_NS
plugins: Dict[Tuple[str, Optional[str]], ElementBase]
#: The underlying XML object for the stanza. It is a standard
#: :class:`xml.etree.ElementTree` object.
xml: ET.Element
_index: int
loaded_plugins: Set[str]
iterables: List[ElementBase]
tag: str
parent: Optional[ReferenceType[ElementBase]]
def __init__(self, xml: Optional[ET.Element] = None, parent: Union[Optional[ElementBase], ReferenceType[ElementBase]] = None):
self._index = 0
#: The underlying XML object for the stanza. It is a standard
#: :class:`xml.etree.ElementTree` object.
self.xml = xml
if xml is not None:
self.xml = xml
#: An ordered dictionary of plugin stanzas, mapped by their
#: :attr:`plugin_attrib` value.
......@@ -419,7 +460,7 @@ class ElementBase(object):
existing_xml=child,
reuse=False)
def setup(self, xml=None):
def setup(self, xml: Optional[ET.Element] = None) -> bool:
"""Initialize the stanza's XML contents.
Will return ``True`` if XML was generated according to the stanza's
......@@ -429,29 +470,31 @@ class ElementBase(object):
:param xml: An existing XML object to use for the stanza's content
instead of generating new XML.
"""
if self.xml is None:
if hasattr(self, 'xml'):
return False
if not hasattr(self, 'xml') and xml is not None:
self.xml = xml
return False
last_xml = self.xml
if self.xml is None:
# Generate XML from the stanza definition
for ename in self.name.split('/'):
new = ET.Element("{%s}%s" % (self.namespace, ename))
if self.xml is None:
self.xml = new
else:
last_xml.append(new)
last_xml = new
if self.parent is not None:
self.parent().xml.append(self.xml)
# We had to generate XML
return True
else:
# We did not generate XML
return False
# Generate XML from the stanza definition
last_xml = ET.Element('')
for ename in self.name.split('/'):
new = ET.Element("{%s}%s" % (self.namespace, ename))
if not hasattr(self, 'xml'):
self.xml = new
else:
last_xml.append(new)
last_xml = new
if self.parent is not None:
parent = self.parent()
if parent:
parent.xml.append(self.xml)
# We had to generate XML
return True
def enable(self, attrib, lang=None):
def enable(self, attrib: str, lang: Optional[str] = None) -> ElementBase:
"""Enable and initialize a stanza plugin.
Alias for :meth:`init_plugin`.
......@@ -487,7 +530,10 @@ class ElementBase(object):
else:
return None if check else self.init_plugin(name, lang)
def init_plugin(self, attrib, lang=None, existing_xml=None, element=None, reuse=True):
def init_plugin(self, attrib: str, lang: Optional[str] = None,
existing_xml: Optional[ET.Element] = None,
reuse: bool = True,
element: Optional[ElementBase] = None) -> ElementBase:
"""Enable and initialize a stanza plugin.
:param string attrib: The :attr:`plugin_attrib` value of the
......@@ -525,7 +571,7 @@ class ElementBase(object):
return plugin
def _get_stanza_values(self):
def _get_stanza_values(self) -> Dict[str, Any]:
"""Return A JSON/dictionary version of the XML content
exposed through the stanza's interfaces::
......@@ -567,7 +613,7 @@ class ElementBase(object):
values['substanzas'] = iterables
return values
def _set_stanza_values(self, values):
def _set_stanza_values(self, values: Dict[str, Any]) -> ElementBase:
"""Set multiple stanza interface values using a dictionary.
Stanza plugin values may be set using nested dictionaries.
......@@ -623,7 +669,7 @@ class ElementBase(object):
plugin.values = value
return self
def __getitem__(self, full_attrib):
def __getitem__(self, full_attrib: str) -> Any:
"""Return the value of a stanza interface using dict-like syntax.
Example::
......@@ -688,7 +734,7 @@ class ElementBase(object):
else:
return ''
def __setitem__(self, attrib, value):
def __setitem__(self, attrib: str, value: Any) -> Any:
"""Set the value of a stanza interface using dictionary-like syntax.
Example::
......@@ -773,7 +819,7 @@ class ElementBase(object):
plugin[full_attrib] = value
return self
def __delitem__(self, attrib):
def __delitem__(self, attrib: str) -> Any:
"""Delete the value of a stanza interface using dict-like syntax.
Example::
......@@ -851,7 +897,7 @@ class ElementBase(object):
pass
return self
def _set_attr(self, name, value):
def _set_attr(self, name: str, value: Optional[JidStr]) -> None:
"""Set the value of a top level attribute of the XML object.
If the new value is None or an empty string, then the attribute will
......@@ -868,7 +914,7 @@ class ElementBase(object):
value = str(value)
self.xml.attrib[name] = value
def _del_attr(self, name):
def _del_attr(self, name: str) -> None:
"""Remove a top level attribute of the XML object.
:param name: The name of the attribute.
......@@ -876,7 +922,7 @@ class ElementBase(object):
if name in self.xml.attrib:
del self.xml.attrib[name]
def _get_attr(self, name, default=''):
def _get_attr(self, name: str, default: str = '') -> str:
"""Return the value of a top level attribute of the XML object.
In case the attribute has not been set, a default value can be
......@@ -889,7 +935,8 @@ class ElementBase(object):
"""
return self.xml.attrib.get(name, default)
def _get_sub_text(self, name, default='', lang=None):
def _get_sub_text(self, name: str, default: str = '',
lang: Optional[str] = None) -> Union[str, Dict[str, str]]:
"""Return the text contents of a sub element.
In case the element does not exist, or it has no textual content,
......@@ -900,7 +947,7 @@ class ElementBase(object):
:param default: Optional default to return if the element does
not exists. An empty string is returned otherwise.
"""
name = self._fix_ns(name)
name = cast(str, self._fix_ns(name))
if lang == '*':
return self._get_all_sub_text(name, default, None)
......@@ -924,8 +971,9 @@ class ElementBase(object):
return result
return default
def _get_all_sub_text(self, name, default='', lang=None):
name = self._fix_ns(name)
def _get_all_sub_text(self, name: str, default: str = '',
lang: Optional[str] = None) -> Dict[str, str]:
name = cast(str, self._fix_ns(name))
default_lang = self.get_lang()
results = {}
......@@ -935,10 +983,16 @@ class ElementBase(object):
stanza_lang = stanza.attrib.get('{%s}lang' % XML_NS,
default_lang)
if not lang or lang == '*' or stanza_lang == lang:
results[stanza_lang] = stanza.text
if stanza.text is None:
text = default
else:
text = stanza.text
results[stanza_lang] = text
return results
def _set_sub_text(self, name, text=None, keep=False, lang=None):
def _set_sub_text(self, name: str, text: Optional[str] = None,
keep: bool = False,
lang: Optional[str] = None) -> Optional[ET.Element]:
"""Set the text contents of a sub element.
In case the element does not exist, a element will be created,
......@@ -959,15 +1013,16 @@ class ElementBase(object):
lang = default_lang
if not text and not keep:
return self._del_sub(name, lang=lang)
self._del_sub(name, lang=lang)
return None
path = self._fix_ns(name, split=True)
path = cast(List[str], self._fix_ns(name, split=True))
name = path[-1]
parent = self.xml
parent: Optional[ET.Element] = self.xml
# The first goal is to find the parent of the subelement, or, if
# we can't find that, the closest grandparent element.
missing_path = []
missing_path: List[str] = []
search_order = path[:-1]
while search_order:
parent = self.xml.find('/'.join(search_order))
......@@ -1008,15 +1063,17 @@ class ElementBase(object):
parent.append(element)
return element
def _set_all_sub_text(self, name, values, keep=False, lang=None):
self._del_sub(name, lang)
def _set_all_sub_text(self, name: str, values: Dict[str, str],
keep: bool = False,
lang: Optional[str] = None) -> None:
self._del_sub(name, lang=lang)
for value_lang, value in values.items():
if not lang or lang == '*' or value_lang == lang:
self._set_sub_text(name, text=value,
keep=keep,
lang=value_lang)
def _del_sub(self, name, all=False, lang=None):
def _del_sub(self, name: str, all: bool = False, lang: Optional[str] = None) -> None:
"""Remove sub elements that match the given name or XPath.
If the element is in a path, then any parent elements that become
......@@ -1034,11 +1091,11 @@ class ElementBase(object):
if not lang:
lang = default_lang
parent = self.xml
parent: Optional[ET.Element] = self.xml
for level, _ in enumerate(path):
# Generate the paths to the target elements and their parent.
element_path = "/".join(path[:len(path) - level])
parent_path = "/".join(path[:len(path) - level - 1])
parent_path: Optional[str] = "/".join(path[:len(path) - level - 1])
elements = self.xml.findall(element_path)
if parent_path == '':
......@@ -1061,7 +1118,7 @@ class ElementBase(object):
# after deleting the first level of elements.
return
def match(self, xpath):
def match(self, xpath: Union[str, List[str]]) -> bool:
"""Compare a stanza object with an XPath-like expression.
If the XPath matches the contents of the stanza object, the match
......@@ -1127,7 +1184,7 @@ class ElementBase(object):
# Everything matched.
return True
def get(self, key, default=None):
def get(self, key: str, default: Optional[Any] = None) -> Any:
"""Return the value of a stanza interface.
If the found value is None or an empty string, return the supplied
......@@ -1144,7 +1201,7 @@ class ElementBase(object):
return default
return value
def keys(self):
def keys(self) -> List[str]:
"""Return the names of all stanza interfaces provided by the
stanza object.
......@@ -1158,7 +1215,7 @@ class ElementBase(object):
out.append('substanzas')
return out
def append(self, item):
def append(self, item: Union[ET.Element, ElementBase]) -> ElementBase:
"""Append either an XML object or a substanza to this stanza object.
If a substanza object is appended, it will be added to the list
......@@ -1189,7 +1246,7 @@ class ElementBase(object):
return self
def appendxml(self, xml):
def appendxml(self, xml: ET.Element) -> ElementBase:
"""Append an XML object to the stanza's XML.
The added XML will not be included in the list of
......@@ -1200,7 +1257,7 @@ class ElementBase(object):
self.xml.append(xml)
return self
def pop(self, index=0):
def pop(self, index: int = 0) -> ElementBase:
"""Remove and return the last substanza in the list of
iterable substanzas.
......@@ -1212,11 +1269,11 @@ class ElementBase(object):
self.xml.remove(substanza.xml)
return substanza
def next(self):
def next(self) -> ElementBase:
"""Return the next iterable substanza."""
return self.__next__()
def clear(self):
def clear(self) -> ElementBase:
"""Remove all XML element contents and plugins.
Any attribute values will be preserved.
......@@ -1229,7 +1286,7 @@ class ElementBase(object):
return self
@classmethod
def tag_name(cls):
def tag_name(cls) -> str:
"""Return the namespaced name of the stanza's root element.
The format for the tag name is::
......@@ -1241,29 +1298,32 @@ class ElementBase(object):
"""
return "{%s}%s" % (cls.namespace, cls.name)
def get_lang(self, lang=None):
def get_lang(self, lang: Optional[str] = None) -> str:
result = self.xml.attrib.get('{%s}lang' % XML_NS, '')
if not result and self.parent and self.parent():
return self.parent()['lang']
if not result and self.parent:
parent = self.parent()
if parent:
return cast(str, parent['lang'])
return result
def set_lang(self, lang):
def set_lang(self, lang: Optional[str]) -> None:
self.del_lang()
attr = '{%s}lang' % XML_NS
if lang:
self.xml.attrib[attr] = lang
def del_lang(self):
def del_lang(self) -> None:
attr = '{%s}lang' % XML_NS
if attr in self.xml.attrib:
del self.xml.attrib[attr]
def _fix_ns(self, xpath, split=False, propagate_ns=True):
def _fix_ns(self, xpath: str, split: bool = False,
propagate_ns: bool = True) -> Union[str, List[str]]:
return fix_ns(xpath, split=split,
propagate_ns=propagate_ns,
default_ns=self.namespace)
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
"""Compare the stanza object with another to test for equality.
Stanzas are equal if their interfaces return the same values,
......@@ -1290,7 +1350,7 @@ class ElementBase(object):
# must be equal.
return True
def __ne__(self, other):
def __ne__(self, other: Any) -> bool:
"""Compare the stanza object with another to test for inequality.
Stanzas are not equal if their interfaces return different values,
......@@ -1300,16 +1360,16 @@ class ElementBase(object):
"""
return not self.__eq__(other)
def __bool__(self):
def __bool__(self) -> bool:
"""Stanza objects should be treated as True in boolean contexts.
"""
return True
def __len__(self):
def __len__(self) -> int:
"""Return the number of iterable substanzas in this stanza."""