modbus_test/robot/conveyor.py

146 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pybullet as p
import time
import threading
from logger import logger
class ConveyorBelt:
moving = False
def __init__(
self,
size,
position,
direction,
box_size,
box_spacing,
box_speed,
palletizinFunc,
):
self.direction = direction
self.conveyor_size = size
self.conveyor_position = position
self.conveyor_id = self.create_conveyor(size, position)
self.conveyor_stopped = False
self.box_size = box_size
self.box_spacing = box_spacing
self.box_speed = box_speed
self.boxes = []
self.create_boxes()
self.stop_thread = False
self.thread = threading.Thread(target=self.run)
self.thread.start()
self.palletizingFunc = palletizinFunc
def create_conveyor(self, size, position):
collision_shape_id = p.createCollisionShape(
shapeType=p.GEOM_BOX,
halfExtents=[size[0] / 2.0, size[1] / 2.0, size[2] / 2.0],
)
visual_shape_id = p.createVisualShape(
shapeType=p.GEOM_BOX,
halfExtents=[size[0] / 2.0, size[1] / 2.0, size[2] / 2.0],
rgbaColor=[0.5, 0.5, 0.5, 1],
)
conveyor_id = p.createMultiBody(
baseMass=0.0,
baseCollisionShapeIndex=collision_shape_id,
baseVisualShapeIndex=visual_shape_id,
basePosition=position,
)
return conveyor_id
def box_position(self, i=0):
position = self.conveyor_position[:]
position[self.direction] = (
self.conveyor_position[self.direction]
- self.conveyor_size[self.direction] * 0.5
+ i * (self.box_size[self.direction] + self.box_spacing)
+ self.box_size[0] * 0.5
)
return position
def create_boxes(self):
num_boxes = int(
self.conveyor_size[self.direction] / (self.box_size[0] + self.box_spacing)
)
for i in range(num_boxes):
position = self.box_position(i)
self.boxes.append(self.create_box(position))
def create_box(self, position):
collision_shape_id = p.createCollisionShape(
shapeType=p.GEOM_BOX,
halfExtents=[
self.box_size[0] / 2.0,
self.box_size[1] / 2.0,
self.box_size[2] / 2.0,
],
)
visual_shape_id = p.createVisualShape(
shapeType=p.GEOM_BOX,
halfExtents=[
self.box_size[0] / 2.0,
self.box_size[1] / 2.0,
self.box_size[2] / 2.0,
],
rgbaColor=[1, 0, 0, 1],
)
box_id = p.createMultiBody(
baseMass=1.0,
baseCollisionShapeIndex=collision_shape_id,
baseVisualShapeIndex=visual_shape_id,
basePosition=position,
baseOrientation=p.getQuaternionFromEuler([0, 0, 0]),
)
return box_id
def move_boxes(self):
if self.moving and not self.conveyor_stopped:
for box_id in self.boxes[:]:
pos, orn = p.getBasePositionAndOrientation(box_id)
if (
pos[self.direction] + self.box_size[0] * 0.5
> self.conveyor_position[self.direction]
+ self.conveyor_size[self.direction] * 0.5
):
# Если коробка выходит за конец конвейера, она падает вниз (физика продолжает действовать)
# Удаляем коробку с конвейера
self.boxes.remove(box_id)
self.conveyor_stopped = True
wall_position = list(pos)
dir_wall = (self.direction + 1) % 3
wall_position[dir_wall] += self.box_size[dir_wall]
self.call_robot(wall_position, box_id)
else:
new_pos = list(pos[:])
new_pos[self.direction] += self.box_speed
p.resetBasePositionAndOrientation(box_id, new_pos, orn)
# Добавляем новые коробки, если на конвейере меньше, чем максимально возможное количество
if len(self.boxes) < int(
self.conveyor_size[0] / (self.box_size[0] + self.box_spacing)
):
new_box = self.create_box(self.box_position())
self.boxes.append(new_box)
def call_robot(self, position, box_id):
# Здесь можно вызвать робота для выполнения операции, например:
logger.info("Конвейер остановлен, вызываем робота для работы с коробкой.")
self.palletizingFunc(position, box_id)
# time.sleep(5)
# self.conveyor_stopped = False
def run(self):
while not self.stop_thread:
self.move_boxes()
time.sleep(1)
def stop(self):
self.stop_thread = True
self.thread.join()