modbus_test/robot/socket_manager.py

59 lines
1.6 KiB
Python

import socket
import json
import os
import math
import numpy as np
from typing import Literal
from pprint import pprint
import threading
import pybullet as p
import pybullet_industrial as pi
from logger import logger
class SocketManager:
def __init__(self):
self.host = None
self.port = 9760
self.socket = None
self.status: Literal["connected", "not_connected", "error"] = "not_connected"
def connect(self, host):
self.host = host
if self.socket is None:
self.socket = socket.socket()
logger.info(f"trying connect to {(self.host, self.port)}")
self.socket.connect((self.host, self.port))
self.status = "connected"
def close(self):
if self.socket:
self.socket.close()
self.socket = None
self.status = "not_connected"
def send_data(self, data):
if not self.socket:
return
try:
self.socket.send(str.encode(json.dumps(data)))
response_data = self.socket.recv(1024)
response = json.loads(response_data)
if data["reqType"] == "query":
return response["queryData"]
elif data["reqType"] == "command":
return response["cmdReply"]
elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys():
return response["cmdReply"]
else:
pprint(response_data)
return json.loads(response_data)
except Exception as e:
logger.error(f"Error sending data: {e}")
return None