maubot-plugin-spaceapi/me.s3lph.spaceapi/spaceapi.py

132 lines
4.6 KiB
Python

from typing import Tuple, Type
import json
import asyncio
import aiohttp
from datetime import datetime
from maubot import Plugin
from maubot.handlers import command, web
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from mautrix.types import MessageEvent, MessageType, TextMessageEventContent, RoomID, RoomAlias
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("spaceapi")
helper.copy("poll")
helper.copy("messages")
helper.copy('command_prefix')
class SpaceapiBot(Plugin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._room: RoomID = None
self._update_room = True
self._polling_task = None
def get_command_prefix(self) -> str:
return self.config.get('command_prefix', 'spaceapi')
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
def on_external_config_update(self) -> None:
self.config.load_and_update()
self._update_room = True
async def start(self) -> None:
self.on_external_config_update()
await self._get_room()
if self._polling_task:
self._polling_task.cancel()
self._polling_task = asyncio.create_task(self.poll())
async def stop(self) -> None:
if self._polling_task:
self._polling_task.cancel()
self._polling_task = None
async def _get_room(self) -> RoomID:
if self._room is not None and not self._update_room:
return self._room
room = self.config['poll']['room']
if room.startswith('#'):
if 'resolve_room_alias' in dir(self.client):
self._room = (await self.client.resolve_room_alias(RoomAlias(room))).room_id
else:
self._room = (await self.client.get_room_alias(RoomAlias(room))).room_id
else:
self._room = RoomID(room)
self._update_room = False
return self._room
async def update_spaceapi_status(self, http: aiohttp.ClientSession) -> Tuple[object, bool]:
try:
async with http.get(self.config['spaceapi']) as resp:
if resp.status != 200:
raise RuntimeError()
rdata = await resp.text()
now = json.loads(rdata)
except BaseException:
return None, False
with open(self.config['poll']['cache'], 'a+') as cache:
cache.seek(0)
try:
pdata = cache.read()
previous = json.loads(pdata)
except BaseException as e:
self.log.exception(e)
previous = json.loads(rdata)
cache.seek(0)
cache.truncate()
cache.write(rdata)
changed = (now['state']['open'] != previous['state']['open'])
return now, changed
async def send_spaceapi_update(self, state: object, requester: MessageEvent = None) -> None:
if requester:
room = requester.room_id
else:
room = await self._get_room()
try:
fmt = {
'lastchange': datetime.fromtimestamp(state['state']['lastchange']).strftime('%H:%M')
}
if state['state']['open']:
body = self.config['messages']['open'].format(**fmt)
else:
body = self.config['messages']['closed'].format(**fmt)
except BaseException as e:
self.log.exception(e)
if not requester:
return
body = self.config['messages']['error']
msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
if requester:
await requester.reply(msg)
else:
await self.client.send_message(room, msg)
@command.new(name=get_command_prefix)
async def cmd(self, evt: MessageEvent):
async with aiohttp.ClientSession(read_timeout=15, conn_timeout=5) as http:
state, _ = await self.update_spaceapi_status(http)
await self.send_spaceapi_update(state, evt)
async def poll(self) -> None:
async with aiohttp.ClientSession(read_timeout=15, conn_timeout=5) as http:
while True:
await asyncio.sleep(self.config['poll'].get('interval', 60))
self.log.debug('Polling SpaceAPI endpoint')
try:
state, changed = await self.update_spaceapi_status(http)
if changed:
await self.send_spaceapi_update(state)
except BaseException as e:
self.log.exception(e)