Semana 5 - Visor 3D en tiempo real con Carla, Exploración de PCL
Esta semana me he dedicado a mejorar el visor para la visualización de muestras LiDAR en tiempo real provenientes de un sensor LiDAR simulado en Carla. Para ello he continuado desarrollando con Open3D para el manejo y visualización eficiente de las nubes de puntos. Tuve algunos problemas con la visualización ya que el renderizador colapsaba al pasarle las muestras directamente, pero conseguí implementar una solución sencilla para asegurar que los datos de entrada lleguen antes de la renderización. También realicé un buffer FIFO para para manejar correctamente la entrada de datos al renderizador, pero me pareció mas eficiente y sencilla la primera propuesta.
Actualmente solo se trata de un script, la próxima semana me encargaré de empaquetar la primera versión del visor con todas las funcionalidades que he ido construyendo para las distintas fuentes de datos.
Visor 3D en tiempo real
Código: Funciones Principales
1. Función lidar_callback()
Esta función procesa los datos LiDAR obtenidos desde el simulador Carla y los convierte en un objeto PointCloud de la libreria Open3D. Se procesan los datos de Carla con Numpy y se ordenan en un array bidimensional. Se le aplica un mapa de color de Matplotly. Es la funcion que se ejecutará asíncronamente cada vez que el LiDAR tome una muestra, modificando el valor de la nube de puntos a renderizar.
def lidar_callback(lidar_data, point_cloud):
'''
Procesa los datos de Carla
cada vez que se toma una muestra con .listen (callback)
y actualiza la nube de puntos en el objeto PointCloud de open3D
'''
data = np.copy(np.frombuffer(lidar_data.raw_data, dtype=np.dtype('f4')))
data = np.reshape(data, (int(data.shape[0] / 4), 4))
intensity = data[:, -1]
int_color = np.c_[
np.interp(intensity, VID_RANGE, VIRIDIS[:, 0]),
np.interp(intensity, VID_RANGE, VIRIDIS[:, 1]),
np.interp(intensity, VID_RANGE, VIRIDIS[:, 2])]
# Convertir la nube de puntos al formato de Open3D
point_cloud.points = o3d.utility.Vector3dVector(data[:, :-1])
point_cloud.colors = o3d.utility.Vector3dVector(int_color)
2. Función spawn_vehicle_lidar()
Esta función recibe como parámetros el mundo importado del servidor de Carla, la blueprint_library
para importar los actores disponibles, el traffic_manager
para gestionar las opciones de tráfico y el parámetro temporal delta
para sincronizar el visualizador con el simulador
Se escoge un punto de spawn aleatorio dentro de los posibles, se crea un vehículo y se le adjunta un sensor LiDAR. El sensor LiDAR queda ajustado con los parametros adecuados para que la visualización sea precisa y coordinada. Para ello se ajusta el parámetro rotation_frequency
en función de delta
. De esta manera se consigue que la rotación esté sincronizada al tiempo de simulación y se obtiene una visualización mucho más limpia. Finalmente se establece el modo de conducción automática al vehiculo
def spawn_vehicle_lidar(world, bp, traffic_manager, delta):
vehicle_bp = bp.filter('vehicle.*')[0]
spawn_points = world.get_map().get_spawn_points()
# Elegir un punto de spawn aleatorio
spawn_point = random.choice(spawn_points)
# Crear el vehículo en el punto aleatorio
vehicle = world.spawn_actor(vehicle_bp, spawn_point)
lidar_bp = bp.find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('range', '100')
lidar_bp.set_attribute('rotation_frequency', str(1 / delta))
lidar_bp.set_attribute('channels', '64')
lidar_bp.set_attribute('points_per_second', '500000')
lidar_position = carla.Transform(carla.Location(x=-0.5, z=1.8))
lidar = world.spawn_actor(lidar_bp, lidar_position, attach_to=vehicle)
# Conectar el vehículo al Traffic Manager y activar el autopiloto
vehicle.set_autopilot(True, traffic_manager.get_port())
return vehicle, lidar
3. Programa principal main()
- Conexión a CARLA: Establece la conexión con el servidor de CARLA y recupera el mundo y los blueprints
- Configuración del Traffic Manager: Configura el Traffic Manager para manejar el tráfico en modo sincrónico y establece una distancia de seguridad entre los vehículos
- Configuración del mundo: Establece el mundo en modo sincrónico y configura un valor de
delta
de 0.05 segundos para los pasos de simulaciónticks
- Creación de actores: Llama a la función
spawn_vehicle_lidar()
para generar un vehículo y un sensor LiDAR, los cuales se añaden a la lista de actores. - Visualización de nube de puntos: Inicializa una nube de puntos en Open3D y utiliza una función de retrollamada (callback) a través del método
lidar.listen()
para recibir y procesar continuamente los datos de LiDAR en tiempo real - Bucle de renderizado: Entra en un bucle que actualiza la geometría de la nube de puntos en el visualizador y renderiza los datos hasta que se cierre la ventana. Espera 5 frames a comenzar a renderizar para que el renderizador no colapse
- Finalización: Al salir del bucle, limpia los actores creados para liberar los recursos
def main():
# Conectarse al servidor de CARLA
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
# Obtener el mundo y la biblioteca de planos
world = client.get_world()
blueprint_library = world.get_blueprint_library()
# Configurar el Traffic Manager
traffic_manager = client.get_trafficmanager(8000)
traffic_manager.set_synchronous_mode(True)
traffic_manager.set_global_distance_to_leading_vehicle(2.5)
# Configuración del mundo (modo sincrónico y delta fijo)
settings = world.get_settings()
delta = 0.05
settings.fixed_delta_seconds = delta
settings.synchronous_mode = True
world.apply_settings(settings)
# Crear el vehículo y LiDAR
global actor_list
vehicle, lidar = spawn_vehicle_lidar(world, blueprint_library, traffic_manager, delta)
actor_list.append(vehicle)
actor_list.append(lidar)
# Inicializar la nube de puntos y escuchar datos LiDAR
point_cloud = o3d.geometry.PointCloud()
lidar.listen(lambda data: lidar_callback(data, point_cloud))
# Configurar el visualizador de Open3D
viz = o3d.visualization.Visualizer()
viz.create_window(window_name='Lidar simulado en Carla', width=960, height=540, left=480, top=270)
viz.get_render_option().background_color = [0.05, 0.05, 0.05]
viz.get_render_option().point_size = 1.35
viz.get_render_option().show_coordinate_frame = True
add_open3d_axis(viz)
# Bucle principal: actualizar la geometría y renderizar la nube de puntos
frame = 0
dt0 = datetime.now()
lidar_data_received = False
while True:
if frame == 5 and not lidar_data_received:
viz.add_geometry(point_cloud)
lidar_data_received = True
viz.update_geometry(point_cloud)
viz.poll_events()
viz.update_renderer()
time.sleep(0.005)
world.tick()
# Calcular FPS
process_time = datetime.now() - dt0
if process_time.total_seconds() > 0:
fps = 1.0 / process_time.total_seconds()
sys.stdout.write(f'\rFPS: {fps:.2f} ')
sys.stdout.flush()
dt0 = datetime.now()
frame += 1
# Salir si no hay eventos
if not viz.poll_events():
break
cleanup() # Limpiar los actores al final