Source code for satpy.readers.aapp_l1b

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2012-2021 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Reader for aapp level 1b data.

Options for loading:

 - pre_launch_coeffs (False): use pre-launch coefficients if True, operational
   otherwise (if available).

https://nwp-saf.eumetsat.int/site/download/documentation/aapp/NWPSAF-MF-UD-003_Formats_v8.0.pdf
"""
import functools
import logging
from datetime import datetime, timedelta

import dask.array as da
import numpy as np
import xarray as xr
from dask import delayed

from satpy import CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler

LINE_CHUNK = CHUNK_SIZE ** 2 // 2048

logger = logging.getLogger(__name__)

AVHRR_CHANNEL_NAMES = ["1", "2", "3a", "3b", "4", "5"]

AVHRR_ANGLE_NAMES = ['sensor_zenith_angle',
                     'solar_zenith_angle',
                     'sun_sensor_azimuth_difference_angle']

AVHRR_PLATFORM_IDS2NAMES = {4: 'NOAA-15',
                            2: 'NOAA-16',
                            6: 'NOAA-17',
                            7: 'NOAA-18',
                            8: 'NOAA-19',
                            11: 'Metop-B',
                            12: 'Metop-A',
                            13: 'Metop-C',
                            14: 'Metop simulator'}


[docs]def create_xarray(arr): """Create an `xarray.DataArray`.""" res = xr.DataArray(arr, dims=['y', 'x']) return res
[docs]class AAPPL1BaseFileHandler(BaseFileHandler): """A base file handler for the AAPP level-1 formats.""" def __init__(self, filename, filename_info, filetype_info): """Initialize AAPP level-1 file handler object.""" super().__init__(filename, filename_info, filetype_info) self.channels = None self.units = None self.sensor = "unknown" self._data = None self._header = None self.area = None self._channel_names = [] self._angle_names = [] def _set_filedata_layout(self): """Set the file data type/layout.""" self._header_offset = 0 self._scan_type = np.dtype([("siteid", "<i2")]) self._header_type = np.dtype([("siteid", "<i2")]) @property def start_time(self): """Get the time of the first observation.""" return datetime(self._data['scnlinyr'][0], 1, 1) + timedelta( days=int(self._data['scnlindy'][0]) - 1, milliseconds=int(self._data['scnlintime'][0])) @property def end_time(self): """Get the time of the final observation.""" return datetime(self._data['scnlinyr'][-1], 1, 1) + timedelta( days=int(self._data['scnlindy'][-1]) - 1, milliseconds=int(self._data['scnlintime'][-1])) def _update_dataset_attributes(self, dataset, key, info): dataset.attrs.update({'platform_name': self.platform_name, 'sensor': self.sensor}) dataset.attrs.update(key.to_dict()) for meta_key in ('standard_name', 'units'): if meta_key in info: dataset.attrs.setdefault(meta_key, info[meta_key]) def _get_platform_name(self, platform_names_lookup): """Get the platform name from the file header.""" self.platform_name = platform_names_lookup.get(self._header['satid'][0], None) if self.platform_name is None: raise ValueError("Unsupported platform ID: %d" % self.header['satid'])
[docs] def read(self): """Read the data.""" tic = datetime.now() header = np.memmap(self.filename, dtype=self._header_type, mode="r", shape=(1, )) data = np.memmap(self.filename, dtype=self._scan_type, offset=self._header_offset, mode="r") logger.debug("Reading time %s", str(datetime.now() - tic)) self._header = header self._data = data
def _calibrate_active_channel_data(self, key): """Calibrate active channel data only.""" raise NotImplementedError("This should be implemented in the sub class")
[docs] def get_dataset(self, key, info): """Get a dataset from the file.""" if key['name'] in self._channel_names: dataset = self._calibrate_active_channel_data(key) if dataset is None: return None elif key['name'] in ['longitude', 'latitude']: dataset = self.navigate(key['name']) dataset.attrs = info elif key['name'] in self._angle_names: dataset = self.get_angles(key['name']) else: raise ValueError("Not a supported dataset: %s", key['name']) self._update_dataset_attributes(dataset, key, info) return dataset
[docs]class AVHRRAAPPL1BFile(AAPPL1BaseFileHandler): """Reader for AVHRR L1B files created from the AAPP software.""" def __init__(self, filename, filename_info, filetype_info): """Initialize object information by reading the input file.""" super(AVHRRAAPPL1BFile, self).__init__(filename, filename_info, filetype_info) self.channels = {i: None for i in AVHRR_CHANNEL_NAMES} self.units = {i: 'counts' for i in AVHRR_CHANNEL_NAMES} self._is3b = None self._is3a = None self._channel_names = AVHRR_CHANNEL_NAMES self._angle_names = AVHRR_ANGLE_NAMES self._set_filedata_layout() self.read() self.active_channels = self._get_active_channels() self._get_platform_name(AVHRR_PLATFORM_IDS2NAMES) self.sensor = 'avhrr-3' def _set_filedata_layout(self): """Set the file data type/layout.""" self._header_offset = 22016 self._scan_type = _SCANTYPE self._header_type = _HEADERTYPE def _get_active_channels(self): status = self._get_channel_binary_status_from_header() return self._convert_binary_channel_status_to_activation_dict(status) def _calibrate_active_channel_data(self, key): """Calibrate active channel data only.""" if self.active_channels[key['name']]: return self.calibrate(key) return None def _get_channel_binary_status_from_header(self): status = self._header['inststat1'].item() change_line = self._header['statchrecnb'] if change_line > 0: status |= self._header['inststat2'].item() return status @staticmethod def _convert_binary_channel_status_to_activation_dict(status): bits_channels = ((13, '1'), (12, '2'), (11, '3a'), (10, '3b'), (9, '4'), (8, '5')) activated = dict() for bit, channel_name in bits_channels: activated[channel_name] = bool(status >> bit & 1) return activated
[docs] def available_datasets(self, configured_datasets=None): """Get the available datasets.""" for _, mda in configured_datasets: if mda['name'] in self._channel_names: yield self.active_channels[mda['name']], mda else: yield True, mda
[docs] def get_angles(self, angle_id): """Get sun-satellite viewing angles.""" sunz, satz, azidiff = self._get_all_interpolated_angles() name_to_variable = dict(zip(self._angle_names, (satz, sunz, azidiff))) return create_xarray(name_to_variable[angle_id])
@functools.lru_cache(maxsize=10) def _get_all_interpolated_angles(self): sunz40km, satz40km, azidiff40km = self._get_tiepoint_angles_in_degrees() return self._interpolate_arrays(sunz40km, satz40km, azidiff40km) def _get_tiepoint_angles_in_degrees(self): sunz40km = self._data["ang"][:, :, 0] * 1e-2 satz40km = self._data["ang"][:, :, 1] * 1e-2 azidiff40km = self._data["ang"][:, :, 2] * 1e-2 return sunz40km, satz40km, azidiff40km def _interpolate_arrays(self, *input_arrays, geolocation=False): lines = input_arrays[0].shape[0] try: interpolator = self._create_40km_interpolator(lines, *input_arrays, geolocation=geolocation) except ImportError: logger.warning("Could not interpolate, python-geotiepoints missing.") output_arrays = input_arrays else: output_delayed = delayed(interpolator.interpolate, nout=3)() output_arrays = [da.from_delayed(out_array, (lines, 2048), in_array.dtype) for in_array, out_array in zip(input_arrays, output_delayed)] return output_arrays @staticmethod def _create_40km_interpolator(lines, *arrays_40km, geolocation=False): if geolocation: # Slower but accurate at datum line from geotiepoints.geointerpolator import GeoInterpolator as Interpolator else: from geotiepoints.interpolator import Interpolator cols40km = np.arange(24, 2048, 40) cols1km = np.arange(2048) rows40km = np.arange(lines) rows1km = np.arange(lines) along_track_order = 1 cross_track_order = 3 satint = Interpolator( arrays_40km, (rows40km, cols40km), (rows1km, cols1km), along_track_order, cross_track_order) return satint
[docs] def navigate(self, coordinate_id): """Get the longitudes and latitudes of the scene.""" lons, lats = self._get_all_interpolated_coordinates() if coordinate_id == 'longitude': return create_xarray(lons) if coordinate_id == 'latitude': return create_xarray(lats) raise KeyError("Coordinate {} unknown.".format(coordinate_id))
@functools.lru_cache(maxsize=10) def _get_all_interpolated_coordinates(self): lons40km, lats40km = self._get_coordinates_in_degrees() return self._interpolate_arrays(lons40km, lats40km, geolocation=True) def _get_coordinates_in_degrees(self): lons40km = self._data["pos"][:, :, 1] * 1e-4 lats40km = self._data["pos"][:, :, 0] * 1e-4 return lons40km, lats40km
[docs] def calibrate(self, dataset_id, pre_launch_coeffs=False, calib_coeffs=None): """Calibrate the data.""" if calib_coeffs is None: calib_coeffs = {} units = {'reflectance': '%', 'brightness_temperature': 'K', 'counts': '', 'radiance': 'W*m-2*sr-1*cm ?'} if dataset_id['name'] in ("3a", "3b") and self._is3b is None: # Is it 3a or 3b: self._is3a = da.bitwise_and(da.from_array(self._data['scnlinbit'], chunks=LINE_CHUNK), 3) == 0 self._is3b = da.bitwise_and(da.from_array(self._data['scnlinbit'], chunks=LINE_CHUNK), 3) == 1 try: vis_idx = ['1', '2', '3a'].index(dataset_id['name']) ir_idx = None except ValueError: vis_idx = None ir_idx = ['3b', '4', '5'].index(dataset_id['name']) mask = True if vis_idx is not None: coeffs = calib_coeffs.get('ch' + dataset_id['name']) if dataset_id['name'] == '3a': mask = self._is3a[:, None] ds = create_xarray( _vis_calibrate(self._data, vis_idx, dataset_id['calibration'], pre_launch_coeffs, coeffs, mask=mask)) else: if dataset_id['name'] == '3b': mask = self._is3b[:, None] ds = create_xarray( _ir_calibrate(self._header, self._data, ir_idx, dataset_id['calibration'], mask=mask)) ds.attrs['units'] = units[dataset_id['calibration']] ds.attrs.update(dataset_id._asdict()) return ds
# AAPP 1b header _HEADERTYPE = np.dtype([("siteid", "S3"), ("blank", "S1"), ("l1bversnb", "<i2"), ("l1bversyr", "<i2"), ("l1bversdy", "<i2"), ("reclg", "<i2"), ("blksz", "<i2"), ("hdrcnt", "<i2"), ("filler0", "S6"), ("dataname", "S42"), ("prblkid", "S8"), ("satid", "<i2"), ("instid", "<i2"), ("datatype", "<i2"), ("tipsrc", "<i2"), ("startdatajd", "<i4"), ("startdatayr", "<i2"), ("startdatady", "<i2"), ("startdatatime", "<i4"), ("enddatajd", "<i4"), ("enddatayr", "<i2"), ("enddatady", "<i2"), ("enddatatime", "<i4"), ("cpidsyr", "<i2"), ("cpidsdy", "<i2"), ("filler1", "S8"), # data set quality indicators ("inststat1", "<i4"), ("filler2", "S2"), ("statchrecnb", "<i2"), ("inststat2", "<i4"), ("scnlin", "<i2"), ("callocscnlin", "<i2"), ("misscnlin", "<i2"), ("datagaps", "<i2"), ("okdatafr", "<i2"), ("pacsparityerr", "<i2"), ("auxsyncerrsum", "<i2"), ("timeseqerr", "<i2"), ("timeseqerrcode", "<i2"), ("socclockupind", "<i2"), ("locerrind", "<i2"), ("locerrcode", "<i2"), ("pacsstatfield", "<i2"), ("pacsdatasrc", "<i2"), ("filler3", "S4"), ("spare1", "S8"), ("spare2", "S8"), ("filler4", "S10"), # Calibration ("racalind", "<i2"), ("solarcalyr", "<i2"), ("solarcaldy", "<i2"), ("pcalalgind", "<i2"), ("pcalalgopt", "<i2"), ("scalalgind", "<i2"), ("scalalgopt", "<i2"), ("irttcoef", "<i2", (4, 6)), ("filler5", "<i4", (2, )), # radiance to temperature conversion ("albcnv", "<i4", (2, 3)), ("radtempcnv", "<i4", (3, 3)), ("filler6", "<i4", (3, )), # Navigation ("modelid", "S8"), ("nadloctol", "<i2"), ("locbit", "<i2"), ("filler7", "S2"), ("rollerr", "<i2"), ("pitcherr", "<i2"), ("yawerr", "<i2"), ("epoyr", "<i2"), ("epody", "<i2"), ("epotime", "<i4"), ("smaxis", "<i4"), ("eccen", "<i4"), ("incli", "<i4"), ("argper", "<i4"), ("rascnod", "<i4"), ("manom", "<i4"), ("xpos", "<i4"), ("ypos", "<i4"), ("zpos", "<i4"), ("xvel", "<i4"), ("yvel", "<i4"), ("zvel", "<i4"), ("earthsun", "<i4"), ("filler8", "S16"), # analog telemetry conversion ("pchtemp", "<i2", (5, )), ("reserved1", "<i2"), ("pchtempext", "<i2", (5, )), ("reserved2", "<i2"), ("pchpow", "<i2", (5, )), ("reserved3", "<i2"), ("rdtemp", "<i2", (5, )), ("reserved4", "<i2"), ("bbtemp1", "<i2", (5, )), ("reserved5", "<i2"), ("bbtemp2", "<i2", (5, )), ("reserved6", "<i2"), ("bbtemp3", "<i2", (5, )), ("reserved7", "<i2"), ("bbtemp4", "<i2", (5, )), ("reserved8", "<i2"), ("eleccur", "<i2", (5, )), ("reserved9", "<i2"), ("motorcur", "<i2", (5, )), ("reserved10", "<i2"), ("earthpos", "<i2", (5, )), ("reserved11", "<i2"), ("electemp", "<i2", (5, )), ("reserved12", "<i2"), ("chtemp", "<i2", (5, )), ("reserved13", "<i2"), ("bptemp", "<i2", (5, )), ("reserved14", "<i2"), ("mhtemp", "<i2", (5, )), ("reserved15", "<i2"), ("adcontemp", "<i2", (5, )), ("reserved16", "<i2"), ("d4bvolt", "<i2", (5, )), ("reserved17", "<i2"), ("d5bvolt", "<i2", (5, )), ("reserved18", "<i2"), ("bbtempchn3b", "<i2", (5, )), ("reserved19", "<i2"), ("bbtempchn4", "<i2", (5, )), ("reserved20", "<i2"), ("bbtempchn5", "<i2", (5, )), ("reserved21", "<i2"), ("refvolt", "<i2", (5, )), ("reserved22", "<i2"), ]) # AAPP 1b scanline _SCANTYPE = np.dtype([("scnlin", "<i2"), ("scnlinyr", "<i2"), ("scnlindy", "<i2"), ("clockdrift", "<i2"), ("scnlintime", "<i4"), ("scnlinbit", "<i2"), ("filler0", "S10"), ("qualind", "<i4"), ("scnlinqual", "<i4"), ("calqual", "<i2", (3, )), ("cbiterr", "<i2"), ("filler1", "S8"), # Calibration ("calvis", "<i4", (3, 3, 5)), ("calir", "<i4", (3, 2, 3)), ("filler2", "<i4", (3, )), # Navigation ("navstat", "<i4"), ("attangtime", "<i4"), ("rollang", "<i2"), ("pitchang", "<i2"), ("yawang", "<i2"), ("scalti", "<i2"), ("ang", "<i2", (51, 3)), ("filler3", "<i2", (3, )), ("pos", "<i4", (51, 2)), ("filler4", "<i4", (2, )), ("telem", "<i2", (103, )), ("filler5", "<i2"), ("hrpt", "<i2", (2048, 5)), ("filler6", "<i4", (2, )), # tip minor frame header ("tipmfhd", "<i2", (7, 5)), # cpu telemetry ("cputel", "S6", (2, 5)), ("filler7", "<i2", (67, )), ]) def _vis_calibrate(data, chn, calib_type, pre_launch_coeffs=False, calib_coeffs=None, mask=True): """Calibrate visible channel data. *calib_type* in count, reflectance, radiance. """ # Calibration count to albedo, the calibration is performed separately for # two value ranges. if calib_type not in ['counts', 'radiance', 'reflectance']: raise ValueError('Calibration ' + calib_type + ' unknown!') channel = da.from_array(data["hrpt"][:, :, chn], chunks=(LINE_CHUNK, 2048)) mask &= channel != 0 if calib_type == 'counts': return channel channel = channel.astype(np.float64) if calib_type == 'radiance': logger.info("Radiances are not yet supported for " + "the VIS/NIR channels!") if pre_launch_coeffs: coeff_idx = 2 else: # check that coeffs are valid if np.all(data["calvis"][:, chn, 0, 4] == 0): logger.info( "No valid operational coefficients, fall back to pre-launch") coeff_idx = 2 else: coeff_idx = 0 intersection = da.from_array(data["calvis"][:, chn, coeff_idx, 4], chunks=LINE_CHUNK) if calib_coeffs is not None: logger.info("Updating from external calibration coefficients.") slope1 = da.from_array(calib_coeffs[0], chunks=LINE_CHUNK) intercept1 = da.from_array(calib_coeffs[1], chunks=LINE_CHUNK) slope2 = da.from_array(calib_coeffs[2], chunks=LINE_CHUNK) intercept2 = da.from_array(calib_coeffs[3], chunks=LINE_CHUNK) else: slope1 = da.from_array(data["calvis"][:, chn, coeff_idx, 0], chunks=LINE_CHUNK) * 1e-10 intercept1 = da.from_array(data["calvis"][:, chn, coeff_idx, 1], chunks=LINE_CHUNK) * 1e-7 slope2 = da.from_array(data["calvis"][:, chn, coeff_idx, 2], chunks=LINE_CHUNK) * 1e-10 intercept2 = da.from_array(data["calvis"][:, chn, coeff_idx, 3], chunks=LINE_CHUNK) * 1e-7 if chn == 1: # In the level 1b file, the visible coefficients are stored as 4-byte integers. Scaling factors then convert # them to real numbers which are applied to the measured counts. The coefficient is different depending on # whether the counts are less than or greater than the high-gain/low-gain transition value (nominally 500). # The slope for visible channels should always be positive (reflectance increases with count). With the # pre-launch coefficients the channel 2 slope is always positive but with the operational coefs the stored # number in the high-reflectance regime overflows the maximum 2147483647, i.e. it is negative when # interpreted as a signed integer. So you have to modify it. slope2 = da.where(slope2 < 0, slope2 + 0.4294967296, slope2) channel = da.where(channel <= intersection[:, None], channel * slope1[:, None] + intercept1[:, None], channel * slope2[:, None] + intercept2[:, None]) channel = channel.clip(min=0) return da.where(mask, channel, np.nan) def _ir_calibrate(header, data, irchn, calib_type, mask=True): """Calibrate for IR bands. *calib_type* in brightness_temperature, radiance, count """ count = da.from_array(data["hrpt"][:, :, irchn + 2], chunks=(LINE_CHUNK, 2048)) if calib_type == 0: return count # Mask unnaturally low values mask &= count != 0 count = count.astype(np.float64) k1_ = da.from_array(data['calir'][:, irchn, 0, 0], chunks=LINE_CHUNK) / 1.0e9 k2_ = da.from_array(data['calir'][:, irchn, 0, 1], chunks=LINE_CHUNK) / 1.0e6 k3_ = da.from_array(data['calir'][:, irchn, 0, 2], chunks=LINE_CHUNK) / 1.0e6 # Count to radiance conversion: rad = k1_[:, None] * count * count + k2_[:, None] * count + k3_[:, None] # Suspicious lines mask &= ((k1_ != 0) | (k2_ != 0) | (k3_ != 0))[:, None] if calib_type == 2: mask &= rad > 0.0 return da.where(mask, rad, np.nan) # Central wavenumber: cwnum = header['radtempcnv'][0, irchn, 0] if irchn == 0: cwnum = cwnum / 1.0e2 else: cwnum = cwnum / 1.0e3 bandcor_2 = header['radtempcnv'][0, irchn, 1] / 1e5 bandcor_3 = header['radtempcnv'][0, irchn, 2] / 1e6 ir_const_1 = 1.1910659e-5 ir_const_2 = 1.438833 t_planck = (ir_const_2 * cwnum) / \ np.log(1 + ir_const_1 * cwnum * cwnum * cwnum / rad) # Band corrections applied to t_planck to get correct # brightness temperature for channel: if bandcor_2 < 0: # Post AAPP-v4 tb_ = bandcor_2 + bandcor_3 * t_planck else: # AAPP 1 to 4 tb_ = (t_planck - bandcor_2) / bandcor_3 # Mask unnaturally low values return da.where(mask, tb_, np.nan)