抽象父类接口
抽象父类接口是指当子类继承父类时可以根据实际需求进行重写的函数。本节介绍以下几个抽象父类:
Communication类
Communication 类是负责通信的基础类,提供了一系列接口,服务端或客户端需要重写此类的接口函数。
类函数 | 说明 |
---|---|
is_connected() |
判断当前连接是否建立。 |
set_recv_size() |
设置接收数据的长度,默认是 1024 字节。 |
send() |
接口函数,发送数据。 |
recv() |
接口函数,接收数据。 |
close() |
接口函数,关闭连接。 |
before_recv() |
接口函数,在接收数据之前可根据实际添加逻辑,可重写此函数。 |
after_recv() |
接口函数,在接收到数据之后可根据实际添加逻辑,可重写此函数。 |
after_handle() |
接口函数,在处理数据之后可根据实际添加逻辑,可重写此函数。 |
TcpServer类
TcpServer类封装了一个TCP/IP Socket服务端。
类函数 | 说明 |
---|---|
bind_and_listen() |
绑定端口。 |
local_socket() |
提供本机 Socket 信息。 |
remote_socket() |
提供远程 Socket 信息。 |
accept() |
接受客户端连接。 |
send() |
发送数据。 |
recv() |
接收数据。 |
close() |
关闭 Socket 连接。 |
close_client() |
关闭客户端连接。 |
TcpClinet类
TcpClient类封装了一个TCP/IP Socket客户端。
类属性 | 说明 |
---|---|
is_bind_port |
是否绑定端口,如果服务端限制了所连接的客户端的端口,此变量需要为 True |
类函数 | 说明 |
---|---|
send() |
发送数据。 |
recv() |
接收数据。 |
close() |
关闭连接。 |
set_timeout() |
设置超时,参数的单位为秒。 |
reconnect_server() |
重连服务端。 |
after_connect_server() |
接口函数,首次连接服务端成功后的操作。 |
after_reconnect_server() |
接口函数,重连服务端成功后的操作。 |
after_timeout() |
接口函数,超时之后的操作。 |
Adapter基类
Adapter封装了Mech-Viz、Mech-Vision、Mech-Center、Robserver相关的调用,包括启动Mech-Viz、停止Mech-Viz、设置Mech-Vision或Mech-Viz步骤参数、启动Mech-Vision识别等功能。Adapter程序只要调用Mech-Viz或Mech-Vision,则必然需继承Adapter类。
Adapter 类属性如下表所示。
类属性 | 说明 |
---|---|
viz_project_dir |
当前Mech-Viz工程路径 |
vision_project_name |
当前Mech-Vision工程名称 |
is_simulate |
是否仿真运行Mech-Viz |
is_keep_viz_state |
是否保持Mech-Viz上次停止时的运行状态 |
is_save_executor_data |
是否保存Mech-Viz执行器的数据 |
is_force_simulate |
是否强制仿真运行Mech-Viz |
is_force_real_run |
是否强制真实运行Mech-Viz |
code_signal |
在 Mech-Center 主界面显示Adapter信息的信号(信息带错误码) |
msg_signal |
在 Mech-Center 主界面显示Adapter信息的信号(信息不带错误码) |
i_code_signal |
在 Mech-Center 主界面显示Mech-Interface信息的信号(信息带错误码) |
i_msg_signal |
在 Mech-Center 主界面显示Mech-Interface信息的信号(信息不带错误码) |
viz_finished_signal |
Mech-Viz 运行结束信号(正常结束或异常结束) |
connect_robot_signal |
连接/断开机器人信号 |
start_adapter_signal |
启动 Adapter 信号 |
service_name_changed |
Mech-Center主界面显示Mech-Viz和Mech-Vision状态的信号 |
setting_infos |
Mech-Center的配置信息 |
service_name |
注册的服务名称 |
Adapter 类函数如下表所示。
类函数 | 说明 |
---|---|
on_exec_status_changed() |
接收来自Mech-Viz和Mech-Vision的状态信息。 |
register_self_service() |
注册 Adapter 服务。 |
vision_project_dirs(self): |
查询Mech-Vision工程文件夹路径。 |
vision_project_names() |
查询所有的Mech-Vision工程名称。 |
vision_project_names_in_center() |
查询在Mech-Center中已加载的所有Mech-Vision工程名称。 |
is_viz_registered() |
判断Mech-Viz是否已注册。 |
is_viz_in_running() |
判断Mech-Viz是否在运行。 |
is_vision_started() |
判断Mech-Vision工程是否已注册。 |
find_services() |
查找服务。 |
before_start_viz() |
在Mech-Viz启动前调用的函数。 |
after_start_viz() |
在Mech-Viz启动后调用的函数。 |
viz_not_registerd() |
启动Mech-Viz之后发现Mech-Viz未注册,则会调用此函数。 |
viz_is_running() |
启动Mech-Viz之后发现Mech-Viz正在运行,则会调用此函数。 |
viz_run_error() |
启动Mech-Viz之后,Mech-Viz运行过程中发生错误,则会调用此函数。 |
viz_run_finished() |
当Mech-Viz运行结束后调用的函数。 |
viz_plan_failed() |
当Mech-Viz规划失败后调用的函数。 |
viz_no_targets() |
当Mech-Viz规划没有移动点位后调用的函数。 |
viz_unreachable_targets() |
当Mech-Viz规划出现不可达点位后调用的函数。 |
viz_collision_checked() |
当Mech-Viz规划检测到碰撞后调用的函数。 |
parse_viz_reply() |
解析Mech-Viz的回复。 |
wait_viz_result() |
等待Mech-Viz的回复。 |
start_viz() |
启动 Mech-Viz。 |
stop_viz() |
停止Mech-Viz。 |
pause_viz() |
暂停Mech-Viz。 |
find_vision_pose() |
触发Mech-Vision工程拍照。 |
async_call_vision_run() |
异步触发Mech-Vision工程拍照。 |
async_get_vision_callback() |
异步接收Mech-Vision的结果。 |
deal_vision_result() |
处理Mech-Vision的结果。 |
set_step_property() |
设置Mech-Vision中步骤参数。 |
read_step_property() |
读取Mech-Vision中步骤参数。 |
select_parameter_group() |
选择Mech-Vision工程中的配方模板。 |
set_task_property() |
设置Mech-Viz中的步骤参数。 |
read_task_property() |
读取Mech-Viz中的步骤参数。 |
get_digital_in() |
获取 DI。 |
set_digital_out() |
设置DO。 |
before_start_adapter() |
在启动Adapter之前会调用此函数。 |
start() |
启动Adapter。 |
close() |
关闭Adapter。 |
handle_command() |
处理接收到的外部命令。 |
TcpServerAdapter类
TcpServerAdapter类继承自 Adapter,并封装了TcpServer的功能,具体如下所示。
class TcpServerAdapter(Adapter):
def __init__(self, host_address, server=TcpServer):
super(TcpServerAdapter, self).__init__()
self.init_server(host_address, server)
def init_server(self, host_address, server=TcpServer):
self._server = server(host_address)
def set_recv_size(self, size):
self._server.set_recv_size(size)
def send(self, msg, is_logging=True):
return self._server.send(msg, is_logging)
def recv(self):
return self._server.recv()
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
try:
self._server.before_recv()
cmds = self._server.recv()
logging.info("Received raw data from client:{}".format(cmds))
if not cmds:
logging.warning("Adapter client is disconnected!")
self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
self._server.close_client()
self.accept()
continue
self._server.after_recv()
except socket.error:
logging.warning("Adapter client is closed!")
self.code_signal.emit(logging.WARNING, CENTER_CLIENT_DISCONNECTED)
self._server.close_client()
self.accept()
except Exception as e:
logging.exception("Exception occurred when receiving data from client: {}.".format(e))
else:
try:
self.handle_command(cmds)
self._server.after_handle()
except Exception as e:
self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Adapter exception in handle_command(): {}".format(e))
def close(self):
super().close()
self._server.close()
def before_start_adapter(self):
super().before_start_adapter()
self.accept()
def accept(self):
if self.is_stop_adapter:
return
self.code_signal.emit(logging.INFO, CENTER_WAIT_FOR_CLIENT)
self._server.accept()
if self._server.is_connected():
self.code_signal.emit(logging.INFO, CENTER_CLIENT_CONNECTED)
self.msg_signal.emit(logging.INFO, _translate("messages", "Client address is") + " {}".format(self._server.remote_socket()[1]))
TcpClientAdapter类
TcpClientAdapter类继承自 Adapter,封装了TcpClient的功能,具体如下所示。
class TcpClientAdapter(Adapter):
def __init__(self, host_address):
super().__init__()
self.init_client(host_address)
def init_client(self, host_address, client=TcpClient):
self._client = client(host_address)
def set_bind_port(self, is_bind=True):
self._client.is_bind_port = is_bind
def set_recv_size(self, size):
self._client.set_recv_size(size)
def send(self, msg, is_logging=True):
self._client.send(msg, is_logging)
def recv(self):
return self._client.recv()
def start(self):
self.reconnect_server(False)
while not self.is_stop_adapter:
try:
self._client.before_recv()
cmds = self._client.recv()
if not cmds:
self.reconnect_server()
continue
logging.info("Received command from server:{}".format(cmds))
self._client.after_recv()
except socket.timeout:
logging.warning("Socket timeout")
self._client.after_timeout()
except socket.error:
sleep(5)
self.reconnect_server()
except Exception as e:
logging.exception("Exception occurred when receiving from server: {}".format(e))
else:
try:
self.handle_command(cmds)
except Exception as e:
self.msg_signal.emit(logging.ERROR, _translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Adapter exception in handle_command(): {}".format(e))
def close(self):
super().close()
self._client.close()
def reconnect_server(self, is_reconnect=True):
self._client.reconnect_server()
if self.is_stop_adapter:
return
if self._client.is_connected():
self.code_signal.emit(logging.INFO, CENTER_CONNECT_TO_SERVER)
else:
self.code_signal.emit(logging.WARNING, CENTER_SERVER_DISCONNECTED)
TcpMultiplexingServerAdapter类
TcpMultiplexingServerAdapter类继承自Adapter,主要用于多个客户端的连接,具体如下所示。
class TcpMultiThreadingServerAdapter(Adapter):
def __init__(self, address):
super().__init__()
self._servers = {}
self.add_server(address)
self.sockets = {}
self.clients_ip = {}
self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tcp_multi_server_thread")
self.thread_id_socket_dict = {}
self.set_recv_size()
def set_recv_size(self, size=1024):
self.recv_size = size
def _find_client_ip(self, sock):
for k, v in self.sockets.items():
if v = sock:
return k
def _find_server(self, sock):
for k, v in self._servers.items():
if v = sock:
return k
def add_server(self, host_address):
server = TcpServer(host_address)
server.bind_and_listen()
self._servers[server] = server.local_socket()
def set_clients_ip(self, clients_ip):
"""
Must be called before start().
`clients_ip` is a dict(key is client ip, value is client description).
"""
self.clients_ip = clients_ip
def add_connection(self, ip_port, sock):
self.sockets[ip_port] = sock
logging.info("Add {}, connections: {}".format(ip_port, self.sockets))
self.msg_signal.emit(logging.INFO, _translate("messages", "The client {} gets online.").format(ip_port))
def del_connection(self, ip):
logging.info("Del {}, connections: {}".format(ip, self.sockets))
if self.client_connection(ip):
self.client_connection(ip).close()
self.sockets.pop(ip)
self.msg_signal.emit(logging.WARNING, _translate("messages", "The client {} gets offline.").format(ip))
def client_connection(self, client_ip):
return self.sockets.get(client_ip)
def check_read_events(self, rs):
for s in rs:
if s in self._servers.values(): # recv connection
server = self._find_server(s)
if self.is_stop_adapter:
return
server.accept()
client_socket, client_addr = server.remote_socket()
ip_port = "{}:{}".format(str(client_addr[0]), str(client_addr[1]))
self.add_connection(ip_port, client_socket)
elif s in self.sockets.values(): # recv data
client_ip = self._find_client_ip(s)
if not client_ip:
continue
msg = self.recv_by_s(s)
if not msg:
self.del_connection(client_ip)
return
try:
future = self.thread_pool.submit(self.handle_command_thread, s, msg)
except Exception as e:
logging.exception("Adapter exception in handle_command(): {}".format(e))
def handle_command_thread(self, s, msg):
thread_id = threading.get_ident()
self.thread_id_socket_dict[thread_id] = s
self.handle_command(msg)
# del self.thread_id_socket_dict[thread_id]
def send(self, msg, is_logging=True):
thread_id = threading.get_ident()
sock = self.thread_id_socket_dict.get(thread_id)
len_total = len(msg)
while msg:
if sock:
len_sent = sock.send(msg)
else:
for v in self.sockets.values():
try:
len_sent = v.send(msg)
except Exception as e:
logging.warning(e)
if not len_sent:
logging.warning("Connection lost, close the client connection.")
return len_sent
if is_logging:
logging.info("Server send: {}, len_sent: {}".format(msg, len_sent))
msg = msg[len_sent:]
return len_total
def recv(self):
thread_id = threading.get_ident()
sock = self.thread_id_socket_dict.get(thread_id)
return self.recv_by_s(sock)
def recv_by_s(self, sock):
msg = b""
try:
msg = sock.recv(self.recv_size)
except socket.error:
logging.error("The client is closed!")
if msg:
logging.info("Received message: {}".format(msg))
return msg
def check_task(self):
"""
Interface.
"""
def close(self):
super().close()
for server in self._servers.keys():
server.close()
for client_ip in self.sockets.keys():
try:
self.client_connection(client_ip).close()
logging.info("Close socket :{}".format(client_ip))
except Exception as e:
logging.warning("Close socket error:{}, exception:{}".format(client_ip, e))
self.sockets = {}
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
avalible_sockets = list(self.sockets.values()) + list(self._servers.values())
rs, _, _ = select(avalible_sockets, [], [], 0.1)
self.check_read_events(rs)
try:
self.check_task()
except Exception as e:
self.msg_signal.emit(logging.ERROR,
_translate("messages", "Handle command exception: {}".format(e)))
logging.exception("Exception when check task:{}".format(e))
sleep(5)
IOAdapter类
IOAdapter类继承自Adapter,封装了循环获取DI的操作,具体如下所示。
class IOAdapter(Adapter):
robot_name = None
check_rate = 0.5
def __init__(self, host_address):
super().__init__()
self.last_gi = 0
def get_digital_in(self, timeout=None):
return super().get_digital_in(self.robot_name, timeout)
def set_digital_out(self, port, value, timeout=None):
super().set_digital_out(self.robot_name, port, value, timeout)
def _check_gi(self):
gi_js = self.get_digital_in()
gi = int(json.loads(gi_js.decode())["value"])
if self.last_gi != gi:
self.last_gi = gi
logging.info("Check GI signal status: {}".format(gi))
self.handle_gi(gi)
def start(self):
self.before_start_adapter()
while not self.is_stop_adapter:
try:
self._check_gi()
except Exception as e:
logging.exception(e)
self.check_gi_failed()
sleep(self.check_rate)
def handle_gi(self, gi):
"""
Interface.
"""
def check_gi_failed(self):
"""
Interface.
"""
AdapterWidget类
AdapterWidget类是定制Adapter用户界面的父类,任何定制用户界面的功能必须继承它,具体如下所示。
class AdapterWidget(QWidget):
def set_adapter(self, adapter):
self.adapter = adapter
self.after_set_adapter()
def after_set_adapter(self):
"""
Interface.
"""
def close(self):
super().close()
"""
Interface.
"""
NotifyService类
NotifyService 类如下所示。
class NotifyService(JsonService):
service_type = "notify"
service_name = "adapter"
def handle_message(self, msg):
"""
Interface.
"""
def notify(self, request, _):
msg = request["notify_message"]
logging.info("notify message:{}".format(msg))
return self.handle_message(msg)
默认服务名称为adapter,若项目中需要多个通知服务,可在子类中重写service_name来区分不同的服务。类函数说明如下表所示。
类函数 | 说明 |
---|---|
handle_message() |
接口函数,子类可重写此函数,在此函数中实现逻辑。 |
notify() |
提供了对消息的解析,子类一般不需要重写。 |
VisionResultSelectedAtService类
VisionResultSelectedAtService类如下所示。
class VisionResultSelectedAtService(JsonService):
service_type = "vision_watcher"
service_name = "vision_watcher_adapter"
def __init__(self):
self.poses = None
def poses_found(self, result):
"""
Interface.
"""
def posesFound(self, request, _):
logging.info("{} result:{}".format(jk.mech_vision, request))
self.poses_found(request)
def poses_planned(self, result):
"""
Interface.
"""
def posesPlanned(self, request, _):
logging.info("Plan result:{}".format(request))
self.poses_planned(request)
def multiPickCombination(self, request, _):
logging.info("multiPickCombination:{}".format(request))
默认服务类型为vision_watcher,类型不可修改为其他;默认名称为vision_watcher_adapter ,若项目中需要多个vision_watcher服务,可在子类中重写service_name以区分不同的服务。类函数说明如下表所示。
类函数 | 说明 |
---|---|
poses_found() |
接口函数,子类可重写此函数,在此函数中实现逻辑,参数是 Mech-Vision 的识别结果。 |
posesFound() |
解析 Mech-Vision 识别的消息,子类一般不需要重写。 |
poses_planned() |
接口函数,参数是 Mech-Viz 规划选择的视觉点。 |
posesPlanned() |
提供了对 Mech-Viz 规划消息解析。 |
RobotService类
RobotService类如下所示。
class RobotService(JsonService):
service_type = "robot"
service_name = "robot"
jps = [0, 0, 0, 0, 0, 0]
pose = [0, 0, 0, 1, 0, 0, 0]
def getJ(self, *_):
return {"joint_positions": self.jps}
def setJ(self, jps):
logging.info("setJ:{}".format(jps))
self.jps = jps
def getL(self, *_):
return {"tcp_pose": self.pose}
def getFL(self, *_):
return {"flange_pose": self.pose}
def setL(self, pose):
logging.info("setL:{}".format(pose))
self.pose = pose
def moveXs(self, params, _):
pass
def stop(self, *_):
pass
def setTcp(self, *_):
pass
def setDigitalOut(self, params, _):
pass
def getDigitalIn(self, *_):
pass
def switchPauseContinue(self, *_):
pass
默认服务类型为robot,类型不可修改为其他;默认名称为robot,子类中需要修改为对应的机器人名称。子类中需要设置jps或pose值,作用是在Mech-Viz运行过程中固定一个位姿,此处需注意,此位姿需要在整个轨迹中不会和场景发生碰撞。类函数说明如下表所示。
类函数 | 说明 |
---|---|
getJ() |
给Mech-Viz/Mech-Vision返回关节角。 |
setJ() |
外部设置关节角,注意单位是弧度。 |
getL() |
给Mech-Viz/Mech-Vision返回工具位姿。 |
getFL() |
给Mech-Viz/Mech-Vision返回法兰位姿。 |
setL() |
外部设置法兰位姿(四元数形式),注意单位是米。 |
moveXs() |
Mech-Viz 在规划完路径后,会调用此函数,参数里包含了移动点的属性,注意:若 Mech-Viz 工程中存在检查DI、分支等会打断预规划的步骤时,Mech-Viz 会调用该函数多次。 |
stop() |
停止机器人,一般不用。 |
setTcp() |
设置TCP,一般不用。 |
setDigitalOut() |
设置DO,一般不用。 |
getDigitalIn() |
获取DI,一般不用。 |
switchPauseContinue() |
暂停/继续机器人,一般不用。 |
OuterMoveService类
OuterMoveService类如下所示。
class OuterMoveService(JsonService):
service_type = "outer_move"
service_name = "outer_move"
move_target_type = TCP_POSE
velocity = 0.25
acceleration = 0.25
blend_radius = 0.05
motion_type = MOVEJ
is_tcp_pose = False
pick_or_place = 0
def __init__(self):
self.targets = []
def gather_targets(self, di, jps, flange_pose):
"""
Interface.
Please add targets to `self.targets` here if needed.
"""
def add_target(self, move_target_type, target):
self.targets.append({"move_target_type": move_target_type, "target": target})
def getMoveTargets(self, params, *_):
"""
@return: targets(move_target_type 0:jps, 1:tcp_pose, 2:obj_pose)
velocity(default 0.25)
acceleration(default 0.25)
blend_radius(default 0.05)
motion_type(default moveJ 'J':moveJ, 'L':moveL)
is_tcp_pose(default False)
"""
di = params["di"]
jps = params["joint_positions"]
flange_pose = params["pose"]
logging.info("getMoveTargets: di={}, jps={}, flange_pose={}".format(di, jps, flange_pose))
self.gather_targets(di, jps, flange_pose)
targets = self.targets[:]
self.targets.clear()
logging.info("Targets: {}".format(targets))
return {"targets": targets, "velocity": self.velocity, "acceleration": self.acceleration, "blend_radius": self.blend_radius,
"motion_type": self.motion_type, "is_tcp_pose": self.is_tcp_pose, "pick_or_place": self.pick_or_place}
默认服务类型和名称为outer_move,若项目中需要多个outer_move 服务,可在子类中重写service_name以区分不同的服务。类函数说明如下表所示。
类函数 | 说明 |
---|---|
move_target_type() |
移动点类型,0:jps, 1:tcp_pose, 2:obj_pose。 |
velocity() |
移动点速度,默认为 0.25。 |
acceleration() |
移动点加速度,默认为 0.25。 |
blend_radius() |
移动点转弯半径,默认为 0.05m。 |
motion_type() |
移动点运动类型,’J’:moveJ, ‘L’:moveL。 |
is_tcp_pose() |
移动点是否为 TCP。 |
gather_targets() |
接口函数,收集所有移动点,参数是此时机器人的关节角、法兰位姿和 DI 值,子类可根据需要进行判断和修改。 |
add_target() |
添加单个移动点,子类中可调用此函数来添加移动点。 |
getMoveTargets() |
Mech-Viz 执行到外部移动时,会调用此函数,参数包含了此时机器人的关节角、法兰位姿和 DI 值。 |
注册服务
以上四种类对应的服务只有在注册后才能使用,注册服务函数如下所示。
def register_service(hub_caller, service, other_info=None):
server, port = start_server(service)
if service.service_type = "robot":
other_info["from_adapter"] = True
other_info["simulate"] = False
hub_caller.register_service(service.service_type, service.service_name, port, other_info)
return server, port