498 lines
16 KiB
Python
498 lines
16 KiB
Python
import socket
|
|
import json
|
|
import os
|
|
import math
|
|
import numpy as np
|
|
|
|
from typing import Literal
|
|
from pprint import pprint
|
|
|
|
import threading
|
|
import pybullet as p
|
|
import pybullet_data
|
|
|
|
from robot.func import *
|
|
from logger import logger
|
|
|
|
# os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
|
|
|
|
|
|
class PrepareRobotData:
|
|
line_speed = 100.0
|
|
line_smooth = 9
|
|
line_tool = 1
|
|
|
|
def make_step(self, type, point, start_coordinates):
|
|
step = {
|
|
"oneshot": "1",
|
|
"delay": "0.0",
|
|
"speed": str(self.line_speed),
|
|
"smooth": str(self.line_smooth),
|
|
"coord": "0",
|
|
"tool": str(self.line_tool),
|
|
"ckStatus": "0x3F",
|
|
}
|
|
if type == "line" or type == "free":
|
|
pairs = zip(start_coordinates, point)
|
|
|
|
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
|
|
if type == "line":
|
|
step.update({"action": "10"})
|
|
if type == "free":
|
|
step.update({"action": "4"})
|
|
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
|
|
step.update({"m6": 0, "m7": 0})
|
|
elif type == "curve":
|
|
pairs = zip(self.start_world_coordinates, point[:5])
|
|
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
|
|
|
|
pairs_p = zip(self.start_world_coordinates, point[6:])
|
|
m0_p, m1_p, m2_p, m3_p, m4_p, m5_p = [round(sum(i), 3) for i in pairs_p]
|
|
step.update({"action": "17"})
|
|
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
|
|
step.update({"m6": 0, "m7": 0})
|
|
step.update(
|
|
{
|
|
"m0_p": m0_p,
|
|
"m1_p": m1_p,
|
|
"m2_p": m2_p,
|
|
"m3_p": m3_p,
|
|
"m4_p": m4_p,
|
|
"m5_p": m5_p,
|
|
}
|
|
)
|
|
step.update({"m6_p": 0, "m7_p": 0})
|
|
|
|
for s in step:
|
|
step[s] = str(step[s])
|
|
return step
|
|
|
|
|
|
class SocketManager:
|
|
def __init__(self):
|
|
self.host = None
|
|
self.port = 9760
|
|
self.socket = None
|
|
self.status: Literal["connected", "not_connected", "error"] = "not_connected"
|
|
|
|
def connect(self, host):
|
|
self.host = host
|
|
if self.socket is None:
|
|
self.socket = socket.socket()
|
|
self.socket.connect((self.host, self.port))
|
|
self.status = "connected"
|
|
|
|
def close(self):
|
|
if self.socket:
|
|
self.socket.close()
|
|
self.socket = None
|
|
self.status = "not_connected"
|
|
|
|
def send_data(self, data):
|
|
if not self.socket:
|
|
return
|
|
try:
|
|
self.socket.send(str.encode(json.dumps(data)))
|
|
response_data = self.socket.recv(1024)
|
|
response = json.loads(response_data)
|
|
|
|
if data["reqType"] == "query":
|
|
return response["queryData"]
|
|
elif data["reqType"] == "command":
|
|
return response["cmdReply"]
|
|
elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys():
|
|
return response["cmdReply"]
|
|
else:
|
|
pprint(response_data)
|
|
|
|
return json.loads(response_data)
|
|
except Exception as e:
|
|
logger.error(f"Error sending data: {e}")
|
|
return None
|
|
|
|
|
|
class UrdfManager:
|
|
urdf_filename = None
|
|
physics_client = None
|
|
body_id = None
|
|
|
|
def __init__(self, robot_start_position):
|
|
self.robot_start_position = robot_start_position
|
|
|
|
def start_loop(self, urdf):
|
|
self.urdf_filename = urdf
|
|
|
|
p.resetSimulation()
|
|
self.load_models()
|
|
|
|
def run_pybullet(self, type="DIRECT"):
|
|
self.physics_client = p.connect(getattr(p, type))
|
|
logger.info(f"Connect to {self.physics_client} by {type}")
|
|
|
|
def load_models(self):
|
|
p.loadURDF("urdf/plane.urdf", physicsClientId=self.physics_client)
|
|
|
|
urdf_path = os.path.join("urdf", f"{self.urdf_filename}.urdf")
|
|
logger.info(urdf_path)
|
|
self.body_id = p.loadURDF(
|
|
urdf_path,
|
|
self.robot_start_position,
|
|
useFixedBase=1,
|
|
physicsClientId=self.physics_client,
|
|
)
|
|
time.sleep(1)
|
|
|
|
def get_pybullet_image(self):
|
|
if self.physics_client is None:
|
|
return
|
|
|
|
width, height, rgb, _, _ = p.getCameraImage(
|
|
width=500,
|
|
height=500,
|
|
viewMatrix=p.computeViewMatrix(
|
|
cameraEyePosition=[3, -3, 3],
|
|
cameraTargetPosition=[0, 0, 0], # Центр фокусировки камеры
|
|
cameraUpVector=[0, 0, 1], # Направление вверх
|
|
),
|
|
projectionMatrix=p.computeProjectionMatrixFOV(
|
|
fov=60.0, aspect=1.0, nearVal=0.1, farVal=10.0
|
|
),
|
|
# renderer=p.ER_TINY_RENDERER,
|
|
physicsClientId=self.physics_client,
|
|
)
|
|
|
|
return (rgb, width, height)
|
|
|
|
|
|
class SocketRobotArm:
|
|
global_speed = 10
|
|
physical_speed = 10
|
|
# laser_id = 15
|
|
laser_id = 14
|
|
|
|
filename = "test"
|
|
urdf_filename = "sample"
|
|
|
|
pass_size = 4
|
|
Status = Literal["connected", "not_connected", "error"]
|
|
|
|
start_axis_coordinates = []
|
|
start_world_coordinates = []
|
|
remote_command_count = []
|
|
|
|
command_type = "base"
|
|
command_data = None
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.socket = None
|
|
self.host = None
|
|
self.port = 9760
|
|
self.slave_id = None
|
|
self.status: SocketRobotArm.Status = "not_connected"
|
|
|
|
self.q_app = None
|
|
self.imitate_point = None
|
|
|
|
robot_start_position = [
|
|
-0.075458, # новое значение x в метрах
|
|
0.068477, # новое значение y в метрах
|
|
0.082201, # новое значение z в метрах
|
|
]
|
|
|
|
self.prepare_data = PrepareRobotData()
|
|
self.socket_manager = SocketManager()
|
|
self.urdf_manager = UrdfManager(robot_start_position=robot_start_position)
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
logger.info("exiting")
|
|
self.socket.close()
|
|
|
|
def start(self, type="DIRECT"):
|
|
self.urdf_manager.run_pybullet(type)
|
|
|
|
def start_loop(self, urdf):
|
|
self.urdf_manager.start_loop(urdf)
|
|
|
|
def load_models(self):
|
|
self.urdf_manager.load_models()
|
|
self.body_id = self.urdf_manager.body_id
|
|
|
|
def get_pybullet_image(self):
|
|
return self.urdf_manager.get_pybullet_image()
|
|
|
|
def close(self):
|
|
self.socket_manager.close()
|
|
|
|
def connect(self, host, slave_id):
|
|
self.socket_manager.connect(host)
|
|
|
|
def get_status(self):
|
|
return self.socket_manager.status
|
|
|
|
def upd_model(self):
|
|
threading.Thread(target=self.upd_model_func, daemon=True).start()
|
|
|
|
def upd_model_func(self):
|
|
logger.info("UPDATE MODEL FUNC")
|
|
|
|
self.get_axis()
|
|
self.set_text(text=f"Координаты осей {self.start_axis_coordinates}")
|
|
time.sleep(0.5)
|
|
self.set_joint([math.radians(c) for c in self.start_axis_coordinates])
|
|
|
|
self.get_world()
|
|
self.set_text(text=f"Мировые координаты {self.start_world_coordinates}")
|
|
time.sleep(0.5)
|
|
self.set_joint(
|
|
self.convert_to_joint(
|
|
[s * 0.001 for s in self.start_world_coordinates[0:3]]
|
|
)
|
|
)
|
|
|
|
self._get_command_count()
|
|
self.set_text(text=f"Команд в очереди {self.remote_command_count}")
|
|
time.sleep(0.5)
|
|
|
|
num_joints = p.getNumJoints(self.urdf_manager.body_id)
|
|
end_effector_state = p.getLinkState(
|
|
self.urdf_manager.body_id, num_joints - 1, computeForwardKinematics=True
|
|
)
|
|
|
|
end_effector_position = end_effector_state[4]
|
|
end_effector_orientation = end_effector_state[5]
|
|
euler_orientation = p.getEulerFromQuaternion(end_effector_orientation)
|
|
|
|
logger.info(
|
|
f"Вычисленное положение {[a * 1000 for a in end_effector_position]}"
|
|
)
|
|
logger.info(f"Вычисленная ориентация {np.degrees(euler_orientation)}")
|
|
|
|
def cycle_base(self):
|
|
self.upd_model()
|
|
|
|
self.send_data(self.set_global_speed())
|
|
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
|
|
time.sleep(0.5)
|
|
|
|
def cycle_start(self):
|
|
self.send_data(self.start_cycle())
|
|
self.set_text(text=f"Старт одиночного цикла")
|
|
time.sleep(0.5)
|
|
|
|
commands = (
|
|
self.steps_from_file()
|
|
if self.command_type == "base"
|
|
else self.convert_file_to_joint()
|
|
)
|
|
with open(f"{self.command_type}.log", "w") as myfile:
|
|
myfile.write(json.dumps(commands))
|
|
|
|
self.add_rcc_list = (
|
|
[self.set_physical_speed(True), self.set_output_laser(True)]
|
|
+ commands
|
|
+ [self.set_physical_speed(False), self.set_output_laser(False)]
|
|
)
|
|
|
|
step = 4
|
|
empty = 1
|
|
for i in range(0, len(self.add_rcc_list), step):
|
|
# if not self.socket:
|
|
# return
|
|
|
|
self.command_data = f"Отправка данных {i}...{i+step-1}"
|
|
|
|
self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty))
|
|
empty = 0
|
|
time.sleep(0.05)
|
|
|
|
def imitate(self):
|
|
threading.Thread(target=self.imitate_func, daemon=True).start()
|
|
|
|
def convert_to_joint(self, coordinates, angles=[]):
|
|
num_joints = p.getNumJoints(self.urdf_manager.body_id)
|
|
|
|
joint_angles = p.calculateInverseKinematics(
|
|
self.urdf_manager.body_id,
|
|
endEffectorLinkIndex=num_joints - 1,
|
|
targetPosition=coordinates,
|
|
targetOrientation=(
|
|
p.getQuaternionFromEuler([math.radians(float(a)) for a in angles])
|
|
if len(angles) == 3
|
|
else None
|
|
),
|
|
)
|
|
logger.info(f"convrt to joint {joint_angles}")
|
|
# logger.info(joint_angles)
|
|
return joint_angles
|
|
|
|
def set_text(self, text):
|
|
logger.info(text)
|
|
|
|
def set_joint(self, coordinates):
|
|
logger.info("set joints")
|
|
num_joints = p.getNumJoints(self.urdf_manager.body_id)
|
|
if coordinates is None:
|
|
return
|
|
for joint_index in range(0, num_joints):
|
|
if len(coordinates) <= joint_index or coordinates[joint_index] is None:
|
|
return
|
|
p.setJointMotorControl2(
|
|
bodyUniqueId=self.urdf_manager.body_id,
|
|
jointIndex=joint_index,
|
|
controlMode=p.POSITION_CONTROL,
|
|
targetPosition=coordinates[joint_index],
|
|
)
|
|
time.sleep(0.1)
|
|
|
|
def send_data(self, data):
|
|
return self.socket_manager.send_data(data)
|
|
|
|
def get_axis(self):
|
|
axis_coord_raw = self.send_data(
|
|
make_query_data(
|
|
["axis-0", "axis-1", "axis-2", "axis-3", "axis-4", "axis-5"]
|
|
)
|
|
)
|
|
self.start_axis_coordinates = [float(i) for i in axis_coord_raw]
|
|
print("start_axis_coordinates", self.start_axis_coordinates)
|
|
|
|
def get_axis_coordinates(self):
|
|
return self.start_axis_coordinates
|
|
|
|
def get_world(self):
|
|
world_coord_raw = self.send_data(
|
|
make_query_data(
|
|
["world-0", "world-1", "world-2", "world-3", "world-4", "world-5"]
|
|
)
|
|
)
|
|
self.start_world_coordinates = [float(i) for i in world_coord_raw]
|
|
print("start_world_coordinates", self.start_world_coordinates)
|
|
|
|
def get_world_coordinates(self):
|
|
return self.start_world_coordinates
|
|
|
|
def _get_command_count(self):
|
|
res = self.send_data(make_query_data(["RemoteCmdLen"]))
|
|
self.remote_command_count = res
|
|
print(res)
|
|
|
|
def get_command_count(self):
|
|
return self.remote_command_count
|
|
|
|
def set_command_type(self, data):
|
|
self.command_type = data
|
|
|
|
def get_command_type(self):
|
|
return self.command_type
|
|
|
|
def get_command_data(self):
|
|
return self.command_data
|
|
|
|
def get_imitate_point(self):
|
|
return self.imitate_point
|
|
|
|
def set_global_speed(self):
|
|
# Изменили глобальную скорость на global_speed%
|
|
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
|
|
|
|
def start_cycle(self):
|
|
return make_command_data(["actionSingleCycle"])
|
|
|
|
def set_physical_speed(self, status: bool = False):
|
|
return (
|
|
{
|
|
"oneshot": "0",
|
|
"action": "51",
|
|
"isUse": str(int(status)),
|
|
"speed": str(self.physical_speed * 1000),
|
|
},
|
|
)
|
|
|
|
def set_output_laser(self, status: bool = False):
|
|
return (
|
|
{
|
|
"oneshot": "0",
|
|
"action": "200",
|
|
"type": "0",
|
|
"io_status": str(int(status)),
|
|
"point": str(self.laser_id),
|
|
},
|
|
)
|
|
|
|
def imitate_func(self):
|
|
points = self.steps_from_file()
|
|
for i, point in enumerate(points):
|
|
if point["action"] == "10":
|
|
self.imitate_point = i
|
|
points = [
|
|
point["m0"],
|
|
point["m1"],
|
|
point["m2"],
|
|
]
|
|
coor = [(float(point) * 0.001) for point in points]
|
|
angles = self.convert_to_joint(
|
|
coor,
|
|
[
|
|
point["m3"],
|
|
point["m4"],
|
|
point["m5"],
|
|
],
|
|
)
|
|
# logger.info(f"point {point} {angles}")
|
|
self.set_joint(angles)
|
|
time.sleep(1)
|
|
|
|
def convert_file_to_joint(self):
|
|
result = []
|
|
with open(f"data/{self.filename}.nc.result", "r") as fp:
|
|
for line in fp:
|
|
data = line.strip().split(" ")
|
|
prep = {}
|
|
for item in data:
|
|
prep[item[:1]] = float(item[1:])
|
|
|
|
pj = list(
|
|
self.convert_to_joint(
|
|
coordinates=(
|
|
prep.get("X", 0),
|
|
prep.get("Y", 0),
|
|
prep.get("Z", 0),
|
|
),
|
|
angles=[
|
|
prep.get("U", 0),
|
|
prep.get("V", 0),
|
|
prep.get("W", 0),
|
|
],
|
|
)
|
|
)
|
|
result.append(
|
|
self.prepare_data.make_step(
|
|
"free", tuple(pj), self.start_axis_coordinates
|
|
)
|
|
)
|
|
return result
|
|
|
|
def steps_from_file(self):
|
|
result = []
|
|
with open(f"data/{self.filename}.nc.result", "r") as fp:
|
|
for line in fp:
|
|
data = line.strip().split(" ")
|
|
prep = {}
|
|
for item in data:
|
|
prep[item[:1]] = float(item[1:])
|
|
result.append(
|
|
self.prepare_data.make_step(
|
|
"line",
|
|
(
|
|
prep.get("X", 0),
|
|
prep.get("Y", 0),
|
|
prep.get("Z", 0),
|
|
prep.get("U", 0),
|
|
prep.get("V", 0),
|
|
prep.get("W", 0),
|
|
),
|
|
self.start_world_coordinates,
|
|
)
|
|
)
|
|
return result
|