Commit dafe5b9e authored by mathieui's avatar mathieui

fix: logger: improve typing, globals and tests

parent f56811d5
Pipeline #3849 passed with stages
in 6 minutes and 44 seconds
......@@ -13,19 +13,17 @@ import mmap
import re
from typing import List, Dict, Optional, IO, Any, Union
from datetime import datetime
from pathlib import Path
from poezio import common
from poezio.config import config
from poezio.xhtml import clean_text
from poezio.theming import dump_tuple, get_theme
from poezio.ui.types import Message, BaseMessage, LoggableTrait
import logging
log = logging.getLogger(__name__)
from poezio.config import LOG_DIR as log_dir
MESSAGE_LOG_RE = re.compile(r'^MR (\d{4})(\d{2})(\d{2})T'
r'(\d{2}):(\d{2}):(\d{2})Z '
r'(\d+) <([^ ]+)>  (.*)$')
......@@ -35,8 +33,13 @@ INFO_LOG_RE = re.compile(r'^MI (\d{4})(\d{2})(\d{2})T'
class LogItem:
def __init__(self, year, month, day, hour, minute, second, nb_lines,
time: datetime
nb_lines: int
text: str
def __init__(self, year: str, month: str, day: str, hour: str, minute: str,
second: str, nb_lines: str,
message: str):
self.time = datetime(
int(year), int(month), int(day), int(hour), int(minute),
......@@ -50,14 +53,23 @@ class LogInfo(LogItem):
class LogMessage(LogItem):
def __init__(self, year, month, day, hour, minute, seconds, nb_lines, nick,
nick: str
def __init__(self, year: str, month: str, day: str, hour: str, minute: str,
seconds: str, nb_lines: str, nick: str,
message: str):
LogItem.__init__(self, year, month, day, hour, minute, seconds,
nb_lines, message)
self.nick = nick
def parse_log_line(msg: str, jid: str) -> Optional[LogItem]:
"""Parse a log line.
:param msg: The message ligne
:param jid: jid (for error logging)
:returns: The LogItem or None on error
match = re.match(MESSAGE_LOG_RE, msg)
if match:
return LogMessage(*match.groups())
......@@ -73,60 +85,83 @@ class Logger:
Appends things to files. Error/information/warning logs
and also log the conversations to logfiles
_roster_logfile: Optional[IO[str]]
log_dir: Path
_fds: Dict[str, IO[str]]
def __init__(self):
self._roster_logfile = None # Optional[IO[Any]]
self.log_dir = Path()
self._roster_logfile = None
# a dict of 'groupchatname': file-object (opened)
self._fds: Dict[str, IO[Any]] = {}
self._fds = {}
def __del__(self):
"""Close all fds on exit"""
for opened_file in self._fds.values():
if opened_file:
except: # Can't close? too bad
except Exception: # Can't close? too bad
except Exception:
def close(self, jid) -> None:
def close(self, jid: str) -> None:
"""Close the log file for a JID."""
jid = str(jid).replace('/', '\\')
if jid in self._fds:
log.debug('Log file for %s closed.', jid)
del self._fds[jid]
return None
def reload_all(self) -> None:
"""Close and reload all the file handles (on SIGHUP)"""
for opened_file in self._fds.values():
not_closed = set()
for key, opened_file in self._fds.items():
if opened_file:
except Exception:
if self._roster_logfile:
except Exception:
log.debug('All log file handles closed')
if not_closed:
log.error('Unable to close log files for: %s', not_closed)
for room in self._fds:
log.debug('Log handle for %s re-created', room)
return None
def _check_and_create_log_dir(self, room: str,
open_fd: bool = True) -> Optional[IO[Any]]:
def _check_and_create_log_dir(self, jid: str,
open_fd: bool = True) -> Optional[IO[str]]:
Check that the directory where we want to log the messages
exists. if not, create it
:param jid: JID of the file to open after creating the dir
:param open_fd: if the file should be opened after creating the dir
:returns: the opened fd or None
if not config.get_by_tabname('use_log', room):
if not config.get_by_tabname('use_log', jid):
return None
log_dir.mkdir(parents=True, exist_ok=True)
except OSError as e:
self.log_dir.mkdir(parents=True, exist_ok=True)
except OSError:
log.error('Unable to create the log dir', exc_info=True)
except Exception:
log.error('Unable to create the log dir', exc_info=True)
return None
if not open_fd:
return None
filename = log_dir / room
filename = self.log_dir / jid
fd ='a', encoding='utf-8')
self._fds[room] = fd
self._fds[jid] = fd
return fd
except IOError:
......@@ -137,11 +172,11 @@ class Logger:
jid: str,
msg: Union[BaseMessage, Message]) -> bool:
log the message in the appropriate jid's file
0 = Don’t log
1 = Message
2 = Status/whatever
Log the message in the appropriate file
:param jid: JID of the entity for which to log the message
:param msg: Message to log
:returns: True if no error was encountered
if not config.get_by_tabname('use_log', jid):
return True
......@@ -149,12 +184,12 @@ class Logger:
return True
date = msg.time
txt = msg.txt
typ = 2
nick = ''
typ = 'MI'
if isinstance(msg, Message):
nick = msg.nickname or ''
typ = 1
logged_msg = build_log_message(nick, txt, date=date, typ=typ)
typ = 'MR'
logged_msg = build_log_message(nick, txt, date=date, prefix=typ)
if not logged_msg:
return True
jid = str(jid).replace('/', '\\')
......@@ -165,7 +200,7 @@ class Logger:
if option_fd is None:
return True
fd = option_fd
filename = log_dir / jid
filename = self.log_dir / jid
except OSError:
......@@ -188,11 +223,15 @@ class Logger:
def log_roster_change(self, jid: str, message: str) -> bool:
Log a roster change
:param jid: jid to log the change for
:param message: message to log
:returns: True if no error happened
if not config.get_by_tabname('use_log', jid):
return True
self._check_and_create_log_dir('', open_fd=False)
filename = log_dir / 'roster.log'
filename = self.log_dir / 'roster.log'
if not self._roster_logfile:
self._roster_logfile ='a', encoding='utf-8')
......@@ -213,7 +252,7 @@ class Logger:
for line in lines:
self._roster_logfile.write(' %s\n' % line)
except Exception:
'Unable to write in the log file (%s)',
......@@ -225,21 +264,19 @@ class Logger:
def build_log_message(nick: str,
msg: str,
date: Optional[datetime] = None,
typ: int = 1) -> str:
prefix: str = 'MI') -> str:
Create a log message from a nick, a message, optionally a date and type
message types:
0 = Don’t log
1 = Message
2 = Status/whatever
if not typ:
return ''
:param nick: nickname to log
:param msg: text of the message
:param date: date of the message
:param prefix: MI (info) or MR (message)
:returns: The log line(s)
msg = clean_text(msg)
time = common.get_utc_time() if date is None else common.get_utc_time(date)
str_time = time.strftime('%Y%m%dT%H:%M:%SZ')
prefix = 'MR' if typ == 1 else 'MI'
lines = msg.split('\n')
first_line = lines.pop(0)
nb_lines = str(len(lines)).zfill(3)
......@@ -254,7 +291,11 @@ def build_log_message(nick: str,
def _get_lines_from_fd(fd: IO[Any], nb: int = 10) -> List[str]:
Get the last log lines from a fileno
Get the last log lines from a fileno with mmap
:param fd: File descriptor on the log file
:param nb: number of messages to fetch
:returns: A list of message lines
with mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ) as m:
# start of messages begin with MI or MR, after a \n
......@@ -271,9 +312,12 @@ def _get_lines_from_fd(fd: IO[Any], nb: int = 10) -> List[str]:
def parse_log_lines(lines: List[str], jid: str) -> List[Dict[str, Any]]:
Parse raw log lines into poezio log objects
:param lines: Message lines
:param jid: jid (for error logging)
:return: a list of dicts containing message info
messages = []
color = '\x19%s}' % dump_tuple(get_theme().COLOR_LOG_MSG)
# now convert that data into actual Message objects
idx = 0
......@@ -294,10 +338,10 @@ def parse_log_lines(lines: List[str], jid: str) -> List[Dict[str, Any]]:
size = log_item.nb_lines
if isinstance(log_item, LogInfo):
message_lines.append(color + log_item.text)
elif isinstance(log_item, LogMessage):
message['nickname'] = log_item.nick
message_lines.append(color + log_item.text)
while size != 0 and idx < len(lines):
size -= 1
......@@ -307,10 +351,4 @@ def parse_log_lines(lines: List[str], jid: str) -> List[Dict[str, Any]]:
return messages
def create_logger() -> None:
"Create the global logger object"
global logger
logger = Logger()
logger = Logger()
......@@ -100,8 +100,8 @@ def main():
from poezio import theming
from poezio import logger
from poezio.logger import logger
logger.log_dir = config.LOG_DIR
from poezio import roster
......@@ -2,8 +2,76 @@
Test the functions in the `logger` module
import datetime
from poezio.logger import LogMessage, parse_log_line, parse_log_lines, build_log_message
from poezio.common import get_utc_time, get_local_time
from pathlib import Path
from random import sample
from shutil import rmtree
from string import hexdigits
from poezio import logger
from poezio.logger import (
LogMessage, parse_log_line, parse_log_lines, build_log_message
from poezio.ui.types import Message
from poezio.common import get_utc_time
import pytest
class ConfigShim:
def __init__(self, value):
self.value = value
def get_by_tabname(self, name, *args, **kwargs):
return self.value
logger.config = ConfigShim(True)
def log_dir():
name = 'tmplog-' + ''.join(sample(hexdigits, 16))
path = Path('/tmp', name)
yield path
rmtree(path, ignore_errors=True)
def read_file(logger, name):
if '/' in name:
name = name.replace('/', '\\')
filename = logger.log_dir / f'{name}'
with open(filename) as fd:
def test_log_roster(log_dir):
instance = logger.Logger()
instance.log_dir = log_dir
instance.log_roster_change('', 'test test')
content = read_file(instance, 'roster.log')
assert content[:3] == 'MI '
assert content[-32:] == ' 000 test test\n'
def test_log_message(log_dir):
instance = logger.Logger()
instance.log_dir = log_dir
msg = Message('content', 'toto')
instance.log_message('', msg)
content = read_file(instance, '')
line = parse_log_lines(content.split('\n'), '')[0]
assert line['nickname'] == 'toto'
assert line['txt'] == 'content'
msg2 = Message('content\ncontent2', 'titi')
instance.log_message('', msg2)
content = read_file(instance, '')
lines = parse_log_lines(content.split('\n'), '')
assert lines[0]['nickname'] == 'toto'
assert lines[0]['txt'] == 'content'
assert lines[1]['nickname'] == 'titi'
assert lines[1]['txt'] == 'content\ncontent2'
def test_parse_message():
line = 'MR 20170909T09:09:09Z 000 <nick>  body'
......@@ -17,17 +85,27 @@ def test_parse_message():
def test_log_and_parse_messages():
msg1 = {'nick': 'toto', 'msg': 'coucou', 'date':}
msg1 = {
'nick': 'toto',
'msg': 'coucou',
'prefix': 'MR',
msg1_utc = get_utc_time(msg1['date'])
built_msg1 = build_log_message(**msg1)
assert built_msg1 == 'MR %s 000 <toto>  coucou\n' % (msg1_utc.strftime('%Y%m%dT%H:%M:%SZ'))
msg2 = {'nick': 'toto', 'msg': 'coucou\ncoucou', 'date':}
msg2 = {
'nick': 'toto',
'msg': 'coucou\ncoucou',
'prefix': 'MR',
built_msg2 = build_log_message(**msg2)
msg2_utc = get_utc_time(msg2['date'])
assert built_msg2 == 'MR %s 001 <toto>  coucou\n coucou\n' % (msg2_utc.strftime('%Y%m%dT%H:%M:%SZ'))
assert parse_log_lines((built_msg1 + built_msg2).split('\n'), 'user@domain') == [
{'time': msg1['date'], 'history': True, 'txt': '\x195,-1}coucou', 'nickname': 'toto'},
{'time': msg2['date'], 'history': True, 'txt': '\x195,-1}coucou\ncoucou', 'nickname': 'toto'},
{'time': msg1['date'], 'history': True, 'txt': 'coucou', 'nickname': 'toto'},
{'time': msg2['date'], 'history': True, 'txt': 'coucou\ncoucou', 'nickname': 'toto'},
