Source code for pyLOM.RL.evaluations

import os
import random

import aerosandbox as asb
import numpy as np
from aerosandbox import _asb_root

from pyLOM.utils import pprint
from pyLOM.RL import NON_CONVERGED_REWARD
from pyLOM.utils.mpi import MPI_RANK, MPI_SIZE, MPI_Status, MPI_ANY_TAG, MPI_ANY_SOURCE, mpi_send, mpi_recv
import pyLOM

airfoil_database_root = _asb_root / "geometry" / "airfoil" / "airfoil_database"

def run_episode(rl_model, env, initial_shape=None, seed=None, keep_unconverged=False):
    """
    Runs a single episode of the RL agent in the given environment.

    Args:
        rl_model: The RL model to evaluate.
        env: The environment in which to run the episode.
        initial_shape: An initial airfoil shape to start the episode with.
            Default: ``None``, which means a random shape will be generated.
        seed: A seed for reproducibility. Default: ``None``.
        keep_unconverged: If True, keeps the last state and reward even if the
            episode was truncated. Default: ``False``.

    Returns:
        rewards: A list of cumulative rewards at each step.
        states: A list of airfoil shapes at each step.
    """
    done = False
    truncated = False
    states = []
    rewards = []
    # Set the seed for reproducibility
    if seed is not None:
        np.random.seed(seed)
        env.action_space.seed(seed)
    reset_options = {"initial_shape": initial_shape} if initial_shape is not None else {}
    obs, info = env.reset(seed=seed, options=reset_options)
    initial_reward = info["initial_reward"]
    rewards.append(initial_reward)
    states.append(obs)
    while not done and not truncated:
        action, _ = rl_model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(action)
        rewards.append(rewards[-1] + reward)
        states.append(obs)
    if truncated and not keep_unconverged:
        # If the episode was truncated, remove the last state and reward as they are not valid
        states.pop()
        rewards.pop()
    return rewards, states
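A minimal usage sketch (illustrative only: ``load_trained_agent`` and ``make_airfoil_env`` below are hypothetical placeholders, not part of this module; any agent exposing a stable-baselines3-style ``predict(obs, deterministic=True)`` would work):

import aerosandbox as asb
from pyLOM.RL.evaluations import run_episode

agent = load_trained_agent("model.zip")   # hypothetical loader for a trained RL agent
env = make_airfoil_env()                  # hypothetical factory for the airfoil optimization environment

rewards, shapes = run_episode(agent, env, initial_shape=asb.Airfoil("naca2412"), seed=42)
print(f"Initial CL/CD: {rewards[0]:.1f}, best CL/CD: {max(rewards):.1f}")
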
def evaluate_airfoil_agent(agent, env, num_episodes=200, save_path=None):
    """
    Evaluates an RL agent on a set of airfoils from the UIUC airfoil database
    and prints a summary of the results.

    Args:
        agent: The RL agent to evaluate.
        env: The environment in which to run the episodes.
        num_episodes: The number of airfoils to evaluate. Default: ``200``.
        save_path: If provided, saves the results to a CSV file at this path.
            Default: ``None``.

    Returns:
        all_rewards: A list with the lists of cumulative rewards for each airfoil optimization.
        states: A list with lists of airfoil shapes for each step of the optimization of each airfoil.
    """
    all_rewards, states = [], []
    airfoil_list = random.sample(os.listdir(airfoil_database_root), num_episodes)
    if "utils" in airfoil_list:
        airfoil_list.remove("utils")
    for airfoil_name in airfoil_list:
        try:
            initial_airfoil = asb.Airfoil(airfoil_name)
        except:  # noqa: E722
            continue
        rewards, airfoils = run_episode(agent, env, initial_shape=initial_airfoil)
        all_rewards.append(rewards)
        states.append(airfoils)
    print_metric_summary(all_rewards, states)
    if save_path is not None:
        save_results_to_csv(all_rewards, states, save_path)
    return all_rewards, states
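For a batch evaluation over a random subset of the UIUC database, a sketch with the same hypothetical ``agent`` and ``env`` as above:

from pyLOM.RL.evaluations import evaluate_airfoil_agent

all_rewards, states = evaluate_airfoil_agent(
    agent,                              # hypothetical trained agent
    env,                                # hypothetical airfoil optimization environment
    num_episodes=50,
    save_path="evaluation_results.csv",
)
best_per_airfoil = [max(r) for r in all_rewards]   # best cumulative CL/CD reached for each airfoil
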
def evaluate_airfoil_agent_whole_uiuc(agent, env, save_path=None):
    """
    Evaluates an RL agent on all airfoils in the UIUC airfoil database and
    prints a summary of the results.

    Args:
        agent: The RL agent to evaluate.
        env: The environment in which to run the episodes.
        save_path: If provided, saves the results to a CSV file at this path.
            Default: ``None``.

    Returns:
        all_rewards: A list with the lists of cumulative rewards for each airfoil optimization.
        states: A list with lists of airfoil shapes for each step of the optimization of each airfoil.
    """
    return evaluate_airfoil_agent(
        agent, env, num_episodes=len(os.listdir(airfoil_database_root)), save_path=save_path
    )

def evaluate_airfoil_agent_whole_uiuc_mpi(agent, env, save_results_path):
    """
    Evaluates an RL agent on all airfoils in the UIUC airfoil database using MPI
    for parallel processing and saves the results to a CSV file. If this function
    is used in a script, the script should be run with
    ``mpiexec -n <num_processes> python <script_name>.py``.

    Args:
        agent: The RL agent to evaluate.
        env: The environment in which to run the episodes.
        save_results_path: If provided, saves the results to a CSV file at this path.
    """
    rank = MPI_RANK
    size = MPI_SIZE
    active_workers = size - 1  # Number of workers currently active
    if rank == 0:
        # Master process
        airfoil_names = os.listdir(airfoil_database_root)
        if "utils" in airfoil_names:
            airfoil_names.remove("utils")
        results = []
        # Send initial work to each worker
        for i in range(1, size):
            if airfoil_names:
                airfoil_name = airfoil_names.pop()
                mpi_send(airfoil_name, dest=i, tag=1)
        # Receive results and distribute remaining work
        while active_workers > 0:
            status = MPI_Status()
            result = mpi_recv(source=MPI_ANY_SOURCE, tag=2, status=status)
            worker_rank = status.source
            if result[0] is not None and result[1] is not None:
                results.append(result[:2])
            if airfoil_names:
                airfoil_name = airfoil_names.pop()
                mpi_send(airfoil_name, dest=worker_rank, tag=1)
            else:
                mpi_send(None, dest=worker_rank, tag=0)  # Signal no more work
                active_workers -= 1
        # Finalize results
        rewards = [res[0] for res in results]
        states = [res[1] for res in results]
        print_metric_summary(rewards, states)
        if save_results_path is not None:
            save_results_to_csv(rewards, states, save_results_path)
    else:
        # Worker process
        while True:
            airfoil_name = mpi_recv(source=0, tag=MPI_ANY_TAG, status=MPI_Status())
            if airfoil_name is None:
                # No more work
                break
            try:
                initial_airfoil = asb.Airfoil(airfoil_name)
                rewards, airfoils = run_episode(agent, env, initial_shape=initial_airfoil)
                data = (rewards, airfoils, airfoil_name)
            except Exception as e:
                pprint(rank, f"Error processing airfoil {airfoil_name}: {e}")
                # Send None for rewards and airfoils if there's an error
                data = (None, None, airfoil_name)
            mpi_send(data, dest=0, tag=2)  # Send results back to master
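A possible driver script for the MPI variant, assuming every rank constructs its own agent and environment (the constructors are again hypothetical placeholders):

# evaluate_uiuc_mpi.py -- launch with: mpiexec -n 8 python evaluate_uiuc_mpi.py
from pyLOM.RL.evaluations import evaluate_airfoil_agent_whole_uiuc_mpi

agent = load_trained_agent("model.zip")   # hypothetical loader, built on every rank
env = make_airfoil_env()                  # hypothetical environment factory, built on every rank
evaluate_airfoil_agent_whole_uiuc_mpi(agent, env, "uiuc_results.csv")
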
def extract_metrics(rewards, states):
    """Extracts the initial, final and best rewards and the corresponding initial and best states."""
    initial_rewards = np.array([reward[0] for reward in rewards])
    final_rewards = np.array([reward[-1] for reward in rewards])
    best_rewards = np.array([max(reward) for reward in rewards])
    initial_states = [state[0] for state in states]
    best_states_idx = np.array([np.argmax(reward) for reward in rewards])
    best_states = [states[i][idx] for i, idx in enumerate(best_states_idx)]
    return initial_rewards, final_rewards, best_rewards, initial_states, best_states


def save_results_to_csv(rewards, states, save_results_path):
    """Saves the initial and best shape parameters together with their CL/CD values to a CSV file."""
    initial_rewards, _, best_rewards, initial_states, best_states = extract_metrics(rewards, states)
    columns_names = ["initial_param_" + str(i) for i in range(initial_states[0].shape[0])]
    columns_names += ["best_param_" + str(i) for i in range(best_states[0].shape[0])]
    columns_names += ["initial_CLCD", "best_CLCD"]
    combined_data = np.column_stack((initial_states, best_states, initial_rewards, best_rewards))
    np.savetxt(save_results_path, combined_data, delimiter=',', header=','.join(columns_names), comments='', fmt='%.6f')


def print_metric_summary(rewards, states):
    """Prints the median (IQR) of the best CL/CD and the mean +/- std of the CL/CD increment over converged airfoils."""
    initial_rewards, _, best_rewards, _, _ = extract_metrics(rewards, states)
    # Remove airfoils that didn't converge
    converged_mask = (
        (initial_rewards > NON_CONVERGED_REWARD)
        & (best_rewards > NON_CONVERGED_REWARD)
        & (initial_rewards != best_rewards)
    )
    initial_rewards = initial_rewards[converged_mask]
    best_rewards = best_rewards[converged_mask]
    q75, q25 = np.percentile(best_rewards, [75, 25])
    pprint(0, f"Number of airfoils converged: {len(best_rewards)}")
    pprint(0, f"Best CL/CD (median (IQR)): {round(np.median(best_rewards))} ({round(q75 - q25)})")
    pprint(0, f"Best CL/CD increment (mean +/- std): {(best_rewards - initial_rewards).mean():.1f} +/- {(best_rewards - initial_rewards).std():.1f}")
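The CSV written by ``save_results_to_csv`` can be inspected afterwards with NumPy; a small sketch (the file name is assumed) using the column names defined above:

import numpy as np

data = np.genfromtxt("uiuc_results.csv", delimiter=",", names=True)
gain = data["best_CLCD"] - data["initial_CLCD"]
print(f"Mean CL/CD improvement over the evaluated airfoils: {gain.mean():.1f}")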