multischleuder/multischleuder/conflict.py
2022-04-23 00:18:03 +02:00

204 lines
9 KiB
Python

from typing import Dict, List, Optional, Tuple

import hashlib
import json
import logging
import os
import struct
import time
from collections import OrderedDict
from datetime import datetime

import pgpy  # type: ignore

from multischleuder.reporting import KeyConflictMessage, Message, UserConflictMessage
from multischleuder.types import SchleuderKey, SchleuderList, SchleuderSubscriber
class KeyConflictResolution:
    """Resolve key and user conflicts between subscriptions of several source lists.

    Two kinds of conflicts are handled:

    * user conflicts: one key (fingerprint) is used by subscriptions with
      different email addresses;
    * key conflicts: one email address is subscribed with different keys.

    In both cases the oldest subscription (smallest ``created_at``) wins.
    Notification messages are rate limited through a JSON state file that maps
    a conflict digest to the time (seconds since the epoch) it was last reported.
    """

    def __init__(self, interval: int, statefile: str, key_template: str, user_template: str):
        """Store the configuration; no file is touched here.

        :param interval: Number of seconds during which an already reported
            conflict (identified by its digest) is not reported again.
        :param statefile: Path of the JSON file holding the reported digests.
        :param key_template: Passed through to ``KeyConflictMessage``.
        :param user_template: Passed through to ``UserConflictMessage``.
        """
        self._interval: int = interval
        self._state_file: str = statefile
        self._key_template: str = key_template
        self._user_template: str = user_template
        self._dry_run: bool = False

    def dry_run(self):
        """Switch to dry-run mode: the state file content is no longer rewritten."""
        self._dry_run = True

    def resolve(self,
                target: str,
                mail_from: str,
                subscriptions: List[SchleuderSubscriber],
                sources: List[SchleuderList]) -> Tuple[List[SchleuderSubscriber], List[Optional[Message]]]:
        """Resolve all conflicts in ``subscriptions``.

        Subscriptions without a key are dropped.  User conflicts (same key,
        different emails) are resolved first, then key conflicts (same email,
        different keys) among the survivors.

        :param target: Passed through to the generated conflict messages.
        :param mail_from: Passed through to the generated conflict messages.
        :param subscriptions: Candidate subscriptions from all source lists.
        :param sources: Source lists; used to map list ids to names.
        :return: The conflict-free subscriptions and the messages to send.
        """
        sourcemap: Dict[int, str] = {s.id: s.name for s in sources}
        conflicts: List[Optional[Message]] = []

        # First check for keys that are being used by more than one subscriber
        keys: Dict[str, List[SchleuderSubscriber]] = {}
        for s in subscriptions:
            if s.key is None:
                continue
            keys.setdefault(s.key.fingerprint, []).append(s)
        key_resolved: List[SchleuderSubscriber] = []
        for k in keys.values():
            rk, m = self._resolve_users(target, mail_from, k, sourcemap)
            if rk is not None:
                key_resolved.append(rk)
            if m is not None:
                conflicts.extend(m)

        # Only consider subscribers who have not been removed due to a key used by multiple users
        subs: Dict[str, List[SchleuderSubscriber]] = OrderedDict()
        for s in key_resolved:
            subs.setdefault(s.email, []).append(s)

        # Perform conflict resolution for each set of subscribers with the same email
        resolved: List[SchleuderSubscriber] = []
        for c in subs.values():
            r, msg = self._resolve_keys(target, mail_from, c, sourcemap)
            if r is not None:
                resolved.append(r)
            if msg is not None:
                conflicts.append(msg)
        return resolved, conflicts

    def _resolve_keys(self,
                      target: str,
                      mail_from: str,
                      subscriptions: List[SchleuderSubscriber],
                      sourcemap: Dict[int, str]) \
            -> Tuple[Optional[SchleuderSubscriber], Optional[Message]]:
        """Resolve a key conflict among subscriptions sharing one email address.

        :return: ``(chosen, message)``.  ``chosen`` is ``None`` if no
            subscription has a key.  ``message`` is ``None`` if there is no
            conflict or the conflict has been reported recently.
        """
        notnull = [s for s in subscriptions if s.key is not None]
        if not notnull:
            return None, None
        if len({s.key.blob for s in subscriptions if s.key is not None}) == 1:
            # No conflict if all keys are the same
            return notnull[0], None

        # Conflict Resolution: Choose the OLDEST subscription with a key, but notify using ALL keys
        earliest: SchleuderSubscriber = min(notnull, key=lambda x: x.created_at)
        assert earliest.key is not None  # Make mypy happy; it can't know that earliest.key can't be None
        logging.debug(f'Key Conflict for {earliest.email} in lists, chose {earliest.key.fingerprint}:')
        for s in subscriptions:
            fpr = 'no key' if s.key is None else s.key.fingerprint
            sschleuder = sourcemap.get(s.schleuder, 'unknown')
            logging.debug(f'    {sschleuder}: {fpr}')

        # Generate a SHA1 digest that only changes when the subscription list changes
        digest = self._make_key_digest(earliest, subscriptions)
        msg: Optional[Message] = None
        # Create a conflict message only if it hasn't been sent recently
        if self._should_send(digest):
            msg = KeyConflictMessage(
                target,
                earliest,
                subscriptions,
                digest,
                mail_from,
                self._key_template,
                sourcemap
            )
        # Return the result of conflict resolution
        return earliest, msg

    def _resolve_users(self,
                       target: str,
                       mail_from: str,
                       subscriptions: List[SchleuderSubscriber],
                       sourcemap: Dict[int, str]) \
            -> Tuple[Optional[SchleuderSubscriber], Optional[List[Message]]]:
        """Resolve a user conflict among subscriptions sharing one key.

        :return: ``(chosen, messages)``.  ``chosen`` is ``None`` if no
            subscription has a key.  ``messages`` is ``None`` if there is no
            conflict or the conflict has been reported recently; otherwise it
            holds one message per affected email address.
        """
        notnull = [s for s in subscriptions if s.key is not None]
        if not notnull:
            return None, None
        emails = {s.email for s in notnull}
        if len(emails) == 1:
            # No conflict if all emails are the same
            return notnull[0], None

        # Conflict Resolution: Choose the OLDEST subscription with a key, but notify using ALL recipients
        earliest: SchleuderSubscriber = min(notnull, key=lambda x: x.created_at)
        assert earliest.key is not None  # Make mypy happy; it can't know that earliest.key can't be None
        logging.debug(f'User Conflict for {earliest.key.fingerprint} in lists, chose {earliest.email}:')
        for s in subscriptions:
            sschleuder = sourcemap.get(s.schleuder, 'unknown')
            logging.debug(f'    {sschleuder}: {s.email}')

        # Generate a SHA1 digest that only changes when the subscription list changes
        digest = self._make_user_digest(earliest, subscriptions)
        msgs: Optional[List[Message]] = None
        # Create a conflict message only if it hasn't been sent recently
        if self._should_send(digest):
            # Sorted so the order of the messages does not depend on set iteration order
            msgs = [
                UserConflictMessage(
                    target,
                    email,
                    earliest,
                    subscriptions,
                    digest,
                    mail_from,
                    self._user_template,
                    sourcemap
                )
                for email in sorted(emails)
            ]
        # Return the result of conflict resolution
        return earliest, msgs

    def _should_send(self, digest: str) -> bool:
        """Decide whether the conflict identified by ``digest`` should be reported now.

        Loads the state file, drops entries older than the configured interval
        and records ``digest`` with the current time if it is not present.
        Unless dry-run mode is active, the updated state is written back.
        The state file is opened in ``a+`` mode, so it is created if missing
        (also in dry-run mode).

        :return: ``True`` if a message should be sent.  ``False`` if it was
            sent recently or if the state file cannot be read or written.
        """
        # Real POSIX timestamp.  datetime.utcnow().timestamp() would interpret the naive
        # UTC value as local time and thus be skewed by the UTC offset (and jump on DST changes).
        now = int(time.time())
        try:
            with open(self._state_file, 'a+') as f:
                state: Dict[str, int] = {}
                if f.tell() > 0:
                    # Only load the state if the file is not empty
                    f.seek(0)
                    try:
                        state = json.load(f)
                    except Exception:
                        logging.exception('Cannot read statefile. Not sending any messages!')
                        return False
                # Remove all state entries older than conflict_interval
                state = {k: v for k, v in state.items() if now - v < self._interval}
                # Should send if it has not been sent before or has been removed in the line above
                send = digest not in state
                # Add all remaining messages to state dict
                if send:
                    state[digest] = now
                # Write the new state to file
                if not self._dry_run:
                    f.seek(0)
                    f.truncate()
                    json.dump(state, f)
                return send
        except Exception:
            # Deliberately best-effort: on any error rather stay silent than spam.
            # KeyboardInterrupt/SystemExit are not caught and propagate as usual.
            logging.exception('Cannot open or write statefile. Not sending any messages!')
            return False

    @staticmethod
    def _pack_field(value: str) -> bytes:
        """Encode ``value`` as bytes prefixed with its length.

        The length prefix keeps adjacent fields unambiguous when they are
        concatenated into a hash.
        """
        data = value.encode()
        return struct.pack('!I', len(data)) + data

    def _make_key_digest(self, chosen: SchleuderSubscriber, candidates: List[SchleuderSubscriber]) -> str:
        """Return a SHA1 hex digest identifying a key conflict.

        The digest covers the chosen email and source list as well as the
        source list and the complete key of every candidate.
        """
        # Sort so the hash stays the same if the set of subscriptions is the same.
        # There is no guarantee that the subs are in any specific order.
        subs: List[SchleuderSubscriber] = sorted(
            candidates,
            key=lambda x: (x.schleuder, '' if x.key is None else x.key.blob))
        h = hashlib.new('sha1')
        # Include the chosen email and source sub-list
        h.update(self._pack_field(chosen.email))
        h.update(struct.pack('!d', chosen.schleuder))
        # Include all subscriptions, including the FULL key.
        # (A bare 's' struct format would only pack the first byte.)
        for s in subs:
            key = 'no key' if s.key is None else s.key.blob
            h.update(struct.pack('!d', s.schleuder))
            h.update(self._pack_field(key))
        return h.hexdigest()

    def _make_user_digest(self, chosen: SchleuderSubscriber, candidates: List[SchleuderSubscriber]) -> str:
        """Return a SHA1 hex digest identifying a user conflict.

        The digest covers the chosen key fingerprint, email and source list as
        well as the source list and the complete email of every candidate.
        """
        # Sort so the hash stays the same if the set of subscriptions is the same.
        # There is no guarantee that the subs are in any specific order.
        subs: List[SchleuderSubscriber] = sorted(candidates, key=lambda x: (x.schleuder, x.email))
        h = hashlib.new('sha1')
        assert chosen.key is not None  # Make mypy happy; it can't know that chosen.key can't be None
        # Include the chosen key, email and source sub-list
        h.update(self._pack_field(chosen.key.fingerprint))
        h.update(self._pack_field(chosen.email))
        h.update(struct.pack('!d', chosen.schleuder))
        # Include all subscriptions with their FULL email address.
        # (A bare 's' struct format would only pack the first byte.)
        for s in subs:
            h.update(struct.pack('!d', s.schleuder))
            h.update(self._pack_field(s.email))
        return h.hexdigest()