#!/usr/bin/env python3
"""Bridge a USB-HID barcode scanner to a web page.

Every accepted scan is queued for delivery over a local WebSocket. When no
WebSocket client is connected at scan time, the configured URL is opened in
a browser first. Optionally a tiny mouse movement is sent to ydotoold before
that, to wake the screen.
"""
import argparse
import select
import os
import socket
import struct
import sys
import time
import webbrowser
import queue
from threading import Thread, Event

from websockets.sync.server import serve
import hid

# struct input_event data to move the mouse
# the timeval header uses native types, the event data uses
# platform-independent types
# struct timeval { tv_sec = 0, tv_usec = 0 }
evhdr = struct.pack('@LL', 0, 0)
# struct input_event { time = evhdr, type = EV_REL(2), code = REL_X(0),
#                      value = 1/-1 }
moveleft = evhdr + struct.pack('=HHi', 2, 0, -1)
moveright = evhdr + struct.pack('=HHi', 2, 0, 1)
# struct input_event { time = evhdr, type = EV_SYN(0), code = 0, value = 0 }
evsyn = evhdr + struct.pack('=HHi', 0, 0, 0)


class BarcodeServer:
    """Read barcodes from a HID scanner and hand them to a web page.

    Two worker threads are used: one runs the WebSocket server (`serve`),
    the other polls the scanner (`loop`). They communicate through `q`.
    """

    def __init__(self, ns):
        """Configure the server from parsed command-line arguments.

        Args:
            ns: Namespace providing `vid` and `pid` (hex strings), `url`,
                `aim`, `chars`, `ydotool_socket`, `websocket_host` and
                `websocket_port`.

        Raises:
            ValueError: If `vid` or `pid` is not a hexadecimal number.
        """
        self.vid = int(ns.vid, 16)
        self.pid = int(ns.pid, 16)
        self.url = ns.url
        self.aim = ns.aim
        self.chars = ns.chars
        # Scanned codes waiting to be sent to a WebSocket client.
        self.q = queue.Queue()
        # Set while a WebSocket client is being served (one at a time).
        self.e = Event()
        # Set once to ask all threads to finish.
        self.shutdown = Event()
        # The running WebSocket server; assigned by serve().
        self.server = None
        self.ydotool_socket = ns.ydotool_socket
        self.sock = None
        if ns.ydotool_socket:
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.wshost = ns.websocket_host
        self.wsport = ns.websocket_port

    def start(self):
        """Run both worker threads and block until they have finished.

        A KeyboardInterrupt triggers an orderly shutdown: the WebSocket
        server is stopped, both threads are joined and the ydotool socket
        is closed.
        """
        self.t_ws = Thread(target=self.serve, args=[])
        self.t_ws.start()
        self.t_bc = Thread(target=self.loop, args=[])
        self.t_bc.start()
        try:
            self.t_ws.join()
            self.t_bc.join()
        except KeyboardInterrupt:
            print('Shutting down...')
        finally:
            self._stop()
            self.t_ws.join()
            self.t_bc.join()
            if self.sock is not None:
                self.sock.close()

    def _stop(self):
        """Signal all threads to finish and stop the WebSocket server."""
        self.shutdown.set()
        server = self.server
        if server is not None:
            # serve_forever() does not watch self.shutdown, so it has to be
            # interrupted explicitly or joining the server thread would hang.
            server.shutdown()

    def ws_handler(self, websocket):
        """Serve one WebSocket client: send it the next scanned code.

        Only one client is served at a time; a connection arriving while
        another one is active returns immediately. After one code has been
        sent (or sending failed) the connection is closed. While waiting
        for a code the client is pinged once per second.
        """
        a = websocket.remote_address
        if self.e.is_set():
            return
        self.e.set()
        print(f'websocket connected: {a}')
        try:
            while not self.shutdown.is_set():
                try:
                    code = self.q.get(timeout=1)
                except queue.Empty:
                    # An exception raised here (e.g. the peer is gone) ends
                    # the handler via the outer finally.
                    websocket.ping()
                    continue
                print(f'sending code {code} via ws {a}')
                try:
                    websocket.send(code)
                    self.q.task_done()
                except Exception:
                    # Delivery failed: keep the code for the next client.
                    self.q.put(code)
                finally:
                    websocket.close()
                break
        finally:
            self.e.clear()
            print(f'websocket {a} disconnected')

    def serve(self):
        """Run the WebSocket server until it is shut down."""
        with serve(self.ws_handler, self.wshost, self.wsport) as server:
            self.server = server
            # _stop() may have run before self.server was assigned; in that
            # case nobody will call server.shutdown(), so bail out here.
            if self.shutdown.is_set():
                return
            server.serve_forever()

    def loop(self):
        """Poll the scanner until shutdown, reconnecting after errors."""
        while not self.shutdown.is_set():
            try:
                self._read_scanner()
            except Exception as e:
                print(e)
                # Retry delay that ends immediately on shutdown.
                self.shutdown.wait(5)

    def _read_scanner(self):
        """Open the scanner and process its reports until shutdown.

        The device handle is always closed on the way out, so a reconnect
        after an error does not leak the previous handle.
        """
        h = hid.device()
        try:
            h.open(self.vid, self.pid)
            print(f'Device manufacturer: {h.get_manufacturer_string()}')
            print(f'Product: {h.get_product_string()}')
            print(f'Serial Number: {h.get_serial_number_string()}')
            while not self.shutdown.is_set():
                parsed = self._parse_report(h.read(255, 1000))
                if parsed is None:
                    continue
                aim, raw, data = parsed
                print(aim, raw, data)
                if not aim.startswith(self.aim):
                    print('aim mismatch')
                    continue
                if not data:
                    print('empty payload')
                    continue
                if any(c not in self.chars for c in data):
                    print('chars mismatch')
                    continue
                url = self.url.format(data)
                self._wake_screen()
                if not self.e.is_set():
                    # No page is listening: drop stale codes and open one.
                    with self.q.mutex:
                        print('clearing queue')
                        self.q.queue.clear()
                    print(f'Opening URL {url}')
                    webbrowser.open(url, new=2)
                self.q.put(data)
                print(f'enqued {data}')
        finally:
            h.close()

    @staticmethod
    def _parse_report(hidmsg):
        """Extract the AIM identifier and payload from one HID report.

        Line from Honeywell scanner consists of:
        - 0x02 (magic?)
        - 1 byte payload length (without AIM, so 13 for EAN-13)
        - 3 bytes AIM, e.g. "]E0" for EAN-13
        - payload bytes
        - Rest is filled with zeros

        Args:
            hidmsg: The report as a list of byte values (may be empty when
                the read timed out).

        Returns:
            A tuple `(aim, raw, data)` with the 3-character AIM string, the
            raw payload byte values and the decoded, stripped payload; or
            None when the report contains no complete, decodable barcode.
        """
        # NOTE(review): the AIM is located by searching for ']', so a length
        # byte of 0x5D would be mistaken for it - confirm whether the fixed
        # layout above can be relied on instead.
        try:
            flag = hidmsg.index(ord(']'))
            end = hidmsg.index(0, flag)
        except ValueError:
            return None
        if end < flag + 3:
            # The terminating zero falls inside the AIM: truncated report.
            return None
        raw = hidmsg[flag + 3:end]
        try:
            aim = bytes(hidmsg[flag:flag + 3]).decode()
            data = bytes(raw).decode().strip()
        except UnicodeDecodeError:
            # A garbled scan is not a device failure; just ignore it.
            return None
        return aim, raw, data

    def _wake_screen(self):
        """Nudge the mouse left and right via ydotoold, if configured."""
        if not self.sock:
            return
        try:
            print('Moving mouse via ydotoold')
            self.sock.sendto(moveleft, self.ydotool_socket)
            self.sock.sendto(evsyn, self.ydotool_socket)
            self.sock.sendto(moveright, self.ydotool_socket)
            self.sock.sendto(evsyn, self.ydotool_socket)
        except OSError as e:
            # Best effort only: a missing ydotoold must not block scanning.
            print(e)


def main():
    """Parse the command line and run the barcode server."""
    ap = argparse.ArgumentParser(
        sys.argv[0],
        description='Opens a website when a barcode is scanned with an '
                    'USB-HID barcode scanner.')
    ap.add_argument(
        '--url', '-u', type=str,
        help='The URL to open. "{}" is replaced by the barcode data. '
             'Default: http://localhost/?ean={}',
        default='http://localhost/?ean={}')
    ap.add_argument(
        '--vid', '-v', type=str,
        help='The USB vendor ID. Default: 0c2e',
        default='0c2e')
    ap.add_argument(
        '--pid', '-p', type=str,
        help='The USB product ID. Default: 0b07',
        default='0b07')
    ap.add_argument(
        '--aim', '-A', type=str,
        help='AIM filter prefix, e.g. "]E0" for EAN13, "]E" for all EAN '
             'variants, or "]" for all barcodes. Default: "]E"',
        default=']E')
    ap.add_argument(
        '--chars', '-C', type=str,
        help='Permitted characters. Defaults to "0123456789"',
        default='0123456789')
    ap.add_argument(
        '--ydotool-socket', '-Y', type=str,
        help='If provided, the ydotool socket to wake the screen.')
    ap.add_argument(
        '--websocket-host', '-w', type=str,
        help='Websocket host to listen on. Defaults to "localhost"',
        default='localhost')
    ap.add_argument(
        '--websocket-port', '-W', type=int,
        help='Websocket port to listen on. Defaults to 47808',
        default=47808)
    ns = ap.parse_args()
    BarcodeServer(ns).start()


if __name__ == '__main__':
    main()