#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2011-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to VIIRS SDR format.
This reader implements the support of VIIRS SDR files as produced by CSPP and CLASS.
It is comprised of two parts:
- A subclass of the YAMLFileReader class to allow handling all the files
- A filehandler class to implement the actual reading
Format documentation:
- http://npp.gsfc.nasa.gov/science/sciencedocuments/082012/474-00001-03_CDFCBVolIII_RevC.pdf
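
A minimal usage sketch (assuming ``viirs_sdr`` is the reader name configured
for this module and ``my_files`` is a list of local SDR/GEO file paths)::

    from satpy import Scene

    scn = Scene(filenames=my_files, reader='viirs_sdr')
    scn.load(['I01'])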
"""
import logging
import os.path
from contextlib import suppress
from datetime import datetime, timedelta
from glob import glob
import dask.array as da
import numpy as np
import xarray as xr
from satpy.readers.hdf5_utils import HDF5FileHandler
from satpy.readers.yaml_reader import FileYAMLReader
NO_DATE = datetime(1958, 1, 1)
EPSILON_TIME = timedelta(days=2)
LOG = logging.getLogger(__name__)
def _get_invalid_info(granule_data):
"""Get a detailed report of the missing data.
N/A: not applicable
MISS: required value missing at time of processing
OBPT: onboard pixel trim (overlapping/bow-tie pixel removed during SDR processing)
OGPT: on-ground pixel trim (overlapping/bow-tie pixel removed during EDR processing)
ERR: error occurred during processing / non-convergence
ELINT: ellipsoid intersect failed / instrument line-of-sight does not intersect the Earth’s surface
VDNE: value does not exist / processing algorithm did not execute
SOUB: scaled out-of-bounds / solution not within allowed range
"""
msg = None
if issubclass(granule_data.dtype.type, np.integer):
msg = ("na:" + str((granule_data == 65535).sum()) +
" miss:" + str((granule_data == 65534).sum()) +
" obpt:" + str((granule_data == 65533).sum()) +
" ogpt:" + str((granule_data == 65532).sum()) +
" err:" + str((granule_data == 65531).sum()) +
" elint:" + str((granule_data == 65530).sum()) +
" vdne:" + str((granule_data == 65529).sum()) +
" soub:" + str((granule_data == 65528).sum()))
elif issubclass(granule_data.dtype.type, np.floating):
msg = ("na:" + str((granule_data == -999.9).sum()) +
" miss:" + str((granule_data == -999.8).sum()) +
" obpt:" + str((granule_data == -999.7).sum()) +
" ogpt:" + str((granule_data == -999.6).sum()) +
" err:" + str((granule_data == -999.5).sum()) +
" elint:" + str((granule_data == -999.4).sum()) +
" vdne:" + str((granule_data == -999.3).sum()) +
" soub:" + str((granule_data == -999.2).sum()))
return msg
DATASET_KEYS = {'GDNBO': 'VIIRS-DNB-GEO',
'SVDNB': 'VIIRS-DNB-SDR',
'GITCO': 'VIIRS-IMG-GEO-TC',
'GIMGO': 'VIIRS-IMG-GEO',
'SVI01': 'VIIRS-I1-SDR',
'SVI02': 'VIIRS-I2-SDR',
'SVI03': 'VIIRS-I3-SDR',
'SVI04': 'VIIRS-I4-SDR',
'SVI05': 'VIIRS-I5-SDR',
'GMTCO': 'VIIRS-MOD-GEO-TC',
'GMODO': 'VIIRS-MOD-GEO',
'SVM01': 'VIIRS-M1-SDR',
'SVM02': 'VIIRS-M2-SDR',
'SVM03': 'VIIRS-M3-SDR',
'SVM04': 'VIIRS-M4-SDR',
'SVM05': 'VIIRS-M5-SDR',
'SVM06': 'VIIRS-M6-SDR',
'SVM07': 'VIIRS-M7-SDR',
'SVM08': 'VIIRS-M8-SDR',
'SVM09': 'VIIRS-M9-SDR',
'SVM10': 'VIIRS-M10-SDR',
'SVM11': 'VIIRS-M11-SDR',
'SVM12': 'VIIRS-M12-SDR',
'SVM13': 'VIIRS-M13-SDR',
'SVM14': 'VIIRS-M14-SDR',
'SVM15': 'VIIRS-M15-SDR',
'SVM16': 'VIIRS-M16-SDR',
'IVCDB': 'VIIRS-DualGain-Cal-IP'
}
class VIIRSSDRFileHandler(HDF5FileHandler):
"""VIIRS HDF5 File Reader."""
def __init__(self, filename, filename_info, filetype_info, use_tc=None, **kwargs):
"""Initialize file handler."""
self.datasets = filename_info['datasets'].split('-')
self.use_tc = use_tc
super(VIIRSSDRFileHandler, self).__init__(filename, filename_info, filetype_info, **kwargs)
def __getitem__(self, item):
"""Get item."""
if '*' in item:
# this is an aggregated field that can't easily be loaded, need to
# join things together
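            # e.g. 'Data_Products/X/X_Gran_*' is expanded with _Gran_0,
            # _Gran_1, ... until a granule index raises KeyError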
idx = 0
base_item = item
item = base_item.replace('*', str(idx))
result = []
while True:
try:
res = super(VIIRSSDRFileHandler, self).__getitem__(item)
result.append(res)
except KeyError:
# no more granule keys
LOG.debug("Aggregated granule stopping on '%s'", item)
break
idx += 1
item = base_item.replace('*', str(idx))
return result
else:
return super(VIIRSSDRFileHandler, self).__getitem__(item)
def _parse_datetime(self, datestr, timestr):
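        """Parse a date/time attribute pair (e.g. AggregateBeginningDate/Time) into a datetime."""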
try:
datetime_str = datestr + timestr
except TypeError:
datetime_str = str(datestr.astype(str)) + str(timestr.astype(str))
time_val = datetime.strptime(datetime_str, '%Y%m%d%H%M%S.%fZ')
if abs(time_val - NO_DATE) < EPSILON_TIME:
# catch rare case when SDR files have incorrect date
raise ValueError("Datetime invalid {}".format(time_val))
return time_val
@property
def start_time(self):
"""Get start time."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default_start_date = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateBeginningDate'
default_start_time = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateBeginningTime'
date_var_path = self.filetype_info.get('start_date', default_start_date).format(dataset_group=dataset_group)
time_var_path = self.filetype_info.get('start_time', default_start_time).format(dataset_group=dataset_group)
return self._parse_datetime(self[date_var_path], self[time_var_path])
@property
def end_time(self):
"""Get end time."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default_end_date = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateEndingDate'
default_end_time = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateEndingTime'
date_var_path = self.filetype_info.get('end_date', default_end_date).format(dataset_group=dataset_group)
time_var_path = self.filetype_info.get('end_time', default_end_time).format(dataset_group=dataset_group)
return self._parse_datetime(self[date_var_path], self[time_var_path])
@property
def start_orbit_number(self):
"""Get start orbit number."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateBeginningOrbitNumber'
start_orbit_path = self.filetype_info.get('start_orbit', default).format(dataset_group=dataset_group)
return int(self[start_orbit_path])
@property
def end_orbit_number(self):
"""Get end orbit number."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateEndingOrbitNumber'
end_orbit_path = self.filetype_info.get('end_orbit', default).format(dataset_group=dataset_group)
return int(self[end_orbit_path])
@property
def platform_name(self):
"""Get platform name."""
default = '/attr/Platform_Short_Name'
platform_path = self.filetype_info.get(
'platform_name', default).format(**self.filetype_info)
platform_dict = {'NPP': 'Suomi-NPP',
'JPSS-1': 'NOAA-20',
'J01': 'NOAA-20',
'JPSS-2': 'NOAA-21',
'J02': 'NOAA-21'}
return platform_dict.get(self[platform_path], self[platform_path])
@property
def sensor_name(self):
"""Get sensor name."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default = 'Data_Products/{dataset_group}/attr/Instrument_Short_Name'
sensor_path = self.filetype_info.get(
'sensor_name', default).format(dataset_group=dataset_group)
return self[sensor_path].lower()
    def get_file_units(self, dataset_id, ds_info):
"""Get file units from metadata."""
file_units = ds_info.get("file_units")
if file_units is None:
LOG.debug("Unknown units for file key '%s'", dataset_id)
return file_units
    def scale_swath_data(self, data, scaling_factors, dataset_group):
"""Scale swath data using scaling factors and offsets.
Multi-granule (a.k.a. aggregated) files will have more than the usual two values.
"""
rows_per_gran = self._get_rows_per_granule(dataset_group)
factors = self._mask_and_reshape_factors(scaling_factors)
data = self._map_and_apply_factors(data, factors, rows_per_gran)
return data
@staticmethod
def _mask_and_reshape_factors(factors):
factors = factors.where(factors > -999, np.float32(np.nan))
return factors.data.reshape((-1, 2)).rechunk((1, 2)) # make it so map_blocks happens per factor
@staticmethod
def _map_and_apply_factors(data, factors, rows_per_gran):
# The user may have requested a different chunking scheme, but we need
# per granule chunking right now so factor chunks map 1:1 to data chunks
old_chunks = data.chunks
dask_data = data.data.rechunk((tuple(rows_per_gran), data.data.chunks[1]))
dask_data = da.map_blocks(_apply_factors, dask_data, factors,
chunks=dask_data.chunks, dtype=data.dtype,
meta=np.array([[]], dtype=data.dtype))
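        # restore the chunking scheme the caller's data arrived with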
data = xr.DataArray(dask_data.rechunk(old_chunks),
dims=data.dims, coords=data.coords,
attrs=data.attrs)
return data
@staticmethod
def _scale_factors_for_units(factors, file_units, output_units):
if file_units == "W cm-2 sr-1" and output_units == "W m-2 sr-1":
LOG.debug("Adjusting scaling factors to convert '%s' to '%s'",
file_units, output_units)
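            # 1 m**2 == 10**4 cm**2, so per-cm**2 radiances scale by 10000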
factors = factors * 10000.
elif file_units == "1" and output_units == "%":
LOG.debug("Adjusting scaling factors to convert '%s' to '%s'",
file_units, output_units)
factors = factors * 100.
else:
raise ValueError("Don't know how to convert '{}' to '{}'".format(
file_units, output_units))
return factors
@staticmethod
def _get_valid_scaling_factors(factors):
if factors is None:
factors = np.array([1, 0], dtype=np.float32)
factors = xr.DataArray(da.from_array(factors, chunks=1))
else:
factors = factors.where(factors != -999., np.float32(np.nan))
return factors
def _adjust_scaling_factors(self, factors, file_units, output_units):
"""Adjust scaling factors ."""
if file_units == output_units:
LOG.debug("File units and output units are the same (%s)", file_units)
return factors
factors = self._get_valid_scaling_factors(factors)
return self._scale_factors_for_units(factors, file_units, output_units)
def _get_scaling_factors(self, file_units, output_units, factor_var_path):
"""Get file scaling factors and scale according to expected units."""
factors = self.get(factor_var_path)
factors = self._adjust_scaling_factors(factors, file_units, output_units)
return factors
def _generate_file_key(self, ds_id, ds_info, factors=False):
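        """Build the HDF5 variable path for *ds_id*.

        For DNB geolocation, terrain-corrected (``_TC``) variables are
        preferred when requested via ``use_tc`` or when present in the file.
        """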
var_path = ds_info.get('file_key', 'All_Data/{dataset_group}_All/{calibration}')
calibration = {
'radiance': 'Radiance',
'reflectance': 'Reflectance',
'brightness_temperature': 'BrightnessTemperature',
}.get(ds_id.get('calibration'))
var_path = var_path.format(calibration=calibration, dataset_group=DATASET_KEYS[ds_info['dataset_group']])
if ds_id['name'] in ['dnb_longitude', 'dnb_latitude']:
if self.use_tc is True:
return var_path + '_TC'
if self.use_tc is None and var_path + '_TC' in self.file_content:
return var_path + '_TC'
return var_path
    @staticmethod
def expand_single_values(var, scans):
"""Expand single valued variable to full scan lengths."""
if scans.size == 1:
return var
else:
expanded = np.repeat(var, scans)
expanded.attrs = var.attrs
            # rename() returns a new object rather than renaming in place
            expanded = expanded.rename({expanded.dims[0]: 'y'})
return expanded
def _scan_size(self, dataset_group_name):
"""Get how many rows of data constitute one scanline."""
if 'I' in dataset_group_name:
scan_size = 32
else:
scan_size = 16
return scan_size
    def concatenate_dataset(self, dataset_group, var_path):
        """Concatenate the granules of *var_path* along the ``y`` dimension."""
scan_size = self._scan_size(dataset_group)
scans = self._get_scans_per_granule(dataset_group)
start_scan = 0
data_chunks = []
scans = xr.DataArray(scans)
variable = self[var_path]
        # check if these are single per-granule values
if variable.size != scans.size:
for gscans in scans.values:
data_chunks.append(self[var_path].isel(y=slice(start_scan, start_scan + gscans * scan_size)))
start_scan += gscans * scan_size
return xr.concat(data_chunks, 'y')
else:
return self.expand_single_values(variable, scans)
def _get_rows_per_granule(self, dataset_group):
scan_size = self._scan_size(dataset_group)
scans_per_gran = self._get_scans_per_granule(dataset_group)
return [scan_size * gran_scans for gran_scans in scans_per_gran]
def _get_scans_per_granule(self, dataset_group):
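        """Read the number of scans in each granule of the aggregate."""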
number_of_granules_path = 'Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateNumberGranules'
nb_granules_path = number_of_granules_path.format(dataset_group=DATASET_KEYS[dataset_group])
scans = []
for granule in range(self[nb_granules_path]):
scans_path = 'Data_Products/{dataset_group}/{dataset_group}_Gran_{granule}/attr/N_Number_Of_Scans'
scans_path = scans_path.format(dataset_group=DATASET_KEYS[dataset_group], granule=granule)
scans.append(self[scans_path])
return scans
    def mask_fill_values(self, data, ds_info):
"""Mask fill values."""
is_floating = np.issubdtype(data.dtype, np.floating)
if is_floating:
# If the data is a float then we mask everything <= -999.0
fill_max = np.float32(ds_info.pop("fill_max_float", -999.0))
return data.where(data > fill_max, np.float32(np.nan))
else:
# If the data is an integer then we mask everything >= fill_min_int
fill_min = int(ds_info.pop("fill_min_int", 65528))
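            # where() promotes integer data to float so NaN can serve as fill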
return data.where(data < fill_min, np.float32(np.nan))
    def get_dataset(self, dataset_id, ds_info):
        """Get the dataset corresponding to *dataset_id*.

        The size of the returned DataArray depends on the number of
        scans actually sensed, and not necessarily the regular 768 scanlines
that the file contains for each granule. To that end, the number of
scans for each granule is read from:
``Data_Products/...Gran_x/N_Number_Of_Scans``.
"""
dataset_group = [ds_group for ds_group in ds_info['dataset_groups'] if ds_group in self.datasets]
if not dataset_group:
return
dataset_group = dataset_group[0]
ds_info['dataset_group'] = dataset_group
var_path = self._generate_file_key(dataset_id, ds_info)
factor_var_path = ds_info.get("factors_key", var_path + "Factors")
data = self.concatenate_dataset(dataset_group, var_path)
data = self.mask_fill_values(data, ds_info)
file_units = self.get_file_units(dataset_id, ds_info)
output_units = ds_info.get("units", file_units)
factors = self._get_scaling_factors(file_units, output_units, factor_var_path)
if factors is not None:
data = self.scale_swath_data(data, factors, dataset_group)
else:
LOG.debug("No scaling factors found for %s", dataset_id)
        metadata = getattr(data, 'attrs', {})
        metadata.update(ds_info)
        metadata.update({
            "units": output_units,
            "platform_name": self.platform_name,
            "sensor": self.sensor_name,
            "start_orbit": self.start_orbit_number,
            "end_orbit": self.end_orbit_number,
            "rows_per_scan": self._scan_size(dataset_group),
        })
        metadata.update(dataset_id.to_dict())
        data.attrs.update(metadata)
return data
    def get_bounding_box(self):
"""Get the bounding box of this file."""
from pyproj import Geod
geod = Geod(ellps='WGS84')
dataset_group = DATASET_KEYS[self.datasets[0]]
idx = 0
lons_ring = None
lats_ring = None
while True:
path = 'Data_Products/{dataset_group}/{dataset_group}_Gran_{idx}/attr/'
prefix = path.format(dataset_group=dataset_group, idx=idx)
try:
lats = self.file_content[prefix + 'G-Ring_Latitude']
lons = self.file_content[prefix + 'G-Ring_Longitude']
if lons_ring is None:
lons_ring = lons
lats_ring = lats
else:
prev_lon = lons_ring[0]
prev_lat = lats_ring[0]
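                    # each granule carries an 8-point G-Ring; splice the new
                    # ring in where it comes closest to the start of the
                    # accumulated ring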
dists = [geod.inv(lon, lat, prev_lon, prev_lat)[2] for lon, lat in zip(lons, lats)]
first_idx = np.argmin(dists)
if first_idx == 2 and len(lons) == 8:
lons_ring = np.hstack((lons[:3], lons_ring[:-2], lons[4:]))
lats_ring = np.hstack((lats[:3], lats_ring[:-2], lats[4:]))
else:
raise NotImplementedError("Don't know how to handle G-Rings of length %d" % len(lons))
except KeyError:
break
idx += 1
return lons_ring, lats_ring
    def available_datasets(self, configured_datasets=None):
        """Generate dataset info and their availability.
See
:meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
for details.
"""
for is_avail, ds_info in (configured_datasets or []):
if is_avail is not None:
yield is_avail, ds_info
continue
dataset_group = [ds_group for ds_group in ds_info['dataset_groups'] if ds_group in self.datasets]
if dataset_group:
yield True, ds_info
elif is_avail is None:
yield is_avail, ds_info
def split_desired_other(fhs, prime_geo, second_geo):
    """Split the filehandlers *fhs* into those providing the primary geolocation
    (*prime_geo*) and those providing the secondary one (*second_geo*).
    """
desired = []
other = []
for fh in fhs:
if prime_geo in fh.datasets:
desired.append(fh)
elif second_geo in fh.datasets:
other.append(fh)
return desired, other
def _apply_factors(data, factor_set):
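    # factor_set is a (1, 2) chunk holding one granule's (scale, offset) pair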
return data * factor_set[0, 0] + factor_set[0, 1]