# flow3r-openhab/flow3r_openhab/__init__.py
#
# 203 lines
# 9.1 KiB
# Python

import gc
import json
import math
import network
import os
import socket
import time
from st3m.application import Application, ApplicationContext
from umqtt.robust import MQTTClient
# Read the app settings from the config.json that sits next to this module.
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json')
with open(_CONFIG_PATH, 'r') as f:
    CONFIG = json.load(f)
class Flow3rOpenhabMqtt(Application):
    """Badge application that reports button and petal input over MQTT.

    Once Wi-Fi and the MQTT broker are reachable the app

    * publishes a retained ``Online`` message on ``<base_topic>status`` (with a
      retained ``Offline`` last will),
    * subscribes to ``<base_topic>ui_config`` and stores the received JSON
      object in ``self.ui_config`` (keys read by this class: ``labels`` and
      ``mode``),
    * publishes one JSON message per button press/repeat and per finished
      petal touch on ``<base_topic>event``.
    """

    # Number of captouch petals tracked by the per-petal state lists.
    PETAL_COUNT = 10
    # Hold time in ms before (and between) repeated button events.
    BUTTON_REPEAT_MS = 500
    # Initial part of a touch (ms) whose position data is ignored.
    TOUCH_SETTLE_MS = 25
    # A touch that moved less than this on both axes counts as a tap.
    TAP_MAX_DELTA = 15000
    # Factor applied to raw petal positions when drawing debug traces.
    TOUCH_LOG_SCALE = 0.0024

    def __init__(self, app_ctx: ApplicationContext) -> None:
        """Create the WLAN interface and MQTT client and reset all input state."""
        super().__init__(app_ctx)
        # 0: MQTT session not established yet, 1: connected and handling input.
        self.state = 0
        self.nic = network.WLAN(network.STA_IF)
        # Both the client id and the topic prefix are templates that receive
        # the hex-encoded MAC address of the interface.
        mac_hex = self.nic.config('mac').hex()
        client_id = CONFIG['mqtt_client_id'].format(mac_hex)
        self.base_topic = CONFIG['mqtt_topic_base'].format(mac_hex).encode()
        self.mqtt = MQTTClient(client_id, CONFIG['mqtt_broker_ip'])
        self.mqtt.set_callback(self.mqtt_cb)
        count = self.PETAL_COUNT
        # Per-petal touch tracking, indexed by petal number.
        self.touch_time = [None] * count    # ticks_ms() when the touch started
        self.touch_begin = [None] * count   # first position after settling
        self.touch_last = [None] * count    # most recent position
        self.touch_base = [False] * count   # last seen state of pads.base
        self.touch_tip = [False] * count    # last seen state of tip (or cw/ccw)
        # One independent list per petal; ``[[]] * n`` would alias a single
        # list, mixing the traces of all petals together.
        self.touch_log = [[] for _ in range(count)]
        # Button state: tick of the last published event (None while
        # released) and the number of repeats sent for the current hold.
        self.button_app = None
        self.button_app_repeat = 0
        self.button_os = None
        self.button_os_repeat = 0
        # Last UI configuration received over MQTT; always a dict.
        self.ui_config = {}

    def mqtt_cb(self, topic, msg):
        """Handle an incoming MQTT message.

        Only messages on ``<base_topic>ui_config`` are used. The payload must
        decode to a JSON object; anything else is logged and ignored so that
        ``self.ui_config`` always stays a dict.
        """
        print(topic, msg)
        if topic != self.base_topic + b'ui_config':
            return
        try:
            config = json.loads(msg.decode())
        except Exception as e:
            # Best effort: a malformed payload must not take the app down.
            print(e)
            return
        if isinstance(config, dict):
            self.ui_config = config
        else:
            print('ignoring ui_config that is not a JSON object')

    def draw(self, ctx: "Context") -> None:
        """Render the screen; draws nothing until the MQTT session is up."""
        if self.state == 0:
            return
        ctx.line_width = 18
        ctx.line_join = ctx.BEVEL
        # Black background, grey ring with a gap, and an orange polyline on top.
        ctx.rgb(0, 0, 0).rectangle(-120, -120, 240, 240).fill()
        ctx.rgb(0.3, 0.3, 0.3).arc(0, 0, 90, 0, 2.39, 0).arc(0, 0, 90, 2.79, 6.29, 0).stroke()
        ctx.rgb(0.9, 0.3, 0.1).move_to(-96, 53).line_to(-1, -43).line_to(71, 29).stroke()
        ctx.text_align = ctx.CENTER
        ctx.text_baseline = ctx.MIDDLE
        ctx.font_size = 20
        # Labels are spread over ten equal angular steps, alternating between
        # an outer (80) and an inner (50) radius.
        step = 2 * math.pi / 10
        for i, text in enumerate(self.ui_config.get('labels', [])):
            phi = step * (i + 1) - (math.pi / 2)
            radius = 50 if i % 2 else 80
            x = radius * math.cos(phi)
            y = radius * math.sin(phi)
            # str() keeps non-string labels from raising inside text().
            ctx.rgb(1, 1, 1).move_to(x, y).text(str(text))
        mode = self.ui_config.get('mode')
        if mode:
            ctx.font_size = 50
            ctx.rgb(1, 1, 1).move_to(0, 0).text(str(mode))
        if CONFIG.get('debug_touch', False):
            # Overlay the recorded position trace of every petal as a thin
            # red line.
            ctx.line_width = 1
            scale = self.TOUCH_LOG_SCALE
            for log in self.touch_log:
                if not log:
                    continue
                ctx.rgb(1, 0, 0).move_to(int(log[0][1] * scale), -int(log[0][0] * scale))
                for j in range(1, len(log)):
                    ctx.line_to(int(log[j][1] * scale), -int(log[j][0] * scale))
                ctx.stroke()

    def do_connect(self, timeout_ms=10000):
        """Activate the WLAN interface and connect to the configured network.

        Waits at most ``timeout_ms`` milliseconds for the link to come up
        instead of spinning forever, so an unreachable access point cannot
        freeze the caller indefinitely.

        Returns True when the interface is connected, False on timeout.
        """
        self.nic.active(True)
        if not self.nic.isconnected():
            print('connecting to network...')
            self.nic.connect(CONFIG['wifi_ssid'], CONFIG['wifi_psk'])
            started = time.ticks_ms()
            while not self.nic.isconnected():
                if time.ticks_diff(time.ticks_ms(), started) >= timeout_ms:
                    print('network connection timed out')
                    return False
                time.sleep_ms(50)
        print('network config:', self.nic.ifconfig())
        return True

    def think(self, ins: "InputState", delta_ms: int) -> None:
        """Maintain the network/MQTT connection and turn input into events."""
        if self.nic.status() not in (network.STAT_CONNECTING, network.STAT_GOT_IP):
            self.do_connect()
        if self.state == 0 and self.nic.status() == network.STAT_GOT_IP:
            # NOTE(review): the collect before connecting is presumably meant
            # to free memory for the MQTT session - kept from the original.
            gc.collect()
            status_topic = self.base_topic + b'status'
            self.mqtt.set_last_will(status_topic, b'Offline', retain=True)
            self.mqtt.connect()
            self.mqtt.subscribe(self.base_topic + b'ui_config')
            self.mqtt.publish(status_topic, b'Online', retain=True)
            self.state = 1
        if self.state == 1:
            # Dispatches pending messages to mqtt_cb.
            self.mqtt.check_msg()
            buttons = ins.buttons
            self.button_app, self.button_app_repeat = self._update_button(
                buttons, buttons.app, 'app',
                self.button_app, self.button_app_repeat)
            self.button_os, self.button_os_repeat = self._update_button(
                buttons, buttons.os, 'os',
                self.button_os, self.button_os_repeat)
            debug_touch = CONFIG.get('debug_touch', False)
            petals = ins.captouch.petals
            for i in range(self.PETAL_COUNT):
                self._update_petal(i, petals[i], debug_touch)
        gc.collect()

    def _publish_event(self, event_type, members):
        """Publish one JSON event on ``<base_topic>event``.

        ``members`` is the already formatted tail of the JSON object (the
        members following ``"type"``). The current mode is JSON-escaped so a
        quote or backslash in it cannot produce an invalid message; for
        ordinary values the output is unchanged.
        """
        mode = json.dumps(str(self.ui_config.get('mode')))
        msg = '{{"mode":{},"type":"{}",{}}}'.format(mode, event_type, members)
        self.mqtt.publish(self.base_topic + b'event', msg.encode())

    def _update_button(self, buttons, value, prefix, pressed_since, repeat):
        """Process one button and return its new ``(pressed_since, repeat)``.

        An event with ``repeat`` 0 is published when the button is first seen
        pressed; while it stays pressed another event with an incremented
        ``repeat`` follows every ``BUTTON_REPEAT_MS`` milliseconds.
        """
        if value == buttons.NOT_PRESSED:
            return None, 0
        now = time.ticks_ms()
        if pressed_since is not None:
            # ticks_diff() stays correct when the tick counter wraps around.
            if time.ticks_diff(now, pressed_since) <= self.BUTTON_REPEAT_MS:
                return pressed_since, repeat
            repeat += 1
        if value == buttons.PRESSED_LEFT:
            btn = prefix + '_left'
        elif value == buttons.PRESSED_RIGHT:
            btn = prefix + '_right'
        elif value == buttons.PRESSED_DOWN:
            btn = prefix + '_down'
        else:
            btn = None
        self._publish_event('button', '"button":"{}","repeat":{}'.format(btn, repeat))
        return now, repeat

    def _update_petal(self, i, petal, debug_touch):
        """Track the touch on petal ``i`` and publish an event on release."""
        if petal.pressed:
            now = time.ticks_ms()
            if self.touch_time[i] is None:
                self.touch_time[i] = now
            elif time.ticks_diff(now, self.touch_time[i]) > self.TOUCH_SETTLE_MS:
                # Ignore the first few ms of touch events, as position data
                # appears to be unreliable at first.
                position = petal.position
                if self.touch_begin[i] is None:
                    self.touch_begin[i] = position
                self.touch_last[i] = position
                if debug_touch:
                    self.touch_log[i].append(position)
                pads = petal.pads
                self.touch_base[i] = pads.base
                if hasattr(pads, 'tip'):
                    self.touch_tip[i] = pads.tip
                else:
                    # Petals without a tip pad: either side pad counts as tip.
                    self.touch_tip[i] = pads.cw or pads.ccw
            return
        if self.touch_begin[i] is not None:
            self._publish_touch(i)
        # Released: forget everything about this petal's touch, also for
        # touches that ended before the settle time had passed.
        self.touch_time[i] = None
        self.touch_begin[i] = None
        self.touch_last[i] = None
        self.touch_base[i] = False
        self.touch_tip[i] = False
        self.touch_log[i].clear()

    def _publish_touch(self, i):
        """Classify the finished touch on petal ``i`` and publish it.

        Small movements are taps (``touch_base`` / ``touch_tip``, depending on
        the pad last seen active); larger ones are swipes along the axis with
        the bigger displacement. Nothing is published for a tap where neither
        pad was active.
        """
        duration = time.ticks_diff(time.ticks_ms(), self.touch_time[i])
        begin = self.touch_begin[i]
        last = self.touch_last[i]
        d_x = last[0] - begin[0]
        d_y = last[1] - begin[1]
        if abs(d_x) < self.TAP_MAX_DELTA and abs(d_y) < self.TAP_MAX_DELTA:
            if self.touch_base[i]:
                event = 'touch_base'
            elif self.touch_tip[i]:
                event = 'touch_tip'
            else:
                event = None
        elif abs(d_x) > abs(d_y):
            event = 'swipe_up' if d_x > 0 else 'swipe_down'
        else:
            event = 'swipe_right' if d_y > 0 else 'swipe_left'
        if event is None:
            return
        # Petal index 0 is reported as 10; indices 1-9 are reported unchanged.
        human_petal = i or self.PETAL_COUNT
        self._publish_event(
            'touch',
            '"petal":"{}","event":"{}","dx":{},"dy":{},"duration":{}'.format(
                human_petal, event, d_x, d_y, duration))
if __name__ == "__main__":
    # Started directly rather than through the app menu: build the
    # application with a fresh context and hand it to the st3m runner.
    import st3m.run

    app = Flow3rOpenhabMqtt(ApplicationContext())
    st3m.run.run_view(app)