"""Maubot plugin that reports the state of an Ultimaker printer to a Matrix room."""
from typing import Callable, Optional, Tuple, Type
import json
import os.path
import asyncio
import aiohttp
from datetime import timedelta

from maubot import Plugin
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from mautrix.crypto.attachments import encrypt_attachment
from mautrix.errors.request import MNotFound
from mautrix.types import (
    MessageEvent, TextMessageEventContent, MediaMessageEventContent, ImageInfo, RoomID,
    RoomAlias, MessageType, EventType, RoomEncryptionStateEventContent,
)

# Equivalent of the deprecated ``read_timeout=15, conn_timeout=5`` session
# keywords: ``read_timeout`` mapped to the total request budget and
# ``conn_timeout`` to the connection-establishment budget.
_HTTP_TIMEOUT = aiohttp.ClientTimeout(total=15, connect=5)


def _join_url(base: str, *parts: str) -> str:
    """Join URL segments with exactly one ``/`` between them.

    Used instead of ``os.path.join``, which inserts the platform path
    separator (a backslash on Windows) and is therefore unsuitable for URLs.
    """
    return '/'.join([base.rstrip('/'), *(part.strip('/') for part in parts)])


class Config(BaseProxyConfig):
    """Plugin configuration; copies the known top-level sections on update."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every known top-level configuration key via *helper*."""
        for key in ('ultimaker', 'poll', 'messages', 'state_messages',
                    'job_messages', 'command_prefix'):
            helper.copy(key)


class UltimakerBot(Plugin):
    """Polls the printer's HTTP API and relays state, job reports and photos."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Resolved ID of the configured polling room (None until resolved).
        self._room: Optional[RoomID] = None
        # Set whenever the config is reloaded so the room is re-resolved.
        self._update_room = True
        # Background task running ``poll()``, if started.
        self._polling_task: Optional[asyncio.Task] = None

    def get_command_prefix(self) -> str:
        """Return the configured command name, defaulting to ``3dp``."""
        return self.config.get('command_prefix', '3dp')

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    def on_external_config_update(self) -> None:
        """Reload the configuration and invalidate the cached room ID."""
        self.config.load_and_update()
        self._update_room = True

    async def start(self) -> None:
        """Load the config, resolve the room and (re)start the polling task."""
        self.on_external_config_update()
        await self._get_room()
        if self._polling_task:
            self._polling_task.cancel()
        self._polling_task = asyncio.create_task(self.poll())

    async def stop(self) -> None:
        """Cancel the polling task, if one is running."""
        if self._polling_task:
            self._polling_task.cancel()
            self._polling_task = None

    async def _get_room(self) -> RoomID:
        """Return the polling room ID, resolving a ``#alias`` when needed.

        The result is cached until the configuration is reloaded.
        """
        if self._room is not None and not self._update_room:
            return self._room
        room = self.config['poll']['room']
        if room.startswith('#'):
            # The client exposes the alias lookup under one of two names;
            # use whichever is available.
            if hasattr(self.client, 'resolve_room_alias'):
                resolved = await self.client.resolve_room_alias(RoomAlias(room))
            else:
                resolved = await self.client.get_room_alias(RoomAlias(room))
            self._room = resolved.room_id
        else:
            self._room = RoomID(room)
        self._update_room = False
        return self._room

    async def _target_room(self, requester: Optional[MessageEvent]) -> RoomID:
        """Return the requester's room, or the configured polling room."""
        if requester is not None:
            return requester.room_id
        return await self._get_room()

    async def _deliver(self, content, requester: Optional[MessageEvent]) -> None:
        """Reply to *requester* if given, else send *content* to the polling room."""
        if requester is not None:
            await requester.reply(content)
        else:
            await self.client.send_message(await self._get_room(), content)

    async def _reply_config_message(self, requester: Optional[MessageEvent],
                                    key: str) -> None:
        """Reply with ``messages[key]`` when both a requester and a text exist."""
        body = self.config['messages'].get(key)
        if requester and body:
            msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
            await requester.reply(msg)

    async def _cached_ultimaker_api(self, http: aiohttp.ClientSession, url: str,
                                    cache_key: str,
                                    diff_key: Callable) -> Tuple[object, bool]:
        """Fetch a JSON API endpoint and compare it with the cached response.

        :param http: session used for the request.
        :param url: endpoint path relative to ``ultimaker.api``.
        :param cache_key: key in the ``poll`` config naming the cache file.
        :param diff_key: extracts the value compared between old and new data.
        :return: ``(data, changed)``; ``(None, False)`` when the request or
            the JSON decoding fails.
        """
        try:
            full_url = _join_url(self.config["ultimaker"]["api"], url)
            async with http.get(full_url) as resp:
                if resp.status != 200:
                    raise RuntimeError(f'unexpected HTTP status {resp.status}')
                rdata = await resp.text()
            now = json.loads(rdata)
        except asyncio.CancelledError:
            # Never swallow cancellation, otherwise stop() cannot end the task.
            raise
        except Exception:
            return None, False

        # 'a+' creates the cache file on first use without truncating it.
        with open(self.config['poll'][cache_key], 'a+') as cache:
            cache.seek(0)
            try:
                previous = json.loads(cache.read())
            except Exception as e:
                # Missing/corrupt cache: treat the fresh data as the baseline.
                self.log.exception(e)
                previous = now
            cache.seek(0)
            cache.truncate()
            cache.write(rdata)

        changed = diff_key(now) != diff_key(previous)
        return now, changed

    async def update_ultimaker_state(self, http: aiohttp.ClientSession) -> Tuple[object, bool]:
        """Fetch ``api/v1/printer`` and report whether ``status`` changed."""
        return await self._cached_ultimaker_api(
            http, 'api/v1/printer', 'state_cache', lambda x: x['status'])

    async def update_ultimaker_job(self, http: aiohttp.ClientSession) -> Tuple[object, bool]:
        """Fetch ``api/v1/print_job`` and report whether ``state`` changed."""
        return await self._cached_ultimaker_api(
            http, 'api/v1/print_job', 'job_cache', lambda x: x['state'])

    async def send_ultimaker_state(self, state: object, job: object,
                                   requester: Optional[MessageEvent] = None) -> None:
        """Send the configured message for the printer *state*.

        When *state* is ``None`` the ``messages._unreachable`` text is used.
        Nothing is sent if no template is configured for the situation.
        """
        if state is not None:
            fmt = {'status': state['status']}
            if state['status'] == 'printing':
                fmt.update({
                    'job_name': job['name'],
                    'job_progress': str(int(job['progress'] * 100)),
                })
            state_msg = self.config['state_messages'].get(state['status'])
        else:
            state_msg = self.config['messages'].get('_unreachable')
            fmt = {}
        if state_msg is not None:
            body = state_msg.format(**fmt)
            msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
            await self._deliver(msg, requester)

    async def send_ultimaker_job_report(self, job: object,
                                        requester: Optional[MessageEvent] = None) -> None:
        """Send the configured message for the print *job* state.

        When *job* is ``None`` the ``job_messages._error`` text is used.
        Nothing is sent if no template is configured for the situation.
        """
        if job is not None:
            fmt = {
                'job': job['name'],
                'state': job['state'],
                'duration': timedelta(seconds=job['time_elapsed']),
            }
            job_msg = self.config['job_messages'].get(job['state'])
        else:
            job_msg = self.config['job_messages'].get('_error')
            fmt = {}
        if job_msg is not None:
            body = job_msg.format(**fmt)
            msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
            await self._deliver(msg, requester)

    async def is_room_encrypted(self, room: RoomID) -> bool:
        """Return True if *room* has a room-encryption state event."""
        try:
            await self.client.get_state_event(room, EventType.ROOM_ENCRYPTION)
        except MNotFound:
            return False
        return True

    async def send_optionally_encrypted_image(self, info: ImageInfo, image: bytes,
                                              requester: Optional[MessageEvent]):
        """Upload *image* and send it, encrypting it first for encrypted rooms."""
        room = await self._target_room(requester)
        mimetype = info.mimetype
        content = MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body='uploaded an image',
            info=info
        )
        if await self.is_room_encrypted(room):
            image, content.file = encrypt_attachment(image)
            mimetype = 'application/octet-stream'
        content.url = await self.client.upload_media(image, mimetype)
        # Update content structure for encrypted data
        if content.file:
            content.file.url = content.url
            content.url = None
        if requester is not None:
            await requester.reply(content)
        else:
            await self.client.send_message(room, content)

    async def send_ultimaker_photo(self, http: aiohttp.ClientSession,
                                   requester: Optional[MessageEvent] = None) -> None:
        """Fetch a camera snapshot and send it as an image message.

        If no camera is configured or the snapshot cannot be fetched, the
        matching configured text is sent as a reply (only when there is a
        requester) and no image is sent.
        """
        if self.config['ultimaker']['camera_id'] is None:
            await self._reply_config_message(requester, '_camera_disabled')
            return
        url = _join_url(
            self.config["ultimaker"]["api"],
            'api/v1/camera',
            str(self.config["ultimaker"]["camera_id"]),
            'snapshot'
        )
        try:
            async with http.get(url) as resp:
                if resp.status != 200:
                    raise RuntimeError(f'unexpected HTTP status {resp.status}')
                image = await resp.read()
                content_type = resp.headers['Content-Type']
        except asyncio.CancelledError:
            # Never swallow cancellation, otherwise stop() cannot end the task.
            raise
        except Exception:
            self.log.debug('Fetching the camera snapshot failed', exc_info=True)
            await self._reply_config_message(requester, '_unreachable')
            return
        info = ImageInfo(
            mimetype=content_type,
            size=len(image),
            height=self.config['ultimaker'].get('camera_height', 480),
            width=self.config['ultimaker'].get('camera_width', 640)
        )
        await self.send_optionally_encrypted_image(info, image, requester)

    @command.new(name=get_command_prefix)
    async def cmd(self, evt: MessageEvent) -> None:
        """Bare command: report the printer state, then send a photo."""
        await self.state(evt)
        await self.photo(evt)

    @cmd.subcommand(help='Get the printer state')
    async def state(self, evt: MessageEvent) -> None:
        async with aiohttp.ClientSession(timeout=_HTTP_TIMEOUT) as http:
            state, _ = await self.update_ultimaker_state(http)
            job, _ = await self.update_ultimaker_job(http)
            await self.send_ultimaker_state(state, job, evt)

    @cmd.subcommand(help='Get a photo from the printer\'s camera')
    async def photo(self, evt: MessageEvent) -> None:
        async with aiohttp.ClientSession(timeout=_HTTP_TIMEOUT) as http:
            await self.send_ultimaker_photo(http, evt)

    async def poll(self) -> None:
        """Periodically check the print job and report state changes.

        Runs until cancelled. Errors in a single iteration are logged and do
        not stop the loop.
        """
        async with aiohttp.ClientSession(timeout=_HTTP_TIMEOUT) as http:
            while True:
                await asyncio.sleep(self.config['poll'].get('interval', 60))
                self.log.debug('Polling ultimaker print job state')
                try:
                    job, changed = await self.update_ultimaker_job(http)
                    if job and changed:
                        await self.send_ultimaker_job_report(job)
                        await self.send_ultimaker_photo(http)
                except asyncio.CancelledError:
                    # Let stop() actually terminate the task.
                    raise
                except Exception as e:
                    self.log.exception(e)