추상적인 상위 클래스 인터페이스¶
추상적인 상위 클래스 인터페이스는 하위 클래스가 상위 클래스를 상속받을 때 실제 필요에 따라 다시 작성할 수 있는 함수를 말합니다. 이 부분에서는 다음과 같은 추상적인 상위 클래스를 소개합니다.
Communication¶
통신 관련 클래스의 소스 파일은 Mech-Center 소프트웨어 설치 경로 아래의 /src/interface/communication.py
파일에 있습니다.
Communication 클래스¶
Communication 클래스는 통신을 담당하는 기본 클래스로 일련의 인터페이스를 제공하며, 서버나 클라이언트는 이 클래스의 인터페이스 함수를 다시 작성해야 합니다.
클래스 함수 |
설명 |
is_connected() |
현재 연결이 끊어졌는지 확인 |
set_recv_size() |
수신된 데이터의 길이를 설정합니다. 기본값은 1024바이트입니다. |
send() |
인터페이스 함수, 데이터 전송 |
recv() |
인터페이스 함수, 데이터 수신 |
close() |
인터페이스 함수, 연결 닫기 |
before_recv() |
인터페이스 함수, 데이터를 수신하기 전에 실제 상황에 따라 논리를 추가할 수 있으며 이 함수를 다시 작성할 수 있습니다. |
after_recv() |
인터페이스 함수, 데이터를 수신한 후 실제 상황에 따라 논리를 추가할 수 있으며 이 함수를 다시 작성할 수 있습니다. |
after_handle() |
인터페이스 함수, 데이터를 처리한 후 실제 상황에 따라 논리를 추가할 수 있으며 이 함수를 다시 작성할 수 있습니다. |
TcpServer 클래스¶
TcpServer 클래스는 TCP/IP Socket 서버를 캡슐화합니다.
클래스 함수 |
설명 |
bind_and_listen() |
바인드 포트 |
local_socket() |
기본 로컬 Socket 정보 제공 |
remote_socket() |
원격 Socket 정보 제공 |
accept() |
클라이언트 연결 수락 |
send() |
데이터 전송 |
recv() |
데이터 수신 |
close() |
Socket 연결 닫기 |
close_client() |
클라이언트 연결 닫기 |
TcpClinet 클래스¶
TcpClient 클래스는 TCP/IP Socket 클라이언트를 캡슐화합니다.
클래스 속성 |
설명 |
is_bind_port |
포트 바인딩 여부, 서버가 연결된 클라이언트의 포트를 제한하는 경우 이 변수는 True여야 합니다. |
클래스 함수 |
설명 |
send() |
데이터 전송 |
recv() |
데이터 수신 |
close() |
연결 닫기 |
set_timeout() |
제한 시간을 설정합니다. 파라미터의 단위는 초입니다. |
reconnect_server() |
서버 다시 연결 |
after_connect_server() |
인터페이스 함수, 처음으로 서버 연결 성공 후의 작업 |
after_reconnect_server() |
인터페이스 함수, 서버 재접속 성공 후의 작업 |
after_timeout() |
인터페이스 함수, 타임아웃 후의 작업 |
Adapter¶
Adapter 관련 클래스의 소스 파일은 Mech-Center 소프트웨어 설치 경로 아래의 /src/interface/adapter.py
파일에 있습니다.
Adapter 상위 클래스¶
Adapter는 Mech-Viz 시작, Mech-Viz 중지, 태스크 파라미터 설정, 스텝 파라미터 설정 및 Mech-Vision 인식 시작과 같은 기능을 포함하여 Mech-Viz, Mech-Vision, Mech-Center 및 Robserver와 관련된 호출을 캡슐화합니다. Adapter 프로그램이 Mech-Viz 또는 Mech-Vision을 호출할 때마다 Adapter 클래스는 Adapter의 하위 클래스여야 합니다.
Adapter 클래스 속성은 아래와 같습니다.
클래스 속성 |
설명 |
viz_project_dir |
현재 Mech-Viz 프로젝트 경로 |
vision_project_name |
현재 Mech-Vision 프로젝트 경로 |
is_simulate |
Mech-Viz 시뮬레이션 실행 여부 |
is_keep_viz_state |
Mech-Viz가 마지막으로 중지되었을 때의 상태 그대로 계속 유지할지 여부 |
is_save_executor_data |
Mech-Viz 이그제큐터의 데이터 저장 여부 |
is_force_simulate |
시뮬레이션을 강제로 Mech-Viz를 실행할지 여부 |
is_force_real_run |
강제로 Mech-Viz를 실제 실행할지 여부 |
code_signal |
Mech-Center의 메인 인터페이스에서 Adapter 정보를 표시하는 신호(오류 코드가 있는 정보) |
msg_signal |
Mech-Center의 메인 인터페이스에서 Adapter 정보를 표시하는 신호(오류 코드가 없는 정보) |
i_code_signal |
Mech-Center의 메인 인터페이스에서 Mech-Interface 정보를 표시하는 신호(오류 코드가 있는 정보) |
i_msg_signal |
Mech-Center의 메인 인터페이스에서 Mech-Interface 정보를 표시하는 신호(오류 코드가 없는 정보) |
viz_finished_signal |
Mech-Viz 실행 종료 신호(정상 종료 또는 비정상 종료) |
connect_robot_signal |
로봇 신호 연결/분리 |
start_adapter_signal |
Adapter 신호 실행 |
service_name_changed |
Mech-Center 메인 인터페이스는 Mech-Viz 및 Mech-Vision 상태 신호를 표시합니다. |
setting_infos |
Mech-Center의 구성 정보 |
service_name |
등록된 서비스 이름 |
Adapter 클래스 함수는 다음 표에 나와 있습니다.
클래스 함수 |
설명 |
on_exec_status_changed() |
Mech-Viz 및 Mech-Vision에서 보낸 상태 정보 수신 |
register_self_service() |
Adapter 서비스 등록 |
vision_project_dirs(self): |
Mech-Vision 프로젝트 폴더 경로 조회 |
vision_project_names() |
모든 Mech-Vision 프로젝트 명칭 조회 |
vision_project_names_in_center() |
Mech-Center에서 모든 Mech-Vision 프로젝트 명칭 조회 |
is_viz_registered() |
Mech-Viz가 등록되었는지 확인 |
is_viz_in_running() |
Mech-Viz가 실행 중인지 확인 |
is_vision_started() |
Mech-Vision 프로젝트 등록 여부 확인 |
find_services() |
서비스 찾기 |
before_start_viz() |
Mech-Viz가 시작되기 전에 호출할 함수 |
after_start_viz() |
Mech-Viz가 시작된 후 호출할 함수 |
viz_not_registerd() |
Mech-Viz를 시작한 후 Mech-Viz가 등록되지 않은 것으로 확인되면 이 함수가 호출됩니다. |
viz_is_running() |
이 함수는 Mech-Viz를 시작한 후 Mech-Viz가 실행 중인 것으로 확인되면 호출됩니다. |
viz_run_error() |
Mech-Viz 시작 후 Mech-Viz 운행 중 에러 발생 시 호출되는 함수 |
viz_run_finished() |
Mech-Viz 실행이 완료되면 호출되는 함수 |
viz_plan_failed() |
Mech-Viz 계획이 실패할 때 호출되는 함수 |
viz_no_targets() |
Mech-Viz 계획에 이동 포인트가 없을 때 호출되는 함수 |
viz_unreachable_targets() |
Mech-Viz 계획에 도달할 수 없는 포인트가 있을 때 호출되는 함수 |
viz_collision_checked() |
Mech-Viz 계획이 충돌을 감지할 때 호출되는 함수 |
parse_viz_reply() |
Mech-Viz의 응답 구문 분석 |
wait_viz_result() |
Mech-Viz의 회신을 기다림 |
start_viz() |
Mech-Viz 실행 |
stop_viz() |
Mech-Viz 중지 |
pause_viz() |
Mech-Viz 정지 |
find_vision_pose() |
Mech-Vision 프로젝트가 사진을 캡처하도록 트리거하기 |
async_call_vision_run() |
Mech-Vision 프로젝트가 사진을 캡처하도록 비동기식으로 트리거하기 |
async_get_vision_callback() |
Mech-Vision에서 비동기식으로 결과 수신 |
deal_vision_result() |
Mech-Vision의 결과 처리 |
set_step_property() |
Mech-Vision에서 스텝 파라미터 설정 |
read_step_property() |
Mech-Vision에서 스텝 파라미터 읽어내기 |
select_parameter_group() |
Mech-Vision 프로젝트에서 레시피 템플릿 선택 |
set_task_property() |
Mech-Viz에서 태스크 파라미터 설정 |
read_task_property() |
Mech-Viz에서 태스크 파라미터 읽어내기 |
get_digital_in() |
DI 획득 |
set_digital_out() |
DO 설정 |
before_start_adapter() |
이 함수는 Adapter를 시작하기 전에 호출됩니다. |
start() |
Adapter 시작 |
close() |
Adapter 닫기 |
handle_command() |
수신된 외부 명령 처리 |
TcpServerAdapter 클래스¶
TcpServerAdapter 클래스는 Adapter를 상속받아 아래와 같이 TcpServer의 기능을 캡슐화합니다.
class TcpServerAdapter(Adapter):
def __init__(self, host_address, server=TcpServer):
super(TcpServerAdapter, self).__init__()
self.init_server(host_address, server)
def init_server(self, host_address, server=TcpServer):
self._server = server(host_address)
def set_recv_size(self, size):
self._server.set_recv_size(size)
def send(self, msg, is_logging=True):
return self._server.send(msg, is_logging)
def recv(self):
return self._server.recv()
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
try:
self._server.before_recv()
cmds = self._server.recv()
logging.info("Received raw data from client:{}".format(cmds))
if not cmds:
logging.warning("Adapter client is disconnected!")
self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
self._server.close_client()
self.accept()
continue
self._server.after_recv()
except socket.error:
logging.warning("Adapter client is closed!")
self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
self._server.close_client()
self.accept()
except Exception as e:
logging.exception("Exception occurred when receiving data from client: {}.".format(e))
else:
try:
self.handle_command(cmds)
self._server.after_handle()
except Exception as e:
self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Adapter exception in handle_command(): {}".format(e))
def close(self):
super().close()
self._server.close()
def before_start_adapter(self):
super().before_start_adapter()
self.accept()
def accept(self):
if self.is_stop_adapter:
return
self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
self._server.accept()
if self._server.is_connected():
self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))
TcpClientAdapter 클래스¶
TcpClientAdapter 클래스는 Adapter를 상속받아 아래와 같이 TcpClient의 기능을 캡슐화합니다.
class TcpClientAdapter(Adapter):
def __init__(self, host_address):
super().__init__()
self.init_client(host_address)
def init_client(self, host_address, client=TcpClient):
self._client = client(host_address)
def set_bind_port(self, is_bind=True):
self._client.is_bind_port = is_bind
def set_recv_size(self, size):
self._client.set_recv_size(size)
def send(self, msg, is_logging=True):
self._client.send(msg, is_logging)
def recv(self):
return self._client.recv()
def start(self):
self.reconnect_server(False)
while not self.is_stop_adapter:
try:
self._client.before_recv()
cmds = self._client.recv()
if not cmds:
self.reconnect_server()
continue
logging.info("Received command from server:{}".format(cmds))
self._client.after_recv()
except socket.timeout:
logging.warning("Socket timeout")
self._client.after_timeout()
except socket.error:
sleep(5)
self.reconnect_server()
except Exception as e:
logging.exception("Exception occurred when receiving from server: {}".format(e))
else:
try:
self.handle_command(cmds)
except Exception as e:
self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Adapter exception in handle_command(): {}".format(e))
def close(self):
super().close()
self._client.close()
def reconnect_server(self, is_reconnect=True):
self._client.reconnect_server()
if self.is_stop_adapter:
return
if self._client.is_connected():
self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
else:
self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)
TcpMultiplexingServerAdapter 클래스¶
TcpMultiplexingServerAdapter 클래스는 Adapter를 상속받아 아래와 같이 다중 클라이언트 연결에 주로 사용됩니다.
class TcpMultiThreadingServerAdapter(Adapter):
def __init__(self, address):
super().__init__()
self._servers = {}
self.add_server(address)
self.sockets = {}
self.clients_ip = {}
self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
self.thread_id_socket_dict = {}
self.set_recv_size()
def set_recv_size(self, size=1024):
self.recv_size = size
def _find_client_ip(self, sock):
for k, v in self.sockets.items():
if v == sock:
return k
def _find_server(self, sock):
for k, v in self._servers.items():
if v == sock:
return k
def add_server(self, host_address):
server = TcpServer(host_address)
server.bind_and_listen()
self._servers[server] = server.local_socket()
def set_clients_ip(self, clients_ip):
"""
Must be called before start().
`clients_ip` is a dict(key is client ip, value is client description).
"""
self.clients_ip = clients_ip
def add_connection(self, ip_port, sock):
self.sockets[ip_port] = sock
logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))
def del_connection(self, ip):
logging.info("Del {}, connections: {}".format(ip, self.sockets))
if self.client_connection(ip):
self.client_connection(ip).close()
self.sockets.pop(ip)
self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))
def client_connection(self, client_ip):
return self.sockets.get(client_ip)
def check_read_events(self, rs):
for s in rs:
if s in self._servers.values(): # recv connection
server = self._find_server(s)
if self.is_stop_adapter:
return
server.accept()
client_socket, client_addr = server.remote_socket()
ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
self.add_connection(ip_port, client_socket)
elif s in self.sockets.values(): # recv data
client_ip = self._find_client_ip(s)
if not client_ip:
continue
msg = self.recv_by_s(s)
if not msg:
self.del_connection(client_ip)
return
try:
future = self.thread_pool.submit(self.handle_command_thread, s, msg)
except Exception as e:
logging.exception("Adapter exception in handle_command(): {}".format(e))
def handle_command_thread(self, s, msg):
thread_id = threading.get_ident()
self.thread_id_socket_dict[thread_id] = s
self.handle_command(msg)
# del self.thread_id_socket_dict[thread_id]
def send(self, msg, is_logging=True):
thread_id = threading.get_ident()
sock = self.thread_id_socket_dict.get(thread_id)
len_total = len(msg)
while msg:
if sock:
len_sent = sock.send(msg)
else:
for v in self.sockets.values():
try:
len_sent = v.send(msg)
except Exception as e:
logging.warning(e)
if not len_sent:
logging.warning("Connection lost, close the client connection.")
return len_sent
if is_logging:
logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
msg = msg[len_sent:]
return len_total
def recv(self):
thread_id = threading.get_ident()
sock = self.thread_id_socket_dict.get(thread_id)
return self.recv_by_s(sock)
def recv_by_s(self, sock):
msg = b""
try:
msg = sock.recv(self.recv_size)
except socket.error:
logging.error("The client is closed!")
if msg:
logging.info("Received message: {}".format(msg))
return msg
def check_task(self):
"""
Interface.
"""
def close(self):
super().close()
for server in self._servers.keys():
server.close()
for client_ip in self.sockets.keys():
try:
self.client_connection(client_ip).close()
logging.info("Close socket :{}".format(client_ip))
except Exception as e:
logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
self.sockets = {}
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
rs, _, _ = select(avalible_sockets, [], [], 0.1)
self.check_read_events(rs)
try:
self.check_task()
except Exception as e:
self.msg_signal.emit(logging.ERROR,
_translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Exception when check task:{}".format(e))
sleep(5)
IOAdapter 클래스¶
IOAdapter 클래스는 Adapter에서 상속되며 아래와 같이 주기적으로 DI를 얻는 작업을 캡슐화합니다.
class IOAdapter(Adapter):
robot_name = None
check_rate = 0.5
def __init__(self, host_address):
super().__init__()
self.last_gi = 0
def get_digital_in(self, timeout=None):
return super().get_digital_in(self.robot_name, timeout)
def set_digital_out(self, port, value, timeout=None):
super().set_digital_out(self.robot_name, port, value, timeout)
def _check_gi(self):
gi_js = self.get_digital_in()
gi = int(json.loads(gi_js.decode())["value"])
if self.last_gi != gi:
self.last_gi = gi
logging.info("Check GI signal status: {}".format(gi))
self.handle_gi(gi)
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
try:
self._check_gi()
except Exception as e:
logging.exception(e)
self.check_gi_failed()
sleep(self.check_rate)
def handle_gi(self, gi):
"""
Interface.
"""
def check_gi_failed(self):
"""
Interface.
"""
AdapterWidget 클래스¶
AdapterWidget 클래스는 Adapter 사용자 인터페이스의 상위 클래스를 정의하여 모든 사용자 인터페이스 정의의 기능은 반드시 아래와 같이 상속되어야 합니다.
class AdapterWidget(QWidget):
def set_adapter(self, adapter):
self.adapter = adapter
self.after_set_adapter()
def after_set_adapter(self):
"""
Interface.
"""
def close(self):
super().close()
"""
Interface.
"""
Service¶
서비스 관련 클래스의 소스 파일은 Mech-Center 소프트웨어 설치 경로 아래의 /src/interface/services.py
파일에 있습니다.
NotifyService 클래스¶
NotifyService 클래스는 아래와 같습니다.
class NotifyService(JsonService):
service_type = "notify"
service_name = "adapter"
def handle_message(self, msg):
"""
Interface.
"""
def notify(self, request, _):
msg = request["notify_message"]
logging.info("notify message:{}".format(msg))
return self.handle_message(msg)
기본 서비스 이름은 adapter이며 프로젝트에서 여러 알림 서비스가 필요한 경우 하위 클래스에서 service_name을 다시 작성하여 다른 서비스를 구별할 수 있습니다. 클래스 함수 설명은 다음 표와 같습니다.
클래스 함수 |
설명 |
handle_message() |
인터페이스 함수, 서브클래스는 이 함수를 재정의하고 이 함수에서 논리를 구현할 수 있습니다. |
notify() |
메시지 구문 분석을 제공하며 일반적으로 하위 클래스를 다시 작성할 필요가 없습니다. |
VisionResultSelectedAtService 클래스¶
VisionResultSelectedAtService 클래스는 아래와 같습니다.
class VisionResultSelectedAtService(JsonService):
service_type = "vision_watcher"
service_name = "vision_watcher_adapter"
def __init__(self):
self.poses = None
def poses_found(self, result):
"""
Interface.
"""
def posesFound(self, request, _):
logging.info("{} result:{}".format(jk.mech_vision, request))
self.poses_found(request)
def poses_planned(self, result):
"""
Interface.
"""
def posesPlanned(self, request, _):
logging.info("Plan result:{}".format(request))
self.poses_planned(request)
def multiPickCombination(self, request, _):
logging.info("multiPickCombination:{}".format(request))
기본 서비스 유형은 vision_watcher이며 다른 유형으로 변경할 수 없으며 기본 이름은 vision_watcher_adapter 입니다. 프로젝트에서 여러 vision_watcher 서비스가 필요한 경우 서브 클래스에서 service_name을 다시 작성하여 다른 서비스를 구분할 수 있습니다. 클래스 함수 설명은 다음 표와 같습니다.
클래스 함수 |
설명 |
poses_found() |
인터페이스 함수, 하위 클래스는 이 함수를 재정의하고 이 함수에서 논리를 구현할 수 있으며 파라미터는 Mech-Vision의 인식 결과입니다. |
posesFound() |
Mech-Vision에서 식별한 메시지를 구문 분석하며 일반적으로 하위 클래스를 다시 작성할 필요가 없습니다. |
poses_planned() |
인터페이스 함수, 파라미터는 Mech-Viz 계획에 의해 선택된 비전 포인트입니다. |
posesPlanned() |
Mech-Viz 계획 메시지 구문 분석 제공 |
RobotService 클래스¶
RobotService 클래스는 아래와 같습니다.
class RobotService(JsonService):
service_type = "robot"
service_name = "robot"
jps = [0, 0, 0, 0, 0, 0]
pose = [0, 0, 0, 1, 0, 0, 0]
def getJ(self, *_):
return {"joint_positions": self.jps}
def setJ(self, jps):
logging.info("setJ:{}".format(jps))
self.jps = jps
def getL(self, *_):
return {"tcp_pose": self.pose}
def getFL(self, *_):
return {"flange_pose": self.pose}
def setL(self, pose):
logging.info("setL:{}".format(pose))
self.pose = pose
def moveXs(self, params, _):
pass
def stop(self, *_):
pass
def setTcp(self, *_):
pass
def setDigitalOut(self, params, _):
pass
def getDigitalIn(self, *_):
pass
def switchPauseContinue(self, *_):
pass
기본 서비스 유형은 robot이며 다른 유형으로 변경할 수 없으며 기본 이름은 robot이며 하위 클래스는 해당 로봇 이름으로 변경해야 합니다. jps 또는 pose 값은 하위 클래스에서 설정해야 합니다. 기능은 Mech-Viz 작동 중 포즈를 수정하는 것입니다. 여기에서 이 포즈가 전체 경로에서 시나리오와 충돌하지 않아야 한다는 점에 유의해야 합니다. 클래스 함수 설명은 다음 표와 같습니다.
클래스 함수 |
설명 |
getJ() |
Mech-Viz/Mech-Vision의 관절 각도 반환 |
setJ() |
외부에서 관절 각도를 설정합니다. 단위는 라디안입니다. |
getL() |
Mech-Viz/Mech-Vision에 TCP 포즈 반환 |
getFL() |
Mech-Viz/Mech-Vision에 플랜지 포즈 반환 |
setL() |
외부적으로 플랜지 포즈(사원수 형식)를 설정합니다. 단위는 미터입니다. |
moveXs() |
Mech-Viz가 경로를 계획한 후 이 함수를 호출합니다. 파라미터에는 이동 포인트의 속성이 포함되어 있습니다. 주의: Mech-Viz 프로젝트에 DI 체크, 분기 등 태스크가 있으면 사전 계획된 경로를 방해합니다. 이때 Mech-Viz는 함수를 여러 번 호출합니다. |
stop() |
로봇 중지, 일반적으로 사용하지 않음 |
setTcp() |
TCP 설정, 일반적으로 사용하지 않음 |
setDigitalOut() |
DO 설정, 일반적으로 사용하지 않음 |
getDigitalIn() |
DI 획득, 일반적으로 사용하지 않음 |
switchPauseContinue() |
로봇 일시 중지/계속 실행하기, 일반적으로 사용하지 않음 |
OuterMoveService 클래스¶
OuterMoveService 클래스는 아래와 같습니다.
class OuterMoveService(JsonService):
service_type = "outer_move"
service_name = "outer_move"
move_target_type = TCP_POSE
velocity = 0.25
acceleration = 0.25
blend_radius = 0.05
motion_type = MOVEJ
is_tcp_pose = False
pick_or_place = 0
def __init__(self):
self.targets = []
def gather_targets(self, di, jps, flange_pose):
"""
Interface.
Please add targets to `self.targets` here if needed.
"""
def add_target(self, move_target_type, target):
self.targets.append({"move_target_type": move_target_type, "target": target})
def getMoveTargets(self, params, *_):
"""
@return: targets(move_target_type 0:jps, 1:tcp_pose, 2:obj_pose)
velocity(default 0.25)
acceleration(default 0.25)
blend_radius(default 0.05)
motion_type(default moveJ 'J':moveJ, 'L':moveL)
is_tcp_pose(default False)
"""
di = params["di"]
jps = params["joint_positions"]
flange_pose = params["pose"]
logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))
self.gather_targets(di, jps, flange_pose)
targets = self.targets[:]
self.targets.clear()
logging.info("Targets: {}".format(targets))
return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
"motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}
기본 서비스 유형 및 명칭은 outer_move이며, 프로젝트에 여러 outer_move 서비스가 필요한 경우 하위 클래스에서 service_name 을 다시 작성하여 다른 서비스를 구분할 수 있습니다. 클래스 함수 설명은 다음 표와 같습니다.
클래스 함수 |
설명 |
move_target_type() |
이동 포인트 유형,0:jps, 1:tcp_pose, 2:obj_pose |
velocity() |
이동 포인트 속도, 기본값은 0.25입니다. |
acceleration() |
이동 포인트 가속도, 기본값은 0.25입니다. |
blend_radius() |
이동 포인트의 회전 반경, 기본값은 0.05m입니다. |
motion_type() |
이동 포인트 운동 유형,'J':moveJ, 'L':moveL |
is_tcp_pose() |
이동 포인트가 TCP인지 여부 |
gather_targets() |
인터페이스 함수는 모든 이동 포인트를 수집하는데 이때 로봇의 관절 각도, 플랜지 포즈, DI 값을 파라미터로 하며, 서브 클래스는 필요에 따라 판단 및 수정할 수 있습니다. |
add_target() |
단일 이동 포인트를 추가합니다. 이 함수는 하위 클래스에서 호출하여 이동을 추가할 수 있습니다. |
getMoveTargets() |
이 함수는 Mech-Viz가 외부 이동을 수행할 때 호출되며, 이때 파라미터에 로봇의 관절 각도, 플랜지 포즈 및 DI 값이 포함됩니다. |
서비스 등록¶
위 네 가지 클래스에 해당하는 서비스는 회원가입 후 이용이 가능하며, 서비스 등록 함수는 다음과 같습니다.
def register_service(hub_caller, service, other_info=None):
server, port = start_server(service)
if service.service_type == "robot":
other_info["from_adapter"] = True
other_info["simulate"] = False
hub_caller.register_service(service.service_type, service.service_name, port, other_info)
return server, port