Abstract Parent Class Interfaces
Abstract Parent Class Interfaces are the functions that can be added with a child class that will inherit from its parent class. You can rewrite the functions according to actual requirements. This section will introduce you to the following abstract parent classes.
Communication
Source code belonging to the communication category is stored in the Mech-Center/src/interface/communication.py
file in the installation directory of Mech-Vision and Mech-Viz.
Communication Class
Communication class is a basic class used for communication. It provides a series of interface functions that should be rewritten on the server or client.
Class Function | Description |
---|---|
is_connected() |
Determine if the current connection is successful. |
set_recv_size() |
Set the length of the data received each time; the default value is 1024 bytes. |
send() |
Interface function that is used to send data. |
recv() |
Interface function that is used to receive data. |
close() |
Interface function that is used to close the socket communication. |
before_recv() |
Interface function that you can add a logic according to actual needs before receiving the data. Function overriding is available. |
after_recv() |
Interface function that you can add a logic according to actual needs after receiving the data. Function overriding is available. |
after_handle() |
Interface function that you can add a logic according to actual needs after processing the data. Function overriding is available. |
TcpServer Class
TcpServer class encapsulates a TCP/IP Socket server.
Class Function | Description |
---|---|
bind_and_listen() |
Bind the port. |
local_socket() |
Provide the local Socket information. |
remote_socket() |
Provide the remote Socket information. |
accept() |
Accept the connection with the client. |
send() |
Send the data. |
recv() |
Receive the data. |
close() |
Close the socket. |
close_client() |
Disconnect with the client. |
TcpClinet Class
TcpClient class encapsulates a TCP/IP Socket client.
Class Property | Description |
---|---|
is_bind_port |
Whether to bind the port. If there is a port restriction on the client, the value should be set to True. |
Class Function | Description |
---|---|
send() |
Send the data. |
recv() |
Receive the data. |
close() |
Close the socket. |
set_timeout() |
Set the timeout value in seconds. |
reconnect_server() |
Reconnect to the server. |
after_connect_server() |
Interface function that is used to specify what to do after the first successful connection with the server. |
after_reconnect_server() |
Interface function that is used to specify what to do after reconnecting with the server. |
after_timeout() |
Interface function that is used to specify what to do after the timeout. |
Adapter
Source code belonging to the Adapter category is stored in the Mech-Center/src/interface/adapter.py
file in the installation directory of Mech-Vision and Mech-Viz.
Adapter Base Class
Adapter encapsulates functions related to Mech-Viz, Mech-Vision, Mech-Center, and Robserver, including start Mech-Viz, stop Mech-Viz, set Mech-Vision or Mech-Viz Step parameters, and start Mech-Vision. When an Adapter program is used to call Mech-Viz or Mech-Vision, the child class of Adapter base class must be used.
The class properties of Adapter are shown in the table below.
Class Property | Description |
---|---|
viz_project_dir |
Directory of the current Mech-Viz project |
vision_project_name |
Name of the current Mech-Vision project |
is_simulate |
Whether to run Mech-Viz in simulation mode |
is_keep_viz_state |
Whether to keep the state when Mech-Viz stopped last time |
is_save_executor_data |
Whether to save the data of Mech-Viz executor |
is_force_simulate |
Whether to force Mech-Viz to run in simulation mode |
is_force_real_run |
Whether to force Mech-Viz to run and control the real robot |
code_signal |
The signal of displaying Adapter information in the Console tab of the log panel in Mech-Vision (including error codes) |
msg_signal |
The signal of displaying Adapter information in the Console tab of the log panel in Mech-Vision (not including error codes) |
i_code_signal |
The signal of displaying Mech-Interface information in the Console tab of the log panel in Mech-Vision (including error codes) |
i_msg_signal |
The signal of displaying Mech-Interface information in the Console tab of the log panel in Mech-Vision (not including error codes) |
viz_finished_signal |
The signal indicating that Mech-Viz is stopped normally or with an error |
connect_robot_signal |
Connect/disconnect with the robot signal |
start_adapter_signal |
Start the Adapter signal |
service_name_changed |
The signal of displaying status of Mech-Viz and Mech-Vision in the Console tab of the log panel in Mech-Vision |
setting_infos |
The configuration information |
service_name |
Name of the registered service |
The Adapter class functions are as shown in the table below.
Class Function | Description |
---|---|
on_exec_status_changed() |
Receive the status information from Mech-Viz and Mech-Vision. |
register_self_service() |
Register the Adapter service. |
vision_project_dirs(self) |
Query the directory of the Mech-Vision project. |
vision_project_names() |
Query all Mech-Vision project names. |
vision_project_names_in_center() |
Query the names of all loaded Mech-Vision projects. |
is_viz_registered() |
Check whether Mech-Viz is registered. |
is_viz_in_running() |
Check whether Mech-Viz is running. |
is_vision_started() |
Check whether the Mech-Vision project is registered. |
find_services() |
Find services. |
before_start_viz() |
This function will be called before starting Mech-Viz. |
after_start_viz() |
This function will be called after starting Mech-Viz. |
viz_not_registerd() |
This function will be called if Mech-Viz is not registered after being started. |
viz_is_running() |
This function will be called if Mech-Viz is already running when starting Mech-Viz. |
viz_run_error() |
This function will be called if an error occurs when running Mech-Viz after Mech-Viz is started. |
viz_run_finished() |
This function will be called after Mech-Viz running is finished. |
viz_plan_failed() |
This function will be called after Mech-Viz fails to plan a path. |
viz_no_targets() |
This function will be called when the path planned by Mech-Viz has no moving waypoints. |
viz_unreachable_targets() |
This function will be called when the path planned by Mech-Viz contains a waypoint that cannot be reached. |
viz_collision_checked() |
This function will be called when a collision is detected in the path planned by Mech-Viz. |
parse_viz_reply() |
Parse the reply from Mech-Viz. |
wait_viz_result() |
Wait for Mech-Viz to reply. |
start_viz() |
Start Mech-Viz. |
stop_viz() |
Stop Mech-Viz. |
pause_viz() |
Pause Mech-Viz. |
find_vision_pose() |
Trigger the Mech-Vision project to capture images. |
async_call_vision_run() |
Trigger the Mech-Vision project to capture images asynchronously. |
async_get_vision_callback() |
Receive the result from Mech-Vision asynchronously. |
deal_vision_result() |
Process the result from Mech-Vision. |
set_step_property() |
Set Step parameters in Mech-Vision. |
read_step_property() |
Read Step parameters in Mech-Vision. |
select_parameter_group() |
Select the parameter recipe in the Mech-Vision project. |
set_task_property() |
Set Step parameters in Mech-Viz. |
read_task_property() |
Read Step parameters in Mech-Viz. |
get_digital_in() |
Obtain DI. |
set_digital_out() |
Set DO. |
before_start_adapter() |
This function will be called before starting Adapter. |
start() |
Start Adapter. |
close() |
Stop Adapter. |
handle_command() |
Process commands received from external devices. |
TcpServerAdapter Class
As shown below, TcpServerAdapter class inherits from Adapter, and it encapsulates functions of TcpServer.
class TcpServerAdapter(Adapter):
def __init__(self, host_address, server=TcpServer):
super(TcpServerAdapter, self).__init__()
self.init_server(host_address, server)
def init_server(self, host_address, server=TcpServer):
self._server = server(host_address)
def set_recv_size(self, size):
self._server.set_recv_size(size)
def send(self, msg, is_logging=True):
return self._server.send(msg, is_logging)
def recv(self):
return self._server.recv()
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
try:
self._server.before_recv()
cmds = self._server.recv()
logging.info("Received raw data from client:{}".format(cmds))
if not cmds:
logging.warning("Adapter client is disconnected!")
self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
self._server.close_client()
self.accept()
continue
self._server.after_recv()
except socket.error:
logging.warning("Adapter client is closed!")
self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
self._server.close_client()
self.accept()
except Exception as e:
logging.exception("Exception occurred when receiving data from client: {}.".format(e))
else:
try:
self.handle_command(cmds)
self._server.after_handle()
except Exception as e:
self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Adapter exception in handle_command(): {}".format(e))
def close(self):
super().close()
self._server.close()
def before_start_adapter(self):
super().before_start_adapter()
self.accept()
def accept(self):
if self.is_stop_adapter:
return
self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
self._server.accept()
if self._server.is_connected():
self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))
TcpClientAdapter Class
As shown below, TcpClientAdapter class inherits from Adapter, and it encapsulates functions of TcpClient.
class TcpClientAdapter(Adapter):
def __init__(self, host_address):
super().__init__()
self.init_client(host_address)
def init_client(self, host_address, client=TcpClient):
self._client = client(host_address)
def set_bind_port(self, is_bind=True):
self._client.is_bind_port = is_bind
def set_recv_size(self, size):
self._client.set_recv_size(size)
def send(self, msg, is_logging=True):
self._client.send(msg, is_logging)
def recv(self):
return self._client.recv()
def start(self):
self.reconnect_server(False)
while not self.is_stop_adapter:
try:
self._client.before_recv()
cmds = self._client.recv()
if not cmds:
self.reconnect_server()
continue
logging.info("Received command from server:{}".format(cmds))
self._client.after_recv()
except socket.timeout:
logging.warning("Socket timeout")
self._client.after_timeout()
except socket.error:
sleep(5)
self.reconnect_server()
except Exception as e:
logging.exception("Exception occurred when receiving from server: {}".format(e))
else:
try:
self.handle_command(cmds)
except Exception as e:
self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Adapter exception in handle_command(): {}".format(e))
def close(self):
super().close()
self._client.close()
def reconnect_server(self, is_reconnect=True):
self._client.reconnect_server()
if self.is_stop_adapter:
return
if self._client.is_connected():
self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
else:
self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)
TcpMultiplexingServerAdapter Class
As shown below, TcpMultiplexingServerAdapter class inherits from Adapter, and it is mainly used to connect multiple clients.
class TcpMultiThreadingServerAdapter(Adapter):
def __init__(self, address):
super().__init__()
self._servers = {}
self.add_server(address)
self.sockets = {}
self.clients_ip = {}
self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
self.thread_id_socket_dict = {}
self.set_recv_size()
def set_recv_size(self, size=1024):
self.recv_size = size
def _find_client_ip(self, sock):
for k, v in self.sockets.items():
if v = sock:
return k
def _find_server(self, sock):
for k, v in self._servers.items():
if v = sock:
return k
def add_server(self, host_address):
server = TcpServer(host_address)
server.bind_and_listen()
self._servers[server] = server.local_socket()
def set_clients_ip(self, clients_ip):
"""
Must be called before start().
`clients_ip` is a dict(key is client ip, value is client description).
"""
self.clients_ip = clients_ip
def add_connection(self, ip_port, sock):
self.sockets[ip_port] = sock
logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))
def del_connection(self, ip):
logging.info("Del {}, connections: {}".format(ip, self.sockets))
if self.client_connection(ip):
self.client_connection(ip).close()
self.sockets.pop(ip)
self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))
def client_connection(self, client_ip):
return self.sockets.get(client_ip)
def check_read_events(self, rs):
for s in rs:
if s in self._servers.values(): # recv connection
server = self._find_server(s)
if self.is_stop_adapter:
return
server.accept()
client_socket, client_addr = server.remote_socket()
ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
self.add_connection(ip_port, client_socket)
elif s in self.sockets.values(): # recv data
client_ip = self._find_client_ip(s)
if not client_ip:
continue
msg = self.recv_by_s(s)
if not msg:
self.del_connection(client_ip)
return
try:
future = self.thread_pool.submit(self.handle_command_thread, s, msg)
except Exception as e:
logging.exception("Adapter exception in handle_command(): {}".format(e))
def handle_command_thread(self, s, msg):
thread_id = threading.get_ident()
self.thread_id_socket_dict[thread_id] = s
self.handle_command(msg)
# del self.thread_id_socket_dict[thread_id]
def send(self, msg, is_logging=True):
thread_id = threading.get_ident()
sock = self.thread_id_socket_dict.get(thread_id)
len_total = len(msg)
while msg:
if sock:
len_sent = sock.send(msg)
else:
for v in self.sockets.values():
try:
len_sent = v.send(msg)
except Exception as e:
logging.warning(e)
if not len_sent:
logging.warning("Connection lost, close the client connection.")
return len_sent
if is_logging:
logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
msg = msg[len_sent:]
return len_total
def recv(self):
thread_id = threading.get_ident()
sock = self.thread_id_socket_dict.get(thread_id)
return self.recv_by_s(sock)
def recv_by_s(self, sock):
msg = b""
try:
msg = sock.recv(self.recv_size)
except socket.error:
logging.error("The client is closed!")
if msg:
logging.info("Received message: {}".format(msg))
return msg
def check_task(self):
"""
Interface.
"""
def close(self):
super().close()
for server in self._servers.keys():
server.close()
for client_ip in self.sockets.keys():
try:
self.client_connection(client_ip).close()
logging.info("Close socket :{}".format(client_ip))
except Exception as e:
logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
self.sockets = {}
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
rs, _, _ = select(avalible_sockets, [], [], 0.1)
self.check_read_events(rs)
try:
self.check_task()
except Exception as e:
self.msg_signal.emit(logging.ERROR,
_translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Exception when check task:{}".format(e))
sleep(5)
IOAdapter Class
As shown below, IOAdapter class inherits from Adapter, and it uses a while loop in obtaining DI.
class IOAdapter(Adapter):
robot_name = None
check_rate = 0.5
def __init__(self, host_address):
super().__init__()
self.last_gi = 0
def get_digital_in(self, timeout=None):
return super().get_digital_in(self.robot_name, timeout)
def set_digital_out(self, port, value, timeout=None):
super().set_digital_out(self.robot_name, port, value, timeout)
def _check_gi(self):
gi_js = self.get_digital_in()
gi = int(json.loads(gi_js.decode())["value"])
if self.last_gi != gi:
self.last_gi = gi
logging.info("Check GI signal status: {}".format(gi))
self.handle_gi(gi)
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
try:
self._check_gi()
except Exception as e:
logging.exception(e)
self.check_gi_failed()
sleep(self.check_rate)
def handle_gi(self, gi):
"""
Interface.
"""
def check_gi_failed(self):
"""
Interface.
"""
AdapterWidget Class
As shown below, AdapterWidget class is a parent class, and all functions related to user interface customization must be its child classes.
class AdapterWidget(QWidget):
def set_adapter(self, adapter):
self.adapter = adapter
self.after_set_adapter()
def after_set_adapter(self):
"""
Interface.
"""
def close(self):
super().close()
"""
Interface.
"""
Service
Source code belonging to the service category is stored in the Mech-Center/src/interface/services.py
file in the installation directory of Mech-Vision and Mech-Viz.
NotifyService Class
The code of the NotifyService class is shown below.
class NotifyService(JsonService):
service_type = "notify"
service_name = "adapter"
def handle_message(self, msg):
"""
Interface.
"""
def notify(self, request, _):
msg = request["notify_message"]
logging.info("notify message:{}".format(msg))
return self.handle_message(msg)
The default service name is “adapter”. If multiple notify services are needed in the project, you can override the service_name to distinguish different services. The descriptions of class functions are shown in the table below.
Class Function | Description |
---|---|
handle_message() |
Interface function; the child class can be overridden. |
notify() |
Parse the message; the child class usually does not need to be overridden. |
VisionResultSelectedAtService Class
The code of VisionResultSelectedAtService class is shown below.
class VisionResultSelectedAtService(JsonService):
service_type = "vision_watcher"
service_name = "vision_watcher_adapter"
def __init__(self):
self.poses = None
def poses_found(self, result):
"""
Interface.
"""
def posesFound(self, request, _):
logging.info("{} result:{}".format(jk.mech_vision, request))
self.poses_found(request)
def poses_planned(self, result):
"""
Interface.
"""
def posesPlanned(self, request, _):
logging.info("Plan result:{}".format(request))
self.poses_planned(request)
def multiPickCombination(self, request, _):
logging.info("multiPickCombination:{}".format(request))
The default service type is vision_watcher, which cannot be modified. The default service name is vision_watcher_adapter. If multiple vision_watcher services are needed in the project, you can modify the service_name in the child class to distinguish different services. The descriptions of class functions are shown in the table below.
Class Function | Description |
---|---|
poses_found() |
Interface function whose child class can be overridden; the parameter is the recognition result of Mech-Vision. |
posesFound() |
Parse the recognition result sent from Mech-Vision; child class function usually does not need to be overridden. |
poses_planned() |
Interface function; the parameter is the vision point in the path planned by Mech-Viz. |
posesPlanned() |
Parse the planning message sent from Mech-Viz. |
RobotService Class
The RobotService class is shown below.
class RobotService(JsonService):
service_type = "robot"
service_name = "robot"
jps = [0, 0, 0, 0, 0, 0]
pose = [0, 0, 0, 1, 0, 0, 0]
def getJ(self, *_):
return {"joint_positions": self.jps}
def setJ(self, jps):
logging.info("setJ:{}".format(jps))
self.jps = jps
def getL(self, *_):
return {"tcp_pose": self.pose}
def getFL(self, *_):
return {"flange_pose": self.pose}
def setL(self, pose):
logging.info("setL:{}".format(pose))
self.pose = pose
def moveXs(self, params, _):
pass
def stop(self, *_):
pass
def setTcp(self, *_):
pass
def setDigitalOut(self, params, _):
pass
def getDigitalIn(self, *_):
pass
def switchPauseContinue(self, *_):
pass
The default service type is robot, which cannot be modified. The default service name is robot, which should be modified into the name of the real robot in the child class. You will need to set a pose in JPs or TCP in the child class, which is used to set a fixed position during Mech-Viz running. Please make sure that this pose will not cause collision with the scene objects. The descriptions of class functions are shown in the table below.
Class Function | Description |
---|---|
getJ() |
Return JPs to Mech-Viz/Mech-Vision. |
setJ() |
Set JPs by external services in radians. |
getL() |
Return TCP to Mech-Viz/Mech-Vision. |
getFL() |
Return flange pose to Mech-Viz/Mech-Vision. |
setL() |
Set flange pose in quaternions for external services; the unit is meters. |
moveXs() |
This function will be called after Mech-Viz finishes planning the path. The parameter contains property of waypoints. Please note that if there are Steps that may interrupt pre-planning, such as Check DI, Branch by Msg in the Mech-Viz project, Mech-Viz will call this function for multiple times. |
stop() |
Stop the robot (this function is seldom called). |
setTcp() |
Set TCP (this function is seldom called). |
setDigitalOut() |
Set DO (this function is seldom called). |
getDigitalIn() |
Obtain DI (this function is seldom called). |
switchPauseContinue() |
Pause/continue the robot (this function is seldom called). |
OuterMoveService Class
The code of OuterMoveService class is shown below.
class OuterMoveService(JsonService):
service_type = "outer_move"
service_name = "outer_move"
move_target_type = TCP_POSE
velocity = 0.25
acceleration = 0.25
blend_radius = 0.05
motion_type = MOVEJ
is_tcp_pose = False
pick_or_place = 0
def __init__(self):
self.targets = []
def gather_targets(self, di, jps, flange_pose):
"""
Interface.
Please add targets to `self.targets` here if needed.
"""
def add_target(self, move_target_type, target):
self.targets.append({"move_target_type": move_target_type, "target": target})
def getMoveTargets(self, params, *_):
"""
@return: targets(move_target_type 0:jps, 1:tcp_pose, 2:obj_pose)
velocity(default 0.25)
acceleration(default 0.25)
blend_radius(default 0.05)
motion_type(default moveJ 'J':moveJ, 'L':moveL)
is_tcp_pose(default False)
"""
di = params["di"]
jps = params["joint_positions"]
flange_pose = params["pose"]
logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))
self.gather_targets(di, jps, flange_pose)
targets = self.targets[:]
self.targets.clear()
logging.info("Targets: {}".format(targets))
return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
"motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}
Both the default service type and service name are “outer_move”. If multiple outer_move services are needed in the project, you can modify the service_name in the child class to distinguish different services. The descriptions of class functions are shown in the table below.
Class Function | Description |
---|---|
move_target_type() |
Type of the target; 0: jps, 1: tcp_pose, 2: obj_pose. |
velocity() |
Velocity of the target (waypoint); the default value is 0.25. |
acceleration() |
Acceleration of the target; the default value is 0.25. |
blend_radius() |
The blend radius of the target; the default value is 0.05m. |
motion_type() |
The motion type of the target: ‘J’: moveJ, ‘L’: moveL. |
is_tcp_pose() |
Whether the pose of the target is in TCP. |
gather_targets() |
Interface function, which collects all the targets. The parameter contains current JPs, flange pose, and DI value of the robot. The child class can be overridden according to actual requirements. |
add_target() |
Add a target; this function can be called in the child class to add a target. |
getMoveTargets() |
This function will be called when Mech-Viz proceeds on the External Move Step. The parameter contains the current JPs, flange pose, and DI value of the robot. |
Register Service
The services corresponding to the above four classes are only available after being registered. The function used for service registration is shown below.
def register_service(hub_caller, service, other_info=None):
server, port = start_server(service)
if service.service_type = "robot":
other_info["from_adapter"] = True
other_info["simulate"] = False
hub_caller.register_service(service.service_type, service.service_name, port, other_info)
return server, port