import gc import json import math import network import os import socket import time from st3m.application import Application, ApplicationContext from umqtt.robust import MQTTClient with open(os.path.join(os.path.dirname(__file__), 'config.json'), 'r') as f: CONFIG = json.load(f) class Flow3rOpenhabMqtt(Application): def __init__(self, app_ctx: ApplicationContext) -> None: super().__init__(app_ctx) self.state = 0 self.nic = network.WLAN(network.STA_IF) client_id = CONFIG['mqtt_client_id'].format(self.nic.config('mac').hex()) self.base_topic = CONFIG['mqtt_topic_base'].format(self.nic.config('mac').hex()).encode() self.mqtt = MQTTClient(client_id, CONFIG['mqtt_broker_ip']) self.mqtt.set_callback(self.mqtt_cb) self.touch_time = [None]*10 self.touch_begin = [None]*10 self.touch_last = [None]*10 self.touch_base = [False]*10 self.touch_tip = [False]*10 self.touch_log = [[]]*10 self.button_app = None self.ui_config = {} def mqtt_cb(self, topic, msg): print(topic, msg) if topic == self.base_topic + b'ui_config': try: print(msg) self.ui_config = json.loads(msg.decode()) except Exception as e: print(e) def draw(self, ctx: Context) -> None: if self.state == 0: return ctx.line_width = 18 ctx.line_join = ctx.BEVEL ctx.rgb(0, 0, 0).rectangle(-120, -120, 240, 240).fill() ctx.rgb(0.3, 0.3, 0.3).arc(0, 0, 90, 0, 2.39, 0).arc(0, 0, 90, 2.79, 6.29, 0).stroke() ctx.rgb(0.9, 0.3, 0.1).move_to(-96, 53).line_to(-1, -43).line_to(71, 29).stroke() ctx.text_align = ctx.CENTER ctx.text_baseline = ctx.MIDDLE ctx.font_size = 20 for i, text in enumerate(self.ui_config.get('labels', [])): phi = (2*math.pi/10)*(i+1)-(math.pi/2) radius = 50 if i%2 else 80 x = radius*math.cos(phi) y = radius*math.sin(phi) ctx.rgb(1, 1, 1).move_to(x, y).text(text) mode = self.ui_config.get('mode') if mode: ctx.font_size = 50 ctx.rgb(1, 1, 1).move_to(0, 0).text(mode) if CONFIG.get('debug_touch', False): ctx.line_width =1 for i in range(10): if len(self.touch_log[i]) > 0: ctx.rgb(1, 0, 0).move_to(int(self.touch_log[i][0][1]*0.0024), -int(self.touch_log[i][0][0]*0.0024)) for j in range(1, len(self.touch_log[i])): ctx.line_to(int(self.touch_log[i][j][1]*0.0024), -int(self.touch_log[i][j][0]*0.0024)) ctx.stroke() def do_connect(self): self.nic.active(True) if not self.nic.isconnected(): print('connecting to network...') self.nic.connect(CONFIG['wifi_ssid'], CONFIG['wifi_psk']) while not self.nic.isconnected(): pass print('network config:', self.nic.ifconfig()) def think(self, ins: InputState, delta_ms: int) -> None: if self.nic.status() not in [network.STAT_CONNECTING, network.STAT_GOT_IP]: self.do_connect() if self.nic.status() == network.STAT_GOT_IP and self.state == 0: gc.collect() self.mqtt.set_last_will(self.base_topic + 'status', 'Offline', retain=True) self.mqtt.connect() self.mqtt.subscribe(self.base_topic + 'ui_config') self.mqtt.publish(self.base_topic + 'status', 'Online', retain=True) self.state = 1 if self.state == 1: self.mqtt.check_msg() if ins.buttons.app != ins.buttons.NOT_PRESSED: now = time.ticks_ms() btn = None if ins.buttons.app == ins.buttons.PRESSED_LEFT: btn = 'left' elif ins.buttons.app == ins.buttons.PRESSED_RIGHT: btn = 'right' elif ins.buttons.app == ins.buttons.PRESSED_DOWN: btn = 'down' if self.button_app is None: self.button_app = now msg = b'{{"mode":"{}","type":"button","button":"{}","repeat":false}}'\ .format(self.ui_config.get('mode'), btn) self.mqtt.publish(self.base_topic + 'event', msg) elif now - self.button_app > 500: msg = b'{{"mode":"{}","type":"button","button":"{}","repeat":true}}'\ .format(self.ui_config.get('mode'), btn) self.mqtt.publish(self.base_topic + 'event', msg) self.button_app = now else: self.button_app = None for i in range(10): if ins.captouch.petals[i].pressed: # Ignore the first few ms of touch events, as position data appears to be unrealiable at first if self.touch_time[i] is None: self.touch_time[i] = time.ticks_ms() elif time.ticks_ms() - self.touch_time[i] > 25: if self.touch_begin[i] is None: self.touch_begin[i] = ins.captouch.petals[i].position self.touch_last[i] = ins.captouch.petals[i].position if CONFIG.get('debug_touch', False): self.touch_log[i].append(ins.captouch.petals[i].position) self.touch_base[i] = ins.captouch.petals[i].pads.base if hasattr(ins.captouch.petals[i].pads, 'tip'): self.touch_tip[i] = ins.captouch.petals[i].pads.tip else: self.touch_tip[i] = ins.captouch.petals[i].pads.cw or ins.captouch.petals[i].pads.ccw else: if self.touch_begin[i] is not None: duration = time.ticks_ms() - self.touch_time[i] d_x = self.touch_last[i][0] - self.touch_begin[i][0] d_y = self.touch_last[i][1] - self.touch_begin[i][1] event = None if abs(d_x) < 15000 and abs(d_y) < 15000: if self.touch_base[i]: event = 'touch_base' elif self.touch_tip[i]: event = 'touch_tip' elif abs(d_x) > abs(d_y): if d_x > 0: event = 'swipe_up' else: event = 'swipe_down' else: if d_y > 0: event = 'swipe_right' else: event = 'swipe_left' if event is not None: human_petal = 10-(-i%10) msg = b'{{"mode":"{}","type":"touch","petal":"{}","event":"{}","dx":{},"dy":{},"duration":{}}}'\ .format(self.ui_config.get('mode'), human_petal, event, d_x, d_y, duration) self.mqtt.publish(self.base_topic + 'event', msg) self.touch_time[i] = None self.touch_begin[i] = None self.touch_last[i] = None self.touch_base[i] = False self.touch_tip[i] = False self.touch_log[i].clear() gc.collect() if __name__ == "__main__": import st3m.run st3m.run.run_view(Flow3rOpenhabMqtt(ApplicationContext()))