Abstract Parent Class Interfaces¶
Abstract parent class interfaces are functions that a child class inherits from its parent class and can override according to actual requirements. This section introduces the following abstract parent classes.
Communication¶
The source code of the communication category is stored in the /src/interface/communication.py file in the installation directory of Mech-Center.
Communication Class¶
The Communication class is the base class used for communication. It provides a series of interface functions that should be overridden on the server or client.
Class Function | Description |
is_connected() | Determine whether the connection is currently established. |
set_recv_size() | Set the length of the data received each time; the default value is 1024 bytes. |
send() | Interface function used to send data. |
recv() | Interface function used to receive data. |
close() | Interface function used to close the socket communication. |
before_recv() | Interface function in which you can add logic to run before the data is received. It can be overridden. |
after_recv() | Interface function in which you can add logic to run after the data is received. It can be overridden. |
after_handle() | Interface function in which you can add logic to run after the data is processed. It can be overridden. |
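The hook functions in the table can be overridden in a child class. The following is a minimal sketch, not the actual implementation: `Communication` here is a stand-in written only for this example (the real class lives in communication.py and is not reproduced in this section), and `CountingCommunication` is a hypothetical child class.

```python
class Communication:
    """Stand-in for the real base class; only the names come from the table."""

    def __init__(self):
        self._recv_size = 1024  # default value documented in the table

    def set_recv_size(self, size):
        self._recv_size = size

    def before_recv(self):
        pass

    def after_recv(self):
        pass

    def after_handle(self):
        pass


class CountingCommunication(Communication):
    """Hypothetical child class that records which hooks were called."""

    def __init__(self):
        super().__init__()
        self.calls = []

    def before_recv(self):
        self.calls.append("before_recv")

    def after_recv(self):
        self.calls.append("after_recv")

    def after_handle(self):
        self.calls.append("after_handle")


comm = CountingCommunication()
comm.set_recv_size(4096)
# Same order in which TcpServerAdapter.start() calls the hooks for one message.
comm.before_recv()
comm.after_recv()
comm.after_handle()
print(comm.calls)
```

In a real project the child class would inherit from the Communication class shipped with Mech-Center instead of the stand-in.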
TcpServer Class¶
TcpServer class encapsulates a TCP/IP Socket server.
Class Function | Description |
bind_and_listen() | Bind the port and listen for connections |
local_socket() | Provide the local socket information |
remote_socket() | Provide the remote socket information |
accept() | Accept the connection from the client |
send() | Send the data |
recv() | Receive the data |
close() | Close the socket |
close_client() | Disconnect from the client |
TcpClient Class¶
TcpClient class encapsulates a TCP/IP Socket client.
Class Property | Description |
is_bind_port | Whether to bind the port. If there is a port restriction on the client, the value should be set to True. |

Class Function | Description |
send() | Send the data |
recv() | Receive the data |
close() | Close the socket |
set_timeout() | Set the timeout period in seconds |
reconnect_server() | Reconnect to the server |
after_connect_server() | Interface function used to specify what to do after the first successful connection to the server |
after_reconnect_server() | Interface function used to specify what to do after reconnecting to the server |
after_timeout() | Interface function used to specify what to do after the timeout period elapses |
Adapter¶
The source code of the Adapter category is stored in the /src/interface/adapter.py file in the installation directory of Mech-Center.
Adapter Base Class¶
Adapter encapsulates functions related to Mech-Viz, Mech-Vision, Mech-Center, and Robserver, including start_viz(), stop_viz(), set_task_property(), and set_step_property(). When an Adapter program is used to control Mech-Viz or Mech-Vision, it must use a child class of the Adapter base class.
The class properties of Adapter are shown in the table below.
Class Property | Description |
viz_project_dir | Directory of the current Mech-Viz project |
vision_project_name | Name of the current Mech-Vision project |
is_simulate | Whether to run Mech-Viz in simulation mode |
is_keep_viz_state | Whether to keep the state from when Mech-Viz was last stopped |
is_save_executor_data | Whether to save the data of the Mech-Viz executor |
is_force_simulate | Whether to force Mech-Viz to run in simulation mode |
is_force_real_run | Whether to force Mech-Viz to run and control the real robot |
code_signal | The signal for displaying Adapter information in the main interface of Mech-Center (including error codes) |
msg_signal | The signal for displaying Adapter information in the main interface of Mech-Center (NOT including error codes) |
i_code_signal | The signal for displaying Mech-Interface information in the main interface of Mech-Center (including error codes) |
i_msg_signal | The signal for displaying Mech-Interface information in the main interface of Mech-Center (NOT including error codes) |
viz_finished_signal | The signal indicating that Mech-Viz has stopped, either normally or with an error |
connect_robot_signal | The signal for connecting to or disconnecting from the robot |
start_adapter_signal | The signal for starting Adapter |
service_name_changed | The signal for displaying the statuses of Mech-Viz and Mech-Vision in the main interface of Mech-Center |
setting_infos | The configuration information of Mech-Center |
service_name | Name of the registered service |
The Adapter class functions are as shown in the table below.
Class Function | Description |
on_exec_status_changed() | Receive the status information from Mech-Viz and Mech-Vision |
register_self_service() | Register the Adapter service |
vision_project_dirs() | Check the directory of the Mech-Vision project |
vision_project_names() | Check all project names in Mech-Vision |
vision_project_names_in_center() | Check the names of all Mech-Vision projects loaded in Mech-Center |
is_viz_registered() | Check whether Mech-Viz is registered in Mech-Center |
is_viz_in_running() | Check whether Mech-Viz is running |
is_vision_started() | Check whether the Mech-Vision project is registered in Mech-Center |
find_services() | Check the service |
before_start_viz() | This function will be called before starting Mech-Viz |
after_start_viz() | This function will be called after starting Mech-Viz |
viz_not_registerd() | This function will be called if Mech-Viz is not registered after being started |
viz_is_running() | This function will be called if Mech-Viz is already running when starting Mech-Viz |
viz_run_error() | This function will be called if an error occurs when running Mech-Viz |
viz_run_finished() | This function will be called after Mech-Viz is stopped |
viz_plan_failed() | This function will be called after Mech-Viz fails to plan a path |
viz_unreachable_targets() | This function will be called when there is an unreachable target in the path planned by Mech-Viz |
viz_collision_checked() | This function will be called when a collision is detected in the path planned by Mech-Viz |
parse_viz_reply() | Parse the reply from Mech-Viz |
wait_viz_result() | Wait for Mech-Viz to reply |
start_viz() | Start Mech-Viz |
stop_viz() | Stop Mech-Viz |
pause_viz() | Pause Mech-Viz |
find_vision_pose() | Trigger the Mech-Vision project to capture images |
async_call_vision_run() | Trigger the Mech-Vision project to capture images asynchronously |
async_get_vision_callback() | Receive the result from Mech-Vision asynchronously |
deal_vision_result() | Process the result from Mech-Vision |
set_step_property() | Set Step parameters in Mech-Vision |
read_step_property() | Read Step parameters in Mech-Vision |
select_parameter_group() | Select the parameter recipe in Mech-Vision |
set_task_property() | Set Task parameters in Mech-Viz |
read_task_property() | Read Task parameters in Mech-Viz |
get_digital_in() | Obtain DI |
set_digital_out() | Set DO |
before_start_adapter() | This function will be called before starting Adapter |
start() | Start Adapter |
close() | Stop Adapter |
handle_command() | Process commands received from external devices |
TcpServerAdapter Class¶
As shown below, TcpServerAdapter class inherits from Adapter, and it encapsulates functions of TcpServer.
class TcpServerAdapter(Adapter):
    def __init__(self, host_address, server=TcpServer):
        super(TcpServerAdapter, self).__init__()
        self.init_server(host_address, server)

    def init_server(self, host_address, server=TcpServer):
        self._server = server(host_address)

    def set_recv_size(self, size):
        self._server.set_recv_size(size)

    def send(self, msg, is_logging=True):
        return self._server.send(msg, is_logging)

    def recv(self):
        return self._server.recv()

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._server.before_recv()
                cmds = self._server.recv()
                logging.info("Received raw data from client:{}".format(cmds))
                if not cmds:
                    logging.warning("Adapter client is disconnected!")
                    self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                    self._server.close_client()
                    self.accept()
                    continue
                self._server.after_recv()
            except socket.error:
                logging.warning("Adapter client is closed!")
                self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                self._server.close_client()
                self.accept()
            except Exception as e:
                logging.exception("Exception occurred when receiving data from client: {}.".format(e))
            else:
                try:
                    self.handle_command(cmds)
                    self._server.after_handle()
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._server.close()

    def before_start_adapter(self):
        super().before_start_adapter()
        self.accept()

    def accept(self):
        if self.is_stop_adapter:
            return
        self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
        self._server.accept()
        if self._server.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
            self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))
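In start(), every received message is passed to handle_command(), so a child class typically only needs to override that function. The following sketch illustrates the idea under stated assumptions: the `TcpServerAdapter` defined here is a stand-in that mimics only send() and handle_command(), `EchoAdapter` and its "ping"/"pong" protocol are hypothetical, and recv() is assumed to return a bytes object.

```python
class TcpServerAdapter:
    """Stand-in for this sketch; the real class is listed in this section."""

    def __init__(self):
        self.sent = []  # collects outgoing messages instead of using a socket

    def send(self, msg, is_logging=True):
        self.sent.append(msg)
        return len(msg)

    def handle_command(self, cmds):
        raise NotImplementedError


class EchoAdapter(TcpServerAdapter):
    """Hypothetical child class that answers each command from the client."""

    def handle_command(self, cmds):
        # Assumption: cmds is the raw bytes object returned by recv().
        text = cmds.decode().strip()
        if text == "ping":
            self.send(b"pong")
        else:
            self.send("unknown:{}".format(text).encode())


adapter = EchoAdapter()
adapter.handle_command(b"ping\n")
adapter.handle_command(b"start")
print(adapter.sent)
```

Because start() wraps handle_command() in its own try/except, an exception raised in the override is logged and reported through msg_signal rather than ending the receive loop.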
TcpClientAdapter Class¶
As shown below, TcpClientAdapter class inherits from Adapter, and it encapsulates functions of TcpClient.
class TcpClientAdapter(Adapter):
    def __init__(self, host_address):
        super().__init__()
        self.init_client(host_address)

    def init_client(self, host_address, client=TcpClient):
        self._client = client(host_address)

    def set_bind_port(self, is_bind=True):
        self._client.is_bind_port = is_bind

    def set_recv_size(self, size):
        self._client.set_recv_size(size)

    def send(self, msg, is_logging=True):
        self._client.send(msg, is_logging)

    def recv(self):
        return self._client.recv()

    def start(self):
        self.reconnect_server(False)
        while not self.is_stop_adapter:
            try:
                self._client.before_recv()
                cmds = self._client.recv()
                if not cmds:
                    self.reconnect_server()
                    continue
                logging.info("Received command from server:{}".format(cmds))
                self._client.after_recv()
            except socket.timeout:
                logging.warning("Socket timeout")
                self._client.after_timeout()
            except socket.error:
                sleep(5)
                self.reconnect_server()
            except Exception as e:
                logging.exception("Exception occurred when receiving from server: {}".format(e))
            else:
                try:
                    self.handle_command(cmds)
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._client.close()

    def reconnect_server(self, is_reconnect=True):
        self._client.reconnect_server()
        if self.is_stop_adapter:
            return
        if self._client.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
        else:
            self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)
TcpMultiplexingServerAdapter Class¶
As shown below, TcpMultiplexingServerAdapter class inherits from Adapter, and it is mainly used to connect multiple clients.
class TcpMultiThreadingServerAdapter(Adapter):
    def __init__(self, address):
        super().__init__()
        self._servers = {}
        self.add_server(address)
        self.sockets = {}
        self.clients_ip = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
        self.thread_id_socket_dict = {}
        self.set_recv_size()

    def set_recv_size(self, size=1024):
        self.recv_size = size

    def _find_client_ip(self, sock):
        for k, v in self.sockets.items():
            if v == sock:
                return k

    def _find_server(self, sock):
        for k, v in self._servers.items():
            if v == sock:
                return k

    def add_server(self, host_address):
        server = TcpServer(host_address)
        server.bind_and_listen()
        self._servers[server] = server.local_socket()

    def set_clients_ip(self, clients_ip):
        """
        Must be called before start().
        `clients_ip` is a dict(key is client ip, value is client description).
        """
        self.clients_ip = clients_ip

    def add_connection(self, ip_port, sock):
        self.sockets[ip_port] = sock
        logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
        self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))

    def del_connection(self, ip):
        logging.info("Del {}, connections: {}".format(ip, self.sockets))
        if self.client_connection(ip):
            self.client_connection(ip).close()
            self.sockets.pop(ip)
            self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))

    def client_connection(self, client_ip):
        return self.sockets.get(client_ip)

    def check_read_events(self, rs):
        for s in rs:
            if s in self._servers.values():  # recv connection
                server = self._find_server(s)
                if self.is_stop_adapter:
                    return
                server.accept()
                client_socket, client_addr = server.remote_socket()
                ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
                self.add_connection(ip_port, client_socket)
            elif s in self.sockets.values():  # recv data
                client_ip = self._find_client_ip(s)
                if not client_ip:
                    continue
                msg = self.recv_by_s(s)
                if not msg:
                    self.del_connection(client_ip)
                    return
                try:
                    future = self.thread_pool.submit(self.handle_command_thread, s, msg)
                except Exception as e:
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def handle_command_thread(self, s, msg):
        thread_id = threading.get_ident()
        self.thread_id_socket_dict[thread_id] = s
        self.handle_command(msg)
        # del self.thread_id_socket_dict[thread_id]

    def send(self, msg, is_logging=True):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        len_total = len(msg)
        while msg:
            if sock:
                len_sent = sock.send(msg)
            else:
                for v in self.sockets.values():
                    try:
                        len_sent = v.send(msg)
                    except Exception as e:
                        logging.warning(e)
            if not len_sent:
                logging.warning("Connection lost, close the client connection.")
                return len_sent
            if is_logging:
                logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
            msg = msg[len_sent:]
        return len_total

    def recv(self):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        return self.recv_by_s(sock)

    def recv_by_s(self, sock):
        msg = b""
        try:
            msg = sock.recv(self.recv_size)
        except socket.error:
            logging.error("The client is closed!")
        if msg:
            logging.info("Received message: {}".format(msg))
        return msg

    def check_task(self):
        """
        Interface.
        """

    def close(self):
        super().close()
        for server in self._servers.keys():
            server.close()
        for client_ip in self.sockets.keys():
            try:
                self.client_connection(client_ip).close()
                logging.info("Close socket :{}".format(client_ip))
            except Exception as e:
                logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
        self.sockets = {}

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
            rs, _, _ = select(avalible_sockets, [], [], 0.1)
            self.check_read_events(rs)
            try:
                self.check_task()
            except Exception as e:
                self.msg_signal.emit(logging.ERROR,
                                     _translate("messages", "Handle command exception: {}".format(e)))
                logging.exception("Exception when check task:{}".format(e))
                sleep(5)
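The start() loop relies on select() to find out which of the watched sockets have data waiting, so that a single thread can serve several clients. The following standalone sketch shows that mechanism only. It uses `socket.socketpair()` from the Python standard library to simulate two clients; the names `watched`, `client_a`, and `client_b` are made up for the example and are not part of Mech-Center.

```python
import socket
from select import select

# Two connected pairs stand in for two clients; the "server" ends are watched.
server_a, client_a = socket.socketpair()
server_b, client_b = socket.socketpair()
watched = {"client_a": server_a, "client_b": server_b}

# Only the second client sends something.
client_b.sendall(b"hello from b")

# Same call shape as in start(): wait for readable sockets with a timeout
# (1.0 s here; the listing in this section uses 0.1 s).
readable, _, _ = select(list(watched.values()), [], [], 1.0)

received = {}
for name, sock in watched.items():
    if sock in readable:
        received[name] = sock.recv(1024)

print(received)

for s in (server_a, client_a, server_b, client_b):
    s.close()
```

Only the socket that actually has data is reported as readable, which is how check_read_events() can tell new connections and incoming messages apart without blocking on any single client.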
IOAdapter Class¶
As shown below, the IOAdapter class inherits from Adapter, and it polls the DI in a while loop.
class IOAdapter(Adapter):
    robot_name = None
    check_rate = 0.5

    def __init__(self, host_address):
        super().__init__()
        self.last_gi = 0

    def get_digital_in(self, timeout=None):
        return super().get_digital_in(self.robot_name, timeout)

    def set_digital_out(self, port, value, timeout=None):
        super().set_digital_out(self.robot_name, port, value, timeout)

    def _check_gi(self):
        gi_js = self.get_digital_in()
        gi = int(json.loads(gi_js.decode())["value"])
        if self.last_gi != gi:
            self.last_gi = gi
            logging.info("Check GI signal status: {}".format(gi))
            self.handle_gi(gi)

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._check_gi()
            except Exception as e:
                logging.exception(e)
                self.check_gi_failed()
            sleep(self.check_rate)

    def handle_gi(self, gi):
        """
        Interface.
        """

    def check_gi_failed(self):
        """
        Interface.
        """
AdapterWidget Class¶
As shown below, the AdapterWidget class is a parent class, and every class that customizes the user interface must be a child class of it.
class AdapterWidget(QWidget):
    def set_adapter(self, adapter):
        self.adapter = adapter
        self.after_set_adapter()

    def after_set_adapter(self):
        """
        Interface.
        """

    def close(self):
        super().close()
        """
        Interface.
        """
Service¶
The source code of the service category is stored in the /src/interface/services.py file in the installation directory of Mech-Center.
NotifyService Class¶
The code of NotifyService class is shown below.
class NotifyService(JsonService):
    service_type = "notify"
    service_name = "adapter"

    def handle_message(self, msg):
        """
        Interface.
        """

    def notify(self, request, _):
        msg = request["notify_message"]
        logging.info("notify message:{}".format(msg))
        return self.handle_message(msg)
The default service name is “adapter”. If multiple notify services are needed in the project, you can override the service_name to distinguish different services.
The descriptions of class functions are shown in the table below.
Class Function | Description |
handle_message() | Interface function; it can be overridden in the child class |
notify() | Parse the message; it usually does not need to be overridden in the child class |
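A child class normally overrides handle_message() and, when several notify services coexist, service_name. The following is a minimal sketch under stated assumptions: `JsonService` is an empty stand-in because the real base class is not shown in this section, and `GripperNotifyService`, its service name, and the returned dictionary are hypothetical.

```python
import logging


class JsonService:
    """Stand-in for the real base class, which is not shown in this section."""


class NotifyService(JsonService):
    """Copy of the class listed in this section."""

    service_type = "notify"
    service_name = "adapter"

    def handle_message(self, msg):
        """Interface."""

    def notify(self, request, _):
        msg = request["notify_message"]
        logging.info("notify message:{}".format(msg))
        return self.handle_message(msg)


class GripperNotifyService(NotifyService):
    """Hypothetical child class with its own service name."""

    service_name = "gripper_notify"

    def __init__(self):
        self.messages = []

    def handle_message(self, msg):
        self.messages.append(msg)
        # The reply layout is an assumption made only for this sketch.
        return {"handled": msg}


service = GripperNotifyService()
reply = service.notify({"notify_message": "picked"}, None)
print(reply, service.messages)
```

notify() itself is left untouched, so the child class inherits the parsing of the "notify_message" field.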
VisionResultSelectedAtService Class¶
The code of VisionResultSelectedAtService class is shown below.
class VisionResultSelectedAtService(JsonService):
    service_type = "vision_watcher"
    service_name = "vision_watcher_adapter"

    def __init__(self):
        self.poses = None

    def poses_found(self, result):
        """
        Interface.
        """

    def posesFound(self, request, _):
        logging.info("{} result:{}".format(jk.mech_vision, request))
        self.poses_found(request)

    def poses_planned(self, result):
        """
        Interface.
        """

    def posesPlanned(self, request, _):
        logging.info("Plan result:{}".format(request))
        self.poses_planned(request)

    def multiPickCombination(self, request, _):
        logging.info("multiPickCombination:{}".format(request))
The default service type is vision_watcher, which cannot be modified. The default service name is vision_watcher_adapter. If multiple vision_watcher services are needed in the project, you can modify the service_name in the child class to distinguish different services.
The descriptions of class functions are shown in the table below.
Class Function | Description |
poses_found() | Interface function that can be overridden in the child class; the parameter is the recognition result of Mech-Vision |
posesFound() | Parse the recognition result sent from Mech-Vision; it usually does not need to be overridden in the child class |
poses_planned() | Interface function; the parameter is the vision point in the path planned by Mech-Viz |
posesPlanned() | Parse the planning message sent from Mech-Viz |
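As a rough illustration of the table, the sketch below overrides poses_found() to keep the latest result. It is not the real implementation: the base class is a trimmed stand-in that reproduces only the posesFound() to poses_found() path, `BinVisionWatcher` and its service name are hypothetical, and the "poses" key in the request is an assumption because the request layout is not specified in this section.

```python
import logging


class VisionResultSelectedAtService:
    """Trimmed stand-in; only posesFound() and poses_found() are reproduced."""

    service_type = "vision_watcher"
    service_name = "vision_watcher_adapter"

    def __init__(self):
        self.poses = None

    def poses_found(self, result):
        """Interface."""

    def posesFound(self, request, _):
        logging.info("vision result:{}".format(request))
        self.poses_found(request)


class BinVisionWatcher(VisionResultSelectedAtService):
    """Hypothetical child class that stores the most recent poses."""

    service_name = "vision_watcher_bin"

    def poses_found(self, result):
        # Assumed request layout: a dict with a "poses" list.
        self.poses = result.get("poses", [])


watcher = BinVisionWatcher()
watcher.posesFound({"poses": [[0.1, 0.2, 0.3, 1, 0, 0, 0]]}, None)
print(len(watcher.poses))
```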
RobotService Class¶
The RobotService class is shown below.
class RobotService(JsonService):
    service_type = "robot"
    service_name = "robot"
    jps = [0, 0, 0, 0, 0, 0]
    pose = [0, 0, 0, 1, 0, 0, 0]

    def getJ(self, *_):
        return {"joint_positions": self.jps}

    def setJ(self, jps):
        logging.info("setJ:{}".format(jps))
        self.jps = jps

    def getL(self, *_):
        return {"tcp_pose": self.pose}

    def getFL(self, *_):
        return {"flange_pose": self.pose}

    def setL(self, pose):
        logging.info("setL:{}".format(pose))
        self.pose = pose

    def moveXs(self, params, _):
        pass

    def stop(self, *_):
        pass

    def setTcp(self, *_):
        pass

    def setDigitalOut(self, params, _):
        pass

    def getDigitalIn(self, *_):
        pass

    def switchPauseContinue(self, *_):
        pass
The default service type is robot, which cannot be modified. The default service name is robot, which should be changed to the name of the real robot in the child class. You also need to set a home position, as JPs or as a TCP pose, in the child class. Make sure that this pose does not cause a collision with the scene objects.
The descriptions of class functions are shown in the table below.
Class Function | Description |
getJ() | Return JPs to Mech-Viz/Mech-Vision |
setJ() | Set JPs in radians for external services |
getL() | Return TCP to Mech-Viz/Mech-Vision |
getFL() | Return flange pose to Mech-Viz/Mech-Vision |
setL() | Set flange pose in quaternions for external services; the unit is meter |
moveXs() | This function will be called after Mech-Viz finishes planning the path. The parameter contains the properties of the targets from move Tasks. Note that if the project contains Tasks that may interrupt pre-planning, such as check_di and branch_by_msg, Mech-Viz will call this function multiple times. |
stop() | Stop the robot (this function is seldom called) |
setTcp() | Set TCP (this function is seldom called) |
setDigitalOut() | Set DO (this function is seldom called) |
getDigitalIn() | Obtain DI (this function is seldom called) |
switchPauseContinue() | Pause/continue the robot (this function is seldom called) |
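The sketch below shows one way a child class might set the service name and a home position, and collect the planned moves passed to moveXs(). It is illustrative only: `RobotService` is a trimmed stand-in of the class listed in this section, while `DemoRobotService`, the name "demo_robot", the home JPs, and the `{"targets": []}` parameter layout are all assumptions made for the example.

```python
class RobotService:
    """Trimmed stand-in of the class listed in this section."""

    service_type = "robot"
    service_name = "robot"
    jps = [0, 0, 0, 0, 0, 0]

    def getJ(self, *_):
        return {"joint_positions": self.jps}

    def setJ(self, jps):
        self.jps = jps

    def moveXs(self, params, _):
        pass


class DemoRobotService(RobotService):
    """Hypothetical child class for a robot registered as "demo_robot"."""

    service_name = "demo_robot"
    jps = [0.0, -0.5, 1.2, 0.0, 0.9, 0.0]  # made-up home position in radians

    def __init__(self):
        self.planned_moves = []

    def moveXs(self, params, _):
        # May be called several times per run, so the moves are accumulated.
        self.planned_moves.append(params)


robot = DemoRobotService()
home = robot.getJ()
robot.moveXs({"targets": []}, None)  # parameter layout is an assumption
robot.setJ([0.1, 0.0, 0.0, 0.0, 0.0, 0.0])
print(home, robot.getJ())
```

A real home position has to be chosen for the actual cell so that it does not collide with the scene objects.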
OuterMoveService Class¶
The code of OuterMoveService class is shown below.
class OuterMoveService(JsonService):
    service_type = "outer_move"
    service_name = "outer_move"
    move_target_type = TCP_POSE
    velocity = 0.25
    acceleration = 0.25
    blend_radius = 0.05
    motion_type = MOVEJ
    is_tcp_pose = False
    pick_or_place = 0

    def __init__(self):
        self.targets = []

    def gather_targets(self, di, jps, flange_pose):
        """
        Interface.
        Please add targets to `self.targets` here if needed.
        """

    def add_target(self, move_target_type, target):
        self.targets.append({"move_target_type": move_target_type, "target": target})

    def getMoveTargets(self, params, *_):
        """
        @return: targets(move_target_type 0:jps, 1:tcp_pose, 2:obj_pose)
                 velocity(default 0.25)
                 acceleration(default 0.25)
                 blend_radius(default 0.05)
                 motion_type(default moveJ 'J':moveJ, 'L':moveL)
                 is_tcp_pose(default False)
        """
        di = params["di"]
        jps = params["joint_positions"]
        flange_pose = params["pose"]
        logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))
        self.gather_targets(di, jps, flange_pose)
        targets = self.targets[:]
        self.targets.clear()
        logging.info("Targets: {}".format(targets))
        return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
                "motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}
Both the default service type and service name are outer_move. If multiple outer_move services are needed in the project, you can modify the service_name in the child class to distinguish different services.
The descriptions of class functions are shown in the table below.
Class Property or Function | Description |
move_target_type | Type of the target; 0: jps, 1: tcp_pose, 2: obj_pose. |
velocity | Speed of the target; the default value is 0.25. |
acceleration | Acceleration of the target; the default value is 0.25. |
blend_radius | The blend radius of the target; the default value is 0.05 m. |
motion_type | The motion type of the target: 'J': moveJ, 'L': moveL. |
is_tcp_pose | Whether the pose of the target is a TCP pose. |
gather_targets() | Interface function that collects all the targets. The parameters are the current DI value, JPs, and flange pose of the robot. It can be overridden in the child class according to actual requirements. |
add_target() | Add a target; this function can be called in the child class to add a target. |
getMoveTargets() | This function will be called when Mech-Viz proceeds to the outer_move Task. The parameter contains the current JPs, flange pose, and DI value of the robot. |
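A child class usually overrides gather_targets() and calls add_target() from it. The sketch below is a simplified illustration, not the real service: `OuterMoveService` is a trimmed stand-in that returns only the "targets" field, the constant values come from the getMoveTargets() docstring (0: jps, 1: tcp_pose, 2: obj_pose), and `HoldPositionService` with its DI rule is hypothetical.

```python
JPS, TCP_POSE, OBJ_POSE = 0, 1, 2  # values taken from the docstring


class OuterMoveService:
    """Trimmed stand-in; the full class is listed in this section."""

    def __init__(self):
        self.targets = []

    def gather_targets(self, di, jps, flange_pose):
        """Interface."""

    def add_target(self, move_target_type, target):
        self.targets.append({"move_target_type": move_target_type, "target": target})

    def getMoveTargets(self, params, *_):
        self.gather_targets(params["di"], params["joint_positions"], params["pose"])
        targets = self.targets[:]
        self.targets.clear()
        return {"targets": targets}


class HoldPositionService(OuterMoveService):
    """Hypothetical child class: keep the robot in place while DI is 0."""

    service_name = "outer_move_hold"

    def gather_targets(self, di, jps, flange_pose):
        if di == 0:
            self.add_target(JPS, list(jps))


service = HoldPositionService()
params = {
    "di": 0,
    "joint_positions": [0, 0.5, 1.0, 0, 0, 0],
    "pose": [0.4, 0, 0.3, 1, 0, 0, 0],
}
reply = service.getMoveTargets(params)
print(reply)
```

Because getMoveTargets() copies and then clears self.targets, targets added in one call do not leak into the next one.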
Register Service¶
The services corresponding to the four classes above are available only after they are registered. The function used for service registration is shown below.
def register_service(hub_caller, service, other_info=None):
    server, port = start_server(service)
    if service.service_type == "robot":
        other_info["from_adapter"] = True
        other_info["simulate"] = False
    hub_caller.register_service(service.service_type, service.service_name, port, other_info)
    return server, port
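For a service whose service_type is "robot", the function writes two keys into other_info, so the caller has to pass a dictionary; with the default None, the item assignment raises a TypeError. The sketch below demonstrates this behavior with stand-ins: `start_server()` and `FakeHubCaller` are placeholders invented for the example (the real ones are not shown in this section), and port 5000 is an arbitrary value.

```python
def start_server(service):
    """Stand-in: returns a fake server object and an arbitrary port."""
    return object(), 5000


class FakeHubCaller:
    """Stand-in that only records the registration arguments."""

    def __init__(self):
        self.registered = []

    def register_service(self, service_type, service_name, port, other_info):
        self.registered.append((service_type, service_name, port, other_info))


def register_service(hub_caller, service, other_info=None):
    # Same body as the function listed in this section.
    server, port = start_server(service)
    if service.service_type == "robot":
        other_info["from_adapter"] = True
        other_info["simulate"] = False
    hub_caller.register_service(service.service_type, service.service_name, port, other_info)
    return server, port


class RobotStub:
    service_type = "robot"
    service_name = "demo_robot"  # hypothetical robot name


hub = FakeHubCaller()
register_service(hub, RobotStub(), other_info={})

try:
    register_service(hub, RobotStub())  # other_info defaults to None
    raised = False
except TypeError:
    raised = True

print(hub.registered, raised)
```

For the other service types the default None is passed through to hub_caller.register_service() unchanged.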