import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import re
import tqdm
import torch
import copy
import json
import random
import argparse
import itertools
import quaternion
import transformers
import numpy as np
from typing import Any
from omegaconf import OmegaConf
from PIL import Image, ImageFile, ImageDraw, ImageFont
from collections import OrderedDict
from torch.nn.utils.rnn import pad_sequence
from depth_camera_filtering import filter_depth
from transformers.image_utils import to_numpy_array
import habitat
from habitat import logger, Env
from habitat_extensions import measures
from habitat.config.default import get_agent_config
from habitat_baselines.config.default import get_config as get_habitat_config
from habitat.config.default_structured_configs import (
CollisionsMeasurementConfig,
FogOfWarConfig,
TopDownMapMeasurementConfig,
)
from habitat.utils.visualizations import maps
from habitat.utils.visualizations.utils import images_to_video, observations_to_image
from model.stream_video_vln import StreamVLNForCausalLM
from utils.utils import dict_to_cuda
from utils.dist import *
from utils.utils import DEFAULT_IMAGE_TOKEN, IMAGE_TOKEN_INDEX, DEFAULT_MEMORY_TOKEN, MEMORY_TOKEN_INDEX
class VLNEvaluator:
    """Vision-and-language navigation (VLN) evaluator.

    Used to measure a model's navigation performance in Habitat environments.
    The constructor loads the Habitat configuration, registers the extra
    measurements needed for visualisation, caches camera parameters, and
    stores the model, tokenizer and prompt scaffolding.
    """

    def __init__(
        self,
        config_path: str,
        split: str = "val_seen",
        env_num: int = 8,
        output_path: str = None,
        model: Any = None,
        tokenizer: Any = None,
        epoch: int = 0,
        args: argparse.Namespace = None,
    ):
        """Set up configuration, camera parameters and model handles.

        Args:
            config_path: Path handed to the Habitat config loader.
            split: Dataset split written into the Habitat dataset config.
            env_num: Stored on the instance as ``env_num``.
            output_path: Stored on the instance as ``output_path``.
            model: Model object; must expose ``get_vision_tower()``.
            tokenizer: Tokenizer stored alongside the model.
            epoch: Stored on the instance as ``epoch``.
            args: Parsed CLI namespace; ``save_video``, ``num_frames``,
                ``num_future_steps`` and ``num_history`` are read from it.
        """
        # Plain bookkeeping copied from the call arguments.
        self.args = args
        self.device = torch.device('cuda')
        self.split = split
        self.env_num = env_num
        self.save_video = args.save_video
        self.output_path = output_path
        self.epoch = epoch
        self.config_path = config_path

        # Habitat configuration and the handles derived from it.
        self.config = get_habitat_config(config_path)
        self.agent_config = get_agent_config(self.config.habitat.simulator)
        self.sim_sensors_config = self.config.habitat.simulator.agents.main_agent.sim_sensors

        # Extra measurements: a top-down map (with fog of war) and collisions.
        fog_of_war_cfg = FogOfWarConfig(draw=True, visibility_dist=5.0, fov=90)
        top_down_map_cfg = TopDownMapMeasurementConfig(
            map_padding=3,
            map_resolution=1024,
            draw_source=True,
            draw_border=True,
            draw_shortest_path=True,
            draw_view_points=True,
            draw_goal_positions=True,
            draw_goal_aabbs=True,
            fog_of_war=fog_of_war_cfg,
        )
        extra_measurements = {
            "top_down_map": top_down_map_cfg,
            "collisions": CollisionsMeasurementConfig(),
        }
        # The config is only mutated inside the read_write context.
        with habitat.config.read_write(self.config):
            self.config.habitat.dataset.split = self.split
            self.config.habitat.task.measurements.update(extra_measurements)

        print(f"config 类型 = {type(self.config)}")
        print(OmegaConf.to_yaml(self.config))

        # Camera parameters taken from the simulated sensors.
        depth_cfg = self.sim_sensors_config.depth_sensor
        self._camera_height = self.sim_sensors_config.rgb_sensor.position[1]
        self._min_depth = depth_cfg.min_depth
        self._max_depth = depth_cfg.max_depth
        hfov_radians = np.deg2rad(depth_cfg.hfov)
        self._camera_fov = hfov_radians
        # Focal length derived from the depth sensor's width and horizontal
        # field of view; fx and fy share the same value.
        focal_length = depth_cfg.width / (2 * np.tan(hfov_radians / 2))
        self._fx = focal_length
        self._fy = focal_length

        # Model-side handles.
        self.image_processor = model.get_vision_tower().image_processor
        self.model = model
        self.tokenizer = tokenizer

        # Conversation template: a human turn carrying the prompt (with an
        # "<instruction>" placeholder) followed by a placeholder reply.
        prompt = f"<video>\nYou are an autonomous navigation assistant. Your task is to <instruction>. Devise an action sequence to follow the instruction using the four actions: TURN LEFT (←) or TURN RIGHT (→) by 15 degrees, MOVE FORWARD (↑) by 25 centimeters, or STOP."
        human_turn = {"from": "human", "value": prompt}
        reply_turn = {"from": "gpt", "value": "answer"}
        self.conversation = [human_turn, reply_turn]

        # Maps each action symbol to a single-element list holding its index.
        self.actions2idx = OrderedDict(
            [
                ('STOP', [0]),
                ("↑", [1]),
                ("←", [2]),
                ("→", [3]),
            ]
        )
        self.conjunctions = [
            'you can see ',
            'in front of you is ',
            'there is ',
            'you can spot ',
            'you are toward the ',
            'ahead of you is ',
            'in your sight is ',
        ]

        # Frame and history settings supplied on the command line.
        self.num_frames = args.num_frames
        self.num_future_steps = args.num_future_steps
        self.num_history = args.num_history

    def eval_action(self, idx) -> None:
        """Placeholder: performs no work and returns ``None``.

        NOTE(review): ``evaluate`` in this file unpacks five values from this
        call, so a real implementation is presumably expected here.
        """
        return None
def _parse_bool_flag(value):
    """Convert a command-line token such as ``"true"`` or ``"0"`` into a bool.

    Raises:
        argparse.ArgumentTypeError: If the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ("true", "t", "yes", "y", "1", "on"):
        return True
    if token in ("false", "f", "no", "n", "0", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def eval():
    """Parse CLI arguments, load the model and tokenizer, and run evaluation.

    Side effects: sets the module-level ``local_rank``, initialises
    distributed mode through ``init_distributed_mode`` and moves the model to
    the device selected by ``--local_rank``.
    """
    global local_rank
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", default=0, type=int, help="本地进程排名")
    parser.add_argument("--model_path", type=str, help="模型路径")
    parser.add_argument("--habitat_config_path", type=str, default='config/vln_r2r.yaml', help="Habitat 配置文件路径")
    parser.add_argument("--eval_split", type=str, default='val_unseen', help="评估数据集分割")
    parser.add_argument("--output_path", type=str, default='./results/val_unseen/streamvln', help="结果输出路径")
    parser.add_argument("--num_future_steps", type=int, default=4, help="未来步骤数")
    parser.add_argument("--num_frames", type=int, default=32, help="每批处理的帧数")
    # Without a converter, "--save_video False" stored the non-empty string
    # "False", which is truthy, so video saving could never be disabled.
    # A bare "--save_video" (no value) is accepted and means True.
    parser.add_argument(
        "--save_video",
        type=_parse_bool_flag,
        nargs="?",
        const=True,
        default=True,
        help="是否保存导航视频",
    )
    parser.add_argument("--num_history", type=int, default=8, help="历史帧数")
    parser.add_argument("--model_max_length", type=int, default=4096, help="模型最大序列长度")
    parser.add_argument('--world_size', default=1, type=int, help='分布式进程数')
    parser.add_argument('--rank', default=0, type=int, help='进程排名')
    parser.add_argument('--gpu', default=0, type=int, help='GPU 设备 ID')
    parser.add_argument('--port', default='1111', help='分布式通信端口')
    parser.add_argument('--dist_url', default='env://', help='分布式通信 URL')
    parser.add_argument('--device', default='cuda', help='设备类型')
    args = parser.parse_args()

    # init_distributed_mode is not defined in this file; presumably it comes
    # from the `utils.dist` star import.
    init_distributed_mode(args)
    local_rank = args.local_rank

    tokenizer = transformers.AutoTokenizer.from_pretrained(
        args.model_path,
        model_max_length=args.model_max_length,
        padding_side="right",
    )
    config = transformers.AutoConfig.from_pretrained(args.model_path)
    model = StreamVLNForCausalLM.from_pretrained(
        args.model_path,
        attn_implementation="eager",
        torch_dtype=torch.bfloat16,
        config=config,
        low_cpu_mem_usage=False,
    )
    model.model.num_history = args.num_history
    # Evaluation only: turn off gradient tracking for every parameter.
    model.requires_grad_(False)
    model.to(local_rank)
    evaluate(model, tokenizer, args)
def _gather_metrics(metrics, ep_num):
    """Collect per-rank metric tensors from every process.

    Returns the local metrics unchanged when ``torch.distributed`` is not
    initialised or only one process takes part.

    NOTE(review): assumes each metric is a 1-D tensor with one entry per
    episode and ``ep_num`` is a scalar tensor holding this rank's episode
    count — confirm against ``VLNEvaluator.eval_action``.
    """
    import torch.distributed as dist

    if not (dist.is_available() and dist.is_initialized()):
        return list(metrics)
    group_size = dist.get_world_size()
    if group_size <= 1:
        return list(metrics)

    # Ranks may complete different numbers of episodes, so exchange the
    # counts first and size each receive buffer accordingly.
    counts = [torch.zeros_like(ep_num) for _ in range(group_size)]
    dist.all_gather(counts, ep_num)

    gathered = []
    for local in metrics:
        buffers = [
            torch.zeros(int(count.item()), dtype=local.dtype, device=local.device)
            for count in counts
        ]
        dist.all_gather(buffers, local)
        gathered.append(torch.cat(buffers, dim=0))
    return gathered


def evaluate(model, tokenizer, args):
    """Run the evaluator, aggregate metrics across ranks and report them.

    Args:
        model: Model to evaluate; switched to eval mode and reset here.
        tokenizer: Tokenizer forwarded to ``VLNEvaluator``.
        args: Parsed CLI namespace; ``habitat_config_path``, ``eval_split``
            and ``output_path`` are read here.

    The averaged results are printed on every rank. Rank 0 also appends them
    as one JSON line to ``result.json`` under ``args.output_path``.
    """
    model.eval()
    world_size = get_world_size()
    model.reset(world_size)
    evaluator = VLNEvaluator(
        config_path=args.habitat_config_path,
        split=args.eval_split,
        env_num=world_size,
        output_path=args.output_path,
        model=model,
        tokenizer=tokenizer,
        epoch=0,
        args=args,
    )
    sucs, spls, oss, ones, ep_num = evaluator.eval_action(get_rank())

    # The *_all names were previously referenced without ever being defined.
    sucs_all, spls_all, oss_all, ones_all = _gather_metrics(
        (sucs, spls, oss, ones), ep_num
    )

    result_all = {
        "平均成功率": (sum(sucs_all)/len(sucs_all)).item(),
        "平均 SPL": (sum(spls_all)/len(spls_all)).item(),
        "平均 Oracle 成功率": (sum(oss_all)/len(oss_all)).item(),
        "平均到目标距离": (sum(ones_all)/len(ones_all)).item(),
        "总 episode 数": len(sucs_all)
    }
    print(result_all)
    if get_rank() == 0:
        # Make sure the target directory exists before appending to it.
        os.makedirs(args.output_path, exist_ok=True)
        with open(os.path.join(args.output_path, 'result.json'), 'a') as f:
            # One record per line so repeated runs stay individually parseable.
            f.write(json.dumps(result_all) + "\n")
# Script entry point: run the evaluation only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    eval()