59 lines
1.6 KiB
Python
59 lines
1.6 KiB
Python
import socket
|
|
import json
|
|
import os
|
|
import math
|
|
import numpy as np
|
|
|
|
from typing import Literal
|
|
from pprint import pprint
|
|
|
|
import threading
|
|
import pybullet as p
|
|
import pybullet_industrial as pi
|
|
|
|
from logger import logger
|
|
|
|
|
|
class SocketManager:
    """Small JSON request/response client on top of a TCP socket.

    A request is a dict serialized to JSON and written to the socket; the
    reply is read with a single ``recv`` call and parsed as JSON.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):
        # Remote host; set by connect().
        self.host = None
        # Remote TCP port used by connect().
        self.port = 9760
        # Underlying socket object, or None when no socket is held.
        self.socket = None
        # Coarse connection state, updated by connect() and close().
        self.status: Literal["connected", "not_connected", "error"] = "not_connected"

    def connect(self, host):
        """Connect to ``(host, self.port)``.

        A new socket is created only when none is currently held. On success
        ``status`` becomes ``"connected"``.

        Args:
            host: Host name or address to connect to.

        Raises:
            Exception: Whatever ``socket.connect`` raises (typically
                ``OSError``) is propagated to the caller. If the socket was
                created by this call it is closed and dropped first and
                ``status`` is set to ``"error"``, so a half-initialised
                socket is never left behind for ``send_data`` to use.
        """
        self.host = host
        created_here = self.socket is None
        if created_here:
            self.socket = socket.socket()
        logger.info(f"trying connect to {(self.host, self.port)}")
        try:
            self.socket.connect((self.host, self.port))
        except Exception:
            # Only tear down a socket this call created; a pre-existing
            # socket is left untouched, exactly as before.
            if created_here:
                self.socket.close()
                self.socket = None
                self.status = "error"
            raise
        self.status = "connected"

    def close(self):
        """Close the socket, if any, and mark the manager ``"not_connected"``."""
        if self.socket:
            self.socket.close()
        self.socket = None
        self.status = "not_connected"

    def send_data(self, data):
        """Send ``data`` as JSON and return the relevant part of the reply.

        Args:
            data: JSON-serializable dict containing at least ``"reqType"``.

        Returns:
            * ``response["queryData"]`` when ``reqType`` is ``"query"``;
            * ``response["cmdReply"]`` when ``reqType`` is ``"command"``, or
              is ``"AddRCC"`` and the reply contains ``"cmdReply"``;
            * the whole parsed reply otherwise (the raw reply bytes are also
              pretty-printed to stdout in that case);
            * ``None`` when no socket is held or when any error occurs
              (the error is logged, not raised).
        """
        if not self.socket:
            return None
        try:
            payload = json.dumps(data).encode()
            # sendall() keeps writing until the whole payload is out;
            # plain send() may transmit only part of it.
            self.socket.sendall(payload)
            # NOTE(review): a single recv(1024) truncates replies larger than
            # 1024 bytes or split across TCP segments — the JSON parse below
            # then fails and None is returned.
            response_data = self.socket.recv(1024)
            response = json.loads(response_data)

            req_type = data["reqType"]
            if req_type == "query":
                return response["queryData"]
            if req_type == "command":
                return response["cmdReply"]
            if req_type == "AddRCC" and "cmdReply" in response:
                return response["cmdReply"]

            # Unrecognised request type (or AddRCC without a cmdReply):
            # dump the raw reply for inspection and hand back all of it.
            pprint(response_data)
            return response
        except Exception as e:
            # Deliberately broad: callers rely on None for any failure
            # (socket error, bad JSON, missing key) instead of an exception.
            logger.error(f"Error sending data: {e}")
            return None
|