modbus_test/robot/client_socket.py

496 lines
15 KiB
Python

import socket
import json
import os
import math
import numpy as np
import sys
from typing import Literal
from pprint import pprint
import threading
import pybullet as p
import pybullet_industrial as pi
from robot.func import *
from logger import logger
# os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
class PrepareRobotData:
line_speed = 100.0
line_smooth = 0
line_tool = 0
def make_step(self, type, point, start_coordinates):
step = {
"oneshot": "1",
"delay": "0.0",
"speed": str(self.line_speed),
"smooth": str(self.line_smooth),
"coord": "0",
"tool": str(self.line_tool),
"ckStatus": "0x3F",
}
if type == "line" or type == "free":
pairs = zip(start_coordinates, point)
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
if type == "line":
step.update({"action": "10"})
if type == "free":
step.update({"action": "4"})
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
step.update({"m6": 0, "m7": 0})
elif type == "curve":
pairs = zip(self.world_coordinates, point[:5])
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
pairs_p = zip(self.world_coordinates, point[6:])
m0_p, m1_p, m2_p, m3_p, m4_p, m5_p = [round(sum(i), 3) for i in pairs_p]
step.update({"action": "17"})
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
step.update({"m6": 0, "m7": 0})
step.update(
{
"m0_p": m0_p,
"m1_p": m1_p,
"m2_p": m2_p,
"m3_p": m3_p,
"m4_p": m4_p,
"m5_p": m5_p,
}
)
step.update({"m6_p": 0, "m7_p": 0})
for s in step:
step[s] = str(step[s])
return step
class SocketManager:
def __init__(self):
self.host = None
self.port = 9760
self.socket = None
self.status: Literal["connected", "not_connected", "error"] = "not_connected"
def connect(self, host):
self.host = host
if self.socket is None:
self.socket = socket.socket()
logger.info(f"trying connect to {(self.host, self.port)}")
self.socket.connect((self.host, self.port))
self.status = "connected"
def close(self):
if self.socket:
self.socket.close()
self.socket = None
self.status = "not_connected"
def send_data(self, data):
if not self.socket:
return
try:
self.socket.send(str.encode(json.dumps(data)))
response_data = self.socket.recv(1024)
response = json.loads(response_data)
if data["reqType"] == "query":
return response["queryData"]
elif data["reqType"] == "command":
return response["cmdReply"]
elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys():
return response["cmdReply"]
else:
pprint(response_data)
return json.loads(response_data)
except Exception as e:
logger.error(f"Error sending data: {e}")
return None
class UrdfManager:
urdf_filename = None
physics_client = None
body_id = None
def __init__(self, robot_start_position, application_path):
self.robot_start_position = robot_start_position
self.application_path = application_path
def start_loop(self, urdf):
self.urdf_filename = urdf
p.resetSimulation()
self.load_models()
def run_pybullet(self, type="DIRECT"):
self.physics_client = p.connect(getattr(p, type))
logger.info(f"Connect to {self.physics_client} by {type}")
def load_models(self):
# p.loadURDF("urdf/plane.urdf", physicsClientId=self.physics_client)
urdf_path = os.path.join(
self.application_path, "urdf", f"{self.urdf_filename}.urdf"
)
self.body_id = p.loadURDF(urdf_path, self.robot_start_position)
time.sleep(1)
def get_pybullet_image(self):
if self.physics_client is None:
return
width, height, rgb, _, _ = p.getCameraImage(
width=500,
height=500,
viewMatrix=p.computeViewMatrix(
cameraEyePosition=[4, -4, 1.5],
# cameraEyePosition=[4,-4,4],
cameraTargetPosition=[0, 0, 0], # Центр фокусировки камеры
cameraUpVector=[0, 0, 1], # Направление вверх
),
projectionMatrix=p.computeProjectionMatrixFOV(
fov=60.0, aspect=1.0, nearVal=0.1, farVal=10.0
),
# renderer=p.ER_TINY_RENDERER,
physicsClientId=self.physics_client,
)
return (rgb, width, height)
class SocketRobotArm:
global_speed = 10
physical_speed = 10
# laser_id = 15
laser_id = 14
filename = "axis"
urdf_filename = "sample"
pass_size = 4
Status = Literal["connected", "not_connected", "error"]
start_axis_coordinates = []
start_world_coordinates = []
remote_command_count = []
command_type = "base"
command_data = None
def __init__(self, application_path, *args, **kwargs):
self.socket = None
self.host = None
self.port = 9760
self.slave_id = None
self.status: SocketRobotArm.Status = "not_connected"
self.q_app = None
self.imitate_point = None
robot_start_position = [0, 0, 0]
self.prepare_data = PrepareRobotData()
self.socket_manager = SocketManager()
self.urdf_manager = UrdfManager(
robot_start_position=robot_start_position, application_path=application_path
)
def __exit__(self, exc_type, exc_value, traceback):
logger.info("exiting")
self.socket.close()
def start(self, type="DIRECT"):
self.urdf_manager.run_pybullet(type)
def start_loop(self, urdf):
self.urdf_manager.start_loop(urdf)
def load_models(self):
self.urdf_manager.load_models()
self.body_id = self.urdf_manager.body_id
def get_pybullet_image(self):
return self.urdf_manager.get_pybullet_image()
def close(self):
self.socket_manager.close()
def connect(self, host, slave_id):
self.socket_manager.connect(host)
with open("delta.json", "w") as f:
pass
def get_status(self):
return self.socket_manager.status
def upd_model(self):
threading.Thread(target=self.upd_model_func, daemon=True).start()
def upd_model_func(self):
# logger.info("UPDATE MODEL FUNC")\
mode = "by_joint"
# mode = "by_world"
self.get_axis()
self.set_text(text=f"Координаты осей {self.axis_coordinates}")
self.get_world()
self.set_text(text=f"Мировые координаты {self.world_coordinates}")
num_joints = p.getNumJoints(self.urdf_manager.body_id)
jointPoses = self.axis_coordinates
for i in range(num_joints):
p.resetJointState(self.urdf_manager.body_id, i, math.radians(jointPoses[i]))
time.sleep(2)
coordinates = self.world_coordinates[:3]
angles = [
self.world_coordinates[1],
self.world_coordinates[2],
self.world_coordinates[3],
]
jointPoses = self.convert_to_joint_base(
[float(p) * 0.001 for p in coordinates], angles
)
for i in range(num_joints):
p.resetJointState(self.urdf_manager.body_id, i, jointPoses[i])
time.sleep(2)
self._get_command_count()
self.set_text(text=f"Команд в очереди {self.remote_command_count}")
time.sleep(0.5)
def cycle_base(self):
self.upd_model()
self.send_data(self.set_global_speed())
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
time.sleep(0.5)
def cycle_start(self):
self.send_data(self.start_cycle())
self.set_text(text=f"Старт одиночного цикла")
time.sleep(0.5)
commands = (
self.steps_from_file()
if self.command_type == "base"
else self.convert_file_to_joint()
)
with open(f"{self.command_type}.log", "w") as myfile:
myfile.write(json.dumps(commands))
self.add_rcc_list = (
[self.set_physical_speed(True), self.set_output_laser(True)]
+ commands
+ [self.set_physical_speed(False), self.set_output_laser(False)]
)
step = 4
empty = 1
for i in range(0, len(self.add_rcc_list), step):
self.command_data = f"Отправка данных {i}...{i+step-1}"
self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty))
empty = 0
time.sleep(0.05)
def imitate(self):
threading.Thread(target=self.imitate_func, daemon=True).start()
def set_text(self, text):
logger.info(text)
def send_data(self, data):
return self.socket_manager.send_data(data)
def get_axis(self):
axis_coord_raw = self.send_data(
make_query_data(
["axis-0", "axis-1", "axis-2", "axis-3", "axis-4", "axis-5"]
)
)
self.axis_coordinates = [float(i) for i in axis_coord_raw]
print("start_axis_coordinates", self.axis_coordinates)
def get_axis_coordinates(self):
return self.axis_coordinates
def get_world(self):
world_coord_raw = self.send_data(
make_query_data(
["world-0", "world-1", "world-2", "world-3", "world-4", "world-5"]
)
)
self.world_coordinates = [float(i) for i in world_coord_raw]
print("start_world_coordinates", self.world_coordinates)
def get_world_coordinates(self):
return self.world_coordinates
def _get_command_count(self):
res = self.send_data(make_query_data(["RemoteCmdLen"]))
self.remote_command_count = res
print(res)
def get_command_count(self):
return self.remote_command_count
def set_command_type(self, data):
self.command_type = data
def get_command_type(self):
return self.command_type
def get_command_data(self):
return self.command_data
def get_imitate_point(self):
return self.imitate_point
def set_global_speed(self):
# Изменили глобальную скорость на global_speed%
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
def start_cycle(self):
return make_command_data(["actionSingleCycle"])
def set_physical_speed(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "51",
"isUse": str(int(status)),
"speed": str(self.physical_speed * 1000),
},
)
def set_output_laser(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "200",
"type": "0",
"io_status": str(int(status)),
"point": str(self.laser_id),
},
)
def imitate_func(self):
points = self.steps_from_file()
for i, point in enumerate(points):
if point["action"] == "10":
self.imitate_point = i
points = [
point["m0"],
point["m1"],
point["m2"],
]
angles = [
point["m3"],
point["m4"],
point["m5"],
]
jointPoses = self.convert_to_joint_base([float(p) * 0.001 for p in points], angles)
num_joints = p.getNumJoints(self.urdf_manager.body_id)
for i in range(num_joints):
p.resetJointState(self.urdf_manager.body_id, i, jointPoses[i])
time.sleep(1)
def convert_to_joint_base(self, coordinates, angles):
body_id = 0
num_joints = p.getNumJoints(body_id)
lower_limits = []
upper_limits = []
for i in range(num_joints):
limit = p.getJointInfo(body_id, i)
lower_limits.append(limit[8])
upper_limits.append(limit[9])
joint_angles = p.calculateInverseKinematics(
body_id,
endEffectorLinkIndex=num_joints - 1,
targetPosition=[float(c) for c in coordinates],
targetOrientation=angles,
lowerLimits=lower_limits,
upperLimits=upper_limits,
# jointRanges=[0, 0, 0, 0, 0, 0],
# restPoses=[0, 0, 0, 0, 0, 0, -math.pi * 0.5, 0],
# solver=0,
# maxNumIterations=100,
# residualThreshold=0.001,
)
return joint_angles
def convert_file_to_joint(self):
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
for item in data:
prep[item[:1]] = float(item[1:])
pj = list(
self.convert_to_joint_base(
coordinates=[
f
for f in (
prep.get("X", 0) + self.world_coordinates[0],
prep.get("Y", 0) + self.world_coordinates[1],
prep.get("Z", 0) + self.world_coordinates[2],
)
],
angles=[
prep.get("U", 0) + self.world_coordinates[3],
prep.get("V", 0) + self.world_coordinates[4],
prep.get("W", 0) + self.world_coordinates[5],
],
)
)
result.append(
self.prepare_data.make_step(
"free", np.degrees(pj), [0, 0, 0, 0, 0, 0]
)
)
return result
def steps_from_file(self):
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
for item in data:
prep[item[:1]] = float(item[1:])
result.append(
self.prepare_data.make_step(
"line",
(
prep.get("X", 0),
prep.get("Y", 0),
prep.get("Z", 0),
prep.get("U", 0),
prep.get("V", 0),
prep.get("W", 0),
),
self.world_coordinates,
)
)
return result