Abstract Parent Class Interfaces

Abstract Parent Class Interfaces are the functions that can be added with a child class that will inherit from its parent class. You can rewrite the functions according to actual requirements. This section will introduce you to the following abstract parent classes.

Communication

Source code belonging to the communication category is stored in the Communication Component/src/interface/communication.py file in the installation directory of Mech-Vision and Mech-Viz.

Communication Class

Communication class is a basic class used for communication. It provides a series of interface functions that should be rewritten on the server or client.

Class Function Description

is_connected()

Determine if the current connection is successful.

set_recv_size()

Set the length of the data received each time; the default value is 1024 bytes.

send()

Interface function that is used to send data.

recv()

Interface function that is used to receive data.

close()

Interface function that is used to close the socket communication.

before_recv()

Interface function that you can add a logic according to actual needs before receiving the data. Function overriding is available.

after_recv()

Interface function that you can add a logic according to actual needs after receiving the data. Function overriding is available.

after_handle()

Interface function that you can add a logic according to actual needs after processing the data. Function overriding is available.

TcpServer Class

TcpServer class encapsulates a TCP/IP Socket server.

Class Function Description

bind_and_listen()

Bind the port.

local_socket()

Provide the local Socket information.

remote_socket()

Provide the remote Socket information.

accept()

Accept the connection with the client.

send()

Send the data.

recv()

Receive the data.

close()

Close the socket.

close_client()

Disconnect with the client.

TcpClinet Class

TcpClient class encapsulates a TCP/IP Socket client.

Class Property Description

is_bind_port

Whether to bind the port. If there is a port restriction on the client, the value should be set to True.

Class Function Description

send()

Send the data.

recv()

Receive the data.

close()

Close the socket.

set_timeout()

Set the timeout value in seconds.

reconnect_server()

Reconnect to the server.

after_connect_server()

Interface function that is used to specify what to do after the first successful connection with the server.

after_reconnect_server()

Interface function that is used to specify what to do after reconnecting with the server.

after_timeout()

Interface function that is used to specify what to do after the timeout.

Adapter

Source code belonging to the Adapter category is stored in the Communication Component/src/interface/adapter.py file in the installation directory of Mech-Vision and Mech-Viz.

Adapter Base Class

Adapter encapsulates functions related to Mech-Viz, Mech-Vision, and Robserver, including start Mech-Viz, stop Mech-Viz, set Mech-Vision or Mech-Viz Step parameters, and start Mech-Vision recognition. When an Adapter program is used to call Mech-Viz or Mech-Vision, the child class of Adapter base class must be used.

The class properties of Adapter are shown in the table below.

Class Property Description

viz_project_dir

Directory of the current Mech-Viz project

vision_project_name

Name of the current Mech-Vision project

is_simulate

Whether to run Mech-Viz in simulation mode

is_keep_viz_state

Whether to keep the state when Mech-Viz stopped last time

is_save_executor_data

Whether to save the data of Mech-Viz executor

is_force_simulate

Whether to force Mech-Viz to run in simulation mode

is_force_real_run

Whether to force Mech-Viz to run and control the real robot

code_signal

The signal of displaying Adapter information in the Console tab of the log panel in Mech-Vision (including error codes)

msg_signal

The signal of displaying Adapter information in the Console tab of the log panel in Mech-Vision (not including error codes)

i_code_signal

The signal of displaying Mech-Interface information in the Console tab of the log panel in Mech-Vision (including error codes)

i_msg_signal

The signal of displaying Mech-Interface information in the Console tab of the log panel in Mech-Vision (not including error codes)

viz_finished_signal

The signal indicating that Mech-Viz is stopped normally or with an error

connect_robot_signal

Connect/disconnect with the robot signal

start_adapter_signal

Start the Adapter signal

service_name_changed

The signal of displaying status of Mech-Viz and Mech-Vision in the Console tab of the log panel in Mech-Vision

setting_infos

The configuration information

service_name

Name of the registered service

The Adapter class functions are as shown in the table below.

Class Function Description

on_exec_status_changed()

Receive the status information from Mech-Viz and Mech-Vision.

register_self_service()

Register the Adapter service.

vision_project_dirs(self)

Query the directory of the Mech-Vision project.

vision_project_names()

Query all Mech-Vision project names.

vision_project_names_in_center()

Query the names of all loaded Mech-Vision projects.

is_viz_registered()

Check whether Mech-Viz is registered.

is_viz_in_running()

Check whether Mech-Viz is running.

is_vision_started()

Check whether the Mech-Vision project is registered.

find_services()

Find services.

before_start_viz()

This function will be called before starting Mech-Viz.

after_start_viz()

This function will be called after starting Mech-Viz.

viz_not_registerd()

This function will be called if Mech-Viz is not registered after being started.

viz_is_running()

This function will be called if Mech-Viz is already running when starting Mech-Viz.

viz_run_error()

This function will be called if an error occurs when running Mech-Viz after Mech-Viz is started.

viz_run_finished()

This function will be called after Mech-Viz running is finished.

viz_plan_failed()

This function will be called after Mech-Viz fails to plan a path.

viz_no_targets()

This function will be called when the path planned by Mech-Viz has no moving waypoints.

viz_unreachable_targets()

This function will be called when the path planned by Mech-Viz contains a waypoint that cannot be reached.

viz_collision_checked()

This function will be called when a collision is detected in the path planned by Mech-Viz.

parse_viz_reply()

Parse the reply from Mech-Viz.

wait_viz_result()

Wait for Mech-Viz to reply.

start_viz()

Start Mech-Viz.

stop_viz()

Stop Mech-Viz.

pause_viz()

Pause Mech-Viz.

find_vision_pose()

Trigger the Mech-Vision project to capture images.

async_call_vision_run()

Trigger the Mech-Vision project to capture images asynchronously.

async_get_vision_callback()

Receive the result from Mech-Vision asynchronously.

deal_vision_result()

Process the result from Mech-Vision.

set_step_property()

Set Step parameters in Mech-Vision.

read_step_property()

Read Step parameters in Mech-Vision.

select_parameter_group()

Select the parameter recipe in the Mech-Vision project.

set_task_property()

Set Step parameters in Mech-Viz.

read_task_property()

Read Step parameters in Mech-Viz.

get_digital_in()

Obtain DI.

set_digital_out()

Set DO.

before_start_adapter()

This function will be called before starting Adapter.

start()

Start Adapter.

close()

Stop Adapter.

handle_command()

Process commands received from external devices.

TcpServerAdapter Class

As shown below, TcpServerAdapter class inherits from Adapter, and it encapsulates functions of TcpServer.

class TcpServerAdapter(Adapter):
    def __init__(self, host_address, server=TcpServer):
        super(TcpServerAdapter, self).__init__()
        self.init_server(host_address, server)

    def init_server(self, host_address, server=TcpServer):
        self._server = server(host_address)

    def set_recv_size(self, size):
        self._server.set_recv_size(size)

    def send(self, msg, is_logging=True):
        return self._server.send(msg, is_logging)

    def recv(self):
        return self._server.recv()

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._server.before_recv()
                cmds = self._server.recv()
                logging.info("Received raw data from client:{}".format(cmds))
                if not cmds:
                    logging.warning("Adapter client is disconnected!")
                    self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                    self._server.close_client()
                    self.accept()
                    continue
                self._server.after_recv()
            except socket.error:
                logging.warning("Adapter client is closed!")
                self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                self._server.close_client()
                self.accept()
            except Exception as e:
                logging.exception("Exception occurred when receiving data from client: {}.".format(e))
            else:
                try:
                    self.handle_command(cmds)
                    self._server.after_handle()
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._server.close()

    def before_start_adapter(self):
        super().before_start_adapter()
        self.accept()

    def accept(self):
        if self.is_stop_adapter:
            return
        self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
        self._server.accept()
        if self._server.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
            self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))

TcpClientAdapter Class

As shown below, TcpClientAdapter class inherits from Adapter, and it encapsulates functions of TcpClient.

class TcpClientAdapter(Adapter):

    def __init__(self, host_address):
        super().__init__()
        self.init_client(host_address)

    def init_client(self, host_address, client=TcpClient):
        self._client = client(host_address)

    def set_bind_port(self, is_bind=True):
        self._client.is_bind_port = is_bind

    def set_recv_size(self, size):
        self._client.set_recv_size(size)

    def send(self, msg, is_logging=True):
        self._client.send(msg, is_logging)

    def recv(self):
        return self._client.recv()

    def start(self):
        self.reconnect_server(False)
        while not self.is_stop_adapter:
            try:
                self._client.before_recv()
                cmds = self._client.recv()
                if not cmds:
                    self.reconnect_server()
                    continue
                logging.info("Received command from server:{}".format(cmds))
                self._client.after_recv()
            except socket.timeout:
                logging.warning("Socket timeout")
                self._client.after_timeout()
            except socket.error:
                sleep(5)
                self.reconnect_server()
            except Exception as e:
                logging.exception("Exception occurred when receiving from server: {}".format(e))
            else:
                try:
                    self.handle_command(cmds)
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._client.close()

    def reconnect_server(self, is_reconnect=True):
        self._client.reconnect_server()
        if self.is_stop_adapter:
            return
        if self._client.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
        else:
            self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)

TcpMultiplexingServerAdapter Class

As shown below, TcpMultiplexingServerAdapter class inherits from Adapter, and it is mainly used to connect multiple clients.

class TcpMultiThreadingServerAdapter(Adapter):
    def __init__(self, address):
        super().__init__()
        self._servers = {}
        self.add_server(address)
        self.sockets = {}
        self.clients_ip = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
        self.thread_id_socket_dict = {}
        self.set_recv_size()

    def set_recv_size(self, size=1024):
        self.recv_size = size

    def _find_client_ip(self, sock):
        for k, v in self.sockets.items():
            if v = sock:
                return k

    def _find_server(self, sock):
        for k, v in self._servers.items():
            if v = sock:
                return k

    def add_server(self, host_address):
        server = TcpServer(host_address)
        server.bind_and_listen()
        self._servers[server] = server.local_socket()

    def set_clients_ip(self, clients_ip):
        """
            Must be called before start().
            `clients_ip` is a dict(key is client ip, value is client description).
        """
        self.clients_ip = clients_ip

    def add_connection(self, ip_port, sock):
        self.sockets[ip_port] = sock
        logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
        self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))

    def del_connection(self, ip):
        logging.info("Del {}, connections: {}".format(ip, self.sockets))
        if self.client_connection(ip):
            self.client_connection(ip).close()
            self.sockets.pop(ip)
        self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))

    def client_connection(self, client_ip):
        return self.sockets.get(client_ip)

    def check_read_events(self, rs):
        for s in rs:
            if s in self._servers.values():  # recv connection
                server = self._find_server(s)
                if self.is_stop_adapter:
                    return
                server.accept()
                client_socket, client_addr = server.remote_socket()
                ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
                self.add_connection(ip_port, client_socket)
            elif s in self.sockets.values():  # recv data
                client_ip = self._find_client_ip(s)
                if not client_ip:
                    continue
                msg = self.recv_by_s(s)
                if not msg:
                    self.del_connection(client_ip)
                    return
                try:
                    future = self.thread_pool.submit(self.handle_command_thread, s, msg)
                except Exception as e:
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def handle_command_thread(self, s, msg):
        thread_id = threading.get_ident()
        self.thread_id_socket_dict[thread_id] = s
        self.handle_command(msg)
        # del self.thread_id_socket_dict[thread_id]

    def send(self, msg, is_logging=True):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        len_total = len(msg)
        while msg:
            if sock:
                len_sent = sock.send(msg)
            else:
                for v in self.sockets.values():
                    try:
                        len_sent = v.send(msg)
                    except Exception as e:
                        logging.warning(e)
            if not len_sent:
                logging.warning("Connection lost, close the client connection.")
                return len_sent
            if is_logging:
                logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
            msg = msg[len_sent:]
        return len_total

    def recv(self):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        return self.recv_by_s(sock)

    def recv_by_s(self, sock):
        msg = b""
        try:
            msg = sock.recv(self.recv_size)
        except socket.error:
            logging.error("The client is closed!")
        if msg:
            logging.info("Received message: {}".format(msg))
        return msg

    def check_task(self):
        """
            Interface.
        """

    def close(self):
        super().close()
        for server in self._servers.keys():
            server.close()
        for client_ip in self.sockets.keys():
            try:
                self.client_connection(client_ip).close()
                logging.info("Close socket :{}".format(client_ip))
            except Exception as e:
                logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
        self.sockets = {}

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
            rs, _, _ = select(avalible_sockets, [], [], 0.1)
            self.check_read_events(rs)
            try:
                self.check_task()
            except Exception as e:
                self.msg_signal.emit(logging.ERROR,
                                     _translate("messages", "Handle command exception: {}".format(e)))
                logging.exception("Exception when check task:{}".format(e))
                sleep(5)

IOAdapter Class

As shown below, IOAdapter class inherits from Adapter, and it uses a while loop in obtaining DI.

class IOAdapter(Adapter):
    robot_name = None
    check_rate = 0.5

    def __init__(self, host_address):
        super().__init__()
        self.last_gi = 0

    def get_digital_in(self, timeout=None):
        return super().get_digital_in(self.robot_name, timeout)

    def set_digital_out(self, port, value, timeout=None):
        super().set_digital_out(self.robot_name, port, value, timeout)

    def _check_gi(self):
        gi_js = self.get_digital_in()
        gi = int(json.loads(gi_js.decode())["value"])
        if self.last_gi != gi:
            self.last_gi = gi
            logging.info("Check GI signal status: {}".format(gi))
        self.handle_gi(gi)

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._check_gi()
            except Exception as e:
                logging.exception(e)
                self.check_gi_failed()
            sleep(self.check_rate)

    def handle_gi(self, gi):
        """
            Interface.
        """

    def check_gi_failed(self):
        """
            Interface.
        """

AdapterWidget Class

As shown below, AdapterWidget class is a parent class, and all functions related to user interface customization must be its child classes.

class AdapterWidget(QWidget):

    def set_adapter(self, adapter):
        self.adapter = adapter
        self.after_set_adapter()

    def after_set_adapter(self):
        """
            Interface.
        """

    def close(self):
        super().close()
        """
            Interface.
        """

Service

Source code belonging to the service category is stored in the Communication Component/src/interface/services.py file in the installation directory of Mech-Vision and Mech-Viz.

NotifyService Class

The code of the NotifyService class is shown below.

class NotifyService(JsonService):
    service_type = "notify"
    service_name = "adapter"

    def handle_message(self, msg):
        """
            Interface.
        """

    def notify(self, request, _):
        msg = request["notify_message"]
        logging.info("notify message:{}".format(msg))
        return self.handle_message(msg)

The default service name is adapter. If multiple notify services are needed in the project, you can override the service_name to distinguish different services. The descriptions of class functions are shown in the table below.

Class Function Description

handle_message()

Interface function; the child class can be overridden.

notify()

Parse the message; the child class usually does not need to be overridden.

VisionResultSelectedAtService Class

The code of VisionResultSelectedAtService class is shown below.

class VisionResultSelectedAtService(JsonService):
    service_type = "vision_watcher"
    service_name = "vision_watcher_adapter"

    def __init__(self):
        self.poses = None

    def poses_found(self, result):
        """
            Interface.
        """

    def posesFound(self, request, _):
        logging.info("{} result:{}".format(jk.mech_vision, request))
        self.poses_found(request)

    def poses_planned(self, result):
        """
            Interface.
        """

    def posesPlanned(self, request, _):
        logging.info("Plan result:{}".format(request))
        self.poses_planned(request)

    def multiPickCombination(self, request, _):
        logging.info("multiPickCombination:{}".format(request))

The default service type is vision_watcher, which cannot be modified. The default service name is vision_watcher_adapter. If multiple vision_watcher services are needed in the project, you can modify the service_name in the child class to distinguish different services. The descriptions of class functions are shown in the table below.

Class Function Description

poses_found()

Interface function whose child class can be overridden; the parameter is the recognition result of Mech-Vision.

posesFound()

Parse the recognition result sent from Mech-Vision; child class function usually does not need to be overridden.

poses_planned()

Interface function; the parameter is the vision point in the path planned by Mech-Viz.

posesPlanned()

Parse the planning message sent from Mech-Viz.

RobotService Class

The RobotService class is shown below.

class RobotService(JsonService):
    service_type = "robot"
    service_name = "robot"
    jps = [0, 0, 0, 0, 0, 0]
    pose = [0, 0, 0, 1, 0, 0, 0]

    def getJ(self, *_):
        return {"joint_positions": self.jps}

    def setJ(self, jps):
        logging.info("setJ:{}".format(jps))
        self.jps = jps

    def getL(self, *_):
        return {"tcp_pose": self.pose}

    def getFL(self, *_):
        return {"flange_pose": self.pose}

    def setL(self, pose):
        logging.info("setL:{}".format(pose))
        self.pose = pose

    def moveXs(self, params, _):
        pass

    def stop(self, *_):
        pass

    def setTcp(self, *_):
        pass

    def setDigitalOut(self, params, _):
        pass

    def getDigitalIn(self, *_):
        pass

    def switchPauseContinue(self, *_):
        pass

The default service type is robot, which cannot be modified. The default service name is robot, which should be modified into the name of the real robot in the child class. You will need to set a pose in JPs or TCP in the child class, which is used to set a fixed position during Mech-Viz running. Please make sure that this pose will not cause collision with the scene objects. The descriptions of class functions are shown in the table below.

Class Function Description

getJ()

Return JPs to Mech-Viz/Mech-Vision.

setJ()

Set JPs by external services in radians.

getL()

Return TCP to Mech-Viz/Mech-Vision.

getFL()

Return flange pose to Mech-Viz/Mech-Vision.

setL()

Set flange pose in quaternions for external services; the unit is meters.

moveXs()

This function will be called after Mech-Viz finishes planning the path. The parameter contains property of waypoints. Please note that if there are Steps that may interrupt pre-planning, such as Check DI, Branch by Msg in the Mech-Viz project, Mech-Viz will call this function for multiple times.

stop()

Stop the robot (this function is seldom called).

setTcp()

Set TCP (this function is seldom called).

setDigitalOut()

Set DO (this function is seldom called).

getDigitalIn()

Obtain DI (this function is seldom called).

switchPauseContinue()

Pause/continue the robot (this function is seldom called).

OuterMoveService Class

The code of OuterMoveService class is shown below.

class OuterMoveService(JsonService):
    service_type = "outer_move"
    service_name = "outer_move"
    move_target_type = TCP_POSE
    velocity = 0.25
    acceleration = 0.25
    blend_radius = 0.05
    motion_type = MOVEJ
    is_tcp_pose = False
    pick_or_place = 0

    def __init__(self):
        self.targets = []

    def gather_targets(self, di, jps, flange_pose):
        """
            Interface.

            Please add targets to `self.targets` here if needed.
        """

    def add_target(self, move_target_type, target):
        self.targets.append({"move_target_type": move_target_type, "target": target})

    def getMoveTargets(self, params, *_):
        """
        @return: targets(move_target_type  0:jps, 1:tcp_pose, 2:obj_pose)
                 velocity(default 0.25)
                 acceleration(default 0.25)
                 blend_radius(default 0.05)
                 motion_type(default moveJ  'J':moveJ, 'L':moveL)
                 is_tcp_pose(default False)
        """
        di = params["di"]
        jps = params["joint_positions"]
        flange_pose = params["pose"]
        logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))

        self.gather_targets(di, jps, flange_pose)
        targets = self.targets[:]
        self.targets.clear()
        logging.info("Targets: {}".format(targets))
        return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
                "motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}

Both the default service type and service name are outer_move. If multiple outer_move services are needed in the project, you can modify the service_name in the child class to distinguish different services. The descriptions of class functions are shown in the table below.

Class Function Description

move_target_type()

Type of the target; 0: jps, 1: tcp_pose, 2: obj_pose.

velocity()

Velocity of the target (waypoint); the default value is 0.25.

acceleration()

Acceleration of the target; the default value is 0.25.

blend_radius()

The blend radius of the target; the default value is 0.05m.

motion_type()

The motion type of the target: ‘J’: moveJ, ‘L’: moveL.

is_tcp_pose()

Whether the pose of the target is in TCP.

gather_targets()

Interface function, which collects all the targets. The parameter contains current JPs, flange pose, and DI value of the robot. The child class can be overridden according to actual requirements.

add_target()

Add a target; this function can be called in the child class to add a target.

getMoveTargets()

This function will be called when Mech-Viz proceeds on the External Move Step. The parameter contains the current JPs, flange pose, and DI value of the robot.

Register Service

The services corresponding to the above four classes are only available after being registered. The function used for service registration is shown below.

def register_service(hub_caller, service, other_info=None):
    server, port = start_server(service)
    if service.service_type = "robot":
        other_info["from_adapter"] = True
        other_info["simulate"] = False
    hub_caller.register_service(service.service_type, service.service_name, port, other_info)
    return server, port

We Value Your Privacy

We use cookies to provide you with the best possible experience on our website. By continuing to use the site, you acknowledge that you agree to the use of cookies. If you decline, a single cookie will be used to ensure you're not tracked or remembered when you visit this website.