Commit e48780dd authored by mathieui's avatar mathieui

Merge branch 'fix-history-fetch' into 'master'

Fix many MAM issues

Closes #3516, #3496, #3498, #3506, #3522, and #3493

See merge request !105
parents 4c1ab027 faeab78c
......@@ -8,7 +8,11 @@
Various useful functions.
"""
from datetime import datetime, timedelta
from datetime import (
datetime,
timedelta,
timezone,
)
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
......@@ -488,3 +492,14 @@ def unique_prefix_of(a: str, b: str) -> str:
return a[:i+1]
# both are equal, return a
return a
def to_utc(time: datetime) -> datetime:
"""Convert a datetime-aware time zone into raw UTC"""
tzone = datetime.now().astimezone().tzinfo
if time.tzinfo is not None: # Convert to UTC
time = time.astimezone(tz=timezone.utc)
else: # Assume local tz, convert to URC
time = time.replace(tzinfo=tzone).astimezone(tz=timezone.utc)
# Return an offset-naive datetime
return time.replace(tzinfo=None)
......@@ -2075,16 +2075,7 @@ class Core:
# do not join rooms that do not have autojoin
# but display them anyway
if bm.autojoin:
muc.join_groupchat(
self,
bm.jid,
nick,
passwd=bm.password,
status=self.status.message,
show=self.status.show,
tab=tab)
if tab._text_buffer.last_message is None:
asyncio.ensure_future(mam.on_tab_open(tab))
tab.join()
def check_bookmark_storage(self, features):
private = 'jabber:iq:private' in features
......
......@@ -6,34 +6,49 @@
XEP-0313: Message Archive Management(MAM).
"""
import asyncio
import logging
import random
from datetime import datetime, timedelta, timezone
from hashlib import md5
from typing import Optional, Callable
from typing import (
Any,
AsyncIterable,
Callable,
Dict,
List,
Optional,
)
from slixmpp import JID
from slixmpp import JID, Message as SMessage
from slixmpp.exceptions import IqError, IqTimeout
from poezio.theming import get_theme
from poezio import tabs
from poezio import xhtml, colors
from poezio.config import config
from poezio.text_buffer import TextBuffer
from poezio.ui.types import Message
from poezio.common import to_utc
from poezio.text_buffer import TextBuffer, HistoryGap
from poezio.ui.types import (
BaseMessage,
EndOfArchive,
Message,
)
log = logging.getLogger(__name__)
class DiscoInfoException(Exception): pass
class MAMQueryException(Exception): pass
class NoMAMSupportException(Exception): pass
def add_line(
tab,
text_buffer: TextBuffer,
def make_line(
tab: tabs.ChatTab,
text: str,
time: datetime,
nick: str,
top: bool,
) -> None:
identifier: str = '',
) -> Message:
"""Adds a textual entry in the TextBuffer"""
# Convert to local timezone
......@@ -61,150 +76,188 @@ def add_line(
color = xhtml.colors.get(color)
color = (color, -1)
else:
nick = nick.split('/')[0]
color = get_theme().COLOR_OWN_NICK
text_buffer.add_message(
Message(
txt=text,
time=time,
nickname=nick,
nick_color=color,
history=True,
user=None,
top=top,
)
if nick.split('/')[0] == tab.core.xmpp.boundjid.bare:
color = get_theme().COLOR_OWN_NICK
else:
color = get_theme().COLOR_REMOTE_USER
nick = tab.get_nick()
return Message(
txt=text,
identifier=identifier,
time=time,
nickname=nick,
nick_color=color,
history=True,
user=None,
)
async def query(
async def get_mam_iterator(
core,
groupchat: bool,
remote_jid: JID,
amount: int,
reverse: bool,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
reverse: bool = True,
start: Optional[str] = None,
end: Optional[str] = None,
before: Optional[str] = None,
callback: Optional[Callable] = None,
) -> None:
) -> AsyncIterable[Message]:
"""Get an async iterator for this mam query"""
try:
query_jid = remote_jid if groupchat else JID(core.xmpp.boundjid.bare)
iq = await core.xmpp.plugin['xep_0030'].get_info(jid=query_jid)
except (IqError, IqTimeout):
raise DiscoInfoException
raise DiscoInfoException()
if 'urn:xmpp:mam:2' not in iq['disco_info'].get_features():
raise NoMAMSupportException
raise NoMAMSupportException()
args = {
'iterator': True,
'reverse': reverse,
}
} # type: Dict[str, Any]
if groupchat:
args['jid'] = remote_jid
else:
args['with_jid'] = remote_jid
args['rsm'] = {'max': amount}
if reverse:
if before is not None:
args['rsm']['before'] = before
else:
args['end'] = end
else:
args['rsm']['start'] = start
if before is not None:
args['rsm']['end'] = end
try:
results = core.xmpp['xep_0313'].retrieve(**args)
except (IqError, IqTimeout):
raise MAMQueryException
if callback is not None:
callback(results)
if amount > 0:
args['rsm'] = {'max': amount}
args['start'] = start
args['end'] = end
return core.xmpp['xep_0313'].retrieve(**args)
return results
def _parse_message(msg: SMessage) -> Dict:
"""Parse info inside a MAM forwarded message"""
forwarded = msg['mam_result']['forwarded']
message = forwarded['stanza']
return {
'time': forwarded['delay']['stamp'],
'nick': str(message['from']),
'text': message['body'],
'identifier': message['origin-id']
}
async def add_messages_to_buffer(tab, top: bool, results, amount: int) -> bool:
"""Prepends or appends messages to the tab text_buffer"""
async def retrieve_messages(tab: tabs.ChatTab,
results: AsyncIterable[SMessage],
amount: int = 100) -> List[BaseMessage]:
"""Run the MAM query and put messages in order"""
text_buffer = tab._text_buffer
msg_count = 0
msgs = []
async for rsm in results:
if top:
to_add = []
try:
async for rsm in results:
for msg in rsm['mam']['results']:
if msg['mam_result']['forwarded']['stanza'] \
.xml.find('{%s}%s' % ('jabber:client', 'body')) is not None:
msgs.append(msg)
if msg_count == amount:
tab.core.refresh_window()
return False
.xml.find('{%s}%s' % ('jabber:client', 'body')) is not None:
args = _parse_message(msg)
msgs.append(make_line(tab, **args))
for msg in reversed(msgs):
to_add.append(msg)
msg_count += 1
msgs.reverse()
for msg in msgs:
forwarded = msg['mam_result']['forwarded']
timestamp = forwarded['delay']['stamp']
message = forwarded['stanza']
tab.last_stanza_id = msg['mam_result']['id']
nick = str(message['from'])
add_line(tab, text_buffer, message['body'], timestamp, nick, top)
else:
for msg in rsm['mam']['results']:
forwarded = msg['mam_result']['forwarded']
timestamp = forwarded['delay']['stamp']
message = forwarded['stanza']
nick = str(message['from'])
add_line(tab, text_buffer, message['body'], timestamp, nick, top)
tab.core.refresh_window()
return False
if msg_count == amount:
to_add.reverse()
return to_add
msgs = []
to_add.reverse()
return to_add
except (IqError, IqTimeout) as exc:
log.debug('Unable to complete MAM query: %s', exc, exc_info=True)
raise MAMQueryException('Query interrupted')
async def fetch_history(tab, end: Optional[datetime] = None, amount: Optional[int] = None):
async def fetch_history(tab: tabs.ChatTab,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
amount: int = 100) -> List[BaseMessage]:
remote_jid = tab.jid
before = tab.last_stanza_id
if not end:
for msg in tab._text_buffer.messages:
if isinstance(msg, Message):
end = msg.time
end -= timedelta(microseconds=1)
break
if end is None:
end = datetime.now()
tzone = datetime.now().astimezone().tzinfo
end = end.replace(tzinfo=tzone).astimezone(tz=timezone.utc)
end = end.replace(tzinfo=None)
end = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ')
if amount >= 100:
amount = 99
end = to_utc(end)
end_str = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ')
groupchat = isinstance(tab, tabs.MucTab)
start_str = None
if start is not None:
start = to_utc(start)
start_str = datetime.strftime(start, '%Y-%m-%dT%H:%M:%SZ')
results = await query(
tab.core,
groupchat,
remote_jid,
amount,
mam_iterator = await get_mam_iterator(
core=tab.core,
groupchat=isinstance(tab, tabs.MucTab),
remote_jid=remote_jid,
amount=amount,
end=end_str,
start=start_str,
reverse=True,
end=end,
before=before,
)
query_status = await add_messages_to_buffer(tab, True, results, amount)
tab.query_status = query_status
return await retrieve_messages(tab, mam_iterator, amount)
async def fill_missing_history(tab: tabs.ChatTab, gap: HistoryGap) -> None:
start = gap.last_timestamp_before_leave
end = gap.first_timestamp_after_join
if start:
start = start + timedelta(seconds=1)
if end:
end = end - timedelta(seconds=1)
try:
messages = await fetch_history(tab, start=start, end=end, amount=999)
tab._text_buffer.add_history_messages(messages, gap=gap)
if messages:
tab.core.refresh_window()
except (NoMAMSupportException, MAMQueryException, DiscoInfoException):
return
finally:
tab.query_status = False
async def on_tab_open(tab) -> None:
async def on_new_tab_open(tab: tabs.ChatTab) -> None:
"""Called when opening a new tab"""
amount = 2 * tab.text_win.height
end = datetime.now()
tab.query_status = True
for message in tab._text_buffer.messages:
time = message.time
if time < end:
end = time
end = end + timedelta(seconds=-1)
if isinstance(message, Message) and to_utc(message.time) < to_utc(end):
end = message.time
break
end = end - timedelta(microseconds=1)
try:
await fetch_history(tab, end=end, amount=amount)
messages = await fetch_history(tab, end=end, amount=amount)
tab._text_buffer.add_history_messages(messages)
if messages:
tab.core.refresh_window()
except (NoMAMSupportException, MAMQueryException, DiscoInfoException):
tab.query_status = False
return None
finally:
tab.query_status = False
def schedule_tab_open(tab: tabs.ChatTab) -> None:
"""Set the query status and schedule a MAM query"""
tab.query_status = True
asyncio.ensure_future(on_tab_open(tab))
async def on_tab_open(tab: tabs.ChatTab) -> None:
gap = tab._text_buffer.find_last_gap_muc()
if gap is None or not gap.leave_message:
await on_new_tab_open(tab)
else:
await fill_missing_history(tab, gap)
def schedule_scroll_up(tab: tabs.ChatTab) -> None:
"""Set query status and schedule a scroll up"""
tab.query_status = True
asyncio.ensure_future(on_scroll_up(tab))
async def on_scroll_up(tab) -> None:
async def on_scroll_up(tab: tabs.ChatTab) -> None:
tw = tab.text_win
# If position in the tab is < two screen pages, then fetch MAM, so that we
......@@ -212,22 +265,31 @@ async def on_scroll_up(tab) -> None:
# join if not already available.
total, pos, height = len(tw.built_lines), tw.pos, tw.height
rest = (total - pos) // height
# Not resetting the state of query_status here, it is changed only after the
# query is complete (in fetch_history)
# This is done to stop message repetition, eg: if the user presses PageUp continuously.
tab.query_status = True
if rest > 1:
tab.query_status = False
return None
try:
# XXX: Do we want to fetch a possibly variable number of messages?
# (InfoTab changes height depending on the type of messages, see
# `information_buffer_popup_on`).
await fetch_history(tab, amount=height)
messages = await fetch_history(tab, amount=height)
last_message_exists = False
if tab._text_buffer.messages:
last_message = tab._text_buffer.messages[0]
last_message_exists = True
if not messages and last_message_exists and not isinstance(last_message, EndOfArchive):
time = tab._text_buffer.messages[0].time
messages = [EndOfArchive('End of archive reached', time=time)]
tab._text_buffer.add_history_messages(messages)
if messages:
tab.core.refresh_window()
except NoMAMSupportException:
tab.core.information('MAM not supported for %r' % tab.jid, 'Info')
return None
except (MAMQueryException, DiscoInfoException):
tab.core.information('An error occured when fetching MAM for %r' % tab.jid, 'Error')
return None
finally:
tab.query_status = False
......@@ -32,7 +32,6 @@ from typing import (
)
from poezio import (
mam,
poopt,
timed_events,
xhtml,
......@@ -493,12 +492,11 @@ class ChatTab(Tab):
self._jid = jid
#: Is the tab currently requesting MAM data?
self.query_status = False
self.last_stanza_id = None
self._name = jid.full # type: Optional[str]
self.text_win = None
self.text_win = windows.TextWin()
self.directed_presence = None
self._text_buffer = TextBuffer()
self._text_buffer.add_window(self.text_win)
self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive"
# We keep a reference of the event that will set our chatstate to "paused", so that
# we can delete it or change it if we need to
......@@ -926,7 +924,8 @@ class ChatTab(Tab):
def on_scroll_up(self):
if not self.query_status:
asyncio.ensure_future(mam.on_scroll_up(tab=self))
from poezio import mam
mam.schedule_scroll_up(tab=self)
return self.text_win.scroll_up(self.text_win.height - 1)
def on_scroll_down(self):
......
......@@ -48,8 +48,6 @@ class ConversationTab(OneToOneTab):
self.nick = None
self.nick_sent = False
self.state = 'normal'
self.text_win = windows.TextWin()
self._text_buffer.add_window(self.text_win)
self.upper_bar = windows.ConversationStatusMessageWin()
self.input = windows.MessageInput()
# keys
......
......@@ -31,7 +31,7 @@ from poezio import multiuserchat as muc
from poezio import timed_events
from poezio import windows
from poezio import xhtml
from poezio.common import safeJID
from poezio.common import safeJID, to_utc
from poezio.config import config
from poezio.core.structs import Command
from poezio.decorators import refresh_wrapper, command_args_parser
......@@ -40,7 +40,14 @@ from poezio.roster import roster
from poezio.theming import get_theme, dump_tuple
from poezio.user import User
from poezio.core.structs import Completion, Status
from poezio.ui.types import BaseMessage, Message, InfoMessage, StatusMessage
from poezio.ui.types import (
BaseMessage,
InfoMessage,
Message,
MucOwnJoinMessage,
MucOwnLeaveMessage,
StatusMessage,
)
log = logging.getLogger(__name__)
......@@ -84,8 +91,6 @@ class MucTab(ChatTab):
self.self_ping_event = None
# UI stuff
self.topic_win = windows.Topic()
self.text_win = windows.TextWin()
self._text_buffer.add_window(self.text_win)
self.v_separator = windows.VerticalSeparator()
self.user_win = windows.UserList()
self.info_header = windows.MucInfoWin()
......@@ -151,10 +156,10 @@ class MucTab(ChatTab):
"""
status = self.core.get_status()
if self.last_connection:
delta = datetime.now() - self.last_connection
delta = to_utc(datetime.now()) - to_utc(self.last_connection)
seconds = delta.seconds + delta.days * 24 * 3600
else:
seconds = None
seconds = self._text_buffer.find_last_message()
muc.join_groupchat(
self.core,
self.jid.bare,
......@@ -163,7 +168,6 @@ class MucTab(ChatTab):
status=status.message,
show=status.show,
seconds=seconds)
asyncio.ensure_future(mam.on_tab_open(self))
def leave_room(self, message: str):
if self.joined:
......@@ -200,7 +204,7 @@ class MucTab(ChatTab):
'color_spec': spec_col,
'nick': self.own_nick,
}
self.add_message(InfoMessage(msg), typ=2)
self.add_message(MucOwnLeaveMessage(msg), typ=2)
self.disconnect()
muc.leave_groupchat(self.core.xmpp, self.jid.bare, self.own_nick,
message)
......@@ -567,7 +571,7 @@ class MucTab(ChatTab):
'nick_col': color,
'info_col': info_col,
}
self.add_message(InfoMessage(enable_message), typ=2)
self.add_message(MucOwnJoinMessage(enable_message), typ=2)
self.core.enable_private_tabs(self.jid.bare, enable_message)
if '201' in status_codes:
self.add_message(
......@@ -594,6 +598,7 @@ class MucTab(ChatTab):
},
),
typ=0)
mam.schedule_tab_open(self)
def handle_presence_joined(self, presence: Presence, status_codes) -> None:
"""
......@@ -651,7 +656,7 @@ class MucTab(ChatTab):
def on_non_member_kicked(self):
"""We have been kicked because the MUC is members-only"""
self.add_message(
InfoMessage(
MucOwnLeaveMessage(
'You have been kicked because you '
'are not a member and the room is now members-only.'
),
......@@ -661,7 +666,7 @@ class MucTab(ChatTab):
def on_muc_shutdown(self):
"""We have been kicked because the MUC service is shutting down"""
self.add_message(
InfoMessage(
MucOwnLeaveMessage(
'You have been kicked because the'
' MUC service is shutting down.'
),
......@@ -759,6 +764,7 @@ class MucTab(ChatTab):
"""
When someone is banned from a muc
"""
cls = InfoMessage
self.users.remove(user)
by = presence.xml.find('{%s}x/{%s}item/{%s}actor' %
(NS_MUC_USER, NS_MUC_USER, NS_MUC_USER))
......@@ -774,6 +780,7 @@ class MucTab(ChatTab):
char_kick = theme.CHAR_KICK
if from_nick == self.own_nick: # we are banned
cls = MucOwnLeaveMessage
if by:
kick_msg = ('\x191}%(spec)s \x193}You\x19%(info_col)s}'
' have been banned by \x194}%(by)s') % {
......@@ -834,12 +841,13 @@ class MucTab(ChatTab):
'reason': reason.text,
'info_col': info_col
}
self.add_message(InfoMessage(kick_msg), typ=2)
self.add_message(cls(kick_msg), typ=2)
def on_user_kicked(self, presence, user, from_nick):
"""
When someone is kicked from a muc
"""
cls = InfoMessage
self.users.remove(user)
actor_elem = presence.xml.find('{%s}x/{%s}item/{%s}actor' %
(NS_MUC_USER, NS_MUC_USER, NS_MUC_USER))
......@@ -852,6 +860,7 @@ class MucTab(ChatTab):
if actor_elem is not None:
by = actor_elem.get('nick') or actor_elem.get('jid')
if from_nick == self.own_nick: # we are kicked
cls = MucOwnLeaveMessage
if by:
kick_msg = ('\x191}%(spec)s \x193}You\x19'
'%(info_col)s} have been kicked'
......@@ -912,7 +921,7 @@ class MucTab(ChatTab):
'reason': reason.text,
'info_col': info_col
}
self.add_message(InfoMessage(kick_msg), typ=2)
self.add_message(cls(kick_msg), typ=2)
def on_user_leave_groupchat(self,
user: User,
......
......@@ -46,8 +46,6 @@ class PrivateTab(OneToOneTab):
def __init__(self, core, jid, nick):
OneToOneTab.__init__(self, core, jid)
self.own_nick = nick
self.text_win = windows.TextWin()
self._text_buffer.add_window(self.text_win)
self.info_header = windows.PrivateInfoWin()
self.input = windows.MessageInput()
# keys
......
......@@ -12,6 +12,7 @@ import logging
log = logging.getLogger(__name__)
from typing import (
cast,
Dict,
List,
Optional,
......@@ -19,9 +20,15 @@ from typing import (
Tuple,
Union,
)
from dataclasses import dataclass
from datetime import datetime
from poezio.config import config
from poezio.ui.types import Message, BaseMessage
from poezio.ui.types import (
BaseMessage,
Message,
MucOwnJoinMessage,