Abstract Parent Class Interfaces

Abstract Parent Class Interfaces are the functions that can be added with a child class that will inherit from its parent class. You can rewrite the functions according to actual requirements. This section will introduce you to the following abstract parent classes.

Communication

The source code belongs to communication category is stored in the /src/interface/communication.py file in the installation directory of Mech-Center.

Communication Class

Communication class is a basis class used for communication. It provides a series of interface functions which should be rewritten on the server or client.

Class Function

Description

is_connected()

Determine if the current connection is successful.

set_recv_size()

Set the length of the data received each time; the default value is 1024 bytes.

send()

Interface function which is used to send data.

recv()

Interface function which is used to receive data.

close()

Interface function which is used to close the socket communication.

before_recv()

Interface function which you can add an logic according to actual needs before receiving the data. Function overriding is available.

after_recv()

Interface function which you can add an logic according to actual needs after receiving the data. Function overriding is available.

after_handle()

Interface function which you can add an logic according to actual needs after processing the data. Function overriding is available.

TcpServer Class

TcpServer class encapsulates a TCP/IP Socket server.

Class Function

Description

bind_and_listen()

Bind the port

local_socket()

Provide the local Socket information

remote_socket()

Provide the remote Socket information

accept()

Accept the connection with the client

send()

Send the data

recv()

Receive the data

close()

Close the socket

close_client()

Disconnect with the client

TcpClinet Class

TcpClient class encapsulates a TCP/IP Socket client.

Class Property

Description

is_bind_port

Whether to bind the port. If there is a port restriction on the client, the value should be set to True.

Class Function

Description

send()

Send the data

recv()

Receive the data

close()

Close the socket

set_timeout()

Timeout period in seconds

reconnect_server()

Reconnect to the server

after_connect_server()

Interface function which is used to specify what to do after the first successful connection with the server

after_reconnect_server()

Interface function which is used to specify what to do after reconnecting with the server

after_timeout()

Interface function which is used to specify what to do after the timeout period

Adapter

The source code belongs to Adapter category is stored in the /src/interface/adapter.py file in the installation directory of Mech-Center.

Adapter Base Class

Adapter encapsulates functions related to Mech-Viz, Mech-Vision, Mech-Center, Robserver, including start_viz(), stop_viz(), set_task_property(), and set_step_property(). When an Adapter program is used to control Mech-Viz or Mech-Vision, the child class of Adapter base class must be used.

The class properties of Adapter are shown in the table below.

Class Property

Description

viz_project_dir

Directory of the current Mech-Viz project

vision_project_name

Name of the current Mech-Vision project

is_simulate

Whether to run Mech-Viz in simulation mode

is_keep_viz_state

Whether to keep the state when Mech-Viz stopped last time

is_save_executor_data

Whether to save the data of Mech-Viz executor

is_force_simulate

Whether to force Mech-Viz to run in simulation mode

is_force_real_run

Whether to force Mech-Viz to run and control the real robot

code_signal

The signal of displaying Adapter information in the main interface of Mech-Center (including error codes)

msg_signal

The signal of displaying Adapter information in the main interface of Mech-Center (NOT including error codes)

i_code_signal

The signal of displaying Mech-Interface information in the main interface of Mech-Center (including error codes)

i_msg_signal

The signal of displaying Mech-Interface information in the main interface of Mech-Center (NOT including error codes)

viz_finished_signal

The signal indicating that Mech-Viz is stopped normally or with an error

connect_robot_signal

Connect/disconnect with the robot signal

start_adapter_signal

Start Adapter signal

service_name_changed

The signal of displaying statuses of Mech-Viz and Mech-Vision in the main interface of Mech-Center

setting_infos

The configuration information of Mech-Center

service_name

Name of the registered service

The Adapter class functions are as shown in the table below.

Class Function

Description

on_exec_status_changed()

Receive the status information from Mech-Viz and Mech-Vision

register_self_service()

Register Adapter service

vision_project_dirs(self):

Check the directory of Mech-Vision project

vision_project_names()

Check all project names in Mech-Vision

vision_project_names_in_center()

Check the names of all Mech-Vision projects loaded in Mech-Center

is_viz_registered()

Check whether Mech-Viz is registered in Mech-Center

is_viz_in_running()

Check whether Mech-Viz is running

is_vision_started()

Check whether Mech-Vision project is registered in Mech-Center

find_services()

Check the service

before_start_viz()

This function will be called before starting Mech-Viz

after_start_viz()

This function will be called after starting Mech-Viz

viz_not_registerd()

This function will be called if Mech-Viz is not registered after being started

viz_is_running()

This function will be called if Mech-Viz is already running when starting Mech-Viz

viz_run_error()

This function will be called if an error occurs when running Mech-Viz

viz_run_finished()

This function will be called after Mech-Viz is stopped

viz_plan_failed()

This function will be called after Mech-Viz fails to plan a path

viz_unreachable_targets()

This function will be called when there is an unreachable target in the path planned by Mech-Viz

viz_collision_checked()

This function will be called when a collision is detected in the path planned by Mech-Viz

parse_viz_reply()

Parse the reply from Mech-Viz

wait_viz_result()

Wait for Mech-Viz to reply

start_viz()

Start Mech-Viz

stop_viz()

Stop Mech-Viz

pause_viz()

Pause Mech-Viz

find_vision_pose()

Trigger Mech-Vision project to capture images

async_call_vision_run()

Trigger Mech-Vision project to capture images asynchronously

async_get_vision_callback()

Receive the result from Mech-Vision asynchronously

deal_vision_result()

Process the result from Mech-Vision

set_step_property()

Set Step parameters in Mech-Vision

read_step_property()

Read Step parameters in Mech-Vision

select_parameter_group()

Select the parameter recipe in Mech-Vision

set_task_property()

Set Task parameters in Mech-Viz

read_task_property()

Read Task parameters in Mech-Viz

get_digital_in()

Obtain DI

set_digital_out()

Set DO

before_start_adapter()

This function will be called before starting Adapter

start()

Start Adapter

close()

Stop Adapter

handle_command()

Process commands received from external devices

TcpServerAdapter Class

As shown below, TcpServerAdapter class inherits from Adapter, and it encapsulates functions of TcpServer.

class TcpServerAdapter(Adapter):
    def __init__(self, host_address, server=TcpServer):
        super(TcpServerAdapter, self).__init__()
        self.init_server(host_address, server)

    def init_server(self, host_address, server=TcpServer):
        self._server = server(host_address)

    def set_recv_size(self, size):
        self._server.set_recv_size(size)

    def send(self, msg, is_logging=True):
        return self._server.send(msg, is_logging)

    def recv(self):
        return self._server.recv()

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._server.before_recv()
                cmds = self._server.recv()
                logging.info("Received raw data from client:{}".format(cmds))
                if not cmds:
                    logging.warning("Adapter client is disconnected!")
                    self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                    self._server.close_client()
                    self.accept()
                    continue
                self._server.after_recv()
            except socket.error:
                logging.warning("Adapter client is closed!")
                self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                self._server.close_client()
                self.accept()
            except Exception as e:
                logging.exception("Exception occurred when receiving data from client: {}.".format(e))
            else:
                try:
                    self.handle_command(cmds)
                    self._server.after_handle()
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._server.close()

    def before_start_adapter(self):
        super().before_start_adapter()
        self.accept()

    def accept(self):
        if self.is_stop_adapter:
            return
        self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
        self._server.accept()
        if self._server.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
            self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))

TcpClientAdapter Class

As shown below, TcpClientAdapter class inherits from Adapter, and it encapsulates functions of TcpClient.

class TcpClientAdapter(Adapter):

    def __init__(self, host_address):
        super().__init__()
        self.init_client(host_address)

    def init_client(self, host_address, client=TcpClient):
        self._client = client(host_address)

    def set_bind_port(self, is_bind=True):
        self._client.is_bind_port = is_bind

    def set_recv_size(self, size):
        self._client.set_recv_size(size)

    def send(self, msg, is_logging=True):
        self._client.send(msg, is_logging)

    def recv(self):
        return self._client.recv()

    def start(self):
        self.reconnect_server(False)
        while not self.is_stop_adapter:
            try:
                self._client.before_recv()
                cmds = self._client.recv()
                if not cmds:
                    self.reconnect_server()
                    continue
                logging.info("Received command from server:{}".format(cmds))
                self._client.after_recv()
            except socket.timeout:
                logging.warning("Socket timeout")
                self._client.after_timeout()
            except socket.error:
                sleep(5)
                self.reconnect_server()
            except Exception as e:
                logging.exception("Exception occurred when receiving from server: {}".format(e))
            else:
                try:
                    self.handle_command(cmds)
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._client.close()

    def reconnect_server(self, is_reconnect=True):
        self._client.reconnect_server()
        if self.is_stop_adapter:
            return
        if self._client.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
        else:
            self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)

TcpMultiplexingServerAdapter Class

As shown below, TcpMultiplexingServerAdapter class inherits from Adapter, and it is mainly used to connect multiple clients.

class TcpMultiThreadingServerAdapter(Adapter):
    def __init__(self, address):
        super().__init__()
        self._servers = {}
        self.add_server(address)
        self.sockets = {}
        self.clients_ip = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
        self.thread_id_socket_dict = {}
        self.set_recv_size()

    def set_recv_size(self, size=1024):
        self.recv_size = size

    def _find_client_ip(self, sock):
        for k, v in self.sockets.items():
            if v == sock:
                return k

    def _find_server(self, sock):
        for k, v in self._servers.items():
            if v == sock:
                return k

    def add_server(self, host_address):
        server = TcpServer(host_address)
        server.bind_and_listen()
        self._servers[server] = server.local_socket()

    def set_clients_ip(self, clients_ip):
        """
            Must be called before start().
            `clients_ip` is a dict(key is client ip, value is client description).
        """
        self.clients_ip = clients_ip

    def add_connection(self, ip_port, sock):
        self.sockets[ip_port] = sock
        logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
        self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))

    def del_connection(self, ip):
        logging.info("Del {}, connections: {}".format(ip, self.sockets))
        if self.client_connection(ip):
            self.client_connection(ip).close()
            self.sockets.pop(ip)
        self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))

    def client_connection(self, client_ip):
        return self.sockets.get(client_ip)

    def check_read_events(self, rs):
        for s in rs:
            if s in self._servers.values():  # recv connection
                server = self._find_server(s)
                if self.is_stop_adapter:
                    return
                server.accept()
                client_socket, client_addr = server.remote_socket()
                ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
                self.add_connection(ip_port, client_socket)
            elif s in self.sockets.values():  # recv data
                client_ip = self._find_client_ip(s)
                if not client_ip:
                    continue
                msg = self.recv_by_s(s)
                if not msg:
                    self.del_connection(client_ip)
                    return
                try:
                    future = self.thread_pool.submit(self.handle_command_thread, s, msg)
                except Exception as e:
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def handle_command_thread(self, s, msg):
        thread_id = threading.get_ident()
        self.thread_id_socket_dict[thread_id] = s
        self.handle_command(msg)
        # del self.thread_id_socket_dict[thread_id]

    def send(self, msg, is_logging=True):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        len_total = len(msg)
        while msg:
            if sock:
                len_sent = sock.send(msg)
            else:
                for v in self.sockets.values():
                    try:
                        len_sent = v.send(msg)
                    except Exception as e:
                        logging.warning(e)
            if not len_sent:
                logging.warning("Connection lost, close the client connection.")
                return len_sent
            if is_logging:
                logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
            msg = msg[len_sent:]
        return len_total

    def recv(self):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        return self.recv_by_s(sock)

    def recv_by_s(self, sock):
        msg = b""
        try:
            msg = sock.recv(self.recv_size)
        except socket.error:
            logging.error("The client is closed!")
        if msg:
            logging.info("Received message: {}".format(msg))
        return msg

    def check_task(self):
        """
            Interface.
        """

    def close(self):
        super().close()
        for server in self._servers.keys():
            server.close()
        for client_ip in self.sockets.keys():
            try:
                self.client_connection(client_ip).close()
                logging.info("Close socket :{}".format(client_ip))
            except Exception as e:
                logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
        self.sockets = {}

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
            rs, _, _ = select(avalible_sockets, [], [], 0.1)
            self.check_read_events(rs)
            try:
                self.check_task()
            except Exception as e:
                self.msg_signal.emit(logging.ERROR,
                                     _translate("messages", "Handle command exception: {}".format(e)))
                logging.exception("Exception when check task:{}".format(e))
                sleep(5)

IOAdapter Class

As shown below, IOAdapter class inherits from Adapter, and it uses a while loop in obtaining DI.

class IOAdapter(Adapter):
    robot_name = None
    check_rate = 0.5

    def __init__(self, host_address):
        super().__init__()
        self.last_gi = 0

    def get_digital_in(self, timeout=None):
        return super().get_digital_in(self.robot_name, timeout)

    def set_digital_out(self, port, value, timeout=None):
        super().set_digital_out(self.robot_name, port, value, timeout)

    def _check_gi(self):
        gi_js = self.get_digital_in()
        gi = int(json.loads(gi_js.decode())["value"])
        if self.last_gi != gi:
            self.last_gi = gi
            logging.info("Check GI signal status: {}".format(gi))
        self.handle_gi(gi)

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._check_gi()
            except Exception as e:
                logging.exception(e)
                self.check_gi_failed()
            sleep(self.check_rate)

    def handle_gi(self, gi):
        """
            Interface.
        """

    def check_gi_failed(self):
        """
            Interface.
        """

AdapterWidget Class

As shown below, AdapterWidget class is a parent class, and all functions related to user interface customization must be its child classes.

class AdapterWidget(QWidget):

    def set_adapter(self, adapter):
        self.adapter = adapter
        self.after_set_adapter()

    def after_set_adapter(self):
        """
            Interface.
        """

    def close(self):
        super().close()
        """
            Interface.
        """

Service

The source code belongs to service category is stored in the /src/interface/services.py file in the installation directory of Mech-Center.

NotifyService Class

The code of NotifyService class is shown below.

class NotifyService(JsonService):
    service_type = "notify"
    service_name = "adapter"

    def handle_message(self, msg):
        """
            Interface.
        """

    def notify(self, request, _):
        msg = request["notify_message"]
        logging.info("notify message:{}".format(msg))
        return self.handle_message(msg)

The default service name is “adapter”. If multiple notify services are needed in the project, you can override the service_name to distinguish different services.

The descriptions of class functions are shown in the table below.

Class Function

Description

handle_message()

Interface function; the child class can be overriden

notify()

Parse the message; the child class usually does not need to be overriden

VisionResultSelectedAtService Class

The code of VisionResultSelectedAtService class is shown below.

class VisionResultSelectedAtService(JsonService):
    service_type = "vision_watcher"
    service_name = "vision_watcher_adapter"

    def __init__(self):
        self.poses = None

    def poses_found(self, result):
        """
            Interface.
        """

    def posesFound(self, request, _):
        logging.info("{} result:{}".format(jk.mech_vision, request))
        self.poses_found(request)

    def poses_planned(self, result):
        """
            Interface.
        """

    def posesPlanned(self, request, _):
        logging.info("Plan result:{}".format(request))
        self.poses_planned(request)

    def multiPickCombination(self, request, _):
        logging.info("multiPickCombination:{}".format(request))

The default service type is vision_watcher, which cannot be modified. The default service name is vision_watcher_adapter. If multiple vision_watcher services are needed in the project, you can modify the service_name in the child class to distinguish different services.

The descriptions of class functions are shown in the table below.

Class Function

Description

poses_found()

Interface function whose child class can be overriden; the parameter is the recognition result of Mech-Vision

posesFound()

Parse the recognition result sent from Mech-Vision; child class function usually does not need to be overriden

poses_planned()

Interface function; the parameter is the vision point in the path planned by Mech-Viz

posesPlanned()

Parse the planning message sent from Mech-Viz

RobotService Class

The RobotService class is shown below.

class RobotService(JsonService):
    service_type = "robot"
    service_name = "robot"
    jps = [0, 0, 0, 0, 0, 0]
    pose = [0, 0, 0, 1, 0, 0, 0]

    def getJ(self, *_):
        return {"joint_positions": self.jps}

    def setJ(self, jps):
        logging.info("setJ:{}".format(jps))
        self.jps = jps

    def getL(self, *_):
        return {"tcp_pose": self.pose}

    def getFL(self, *_):
        return {"flange_pose": self.pose}

    def setL(self, pose):
        logging.info("setL:{}".format(pose))
        self.pose = pose

    def moveXs(self, params, _):
        pass

    def stop(self, *_):
        pass

    def setTcp(self, *_):
        pass

    def setDigitalOut(self, params, _):
        pass

    def getDigitalIn(self, *_):
        pass

    def switchPauseContinue(self, *_):
        pass

The default service type is robot, which cannot be modified. The default service name is robot, which should be modified into the name of the real robot in the child class. You will need to set a home position in JPs or TCP in the child class. Please make sure that this pose will not cause collision with the scene objects.

The descriptions of class functions are shown in the table below.

Class Function

Description

getJ()

Return JPs to Mech-Viz/Mech-Vision

setJ()

Set JPs in radians for external srvices

getL()

Return TCP to Mech-Viz/Mech-Vision

getFL()

Return flange pose to Mech-Viz/Mech-Vision

setL()

Set flange pose in quaternions for external srvices; the unit is meter

moveXs()

This function will be called after Mech-Viz finishes planning the path. The parameter contains property of targets from move Tasks. Please note that if Tasks that may interrupt pre-planning, such as check_di, branch_by_msg, Mech-Viz will call this function for multiple times.

stop()

Stop the robot (this function is seldom called)

setTcp()

Set TCP (this function is seldom called)

setDigitalOut()

Set DO (this function is seldom called)

getDigitalIn()

Obtain DI (this function is seldom called)

switchPauseContinue()

Pause/continue the robot (this function is seldom called)

OuterMoveService Class

The code of OuterMoveService class is shown below.

class OuterMoveService(JsonService):
    service_type = "outer_move"
    service_name = "outer_move"
    move_target_type = TCP_POSE
    velocity = 0.25
    acceleration = 0.25
    blend_radius = 0.05
    motion_type = MOVEJ
    is_tcp_pose = False
    pick_or_place = 0

    def __init__(self):
        self.targets = []

    def gather_targets(self, di, jps, flange_pose):
        """
            Interface.

            Please add targets to `self.targets` here if needed.
        """

    def add_target(self, move_target_type, target):
        self.targets.append({"move_target_type": move_target_type, "target": target})

    def getMoveTargets(self, params, *_):
        """
        @return: targets(move_target_type  0:jps, 1:tcp_pose, 2:obj_pose)
                 velocity(default 0.25)
                 acceleration(default 0.25)
                 blend_radius(default 0.05)
                 motion_type(default moveJ  'J':moveJ, 'L':moveL)
                 is_tcp_pose(default False)
        """
        di = params["di"]
        jps = params["joint_positions"]
        flange_pose = params["pose"]
        logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))

        self.gather_targets(di, jps, flange_pose)
        targets = self.targets[:]
        self.targets.clear()
        logging.info("Targets: {}".format(targets))
        return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
                "motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}

Both the default service type and service name are outer_move. If multiple outer_move services are needed in the project, you can modify the service_name in the child class to distinguish different services.

The descriptions of class functions are shown in the table below.

Class Function

Description

move_target_type()

Type of the target; 0:jps, 1:tcp_pose, 2:obj_pose.

velocity()

Speed of the target; the default value is 0.25.

acceleration()

Acceleration of the target; the default value is 0.25.

blend_radius()

The turning radius of the target; the default value is 0.05m.

motion_type()

The motion type of the target: ‘J’:moveJ, ‘L’:moveL.

is_tcp_pose()

Whether the pose of the target is in TCP.

gather_targets()

Interface function, which collects all the targets. The parameter contains current JPs, flange pose, and DI value of the robot. The child class can be overriden according to actual requirements.

add_target()

Add a target; this function can be called in the child class to add a target.

getMoveTargets()

This function will be called when Mech-Viz proceeds on the outer_move Task. The parameter contains current JPs, flange pose, and DI value of the robot.

Register Service

The services corresponding to the above 4 classes are only available after being registered. The function used for service registration is shown below.

def register_service(hub_caller, service, other_info=None):
    server, port = start_server(service)
    if service.service_type == "robot":
        other_info["from_adapter"] = True
        other_info["simulate"] = False
    hub_caller.register_service(service.service_type, service.service_name, port, other_info)
    return server, port