Source code for grs.acutils

'''
Atmospheric Correction utilities to manage LUT and atmosphere parameters (aerosols, gases)
'''

import os, sys
import numpy as np
import xarray as xr
from scipy.optimize import curve_fit

from numba import njit, prange


@njit()
def _getnearpos(array, value):
    '''
    Get index of nearest neighbor in an array.

    :param array: array to search in
    :param value: value you want the nearest index
    :return:
    '''
    idx = (np.abs(array - value)).argmin()
    return idx


[docs] class Rasterization: ''' Class to numba compile python code with heavy loops. ''' def __init__(self, monoview=False): ''' Set compiled function for: - monoview = True : same viewing angles for all the spectral bands - monoview = False : viewing angles depend on spectral band (e.g. Sentinel-2 images) :param monoview: True or False ''' self.monoview = monoview if monoview: self.interp_Rlut_rayleigh = self._interp_Rlut_rayleigh_mono self.interp_Rlut = self._interp_Rlut_mono else: self.interp_Rlut_rayleigh = self._interp_Rlut_rayleigh self.interp_Rlut = self._interp_Rlut @staticmethod @njit() def _interp_Rlut_rayleigh_mono(szas, _sza, vzas, _vza, azis, _azi, Nwl, Ny, Nx, lut): arr_lut = np.full((Nwl, Ny, Nx), np.nan, dtype=np.float32) mus = np.cos(np.radians(_sza)) for _iy in range(Ny): for _ix in range(Nx): if np.isnan(_sza[_iy, _ix]): continue isza = _getnearpos(szas, _sza[_iy, _ix]) iazi = _getnearpos(azis, _azi[_iy, _ix]) ivza = _getnearpos(vzas, _vza[_iy, _ix]) for _iwl in range(Nwl): arr_lut[_iwl, _iy, _ix] = lut[_iwl, isza, ivza, iazi] / mus[_iy, _ix] return arr_lut @staticmethod @njit() def _interp_Rlut_mono(szas, _sza, vzas, _vza, azis, _azi, aot_refs, _aot_ref, Nwl, Ny, Nx, lut): arr_lut = np.full((Nwl, Ny, Nx), np.nan, dtype=np.float32) mus = np.cos(np.radians(_sza)) for _iy in range(Ny): for _ix in range(Nx): if np.isnan(_sza[_iy, _ix]): continue isza = _getnearpos(szas, _sza[_iy, _ix]) iazi = _getnearpos(azis, _azi[_iy, _ix]) ivza = _getnearpos(vzas, _vza[_iy, _ix]) iaot_ref = _getnearpos(aot_refs, _aot_ref[_iy, _ix]) for _iwl in range(Nwl): arr_lut[_iwl, _iy, _ix] = lut[iaot_ref, _iwl, isza, ivza, iazi] / mus[_iy, _ix] return arr_lut @staticmethod @njit() def _interp_Rlut_rayleigh(szas, _sza, vzas, _vza, azis, _azi, Nwl, Ny, Nx, lut): arr_lut = np.full((Nwl, Ny, Nx), np.nan, dtype=np.float32) mus = np.cos(np.radians(_sza)) for _iy in range(Ny): for _ix in range(Nx): if np.isnan(_sza[_iy, _ix]): continue isza = _getnearpos(szas, _sza[_iy, _ix]) for _iwl in range(Nwl): iazi = _getnearpos(azis, _azi[_iwl, _iy, _ix]) ivza = _getnearpos(vzas, _vza[_iwl, _iy, _ix]) arr_lut[_iwl, _iy, _ix] = lut[_iwl, isza, ivza, iazi] / mus[_iy, _ix] return arr_lut @staticmethod @njit() def _interp_Rlut(szas, _sza, vzas, _vza, azis, _azi, aot_refs, _aot_ref, Nwl, Ny, Nx, lut): arr_lut = np.full((Nwl, Ny, Nx), np.nan, dtype=np.float32) mus = np.cos(np.radians(_sza)) for _iy in range(Ny): for _ix in range(Nx): if np.isnan(_sza[_iy, _ix]): continue isza = _getnearpos(szas, _sza[_iy, _ix]) iaot_ref = _getnearpos(aot_refs, _aot_ref[_iy, _ix]) for _iwl in range(Nwl): iazi = _getnearpos(azis, _azi[_iwl, _iy, _ix]) ivza = _getnearpos(vzas, _vza[_iwl, _iy, _ix]) arr_lut[_iwl, _iy, _ix] = lut[iaot_ref, _iwl, isza, ivza, iazi] / mus[_iy, _ix] return arr_lut @staticmethod @njit() def _interp_Tlut(szas, _sza, aot_refs, _aot_ref, Nwl, Ny, Nx, lut): arr_lut = np.full((Nwl, Ny, Nx), np.nan, dtype=np.float32) for _iy in range(Ny): for _ix in range(Nx): if np.isnan(_sza[_iy, _ix]): continue isza = _getnearpos(szas, _sza[_iy, _ix]) iaot_ref = _getnearpos(aot_refs, _aot_ref[_iy, _ix]) for _iwl in range(Nwl): arr_lut[_iwl, _iy, _ix] = lut[iaot_ref, _iwl, isza] return arr_lut @staticmethod @njit() def _interp_aotlut(aot_refs, _aot_ref, Nwl, Ny, Nx, lut): arr_lut = np.full((Nwl, Ny, Nx), np.nan, dtype=np.float32) for _iy in range(Ny): for _ix in range(Nx): iaot_ref = _getnearpos(aot_refs, _aot_ref[_iy, _ix]) for _iwl in range(Nwl): arr_lut[_iwl, _iy, _ix] = lut[iaot_ref, _iwl] return arr_lut @staticmethod @njit() def _multiplicate(arrwl, raster, arresult): Nwl, Ny, Nx = arresult.shape for iwl in range(Nwl): arresult[iwl] = arrwl[iwl] * raster return arresult
[docs] class Aerosol: ''' aerosol parameters and parameterizations ''' def __init__(self): self.aot550 = 0.1 self.wavelengths = [] self.aot = [] self.wl = [] self.ang = [] self.o3du = 300 self.no2du = 300 self.fcoef = 0.5 self.popt = []
[docs] def func(self, lnwl, a, b, c): '''function for spectral variation of AOT''' return (a + b * lnwl + c * lnwl ** 2)
[docs] def fit_spectral_aot(self, wl, aot): '''call to get fitting results on AOT data''' lnwl = np.log(wl) self.popt, pcov = curve_fit(self.func, lnwl, np.log(aot))
[docs] def get_spectral_aot(self, wl): '''set aot for a given set of wavelengths''' lnwl = np.log(wl) return np.exp(self.func(lnwl, *self.popt))
[docs] def func_aero(self, Cext, fcoef): '''function to fit spectral behavior of bimodal aerosols onto aeronet optical thickness''' return fcoef * Cext[0] + (1 - fcoef) * Cext[1]
[docs] def fit_aero(self, nCext_f, nCext_c, naot): '''Call to get fine mode coefficient based on fitting on AOT data. Arguments: * ``nCext_f`` -- Normalized extinction coefficient of the fine mode aerosols * ``nCext_c`` -- Normalized extinction coefficient of the coarse mode aerosols * ``naot`` -- Normalized spectral aerosol optical thickness Return values: The mixing ratio of the fine mode aerosol Notes: . ''' self.fcoef, pcov = curve_fit(self.func_aero, [nCext_f, nCext_c], naot) return self.fcoef
[docs] class CamsParams: def __init__(self, name, resol): self.name = name self.resol = resol
[docs] class Gases(): ''' Intermediate class to set parameters for absorbing gases. ''' def __init__(self): # atmosphere auxiliary data # TODO get them from CAMS self.pressure = 1010 self.to3c = 6.5e-3 self.tno2c = 3e-6 self.tch4c = 1e-2 self.psl = 1013 self.coef_abs_scat = 0.3
[docs] class GaseousTransmittance(Gases): ''' Class containing functions to compute rasters of the direct transmittance of the absorbing gases. ''' def __init__(self, prod, cams): Gases.__init__(self) self.xmin, self.ymin, self.xmax, self.ymax = prod.raster.rio.bounds() self.prod = prod self.cams = cams self.gas_lut = prod.gas_lut self.Twv_lut = prod.Twv_lut self.SRF = self.prod.raster.SRF self.air_mass_mean = self.prod.air_mass_mean self.pressure = cams.raster.sp * 1e-2 self.coef_abs_scat = 0.3 self.Tg_tot_coarse = None self.cams_gases = {'ch4': CamsParams('tc_ch4', 4), 'no2': CamsParams('tcno2', 7), 'o3': CamsParams('gtco3', 4), 'h2o': CamsParams('tcwv', 1), }
[docs] def Tgas_background(self): ''' Compute direct transmittance for background absorbing gases: :math:`CO,\ O_2,\ O_4` :return: ''' gl = self.gas_lut pressure = self.pressure.round(1) self.ot_air = (gl.co + self.coef_abs_scat * gl.co2 + self.coef_abs_scat * gl.o2 + self.coef_abs_scat * gl.o4) / 1000 wl_ref = gl.wl SRF_hr = self.prod.raster.SRF.interp(wl_hr=wl_ref.values) vals = np.unique(pressure) vals = vals[~np.isnan(vals)] if len(vals) == 1: vals = np.concatenate([vals, 1.2 * vals]) Tg_raster = [] for val in vals: Tg = np.exp(- self.air_mass_mean * self.ot_air * val) Tg = Tg.rename({'wl': 'wl_hr'}) Tg_int = [] for label, srf in SRF_hr.groupby('wl', squeeze=False): srf = srf.dropna('wl_hr').squeeze() Tg_ = Tg.sel(wl_hr=srf.wl_hr) wl_integr = Tg_.wl_hr.values Tg_ = np.trapz(Tg_ * srf, wl_integr) / np.trapz(srf, wl_integr) Tg_int.append(Tg_) Tg_raster.append(xr.DataArray(Tg_int, name='Ttot', coords={'wl': SRF_hr.wl.values} ).assign_coords({'pressure': val})) Tg_raster = xr.concat(Tg_raster, dim='pressure') return Tg_raster.interp(pressure=pressure).drop_vars(['pressure'])
[docs] def Tgas(self, gas_name): ''' Compute hyperspectral transmittance for a given absorbing gas and convolve it with the spectral respnse functions of the sattelite sensor. :param gas_name: name of the absorbing gas, choose between: - 'h2o' - 'o3' - n2o' :return: Gaseous transmittance for satellite bands ''' renorm = 1. if gas_name == 'h2o': renorm = 0.4 cams_gas = self.cams_gases[gas_name].name resol = self.cams_gases[gas_name].resol lut_abs = self.gas_lut[gas_name] rounded = renorm * self.cams.raster[cams_gas].round(resol) wl_ref = self.gas_lut.wl SRF_hr = self.prod.raster.SRF.interp(wl_hr=wl_ref.values) vals = np.unique(rounded) vals = vals[~np.isnan(vals)] if len(vals) == 1: vals = np.concatenate([vals, 1.2 * vals]) Tg_raster = [] for val in vals: Tg = np.exp(- self.air_mass_mean * lut_abs * val) Tg = Tg.rename({'wl': 'wl_hr'}) Tg_int = [] for label, srf in SRF_hr.groupby('wl', squeeze=False): srf = srf.dropna('wl_hr').squeeze() Tg_ = Tg.sel(wl_hr=srf.wl_hr) wl_integr = Tg_.wl_hr.values Tg_ = np.trapz(Tg_ * srf, wl_integr) / np.trapz(srf, wl_integr) Tg_int.append(Tg_) Tg_raster.append(xr.DataArray(Tg_int, name='Ttot', coords={'wl': SRF_hr.wl.values} ).assign_coords({'tc': val})) Tg_raster = xr.concat(Tg_raster, dim='tc') return Tg_raster.interp(tc=rounded)
[docs] def get_gaseous_optical_thickness(self): ''' Get gaseous optival thickness from total column integrated concentration. :return: ''' gas_lut = self.gas_lut ot_o3 = gas_lut.o3 * self.to3c ot_ch4 = gas_lut.ch4 * self.tch4c ot_no2 = gas_lut.no2 * self.tno2c ot_air = (gas_lut.co + self.coef_abs_scat * gas_lut.co2 + self.coef_abs_scat * gas_lut.o2 + self.coef_abs_scat * gas_lut.o4) * self.pressure / 1000 self.abs_gas_opt_thick = ot_ch4 + ot_no2 + ot_o3 + ot_air
[docs] def get_gaseous_transmittance(self, gases=['ch4','no2','o3','h2o']): ''' Get the final total gaseous transmittance. :return: ''' Tg_tot = self.Tgas_background() for gas in gases: Tg_tot =Tg_tot *self.Tgas(gas) # Tg_other = Tg_other.rename({'longitude': 'x', 'latitude': 'y'}) # Nx = len(Tg_other.x) # Ny = len(Tg_other.y) # x = np.linspace(self.xmin, self.xmax, Nx) # y = np.linspace(self.ymax, self.ymin, Ny) # Tg_other['x'] = x # Tg_other['y'] = y self.Tg_tot_coarse = Tg_tot # TODO remove interp for the whole object and proceed with loop on spectral bands to save memory return Tg_tot # .interp(x=self.prod.raster.x, y=self.prod.raster.y)
def correct_gaseous_transmittance(self): return
[docs] def get_gaseous_transmittance_old(self): ''' Obsolete function :return: ''' self.get_gaseous_optical_thickness() wl_ref = self.gas_lut.wl # .values SRF_hr = self.SRF.interp(wl_hr=wl_ref.values) Tg = np.exp(- self.air_mass_mean * self.abs_gas_opt_thick) Tg = Tg.rename({'wl': 'wl_hr'}) Tg_int = [] for label, srf in SRF_hr.groupby('wl'): srf = srf.dropna('wl_hr').squeeze() Tg_ = Tg.sel(wl_hr=srf.wl_hr) wl_integr = Tg_.wl_hr.values Tg_ = np.trapz(Tg_ * srf, wl_integr) / np.trapz(srf, wl_integr) Tg_int.append(Tg_) self.Tg_other = xr.DataArray(Tg_int, name='Ttot', coords={'wl': SRF_hr.wl.values})
[docs] def other_gas_correction(self, raster_name='masked_raster', variable='Rtoa_masked'): ''' Correct for transmittance of high altitude gases (i.e., no coupling with low atmosphere scattering) :param raster_name: :param variable: :return: ''' raster = self.__dict__[raster_name] attrs = raster[variable].attrs if attrs.__contains__('other_gas_correction'): if attrs['other_gas_correction']: print('raster ' + raster_name + '.' + variable + ' is already corrected for other gases transmittance') print('set attribute other_gas_correction to False to proceed anyway') return if self.Tg_other is None: self.get_gaseous_transmittance(self.air_mass_mean) raster[variable] = raster[variable] / self.Tg_other raster[variable].attrs['other_gas_correction'] = True
[docs] def water_vapor_correction(self, raster_name='coarse_masked_raster', variable='Rtoa_masked'): ''' Correct for water vapor transmittance. :param raster_name: :param variable: :return: ''' raster = self.__dict__[raster_name] attrs = raster[variable].attrs if attrs.__contains__('water_vapor_correction'): if attrs['other_gas_correction']: print('raster ' + raster_name + '.' + variable + ' is already corrected for water vapor transmittance') print('set attribute other_gas_correction to False to proceed anyway') return if self.Twv_raster is None: print('xarray of water vapor transmittance is not set, please run get_wv_transmittance_raster(tcwv_raster)') return raster[variable] = raster[variable] / self.Twv_raster raster[variable].attrs['water_vapor_correction'] = True
[docs] def get_wv_transmittance_raster(self, tcwv_raster): ''' Get transmittance raster for correction of the image. :param tcwv_raster: :return: ''' tcwv_vals = tcwv_raster.tcwv.round(1) tcwvs = np.unique(tcwv_vals) tcwvs = tcwvs[~np.isnan(tcwvs)] # TODO improve for air_mass raster Twvs = self.Twv_lut.Twv.interp(air_mass=self.air_mass_mean).interp(tcwv=tcwvs, method='linear').drop('air_mass') self.Twv_raster = Twvs.interp(tcwv=tcwv_vals, method='nearest')
[docs] class Misc: ''' Miscelaneous utilities '''
[docs] @staticmethod def get_pressure(alt, psl): '''Compute the pressure for a given altitude alt : altitude in meters (float or np.array) psl : pressure at sea level in hPa palt : pressure at the given altitude in hPa''' palt = psl * (1. - 0.0065 * np.nan_to_num(alt) / 288.15) ** 5.255 return palt
@staticmethod def transmittance_dir(aot, air_mass, rot=0): return np.exp(-(rot + aot) * air_mass) @staticmethod def air_mass(sza, vza): return 1 / np.cos(np.radians(vza)) + 1 / np.cos(np.radians(sza))