modbus_test/robot/client_socket.py

457 lines
15 KiB
Python

import socket
import json
import os
import math
import numpy as np
from typing import Literal
from pprint import pprint
import threading
import pybullet as p
import pybullet_industrial as pi
from logger import logger
from robot.func import *
from robot.prepare_data import PrepareRobotData
from robot.urdf_manager import UrdfManager
from robot.socket_manager import SocketManager
# os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
class SocketRobotArm:
global_speed = 10
physical_speed = 10
# laser_id = 15
laser_id = 14
filename = "axis"
urdf_filename = "sample"
pass_size = 4
connected_status = Literal["connected", "not_connected", "error"]
_axis_coordinates = []
_world_coordinates = []
_command_count = []
command_type = "base"
command_data = None
motion_type = "reset"
motion_type = "normal"
def __init__(self, application_path, *args, **kwargs):
self.socket = None
self.host = None
self.port = 9760
self.slave_id = None
self.status: SocketRobotArm.connected_status = "not_connected"
self.q_app = None
self.imitate_point = None
robot_start_position = [0, 0, 0]
self.prepare_data = PrepareRobotData()
self.socket_manager = SocketManager()
self.urdf_manager = UrdfManager(
robot_start_position,
application_path,
self.palletizing,
)
self.constraint_id = None
def __exit__(self, exc_type, exc_value, traceback):
logger.info("exiting")
self.socket.close()
def start(self, type="DIRECT"):
self.urdf_manager.run_pybullet(type)
def start_loop(self, urdf):
self.urdf_manager.start_loop(urdf)
self.body_id = self.urdf_manager.body_id
self.num_joints = p.getNumJoints(self.body_id)
def load_palletizing(self):
self.urdf_manager.load_palletizing()
def start_moving(self):
self.urdf_manager.conveyor.moving = True
def get_pybullet_image(self):
return self.urdf_manager.get_pybullet_image()
def close(self):
self.socket_manager.close()
def connect(self, host, slave_id):
self.socket_manager.connect(host)
with open("delta.json", "w") as f:
pass
def get_status(self):
return self.socket_manager.status
def motionFund(self, radians):
if self.motion_type == "reset":
for index, r in enumerate(radians):
p.resetJointState(self.urdf_manager.body_id, index, r)
time.sleep(0.3)
else:
p.setJointMotorControlArray(
self.urdf_manager.body_id,
[i for i in range(len(radians))],
p.POSITION_CONTROL,
radians,
)
time.sleep(1)
def upd_model(self):
threading.Thread(target=self.upd_model_func, daemon=True).start()
def upd_model_func(self):
self.get_coordinates("axis")
self.set_text(text=f"Координаты осей {self.axis_coordinates}")
time.sleep(0.5)
self.get_coordinates("world")
self.set_text(text=f"Мировые координаты {self.world_coordinates}")
time.sleep(0.5)
self._fetch_command_count()
time.sleep(0.5)
axisJointPoses = self.axis_coordinates
self.motionFund(np.radians(axisJointPoses))
time.sleep(0.5)
if False:
coordinates = [float(p) * 0.001 for p in self.world_coordinates[:3]]
angles = [math.radians(p) for p in self.world_coordinates[3:6]]
worldJointPoses = self.convert_to_joint_base(coordinates, angles)
for i in range(self.num_joints):
v = worldJointPoses[i] if i < len(worldJointPoses) else 0
self.motionFund(worldJointPoses)
time.sleep(0.5)
calcWorldPosition = p.getLinkState(
self.urdf_manager.body_id,
self.num_joints - 1,
computeForwardKinematics=True,
)
calcWorldPosition = list(
[p * 1000 for p in calcWorldPosition[0]]
+ [
np.degrees(p)
for p in p.getEulerFromQuaternion(calcWorldPosition[1])
]
)
time.sleep(0.5)
position_diff = np.linalg.norm(
self.world_coordinates[:3] - np.array(calcWorldPosition[:3])
)
# print(*[f"world delta {i} {self.world_coordinates[i] - calcWorldPosition[i]}" for i in range(6)])
# print(*[f"axis delta {i} {self.axis_coordinates[i] - np.rad2deg(worldJointPoses[i])}" for i in range(6)])
# print(self.axis_coordinates[4] - np.rad2deg(worldJointPoses[4]))
print(
"Разница между расчетом по модели и мировыми координатами:",
position_diff,
)
self.set_text(text=f"Команд в очереди {self.command_count}")
time.sleep(0.5)
def cycle_base(self):
self.upd_model()
self.send_data(self.set_global_speed())
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
time.sleep(0.5)
def cycle_start(self):
self.send_data(self.start_cycle())
self.set_text(text=f"Старт одиночного цикла")
time.sleep(0.5)
if self.command_type == "base":
commands = self.steps_from_file()
if self.command_type == "calc":
commands = self.convert_point_to_free()
if self.command_type == "pallette":
commands = self.get_palletizing_json()
with open(f"log/{self.command_type}.json", "w") as myfile:
myfile.write(json.dumps(commands))
self.add_rcc_list = (
[self.set_physical_speed(True), self.set_output_laser(True)]
+ commands
+ [self.set_physical_speed(False), self.set_output_laser(False)]
)
step = 4
empty = 1
for i in range(0, len(self.add_rcc_list), step):
self.command_data = f"Отправка данных {i}...{i+step-1}"
self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty))
empty = 0
time.sleep(0.05)
def imitate(self):
threading.Thread(target=self.imitate_func, daemon=True).start()
def palletizing(self, position, box_id, counter):
dir_wall = (self.urdf_manager.conv_direction + 1) % 2
near_position = position[:]
near_position[dir_wall] -= 0.4
object_position = position[:]
object_position[dir_wall] -= 0.02
pallet_position = self.urdf_manager.packing[
counter % len(self.urdf_manager.packing)
]
logger.info(f"pallete {pallet_position}")
base1_position = pallet_position[:]
base1_position[0] -= 0.2
base1_position[2] += 0.2
base_position = pallet_position[:]
base_position[0] -= 0.2
drop_position = pallet_position[:]
drop_position[0] -= 0.4
drop_position[2] += 0.4
json_res = []
tool_pos = [0, 0, 0]
steps = [
{"type": "move", "coordinates": near_position},
{"type": "move", "coordinates": object_position},
{"type": "create_constraint"},
{"type": "move", "coordinates": base1_position},
{"type": "move", "coordinates": base_position},
{"type": "move", "coordinates": pallet_position},
{"type": "remove_constraint"},
{"type": "move", "coordinates": base_position},
{"type": "move", "coordinates": base1_position},
{"type": "move", "coordinates": drop_position},
]
constraint_id = None
for step in steps:
if step["type"] == "move":
# logger.info(step["coordinates"])
jointPoses = self.convert_to_joint_base(
step["coordinates"],
np.radians(tool_pos),
)
json_res.append(
self.prepare_data.make_step(
"free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0]
)
)
self.motionFund(jointPoses)
# time.sleep(2)
if step["type"] == "create_constraint":
link_state = p.getLinkState(self.body_id, self.num_joints - 1)
orientation = link_state[1]
constraint_id = p.createConstraint(
parentBodyUniqueId=self.body_id,
parentLinkIndex=self.num_joints - 1,
childBodyUniqueId=box_id,
childLinkIndex=-1,
jointType=p.JOINT_FIXED,
jointAxis=[0, 0, 0],
parentFramePosition=[0, 0, 0],
childFramePosition=[0, 0.2, 0],
parentFrameOrientation=p.getQuaternionFromEuler(
np.radians([0, 0, 90])
),
childFrameOrientation=orientation,
)
if step["type"] == "remove_constraint":
time.sleep(0.5)
p.removeConstraint(constraint_id)
# time.sleep(0.5)
# p.resetBasePositionAndOrientation(
# box_id,
# pallet_position,
# p.getQuaternionFromEuler(np.radians([0, 0, 0])),
# )
with open(f"data/palletizing.json", "w") as myfile:
myfile.write(json.dumps(json_res))
self.urdf_manager.conveyor.conveyor_stopped = False
def set_text(self, text):
logger.info(text)
def send_data(self, data):
return self.socket_manager.send_data(data)
def get_coordinates(self, coord_type: Literal["axis", "world"] = "axis"):
request = make_query_data([f"{coord_type}-{i}" for i in range(6)])
result = self.send_data(request)
if not result:
return
data = [float(i) for i in result]
if coord_type == "axis":
self._axis_coordinates = data
elif coord_type == "world":
self._world_coordinates = data
@property
def axis_coordinates(self):
return self._axis_coordinates
@property
def world_coordinates(self):
return self._world_coordinates
def _fetch_command_count(self):
request = make_query_data(["RemoteCmdLen"])
result = self.send_data(request)
self._command_count = result
@property
def command_count(self):
return self._command_count
def set_command_type(self, data):
self.command_type = data
def get_command_type(self):
return self.command_type
def get_command_data(self):
return self.command_data
def get_imitate_point(self):
return self.imitate_point
def set_global_speed(self):
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
def start_cycle(self):
return make_command_data(["actionSingleCycle"])
def set_physical_speed(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "51",
"isUse": str(int(status)),
"speed": str(self.physical_speed * 1000),
},
)
def set_output_laser(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "200",
"type": "0",
"io_status": str(int(status)),
"point": str(self.laser_id),
},
)
def imitate_func(self):
points = self.convert_point_to_free()
for point in points:
jointPoses = [np.radians(float(point[f"m{i}"])) for i in range(6)]
self.motionFund(jointPoses)
def get_palletizing_json(self):
with open(f"data/palletizing.json", "r") as fp:
data = json.load(fp)
return data
def convert_point_to_free(self):
points = self.steps_from_file()
res = []
for i, point in enumerate(points):
if point["action"] == "10":
self.imitate_point = i
points = [
point["m0"],
point["m1"],
point["m2"],
]
angles = [
point["m3"],
point["m4"],
point["m5"],
]
jointPoses = self.convert_to_joint_base(
[float(p) * 0.001 for p in points],
np.radians([float(x) for x in angles]),
)
res.append(
self.prepare_data.make_step(
"free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0]
)
)
return res
def convert_to_joint_base(self, coordinates, angles):
lower_limits = []
upper_limits = []
for i in range(self.num_joints):
limit = p.getJointInfo(self.body_id, i)
lower_limits.append(limit[8])
upper_limits.append(limit[9])
joint_angles = p.calculateInverseKinematics(
self.body_id,
endEffectorLinkIndex=self.num_joints - 1,
targetPosition=[float(c) for c in coordinates],
targetOrientation=p.getQuaternionFromEuler(angles),
lowerLimits=lower_limits,
upperLimits=upper_limits,
)
return joint_angles
def steps_from_file(self):
if not self.world_coordinates:
return []
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
axis = ["X", "Y", "Z", "U", "V", "W"]
for item in data:
if item and item[0] and item[0] in axis:
prep[item[:1]] = float(item[1:])
if len(prep) > 0:
r = self.prepare_data.make_step(
"line",
(
prep.get("X", 0),
prep.get("Y", 0),
prep.get("Z", 0),
prep.get("U", 0),
prep.get("V", 0),
prep.get("W", 0),
),
self.world_coordinates,
)
# print(r)
result.append(r)
return result