maubot-plugin-ultimaker/me.s3lph.ultimaker/ultimaker.py
2022-01-16 20:24:51 +01:00

246 lines
9.6 KiB
Python

from typing import Callable, Tuple, Type
import json
import os.path
import asyncio
import aiohttp
from datetime import timedelta
from maubot import Plugin
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from mautrix.crypto.attachments import encrypt_attachment
from mautrix.errors.request import MNotFound
from mautrix.types import MessageEvent, TextMessageEventContent, MediaMessageEventContent, ImageInfo, RoomID, RoomAlias, MessageType, EventType, RoomEncryptionStateEventContent
class Config(BaseProxyConfig):
    """Configuration class of the Ultimaker plugin."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Pass every top-level configuration key to ``helper.copy``.

        The keys are handed over one by one, in the order listed below.
        """
        top_level_keys = (
            'ultimaker',
            'poll',
            'messages',
            'state_messages',
            'job_messages',
            'command_prefix',
        )
        for key in top_level_keys:
            helper.copy(key)
class UltimakerBot(Plugin):
    """Maubot plugin that relays Ultimaker printer information into Matrix rooms.

    The plugin answers chat commands (printer state, camera photo) and runs a
    background task that polls the print job endpoint and posts a report plus
    a photo to the configured room whenever the job state changes.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Room used for unsolicited (polling) messages; None until resolved.
        self._room: RoomID = None
        # True whenever the configured room has to be resolved (again).
        self._update_room = True
        # asyncio task running poll(); None while no poll loop is active.
        self._polling_task = None

    def get_command_prefix(self) -> str:
        """Return the configured command name, defaulting to ``3dp``."""
        return self.config.get('command_prefix', '3dp')

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    def on_external_config_update(self) -> None:
        """Reload the configuration and mark the cached room as stale."""
        self.config.load_and_update()
        self._update_room = True

    async def start(self) -> None:
        """Load the config, resolve the target room and (re)start polling."""
        self.on_external_config_update()
        await self._get_room()
        if self._polling_task:
            self._polling_task.cancel()
        self._polling_task = asyncio.create_task(self.poll())

    async def stop(self) -> None:
        """Cancel the polling task, if one is running."""
        if self._polling_task:
            self._polling_task.cancel()
        self._polling_task = None

    async def _get_room(self) -> RoomID:
        """Return the room ID configured under ``poll.room``.

        Values starting with ``#`` are treated as room aliases and resolved
        through the client; anything else is used as a room ID directly. The
        result is cached until the configuration is reloaded.
        """
        if self._room is not None and not self._update_room:
            return self._room
        room = self.config['poll']['room']
        if room.startswith('#'):
            # The client exposes the alias lookup under one of two names;
            # use whichever is available.
            if hasattr(self.client, 'resolve_room_alias'):
                resolved = await self.client.resolve_room_alias(RoomAlias(room))
            else:
                resolved = await self.client.get_room_alias(RoomAlias(room))
            self._room = resolved.room_id
        else:
            self._room = RoomID(room)
        self._update_room = False
        return self._room

    def _api_url(self, *parts: str) -> str:
        """Join the configured ``ultimaker.api`` base URL with path segments.

        URLs always use ``/`` as separator, so this deliberately does not rely
        on ``os.path.join``, whose separator depends on the host OS.
        """
        base = self.config['ultimaker']['api'].rstrip('/')
        return '/'.join([base, *parts])

    @staticmethod
    def _http_session() -> aiohttp.ClientSession:
        """Create a client session with a 15 s total and 5 s connect timeout.

        Uses ``aiohttp.ClientTimeout`` in place of the deprecated
        ``read_timeout`` / ``conn_timeout`` keyword arguments.
        """
        return aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=15, connect=5)
        )

    async def _deliver(self, content, requester: MessageEvent = None) -> None:
        """Send ``content`` as a reply to ``requester`` or to the poll room."""
        if requester is not None:
            await requester.reply(content)
        else:
            await self.client.send_message(await self._get_room(), content)

    async def _reply_with_message(self, requester: MessageEvent, key: str) -> None:
        """Reply with the text stored under ``messages[key]``.

        Nothing is sent when there is no requester or no such message.
        """
        body = self.config['messages'].get(key)
        if requester and body:
            msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
            await requester.reply(msg)

    async def _cached_ultimaker_api(self,
                                    http: aiohttp.ClientSession,
                                    url: str, cache_key: str,
                                    diff_key: Callable) -> Tuple[object, bool]:
        """Fetch a JSON API resource and compare it with the cached copy.

        :param http: Session used for the request.
        :param url: Path relative to the configured API base URL.
        :param cache_key: Key in the ``poll`` config holding the cache file path.
        :param diff_key: Function extracting the value to compare between the
            fresh and the cached response.
        :return: ``(data, changed)``; ``(None, False)`` if the request failed,
            returned a non-200 status or did not contain valid JSON.
        """
        try:
            url = self._api_url(url)
            async with http.get(url) as resp:
                if resp.status != 200:
                    raise RuntimeError(f'unexpected HTTP status {resp.status}')
                rdata = await resp.text()
            now = json.loads(rdata)
        except asyncio.CancelledError:
            # Never swallow cancellation, otherwise stop() cannot end the
            # poll loop while a request is in flight.
            raise
        except Exception:
            self.log.debug('Ultimaker API request failed: %s', url, exc_info=True)
            return None, False
        # 'a+' creates the cache file if it does not exist yet without
        # truncating an existing one before it has been read.
        with open(self.config['poll'][cache_key], 'a+', encoding='utf-8') as cache:
            cache.seek(0)
            try:
                previous = json.loads(cache.read())
            except Exception as e:
                # Empty or corrupt cache: treat the fresh data as the baseline.
                self.log.exception(e)
                previous = now
            cache.seek(0)
            cache.truncate()
            cache.write(rdata)
        changed = diff_key(now) != diff_key(previous)
        return now, changed

    async def update_ultimaker_state(self, http: aiohttp.ClientSession) -> Tuple[object, bool]:
        """Fetch the printer resource; ``changed`` tracks its ``status`` field."""
        return await self._cached_ultimaker_api(
            http, 'api/v1/printer', 'state_cache', lambda x: x['status'])

    async def update_ultimaker_job(self, http: aiohttp.ClientSession) -> Tuple[object, bool]:
        """Fetch the print job resource; ``changed`` tracks its ``state`` field."""
        return await self._cached_ultimaker_api(
            http, 'api/v1/print_job', 'job_cache', lambda x: x['state'])

    async def send_ultimaker_state(self, state: object, job: object, requester: MessageEvent = None) -> None:
        """Send the message configured for the current printer status.

        :param state: Printer data, or ``None`` if the printer was unreachable.
        :param job: Print job data, or ``None`` if it could not be fetched.
        :param requester: Event to reply to; without it the message goes to
            the configured poll room.
        """
        if state is not None:
            fmt = {'status': state['status']}
            if state['status'] == 'printing':
                if job is not None:
                    fmt['job_name'] = job['name']
                    fmt['job_progress'] = str(int(job['progress'] * 100))
                else:
                    # The job request can fail independently of the state
                    # request; still answer instead of raising a TypeError.
                    fmt['job_name'] = '?'
                    fmt['job_progress'] = '?'
            state_msg = self.config['state_messages'].get(state['status'])
        else:
            state_msg = self.config['messages'].get('_unreachable')
            fmt = {}
        if state_msg is None:
            return
        body = state_msg.format(**fmt)
        msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
        await self._deliver(msg, requester)

    async def send_ultimaker_job_report(self, job: object, requester: MessageEvent = None) -> None:
        """Send the message configured for the print job's current state.

        :param job: Print job data, or ``None`` to send the ``_error`` message.
        :param requester: Event to reply to; without it the message goes to
            the configured poll room.
        """
        if job is not None:
            fmt = {
                'job': job['name'],
                'state': job['state'],
                'duration': timedelta(seconds=job['time_elapsed']),
            }
            job_msg = self.config['job_messages'].get(job['state'])
        else:
            job_msg = self.config['job_messages'].get('_error')
            fmt = {}
        if job_msg is None:
            return
        body = job_msg.format(**fmt)
        msg = TextMessageEventContent(msgtype=MessageType.TEXT, body=body)
        await self._deliver(msg, requester)

    async def is_room_encrypted(self, room: RoomID) -> bool:
        """Return whether ``room`` has a room encryption state event."""
        try:
            await self.client.get_state_event(room, EventType.ROOM_ENCRYPTION)
        except MNotFound:
            return False
        return True

    async def send_optionally_encrypted_image(self, info: ImageInfo, image: bytes,
                                              requester: MessageEvent = None):
        """Upload an image and send it, encrypting it first for encrypted rooms.

        :param info: Image metadata attached to the message.
        :param image: Raw image bytes.
        :param requester: Event to reply to; without it the image goes to the
            configured poll room.
        """
        if requester is not None:
            room = requester.room_id
        else:
            room = await self._get_room()
        mimetype = info.mimetype
        content = MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body='uploaded an image',
            info=info
        )
        if await self.is_room_encrypted(room):
            image, content.file = encrypt_attachment(image)
            mimetype = 'application/octet-stream'
        content.url = await self.client.upload_media(image, mimetype)
        # Update content structure for encrypted data
        if content.file:
            content.file.url = content.url
            content.url = None
        if requester is not None:
            await requester.reply(content)
        else:
            await self.client.send_message(room, content)

    async def send_ultimaker_photo(self, http: aiohttp.ClientSession, requester: MessageEvent = None) -> None:
        """Fetch a camera snapshot from the printer and send it as an image.

        If ``ultimaker.camera_id`` is ``None`` or the snapshot cannot be
        fetched, the requester (if any) receives the ``_camera_disabled`` or
        ``_unreachable`` message instead.
        """
        camera_id = self.config['ultimaker']['camera_id']
        if camera_id is None:
            await self._reply_with_message(requester, '_camera_disabled')
            return
        url = self._api_url('api/v1/camera', str(camera_id), 'snapshot')
        try:
            async with http.get(url) as resp:
                if resp.status != 200:
                    raise RuntimeError(f'unexpected HTTP status {resp.status}')
                image = await resp.read()
                content_type = resp.headers['Content-Type']
        except asyncio.CancelledError:
            # Let task cancellation propagate instead of reporting it as an
            # unreachable printer.
            raise
        except Exception:
            await self._reply_with_message(requester, '_unreachable')
            return
        info = ImageInfo(
            mimetype=content_type,
            size=len(image),
            height=self.config['ultimaker'].get('camera_height', 480),
            width=self.config['ultimaker'].get('camera_width', 640)
        )
        await self.send_optionally_encrypted_image(info, image, requester)

    @command.new(name=get_command_prefix)
    async def cmd(self, evt: MessageEvent) -> None:
        """Base command: report the printer state, then send a photo."""
        await self.state(evt)
        await self.photo(evt)

    @cmd.subcommand(help='Get the printer state')
    async def state(self, evt: MessageEvent) -> None:
        """Reply with the current printer state."""
        async with self._http_session() as http:
            state, _ = await self.update_ultimaker_state(http)
            job, _ = await self.update_ultimaker_job(http)
            await self.send_ultimaker_state(state, job, evt)

    @cmd.subcommand(help='Get a photo from the printer\'s camera')
    async def photo(self, evt: MessageEvent) -> None:
        """Reply with a snapshot from the printer's camera."""
        async with self._http_session() as http:
            await self.send_ultimaker_photo(http, evt)

    async def poll(self) -> None:
        """Poll the print job forever and report state changes to the poll room.

        Sleeps ``poll.interval`` seconds (default 60) before each check.
        Errors of a single iteration are logged and do not end the loop;
        cancellation does.
        """
        async with self._http_session() as http:
            while True:
                await asyncio.sleep(self.config['poll'].get('interval', 60))
                self.log.debug('Polling ultimaker print job state')
                try:
                    job, changed = await self.update_ultimaker_job(http)
                    if job and changed:
                        await self.send_ultimaker_job_report(job)
                        await self.send_ultimaker_photo(http)
                except asyncio.CancelledError:
                    # Required so that stop() can actually end the loop.
                    raise
                except Exception as e:
                    self.log.exception(e)