417 lines
13 KiB
Python
417 lines
13 KiB
Python
import socket
|
|
import json
|
|
import os
|
|
import math
|
|
import numpy as np
|
|
|
|
from typing import Literal
|
|
from pprint import pprint
|
|
|
|
import threading
|
|
import pybullet as p
|
|
import pybullet_industrial as pi
|
|
|
|
from logger import logger
|
|
|
|
from robot.func import *
|
|
from robot.prepare_data import PrepareRobotData
|
|
from robot.urdf_manager import UrdfManager
|
|
from robot.socket_manager import SocketManager
|
|
|
|
# os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
|
|
|
|
|
|
class SocketRobotArm:
|
|
global_speed = 10
|
|
physical_speed = 10
|
|
# laser_id = 15
|
|
laser_id = 14
|
|
|
|
filename = "axis"
|
|
urdf_filename = "sample"
|
|
|
|
pass_size = 4
|
|
connected_status = Literal["connected", "not_connected", "error"]
|
|
|
|
_axis_coordinates = []
|
|
_world_coordinates = []
|
|
_command_count = []
|
|
|
|
command_type = "base"
|
|
command_data = None
|
|
|
|
motion_type = "reset"
|
|
motion_type = "normal"
|
|
|
|
def __init__(self, application_path, *args, **kwargs):
|
|
self.socket = None
|
|
self.host = None
|
|
self.port = 9760
|
|
self.slave_id = None
|
|
self.status: SocketRobotArm.connected_status = "not_connected"
|
|
|
|
self.q_app = None
|
|
self.imitate_point = None
|
|
|
|
robot_start_position = [0, -0, 0]
|
|
|
|
self.prepare_data = PrepareRobotData()
|
|
self.socket_manager = SocketManager()
|
|
self.urdf_manager = UrdfManager(
|
|
robot_start_position,
|
|
application_path,
|
|
self.palletizing,
|
|
)
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
logger.info("exiting")
|
|
self.socket.close()
|
|
|
|
def start(self, type="DIRECT"):
|
|
self.urdf_manager.run_pybullet(type)
|
|
|
|
def start_loop(self, urdf):
|
|
self.urdf_manager.start_loop(urdf)
|
|
self.body_id = self.urdf_manager.body_id
|
|
self.num_joints = p.getNumJoints(self.body_id)
|
|
|
|
def load_palletizing(self):
|
|
self.urdf_manager.load_palletizing()
|
|
|
|
def start_moving(self):
|
|
self.urdf_manager.conveyor.moving = True
|
|
|
|
def get_pybullet_image(self):
|
|
return self.urdf_manager.get_pybullet_image()
|
|
|
|
def close(self):
|
|
self.socket_manager.close()
|
|
|
|
def connect(self, host, slave_id):
|
|
self.socket_manager.connect(host)
|
|
with open("delta.json", "w") as f:
|
|
pass
|
|
|
|
def get_status(self):
|
|
return self.socket_manager.status
|
|
|
|
def motionFund(self, radians):
|
|
if self.motion_type == "reset":
|
|
for index in radians:
|
|
p.resetJointState(self.urdf_manager.body_id, index, radians)
|
|
time.sleep(0.3)
|
|
else:
|
|
p.setJointMotorControlArray(
|
|
self.urdf_manager.body_id,
|
|
[i for i in range(len(radians))],
|
|
p.POSITION_CONTROL,
|
|
radians,
|
|
)
|
|
time.sleep(1)
|
|
|
|
def upd_model(self):
|
|
threading.Thread(target=self.upd_model_func, daemon=True).start()
|
|
|
|
def upd_model_func(self):
|
|
self.get_coordinates("axis")
|
|
self.set_text(text=f"Координаты осей {self.axis_coordinates}")
|
|
time.sleep(0.5)
|
|
|
|
self.get_coordinates("world")
|
|
self.set_text(text=f"Мировые координаты {self.world_coordinates}")
|
|
time.sleep(0.5)
|
|
|
|
self._fetch_command_count()
|
|
time.sleep(0.5)
|
|
|
|
axisJointPoses = self.axis_coordinates
|
|
self.motionFund(np.radians(axisJointPoses))
|
|
|
|
time.sleep(0.5)
|
|
|
|
if False:
|
|
coordinates = [float(p) * 0.001 for p in self.world_coordinates[:3]]
|
|
angles = [math.radians(p) for p in self.world_coordinates[3:6]]
|
|
worldJointPoses = self.convert_to_joint_base(coordinates, angles)
|
|
for i in range(self.num_joints):
|
|
v = worldJointPoses[i] if i < len(worldJointPoses) else 0
|
|
self.motionFund(worldJointPoses)
|
|
|
|
time.sleep(0.5)
|
|
|
|
calcWorldPosition = p.getLinkState(
|
|
self.urdf_manager.body_id,
|
|
self.num_joints - 1,
|
|
computeForwardKinematics=True,
|
|
)
|
|
calcWorldPosition = list(
|
|
[p * 1000 for p in calcWorldPosition[0]]
|
|
+ [
|
|
np.degrees(p)
|
|
for p in p.getEulerFromQuaternion(calcWorldPosition[1])
|
|
]
|
|
)
|
|
time.sleep(0.5)
|
|
|
|
position_diff = np.linalg.norm(
|
|
self.world_coordinates[:3] - np.array(calcWorldPosition[:3])
|
|
)
|
|
print(
|
|
"Разница между расчетом по модели и мировыми координатами:",
|
|
position_diff,
|
|
)
|
|
|
|
self.set_text(text=f"Команд в очереди {self.command_count}")
|
|
time.sleep(0.5)
|
|
|
|
def cycle_base(self):
|
|
self.upd_model()
|
|
|
|
self.send_data(self.set_global_speed())
|
|
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
|
|
time.sleep(0.5)
|
|
|
|
def cycle_start(self):
|
|
self.send_data(self.start_cycle())
|
|
self.set_text(text=f"Старт одиночного цикла")
|
|
time.sleep(0.5)
|
|
|
|
if self.command_type == "base":
|
|
commands = self.steps_from_file()
|
|
if self.command_type == "calc":
|
|
commands = self.convert_point_to_free()
|
|
if self.command_type == "pallette":
|
|
commands = self.get_palletizing_json()
|
|
with open(f"log/{self.command_type}.json", "w") as myfile:
|
|
myfile.write(json.dumps(commands))
|
|
|
|
self.add_rcc_list = (
|
|
[self.set_physical_speed(True), self.set_output_laser(True)]
|
|
+ commands
|
|
+ [self.set_physical_speed(False), self.set_output_laser(False)]
|
|
)
|
|
|
|
step = 4
|
|
empty = 1
|
|
for i in range(0, len(self.add_rcc_list), step):
|
|
self.command_data = f"Отправка данных {i}...{i+step-1}"
|
|
|
|
self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty))
|
|
empty = 0
|
|
time.sleep(0.05)
|
|
|
|
def imitate(self):
|
|
threading.Thread(target=self.imitate_func, daemon=True).start()
|
|
|
|
def palletizing(self, position, box_id):
|
|
dir_wall = (self.urdf_manager.conv_direction + 1) % 2
|
|
|
|
conv = position[:]
|
|
conv[dir_wall] -= 0.4
|
|
position[dir_wall] -= 0.02
|
|
|
|
json_res = []
|
|
tool_pos = [0, 0, 0]
|
|
|
|
for pos in [conv, position]:
|
|
jointPoses = self.convert_to_joint_base(
|
|
pos,
|
|
np.radians(tool_pos),
|
|
)
|
|
json_res.append(
|
|
self.prepare_data.make_step(
|
|
"free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0]
|
|
)
|
|
)
|
|
self.motionFund(jointPoses)
|
|
# time.sleep(2)
|
|
|
|
with open(f"data/palletizing.json", "w") as myfile:
|
|
myfile.write(json.dumps(json_res))
|
|
|
|
# Создаём констрейнт для прикрепления
|
|
constraint_id = p.createConstraint(
|
|
parentBodyUniqueId=self.body_id,
|
|
parentLinkIndex=self.num_joints - 1,
|
|
childBodyUniqueId=box_id,
|
|
childLinkIndex=-1,
|
|
jointType=p.JOINT_FIXED,
|
|
jointAxis=[0, 0, 0],
|
|
parentFramePosition=[0, 0, 0],
|
|
childFramePosition=[0, 0.2, 0],
|
|
parentFrameOrientation=p.getQuaternionFromEuler(np.radians([0, 0, 90])),
|
|
childFrameOrientation=p.getQuaternionFromEuler(np.radians([0, 0, 0])),
|
|
)
|
|
pal_pos = self.urdf_manager.pallet.position
|
|
for pos in [conv, pal_pos]:
|
|
jointPoses = self.convert_to_joint_base(
|
|
pos,
|
|
np.radians(tool_pos),
|
|
)
|
|
json_res.append(
|
|
self.prepare_data.make_step(
|
|
"free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0]
|
|
)
|
|
)
|
|
self.motionFund(jointPoses)
|
|
# time.sleep(2)
|
|
p.removeConstraint(constraint_id)
|
|
self.urdf_manager.conveyor.conveyor_stopped = False
|
|
|
|
def set_text(self, text):
|
|
logger.info(text)
|
|
|
|
def send_data(self, data):
|
|
return self.socket_manager.send_data(data)
|
|
|
|
def get_coordinates(self, coord_type: Literal["axis", "world"] = "axis"):
|
|
request = make_query_data([f"{coord_type}-{i}" for i in range(6)])
|
|
result = self.send_data(request)
|
|
if not result:
|
|
return
|
|
data = [float(i) for i in result]
|
|
if coord_type == "axis":
|
|
self._axis_coordinates = data
|
|
elif coord_type == "world":
|
|
self._world_coordinates = data
|
|
|
|
@property
|
|
def axis_coordinates(self):
|
|
return self._axis_coordinates
|
|
|
|
@property
|
|
def world_coordinates(self):
|
|
return self._world_coordinates
|
|
|
|
def _fetch_command_count(self):
|
|
request = make_query_data(["RemoteCmdLen"])
|
|
result = self.send_data(request)
|
|
self._command_count = result
|
|
|
|
@property
|
|
def command_count(self):
|
|
return self._command_count
|
|
|
|
def set_command_type(self, data):
|
|
self.command_type = data
|
|
|
|
def get_command_type(self):
|
|
return self.command_type
|
|
|
|
def get_command_data(self):
|
|
return self.command_data
|
|
|
|
def get_imitate_point(self):
|
|
return self.imitate_point
|
|
|
|
def set_global_speed(self):
|
|
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
|
|
|
|
def start_cycle(self):
|
|
return make_command_data(["actionSingleCycle"])
|
|
|
|
def set_physical_speed(self, status: bool = False):
|
|
return (
|
|
{
|
|
"oneshot": "0",
|
|
"action": "51",
|
|
"isUse": str(int(status)),
|
|
"speed": str(self.physical_speed * 1000),
|
|
},
|
|
)
|
|
|
|
def set_output_laser(self, status: bool = False):
|
|
return (
|
|
{
|
|
"oneshot": "0",
|
|
"action": "200",
|
|
"type": "0",
|
|
"io_status": str(int(status)),
|
|
"point": str(self.laser_id),
|
|
},
|
|
)
|
|
|
|
def imitate_func(self):
|
|
points = self.convert_point_to_free()
|
|
for point in points:
|
|
jointPoses = [np.radians(float(point[f"m{i}"])) for i in range(6)]
|
|
self.motionFund(jointPoses)
|
|
|
|
def get_palletizing_json(self):
|
|
with open(f"data/palletizing.json", "r") as fp:
|
|
data = json.load(fp)
|
|
return data
|
|
|
|
def convert_point_to_free(self):
|
|
points = self.steps_from_file()
|
|
res = []
|
|
for i, point in enumerate(points):
|
|
if point["action"] == "10":
|
|
self.imitate_point = i
|
|
points = [
|
|
point["m0"],
|
|
point["m1"],
|
|
point["m2"],
|
|
]
|
|
angles = [
|
|
point["m3"],
|
|
point["m4"],
|
|
point["m5"],
|
|
]
|
|
jointPoses = self.convert_to_joint_base(
|
|
[float(p) * 0.001 for p in points],
|
|
np.radians([float(x) for x in angles]),
|
|
)
|
|
res.append(
|
|
self.prepare_data.make_step(
|
|
"free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0]
|
|
)
|
|
)
|
|
return res
|
|
|
|
def convert_to_joint_base(self, coordinates, angles):
|
|
lower_limits = []
|
|
upper_limits = []
|
|
for i in range(self.num_joints):
|
|
limit = p.getJointInfo(self.body_id, i)
|
|
lower_limits.append(limit[8])
|
|
upper_limits.append(limit[9])
|
|
|
|
joint_angles = p.calculateInverseKinematics(
|
|
self.body_id,
|
|
endEffectorLinkIndex=self.num_joints - 1,
|
|
targetPosition=[float(c) for c in coordinates],
|
|
targetOrientation=p.getQuaternionFromEuler(angles),
|
|
)
|
|
|
|
return joint_angles
|
|
|
|
def steps_from_file(self):
|
|
if not self.world_coordinates:
|
|
return []
|
|
result = []
|
|
with open(f"data/{self.filename}.nc.result", "r") as fp:
|
|
for line in fp:
|
|
data = line.strip().split(" ")
|
|
prep = {}
|
|
axis = ["X", "Y", "Z", "U", "V", "W"]
|
|
for item in data:
|
|
if item and item[0] and item[0] in axis:
|
|
prep[item[:1]] = float(item[1:])
|
|
if len(prep) > 0:
|
|
r = self.prepare_data.make_step(
|
|
"line",
|
|
(
|
|
prep.get("X", 0),
|
|
prep.get("Y", 0),
|
|
prep.get("Z", 0),
|
|
prep.get("U", 0),
|
|
prep.get("V", 0),
|
|
prep.get("W", 0),
|
|
),
|
|
self.world_coordinates,
|
|
)
|
|
# print(r)
|
|
result.append(r)
|
|
return result
|