Cyber RT 是百度开源的一个高性能、灵活的机器人操作系统,cyber record
是 Cyber RT 中用于录制和回放数据的工具。下面是一个使用 Python 解析 cyber record
文件的示例,该示例使用 cyber_py
库(Cyber RT 的 Python 绑定)来读取记录文件并打印消息信息。
目录
一、环境准备
二、示例代码
三、使用说明
四、关键实现说明
一、环境准备
确保已经安装了 Cyber RT 开发环境,并且 cyber_py
库可以正常使用。
二、示例代码
以下是一个基于 cyber_py3
库的 Cyber Record 文件解析示例代码,支持读取目录或单个文件,并自动过滤非 Record 文件:
python">import argparse
import os
from cyber_py3 import record
from cyber_py3.record import RecordReader, RecordWriter, RecordMessage
def parse_record_file(file_path):
"""
解析单个 Cyber Record 文件
"""
try:
reader = RecordReader(file_path)
print(f"\n===== 正在解析文件: {os.path.basename(file_path)} =====")
print(f"总消息数: {reader.get_messagenumber()}")
print(f"开始时间: {reader.get_starttime()}")
print(f"结束时间: {reader.get_endtime()}")
print(f"通道列表: {reader.get_channellist()}\n")
# 遍历所有消息
for channel_name, msg, datatype, timestamp in reader.read_messages():
print(f"[通道] {channel_name}")
print(f" 时间戳: {timestamp}")
print(f" 数据类型: {datatype}")
print(f" 消息长度: {len(msg)} bytes")
print("-" * 60)
except Exception as e:
print(f"解析文件 {file_path} 失败: {str(e)}")
def parse_record_directory(directory):
"""
解析目录下的所有 Cyber Record 文件
"""
if not os.path.isdir(directory):
print(f"错误: {directory} 不是有效目录")
return
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
# 通过文件头验证是否为合法 Record 文件
if record.is_valid_record_file(file_path):
parse_record_file(file_path)
else:
print(f"跳过非 Record 文件: {filename}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Cyber Record 文件解析工具')
parser.add_argument('path', type=str, help='文件路径或目录路径')
args = parser.parse_args()
target_path = args.path
if os.path.isfile(target_path):
if record.is_valid_record_file(target_path):
parse_record_file(target_path)
else:
print(f"错误: {target_path} 不是有效的 Cyber Record 文件")
elif os.path.isdir(target_path):
parse_record_directory(target_path)
else:
print(f"错误: {target_path} 不存在")
三、使用说明
-
运行示例:
# 解析单个文件 python parse_record.py /path/to/your.record # 解析目录 python parse_record.py /path/to/record_dir/
-
功能特性:
-
自动验证文件有效性(通过文件头校验)
-
显示文件元信息:消息数量、时间范围、通道列表
-
支持解析消息头信息(通道、时间戳、数据类型)
-
自动跳过无效文件和非 Record 文件
-
四、关键实现说明
-
文件验证:
-
使用
record.is_valid_record_file()
方法进行二进制验证 -
比单纯检查文件扩展名更可靠
-
-
消息遍历:
-
reader.read_messages()
生成器逐条读取消息 -
返回元组:(channel_name, message, data_type, timestamp)
-
-
性能优化:
-
按需解析消息内容(当前示例仅读取元信息)
-
支持大文件流式读取(不加载全部内容到内存)
-