Unverified Commit e2414121 authored by mathieui's avatar mathieui

Add type hints here and there

parent 3cb8e33f
......@@ -30,9 +30,10 @@ Adding a remote bookmark:
import functools
import logging
from typing import Optional, List
from slixmpp.plugins.xep_0048 import Bookmarks, Conference, URL
from slixmpp import JID
from slixmpp.plugins.xep_0048 import Bookmarks, Conference, URL
from poezio.common import safeJID
from poezio.config import config
......@@ -41,11 +42,11 @@ log = logging.getLogger(__name__)
class Bookmark:
def __init__(self,
jid,
name=None,
jid: JID,
name: Optional[str] = None,
autojoin=False,
nick=None,
password=None,
nick: Optional[str] = None,
password: Optional[str] = None,
method='local'):
self.jid = jid
self.name = name or jid
......@@ -55,21 +56,21 @@ class Bookmark:
self._method = method
@property
def method(self):
def method(self) -> str:
return self._method
@method.setter
def method(self, value):
def method(self, value: str):
if value not in ('local', 'remote'):
log.debug('Could not set bookmark storing method: %s', value)
return
self._method = value
def __repr__(self):
def __repr__(self) -> str:
return '<%s%s|%s>' % (self.jid, ('/' + self.nick)
if self.nick else '', self.method)
def stanza(self):
def stanza(self) -> Conference:
"""
Generate a <conference/> stanza from the instance
"""
......@@ -83,7 +84,7 @@ class Bookmark:
el['password'] = self.password
return el
def local(self):
def local(self) -> str:
"""Generate a str for local storage"""
local = self.jid
if self.nick:
......
......@@ -10,19 +10,18 @@ Various useful functions.
from datetime import datetime, timedelta
from pathlib import Path
from slixmpp import JID, InvalidJID
from poezio.poezio_shlex import shlex
from typing import Dict, List, Optional, Tuple, Union
import base64
import os
import mimetypes
import hashlib
import subprocess
import time
import string
from slixmpp import JID, InvalidJID, Message
from poezio.poezio_shlex import shlex
def _get_output_of_command(command):
def _get_output_of_command(command: str) -> Optional[List[str]]:
"""
Runs a command and returns its output.
......@@ -37,7 +36,7 @@ def _get_output_of_command(command):
return None
def _is_in_path(command, return_abs_path=False):
def _is_in_path(command: str, return_abs_path=False) -> Union[bool, str]:
"""
Check if *command* is in the $PATH or not.
......@@ -53,8 +52,7 @@ def _is_in_path(command, return_abs_path=False):
if command in os.listdir(directory):
if return_abs_path:
return os.path.join(directory, command)
else:
return True
return True
except OSError:
# If the user has non directories in his path
pass
......@@ -84,7 +82,7 @@ DISTRO_INFO = {
}
def get_os_info():
def get_os_info() -> str:
"""
Returns a detailed and well formatted string containing
information about the operating system
......@@ -146,7 +144,7 @@ def get_os_info():
return os_info
def _datetime_tuple(timestamp):
def _datetime_tuple(timestamp: str) -> datetime:
"""
Convert a timestamp using strptime and the format: %Y%m%dT%H:%M:%S.
......@@ -172,10 +170,10 @@ def _datetime_tuple(timestamp):
try:
if tz_msg and tz_msg != 'Z':
tz_mod = -1 if tz_msg[0] == '-' else 1
tz_msg = time.strptime(tz_msg[1:], '%H%M')
tz_msg = tz_msg.tm_hour * 3600 + tz_msg.tm_min * 60
tz_msg = timedelta(seconds=tz_mod * tz_msg)
ret -= tz_msg
tz_parsed = time.strptime(tz_msg[1:], '%H%M')
tz_seconds = tz_parsed.tm_hour * 3600 + tz_parsed.tm_min * 60
delta = timedelta(seconds=tz_mod * tz_seconds)
ret -= delta
except ValueError:
pass # ignore if we got a badly-formatted offset
# convert UTC to local time, with DST etc.
......@@ -187,7 +185,7 @@ def _datetime_tuple(timestamp):
return ret
def get_utc_time(local_time=None):
def get_utc_time(local_time: Optional[datetime] = None) -> datetime:
"""
Get the current UTC time
......@@ -210,7 +208,7 @@ def get_utc_time(local_time=None):
return utc_time
def get_local_time(utc_time):
def get_local_time(utc_time: datetime) -> datetime:
"""
Get the local time from an UTC time
"""
......@@ -226,7 +224,7 @@ def get_local_time(utc_time):
return local_time
def find_delayed_tag(message):
def find_delayed_tag(message: Message) -> Tuple[bool, datetime]:
"""
Check if a message is delayed or not.
......@@ -253,7 +251,7 @@ def find_delayed_tag(message):
return (delayed, date)
def shell_split(st):
def shell_split(st: str) -> List[str]:
"""
Split a string correctly according to the quotes
around the elements.
......@@ -276,7 +274,7 @@ def shell_split(st):
return ret
def find_argument(pos, text, quoted=True):
def find_argument(pos: int, text: str, quoted=True) -> int:
"""
Split an input into a list of arguments, return the number of the
argument selected by pos.
......@@ -293,11 +291,10 @@ def find_argument(pos, text, quoted=True):
"""
if quoted:
return _find_argument_quoted(pos, text)
else:
return _find_argument_unquoted(pos, text)
return _find_argument_unquoted(pos, text)
def _find_argument_quoted(pos, text):
def _find_argument_quoted(pos: int, text: str) -> int:
"""
Get the number of the argument at position pos in
a string with possibly quoted text.
......@@ -314,7 +311,7 @@ def _find_argument_quoted(pos, text):
return count + 1
def _find_argument_unquoted(pos, text):
def _find_argument_unquoted(pos: int, text: str) -> int:
"""
Get the number of the argument at position pos in
a string without interpreting quotes.
......@@ -332,7 +329,7 @@ def _find_argument_unquoted(pos, text):
return argnum + 1
def parse_str_to_secs(duration=''):
def parse_str_to_secs(duration='') -> int:
"""
Parse a string of with a number of d, h, m, s.
......@@ -360,7 +357,7 @@ def parse_str_to_secs(duration=''):
return result
def parse_secs_to_str(duration=0):
def parse_secs_to_str(duration=0) -> str:
"""
Do the reverse operation of :py:func:`parse_str_to_secs`.
......@@ -390,7 +387,7 @@ def parse_secs_to_str(duration=0):
return result
def format_tune_string(infos):
def format_tune_string(infos: Dict[str, str]) -> str:
"""
Contruct a string from a dict created from an "User tune" event.
......@@ -417,18 +414,18 @@ def format_tune_string(infos):
rating = infos.get('rating')
if rating:
elems.append('[ ' + rating + '/10 ]')
length = infos.get('length')
if length:
length = int(length)
length_str = infos.get('length')
if length_str:
length = int(length_str)
secs = length % 60
mins = length // 60
secs = str(secs).zfill(2)
mins = str(mins).zfill(2)
elems.append('[' + mins + ':' + secs + ']')
secs_str = str(secs).zfill(2)
mins_str = str(mins).zfill(2)
elems.append('[' + mins_str + ':' + secs_str + ']')
return ' '.join(elems)
def format_gaming_string(infos):
def format_gaming_string(infos: Dict[str, str]) -> str:
"""
Construct a string from a dict containing "user gaming" information.
(for now, only use address and name)
......@@ -447,7 +444,7 @@ def format_gaming_string(infos):
return name
def safeJID(*args, **kwargs):
def safeJID(*args, **kwargs) -> JID:
"""
Construct a :py:class:`slixmpp.JID` object from a string.
......
......@@ -10,8 +10,6 @@ TODO: get http://bugs.python.org/issue1410680 fixed, one day, in order
to remove our ugly custom I/O methods.
"""
DEFSECTION = "Poezio"
import logging.config
import os
import stat
......@@ -19,12 +17,17 @@ import sys
import pkg_resources
from configparser import RawConfigParser, NoOptionError, NoSectionError
from os import remove
from shutil import copy2
from pathlib import Path
from shutil import copy2
from typing import Callable, Dict, List, Optional, Union, Tuple
from poezio.args import parse_args
from poezio import xdg
ConfigValue = Union[str, int, float, bool]
DEFSECTION = "Poezio"
DEFAULT_CONFIG = {
'Poezio': {
'ack_message_receipts': True,
......@@ -159,7 +162,7 @@ class Config(RawConfigParser):
load/save the config to a file
"""
def __init__(self, file_name, default=None):
def __init__(self, file_name: Path, default=None) -> None:
RawConfigParser.__init__(self, None)
# make the options case sensitive
self.optionxform = str
......@@ -176,7 +179,10 @@ class Config(RawConfigParser):
if not self.has_section(section):
self.add_section(section)
def get(self, option, default=None, section=DEFSECTION):
def get(self,
option: str,
default: Optional[ConfigValue] = None,
section=DEFSECTION) -> ConfigValue:
"""
get a value from the config but return
a default value if it is not found
......@@ -190,12 +196,12 @@ class Config(RawConfigParser):
default = ''
try:
if type(default) == int:
if isinstance(default, bool):
res = self.getboolean(option, section)
elif isinstance(default, int):
res = self.getint(option, section)
elif type(default) == float:
elif isinstance(default, float):
res = self.getfloat(option, section)
elif type(default) == bool:
res = self.getboolean(option, section)
else:
res = self.getstr(option, section)
except (NoOptionError, NoSectionError, ValueError, AttributeError):
......@@ -279,7 +285,8 @@ class Config(RawConfigParser):
"""
return RawConfigParser.getboolean(self, section, option)
def write_in_file(self, section, option, value):
def write_in_file(self, section: str, option: str,
value: ConfigValue) -> bool:
"""
Our own way to save write the value in the file
Just find the right section, and then find the
......@@ -305,7 +312,7 @@ class Config(RawConfigParser):
return self._write_file(result_lines)
def remove_in_file(self, section, option):
def remove_in_file(self, section: str, option: str) -> bool:
"""
Our own way to remove an option from the file.
"""
......@@ -334,7 +341,7 @@ class Config(RawConfigParser):
return self._write_file(result_lines)
def _write_file(self, lines):
def _write_file(self, lines: List[str]) -> bool:
"""
Write the config file, write to a temporary file
before copying it to the final destination
......@@ -360,7 +367,7 @@ class Config(RawConfigParser):
success = True
return success
def _parse_file(self):
def _parse_file(self) -> Optional[Tuple[Dict[str, List[int]], List[str]]]:
"""
Parse the config file and return the list of sections with
their start and end positions, and the lines in the file.
......@@ -372,17 +379,18 @@ class Config(RawConfigParser):
if file_ok(self.file_name):
try:
with self.file_name.open('r', encoding='utf-8') as df:
lines_before = [line.strip() for line in df]
lines_before = [line.strip()
for line in df] # type: List[str]
except OSError:
log.error(
'Unable to read the config file %s',
self.file_name,
exc_info=True)
return tuple()
return None
else:
lines_before = []
sections = {}
sections = {} # type: Dict[str, List[int]]
duplicate_section = False
current_section = ''
current_line = 0
......@@ -408,7 +416,8 @@ class Config(RawConfigParser):
return (sections, lines_before)
def set_and_save(self, option, value, section=DEFSECTION):
def set_and_save(self, option: str, value: ConfigValue,
section=DEFSECTION) -> Tuple[str, str]:
"""
set the value in the configuration then save it
to the file
......@@ -439,7 +448,8 @@ class Config(RawConfigParser):
return ('Unable to write in the config file', 'Error')
return ("%s=%s" % (option, value), 'Info')
def remove_and_save(self, option, section=DEFSECTION):
def remove_and_save(self, option: str,
section=DEFSECTION) -> Tuple[str, str]:
"""
Remove an option and then save it the config file
"""
......@@ -449,7 +459,7 @@ class Config(RawConfigParser):
return ('Unable to save the config file', 'Error')
return ('Option %s deleted' % option, 'Info')
def silent_set(self, option, value, section=DEFSECTION):
def silent_set(self, option: str, value: ConfigValue, section=DEFSECTION):
"""
Set a value, save, and return True on success and False on failure
"""
......@@ -460,7 +470,7 @@ class Config(RawConfigParser):
RawConfigParser.set(self, section, option, value)
return self.write_in_file(section, option, value)
def set(self, option, value, section=DEFSECTION):
def set(self, option: str, value: ConfigValue, section=DEFSECTION):
"""
Set the value of an option temporarily
"""
......@@ -469,11 +479,11 @@ class Config(RawConfigParser):
except NoSectionError:
pass
def to_dict(self):
def to_dict(self) -> Dict[str, Dict[str, ConfigValue]]:
"""
Returns a dict of the form {section: {option: value, option: value}, …}
"""
res = {}
res = {} # Dict[str, Dict[str, ConfigValue]]
for section in self.sections():
res[section] = {}
for option in self.options(section):
......@@ -481,7 +491,7 @@ class Config(RawConfigParser):
return res
def find_line(lines, start, end, option):
def find_line(lines: List[str], start: int, end: int, option: str) -> int:
"""
Get the number of the line containing the option in the
relevant part of the config file.
......@@ -497,7 +507,7 @@ def find_line(lines, start, end, option):
return -1
def file_ok(filepath):
def file_ok(filepath: Path) -> bool:
"""
Returns True if the file exists and is readable and writeable,
False otherwise.
......@@ -507,7 +517,7 @@ def file_ok(filepath):
return bool(val)
def get_image_cache():
def get_image_cache() -> Path:
if not config.get('extract_inline_images'):
return None
tmp_dir = config.get('tmp_image_dir')
......@@ -664,16 +674,16 @@ LOGGING_CONFIG = {
firstrun = False
# Global config object. Is setup in poezio.py
config = None
config = None # type: Optional[Config]
# The logger object for this module
log = None
log = None # type: Optional[logging.Logger]
# The command-line options
options = None
# delayed import from common.py
safeJID = None
safeJID = None # type: Optional[Callable]
# the global log dir
LOG_DIR = ''
......@@ -9,11 +9,14 @@ Defines the Resource and Contact classes, which are used in
the roster.
"""
from collections import defaultdict
import logging
log = logging.getLogger(__name__)
from typing import Dict, Iterator, List, Optional, Union
from poezio.common import safeJID
from collections import defaultdict
from slixmpp import JID
log = logging.getLogger(__name__)
class Resource:
......@@ -26,29 +29,30 @@ class Resource:
"""
data: the dict to use as a source
"""
self._jid = jid # Full jid
self._data = data
# Full JID
self._jid = jid # type: str
self._data = data # type: Dict[str, Union[str, int]]
@property
def jid(self):
def jid(self) -> str:
return self._jid
@property
def priority(self):
def priority(self) -> int:
return self._data.get('priority') or 0
@property
def presence(self):
def presence(self) -> str:
return self._data.get('show') or ''
@property
def status(self):
def status(self) -> str:
return self._data.get('status') or ''
def __repr__(self):
def __repr__(self) -> str:
return '<%s>' % self._jid
def __eq__(self, value):
def __eq__(self, value: object) -> bool:
if not isinstance(value, Resource):
return False
return self.jid == value.jid and self._data == value._data
......@@ -66,22 +70,22 @@ class Contact:
item: a slixmpp RosterItem pointing to that contact
"""
self.__item = item
self.folded_states = defaultdict(lambda: True)
self.folded_states = defaultdict(lambda: True) # type: Dict[str, bool]
self._name = ''
self.avatar = None
self.error = None
self.tune = {}
self.gaming = {}
self.tune = {} # type: Dict[str, str]
self.gaming = {} # type: Dict[str, str]
self.mood = ''
self.activity = ''
@property
def groups(self):
def groups(self) -> List[str]:
"""Name of the groups the contact is in"""
return self.__item['groups'] or ['none']
@property
def bare_jid(self):
def bare_jid(self) -> JID:
"""The bare jid of the contact"""
return self.__item.jid
......@@ -119,29 +123,29 @@ class Contact:
self.__item['pending_out'] = value
@property
def resources(self):
def resources(self) -> Iterator[Resource]:
"""List of the available resources as Resource objects"""
return (Resource('%s%s' % (self.bare_jid, ('/' + key)
if key else ''), self.__item.resources[key])
for key in self.__item.resources.keys())
@property
def subscription(self):
def subscription(self) -> str:
return self.__item['subscription']
def __contains__(self, value):
return value in self.__item.resources or safeJID(
value).resource in self.__item.resources
def __len__(self):
def __len__(self) -> int:
"""Number of resources"""
return len(self.__item.resources)
def __bool__(self):
"""This contacts exists even when he has no resources"""
def __bool__(self) -> bool:
"""This contact exists even when he has no resources"""
return True
def __getitem__(self, key):
def __getitem__(self, key) -> Optional[Resource]:
"""Return the corresponding Resource object, or None"""
res = safeJID(key).resource
resources = self.__item.resources
......@@ -164,23 +168,24 @@ class Contact:
"""Unsubscribe from this JID"""
self.__item.unsubscribe()
def get(self, key, default=None):
def get(self, key: str,
default: Optional[Resource] = None) -> Optional[Resource]:
"""Same as __getitem__, but with a configurable default"""
return self[key] or default
def get_resources(self):
def get_resources(self) -> List[Resource]:
"""Return all resources, sorted by priority """
compare_resources = lambda x: x.priority
return sorted(self.resources, key=compare_resources, reverse=True)
def get_highest_priority_resource(self):
def get_highest_priority_resource(self) -> Optional[Resource]:
"""Return the resource with the highest priority"""
resources = self.get_resources()
if resources:
return resources[0]
return None
def folded(self, group_name='none'):
def folded(self, group_name='none') -> bool:
"""
Return the Folded state of a contact for this group
"""
......@@ -192,7 +197,7 @@ class Contact:
"""
self.folded_states[group] = not self.folded_states[group]
def __repr__(self):
def __repr__(self) -> str:
ret = '<Contact: %s' % self.bare_jid
for resource in self.resources:
ret += '\n\t\t%s' % resource
......
......@@ -9,6 +9,8 @@ The list of available events is here:
http://poezio.eu/doc/en/plugins.html#_poezio_events
"""
from typing import Callable, Dict, List
class EventHandler:
"""
......@@ -44,9 +46,10 @@ class EventHandler:
'send_normal_presence': [],
'ignored_private': [],
'tab_change': [],
}
} # type: Dict[str, List[Callable]]
def add_event_handler(self, name, callback, position=0):