#!/usr/bin/env python
#
# pyLOM - Python Low Order Modeling.
#
# GPOD Module
#
# Last rev: 29/04/2025
from __future__ import print_function
import numpy as np
from ..utils.gpu import cp
from ..utils import cr_nvtx as cr, raiseError
from ..vmmath import temporal_mean, subtract_mean, matmul, vector_sum, least_squares, ridge_regresion
from ..POD import run, truncate
[docs]
class GappyPOD:
"""
This class implements the Gappy POD algorithm. Gappy POD model for reconstructing
incomplete data using POD basis modes.
Attributes:
centered (bool): If True, subtract the mean from inputs.
truncate (bool): If True, apply modal truncation based on a threshold.
truncation_param (float or int): Parameter controlling truncation.
reconstruction_type (callable): Function to compute POD coefficients.
ridge_lambda (float): Regularization weight for ridge reconstruction.
mean (np.ndarray or cp.ndarray): Mean vector computed during fitting.
U_truncated (np.ndarray or cp.ndarray): Truncated POD modes.
S_truncated (np.ndarray or cp.ndarray): Corresponding singular values.
U_truncated_scaled (np.ndarray or cp.ndarray or cp.ndarray): Modes scaled by singular values.
"""
[docs]
def __init__(
self,
centered=False,
apply_truncation=False,
truncation_param=-0.99,
reconstruction_method="standard",
ridge_lambda=0.01,
):
"""
Initialize a Gappy POD instance.
Args:
centered (bool): Whether to subtract the mean from data.
apply_truncation (bool): Whether to truncate POD modes.
truncation_param (float or int): Parameter controlling truncation (r):
If r >= 1, it is treated as the number of modes.
If r < 1 and r > 0 it is treated as the residual target.
If r < 1 and r < 0 it is treated as the fraction of cumulative energy to retain.
Note: must be in (0,-1] and r = -1 is valid
reconstruction_method (str):
"standard" for least squares.
"ridge" for ridge regression.
ridge_lambda (float): Regularization parameter for ridge solver.
Raises:
ValueError: If `reconstruction_method` is not "standard" or "ridge".
"""
# Validate reconstruction method
if not reconstruction_method.lower() in ["standard", "ridge"]:
raiseError("Reconstruction method must be either 'standard' or 'ridge'.")
self.centered = centered
self.truncate = apply_truncation
self.truncation_param = truncation_param
self.reconstruction_type = least_squares if reconstruction_method.lower() == "standard" else lambda a,b : ridge_regresion(a,b,ridge_lambda)
# Attributes to store fitted data
self.mean = None
self.U_truncated_scaled = None
[docs]
@cr('GPOD.fit')
def fit(self, snapshot_matrix: np.ndarray, **kwargs) -> None:
"""
Fit the Gappy POD model using a complete snapshot matrix.
Computes the POD basis (via SVD) of the input data, optionally centers
the data and truncates modes.
Args:
snapshot_matrix (np.ndarray or cp.ndarray):
Array of shape (n_features, n_samples) containing training snapshots.
**kwargs: Additional keyword arguments forwarded to the POD `run` function.
Returns:
None
"""
cnp = cp if type(snapshot_matrix) is cp.ndarray else np
# Center data if required
self.mean = temporal_mean(snapshot_matrix) if self.centered else cnp.zeros(snapshot_matrix.shape[0])
X = subtract_mean(snapshot_matrix,self.mean)
# Compute SVD through POD this way we can reuse some nice features
self.U_truncated, self.S_truncated, VT = run(X, remove_mean=False, **kwargs)
# Apply truncation if specified
if self.truncate:
self.U_truncated, self.S_truncated, _ = truncate(self.U_truncated,self.S_truncated,VT,self.truncation_param)
# Masked and scaled POD modes
self.U_truncated_scaled = self.U_truncated * self.S_truncated
[docs]
@cr('GPOD.predict')
def predict(self, gappy_vector: np.ndarray) -> np.ndarray:
"""
Reconstruct a single vector with missing entries.
Uses the fitted POD basis to estimate missing values in the input vector.
Args:
gappy_vector (np.ndarray or cp.ndarray):
Input vector of length n_features where missing entries are zeroed.
Returns:
np.ndarray or cp.ndarray:
Full reconstructed vector of length n_features.
Raises:
RuntimeError: If the model has not been fitted prior to prediction.
"""
if self.U_truncated_scaled is None:
raiseError("The model must be fitted before calling predict.")
# Prepare the masked input
mask = (gappy_vector != 0) # Binary mask for observed data
gappy_input = gappy_vector - self.mean*mask
PT_U = mask[:,None]*self.U_truncated_scaled
# Solve for coefficients
coef = self.reconstruction_type(PT_U,gappy_input)
# Reconstruct missing data
vector_reconstructed = matmul(self.U_truncated_scaled,coef[:,None]).flatten() + self.mean
return vector_reconstructed
[docs]
@cr('GPOD.reconstruct')
def reconstruct_full_set(
self, incomplete_snapshot: np.ndarray, iter_num: int
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Iteratively reconstruct an entire snapshot matrix with missing data.
Applies the Gappy POD algorithm for a specified number of iterations,
updating missing entries based on previous reconstructions.
Args:
incomplete_snapshot (np.ndarray or cp.ndarray):
Array of shape (n_features, n_samples) with zeros indicating missing values.
iter_num (int): Number of iterations to perform.
Returns:
tuple:
- reconstructed (np.ndarray or cp.ndarray):
Reconstructed snapshot matrix of same shape as input.
- eig_spec_iter (np.ndarray or cp.ndarray):
Eigenvalue spectrum (normalized squared singular values)
at each iteration, shape (n_samples, iter_num).
- cumulative_energy (np.ndarray or cp.ndarray):
Cumulative energy content of modes per iteration,
shape (n_modes, iter_num).
"""
cnp = cp if type(incomplete_snapshot) is cp.ndarray else np
# Step 1: Create mask for missing values
mask_incomplete = (incomplete_snapshot != 0).astype(int)
# Step 2: Compute row-wise mean for non-missing values
g_i_mean = cnp.array([vector_sum(incomplete_snapshot[i, :]) / vector_sum(mask_incomplete[i, :].astype(incomplete_snapshot.dtype))
for i in range(incomplete_snapshot.shape[0])])
# Initialize the reconstructed snapshot matrix
h_recons = incomplete_snapshot.copy()
for i in range(h_recons.shape[0]):
h_recons[i, mask_incomplete[i] == 0] = g_i_mean[i]
# Step 3: Initialize arrays to store results
eig_spec_iter = cnp.empty((h_recons.shape[1], iter_num))
c_e = cnp.empty((h_recons.shape[1], iter_num))
# Iterative reconstruction
for k in range(iter_num):
# Fit the model with the current reconstructed snapshot matrix
self.fit(h_recons)
# Reconstruct all snapshots
snapshots_recons = cnp.empty(h_recons.shape)
for i in range(h_recons.shape[1]):
gappy_vector = cnp.copy(incomplete_snapshot[:, i])
snapshots_recons[:, i] = self.predict(gappy_vector)
# Update the reconstruction matrix
h_recons = incomplete_snapshot.copy()
for j in range(h_recons.shape[1]):
mask = mask_incomplete[:, j] == 0
h_recons[mask, j] = snapshots_recons[mask, j]
# Compute the eigenvalue spectrum and cumulative energy
_, S, _ = run(h_recons, remove_mean=True)
eig_spec_iter[:, k] = S**2/ vector_sum(S**2,0)
normS = vector_sum(S,0)
cumulative = 0
for ii in range(S.shape[0]):
cumulative += S[ii]
c_e[ii, k] = cumulative / normS
return h_recons, eig_spec_iter, c_e