import os
import importlib_resources
import yaml
import numpy as np
import xarray as xr
# keep attributes through operatyion on xarray objects
xr.set_options(keep_attrs=True)
import rioxarray as rio
import logging
import gc
from multiprocessing import Pool # Process pool
from multiprocessing import sharedctypes
import itertools
import GRSdriver
from . import Product, acutils, AuxData, CamsProduct, L2aProduct, Masking, Rasterization
opj = os.path.join
configfile = importlib_resources.files(__package__) / 'config.yml'
with open(configfile, 'r') as file:
config = yaml.safe_load(file)
GRSDATA = config['path']['grsdata']
TOALUT = config['path']['toa_lut']
TRANSLUT = config['path']['trans_lut']
CAMS_PATH = config['path']['trans_lut']
NCPU = config['processor']['ncpu']
[docs]
class Process:
'''
Main GRS class.
'''
def __init__(self):
self.lut_file = opj(GRSDATA, TOALUT)
self.trans_lut_file = opj(GRSDATA, TRANSLUT)
self.cams_dir = CAMS_PATH
self.Nproc = NCPU
self.pressure_ref = 101500.
self.flags_tokeep = [3]
self.flags_tomask = [0,1,10,13,14,18]
self.successful = False
[docs]
def execute(self, l1c_prod,
ofile='',
cams_file=None,
surfwater_file=None,
dem_file=None,
resolution=20,
scale_aot=1,
opac_model=None,
allpixels=False,
snap_compliant=False
):
'''
Main program calling all GRS steps
:param l1c_prod: xarray L1C object or L1C input file (path) to be processed
:param ofile: Absolute path of the output file
:param cams_file: Absolute path for root directory of CAMS data
:param surfwater_file: Absolute path the surfwater file (.tif)
:param dem_file: Absolute path of the DEM geotiff file
:param resolution: pixel resolution in meter
:param scale_aot: scaling factor applied to CAMS aod550 raster
:param opac_model: If set force OPAC aerosol model for LUT interpolation (taken from CAMS data otherwise)
:param allpixels: if True process all pixels (no water pixel masking)
:param snap_compliant: Output format compliant with SNAP software for practical analysis
:return:
Examples
--------
>>> import grs
>>> file ='$YOUR_PATH_TO_IMG/S2A_MSIL1C_20181019T102031_N0500_R065_T30PYT_20230815T043850.SAFE'
>>> tile = file.split('_')[-2][1:]
>>> dem_file = '$YOUR_PATH_TO_DEM/COP-DEM_GLO-30-DGED_'+tile+'.tif'
>>> cams_file = '$YOUR_PATH_TO_CAMS/cams_forecast_2018-10.nc'
>>> process_ = grs.Process()
In this example, you force the aerosol model to be 'DESE_rh70' for desert dust
>>> process_.execute(file_nc,
... cams_file=cams_file,
... surfwater_file=None,
... dem_file=dem_file,
... scale_aot=1,
... opac_model='DESE_rh70')
INFO:root:pass netcdf image as grs product object
INFO:root:get CAMS auxilliary data
INFO:root:flagging from l1c data
INFO:root:cloud masking with s2cloudless
INFO:root:land masking
INFO:root:cirrus masking
INFO:root:high swir masking
INFO:root:loading look-up tables
INFO:root:compute gaseous transmittance from cams data
INFO:root:correct for gaseous absorption
INFO:root:compute spectral index (e.g., NDWI)
INFO:root:apply water masking
INFO:root:lut interpolation
INFO:root:selected aerosol model: DESE_rh70
INFO:root:scaling aot by: 1
INFO:root:set final parameters
INFO:root:compute surface pressure from dem
INFO:root:run grs process
INFO:root:success
INFO:root:construct final product
INFO:root:construct l2a
>>> process_.l2a.l2_prod
<xarray.Dataset>
Dimensions: (wl: 11, y: 1818, x: 2523)
Coordinates:
* wl (wl) int64 443 490 560 665 705 740 783 842 865 1610 2190
time datetime64[ns] 2018-10-19T10:20:31.024000
* x (x) float64 7.299e+05 7.299e+05 7.3e+05 ... 7.803e+05 7.804e+05
* y (y) float64 1.3e+06 1.3e+06 1.3e+06 ... 1.264e+06 1.264e+06
band int64 1
spatial_ref int64 0
Data variables:
Rrs (wl, y, x) float32 nan nan nan nan nan ... nan nan nan nan nan
BRDFg (y, x) float32 nan nan nan nan nan nan ... nan nan nan nan nan
aot550 (y, x) float32 0.187 0.187 0.187 0.187 ... 0.1929 0.1929 0.1929
vza (y, x) float32 6.489 6.489 6.483 6.483 ... 2.13 2.125 2.125
sza (y, x) float32 nan nan nan nan nan nan ... nan nan nan nan nan
raa (y, x) float32 332.5 332.5 332.5 332.5 ... 290.0 289.9 289.9
flags (y, x) int64 184 56 184 184 184 184 ... 160 160 160 160 160 160
dem (y, x) float32 282.9 282.7 282.5 282.3 ... 237.3 237.3 237.4
surfwater (y, x) int8 1 1 1 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1 1 1 1 1
Attributes: (12/71)
long_name: CA BLUE GREEN RED VRE_1 VRE_2 VRE_3 ...
constellation: Sentinel-2
constellation_id: S2
product_path: /data/satellite/Sentinel-2/L1C/30PYT...
product_name: S2A_MSIL1C_20181019T102031_N0500_R06...
product_filename: S2A_MSIL1C_20181019T102031_N0500_R06...
... ...
ndwi_threshold: 0.0
vis_swir_index_threshold: 0.0
hcld_threshold: 0.003
dirdata: /data/grs/grsdata
abs_gas_file: /home/harmel/Dropbox/Dropbox/work/gi...
water_vapor_transmittance_file: /home/harmel/Dropbox/Dropbox/work/gi...
You can either further play with the l2a xarray or save it into netcdf:
>>> process_.ofile='./name_of_your_output_l2a_netcdf'
>>> process_.write_output()
INFO:root:export final product into netcdf
INFO:root:export into encoded netcdf
'''
self.ofile = ofile
self.snap_compliant = snap_compliant
self.successful = False
##################################
# Get image data
##################################
if isinstance(l1c_prod, str):
# get extension
extension = l1c_prod.split('.')[-1]
basename = os.path.basename(l1c_prod)
if extension == 'nc':
logging.info('pass netcdf image as grs product object')
prod = Product(xr.open_dataset(l1c_prod))
elif 'SAFE' in extension:
logging.info('Open L1C Sentinel 2 image and compute angle parameters')
global l1c
l1c = GRSdriver.Sentinel2Driver(l1c_prod, resolution=resolution)
l1c.load_product()
logging.info('pass raw image as grs product object')
prod = Product(l1c.prod)
# clear memory (TODO make it work!!)
del l1c
gc.collect()
elif ('LC09_L1' in basename) or ('LC08_L1' in basename):
logging.info('Open L1TP Landsat image')
l1c = GRSdriver.LandsatDriver(l1c_prod, resolution=resolution)
l1c.load_mask()
l1c.load_product()
logging.info('pass raw image as grs product object')
prod = Product(l1c.prod)
# clear memory (TODO make it work!!)
del l1c
gc.collect()
else:
logging.info('input file format not recognized, stop')
return
elif isinstance(l1c_prod, xr.Dataset):
try:
prod = Product(l1c_prod)
except:
logging.info('input file format not recognized, stop')
return
self.prod = prod
wl_true = prod.raster.wl_true
##################################
# Set sensor specifications
##################################
# TODO check evolution concerning viewing angles computation for Lansdat, now in monoview mode
if 'S2' in prod.sensor:
monoview = False
else:
monoview = True
_R_ = Rasterization(monoview=monoview)
##################################
# GET ANCILLARY DATA (Pressure, O3, water vapor, NO2...
##################################
logging.info('get CAMS auxilliary data')
if cams_file:
cams = CamsProduct(prod.raster, cams_file=cams_file)
else:
tile = prod.raster.attrs['tile']
cams_dir = os.path.join(self.cams_dir, tile)
cams = CamsProduct(prod.raster, dir=cams_dir, suffix='_' + tile)
cams.load()
# Cox-Munk isotropic mean square slope (sigma2)
wind = np.sqrt(cams.raster['v10'] ** 2 + cams.raster['u10'] ** 2)
sigma2 = (wind + 0.586) / 195.3
# get mean values to set LUT
_sigma2 = sigma2.mean().values
_wind = wind.mean().values
##################################
# Pixel classification
# Generate the flags raster
##################################
logging.info('flagging from l1c data')
if surfwater_file:
logging.info('loading surfwater data file')
prod.raster['surfwater'] = rio.open_rasterio(surfwater_file
).astype(np.uint8
).squeeze().interp(x=prod.x,
y=prod.y,
method='nearest')
prod.raster.surfwater.name = 'surfwater'
prod.raster.surfwater.attrs = {
'description': 'surfwater file not provided as input, all pixels flagged as water (e.g., surfwater=1)'}
masking_ = Masking(prod.raster)
prod.raster = masking_.process(output="prod")
# -- clean up
prod.raster = prod.raster.drop_vars(["surfwater"])
#####################################
# SUBSET RASTER TO KEEP REQUESTED BANDS
#####################################
# TODO check if we can remove cirrus and water vapor band from output object
if prod.bcirrus:
prod.cirrus = prod.raster.bands.sel(wl=prod.bcirrus, method='nearest')
if prod.bwv:
prod.wv = prod.raster.bands.sel(wl=prod.bwv, method='nearest')
prod.raster = prod.raster.sel(wl=prod.wl_process, method='nearest')
##################################
## ADD ELEVATION AND PRESSURE BAND
##################################
# TODO activate DEM loading to improve pressure computation
# prod.get_elevation()
#####################################
# LOAD LUT FOR ATMOSPHERIC CORRECTION
#####################################
logging.info('loading look-up tables')
Ttot_Ed = xr.open_dataset(self.trans_lut_file)
Ttot_Ed['wl'] = Ttot_Ed['wl'] * 1000
aero_lut = xr.open_dataset(self.lut_file)
aero_lut['wl'] = aero_lut['wl'] * 1000
aero_lut['aot'] = aero_lut.aot.isel(wind=0).squeeze()
# remove URBAN aerosol model for this example
models = aero_lut.drop_sel(model='URBA_rh70').model.values
_auxdata = AuxData(wl=wl_true) # wl=masked.wl)
sunglint_eps = _auxdata.sunglint_eps # ['mean'].interp(wl=wl_true)
rot = _auxdata.rot
####################################
# absorbing gases correction
####################################
logging.info('compute gaseous transmittance from cams data')
gas_trans = acutils.GaseousTransmittance(prod, cams)
Tg_raster = gas_trans.get_gaseous_transmittance()
logging.info('correct for gaseous absorption')
for wl in prod.raster.wl.values:
prod.raster['bands'].loc[wl] = prod.raster.bands.sel(wl=wl) / Tg_raster.sel(wl=wl).interp(x=prod.raster.x,
y=prod.raster.y)
prod.raster.bands.attrs['gas_absorption_correction'] = True
######################################
# Water mask
######################################
# TODO remove ndwi export / replace this part with flags masking instead
logging.info('compute spectral index (e.g., NDWI)')
vis = prod.raster.bands.sel(wl=prod.bvis, method='nearest')
nir = prod.raster.bands.sel(wl=prod.bnir, method='nearest')
swir = prod.raster.bands.sel(wl=prod.bswir, method='nearest')
swir2 = prod.raster.bands.sel(wl=prod.bswir2, method='nearest')
ndwi = (vis - nir) / (vis + nir)
ndwi_swir = (vis - swir) / (vis + swir)
prod.raster['ndwi'] = ndwi
prod.raster.ndwi.attrs = {
'description': 'Normalized difference spectral index between bands at ' + str(prod.bvis) + ' and ' + str(
prod.bnir) + ' nm', 'units': '-'}
prod.raster['ndwi_swir'] = ndwi_swir
prod.raster.ndwi_swir.attrs = {
'description': 'Normalized difference spectral index between bands at ' + str(prod.bvis) + ' and ' + str(
prod.bswir) + ' nm', 'units': '-'}
if allpixels:
pass # masked_raster = prod.raster.bands
else:
logging.info('apply water masking')
mask = (ndwi_swir > prod.vis_swir_index_threshold) & (swir2 < prod.sunglint_threshold) # (ndwi > -0.0) &
masked = prod.raster.bands.where(mask)
prod.raster['bands'] = masked
prod.raster['sza'] = prod.raster['sza'].where(mask)
# stop process if no valid (water) pixel available
if np.isnan(prod.raster['sza'].values).all():
logging.info('no water pixels, stop process')
return
######################################
# LUT preparation
######################################
logging.info('lut interpolation')
# select appropriate opac aerosol model from CAMS aod
# remove URBAN for the moment
models = aero_lut.drop_sel(model='URBA_rh70').model.values
# get mean aot and aot550 from CAMS
cams_aot_mean = cams.cams_aod.mean(['x', 'y'])
cams_aot_ref = cams.cams_aod.interp(wl=550, method='quadratic')
cams_aot_ref_mean = cams_aot_ref.mean(['x', 'y'])
# get the model that has the closest aot spectral shape
if opac_model is None:
lut_aod = aero_lut.aot.sel(model=models, aot_ref=1).interp(wl=cams.cams_aod.wl)
idx = np.abs((cams_aot_mean / cams_aot_ref_mean) - lut_aod).sum('wl').argmin()
opac_model = aero_lut.sel(model=models).model.values[idx]
logging.info('selected aerosol model: ' + opac_model)
# slice LUT
aero_lut_ = aero_lut.sel(wind=_wind, method='nearest').sel(model=opac_model)
# get AOT550 raster (TODO replace with optimal estimation)
logging.info('scaling aot by: ' + str(scale_aot))
aot_ref_raster = cams_aot_ref * scale_aot
aot_ref_raster = aot_ref_raster.astype(np.float32)
# get unique values for angles and further lut interpolation
ang_resol = {'sza': 0.1, 'vza': 0.1, 'raa_round': 0}
szamin, szamax = float(prod.raster['sza'].min()), float(prod.raster['sza'].max())
vzamin, vzamax = float(prod.raster.isel(wl=0)['vza'].min()), float(prod.raster.isel(wl=0)['vza'].max())
# check for out-of-range
def check_out_of_range(vmin, vmax, ceiling=88):
vmin = np.max([0, vmin])
vmax = np.min([ceiling, vmax])
return vmin, vmax
szamin, szamax = check_out_of_range(szamin, szamax)
vzamin, vzamax = check_out_of_range(vzamin, vzamax, ceiling=25)
sza_ = np.arange(szamin, szamax + ang_resol['sza'], ang_resol['sza'])
vza_ = np.arange(vzamin, vzamax + ang_resol['vza'], ang_resol['vza'])
azi_ = (180 - np.unique(prod.raster.isel(wl=0)['raa'].round(ang_resol['raa_round']))) % 360
azi_ = azi_[~np.isnan(azi_)]
sza_lut_step = 2
vza_lut_step = 2
sza_slice = slice(np.min(sza_) - sza_lut_step, np.max(sza_) + sza_lut_step)
vza_slice = slice(np.min(vza_) - vza_lut_step, np.max(vza_) + vza_lut_step)
tweak = 3
aot_ref_ = np.unique((aot_ref_raster / tweak).round(3)) * tweak
aot_ref_min = aot_ref_raster.min()
aot_ref_max = aot_ref_raster.max()
aot_lut = aero_lut_.aot.interp(wl=wl_true, method='quadratic')
aot_lut = aot_lut.interp(aot_ref=np.linspace(aot_ref_min, aot_ref_max, 1000)) # .plot(hue='wl')
Rdiff_lut = aero_lut_.I.sel(sza=sza_slice, vza=vza_slice).interp(wl=wl_true, method='quadratic').interp(
azi=azi_)
Rdiff_lut = Rdiff_lut.interp(sza=sza_, vza=vza_)
Rray = Rdiff_lut.sel(aot_ref=0)
Rdiff_lut = Rdiff_lut.interp(aot_ref=aot_ref_, method='quadratic')
szas = Rdiff_lut.sza.values
vzas = Rdiff_lut.vza.values
azis = Rdiff_lut.azi.values
aot_refs = Rdiff_lut.aot_ref.values
Ttot_Ed_ = Ttot_Ed.sel(model=opac_model).sel(wind=_wind, method='nearest').interp(sza=szas).interp(
aot_ref=aot_ref_, method='quadratic').interp(wl=wl_true, method='cubic').Ttot_Ed
Ttot_Lu_ = Ttot_Ed.sel(model=opac_model).sel(wind=_wind, method='nearest').interp(sza=vzas).interp(
aot_ref=aot_ref_, method='quadratic').interp(wl=wl_true, method='cubic').Ttot_Ed ** 1.05
######################################
# Set final parameters for grs processing
######################################
logging.info('set final parameters')
width = prod.width
height = prod.height
Nwl = len(prod.raster.wl_to_process)
pressure_ref = self.pressure_ref
_sunglint_eps = sunglint_eps.values
# prepare aerosol parameters
aot_ref_raster = aot_ref_raster.interp(x=prod.raster.x, y=prod.raster.y).drop('wl').astype(np.float32)
aot_ref_raster.name = 'aot550'
_rot = rot.values
# _aot = aot_lut.interp(aot_ref=_aot_ref)
if dem_file:
logging.info('compute surface pressure from dem')
dem = xr.open_dataset(dem_file).squeeze().interp(y=prod.raster.y, x=prod.raster.x, method='nearest')
dem = dem.rename_vars({'band_data': 'dem'})
dem.dem.attrs['long_name'] = 'digital elevation model'
dem.dem.attrs['units'] = 'm'
dem.dem.attrs['source'] = dem_file
presure_msl = cams.raster.msl.interp(y=prod.raster.y, x=prod.raster.x)
_pressure = (presure_msl * (1. - 0.0065 * dem.dem / 288.15) ** 5.255).values
else:
dem = None
_pressure = cams.raster.sp.interp(x=prod.raster.x, y=prod.raster.y).values
######################################
# Run grs processing
######################################
logging.info('run grs process')
global chunk_process
Rrs_result = np.ctypeslib.as_ctypes(np.full((Nwl, height, width), np.nan, dtype=prod._type))
Rf_result = np.ctypeslib.as_ctypes(np.full((height, width), np.nan, dtype=prod._type))
shared_Rrs = sharedctypes.RawArray(Rrs_result._type_, Rrs_result)
shared_Rf = sharedctypes.RawArray(Rf_result._type_, Rf_result)
def chunk_process(args):
iy, ix = args
yc = min(height, iy + prod.chunk)
xc = min(width, ix + prod.chunk)
Rrs_tmp = np.ctypeslib.as_array(shared_Rrs)
Rf_tmp = np.ctypeslib.as_array(shared_Rf)
_band_rad = prod.raster.bands[:, iy:yc, ix:xc]
Nwl, Ny, Nx = _band_rad.shape
if Ny == 0 or Nx == 0:
return
arr_tmp = np.full((Nwl, Ny, Nx), np.nan, dtype=prod._type)
# subsetting
_sza = prod.raster.sza[iy:yc, ix:xc] # .values
if monoview:
_raa = prod.raster.raa[iy:yc, ix:xc]
_vza = prod.raster.vza[iy:yc, ix:xc]
_vza_mean = _vza.values
else:
_raa = prod.raster.raa[:, iy:yc, ix:xc]
_vza = prod.raster.vza[:, iy:yc, ix:xc]
_vza_mean = np.mean(_vza, axis=0).values
_azi = (180. - _raa) % 360
_air_mass_ = acutils.Misc.air_mass(_sza, _vza).values
_p_slope_ = prod.p_slope(_sza, _vza, _raa, sigma2=_sigma2, monoview=monoview).values
_aot_ref = aot_ref_raster.values[iy:yc, ix:xc]
_pressure_ = _pressure[iy:yc, ix:xc] / pressure_ref
# construct wl,y,x raster for Rayleigh optical thickness
_rot_raster = _R_._multiplicate(_rot, _pressure_, arr_tmp)
# get LUT values
_Rdiff = _R_.interp_Rlut(szas, _sza.values,
vzas, _vza.values,
azis, _azi.values,
aot_refs, _aot_ref,
Nwl, Ny, Nx, Rdiff_lut.values)
_Rray = _R_.interp_Rlut_rayleigh(szas, _sza.values,
vzas, _vza.values,
azis, _azi.values,
Nwl, Ny, Nx, Rray.values)
_Rdiff = _Rdiff + (_pressure_ - 1) * _Rray
_aot = _R_._interp_aotlut(aot_lut.aot_ref.values, _aot_ref, Nwl, Ny, Nx, aot_lut.values)
# correction for diffuse light
Rcorr = _band_rad.values - _Rdiff
# direct transmittance up/down
Tdir = acutils.Misc.transmittance_dir(_aot, _air_mass_, _rot_raster)
# vTotal transmittance (for Ed and Lu)
Tdown = _R_._interp_Tlut(szas, _sza.values, Ttot_Ed_.aot_ref.values, _aot_ref, Nwl, Ny, Nx,
Ttot_Ed_.values)
Tup = _R_._interp_Tlut(vzas, _vza_mean, Ttot_Ed_.aot_ref.values, _aot_ref, Nwl, Ny, Nx, Ttot_Lu_.values)
Ttot_du = Tdown * Tup
Rf = np.full((len(prod.iwl_swir), Ny, Nx), np.nan, dtype=prod._type)
nRf = np.full((len(prod.iwl_swir), Ny, Nx), np.nan, dtype=np.float32)
for iwl in prod.iwl_swir:
Rf[iwl] = Rcorr[iwl] / Tdir[iwl]
if monoview:
nRf[iwl] = Rf[iwl] / (_sunglint_eps[iwl] * _p_slope_)
else:
nRf[iwl] = Rf[iwl] / (_sunglint_eps[iwl] * _p_slope_[iwl])
Rf[Rf < 0] = 0.
nRf[nRf < 0] = 0.
nRf = np.min(nRf, axis=0)
Rf_tmp[iy:yc, ix:xc] = np.min(Rf, axis=0)
Rf = _R_._multiplicate(_sunglint_eps, nRf, arr_tmp)
Rf = Tdir * _p_slope_ * Rf
Rrs_tmp[:, iy:yc, ix:xc] = ((Rcorr - Rf) / np.pi) / Ttot_du
return
window_idxs = [(i, j) for i, j in
itertools.product(range(0, height, prod.chunk),
range(0, width, prod.chunk))]
global pool
pool = Pool(self.Nproc)
res = pool.map(chunk_process, window_idxs)
pool.terminate()
pool = None
logging.info('success')
######################################
# construct l2a object
######################################
logging.info('construct final product')
self.aot_ref_raster = aot_ref_raster
l2_prod = xr.Dataset(dict(Rrs=(['wl', "y", "x"], np.ctypeslib.as_array(shared_Rrs)),
BRDFg=(["y", "x"], np.ctypeslib.as_array(shared_Rf)),
aot550=(["y", "x"], aot_ref_raster.values)),
coords=dict(wl=prod.raster.wl,
x=prod.raster.x,
y=prod.raster.y),
)
l2_prod['central_wavelength'] = ('wl', prod.raster.wl_true.values)
l2_prod = l2_prod.set_coords('central_wavelength')
##############################################
# Update flags and create mask from recipe
##############################################
# flags for negative blue/green Rrs
bitmask = 18
prod.raster['flags'] = prod.raster.flags + (((l2_prod.Rrs.sel(wl=490, method='nearest') < -0.0005) |
(l2_prod.Rrs.sel(wl=565, method='nearest') < -0.0005)) << bitmask)
# add name and description
prod.raster.flags.attrs['flag_descriptions'][bitmask] = 'negative Rrs for blue or green bands'
prod.raster.flags.attrs['flag_names'][bitmask] = 'neg_rrs'
# mask from recipe
mask = masking_.create_mask(prod.raster.flags,
tomask=self.flags_tomask,
tokeep=self.flags_tokeep,
mask_name="mask")
l2_prod = xr.merge([l2_prod, mask])
######################################
# Write final product
######################################
logging.info('construct final product')
#self.l2_prod = l2_prod
self.l2a = L2aProduct(prod, l2_prod, cams, gas_trans, dem)
del prod, l2_prod, cams, gas_trans, dem
self.successful = True
return
def write_output(self):
logging.info('export final product into netcdf')
self.l2a.export_to_netcdf(self.ofile,
snap_compliant=self.snap_compliant)