#!/usr/bin/env python3
"""Open a website and feed a websocket when a USB HID barcode scanner reads a code.

Two threads are run: one reads reports from the HID scanner, the other serves
a websocket.  Accepted barcodes are put on a queue; a connected websocket
client receives them, and when no client is connected the configured URL is
opened in a web browser instead.
"""

import argparse
import os
import queue
import select
import socket
import struct
import sys
import time
import webbrowser
from threading import Event, Thread

import hid
from websockets.sync.server import serve

HID_USAGE_PAGE_BARCODE_SCANNER = 0x8C
HID_USAGE_BARCODE_SCANNER = 0x02
HID_REPORT_SCANNED_DATA = 0x12
HID_REPORT_SCANNED_DATA_MAXLENGTH = 0xFF

# struct input_event data to move the mouse
# the timeval header uses native types, the event data uses platform-independent types
# struct timeval { tv_sec = 0, tv_usec = 0 }
evhdr = struct.pack('@LL', 0, 0)
# struct input_event { time = evhdr, type = EV_REL(2), code = REL_X(0), value = 1/-1 }
moveleft = evhdr + struct.pack('=HHi', 2, 0, -1)
moveright = evhdr + struct.pack('=HHi', 2, 0, 1)
# struct input_event { time = evhdr, type = EV_SYN(0), code = 0, value = 0 }
evsyn = evhdr + struct.pack('=HHi', 0, 0, 0)


class BarcodeServer:
    """Reads barcodes from a HID scanner and hands them to a browser/websocket."""

    def __init__(self, ns):
        """Configure the server from the parsed command line namespace *ns*.

        ``ns.vid`` / ``ns.pid`` are hexadecimal strings (or empty/None).  A
        datagram UNIX socket is created only when ``ns.ydotool_socket`` is set.
        """
        self.vid = int(ns.vid, 16) if ns.vid else None
        self.pid = int(ns.pid, 16) if ns.pid else None
        self.url = ns.url
        self.aim = ns.aim
        self.chars = ns.chars
        # Barcodes waiting to be delivered to a websocket client.
        self.q = queue.Queue()
        # Set while a websocket handler is active.
        self.e = Event()
        # Set once the process should terminate.
        self.shutdown = Event()
        # The running websocket server; assigned by serve().
        self.server = None
        self.sock = None
        if ns.ydotool_socket:
            self.ydotool_socket = ns.ydotool_socket
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.wshost = ns.websocket_host
        self.wsport = ns.websocket_port

    def start(self):
        """Start the websocket and scanner threads and wait for them.

        On KeyboardInterrupt the shutdown event is set, the websocket server
        is asked to stop, and both threads are joined again.
        """
        self.t_ws = Thread(target=self.serve, args=[])
        self.t_ws.start()
        self.t_bc = Thread(target=self.loop, args=[])
        self.t_bc.start()
        try:
            self.t_ws.join()
            self.t_bc.join()
        except KeyboardInterrupt:
            print('Shutting down...')
            self.shutdown.set()
            # serve_forever() does not look at self.shutdown, so the server
            # has to be stopped explicitly or the join below never returns.
            server = self.server
            if server is not None:
                server.shutdown()
            self.t_ws.join()
            self.t_bc.join()

    def ws_handler(self, websocket):
        """Deliver one queued barcode to *websocket*, then close it.

        Returns immediately if another handler is already active.  While
        waiting for a barcode the connection is pinged once per second.  If
        sending fails the barcode is put back on the queue.
        """
        a = websocket.remote_address
        if self.e.is_set():
            return
        self.e.set()
        print(f'websocket connected: {a}')
        try:
            while not self.shutdown.is_set():
                try:
                    code = self.q.get(timeout=1)
                except queue.Empty:
                    websocket.ping()
                    continue
                print(f'sending code {code} via ws {a}')
                try:
                    websocket.send(code)
                except Exception as exc:
                    # Keep the barcode for the next client instead of losing it.
                    print(f'sending code {code} via ws {a} failed: {exc}')
                    self.q.put(code)
                finally:
                    # The item fetched above is either delivered or re-queued
                    # as a new item; either way this get() is finished.
                    self.q.task_done()
                websocket.close()
                break
        finally:
            self.e.clear()
            print(f'websocket {a} disconnected')

    def serve(self):
        """Run the websocket server until it is shut down."""
        with serve(self.ws_handler, self.wshost, self.wsport) as server:
            self.server = server
            # start() may have requested shutdown before the server existed.
            if not self.shutdown.is_set():
                server.serve_forever()

    def loop(self):
        """Read barcodes from the scanner until shutdown is requested.

        Any error (no device found, read failure, ...) is printed and the
        device is re-opened after a 5 second pause.
        """
        while not self.shutdown.is_set():
            try:
                h = hid.device()
                try:
                    self._open_device(h)
                    self._read_codes(h)
                finally:
                    # Release the handle before retrying or exiting.
                    h.close()
            except Exception as e:
                print(e)
                # Like sleep(5), but returns early on shutdown.
                self.shutdown.wait(5)

    def _open_device(self, h):
        """Open the configured or first auto-detected scanner on *h*."""
        if self.vid and self.pid:
            h.open(self.vid, self.pid)
        else:
            # Auto-detect barcode scanners
            devs = [
                x for x in hid.enumerate()
                if x['usage_page'] == HID_USAGE_PAGE_BARCODE_SCANNER
                and x['usage'] == HID_USAGE_BARCODE_SCANNER
            ]
            if not devs:
                raise RuntimeError('No USB HID POS barcode scanner found')
            h.open(devs[0]['vendor_id'], devs[0]['product_id'])
        print(f'Device manufacturer: {h.get_manufacturer_string()}')
        print(f'Product: {h.get_product_string()}')
        print(f'Serial Number: {h.get_serial_number_string()}')

    @staticmethod
    def _parse_report(hidmsg):
        """Split a HID report into ``(aim, raw_payload, data)`` or return None.

        None is returned when the report contains no ``]``, is too short to
        hold the 3 byte AIM identifier, or has no zero byte after the ``]``.
        """
        # Line from Honeywell HID POS scanner consists of:
        # - 0x02 = HID Report ID "Scanned Data Report"
        # - 1 byte payload length (without AIM, so 13 for EAN-13)
        # - 3 bytes AIM, e.g. "]E0" for EAN-13
        # - payload bytes
        # - Rest is filled with zeros
        try:
            flag = hidmsg.index(ord(']'))
        except ValueError:
            return None
        if len(hidmsg) < flag + 3:
            return None
        try:
            end = hidmsg.index(0, flag)
        except ValueError:
            return None
        raw = hidmsg[flag + 3:end]
        # errors='replace': undecodable bytes must not raise and force a
        # device reconnect; they are rejected by the filters instead.
        aim = bytes(hidmsg[flag:flag + 3]).decode(errors='replace')
        data = bytes(raw).decode(errors='replace').strip()
        return aim, raw, data

    def _wake_screen(self):
        """Send a left/right relative mouse move to the ydotool socket."""
        try:
            print('Moving mouse via ydotoold')
            for event in (moveleft, evsyn, moveright, evsyn):
                self.sock.sendto(event, self.ydotool_socket)
        except Exception as e:
            print(e)

    def _read_codes(self, h):
        """Read reports from the open device *h* and dispatch accepted codes."""
        while not self.shutdown.is_set():
            hidmsg = h.read(HID_REPORT_SCANNED_DATA_MAXLENGTH, 1000)
            parsed = self._parse_report(hidmsg)
            if parsed is None:
                continue
            aim, raw, data = parsed
            print(aim, raw, data)
            if not aim.startswith(self.aim):
                print('aim mismatch')
                continue
            if any(c not in self.chars for c in data):
                print('chars mismatch')
                continue
            url = self.url.format(data)
            if self.sock:
                self._wake_screen()
            if not self.e.is_set():
                # No websocket client: drop stale codes and open the page,
                # the code enqueued below is then the only one waiting.
                with self.q.mutex:
                    print('clearing queue')
                    self.q.queue.clear()
                print(f'Opening URL {url}')
                webbrowser.open(url, new=2)
            self.q.put(data)
            print(f'enqued {data}')


def main():
    """Parse the command line and run a BarcodeServer."""
    ap = argparse.ArgumentParser(
        sys.argv[0],
        description='Opens a website when a barcode is scanned with an USB-HID barcode scanner.',
    )
    ap.add_argument(
        '--url', '-u', type=str,
        help='The URL to open. "{}" is replaced by the barcode data. Default: http://localhost/?ean={}',
        default='http://localhost/?ean={}',
    )
    ap.add_argument(
        '--vid', '-v', type=str,
        help='Connect to device using this USB vendor ID.',
    )
    ap.add_argument(
        '--pid', '-p', type=str,
        help='Connect to device using this USB product ID.',
    )
    ap.add_argument(
        '--aim', '-A', type=str,
        help='AIM filter prefix, e.g. "]E0" for EAN13, "]E" for all EAN variants, or "]" for all barcodes. Default: "]"',
        default=']',
    )
    ap.add_argument(
        '--chars', '-C', type=str,
        help='Permitted characters. Defaults to "0123456789"',
        default='0123456789',
    )
    ap.add_argument(
        '--ydotool-socket', '-Y', type=str,
        help='If provided, the ydotool socket to wake the screen.',
    )
    ap.add_argument(
        '--websocket-host', '-w', type=str,
        help='Websocket host to listen on. Defaults to "localhost"',
        default='localhost',
    )
    ap.add_argument(
        '--websocket-port', '-W', type=int,
        help='Websocket port to listen on. Defaults to 47808',
        default=47808,
    )
    ns = ap.parse_args()
    BarcodeServer(ns).start()


if __name__ == '__main__':
    main()