Source code for pyLOM.dataset

#!/usr/bin/env python
#
# pyLOM, dataset.
#
# Dataset class, reader and reduction routines.
#
# Last rev: 30/07/2021
from __future__ import print_function, division

import os, numpy as np

from .partition_table import PartitionTable
from .                import inp_out as io
from .utils           import cr_nvtx as cr, raiseError, gpu_to_cpu, cpu_to_gpu, pprint, mpi_gather, MPI_RANK, MPI_SIZE
from .vmmath          import data_splitting, find_random_sensors


class Dataset(object):
    '''
    The Dataset class wraps the position of the nodes and the time instants
    with the number of variables and relates them so that the operations in
    parallel are easier.
    '''
    def __init__(self, xyz=None, ptable=None, vars=None, order=None, point=True, **kwargs):
        '''
        Class constructor

        Inputs:
            > xyz:    coordinates of the points.
            > ptable: partition table used.
            > vars:   dictionary containing the variable name and values as a
                      python dictionary.
            > order:  ordering of the points (automatically created if None).
            > point:  True if point data, False if cell data.
            > kwargs: dictionary containing the field name and values as a
                      python dictionary.
        '''
        self._xyz      = xyz
        self._vardict  = vars
        self._fieldict = kwargs
        self._ptable   = ptable
        self._order    = np.arange(xyz.shape[0]) if order is None else order
        self._point    = point
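    # A minimal construction sketch (all names and sizes below are
    # hypothetical; ptable is assumed to be an already built PartitionTable
    # for the 100 points):
    #
    #   xyz   = np.random.rand(100, 3)                          # 100 points in 3D
    #   tdict = {'idim': 0, 'value': np.linspace(0., 1., 50)}   # 50 time instants
    #   VELOC = {'ndim': 1, 'value': np.random.rand(100, 50)}   # scalar field
    #   d     = Dataset(xyz=xyz, ptable=ptable, vars={'time': tdict}, VELOC=VELOC)
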
    def __len__(self):
        return self._xyz.shape[0]
    def __str__(self):
        '''
        String representation
        '''
        s = 'Dataset of %d variables:\n' % len(self.varnames)
        for key in self.varnames:
            var    = self.vars[key]['value']
            nanstr = ' (has NaNs) ' if np.any(np.isnan(var)) else ' '
            s     += ' > ' + key + nanstr + ' - max = ' + str(np.nanmax(var)) \
                   + ', min = ' + str(np.nanmin(var)) + '\n'
        s += 'and %d fields with %d points:\n' % (len(self.fieldnames), len(self))
        for key in self.fieldnames:
            field  = self.fields[key]['value']
            nanstr = ' (has NaNs) ' if np.any(np.isnan(field)) else ' '
            s     += ' > ' + key + nanstr + '- max = ' + str(np.nanmax(field)) \
                   + ', min = ' + str(np.nanmin(field)) \
                   + ', avg = ' + str(np.nanmean(field)) \
                   + '\n'
        return s
    # Set and get functions
    def __getitem__(self, key):
        '''
        Dataset[key]

        Recover the value of a field given its key
        '''
        return self._fieldict[key]['value']
    def __setitem__(self, key, value):
        '''
        Dataset[key] = value

        Set the value of a field given its key
        '''
        self._fieldict[key]['value'] = value
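    # Field values can then be read and written through item access
    # (field name hypothetical):
    #
    #   U = d['VELOC']       # recover the raw field array
    #   d['VELOC'] = 2.0*U   # overwrite it in place
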
    # Functions
    def rename(self, new, old):
        '''
        Rename a field of the dataset.
        '''
        self.fields[new] = self.fields.pop(old)
        return self
    def delete(self, varname):
        '''
        Delete a field of the dataset.
        '''
        return self.fields.pop(varname)
    def get_variable(self, key: str):
        r'''
        Recover the value of a variable given its key

        Args:
            key (str): name of the variable

        Returns:
            (np.ndarray): value of the variable
        '''
        return self._vardict[key]['value']
    def set_variable(self, key, value):
        '''
        Set the value of a variable given its key
        '''
        self._vardict[key]['value'] = value
    def get_dim(self, var, idim):
        '''
        Recover the value of a field for a given dimension
        '''
        ndim = self._fieldict[var]['ndim']
        if idim >= ndim:
            raiseError(f'Requested dimension {idim} for {var} greater than its number of dimensions {ndim}!')
        return np.ascontiguousarray(self._fieldict[var]['value'][idim:ndim*len(self):ndim])
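    # Components of a multi-dimensional field are stored interleaved, i.e.,
    # value[idim::ndim] holds component idim. Sketch (field name
    # hypothetical):
    #
    #   ux = d.get_dim('VELOC', 0)   # contiguous copy of the first component
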
    def info(self, var):
        '''
        Returns the information for a certain field
        '''
        return {'point': self._point, 'ndim': self._fieldict[var]['ndim']}
    def to_gpu(self, fields=None):
        '''
        Send field data to the GPU
        '''
        fields = fields if fields is not None else self.fieldnames
        for key in fields:
            self._fieldict[key]['value'] = cpu_to_gpu(self._fieldict[key]['value'])
        return self
    def to_cpu(self, fields=None):
        '''
        Send field data to the CPU
        '''
        fields = fields if fields is not None else self.fieldnames
        for key in fields:
            self._fieldict[key]['value'] = gpu_to_cpu(self._fieldict[key]['value'])
        return self
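    # Both transfers return self, so calls can be chained (field name
    # hypothetical):
    #
    #   d.to_gpu(['VELOC'])   # operate on the GPU copy ...
    #   d.to_cpu(['VELOC'])   # ... then bring the result back
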
    def add_field(self, varname, ndim, var):
        '''
        Add a field to the dataset
        '''
        self._fieldict[varname] = {
            'ndim'  : ndim,
            'value' : var,
        }
    def add_variable(self, varname, idim, var):
        '''
        Add a variable to the dataset
        '''
        self._vardict[varname] = {
            'idim'  : idim,
            'value' : var,
        }
    def split_data(self, var, mode='reconstruct'):
        r'''
        Generate random training, validation and test masks for a dataset of
        Nt samples.

        Args:
            var (str): variable which will be split in different samples.
            mode (str, optional): type of splitting to perform (default,
                ``'reconstruct'``). In reconstruct mode all three datasets
                have samples along all the data range.

        Returns:
            [(np.ndarray), (np.ndarray), (np.ndarray)]: List of arrays containing
            the identifiers of the training, validation and test samples.
        '''
        N    = len(self.vars[var]['value'])
        idim = self.vars[var]['idim']
        trid, vaid, teid = data_splitting(N, mode)
        self.add_variable('training_%s' % var, idim, trid)
        self.add_variable('validation_%s' % var, idim, vaid)
        self.add_variable('test_%s' % var, idim, teid)
        return trid, vaid, teid
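    # Sketch: split along the 'time' variable; the masks are returned and
    # also stored as new variables:
    #
    #   trid, vaid, teid = d.split_data('time')
    #   t_train = d.get_variable('training_time')
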
    def mask_field(self, key, mask):
        '''
        Mask a field over a defined variable
        '''
        # If a string is given, interpret it as the name of a stored variable
        mask = mask if not isinstance(mask, str) else self.get_variable(mask)
        return self[key][:, mask]
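    # Sketch: a field can be masked with an index array or with the name of a
    # stored variable (names hypothetical):
    #
    #   Utr = d.mask_field('VELOC', trid)
    #   Utr = d.mask_field('VELOC', 'training_time')
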
    def append_variable(self, varname, var, **fieldict):
        '''
        Appends new timesteps to the dataset
        '''
        # Add to variable vector
        self.vars[varname]['value'] = np.concatenate((self.vars[varname]['value'], var))
        # Sort ascendingly and retrieve sorting index
        idx = np.argsort(self.vars[varname]['value'])
        self.vars[varname]['value'] = self.vars[varname]['value'][idx]
        idim = self.vars[varname]['idim']
        # Now concatenate and sort per variable
        for v in fieldict:
            aux = np.concatenate((self[v][:, :, idim], fieldict[v]), axis=1)[:, idx]
            self[v][:, :, idim] = aux
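    # Sketch of the intended call (names hypothetical): extend the 'time'
    # variable and the matching snapshot columns of each field passed as a
    # keyword argument:
    #
    #   d.append_variable('time', extra_t, VELOC=extra_snaps)
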
    def select_random_sensors(self, nsensors, bounds, VARLIST, seed=-1):
        '''
        Generates a set of coordinates of nsensors random sensors inside the
        region defined by bounds. Then, for each sensor, finds the nearest
        point of the dataset to get its coordinates and dataset value. It
        creates a new dataset containing all the sensor coordinates and values.
        '''
        np.random.seed(0 if seed == -1 else seed)
        mysensors = find_random_sensors(bounds, self.xyz, nsensors)
        # Initialize new dataset
        myNsensors = len(mysensors)
        time       = self.get_variable('time')
        nparts     = MPI_SIZE
        ids        = np.arange(1, nparts+1, dtype=np.int32)
        points     = mpi_gather(myNsensors, all=True) if MPI_SIZE > 1 else np.array([myNsensors])
        elements   = np.zeros((MPI_SIZE,), dtype=int)
        ptable     = PartitionTable(nparts, ids, elements, points, has_master=False)
        sp, ep     = ptable.partition_bounds(MPI_RANK)
        order      = np.linspace(start=sp, stop=ep-1, num=ep-sp, dtype=int)
        sd = self.__class__(xyz=self.xyz[mysensors], ptable=ptable, order=order, point=True, vars=self._vardict)
        for field in self.fieldnames:
            if field not in VARLIST:
                continue
            if self.fields[field]['ndim'] > 1:
                pprint(0, 'WARNING!! Multidimensional variables are skipped as sensor datasets must be saved in nopartition mode. Separate each dimension of your variable', flush=True)
                continue
            sd.add_field(field, 1, self[field][mysensors])
        return sd
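    # Sketch (bounds format as expected by find_random_sensors; field name
    # hypothetical): pick 10 sensors and keep only the 'VELOC' field:
    #
    #   sensors = d.select_random_sensors(10, bounds, ['VELOC'], seed=42)
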
    @cr('Dataset.reshape')
    def reshape(self, field, info):
        '''
        Reshape a field for a single variable according to the info
        '''
        # Obtain number of points from the mesh
        npoints = len(self)
        # Only reshape the variable if ndim > 1
        return np.ascontiguousarray(field.reshape((npoints, info['ndim']), order='C') if info['ndim'] > 1 else field)
    @cr('Dataset.X')
    def X(self, *args):
        '''
        Return the X matrix for the selected fields
        '''
        # Select all fields if none are provided
        fieldnames = self.fieldnames if len(args) == 0 else args
        # Compute the number of fields
        npoints = len(self)
        nfields = 0
        for f in fieldnames:
            nfields += self.fields[f]['ndim']
        dims = [nfields*npoints]
        # Variable order could be random, thus create a list of variable
        # names and their idim to order
        varls = np.array(list(self.varnames))
        ivars = np.array([self.vars[v]['idim'] for v in varls])
        idx   = np.argsort(ivars)
        # Order the variables
        varls = varls[idx]
        ivars = ivars[idx]
        # Loop the number of variables according to their idim.
        # As a minimum we will have 1 variable, thus idim=0. If
        # we have idim > 0, this surely indicates a multi-dimensional
        # field
        varc = 0
        for v in varls:
            ivar = self.vars[v]['idim']
            lvar = len(self.vars[v]['value'])
            if ivar == varc:
                dims += [lvar]
                varc += 1
        # Create output array
        X = np.zeros(dims, np.double)
        # Populate output matrix
        ifield = 0
        for field in fieldnames:
            v = self.fields[field]
            for idim in range(v['ndim']):
                X[ifield:nfields*npoints:nfields] = v['value'][idim:v['ndim']*npoints:v['ndim']]
                ifield += 1
        return X
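    # Sketch: assemble the snapshot matrix of one field (name hypothetical);
    # for the common case of a single 'time' variable the result has shape
    # (nfields*npoints, number of time instants):
    #
    #   X = d.X('VELOC')
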
    @cr('Dataset.save')
    def save(self, fname, **kwargs):
        '''
        Store the field in various formats.
        '''
        # Guess format from extension
        fmt = os.path.splitext(fname)[1][1:]  # skip the .
        # Pickle format
        if fmt.lower() == 'pkl':
            io.pkl_save(fname, self)
        # H5 format
        if fmt.lower() == 'h5':
            # Set default parameters
            if 'mode' not in kwargs.keys():        kwargs['mode'] = 'w' if not os.path.exists(fname) else 'a'
            if 'mpio' not in kwargs.keys():        kwargs['mpio'] = True
            if 'nopartition' not in kwargs.keys(): kwargs['nopartition'] = False
            # Append or save
            if not kwargs.pop('append', False):
                io.h5_save_dset(fname, self.xyz, self.vars, self.fields, self.ordering, self.point, self.partition_table, **kwargs)
            else:
                io.h5_append_dset(fname, self.xyz, self.vars, self.fields, self.ordering, self.point, self.partition_table, **kwargs)
    @classmethod
    @cr('Dataset.load')
    def load(cls, fname, **kwargs):
        '''
        Load a field from various formats
        '''
        # Guess format from extension
        fmt = os.path.splitext(fname)[1][1:]  # skip the .
        # Pickle format
        if fmt.lower() == 'pkl':
            return io.pkl_load(fname)
        # H5 format
        if fmt.lower() == 'h5':
            if 'mpio' not in kwargs.keys(): kwargs['mpio'] = True
            xyz, order, point, ptable, varDict, fieldDict = io.h5_load_dset(fname, **kwargs)
            return cls(xyz, ptable, varDict, order, point, **fieldDict)
        raiseError('Cannot load file <%s>!' % fname)
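    # Typical round trip (file names hypothetical):
    #
    #   d = Dataset.load('results.h5')
    #   d.save('results_copy.h5', nopartition=True)
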
    # Properties
    @property
    def xyz(self):
        return self._xyz
    @property
    def x(self):
        return self._xyz[:, 0]
    @property
    def y(self):
        return self._xyz[:, 1]

    @property
    def z(self):
        return self._xyz[:, 2]

    @property
    def ordering(self):
        return self._order

    @property
    def point(self):
        return self._point

    @property
    def partition_table(self):
        return self._ptable

    @property
    def vars(self):
        return self._vardict

    @property
    def varnames(self):
        return list(self._vardict.keys())

    @property
    def fields(self):
        return self._fieldict

    @property
    def fieldnames(self):
        return list(self._fieldict.keys())