modbus_test/client_socket.py

145 lines
4.6 KiB
Python

import socket
import json
from pprint import pprint
from func import *
# Open the single TCP connection that every request in this script goes
# through. MODBUS_SERVER_HOST is provided by the star-import from `func`.
host, port = MODBUS_SERVER_HOST, 9760
s = socket.socket()
s.connect((host, port))
def _recv_json(sock, bufsize=1024):
    """Read from *sock* until the accumulated bytes decode as one JSON document.

    TCP is a byte stream, so a reply may arrive split over several ``recv``
    calls or be longer than ``bufsize``; chunks are accumulated until
    ``json.loads`` accepts the buffer.

    Raises:
        ConnectionError: the peer closed the connection before a complete
            JSON document was received.
    """
    buffer = b""
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:
            raise ConnectionError(
                "connection closed before a complete JSON reply was received"
            )
        buffer += chunk
        try:
            return json.loads(buffer)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses; either one means the reply is still incomplete.
            continue


def send_data(data):
    """Send one request packet over the module-level socket ``s``.

    Args:
        data: Request dict; its ``"reqType"`` entry selects the handling.

    Returns:
        ``response["queryData"]`` for ``"query"`` requests,
        ``response["cmdReply"]`` for ``"command"`` requests, and ``None``
        for anything else (the response is pretty-printed instead).

    Raises:
        ConnectionError: the peer closed the connection mid-reply.

    A reply that never becomes valid JSON keeps this call waiting until the
    peer closes the connection or a socket timeout (if configured) fires.

    NOTE(review): packets whose reqType is 'AddRCC' are never written to the
    socket here, so ``None`` is printed and returned for them. Confirm that
    this dry-run behaviour is intended.
    """
    req_type = data["reqType"]
    response = None
    if req_type != 'AddRCC':
        # sendall() instead of send(): send() may transmit only part of the
        # payload and returns the count, which the caller would have to loop on.
        s.sendall(json.dumps(data).encode())
        response = _recv_json(s)

    if req_type == 'query':
        return response["queryData"]
    if req_type == 'command':
        return response["cmdReply"]
    pprint(response)
    return None
def make_addrcc_data(q, e="1"):
    """Wrap the instruction list *q* in an ``AddRCC`` request packet.

    *e* is passed through unchanged as the packet's ``emptyList`` field.
    """
    packet = {
        "dsID": "www.hc-system.com.HCRemoteCommand",
        "reqType": "AddRCC",
    }
    packet["emptyList"] = e
    packet["packID"] = "0"
    packet["instructions"] = q
    return packet
def make_command_data(q):
    """Wrap *q* as the ``cmdData`` payload of a ``command`` request packet."""
    return dict(
        dsID="www.hc-system.com.RemoteMonitor",
        reqType="command",
        packID="0",
        cmdData=q,
    )
def make_query_data(q):
    """Wrap *q* as the ``queryAddr`` payload of a ``query`` request packet."""
    return dict(
        dsID="www.hc-system.com.RemoteMonitor",
        reqType="query",
        packID="0",
        queryAddr=q,
    )
# Query the six 'axis-N' values and convert each reply entry to float.
axis_coord_raw = send_data(make_query_data(
    ['axis-0', 'axis-1', 'axis-2', 'axis-3', 'axis-4', 'axis-5']
))
axis_coord = [float(value) for value in axis_coord_raw]

# Same for the six 'world-N' values; they are unpacked into x..w, which
# make_world_step() uses as the base its offsets are added to.
world_coord_raw = send_data(make_query_data(
    ['world-0', 'world-1', 'world-2', 'world-3', 'world-4', 'world-5']
))
world_coord = [float(value) for value in world_coord_raw]
x, y, z, u, v, w = world_coord

print('axis', axis_coord)
print('world', world_coord)
print('remote command count', send_data(make_query_data(['RemoteCmdLen'])))
def _offset_targets(origin, offsets, suffix=""):
    """Return the m0..m7 fields: origin + offset for m0..m5, zero for m6/m7."""
    targets = {
        f"m{index}{suffix}": base + delta
        for index, (base, delta) in enumerate(zip(origin, offsets))
    }
    targets["m6" + suffix] = 0
    targets["m7" + suffix] = 0
    return targets


def make_world_step(type, p, origin=None):
    """Build one remote-command step whose targets are offsets from a base pose.

    Args:
        type: ``'line'`` produces action ``"10"`` with targets m0..m7;
            ``'curve'`` produces action ``"17"`` with targets m0..m7 plus a
            second set m0_p..m7_p. Any other value yields only the common
            fields (no ``"action"`` and no targets). The name shadows the
            builtin but is kept because it is part of the existing signature.
        p: Offsets added to the base pose: six values for ``'line'``, twelve
            for ``'curve'`` (m0..m5 followed by m0_p..m5_p).
        origin: Optional six-value base pose. Defaults to the module-level
            ``x, y, z, u, v, w`` unpacked from the world-coordinate query.

    Returns:
        dict of field name -> value, with every value converted to ``str``.

    Raises:
        ValueError: ``p`` or ``origin`` has the wrong number of values.
    """
    if origin is None:
        origin = (x, y, z, u, v, w)
    origin = tuple(origin)
    if len(origin) != 6:
        raise ValueError(f"origin must have 6 values, got {len(origin)}")

    speed = 10.0
    smooth = 9
    step = {
        "oneshot": "1",
        "delay": "0.0",
        "speed": speed,
        "smooth": smooth,
        "coord": "0",
        "tool": "1",
        "ckStatus": "0xFF",
    }

    if type == 'line':
        offsets = tuple(p)
        if len(offsets) != 6:
            raise ValueError(f"'line' needs 6 offsets, got {len(offsets)}")
        step["action"] = "10"
        step.update(_offset_targets(origin, offsets))
    elif type == 'curve':
        offsets = tuple(p)
        if len(offsets) != 12:
            raise ValueError(f"'curve' needs 12 offsets, got {len(offsets)}")
        step["action"] = "17"
        step.update(_offset_targets(origin, offsets[:6]))
        # Every *_p target pairs with its own offset; previously m5_p was
        # computed from the m3_p offset by mistake.
        step.update(_offset_targets(origin, offsets[6:], "_p"))

    # Every field is sent as a string.
    return {key: str(value) for key, value in step.items()}
# Scale factor for the hand-written sample path kept (disabled) below; in the
# visible code nothing else reads `k`. Each sample entry has the shape
# (type, offsets) that make_world_step(*entry) accepts.
k = 4
# data = [
# ('line',(10*k, 10*k, 0,
# 0, 0, 0)),
# ('line',(0, 20*k, 0,
# 0, 0, 0)),
# ('line',(-10*k, 10*k, 0,
# 0, 0, 0)),
# ('line',(0, 0, 0,
# 0, 0, 0)),
# ('line',(0, 0, 0,
# 0, -10, 0)),
# ('curve',(0, -10*k, 0,
# -10, 0, 0,
# -20*k, -10*k, 0,
# 0, -10, 0)),
# ('line',(-20*k, -10*k, 0,
# 0, 0, 0)),
# ('curve',(0, -20*k, 0,
# 0, 0, 0,
# 0, -10*k, 0,
# 0, 0, 0)),
# ]
def _parse_nc_line(line):
    """Parse one text line of ``<letter><number>`` words into a 'line' step.

    Each whitespace-separated word is keyed by its first character and the
    rest is read as a float (e.g. ``X1.5``). Returns ``('line', (X, Y, Z, U,
    V, W))`` with 0 for any missing letter, or ``None`` for a blank line.
    """
    # split() with no argument collapses runs of whitespace and returns []
    # for a blank line; split(' ') produced '' tokens that broke float().
    words = line.split()
    if not words:
        return None
    prep = {word[:1]: float(word[1:]) for word in words}
    return ('line', tuple(prep.get(axis, 0) for axis in 'XYZUVW'))


# Load the path: one ('line', offsets) entry per non-blank line of the file.
result = []
with open('data/half-sphere.nc.result', 'r') as fp:
    for line in fp:
        parsed = _parse_nc_line(line)
        if parsed is not None:
            result.append(parsed)
# Change the global speed. The original (Russian) comment said 11%.
# NOTE(review): the value actually sent is str(20 * 10); confirm the intended
# percentage and the unit the controller expects.
send_data(make_command_data(['modifyGSPD', str(20 * 10)]))

# Disabled: set Y026 to TRUE.
# send_data(make_addrcc_data([
# {"oneshot":"0", "action":"200","type":"0","io_status":"0", "point":"15"},
# {"oneshot":"0", "action":"200","type":"0","io_status":"0", "point":"15", "delay":"1"},
# ]))

# Build one world-coordinate step per entry of `result` and hand the list to
# send_data() as a single AddRCC packet. Optional extra steps that can be
# placed around the motion are kept here, disabled.
# Before the motion:
# {"oneshot":"0", "action":"200","type":"0","io_status":"1", "point":"14"},
# {"oneshot":"0", "action":"51","isUse":"1","speed":str(8*1000)}
# After the motion:
# {"oneshot":"0", "action":"51","isUse":"0"},
# {"oneshot":"0", "action":"200","type":"0","io_status":"0", "point":"14"},
send_data(make_addrcc_data([make_world_step(*target) for target in result]))

print('remote command count', send_data(make_query_data(['RemoteCmdLen'])))
send_data(make_command_data(['actionSingleCycle']))
# send_data(make_command_data(['actionStop']))