Semana 37: Conducción end-to-end con PilotNet y generación del dataset de control
Se propuso como objetivo para esta semana el crear un nuevo control del vehículo a parte del PID, en específico un control End to End con un modelo de Red Neuronal convolucional tipo PilotNet con imágenes de segmentación semántica. Por lo que se ha investigado basado en la información proporcionado por Robotics Academy.
Generación del dataset de control con PID
Para generar el dataset se ha decidido usar:
- Un control PID como conductor experto.
- El control manual para provocar salidas del carril.
Debido a que se guardaban los datos de ambos controles en el .csv generado, aparecían errores en los modelos entrenados:
- El coche se salía del camino.
- No realizaba bien las curvas.
Gracias a la ayuda de José María, el error quedó claro. Al guardar los datos del control manual, se entrena al modelo con datos incorrectos no deseados y no se consigue coherencia en la conducción. A si que, para evitar eso simplemente había que añadir una cosa al código para que solo guardase los datos con el control PID:
if not manual_mode:
f.write(f"{rel_path},{last_v},{last_w}\n")
Los datos que se guardan en el .csv son:
- La dirección de la imágen correspondiente.
- El throttle correspondiente a esa imágen.
- El steer correspondiente a la imágen.
Por ejemplo:
image,v,w
segmentation/segmentation_0005.png,0.63,-0.12
segmentation/segmentation_0006.png,0.61,-0.10
Mejora del controlador PID.
Durante la realización de este entrenamiento del modelo también se mejoró el controlador PID anteriormente hecho, ya que se observaban movimientos bruscos en el giro, el throttle no se adaptaba del todo bien al ángulo de giro y el término integral podía crecer demasiado (windup). Para solucionar los problemas se llevo a cabo:
Suavizado del término derivativo
Se implemento un filtro exponencial (EMA) en la derivada:
raw_derivative = (error - previous_error) / max(dt, 1e-3)
derivative = ALPHA_D * derivative + (1 - ALPHA_D) * raw_derivative
Lo que suaviza el ruido y se eliminan las oscilaciones anteriormente producidas.
Anti-windup en el término integral
Para evitar un crecimiento descontrolado, simplemente se pusieron límites a la integral:
integral += error * dt
integral = np.clip(integral, -I_MAX, I_MAX)
Lo que evita que la sobrecorrección en las curvas y haga transiciones más naturales.
Valor de steer
steer_cmd = Kp * error + Ki * integral + Kd * derivative
steer_cmd = np.clip(steer_cmd, -1.0, 1.0)
Control de la velocidad del vehículo
Las velocidades eran muy fijas, por lo que en las curvas podía ir edemasiado rápido, por lo que se cambio el funcionamiento del throttle para que dependiese de una velocidad objetivo basada en el ángulo de giro y en unas velocidades máximas y mínimas.
target_speed = v_max - 5.0 * abs(steer_cmd)
target_speed = np.clip(target_speed, v_min, v_max)
speed_error = target_speed - speed
throttle = K_v * speed_error
throttle = np.clip(throttle, 0.0, 1.0)
Obtención del modelo
Balanceo
Antes del entrenamiento, el dataset estaba muy desbalanceado, ya que predominaban claramente las rectas frente a las curvas, lo que suponía un problema para que el modelo girase correctamente. Entonces fue necesario añadir un balanceo:
df = pd.read_csv("dataset/control.csv")
bins = np.linspace(-1, 1, 21)
df["bin"] = np.digitize(df["w"], bins)
TARGET_TOTAL = 40000
unique_bins = df["bin"].unique()
target_per_bin = TARGET_TOTAL // len(unique_bins)
balanced = []
for b in unique_bins:
group = df[df["bin"] == b]
if len(group) == 0:
continue
if len(group) < target_per_bin:
group = group.sample(target_per_bin, replace=True, random_state=42)
else:
group = group.sample(target_per_bin, replace=False, random_state=42)
balanced.append(group)
df_balanced = pd.concat(balanced).sample(frac=1).reset_index(drop=True)
df_balanced.to_csv("dataset/control_balanced.csv", index=False)
El balanceo se encarga de dividir el rango de giro en 20 intervalos en -1 y 1, después igualar el número de muestras de cada intervalo al objetivo por intervalo, duplicando filas en caso de haber pocas muestras (upsampling) o reduciendo en caso de haber demasiadas (downsampling). Así se obtiene el csv con 40.000 muestras donde no predominan las rectas sobre lo demás.
Arquitectura PilotNet
Como se dijo anteriormente se ha pensado en usar la arquitectura PilotNet para el aprendizaje PilotNet, pero adaptado para las salidas:
- v: control del throttle
- w: control del steer.
class PilotNetVW(nn.Module):
def __init__(self, input_shape=(3, 66, 200)):
super().__init__()
# Bloque convolucional tipo PilotNet
self.conv1 = nn.Conv2d(3, 24, kernel_size=5, stride=2)
self.conv2 = nn.Conv2d(24, 36, kernel_size=5, stride=2)
self.conv3 = nn.Conv2d(36, 48, kernel_size=5, stride=2)
self.conv4 = nn.Conv2d(48, 64, kernel_size=3, stride=1)
self.conv5 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
self.flatten = nn.Flatten()
# Cálculo automático del tamaño de la parte densa
with torch.no_grad():
dummy = torch.zeros(1, *input_shape)
x = self._forward_conv(dummy)
n_flat = x.shape[1]
# Cabezal fully-connected
self.fc1 = nn.Linear(n_flat, 100)
self.fc2 = nn.Linear(100, 50)
self.fc3 = nn.Linear(50, 10)
self.fc_out = nn.Linear(10, 2) # [v, w]
Entrenamiento
Primero se carga el csv, abre la imagen correspondiente y puede llegar a aplicar un flip horizontal con una probabilidad del 50% para añadir variedad a algunos giros más excasos.
class CarDataset(Dataset):
def __init__(self, csv_paths, root_dir="",, transform=None):
"""
csv_paths: lista de rutas de csv
root_dir: carpeta raíz para las imágenes
"""
dfs = [pd.read_csv(os.path.join(root_dir, p)) for p in csv_paths]
self.data = pd.concat(dfs, ignore_index=True)
self.root_dir = root_dir
self.transform = transform
self.hflip = T.RandomHorizontalFlip(p=0.5)
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
row = self.data.iloc[idx]
img_path = os.path.join(self.root_dir, row["image"])
image = Image.open(img_path).convert("RGB")
v = float(row["v"])
w = float(row["w"])
image, flipped = self.apply_hflip(image)
if flipped:
w = -w # si volteamos horizontalmente, invertimos el giro
if self.transform:
image = self.transform(image)
target = torch.tensor([v, w], dtype=torch.float32)
return image, target
def apply_hflip(self, img):
if random.random() < 0.5:
return T.functional.hflip(img), True
return img, False
Los parámetros principales son:
BATCH_SIZE = 128
NUM_EPOCHS = 100
LEARNING_RATE = 1e-4
VAL_SPLIT = 0.2
PATIENCE = 10
MIN_DELTA = 5e-4
Donde se elige el número de muestras procesadas a la vez,el número de epocas, la tasa de aprendizaje, el porcentaje de muestras colocadas en validación, la paciencia y el delta mínimo.
Finalmente se procede a realizar el entrenamiento y la validación con una parada temprana en caso de que no haya mejoras visibles:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")
model = PilotNetVW(input_shape=(3, 66, 200)).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Scheduler: baja LR si la val_loss se estanca
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.5, patience=3, verbose=True
)
best_val_loss = float("inf")
patience_counter = 0
for epoch in range(NUM_EPOCHS):
# ---- Entrenamiento ----
model.train()
running_loss = 0.0
progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}", ncols=100)
for images, targets in progress:
images = images.to(device)
targets = targets.to(device)
optimizer.zero_grad()
outputs = model(images) # (batch, 2)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
running_loss += loss.item() * images.size(0)
progress.set_postfix({"loss": loss.item()})
train_loss = running_loss / n_train
# ---- Validación ----
model.eval()
val_loss = 0.0
with torch.no_grad():
for images, targets in val_loader:
images = images.to(device)
targets = targets.to(device)
outputs = model(images)
loss = criterion(outputs, targets)
val_loss += loss.item() * images.size(0)
val_loss /= n_val
writer.add_scalar("Loss/train", train_loss, epoch)
writer.add_scalar("Loss/val", val_loss, epoch)
# ---- Early stopping + guardar mejor modelo ----
if best_val_loss - val_loss > MIN_DELTA:
best_val_loss = val_loss
patience_counter = 0
torch.save(model.state_dict(), MODEL_PATH)
print(f" -> Nuevo mejor modelo guardado (val_loss={best_val_loss:.6f})")
else:
patience_counter += 1
print(f" -> Sin mejora (paciencia {patience_counter}/{PATIENCE})")
if patience_counter >= PATIENCE:
print("Early stopping activado.")
break
Inferencia
Se carga el modelo como recomendaba Robotics Academy
from model import model_path
import onnxruntime
import numpy as np
import cv2
onnxruntime.preload_dlls()
session = onnxruntime.InferenceSession(
model_path,
providers=["CUDAExecutionProvider"]
)
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
Después se preprocesa la imágen obtenida en CARLA para usar imágenes como las del entrenamiento, es decir, una imagen RGB 66x200 normalizada en [-1 , 1]. Y se obtiene las predicciones del módelo:
model_input = preprocess(last_seg_image)
v_cmd, w_cmd = session.run([output_name], {input_name: model_input})[0][0]
Se obtiene la velocidad objetivo según el throttle y unos límites preestablecidos, y asi saber cuando puede acelerar o cuando debe parar.
# Asegurar rangos
v_cmd = float(np.clip(v_cmd, 0.0, 1.0))
w_cmd = float(np.clip(w_cmd, -1.0, 1.0))
# Convertir a velocidad objetivo real
v_min, v_max = 2.0, 8.0
target_speed = v_min + v_cmd * (v_max - v_min)
K_v = 0.5
speed_error = target_speed - speed
throttle = K_v * speed_error
throttle = float(np.clip(throttle, 0.0, 1.0))
brake = 0.0
# Seguridad si se pasa demasiado de la velocidad objetivo
if speed > v_max + 1.0:
throttle = 0.0
control.throttle = throttle
control.steer = w_cmd
vehicle.apply_control(control)