仅使用 Mech-Vision 发送视觉点

本节详细介绍一个仅使用 Mech-Vision 发送视觉点的 Adapter 样例程序。本节包含以下内容:

背景介绍

本样例针对曲轴上料的应用场景,相机固定安装在料框上方支架上,Mech-Vision 进行拍照并输出一个可抓工件的坐标给机器人端。

本样例使用 Mech-Vision 内置的示例工程“大尺寸非平面工件”(文件 ‣ 浏览示例工程 ‣ 上下料 ‣ 大尺寸非平面工件)。

该工程采用 3D 模板匹配的算法,对不同的工件需要设置不同的模板文件和抓取点。因此,需要在 Mech-Vision 中设置参数配方,配方编号(工件编号)在机器人端发送拍照指令时进行设置。关于如何配置参数配方以及查看配方编号,请参考 参数配方 章节。

../../../../../_images/sample1.png

通信方案

机器人与安装 Mech-Mind 视觉系列软件的工控机采用 TCP/IP Socket 协议进行通信,通信格式为 ASCII 字符串,使用英文逗号(,)作为数据分隔符。 其中,视觉系列软件作为通信的服务端,机器人作为客户端。

通信流程如下图所示。

../../../../../_images/adapter_sample1_interaction.png

通信流程详细描述如下:

  1. Mech-Center 等待机器人发送拍照指令 P 和配方编号。

  2. Mech-Center 触发 Mech-Vision 切换参数配方。

  3. Mech-Center 触发 Mech-Vision 拍照识别。

  4. Mech-Vision 拍照识别成功后,返回状态码和视觉点给 Mech-Center。

  5. Mech-Center 将状态码和视觉点返回给机器人。

注解

为了方便机器人抓取,Mech-Center 将待抓工件的坐标转换为机器人 TCP 坐标。

通信报文格式

具体通信报文格式如下表所示。

请求命令

工件编号

发送(机器人 -> 工控机)

P

整数,取值范围: 1~100

状态码

视觉点(TCP坐标)

接收(工控机 -> 机器人)

整数,取值范围:0~4,0-正常识别;1-错误的命令码;2-Vision 工程未注册;3-无视觉点;4-无点云

6个浮点型数据,以逗号(,)隔开,格式为:x,y,z,a,b,c

注解

响应报文的长度固定。如果响应报文的状态码为异常码(1~4),视觉点数据用 0 补齐。

通信报文样例

请求报文

P,1

正常响应报文

0,1994.9217,-192.198,506.4646,-23.5336,-0.2311,173.6517

注解

本样例中,Mech-Vision 识别正常,返回的 TCP 坐标为:1994.9217,-192.198,506.4646,-23.5336,-0.2311,173.6517。

异常响应报文:错误的命令码

1,0,0,0,0,0,0

异常响应报文:Vision 工程未注册

2,0,0,0,0,0,0

异常响应报文:无视觉点

3,0,0,0,0,0,0

异常响应报文:无点云

4,0,0,0,0,0,0

编程思路

为了实现项目目标,本样例按照下图所示的思路编写 Adapter 程序。

../../../../../_images/adapter_sample1_guidelines.png

上图仅列出了样例程序的报文处理逻辑。下节将详细解释样例程序。

样例程序详解

注解

点击下载 Adpater 样例程序

引入 Python 包

导入 Adapter 程序依赖的所有模块。

import json
import logging
import math
import sys
from time import sleep
import os

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from transforms3d import euler
from interface.adapter import TcpServerAdapter, TcpClientAdapter
from util.transforms import object2tcp

定义类

定义继承“TcpServerAdapter”父类的“TestAdapter”子类。

class TestAdapter(TcpServerAdapter):
    vision_project_name = "Large_Non_Planar_Workpieces"
    # vision_project_name = 'Vis-2StationR7-WorkobjectRecognition-L1'
    is_force_real_run = True
    service_name = "test Adapter"

    def __init__(self, address):
        super().__init__(address)
        self.robot_service = None
        self.set_recv_size(1024)

注解

本样例将 Adapter 程序定义为 TCP/IP Socket 通信的服务端。

设置接收指令与处理逻辑

设置接收请求(包括拍照指令和参数配方)的处理逻辑。

# Receive command _create_received_section
def handle_command(self, cmds):
    photo_cmd, *extra_cmds = cmds.decode().split(',')
    recipe = extra_cmds[0]
    # Check command validity _check_cmd_validity_section
    if photo_cmd != 'P':
        self.msg_signal.emit(logging.ERROR, 'Illegal command: {}'.format(photo_cmd))
        self.send(('1' + '' + '').encode())
        return
    # Check whether vision is registered _check_vision_service_section
    if not self.is_vision_started():
        self.msg_signal.emit(logging.ERROR, 'Vision not registered: {}'.format(self.vision_project_name))
        self.send(('2' + '' + '').encode())
        return
    # Change TODO parameter "extra_cmds" according to actual conditions
    sleep(0.1)  # wait for a cycle of getting in Vision
    # _check_vision_result_function_section

    try:
        result = self.select_parameter_group(self.vision_project_name, int(recipe) - 1)
        if result:
            result = result.decode()
            if result.startswith("CV-E0401"):
                self.send(('5' + '' + '').encode())
                return
            elif result.startswith("CV-E0403"):
                self.send(('5' + '' + '').encode())
                return
            raise RuntimeError(result)
    except Exception as e:
        logging.exception('Exception happened when switching model: {}'.format(e))
        self.send(('5' + '' + '').encode())
        return
    self.show_custom_message(logging.INFO, "Switched model for project successfully")

    self.msg_signal.emit(logging.WARNING, 'Started capturing image')
    try:
        self.check_vision_result(json.loads(self.find_vision_pose().decode()))

    except Exception as e:
        self.msg_signal.emit(logging.ERROR, 'Calling project timed out. Please check whether the project is correct: {}'.format(e))
        self.send(('2' + '' + '').encode())

注解

“handle_command”函数作为 TCP/IP Socket 服务端接收报文的处理入口。

定义 Mech-Vision 视觉结果检查

设置 Adapter 检查 Mech-Vision 输出的视觉结果。

# Check vision results
def check_vision_result(self, vision_result, at=None):

    noCloudInRoi = vision_result.get('noCloudInRoi', True)
    if noCloudInRoi:
        self.msg_signal.emit(logging.ERROR, 'No point clouds')
        self.send(('4' + '' + '').encode())
        return

    poses = vision_result.get('poses')
    labels = vision_result.get('labels')
    if not poses or not poses[0]:
        self.msg_signal.emit(logging.ERROR, 'No visual points')
        self.send(('3' + '' + '').encode())
        return

    self.send(self.pack_pose(poses, labels).encode())
    self.msg_signal.emit(logging.INFO, 'Sent TCP successfully')

设置视觉点输出格式

设置视觉点的对外输出格式。

# Pack pose _pack_pose_section
def pack_pose(self, poses, labels, at=None):

    pack_count = min(len(poses), 1)
    msg_body = ''
    for i in range(pack_count):
        pose = poses[i]
        object2tcp(pose)
        t = [p * 1000 for p in pose[:3]]
        r = [math.degrees(p) for p in euler.quat2euler(pose[3:], 'rzyx')]
        p = t + r
        self.msg_signal.emit(logging.INFO, 'Sent pose: {}'.format(p))
        msg_body += ('{:.4f},' * (len(p) - 1) + '{:.4f}').format(*p)

        if i != (pack_count - 1):
            msg_body += ','
    return '{},'.format(0) + msg_body + ''

定义 Adapter 的关闭操作

定义如何关闭 Adapter。

def close(self):
    super().close()