barcode-utils/barcode-websocket-server
s3lph db1ce3226c
Some checks failed
/ build_debian (push) Failing after 10s
feat: add debian package
2024-11-29 01:23:35 +01:00

182 lines
7.2 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import select
import os
import socket
import struct
import sys
import time
import webbrowser
import queue
from threading import Thread, Event
from websockets.sync.server import serve
import hid
HID_USAGE_PAGE_BARCODE_SCANNER = 0x8C
HID_USAGE_BARCODE_SCANNER = 0x02
HID_REPORT_SCANNED_DATA = 0x12
HID_REPORT_SCANNED_DATA_MAXLENGTH = 0xFF
# struct input_event data to move the mouse
# the timeval header uses native types, the event data uses platform-independent types
# struct timeval { tv_sec = 0, tv_usec = 0 }
evhdr = struct.pack('@LL', 0, 0)
# struct input_event { time = evhdr, type = EV_REL(2), code = REL_X(0), value = 1/-1 }
moveleft = evhdr + struct.pack('=HHi', 2, 0, -1)
moveright = evhdr + struct.pack('=HHi', 2, 0, 1)
# struct input_event { time = evhdr, type = EV_SYN(0), code = 0, value = 0 }
evsyn = evhdr + struct.pack('=HHi', 0, 0, 0)
class BarcodeServer:
def __init__(self, ns):
self.vid = int(ns.vid, 16) if ns.vid else None
self.pid = int(ns.pid, 16) if ns.pid else None
self.url = ns.url
self.aim = ns.aim
self.chars = ns.chars
self.q = queue.Queue()
self.e = Event()
self.shutdown = Event()
self.sock = None
if ns.ydotool_socket:
self.ydotool_socket = ns.ydotool_socket
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.wshost = ns.websocket_host
self.wsport = ns.websocket_port
def start(self):
self.t_ws = Thread(target=self.serve, args=[])
self.t_ws.start()
self.t_bc = Thread(target=self.loop, args=[])
self.t_bc.start()
try:
self.t_ws.join()
self.t_bc.join()
except KeyboardInterrupt:
print('Shutting down...')
self.shutdown.set()
self.t_ws.join()
self.t_bc.join()
def ws_handler(self, websocket):
a = websocket.remote_address
if self.e.is_set():
return
self.e.set()
print(f'websocket connected: {a}')
try:
while not self.shutdown.is_set():
try:
code = self.q.get(timeout=1)
print(f'sending code {code} via ws {a}')
try:
websocket.send(code)
self.q.task_done()
except:
self.q.put(code)
finally:
websocket.close()
break
except queue.Empty:
websocket.ping()
finally:
self.e.clear()
print(f'websocket {a} disconnected')
def serve(self):
with serve(self.ws_handler, self.wshost, self.wsport) as server:
server.serve_forever()
def loop(self):
while not self.shutdown.is_set():
try:
h = hid.device()
if self.vid and self.pid:
h.open(self.vid, self.pid)
else:
# Auto-detect barcode scanners
devs = [x for x in hid.enumerate()
if x['usage_page'] == HID_USAGE_PAGE_BARCODE_SCANNER
and x['usage'] == HID_USAGE_BARCODE_SCANNER]
if not devs:
raise RuntimeError('No USB HID POS barcode scanner found')
h.open(devs[0]['vendor_id'], devs[0]['product_id'])
print(f'Device manufacturer: {h.get_manufacturer_string()}')
print(f'Product: {h.get_product_string()}')
print(f'Serial Number: {h.get_serial_number_string()}')
while not self.shutdown.is_set():
# Line from Honeywell HID POS scanner consists of:
# - 0x02 = HID Report ID "Scanned Data Report"
# - 1 byte payload length (without AIM, so 13 for EAN-13)
# - 3 bytes AIM, e.g. "]E0" for EAN-13
# - payload bytes
# - Rest is filled with zeros
hidmsg = h.read(255, 1000)
try:
flag = hidmsg.index(ord(']'))
except ValueError:
continue
if len(hidmsg) < flag+3:
continue
try:
end = hidmsg[flag:].index(0)
end += flag
except ValueError:
continue
aim = bytes(hidmsg[flag:flag+3]).decode()
data = bytes(hidmsg[flag+3:end]).decode().strip()
print(aim, hidmsg[flag+3:end], data)
if not aim.startswith(self.aim):
print('aim mismatch')
continue
if any(filter(lambda c: c not in self.chars, data)):
print('chars mismatch')
continue
url = self.url.format(data)
if self.sock:
try:
print(f'Moving mouse via ydotoold')
self.sock.sendto(moveleft, self.ydotool_socket)
self.sock.sendto(evsyn, self.ydotool_socket)
self.sock.sendto(moveright, self.ydotool_socket)
self.sock.sendto(evsyn, self.ydotool_socket)
except Exception as e:
print(e)
if not self.e.is_set():
with self.q.mutex:
print('clearing queue')
self.q.queue.clear()
print(f'Opening URL {url}')
webbrowser.open(url, new=2)
self.q.put(data)
print(f'enqued {data}')
except Exception as e:
print(e)
time.sleep(5)
def main():
ap = argparse.ArgumentParser(sys.argv[0], description='Opens a website when a barcode is scanned with an USB-HID barcode scanner.')
ap.add_argument('--url', '-u', type=str, help='The URL to open. "{}" is replaced by the barcode data. Default: http://localhost/?ean={}', default='http://localhost/?ean={}')
ap.add_argument('--vid', '-v', type=str, help='Connect to device using this USB vendor ID.')
ap.add_argument('--pid', '-p', type=str, help='Connect to device using this USB product ID.')
ap.add_argument('--aim', '-A', type=str, help='AIM filter prefix, e.g. "]E0" for EAN13, "]E" for all EAN variants, or "]" for all barcodes. Default: "]"', default=']')
ap.add_argument('--chars', '-C', type=str, help='Permitted characters. Defaults to "0123456789"', default='0123456789')
ap.add_argument('--ydotool-socket', '-Y', type=str, help='If provided, the ydotool socket to wake the screen.')
ap.add_argument('--websocket-host', '-w', type=str, help='Websocket host to listen on. Defaults to "localhost"', default='localhost')
ap.add_argument('--websocket-port', '-W', type=int, help='Websocket port to listen on. Defaults to 47808', default=47808)
ns = ap.parse_args()
BarcodeServer(ns).start()
if __name__ == '__main__':
main()