Source code for grs.product

'''
Module to build GRS object from input image.
'''

import os

import numpy as np
import xarray as xr
import datetime

import logging
from importlib.resources import files
import yaml
from . import AuxData, __version__, __package__

opj = os.path.join

configfile = files(__package__) / 'config.yml'
with open(configfile, 'r') as file:
    config = yaml.safe_load(file)


[docs] class Product(): ''' Assign the GRS product object and attributes from L1C image raster ''' def __init__(self, l1c_obj=None, auxdatabase='cams-global-atmospheric-composition-forecasts', output='Rrs'): ''' Set the metadata of GRS object. :param l1c_obj: input image :param auxdatabase: Deprecated :param output: Deprecated ''' self.processor = __package__ + '_' + __version__ self.raster = l1c_obj # .prod # TODO check why drivers sends an object for wl coordinates instead of array of int self.raster['wl'] = self.raster['wl'].astype(int) self.sensor = self.raster.attrs['satellite'] self.date_str = self.raster.attrs['acquisition_date'] if 'S2' in self.sensor: self.date = datetime.datetime.strptime(self.date_str, '%Y-%m-%dT%H:%M:%S.%fZ') else: self.date = datetime.datetime.strptime(self.date_str, '%Y-%m-%d %H:%M:%S') if not 'time' in self.raster.coords: self.raster = self.raster.assign_coords({'time': self.date}) # add metadata for future export to L2product self.raster.attrs['processing_time'] = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') self.raster.attrs['processor'] = self.processor self.raster.attrs['version'] = __version__ self.x = self.raster.x self.y = self.raster.y self.width = self.x.__len__() self.height = self.y.__len__() self.lonmin, self.latmin, self.lonmax, self.latmax = self.raster.rio.transform_bounds(4326) # set longitude between 0 and 360 deg self.lonmin, self.lonmax, = self.lonmin % 360, self.lonmax % 360 self.xmin, self.ymin, self.xmax, self.ymax = self.raster.rio.bounds() self.wl = self.raster.wl self.central_wavelength = self.raster.wl_true.values # correct for bug with VZA == Inf self.raster['vza'] = self.raster.vza.where(self.raster.vza < 88) self.vza_mean = self.raster.vza.mean() self.sza_mean = self.raster.sza.mean() self.air_mass_mean = 1. / np.cos(np.radians(self.sza_mean)) + 1. / np.cos(np.radians(self.vza_mean)) # surfwater object: surfwater = xr.ones_like(self.raster.bands.isel(wl=0, drop=True).squeeze().astype(np.uint8)) surfwater.name = 'surfwater' self.raster['surfwater'] = surfwater # make sure that raster are encoding in float32 for key in self.raster.keys(): if self.raster[key].dtype == "float64": self.raster[key] = self.raster[key].astype(np.float32) # TODO remove or harmonize with landsat # self.U = self.raster.attrs['REFLECTANCE_CONVERSION_U'] # convert into mW cm-2 um-1 #self.solar_irradiance = xr.DataArray(self.raster.solar_irradiance / 10, # coords={'wl': self.wl}, # attrs={ # 'description': 'extraterrestrial solar irradiance from satellite metadata', # 'units': 'mW cm-2 um-1'}) self.auxdatabase = auxdatabase self.output = output ######################### # settings: ######################### self.wl_process = self.raster.wl_to_process self.chunk = 512 self._type = np.float32 # self.sunglint_bands = [12] self.pressure_ref = 101500. self.iwl_swir = [-2, -1] self.bvis = 490 self.bnir = 842 self.bswir = 1610 self.bswir2 = 2190 # set extra bands (e.g., cirrus, water vapor) self.bcirrus = 1375 self.cirrus = None self.bwv = 945 self.wv = None # mask thresholding parameters self.sunglint_threshold = 0.2 self.ndwi_threshold = 0.0 self.vis_swir_index_threshold = 0. self.hcld_threshold = 3e-3 # pre-computed auxiliary data self.dirdata = config['path']['grsdata'] self.abs_gas_file = files('grs.data.lut.gases').joinpath('lut_abs_opt_thickness_normalized.nc') # self.lut_file = opj(self.dirdata, 'lut', 'opac_osoaa_lut_v2.nc') self.water_vapor_transmittance_file = files('grs.data.lut.gases').joinpath('water_vapor_transmittance.nc') self.load_auxiliary_data() # set retrieved parameter unit (Rrs or Lwn); is passed to fortran module self.rrs = False if self.output == 'Rrs': self.rrs = True ######################### # variables: ######################### self.headerfile = '' self.l2_product = None self.aux = None self.name = '' self.description = '' self.nodata = -999.9 self.sza = [] self.sazi = [] self.vza = [] self.vazi = [] self.wkt = [] # geographical extent in POLYGON format self.outfile = '' self.aeronetfile = 'no' self.pressure_ref = 1015.20 self.pressure = 1015.2 self.ssp = 1015.2 self.aot550 = 0.1 self.aot = [] self.angstrom = 1 self.fcoef = 0.5 self.rot = [] self.oot = [] # parameters for MAJA masks (CLM-cloud and MG2-geophysical) # masks order must follow bit order # CLM masks self.clm_masks = ['CLM_cloud_shadow', 'CLM_opaque_cloud', 'CLM_cloud_from_blue', 'CLM_cloud_multitemp', 'CLM_thin_cloud', 'CLM_shadow', 'CLM_shadow_from_outer_cloud', 'CLM_cirrus'] # MG2 masks self.mg2_masks = ['MG2_Water_Mask', 'MG2_Cloud_Mask_All_Cloud', 'MG2_Snow_Mask', 'MG2_Shadow_Mask', 'MG2_Topographical_Shadows_Mask', 'MG2_Hidden_Areas_Mask', ] self.maja_masks = np.concatenate([self.clm_masks, self.mg2_masks])
[docs] def load_auxiliary_data(self): ''' Load look-up tables data for gas absorption and backgroud transmittance :return: ''' # get LUT self.gas_lut = xr.open_dataset(self.abs_gas_file) # self.aero_lut = xr.open_dataset(self.lut_file) # convert wavelength in nanometer # self.aero_lut['wl'] = self.aero_lut['wl'] * 1000 # self.aero_lut['wl'].attrs['description'] = 'wavelength of simulation (nanometer)' self.Twv_lut = xr.open_dataset(self.water_vapor_transmittance_file)
[docs] def set_outfile(self, file): ''' Set name (path) of outputimage :param file: :return: ''' self.outfile = file
[docs] def set_aeronetfile(self, file): ''' Deprecated :param file: :return: ''' self.aeronetfile = file
[docs] def get_flag(self, product, flag_name): ''' Deprecated: obsolete solution now handled by GRSdriver module Get binary flag raster `flag_name` from `product` :param product: ProductIO object :param flag_name: name of the flag to be loaded :return: ''' # TODO implement bit masks w, h = self.width, self.height flag_raster = np.zeros((w, h), dtype=np.int32, order='F').T return flag_raster
# TODO implement DEM fetching def get_elevation(self, source='Copernicus30m'): self.elevation = xr.DataArray(np.zeros((self.height, self.width)), name="dem", coords=dict( y=self.y, x=self.x), attrs=dict( description="Digital elevation model from " + source, units="m") )
[docs] def mu_N(self, sza, vza, azi, monoview=False): ''' Compute the normal angle to wave slopes that produce sunglint. Warning: azi: azimuth in rad for convention azi=180 when sun-sensenor in oppositioon :param sza: solar zenith angle in degree :param vza: viewing zenith angle in degree :param azi: relative azimuth bewteen sun and sensor :param monoview: Set sensor viewing configuration: - monoview = True : same viewing angles for all the spectral bands - monoview = False : viewing angles depend on spectral band (e.g. Sentinel-2 images) :return: cosine of normal angle to sunglint wave facets ''' vzar = np.radians(vza) azir = np.radians(azi) szar = np.radians(sza) cos_alpha = np.cos(azir) * np.sin(vzar) * np.sin(szar) + np.cos(vzar) * np.cos(szar) xmu_N = (np.cos(szar) + np.cos(vzar)) / np.sqrt(2 * (1 + cos_alpha)) if monoview: xmu_N = xmu_N.transpose('y', 'x') else: # ensure similar shape as inputs xmu_N = xmu_N.transpose('wl', 'y', 'x') return xmu_N
[docs] def p_slope(self, sza, vza, azi, sigma2=0.02, monoview=False): ''' Compute propability of wave slopes producing sunglint. :param sza: solar zenith angle in degree :param vza: viewing zenith angle in degree :param azi: relative azimuth bewteen sun and sensor :param sigma2: mean square slope of the wave slope distribution :param monoview: Set sensor viewing configuration: - monoview = True : same viewing angles for all the spectral bands - monoview = False : viewing angles depend on spectral band (e.g. Sentinel-2 images) :return: ''' cosN = self.mu_N(sza, vza, azi,monoview=monoview) thetaN = np.arccos(cosN) # stats == 'cm_iso': # TODO check consitency between sigma2 and formulation # Pdist_ = 1. / (np.pi *2.* sigma2) * np.exp(-1./2 * np.tan(thetaN) ** 2 / sigma2) xp_slope = 1. / (np.pi * sigma2) * np.exp(- np.tan(thetaN) ** 2 / sigma2) / cosN ** 4 if monoview: xp_slope = xp_slope.transpose('y', 'x') else: xp_slope = xp_slope.transpose('wl', 'y', 'x') return xp_slope