scales: Add simulator.py

This commit is contained in:
2025-11-09 23:42:42 +01:00
parent 92f08db458
commit d41aa38ba1

105
scales/simulator.py Executable file
View File

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import socket
import datetime
import random
import time
import crcmod.predefined
IDENT_NR = 0
def crc16_modbus(data: bytes) -> int:
crc = crcmod.predefined.Crc('modbus')
crc.update(data)
return int(crc.hexdigest(), 16)
def handle_systec_it(req: bytes) -> bytes:
global IDENT_NR
if not req.startswith(b'<') or not req.endswith(b'>'):
return b'<31>\r\n'
req = req[1:-1]
if req in (b'OS01', b'OS02', b'OC01', b'OC02'):
# set or clear outputs 1 or 2
return b'<00>\r\n'
elif len(req) > 3:
return b'<32>\r\n'
if req.startswith(b'RN'):
incr = True
elif req.startswith(b'RM'):
incr = False
else:
return b'<32>\r\n'
if incr:
time.sleep(random.randint(0, 5) / 10.0)
gew = random.randint(40, 4000)
m = random.randint(1, 10)
match m:
case 1:
return b'<12>\r\n'
case 2:
incr = False
gew = 0
if incr:
IDENT_NR += 1
now = datetime.datetime.now()
data = f'0000{now:%d.%m.%y%H:%M}{IDENT_NR if incr else 0:4}1{gew:8}{0:8}{gew:8}kg {1:3}'.encode('ascii')
return b'<' + data + f'{crc16_modbus(data):8}'.encode('ascii') + b'>\r\n'
def handle_avery_async(client: socket.socket) -> None:
global IDENT_NR
try:
client.settimeout(0)
while True:
time.sleep(5)
gew = random.randint(40, 4000)
IDENT_NR += 1
now = datetime.datetime.now()
data = f' {now:%d.%m.%y %H:%M} {IDENT_NR:4} {gew:9}kg '.encode('ascii')
client.send(data)
except:
pass
def log(msg: bytes) -> None:
print(msg.decode('ascii'), end='', flush=True)
def main(mode: str) -> None:
s = socket.socket()
s.bind(('0.0.0.0', 1234))
s.listen()
print('Ready for connections')
while True:
c, (addr, port) = s.accept()
print(f'Accepting connection from [{addr}]:{port}')
if mode == 'SysTec-IT':
try:
while True:
req = c.recv(1024)
log(req)
res = handle_systec_it(req)
log(res)
c.send(res)
except:
pass
elif mode == 'Avery-Async':
handle_avery_async(c)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('mode', choices=['SysTec-IT', 'Avery-Async'])
args = parser.parse_args()
main(args.mode)