125 lines
4.1 KiB
Python
125 lines
4.1 KiB
Python
# Modbus client settings
MODBUS_SERVER_HOST = "192.168.70.55"  # IP address of the Modbus server
# MODBUS_SERVER_HOST = "192.168.70.65"  # IP address of the Modbus server
MODBUS_SERVER_PORT = 502
MODBUS_SLAVE_ID = 11
# MODBUS_SLAVE_ID = 22

# Terms of the register address built in set_user_register:
# indent + s + k * n
indent = 21100
s = 800
k = 2
|
|
|
|
from pymodbus.client import ModbusTcpClient
|
|
from pymodbus.constants import Endian
|
|
from pymodbus.payload import BinaryPayloadDecoder
|
|
import time
|
|
import glob
|
|
|
|
def two_byte_convert(result):
    """Merge two 16-bit registers into one signed 32-bit integer.

    ``result.registers[0]`` is used as the high word and
    ``result.registers[1]`` as the low word. When the high word is 0x8000 or
    more, the combined value is interpreted as negative by subtracting 2**32.

    Args:
        result: Object exposing a ``registers`` sequence with at least two
            integer items.

    Returns:
        int: The combined signed value.
    """
    high_word, low_word = result.registers[0], result.registers[1]
    combined = high_word * 0x10000 + low_word
    if high_word >= 0x8000:
        # Top bit of the high word set: two's-complement negative number.
        return combined - 0x100000000
    return combined
|
|
|
|
|
|
def bulb(addr, new_state, client, slave_id):
    """Switch the coil at ``addr`` to ``new_state`` unless it already has it.

    The coil is read first and a write is issued only when the value read
    differs from ``new_state``.

    Args:
        addr: Address of the coil to read and, if needed, write.
        new_state: Desired coil value, compared with the bit read back.
        client: Object exposing ``read_coils`` and ``write_coil``.
        slave_id: Unit id used for both the read and the write request.

    Raises:
        Any exception raised by ``client`` propagates unchanged.
    """
    # Address the device the caller asked for; the parameter used to be
    # ignored in favour of the module-level default.
    coils = client.read_coils(addr, 1, slave_id)
    if coils.bits[0] != new_state:
        client.write_coil(addr, new_state, slave_id)
|
|
|
|
|
|
def get_coordinates(client):
    """Read and print the j1-j6 and x, y, z, u, v, w values.

    Each value is fetched as a pair of holding registers, merged by
    ``two_byte_convert`` and printed, divided by 1000, next to the raw
    register pair. Exceptions raised while reading propagate to the caller.

    Args:
        client: Object exposing ``read_holding_registers``.
    """
    register_map = {
        "j1": 2268,
        "j2": 2270,
        "j3": 2272,
        "j4": 2274,
        "j5": 2276,
        "j6": 2278,
        "x": 2332,
        "y": 2334,
        "z": 2336,
        "u": 2338,
        "v": 2340,
        "w": 2342,
    }
    for label, address in register_map.items():
        reply = client.read_holding_registers(address, 2, MODBUS_SLAVE_ID)
        scaled = two_byte_convert(reply) / 1000.0
        print(label, reply.registers, scaled)
|
|
|
|
|
|
def collect_coordinates(client):
    """Read the x, y, z, u, v, w values, print them and return them.

    Args:
        client: Object exposing ``read_holding_registers``.

    Returns:
        list: ``[x, y, z, u, v, w]`` as the integers produced by
        ``two_byte_convert``; the printout shows them multiplied by 0.001.
    """
    # Collect the preliminary data: six register pairs starting at 2332,
    # read in address order.
    x, y, z, u, v, w = (
        two_byte_convert(client.read_holding_registers(address, 2, MODBUS_SLAVE_ID))
        for address in range(2332, 2344, 2)
    )
    print(
        f"Предварительные данные X {x * 0.001}, Y {y * 0.001}, Z {z*0.001}, U {u*0.001}, V {v*0.001}, W {w*0.001}"
    )
    return [x, y, z, u, v, w]
|
|
|
|
|
|
def get_or_set_speed(number, client):
    """Print the current speed and, when ``number`` is valid, set a new one.

    Register 20200 is handled in tenths: the value read is printed multiplied
    by 0.1 and the value written is ``number * 10``.

    Args:
        number: Requested speed. It is applied only when
            ``0 < number <= 1000``; ``None``, ``0`` and out-of-range values
            leave the register untouched.
        client: Object exposing ``read_holding_registers`` and
            ``write_register``.
    """
    result = client.read_holding_registers(20200, 1, MODBUS_SLAVE_ID)
    print("сейчас скорость", result.registers[0] * 0.1)
    if number and 0 < number <= 1000:
        # A register payload must be an integer, so a fractional speed such
        # as 12.5 is rounded to whole tenths instead of being sent as a float.
        register_value = int(round(number * 10))
        client.write_register(20200, register_value, MODBUS_SLAVE_ID)
        result = client.read_holding_registers(20200, 1, MODBUS_SLAVE_ID)
        print("установлена скорость", result.registers[0] * 0.1)
|
|
|
|
|
|
def set_user_register(n, values, client):
    """Write ``values`` starting at register ``indent + s + k * n``.

    Args:
        n: Index multiplied by ``k`` and added to ``indent + s``.
        values: Register values passed on to ``client.write_registers``.
        client: Object exposing ``write_registers``.
    """
    address = indent + s + k * n
    client.write_registers(address, values, MODBUS_SLAVE_ID)
|
|
|
|
|
|
def to_double(integr):
    """Split an integer into two 16-bit words, high word first.

    Only the low 32 bits of ``integr`` are used, so negative numbers come out
    in two's-complement form.

    Args:
        integr: Integer to split.

    Returns:
        list: ``[high_word, low_word]``, each in the range 0..0xFFFF.
    """
    high_word, low_word = divmod(integr & 0xFFFFFFFF, 0x10000)
    return [high_word, low_word]
|
|
|
|
|
|
def set_user_reg_800(data, client):
    """Write every ``(n, values)`` pair in ``data`` via ``set_user_register``.

    Args:
        data: Iterable of two-item entries ``(n, values)``.
        client: Client object handed through to ``set_user_register``.
    """
    for index, payload in data:
        set_user_register(index, payload, client)
|
|
|
|
|
|
def start_in_auto(client):
    """Start in auto mode (single loop) by writing 1 to register 20002.

    Args:
        client: Object exposing ``write_register``.
    """
    start_register = 20002
    client.write_register(start_register, 1, MODBUS_SLAVE_ID)
|
|
|
|
|
|
def stop_immediately(client):
    """Write 1 to register 20004.

    NOTE(review): judging by the function name this requests an immediate
    stop; confirm against the controller's register map.

    Args:
        client: Object exposing ``write_register``.
    """
    stop_register = 20004
    client.write_register(stop_register, 1, MODBUS_SLAVE_ID)
|
|
|
|
def multiply_1000(n):
    """Return ``n`` scaled by 1000 as the nearest integer.

    The product is rounded rather than truncated: binary floating point can
    land just below the exact result (``1.001 * 1000`` evaluates to
    ``1000.9999999999999``), and truncation would then lose a whole unit.

    Args:
        n: Number to scale.

    Returns:
        int: ``n * 1000`` rounded to the nearest integer.
    """
    return int(round(n * 1000))
|
|
|
|
def make_addrcc_data(q, emptyList=0):
    """Build an ``AddRCC`` request dictionary.

    Args:
        q: Stored unchanged under ``"instructions"``.
        emptyList: Converted with ``str`` and stored under ``"emptyList"``.

    Returns:
        dict: The request with its fixed ``dsID``, ``reqType`` and
        ``packID`` fields filled in.
    """
    request = {
        "dsID": "www.hc-system.com.HCRemoteCommand",
        "reqType": "AddRCC",
        "emptyList": str(emptyList),
        "packID": "0",
        "instructions": q,
    }
    return request
|
|
def make_command_data(q):
    """Build a ``command`` request dictionary.

    Args:
        q: Stored unchanged under ``"cmdData"``.

    Returns:
        dict: The request with its fixed ``dsID``, ``reqType`` and
        ``packID`` fields filled in.
    """
    request = {
        "dsID": "www.hc-system.com.RemoteMonitor",
        "reqType": "command",
        "packID": "0",
        "cmdData": q,
    }
    return request
|
|
def make_query_data(q):
    """Build a ``query`` request dictionary.

    Args:
        q: Stored unchanged under ``"queryAddr"``.

    Returns:
        dict: The request with its fixed ``dsID``, ``reqType`` and
        ``packID`` fields filled in.
    """
    request = {
        "dsID": "www.hc-system.com.RemoteMonitor",
        "reqType": "query",
        "packID": "0",
        "queryAddr": q,
    }
    return request