Source code for satpy.readers.fci_l1c_fdhsi

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Interface to MTG-FCI-FDHSI L1C NetCDF files.

This module defines the :class:`FCIFDHSIFileHandler` file handler, to
be used for reading Meteosat Third Generation (MTG) Flexible Combined
Imager (FCI) Full Disk High Spectral Imagery (FDHSI) data.  FCI will fly
on the MTG Imager (MTG-I) series of satellites, scheduled to be launched
in 2021 by the earliest.  For more information about FCI, see `EUMETSAT`_.

.. _EUMETSAT: https://www.eumetsat.int/website/home/Satellites/FutureSatellites/MeteosatThirdGeneration/MTGDesign/index.html#fci  # noqa: E501
"""

from __future__ import (division, absolute_import, print_function,
                        unicode_literals)

import logging
import numpy as np
import dask.array as da
import xarray as xr

from pyresample import geometry
from netCDF4 import default_fillvals

from .netcdf_utils import NetCDF4FileHandler

logger = logging.getLogger(__name__)


[docs]class FCIFDHSIFileHandler(NetCDF4FileHandler): """Class implementing the MTG FCI FDHSI File . This class implements the Meteosat Third Generation (MTG) Flexible Combined Imager (FCI) Full Disk High Spectral Imagery (FDHSI) reader. It is designed to be used through the :class:`~satpy.Scene` class using the :mod:`~satpy.Scene.load` method with the reader ``"fci_l1c_fdhsi"``. """ def __init__(self, filename, filename_info, filetype_info): """Initialize file handler.""" super(FCIFDHSIFileHandler, self).__init__(filename, filename_info, filetype_info, cache_var_size=10000, cache_handle=True) logger.debug('Reading: {}'.format(self.filename)) logger.debug('Start: {}'.format(self.start_time)) logger.debug('End: {}'.format(self.end_time)) self.cache = {} @property def start_time(self): """Get start time.""" return self.filename_info['start_time'] @property def end_time(self): """Get end time.""" return self.filename_info['end_time']
[docs] def get_dataset(self, key, info=None): """Load a dataset.""" logger.debug('Reading {} from {}'.format(key.name, self.filename)) # Get the dataset # Get metadata for given dataset measured, root = self.get_channel_dataset(key.name) radlab = measured + "/effective_radiance" data = self[radlab] attrs = data.attrs.copy() info = info.copy() fv = attrs.pop( "FillValue", default_fillvals.get(data.dtype.str[1:], np.nan)) vr = attrs.pop("valid_range", [-np.inf, np.inf]) if key.calibration == "counts": attrs["_FillValue"] = fv nfv = fv else: nfv = np.nan data = data.where(data >= vr[0], nfv) data = data.where(data <= vr[1], nfv) if key.calibration == "counts": # from package description, this just means not applying add_offset # and scale_factor attrs.pop("scale_factor") attrs.pop("add_offset") data.attrs["units"] = "1" res = data else: data = (data * attrs.pop("scale_factor", 1) + attrs.pop("add_offset", 0)) if key.calibration in ("brightness_temperature", "reflectance"): res = self.calibrate(data, key, measured, root) else: res = data data.attrs["units"] = attrs["units"] # pre-calibration units no longer apply info.pop("units") attrs.pop("units") self.nlines, self.ncols = res.shape res.attrs.update(key.to_dict()) res.attrs.update(info) res.attrs.update(attrs) return res
[docs] def get_channel_dataset(self, channel): """Get channel dataset.""" root_group = 'data/{}'.format(channel) group = 'data/{}/measured'.format(channel) return group, root_group
[docs] def calc_area_extent(self, key): """Calculate area extent for a dataset.""" # Calculate the area extent of the swath based on start line and column # information, total number of segments and channel resolution # numbers from Package Description, Table 8 xyres = {500: 22272, 1000: 11136, 2000: 5568} chkres = xyres[key.resolution] # Get metadata for given dataset measured, root = self.get_channel_dataset(key.name) # Get start/end line and column of loaded swath. self.startline = int(self[measured + "/start_position_row"]) self.endline = int(self[measured + "/end_position_row"]) self.startcol = int(self[measured + "/start_position_column"]) self.endcol = int(self[measured + "/end_position_column"]) self.nlines, self.ncols = self[measured + "/effective_radiance/shape"] logger.debug('Channel {} resolution: {}'.format(key.name, chkres)) logger.debug('Row/Cols: {} / {}'.format(self.nlines, self.ncols)) logger.debug('Start/End row: {} / {}'.format(self.startline, self.endline)) logger.debug('Start/End col: {} / {}'.format(self.startcol, self.endcol)) # total_segments = 70 # Calculate full globe line extent max_y = 5432229.9317116784 min_y = -5429229.5285458621 full_y = max_y + abs(min_y) # Single swath line extent res_y = full_y / chkres # Extent per pixel resolution startl = min_y + res_y * self.startline - 0.5 * (res_y) endl = min_y + res_y * self.endline + 0.5 * (res_y) logger.debug('Start / end extent: {} / {}'.format(startl, endl)) chk_extent = (-5432229.9317116784, endl, 5429229.5285458621, startl) return(chk_extent)
_fallback_area_def = { "reference_altitude": 35786400, # metre }
[docs] def get_area_def(self, key, info=None): """Calculate on-fly area definition for 0 degree geos-projection for a dataset.""" # TODO Projection information are hard coded for 0 degree geos projection # Test dataset doen't provide the values in the file container. # Only fill values are inserted a = float(self["state/processor/earth_equatorial_radius"]) b = float(self["state/processor/earth_polar_radius"]) h = float(self["state/processor/reference_altitude"]) lon_0 = float(self["state/processor/projection_origin_longitude"]) if h == default_fillvals[ self["state/processor/reference_altitude"].dtype.str[1:]]: logger.warning( "Reference altitude in {:s} set to " "fill value, using {:d}".format( self.filename, self._fallback_area_def["reference_altitude"])) h = self._fallback_area_def["reference_altitude"] # Channel dependent swath resoultion area_extent = self.calc_area_extent(key) logger.debug('Calculated area extent: {}' .format(''.join(str(area_extent)))) proj_dict = {'a': float(a), 'b': float(b), 'lon_0': float(lon_0), 'h': float(h), 'proj': 'geos', 'units': 'm'} area = geometry.AreaDefinition( 'some_area_name', "On-the-fly area", 'geosfci', proj_dict, self.ncols, self.nlines, area_extent) self.area = area return area
[docs] def calibrate(self, data, key, measured, root): """Calibrate data.""" if key.calibration == 'brightness_temperature': data = self._ir_calibrate(data, measured, root) elif key.calibration == 'reflectance': data = self._vis_calibrate(data, measured) else: raise RuntimeError( "Received unknown calibration key. Expected " "'brightness_temperature' or 'reflectance', got " + key.calibration) return data
def _ir_calibrate(self, radiance, measured, root): """IR channel calibration.""" coef = self[measured + "/radiance_unit_conversion_coefficient"] wl_c = self[root + "/central_wavelength_actual"] a = self[measured + "/radiance_to_bt_conversion_coefficient_a"] b = self[measured + "/radiance_to_bt_conversion_coefficient_b"] c1 = self[measured + "/radiance_to_bt_conversion_constant_c1"] c2 = self[measured + "/radiance_to_bt_conversion_constant_c2"] for v in (coef, wl_c, a, b, c1, c2): if v == v.attrs.get("FillValue", default_fillvals.get(v.dtype.str[1:])): logger.error( "{:s} set to fill value, cannot produce " "brightness temperatures for {:s}.".format( v.attrs.get("long_name", "at least one necessary coefficient"), root)) return xr.DataArray( da.full(shape=radiance.shape, chunks=radiance.chunks, fill_value=np.nan), dims=radiance.dims, coords=radiance.coords, attrs=radiance.attrs) Lv = radiance * coef vc = 1e6/wl_c # from wl in um to wn in m^-1 nom = c2 * vc denom = a * np.log(1 + (c1 * vc**3) / Lv) res = nom / denom - b / a res.attrs["units"] = "K" return res def _vis_calibrate(self, radiance, measured): """VIS channel calibration.""" # radiance to reflectance taken as in mipp/xrit/MSG.py # again FCI User Guide is not clear on how to do this cesilab = measured + "/channel_effective_solar_irradiance" cesi = self[cesilab] if cesi == cesi.attrs.get( "FillValue", default_fillvals.get(cesi.dtype.str[1:])): logger.error( "channel effective solar irradiance set to fill value, " "cannot produce reflectance for {:s}.".format(measured)) return xr.DataArray( da.full(shape=radiance.shape, chunks=radiance.chunks, fill_value=np.nan), dims=radiance.dims, coords=radiance.coords, attrs=radiance.attrs) sirr = float(cesi) res = radiance / sirr * 100 res.attrs["units"] = "%" return res