# modbus_test/utils/test_socket_server.py
#
# 116 lines
# 3.4 KiB
# Python

import socket
import json
# Standard loopback interface address (localhost).
HOST = "127.0.0.1"
# Port to listen on (non-privileged ports are > 1023).
PORT = 9760

# Canned poses handed out to clients, in list order. Every entry pairs two
# six-value lists describing the same pose:
#   "world" - presumably Cartesian position followed by three orientation
#             angles (TODO confirm layout and units with the client side).
#   "axis"  - presumably the six joint angles for that pose (TODO confirm).
# The int/float spelling of each literal is kept as-is because it is
# reflected in the JSON sent to clients (e.g. 90 vs 90.0).
c_angles = [
    {
        "world": [
            1131.959351,
            -588.689941,
            1277.805054,
            6.181231,
            54.227802,
            -100.604988,
        ],
        "axis": [-41.612457, 31.759747, -26.773878, 74.869049, -45.417992, -20.86335],
    },
    {
        "world": [1282.244, -75.427, 1772.476, 84.629, 34.519, 153.999],
        "axis": [-8.487, -8.681, 33.058, 88.070, -75.010, -10.566],
    },
    {
        "world": [1630.006, 628.326, 1322.554, 0, 90, 0],
        "axis": [22.493, -30.696, 35.997, -77.419, -23.007, 76.362],
    },
    {
        "world": [1052.757, -1028.846, 334.417, 162.374, 20.467, 75.065],
        "axis": [-43.742, -30.698, -12.227, -21.884, -23.078, 76.362],
    },
    {
        "world": [1396.525, 1.687, 1322.697, 0, 90, 0],
        "axis": [0, -0.002, -0.002, 0.003, -0.001, 0],
    },
    {
        "world": [1396.513, 1.758, 1322.749, -0.002, 90, 0],
        "axis": [0.002, -0.001, 0, 0, 0, 0],
    },
    {
        "world": [-1.645, 1396.533, 1322.700, -89.998, 90.000, 0],
        "axis": [89.998, -0.002, -0.001, 0.001, -0.001, -0.001],
    },
    {
        "world": [-61.982, 1.689, 1955.771, 0, 25, 0.003],
        "axis": [0.003, 65, -0.001, 0.001, 0, -0.001],
    },
    {
        "world": [72.252, 1.696, 2450.564, 0, 0, 0],
        "axis": [0.002, -0.001, 89.999, 0.002, 0, -0.001],
    },
    {
        "world": [1396.542, 1.748, 1322.702, 90.004, 0.001, 90.003],
        "axis": [0.002, -0.003, 0, 89.999, -0.001, 0],
    },
    {
        "world": [1279.539, 1.716, 1439.725, 0.004, 0.005, 0.001],
        "axis": [0.001, -0.002, -0.001, 0.004, 89.998, 0],
    },
    {
        "world": [1396.518, 1.704, 1322.743, 90.002, -0.006, 90],
        "axis": [0, -0.001, 0, 0.002, 0, 90.003],
    },
]

# Position in c_angles of the next pose to serve; handle_client advances it
# (wrapping back to 0 after the last entry) on every "axis-0" query.
coordinates_index = 0
def handle_client(conn, addr):
    """Serve one client connection until it closes or a request fails.

    Every chunk read from ``conn`` is parsed as one JSON request object and
    answered with one JSON reply built as follows:

    * ``queryData`` defaults to ``["ok"]``.
    * If ``"axis-0"`` is in the request's ``queryAddr``, the entry of
      ``c_angles`` at ``coordinates_index`` becomes the current pose, the
      index advances (wrapping to 0 after the last entry) and the pose's
      ``"axis"`` list is returned as ``queryData``.
    * If ``"world-0"`` is in ``queryAddr``, the current pose's ``"world"``
      list is returned as ``queryData`` (it wins when both are requested).
      If no ``"axis-0"`` query has been served on this connection yet, the
      entry at the current ``coordinates_index`` is used without advancing.
    * If ``reqType`` is ``"command"``, ``cmdReply`` is set to ``["ok"]``.

    Requests that omit ``queryAddr`` or ``reqType`` are answered normally
    instead of aborting the connection.

    Args:
        conn: Connected socket-like object providing ``recv``, ``sendall``
            and ``close``.
        addr: Peer address; used only in the printed log lines.

    Errors (malformed JSON, socket failures, anything else) are printed and
    end the session; they are never raised to the caller. The connection is
    always closed before returning.
    """
    global coordinates_index
    print(f"Connected by {addr}")
    # Pose selected by the most recent "axis-0" query on this connection.
    pose = None
    try:
        while True:
            data = conn.recv(1024)
            if not data:
                # Empty read: the peer closed its side of the connection.
                break
            # NOTE(review): this assumes each recv() returns exactly one
            # complete JSON document of at most 1024 bytes; TCP does not
            # guarantee that framing - confirm against the client.
            req = json.loads(data)
            # A missing or null "queryAddr" simply means nothing is queried.
            query_addr = req.get("queryAddr") or ()
            res = {"queryData": ["ok"]}
            if "axis-0" in query_addr:
                print(coordinates_index)
                pose = c_angles[coordinates_index]
                coordinates_index = (coordinates_index + 1) % len(c_angles)
                res["queryData"] = pose["axis"]
            if "world-0" in query_addr:
                if pose is None:
                    # No axis query seen yet on this connection: fall back
                    # to the pose at the current index rather than failing
                    # on an unbound variable.
                    pose = c_angles[coordinates_index]
                res["queryData"] = pose["world"]
            if req.get("reqType") == "command":
                res["cmdReply"] = ["ok"]
            conn.sendall(json.dumps(res).encode())
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
    except socket.error as e:
        print(f"Socket error: {e}")
    except Exception as e:
        # Last-resort boundary so one bad request cannot take the server down.
        print(f"Unexpected error: {e}")
    finally:
        conn.close()
        print(f"Connection with {addr} closed")
def main():
    """Run the blocking test server on ``HOST``:``PORT``.

    Binds a TCP listening socket, then accepts connections one at a time and
    passes each to ``handle_client``; the next client is accepted only after
    the previous handler has returned. Loops until the process is
    interrupted, at which point the ``with`` block closes the listening
    socket.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((HOST, PORT))
        s.listen()
        print(f"Server listening on {HOST}:{PORT}")
        while True:
            conn, addr = s.accept()
            handle_client(conn, addr)


# Start serving only when run as a script. Without this guard, merely
# importing the module (for example when a test runner collects files named
# test_*.py) would bind the port and block forever in the accept loop.
if __name__ == "__main__":
    main()