191 lines
5.7 KiB
Python
191 lines
5.7 KiB
Python
import os
|
|
import sys
|
|
import glob
|
|
import json
|
|
import time
|
|
import argparse
|
|
import threading
|
|
from PyQt6.QtCore import QThread
|
|
from PyQt6.QtWidgets import QApplication
|
|
|
|
from robot.client_socket import SocketRobotArm
|
|
from gui.init import MainContentComponent
|
|
from logger import logger
|
|
|
|
|
|
class MyApp:
|
|
if getattr(sys, "frozen", False):
|
|
# Если приложение собрано в один файл
|
|
application_path = os.path.dirname(sys.executable)
|
|
else:
|
|
# Если приложение запускается как скрипт
|
|
application_path = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
robots_json_path = os.path.join(application_path, "robots.json")
|
|
|
|
with open(robots_json_path, "r") as file:
|
|
robots = json.load(file)
|
|
|
|
robot_app = None
|
|
|
|
def __init__(self, mode):
|
|
self.mode = mode
|
|
|
|
self.startGui()
|
|
self.startRobot()
|
|
self.startWindow()
|
|
|
|
def startGui(self):
|
|
self.app = QApplication(sys.argv)
|
|
mainWindow = MainContentComponent(
|
|
get_status=self.get_status,
|
|
robotPanel={
|
|
"robots": self.robots,
|
|
"updateRobot": self.updateRobot,
|
|
"status": self.get_status,
|
|
},
|
|
statusPanel={
|
|
"status": self.get_status,
|
|
},
|
|
informerPanel={
|
|
"world_coord": self.get_world_coordinates,
|
|
"axis_coord": self.get_axis_coordinates,
|
|
"command_count": self.get_command_count,
|
|
"updateData": self.update_data_wrapper,
|
|
},
|
|
imitatorPanel={
|
|
"startImitate": self.imitate_wrapper,
|
|
"imitate_point": self.get_imitate_point,
|
|
},
|
|
commandPanel={
|
|
"startCommand": self.command_wrapper,
|
|
"changeType": self.change_type_wrapper,
|
|
"command_data": self.get_command_data,
|
|
"command_type": self.get_command_type,
|
|
},
|
|
visPanel={
|
|
"get_pybullet_image": self.get_pybullet_image_wrapper,
|
|
},
|
|
filenamePanel={
|
|
"files": self.get_data_files(),
|
|
"getFilename": self.get_filename,
|
|
"setFilename": self.set_filename,
|
|
},
|
|
)
|
|
mainWindow.setWindowTitle("ROBOT GUI")
|
|
self.window = mainWindow
|
|
|
|
def get_data_files(self):
|
|
files = glob.glob(f"{self.application_path}/data/*.nc.result")
|
|
return [os.path.basename(x) for x in files]
|
|
|
|
def startWindow(self):
|
|
self.window.show()
|
|
sys.exit(self.app.exec())
|
|
|
|
def startRobot(self):
|
|
self.robot_app = SocketRobotArm(application_path=self.application_path)
|
|
# Запускаем SocketRobotArm в отдельном потоке
|
|
threading.Thread(target=self.run_robot_arm, daemon=True).start()
|
|
|
|
def run_robot_arm(self):
|
|
time.sleep(1)
|
|
self.robot_app.start("SHARED_MEMORY")
|
|
|
|
def updateRobot(self, robot):
|
|
if robot in self.robots:
|
|
selected_robot = robot
|
|
if self.mode == "test":
|
|
robot["host"] = "127.0.0.1"
|
|
|
|
if self.robot_app.status == "connected":
|
|
self.robot_app.close()
|
|
time.sleep(0.3)
|
|
else:
|
|
self.robot_app.connect(
|
|
selected_robot["host"], selected_robot["slave_id"]
|
|
)
|
|
self.robot_app.start_loop(selected_robot["urdf"])
|
|
|
|
def check_robot_app(func):
|
|
def wrapper(self, *args, **kwargs):
|
|
if self.robot_app:
|
|
return func(self, *args, **kwargs)
|
|
else:
|
|
logger.info(f"robot_app еще не инициализирован. Метод: {func.__name__}")
|
|
return None # Или любое значение по умолчанию
|
|
|
|
return wrapper
|
|
|
|
@check_robot_app
|
|
def get_status(self):
|
|
return self.robot_app.get_status()
|
|
|
|
@check_robot_app
|
|
def get_world_coordinates(self):
|
|
return self.robot_app.world_coordinates
|
|
|
|
@check_robot_app
|
|
def get_axis_coordinates(self):
|
|
return self.robot_app.axis_coordinates
|
|
|
|
@check_robot_app
|
|
def get_command_count(self):
|
|
return self.robot_app.command_count
|
|
|
|
@check_robot_app
|
|
def get_command_data(self):
|
|
return self.robot_app.get_command_data()
|
|
|
|
@check_robot_app
|
|
def get_command_type(self):
|
|
return self.robot_app.get_command_type()
|
|
|
|
@check_robot_app
|
|
def get_imitate_point(self):
|
|
return self.robot_app.get_imitate_point()
|
|
|
|
@check_robot_app
|
|
def change_type_wrapper(self, type="base"):
|
|
return self.robot_app.set_command_type(type)
|
|
|
|
@check_robot_app
|
|
def command_wrapper(self, *args, **kwargs):
|
|
# logger.info(args)
|
|
# logger.info(kwargs)
|
|
return self.robot_app.cycle_start()
|
|
|
|
@check_robot_app
|
|
def imitate_wrapper(self, *args, **kwargs):
|
|
return self.robot_app.imitate()
|
|
|
|
@check_robot_app
|
|
def update_data_wrapper(self, *args, **kwargs):
|
|
return self.robot_app.upd_model()
|
|
|
|
@check_robot_app
|
|
def get_pybullet_image_wrapper(self):
|
|
return self.robot_app.get_pybullet_image()
|
|
|
|
@check_robot_app
|
|
def get_filename(self):
|
|
return self.robot_app.filename
|
|
|
|
@check_robot_app
|
|
def set_filename(self, filename):
|
|
self.robot_app.filename = filename
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="MyApp Command Line Interface")
|
|
parser.add_argument(
|
|
"--mode",
|
|
type=str,
|
|
choices=["test", "prod"],
|
|
default="prod",
|
|
help="Mode of operation",
|
|
)
|
|
# MyApp()
|
|
args = parser.parse_args()
|
|
app = MyApp(args.mode)
|