modbus_test/utils/test_socket_server.py

180 lines
5.4 KiB
Python

import socket
import json
# Standard loopback interface address (localhost).
HOST = "127.0.0.1"
# Port to listen on (non-privileged ports are > 1023).
PORT = 9760
# Sample set of twelve entries; each entry pairs a six-value "world" list with
# a six-value "axis" list.
# NOTE(review): presumably "world" is a Cartesian pose and "axis" the matching
# joint angles -- confirm meaning, ordering and units against the real device.
# NOTE: `c_angles` is assigned again further down in this file, so this list is
# not the one in effect once the module has finished loading.
c_angles = [
    {
        "world": [
            1131.959351, -588.689941, 1277.805054,
            6.181231, 54.227802, -100.604988,
        ],
        "axis": [
            -41.612457, 31.759747, -26.773878,
            74.869049, -45.417992, -20.86335,
        ],
    },
    {
        "world": [1282.244, -75.427, 1772.476, 84.629, 34.519, 153.999],
        "axis": [-8.487, -8.681, 33.058, 88.07, -75.01, -10.566],
    },
    {
        "world": [1630.006, 628.326, 1322.554, 0, 90, 0],
        "axis": [22.493, -30.696, 35.997, -77.419, -23.007, 76.362],
    },
    {
        "world": [1052.757, -1028.846, 334.417, 162.374, 20.467, 75.065],
        "axis": [-43.742, -30.698, -12.227, -21.884, -23.078, 76.362],
    },
    {
        "world": [1396.525, 1.687, 1322.697, 0, 90, 0],
        "axis": [0, -0.002, -0.002, 0.003, -0.001, 0],
    },
    {
        "world": [1396.513, 1.758, 1322.749, -0.002, 90, 0],
        "axis": [0.002, -0.001, 0, 0, 0, 0],
    },
    {
        "world": [-1.645, 1396.533, 1322.7, -89.998, 90.0, 0],
        "axis": [89.998, -0.002, -0.001, 0.001, -0.001, -0.001],
    },
    {
        "world": [-61.982, 1.689, 1955.771, 0, 25, 0.003],
        "axis": [0.003, 65, -0.001, 0.001, 0, -0.001],
    },
    {
        "world": [72.252, 1.696, 2450.564, 0, 0, 0],
        "axis": [0.002, -0.001, 89.999, 0.002, 0, -0.001],
    },
    {
        "world": [1396.542, 1.748, 1322.702, 90.004, 0.001, 90.003],
        "axis": [0.002, -0.003, 0, 89.999, -0.001, 0],
    },
    {
        "world": [1279.539, 1.716, 1439.725, 0.004, 0.005, 0.001],
        "axis": [0.001, -0.002, -0.001, 0.004, 89.998, 0],
    },
    {
        "world": [1396.518, 1.704, 1322.743, 90.002, -0.006, 90],
        "axis": [0, -0.001, 0, 0.002, 0, 90.003],
    },
]
# Two-entry sample set in the same {"world": [...], "axis": [...]} layout.
# NOTE: this assignment replaces any earlier value of `c_angles`, and the name
# is assigned once more (`c_angles = c_small`) further down in this file.
c_angles = [
    {
        "world": [
            894.159973, -738.295654, 782.748047,
            172.274109, 28.079031, 120.377357,
        ],
        "axis": [
            -39.428261, 10.391139, -32.562145,
            176.81929, 38.895988, -159.779755,
        ],
    },
    {
        "world": [
            1100.071655, -347.347412, 782.754578,
            172.284241, 28.079557, 142.505508,
        ],
        "axis": [
            -17.409513, 10.897068, -33.003525,
            176.892288, 38.957874, -159.934479,
        ],
    },
]
# Eight-entry sample set in the same {"world": [...], "axis": [...]} layout.
# NOTE(review): the name suggests these were recorded on a smaller unit --
# that is an inference from the identifier only; confirm.
c_small = [
    {
        "world": [549.087, -803.609, 939.418, 125.479, 50.992, 63.257],
        "axis": [-55.721, 12.229, -33.657, 35.03, 0.0, -1.621],
    },
    {
        "world": [202.361, 333.995, 1743.859, -33.7, 40.692, 12.951],
        "axis": [58.637, 41.54, -2.386, -32.975, -0.052, 0.119],
    },
    {
        "world": [915.212, 352.196, 1110.636, -109.839, 55.517, -85.605],
        "axis": [20.987, 14.281, -25.313, -32.976, -0.054, 0.117],
    },
    {
        "world": [915.172, 352.254, 1110.614, 179.937, 78.911, -159.077],
        "axis": [20.986, 14.282, -25.313, -0.102, -0.057, 0.115],
    },
    {
        "world": [1165.135, 1.142, 1312.13, 179.258, 89.223, 179.259],
        "axis": [0.001, -0.299, -0.416, -0.103, -0.059, 0.113],
    },
    {
        "world": [1410.226, 1.123, 1324.733, 0, 90, 0],
        "axis": [0, -20.6, 23.854, -0.001, -3.249, 0.004],
    },
    {
        "world": [943.3, -650.361, 783.806, 103.505, 55.005, 42.234],
        "axis": [-31.249, -8.117, -28.424, 43.951, 36.544, 0.001],
    },
    {
        "world": [721.214, 299.284, 1141.345, -133.593, 37.242, -120.067],
        "axis": [27.053, 27.798, -28.423, -35.215, -41.446, 0.005],
    },
]

# The dataset actually served: this final assignment overrides every earlier
# value of `c_angles` (same list object as `c_small`, not a copy).
c_angles = c_small

# Position of the current entry in `c_angles`; the request handler moves it
# forward (wrapping to 0) when it serves an "axis-0" query.
coordinates_index = 0
def handle_client(conn, addr):
    """Serve one client connection until it disconnects or an error occurs.

    Repeatedly reads up to 2048 bytes, parses the chunk as a single JSON
    document, and sends back the JSON-encoded reply built by
    ``_build_reply``.  Any error ends the session: it is printed, never
    re-raised, and the connection is always closed.

    NOTE(review): each ``recv`` is assumed to deliver exactly one complete
    JSON document.  TCP does not guarantee that; a split or coalesced
    message ends the session with a JSON decode error.

    Args:
        conn: Connected socket-like object providing ``recv``, ``sendall``
            and ``close``.
        addr: Peer address; used only in log output.
    """
    print(f"Connected by {addr}")
    try:
        while True:
            data = conn.recv(1024 * 2)
            if not data:
                # An empty read means the peer closed its side.
                break
            req = json.loads(data)
            print(req)
            conn.sendall(json.dumps(_build_reply(req)).encode())
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
    except OSError as e:  # socket.error is an alias of OSError
        print(f"Socket error: {e}")
    except Exception as e:
        # Top-level boundary of the session: report and drop the client
        # rather than take the whole server down.
        print(f"Unexpected error: {e}")
    finally:
        conn.close()
        print(f"Connection with {addr} closed")


def _build_reply(req):
    """Build the reply dict for one decoded request.

    The reply starts as ``{"queryData": ["ok"]}`` and is then adjusted:

    * ``"axis-0"`` in ``req["queryAddr"]``: the module-level
      ``coordinates_index`` first moves to the next entry of ``c_angles``
      (wrapping to 0 after the last one), then ``queryData`` becomes that
      entry's ``"axis"`` list.
    * ``"world-0"`` in ``req["queryAddr"]``: ``queryData`` becomes the
      current entry's ``"world"`` list; the index is not moved.  If both
      markers are present, the "world" data wins.
    * ``req["reqType"]`` equal to ``"command"`` or ``"AddRCC"``: a
      ``cmdReply`` field is added.

    The dataset is only consulted for axis/world queries, so other requests
    are answered even when ``c_angles`` is empty.
    """
    global coordinates_index

    reply = {"queryData": ["ok"]}

    # Evaluate the address markers once instead of re-testing per branch.
    query_addr = req["queryAddr"] if "queryAddr" in req else ()
    wants_axis = "axis-0" in query_addr
    wants_world = "world-0" in query_addr

    if wants_axis:
        print(f"----{coordinates_index}----")
        last = len(c_angles) - 1
        # The index moves *before* the lookup, so the entry served is the
        # one after the index printed above.
        coordinates_index = coordinates_index + 1 if coordinates_index < last else 0

    if wants_axis or wants_world:
        entry = c_angles[coordinates_index]
        if wants_axis:
            reply["queryData"] = entry["axis"]
        if wants_world:
            reply["queryData"] = entry["world"]

    if "reqType" in req and req["reqType"] in ("command", "AddRCC"):
        reply["cmdReply"] = ["it is local dear"]

    return reply
def main():
    """Run the mock server: bind to HOST:PORT and serve clients forever.

    Clients are handled strictly one at a time; the next ``accept`` only
    happens after ``handle_client`` returns for the current connection.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Allow the server to be restarted right away; without this, bind()
        # can fail with "Address already in use" while sockets from the
        # previous run are still in TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen()
        print(f"Server listening on {HOST}:{PORT}")
        while True:
            conn, addr = s.accept()
            handle_client(conn, addr)


# Only start listening when executed as a script.  Without the guard, merely
# importing this module (for example during pytest collection, which picks up
# files named test_*.py) would block forever in the accept loop.
if __name__ == "__main__":
    main()