抽象親クラスインターフェース

抽象親クラスインターフェースとは、子クラスが親クラスを継承する際に、実際のニーズに応じて書き換えることができる関数を指します。本節では、次の抽象親クラスについて説明します。

Communication

通信関連のクラスのソースファイルは、Mech-Mindソフトウェアシステムのインストールディレクトリの Mech-Center/src/interface/communication.py ファイルに格納されています。

Communicationクラス

Communicationクラスは、通信に関する基本的な機能を提供するクラスであり、一連のインターフェースを提供します。サーバーまたはクライアントは、このクラスのインターフェース関数を書き換える必要があります。

クラス関数 説明

is_connected()

現在の接続が確立されているかどうかを確認します。

set_recv_size()

受信データの長さを設定します(デフォルトは1024バイト)。

send()

データを送信するためのインターフェース関数です。

recv()

データを受信するためのインターフェース関数です。

close()

接続を切断するためのインターフェース関数です。

before_recv()

インターフェース関数で、データを受信する前に実際の状況に応じてロジックを追加できます(この関数を書き換えることができます)。

after_recv()

インターフェース関数で、データを受信した後に実際の状況に応じてロジックを追加できます(この関数を書き換えることができます)。

after_handle()

インターフェース関数で、データを処理した後に実際の状況に応じてロジックを追加できます(この関数を書き換えることができます)。

TcpServerクラス

TcpServerクラスは、TCP/IP Socketのサーバーをパッケージ化します。

クラス関数 説明

bind_and_listen()

ポートをバインドします。

local_socket()

ネイティブソケット情報を提供します。

remote_socket()

リモートソケット情報を提供します。

accept()

クライアント接続を受け入れます。

send()

データを送信します。

recv()

データを受信します。

close()

Socket接続を終了します。

close_client()

クライアント接続を閉じます。

TcpClinetクラス

TcpClientクラスは、TCP/IP Socketのクライアントをパッケージ化します。

クラスのプロパティ 説明

is_bind_port

ポートをバインドするかどうか(接続されたクライアントのポートがサーバーによって制限されている場合、この変数はTrueに設定する必要があります)

クラス関数 説明

send()

データを送信します。

recv()

データを受信します。

close()

接続を閉じます。

set_timeout()

タイムアウトを設定します(単位は秒)。

reconnect_server()

サーバーを再接続します。

after_connect_server()

インターフェース関数で、サーバーへの最初の接続が成功した後の操作です。

after_reconnect_server()

インターフェース関数で、サーバーへの再接続が成功した後の操作です。

after_timeout()

インターフェース関数で、タイムアウト後の操作です。

Adapter(アダプター)

Adapter関連のクラスのソースファイルは、Mech-Mindソフトウェアシステムのインストールディレクトリの Mech-Center/src/interface/adapter.py ファイルに格納されています。

Adapterクラス

Adapterクラスは、Mech-Viz、Mech-Vision、Mech-CenterおよびRobserverに関連する呼び出しをパッケージ化します。その中には、Mech-Vizの起動、Mech-Vizの停止、Mech-VisionまたはMech-Vizステップパラメータの設定、Mech-Vision認識の実行などの関数が含まれます。AdapterプログラムがMech-VizまたはMech-Visionを呼び出す限り、Adapterクラスを継承する必要があります。

Adapterクラスのプロパティを次の表に示します。

クラスのプロパティ 説明

viz_project_dir

現在のMech-Vizプロジェクトパス

vision_project_name

現在のMech-Visionプロジェクト名

is_simulate

Mech-Vizをシミュレーションで実行するかどうか

is_keep_viz_state

Mech-Vizを最後に停止した時の状態を保持するかどうか

is_save_executor_data

Mech-Vizアクチュエータのデータを保存するかどうか

is_force_simulate

Mech-Vizを強制的にシミュレーションで実行するかどうか

is_force_real_run

Mech-Vizを強制的に実行するかどうか

code_signal

Mech-CenterのメインインターフェースでAdapterメッセージを表示するための信号(エラーコード付きのメッセージ)

msg_signal

Mech-CenterのメインインターフェースでAdapterメッセージを表示するための信号(エラーコードのないメッセージ)

i_code_signal

Mech-CenterのメインインターフェースでMech-Interfaceメッセージを表示するための信号(エラーコード付きのメッセージ)

i_msg_signal

Mech-CenterのメインインターフェースでMech-Interfaceメッセージを表示するための信号(エラーコードのないメッセージ)

viz_finished_signal

Mech-Viz実行終了信号(正常終了または異常終了)

connect_robot_signal

ロボット接続/切断の信号

start_adapter_signal

Adapterの起動信号

service_name_changed

Mech-CenterメインインターフェースでMech-VizおよびMech-Visionステータを表示するための信号

setting_infos

Mech-Centerの設定情報

service_name

登録サービス名

Adapterクラスのプロパティを次の表に示します。

クラス関数 説明

on_exec_status_changed()

Mech-VizおよびMech-Visionのステータス情報を受信します。

register_self_service()

Adapterサービスを登録します。

vision_project_dirs(self):

Mech-Visionプロジェクトのフォルダパスを探します。

vision_project_names()

すべてのMech-Visionプロジェクト名を探します。

vision_project_names_in_center()

Mech-Centerに登録されたすべてのMech-Visionプロジェクト名を探します。

is_viz_registered()

Mech-Vizプロジェクトが登録されているかどうかを確認します。

is_viz_in_running()

Mech-Vizが実行しているかどうかを確認します。

is_vision_started()

Mech-Visionプロジェクトが登録されているかどうかを確認します。

find_services()

サービスを探します。

before_start_viz()

Mech-Vizの起動前に呼び出される関数です。

after_start_viz()

Mech-Vizの起動後に呼び出される関数です。

viz_not_registerd()

Mech-Vizの起動後、Mech-Vizプロジェクトが登録されていない場合はこの関数が呼び出されます。

viz_is_running()

Mech-Vizの起動後、Mech-Vizが実行している場合はこの関数が呼び出されます。

viz_run_error()

Mech-Vizの起動後、Mech-Viz実行中にエラーが発生した場合はこの関数が呼び出されます。

viz_run_finished()

Mech-Vizの実行が終了したときに呼び出される関数です。

viz_plan_failed()

Mech-Vizの計画が失敗したときに呼び出される関数です。

viz_no_targets()

Mech-Vizの計画に移動点がない場合に呼び出される関数です。

viz_unreachable_targets()

Mech-Vizの計画に到達不能な移動点がある場合に呼び出される関数です。

viz_collision_checked()

Mech-Vizの計画に衝突が検出された場合に呼び出される関数です。

parse_viz_reply()

Mech-Vizの返信を解析します。

wait_viz_result()

Mech-Vizの返信を待ちます。

start_viz()

Mech-Vizを起動します。

stop_viz()

Mech-Vizを停止します。

pause_viz()

Mech-Vizを一時停止します。

find_vision_pose()

Mech-Visionプロジェクトをトリガーして撮影します。

async_call_vision_run()

Mech-Visionプロジェクトを非同期にトリガーして撮影します。

async_get_vision_callback()

Mech-Visionからの結果を非同期的に受信します。

deal_vision_result()

Mech-Visionからの結果を処理します。

set_step_property()

Mech-Visionのステップパラメータを設定します。

read_step_property()

Mech-Visionのステップパラメータを読み取ります。

select_parameter_group()

Mech-Visionプロジェクトのパラメータレシピを選択します。

set_task_property()

Mech-Vizのステップパラメータを設定します。

read_task_property()

Mech-Vizのステップパラメータを読み取ります。

get_digital_in()

DIを取得します。

set_digital_out()

DOを設定します。

before_start_adapter()

Adapterの起動前に呼び出される関数です。

start()

Adapterを起動します。

close()

Adapterを終了します。

handle_command()

受信した外部コマンドを処理します。

TcpServerAdapterクラス

TcpServerAdapterクラスはAdapterを継承し、TcpServerの機能をパッケージ化します。詳細は次の通りです。

class TcpServerAdapter(Adapter):
    def __init__(self, host_address, server=TcpServer):
        super(TcpServerAdapter, self).__init__()
        self.init_server(host_address, server)

    def init_server(self, host_address, server=TcpServer):
        self._server = server(host_address)

    def set_recv_size(self, size):
        self._server.set_recv_size(size)

    def send(self, msg, is_logging=True):
        return self._server.send(msg, is_logging)

    def recv(self):
        return self._server.recv()

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._server.before_recv()
                cmds = self._server.recv()
                logging.info("Received raw data from client:{}".format(cmds))
                if not cmds:
                    logging.warning("Adapter client is disconnected!")
                    self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                    self._server.close_client()
                    self.accept()
                    continue
                self._server.after_recv()
            except socket.error:
                logging.warning("Adapter client is closed!")
                self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
                self._server.close_client()
                self.accept()
            except Exception as e:
                logging.exception("Exception occurred when receiving data from client: {}.".format(e))
            else:
                try:
                    self.handle_command(cmds)
                    self._server.after_handle()
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._server.close()

    def before_start_adapter(self):
        super().before_start_adapter()
        self.accept()

    def accept(self):
        if self.is_stop_adapter:
            return
        self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
        self._server.accept()
        if self._server.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
            self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))

TcpClientAdapterクラス

TcpClientAdapterクラスはAdapterを継承し、TcpClientの機能をパッケージ化します。詳細は次の通りです。

class TcpClientAdapter(Adapter):

    def __init__(self, host_address):
        super().__init__()
        self.init_client(host_address)

    def init_client(self, host_address, client=TcpClient):
        self._client = client(host_address)

    def set_bind_port(self, is_bind=True):
        self._client.is_bind_port = is_bind

    def set_recv_size(self, size):
        self._client.set_recv_size(size)

    def send(self, msg, is_logging=True):
        self._client.send(msg, is_logging)

    def recv(self):
        return self._client.recv()

    def start(self):
        self.reconnect_server(False)
        while not self.is_stop_adapter:
            try:
                self._client.before_recv()
                cmds = self._client.recv()
                if not cmds:
                    self.reconnect_server()
                    continue
                logging.info("Received command from server:{}".format(cmds))
                self._client.after_recv()
            except socket.timeout:
                logging.warning("Socket timeout")
                self._client.after_timeout()
            except socket.error:
                sleep(5)
                self.reconnect_server()
            except Exception as e:
                logging.exception("Exception occurred when receiving from server: {}".format(e))
            else:
                try:
                    self.handle_command(cmds)
                except Exception as e:
                    self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def close(self):
        super().close()
        self._client.close()

    def reconnect_server(self, is_reconnect=True):
        self._client.reconnect_server()
        if self.is_stop_adapter:
            return
        if self._client.is_connected():
            self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
        else:
            self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)

TcpMultiplexingServerAdapterクラス

TcpMultiplexingServerAdapterクラスはAdapterを継承しており、主に複数のクライアントの接続に使用されます。詳細は次の通りです。

class TcpMultiThreadingServerAdapter(Adapter):
    def __init__(self, address):
        super().__init__()
        self._servers = {}
        self.add_server(address)
        self.sockets = {}
        self.clients_ip = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
        self.thread_id_socket_dict = {}
        self.set_recv_size()

    def set_recv_size(self, size=1024):
        self.recv_size = size

    def _find_client_ip(self, sock):
        for k, v in self.sockets.items():
            if v = sock:
                return k

    def _find_server(self, sock):
        for k, v in self._servers.items():
            if v = sock:
                return k

    def add_server(self, host_address):
        server = TcpServer(host_address)
        server.bind_and_listen()
        self._servers[server] = server.local_socket()

    def set_clients_ip(self, clients_ip):
        """
            Must be called before start().
            `clients_ip` is a dict(key is client ip, value is client description).
        """
        self.clients_ip = clients_ip

    def add_connection(self, ip_port, sock):
        self.sockets[ip_port] = sock
        logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
        self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))

    def del_connection(self, ip):
        logging.info("Del {}, connections: {}".format(ip, self.sockets))
        if self.client_connection(ip):
            self.client_connection(ip).close()
            self.sockets.pop(ip)
        self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))

    def client_connection(self, client_ip):
        return self.sockets.get(client_ip)

    def check_read_events(self, rs):
        for s in rs:
            if s in self._servers.values():  # recv connection
                server = self._find_server(s)
                if self.is_stop_adapter:
                    return
                server.accept()
                client_socket, client_addr = server.remote_socket()
                ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
                self.add_connection(ip_port, client_socket)
            elif s in self.sockets.values():  # recv data
                client_ip = self._find_client_ip(s)
                if not client_ip:
                    continue
                msg = self.recv_by_s(s)
                if not msg:
                    self.del_connection(client_ip)
                    return
                try:
                    future = self.thread_pool.submit(self.handle_command_thread, s, msg)
                except Exception as e:
                    logging.exception("Adapter exception in handle_command(): {}".format(e))

    def handle_command_thread(self, s, msg):
        thread_id = threading.get_ident()
        self.thread_id_socket_dict[thread_id] = s
        self.handle_command(msg)
        # del self.thread_id_socket_dict[thread_id]

    def send(self, msg, is_logging=True):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        len_total = len(msg)
        while msg:
            if sock:
                len_sent = sock.send(msg)
            else:
                for v in self.sockets.values():
                    try:
                        len_sent = v.send(msg)
                    except Exception as e:
                        logging.warning(e)
            if not len_sent:
                logging.warning("Connection lost, close the client connection.")
                return len_sent
            if is_logging:
                logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
            msg = msg[len_sent:]
        return len_total

    def recv(self):
        thread_id = threading.get_ident()
        sock = self.thread_id_socket_dict.get(thread_id)
        return self.recv_by_s(sock)

    def recv_by_s(self, sock):
        msg = b""
        try:
            msg = sock.recv(self.recv_size)
        except socket.error:
            logging.error("The client is closed!")
        if msg:
            logging.info("Received message: {}".format(msg))
        return msg

    def check_task(self):
        """
            Interface.
        """

    def close(self):
        super().close()
        for server in self._servers.keys():
            server.close()
        for client_ip in self.sockets.keys():
            try:
                self.client_connection(client_ip).close()
                logging.info("Close socket :{}".format(client_ip))
            except Exception as e:
                logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
        self.sockets = {}

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
            rs, _, _ = select(avalible_sockets, [], [], 0.1)
            self.check_read_events(rs)
            try:
                self.check_task()
            except Exception as e:
                self.msg_signal.emit(logging.ERROR,
                                     _translate("messages", "Handle command exception: {}".format(e)))
                logging.exception("Exception when check task:{}".format(e))
                sleep(5)

IOAdapterクラス

IOAdapterクラスはAdapterを継承し、DIを周期的に取得する操作をパッケージ化します。詳細は次の通りです。

class IOAdapter(Adapter):
    robot_name = None
    check_rate = 0.5

    def __init__(self, host_address):
        super().__init__()
        self.last_gi = 0

    def get_digital_in(self, timeout=None):
        return super().get_digital_in(self.robot_name, timeout)

    def set_digital_out(self, port, value, timeout=None):
        super().set_digital_out(self.robot_name, port, value, timeout)

    def _check_gi(self):
        gi_js = self.get_digital_in()
        gi = int(json.loads(gi_js.decode())["value"])
        if self.last_gi != gi:
            self.last_gi = gi
            logging.info("Check GI signal status: {}".format(gi))
        self.handle_gi(gi)

    def start(self):
        self.before_start_adapter()
        while not self.is_stop_adapter:
            try:
                self._check_gi()
            except Exception as e:
                logging.exception(e)
                self.check_gi_failed()
            sleep(self.check_rate)

    def handle_gi(self, gi):
        """
            Interface.
        """

    def check_gi_failed(self):
        """
            Interface.
        """

AdapterWidgetクラス

AdapterWidgetクラスは、Adapter UIをカスタマイズするための親クラスであり、UIをカスタマイズする機能はそれから継承する必要があります。詳細は次の通りです。

class AdapterWidget(QWidget):

    def set_adapter(self, adapter):
        self.adapter = adapter
        self.after_set_adapter()

    def after_set_adapter(self):
        """
            Interface.
        """

    def close(self):
        super().close()
        """
            Interface.
        """

Service

サービス関連のクラスのソースファイルは、Mech-Mindソフトウェアシステムのインストールディレクトリ Mech-Center/src/interface/services.py ファイルに格納されています。

NotifyServiceクラス

NotifyServiceクラスは次の通りです。

class NotifyService(JsonService):
    service_type = "notify"
    service_name = "adapter"

    def handle_message(self, msg):
        """
            Interface.
        """

    def notify(self, request, _):
        msg = request["notify_message"]
        logging.info("notify message:{}".format(msg))
        return self.handle_message(msg)

デフォルトのサービス名はadapterです。プロジェクトで複数の通知サービスが必要な場合は、子クラスでservice_nameを書き換えて、異なるサービスを区別できます。クラス関数の説明を次の表に示します。

クラス関数 説明

handle_message()

インターフェース関数で、この関数にロジックを実装できるために子クラスはこの関数を書き換えることができます。

notify()

メッセージの解析を提供します(通常、子クラスは書き換える必要はありません)。

VisionResultSelectedAtServiceクラス

VisionResultSelectedAtServiceクラスは次の通りです。

class VisionResultSelectedAtService(JsonService):
    service_type = "vision_watcher"
    service_name = "vision_watcher_adapter"

    def __init__(self):
        self.poses = None

    def poses_found(self, result):
        """
            Interface.
        """

    def posesFound(self, request, _):
        logging.info("{} result:{}".format(jk.mech_vision, request))
        self.poses_found(request)

    def poses_planned(self, result):
        """
            Interface.
        """

    def posesPlanned(self, request, _):
        logging.info("Plan result:{}".format(request))
        self.poses_planned(request)

    def multiPickCombination(self, request, _):
        logging.info("multiPickCombination:{}".format(request))

デフォルトのサービスタイプはvision_watcherで、タイプを変更することはできません。デフォルトの名前はvision_watcher_adapterです。プロジェクトで複数のvision_watcherサービスが必要な場合は、子クラスでservice_nameを書き換えて、異なるサービスを区別できます。クラス関数の説明を次の表に示します。

クラス関数 説明

poses_found()

インターフェース関数で、パラメータはMech-Visionからの認識結果です(この関数にロジックを実装できるために子クラスはこの関数を書き換えることができます)。

posesFound()

Mech-Visionによって認識されたメッセージを解析します(通常、子クラスを書き換える必要はありません)。

poses_planned()

インターフェース関数で、パラメータはMech-Viz計画によって選択されたビジョンポイントです。

posesPlanned()

Mech-Vizからの計画メッセージの解析を提供します。

RobotServiceクラス

RobotServiceクラスは次の通りです。

class RobotService(JsonService):
    service_type = "robot"
    service_name = "robot"
    jps = [0, 0, 0, 0, 0, 0]
    pose = [0, 0, 0, 1, 0, 0, 0]

    def getJ(self, *_):
        return {"joint_positions": self.jps}

    def setJ(self, jps):
        logging.info("setJ:{}".format(jps))
        self.jps = jps

    def getL(self, *_):
        return {"tcp_pose": self.pose}

    def getFL(self, *_):
        return {"flange_pose": self.pose}

    def setL(self, pose):
        logging.info("setL:{}".format(pose))
        self.pose = pose

    def moveXs(self, params, _):
        pass

    def stop(self, *_):
        pass

    def setTcp(self, *_):
        pass

    def setDigitalOut(self, params, _):
        pass

    def getDigitalIn(self, *_):
        pass

    def switchPauseContinue(self, *_):
        pass

デフォルトのサービスタイプはrobotで、タイプを変更することはできません。デフォルトの名前はrobotで、子クラスに対応するロボット名に変更する必要があります。また、Mech-Vizの実行中に1つの位置姿勢を固定するために、子クラスにjpsまたはpose値を設定する必要があります。この位置姿勢が経路全体でシーンと衝突しないようにすることに注意してください。クラス関数の説明を次の表に示します。

クラス関数 説明

getJ()

Mech-Viz/Mech-Visionへ関節角度を返します。

setJ()

外部で関節角度を設定します( ラジアン単位 )。

getL()

Mech-Viz/Mech-Visionへツール位置姿勢を返します。

getFL()

Mech-Viz/Mech-Visionへフランジ位置姿勢を返します。

setL()

外部でフランジ位置姿勢(四元数の形式、 メートル単位 )を設定します。

moveXs()

Mech-Vizが経路を計画した後にこの関数が呼び出され、パラメータには移動点のプロパティが含まれます(Mech-Vizプロジェクトに「DIをチェック」、分岐関連のステップなど、事前計画を中断するステップがある場合は、Mech-Vizはこの関数を複数回呼び出します)。

stop()

ロボットを停止します(通常は使用されません)。

setTcp()

TCPを設定します(通常は使用されません)。

setDigitalOut()

DOを設定します(通常は使用されません)。

getDigitalIn()

DIを取得します(通常は使用されません)。

switchPauseContinue()

ロボットを一時停止/再開します(通常は使用されません)。

OuterMoveServiceクラス

OuterMoveServiceクラスは次の通りです。

class OuterMoveService(JsonService):
    service_type = "outer_move"
    service_name = "outer_move"
    move_target_type = TCP_POSE
    velocity = 0.25
    acceleration = 0.25
    blend_radius = 0.05
    motion_type = MOVEJ
    is_tcp_pose = False
    pick_or_place = 0

    def __init__(self):
        self.targets = []

    def gather_targets(self, di, jps, flange_pose):
        """
            Interface.

            Please add targets to `self.targets` here if needed.
        """

    def add_target(self, move_target_type, target):
        self.targets.append({"move_target_type": move_target_type, "target": target})

    def getMoveTargets(self, params, *_):
        """
        @return: targets(move_target_type  0:jps, 1:tcp_pose, 2:obj_pose)
                 velocity(default 0.25)
                 acceleration(default 0.25)
                 blend_radius(default 0.05)
                 motion_type(default moveJ  'J':moveJ, 'L':moveL)
                 is_tcp_pose(default False)
        """
        di = params["di"]
        jps = params["joint_positions"]
        flange_pose = params["pose"]
        logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))

        self.gather_targets(di, jps, flange_pose)
        targets = self.targets[:]
        self.targets.clear()
        logging.info("Targets: {}".format(targets))
        return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
                "motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}

デフォルトのサービスタイプと名前はouter_moveです。プロジェクトで複数のouter_moveサービスが必要な場合は、子クラスでservice_nameを書き換えて、異なるサービスを区別できます。クラス関数の説明を次の表に示します。

クラス関数 説明

move_target_type()

移動点のタイプ(0:jps、1:tcp_pose、2:obj_pose)。

velocity()

移動点の速度(初期値は0.25)。

acceleration()

移動点の加速度(初期値は0.25)。

blend_radius()

移動点のブレンド半径(初期値は0.05m)。

motion_type()

移動点の運動タイプ('J':moveJ、 'L':moveL)。

is_tcp_pose()

移動点はTCPであるかどうか。

gather_targets()

インターフェース関数で、すべての移動点を取得します。パラメータは現時点でのロボットの関節角度、フランジ位置姿勢およびDI値で、必要に応じて子クラスを変更できます。

add_target()

単一の移動点を追加します(子クラスでこの関数を呼び出して移動点を追加できます)。

getMoveTargets()

Mech-Vizは外部移動ステップに実行すると、この関数が呼び出されます(パラメータには現時点でのロボットの関節角度、フランジ位置姿勢、およびDI値が含まれます)。

サービスを登録

上記4つのカテゴリーに対応するサービスは、登録後のみ使用できます。登録サービス関数は次の通りです。

def register_service(hub_caller, service, other_info=None):
    server, port = start_server(service)
    if service.service_type = "robot":
        other_info["from_adapter"] = True
        other_info["simulate"] = False
    hub_caller.register_service(service.service_type, service.service_name, port, other_info)
    return server, port