Source code for satpy.readers.hrit_jma

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2010-2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""HRIT format reader for JMA data.

References:
    JMA HRIT - Mission Specific Implementation
    http://www.jma.go.jp/jma/jma-eng/satellite/introduction/4_2HRIT.pdf

"""

import logging
from datetime import datetime

import numpy as np
import xarray as xr

from satpy.readers.hrit_base import (HRITFileHandler, ancillary_text,
                                     annotation_header, base_hdr_map,
                                     image_data_function)
from satpy.readers._geos_area import get_area_definition, get_area_extent
from satpy.readers.utils import get_geostationary_mask

logger = logging.getLogger('hrit_jma')


# JMA implementation:
key_header = np.dtype([('key_number', 'u4')])

segment_identification = np.dtype([('image_segm_seq_no', '>u1'),
                                   ('total_no_image_segm', '>u1'),
                                   ('line_no_image_segm', '>u2')])

encryption_key_message = np.dtype([('station_number', '>u2')])

image_compensation_information = np.dtype([('compensation', '|S1')])

image_observation_time = np.dtype([('times', '|S1')])

image_quality_information = np.dtype([('quality', '|S1')])


jma_variable_length_headers = {}

jma_text_headers = {image_data_function: 'image_data_function',
                    annotation_header: 'annotation_header',
                    ancillary_text: 'ancillary_text',
                    image_compensation_information: 'image_compensation_information',
                    image_observation_time: 'image_observation_time',
                    image_quality_information: 'image_quality_information'}

jma_hdr_map = base_hdr_map.copy()
jma_hdr_map.update({7: key_header,
                    128: segment_identification,
                    129: encryption_key_message,
                    130: image_compensation_information,
                    131: image_observation_time,
                    132: image_quality_information
                    })


cuc_time = np.dtype([('coarse', 'u1', (4, )),
                     ('fine', 'u1', (3, ))])

time_cds_expanded = np.dtype([('days', '>u2'),
                              ('milliseconds', '>u4'),
                              ('microseconds', '>u2'),
                              ('nanoseconds', '>u2')])

FULL_DISK = 1
NORTH_HEMIS = 2
SOUTH_HEMIS = 3
UNKNOWN_AREA = -1
AREA_NAMES = {FULL_DISK: {'short': 'FLDK', 'long': 'Full Disk'},
              NORTH_HEMIS: {'short': 'NH', 'long': 'Northern Hemisphere'},
              SOUTH_HEMIS: {'short': 'SH', 'long': 'Southern Hemisphere'},
              UNKNOWN_AREA: {'short': 'UNKNOWN', 'long': 'Unknown Area'}}

MTSAT1R = 'MTSAT-1R'
MTSAT2 = 'MTSAT-2'
HIMAWARI8 = 'Himawari-8'
UNKNOWN_PLATFORM = 'Unknown Platform'
PLATFORMS = {
    'GEOS(140.00)': MTSAT1R,
    'GEOS(140.25)': MTSAT1R,
    'GEOS(140.70)': HIMAWARI8,
    'GEOS(145.00)': MTSAT2,
}
SENSORS = {
    MTSAT1R: 'jami',
    MTSAT2: 'mtsat2_imager',
    HIMAWARI8: 'ahi'
}


[docs]class HRITJMAFileHandler(HRITFileHandler): """JMA HRIT format reader.""" def __init__(self, filename, filename_info, filetype_info): """Initialize the reader.""" super(HRITJMAFileHandler, self).__init__(filename, filename_info, filetype_info, (jma_hdr_map, jma_variable_length_headers, jma_text_headers)) self.mda['segment_sequence_number'] = self.mda['image_segm_seq_no'] self.mda['planned_end_segment_number'] = self.mda['total_no_image_segm'] self.mda['planned_start_segment_number'] = 1 items = self.mda['image_data_function'].decode().split('\r') if items[0].startswith('$HALFTONE'): self.calibration_table = [] for item in items[1:]: if item == '': continue key, value = item.split(':=') if key.startswith('_UNIT'): self.mda['unit'] = item.split(':=')[1] elif key.startswith('_NAME'): pass elif key.isdigit(): key = int(key) value = float(value) self.calibration_table.append((key, value)) self.calibration_table = np.array(self.calibration_table) self.projection_name = self.mda['projection_name'].decode().strip() sublon = float(self.projection_name.split('(')[1][:-1]) self.mda['projection_parameters']['SSP_longitude'] = sublon self.platform = self._get_platform() self.is_segmented = self.mda['segment_sequence_number'] > 0 self.area_id = filename_info.get('area', UNKNOWN_AREA) if self.area_id not in AREA_NAMES: self.area_id = UNKNOWN_AREA self.area = self._get_area_def() def _get_platform(self): """Get the platform name. The platform is not specified explicitly in JMA HRIT files. For segmented data it is not even specified in the filename. But it can be derived indirectly from the projection name: GEOS(140.00): MTSAT-1R GEOS(140.25): MTSAT-1R # TODO: Check if there is more... GEOS(140.70): Himawari-8 GEOS(145.00): MTSAT-2 See [MTSAT], section 3.1. Unfortunately Himawari-8 and 9 are not distinguishable using that method at the moment. From [HIMAWARI]: "HRIT/LRIT files have the same file naming convention in the same format in Himawari-8 and Himawari-9, so there is no particular difference." TODO: Find another way to distinguish Himawari-8 and 9. References: [MTSAT] http://www.data.jma.go.jp/mscweb/notice/Himawari7_e.html [HIMAWARI] http://www.data.jma.go.jp/mscweb/en/himawari89/space_segment/sample_hrit.html """ try: return PLATFORMS[self.projection_name] except KeyError: logger.error('Unable to determine platform: Unknown projection ' 'name "{}"'.format(self.projection_name)) return UNKNOWN_PLATFORM def _check_sensor_platform_consistency(self, sensor): """Make sure sensor and platform are consistent. Args: sensor (str) : Sensor name from YAML dataset definition Raises: ValueError if they don't match """ ref_sensor = SENSORS.get(self.platform, None) if ref_sensor and not sensor == ref_sensor: logger.error('Sensor-Platform mismatch: {} is not a payload ' 'of {}. Did you choose the correct reader?' .format(sensor, self.platform)) def _get_line_offset(self): """Get line offset for the current segment. Read line offset from the file and adapt it to the current segment or half disk scan so that y(l) ~ l - loff because this is what get_geostationary_area_extent() expects. """ # Get line offset from the file nlines = int(self.mda['number_of_lines']) loff = np.float32(self.mda['loff']) # Adapt it to the current segment if self.is_segmented: # loff in the file specifies the offset of the full disk image # centre (1375/2750 for VIS/IR) segment_number = self.mda['segment_sequence_number'] - 1 loff -= (self.mda['total_no_image_segm'] - segment_number - 1) * nlines elif self.area_id in (NORTH_HEMIS, SOUTH_HEMIS): # loff in the file specifies the start line of the half disk image # in the full disk image loff = nlines - loff elif self.area_id == UNKNOWN_AREA: logger.error('Cannot compute line offset for unknown area') return loff def _get_area_def(self): """Get the area definition of the band.""" pdict = { 'cfac': np.int32(self.mda['cfac']), 'lfac': np.int32(self.mda['lfac']), 'coff': np.float32(self.mda['coff']), 'loff': self._get_line_offset(), 'ncols': int(self.mda['number_of_columns']), 'nlines': int(self.mda['number_of_lines']), 'scandir': 'N2S', 'a': float(self.mda['projection_parameters']['a']), 'b': float(self.mda['projection_parameters']['b']), 'h': float(self.mda['projection_parameters']['h']), 'ssp_lon': float(self.mda['projection_parameters']['SSP_longitude']), 'a_name': AREA_NAMES[self.area_id]['short'], 'a_desc': AREA_NAMES[self.area_id]['long'], 'p_id': 'geosmsg' } area_extent = get_area_extent(pdict) return get_area_definition(pdict, area_extent)
[docs] def get_area_def(self, dsid): """Get the area definition of the band.""" return self.area
[docs] def get_dataset(self, key, info): """Get the dataset designated by *key*.""" res = super(HRITJMAFileHandler, self).get_dataset(key, info) # Filenames of segmented data is identical for MTSAT-1R, MTSAT-2 # and Himawari-8/9. Make sure we have the correct reader for the data # at hand. self._check_sensor_platform_consistency(info['sensor']) # Calibrate and mask space pixels res = self._mask_space(self.calibrate(res, key.calibration)) # Update attributes res.attrs.update(info) res.attrs['platform_name'] = self.platform res.attrs['satellite_longitude'] = float(self.mda['projection_parameters']['SSP_longitude']) res.attrs['satellite_latitude'] = 0. res.attrs['satellite_altitude'] = float(self.mda['projection_parameters']['h']) res.attrs['orbital_parameters'] = { 'projection_longitude': float(self.mda['projection_parameters']['SSP_longitude']), 'projection_latitude': 0., 'projection_altitude': float(self.mda['projection_parameters']['h'])} return res
def _mask_space(self, data): """Mask space pixels.""" geomask = get_geostationary_mask(area=self.area) return data.where(geomask) @staticmethod def _interp(arr, cal): return np.interp(arr.ravel(), cal[:, 0], cal[:, 1]).reshape(arr.shape)
[docs] def calibrate(self, data, calibration): """Calibrate the data.""" tic = datetime.now() if calibration == 'counts': return data elif calibration == 'radiance': raise NotImplementedError("Can't calibrate to radiance.") else: cal = self.calibration_table res = data.data.map_blocks(self._interp, cal, dtype=cal[:, 0].dtype) res = xr.DataArray(res, dims=data.dims, attrs=data.attrs, coords=data.coords) res = res.where(data < 65535) logger.debug("Calibration time " + str(datetime.now() - tic)) return res