Source code for satpy.readers.hrit_base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014-2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""HRIT/LRIT format reader.

This module is the base module for all HRIT-based formats. Here, you will find
the common building blocks for hrit reading.

One of the features here is the on-the-fly decompression of hrit files. It needs
a path to the xRITDecompress binary to be provided through the environment
variable called XRIT_DECOMPRESS_PATH. When compressed hrit files are then
encountered (files finishing with `.C_`), they are decompressed to the system's
temporary directory for reading.

"""

import logging
import os
from datetime import timedelta
from io import BytesIO
from subprocess import PIPE, Popen  # nosec B404
from tempfile import gettempdir

import dask.array as da
import numpy as np
import xarray as xr
from pyresample import geometry

import satpy.readers.utils as utils
from satpy.readers.eum_base import time_cds_short
from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.seviri_base import dec10216

logger = logging.getLogger('hrit_base')

common_hdr = np.dtype([('hdr_id', 'u1'),
                       ('record_length', '>u2')])

primary_header = np.dtype([('file_type', 'u1'),
                           ('total_header_length', '>u4'),
                           ('data_field_length', '>u8')])

image_structure = np.dtype([('number_of_bits_per_pixel', 'u1'),
                            ('number_of_columns', '>u2'),
                            ('number_of_lines', '>u2'),
                            ('compression_flag_for_data', 'u1')])

image_navigation = np.dtype([('projection_name', 'S32'),
                             ('cfac', '>i4'),
                             ('lfac', '>i4'),
                             ('coff', '>i4'),
                             ('loff', '>i4')])

image_data_function = np.dtype([('function', '|S1')])

annotation_header = np.dtype([('annotation', '|S1')])

timestamp_record = np.dtype([('cds_p_field', 'u1'),
                             ('timestamp', time_cds_short)])

ancillary_text = np.dtype([('ancillary', '|S1')])

key_header = np.dtype([('key', '|S1')])

base_text_headers = {image_data_function: 'image_data_function',
                     annotation_header: 'annotation_header',
                     ancillary_text: 'ancillary_text',
                     key_header: 'key_header'}

base_hdr_map = {0: primary_header,
                1: image_structure,
                2: image_navigation,
                3: image_data_function,
                4: annotation_header,
                5: timestamp_record,
                6: ancillary_text,
                7: key_header,
                }


[docs]def get_xritdecompress_cmd(): """Find a valid binary for the xRITDecompress command.""" cmd = os.environ.get('XRIT_DECOMPRESS_PATH', None) if not cmd: raise IOError("XRIT_DECOMPRESS_PATH is not defined (complete path to xRITDecompress)") question = ("Did you set the environment variable XRIT_DECOMPRESS_PATH correctly?") if not os.path.exists(cmd): raise IOError(str(cmd) + " does not exist!\n" + question) elif os.path.isdir(cmd): raise IOError(str(cmd) + " is a directory!\n" + question) return cmd
[docs]def get_xritdecompress_outfile(stdout): """Analyse the output of the xRITDecompress command call and return the file.""" outfile = b'' for line in stdout: try: k, v = [x.strip() for x in line.split(b':', 1)] except ValueError: break if k == b'Decompressed file': outfile = v break return outfile
[docs]def decompress(infile, outdir='.'): """Decompress an XRIT data file and return the path to the decompressed file. It expect to find Eumetsat's xRITDecompress through the environment variable XRIT_DECOMPRESS_PATH. """ cmd = get_xritdecompress_cmd() infile = os.path.abspath(infile) cwd = os.getcwd() os.chdir(outdir) p = Popen([cmd, infile], stdout=PIPE) # nosec B603 stdout = BytesIO(p.communicate()[0]) status = p.returncode os.chdir(cwd) if status != 0: raise IOError("xrit_decompress '%s', failed, status=%d" % (infile, status)) outfile = get_xritdecompress_outfile(stdout) if not outfile: raise IOError("xrit_decompress '%s', failed, no output file is generated" % infile) return os.path.join(outdir, outfile.decode('utf-8'))
[docs]def get_header_id(fp): """Return the HRIT header common data.""" data = fp.read(common_hdr.itemsize) return np.frombuffer(data, dtype=common_hdr, count=1)[0]
[docs]def get_header_content(fp, header_dtype, count=1): """Return the content of the HRIT header.""" data = fp.read(header_dtype.itemsize*count) return np.frombuffer(data, dtype=header_dtype, count=count)
[docs]class HRITFileHandler(BaseFileHandler): """HRIT standard format reader.""" def __init__(self, filename, filename_info, filetype_info, hdr_info): """Initialize the reader.""" super(HRITFileHandler, self).__init__(filename, filename_info, filetype_info) self.mda = {} self._get_hd(hdr_info) if self.mda.get('compression_flag_for_data'): logger.debug('Unpacking %s', filename) try: self.filename = decompress(filename, gettempdir()) except IOError as err: logger.warning("Unpacking failed: %s", str(err)) self.mda = {} self._get_hd(hdr_info) self._start_time = filename_info['start_time'] self._end_time = self._start_time + timedelta(minutes=15) def _get_hd(self, hdr_info): """Open the file, read and get the basic file header info and set the mda dictionary.""" hdr_map, variable_length_headers, text_headers = hdr_info with utils.generic_open(self.filename, mode='rb') as fp: total_header_length = 16 while fp.tell() < total_header_length: hdr_id = get_header_id(fp) the_type = hdr_map[hdr_id['hdr_id']] if the_type in variable_length_headers: field_length = int((hdr_id['record_length'] - 3) / the_type.itemsize) current_hdr = get_header_content(fp, the_type, field_length) key = variable_length_headers[the_type] if key in self.mda: if not isinstance(self.mda[key], list): self.mda[key] = [self.mda[key]] self.mda[key].append(current_hdr) else: self.mda[key] = current_hdr elif the_type in text_headers: field_length = int((hdr_id['record_length'] - 3) / the_type.itemsize) char = list(the_type.fields.values())[0][0].char new_type = np.dtype(char + str(field_length)) current_hdr = get_header_content(fp, new_type)[0] self.mda[text_headers[the_type]] = current_hdr else: current_hdr = get_header_content(fp, the_type)[0] self.mda.update( dict(zip(current_hdr.dtype.names, current_hdr))) total_header_length = self.mda['total_header_length'] self.mda.setdefault('number_of_bits_per_pixel', 10) self.mda['projection_parameters'] = {'a': 6378169.00, 'b': 6356583.80, 'h': 35785831.00, # FIXME: find a reasonable SSP 'SSP_longitude': 0.0} self.mda['orbital_parameters'] = {}
[docs] def get_shape(self, dsid, ds_info): """Get shape.""" return int(self.mda['number_of_lines']), int(self.mda['number_of_columns'])
@property def start_time(self): """Get start time.""" return self._start_time @property def end_time(self): """Get end time.""" return self._end_time
[docs] def get_dataset(self, key, info): """Load a dataset.""" # Read bands data = self.read_band(key, info) # Convert to xarray xdata = xr.DataArray(data, dims=['y', 'x']) return xdata
[docs] def get_xy_from_linecol(self, line, col, offsets, factors): """Get the intermediate coordinates from line & col. Intermediate coordinates are actually the instruments scanning angles. """ loff, coff = offsets lfac, cfac = factors x__ = (col - coff) / cfac * 2**16 y__ = (line - loff) / lfac * 2**16 return x__, y__
[docs] def get_area_extent(self, size, offsets, factors, platform_height): """Get the area extent of the file.""" nlines, ncols = size h = platform_height # count starts at 1 cols = 1 - 0.5 lines = 1 - 0.5 ll_x, ll_y = self.get_xy_from_linecol(lines, cols, offsets, factors) cols += ncols lines += nlines ur_x, ur_y = self.get_xy_from_linecol(lines, cols, offsets, factors) return (np.deg2rad(ll_x) * h, np.deg2rad(ll_y) * h, np.deg2rad(ur_x) * h, np.deg2rad(ur_y) * h)
[docs] def get_area_def(self, dsid): """Get the area definition of the band.""" cfac = np.int32(self.mda['cfac']) lfac = np.int32(self.mda['lfac']) coff = np.float32(self.mda['coff']) loff = np.float32(self.mda['loff']) a = self.mda['projection_parameters']['a'] b = self.mda['projection_parameters']['b'] h = self.mda['projection_parameters']['h'] lon_0 = self.mda['projection_parameters']['SSP_longitude'] nlines = int(self.mda['number_of_lines']) ncols = int(self.mda['number_of_columns']) area_extent = self.get_area_extent((nlines, ncols), (loff, coff), (lfac, cfac), h) proj_dict = {'a': float(a), 'b': float(b), 'lon_0': float(lon_0), 'h': float(h), 'proj': 'geos', 'units': 'm'} area = geometry.AreaDefinition( 'some_area_name', "On-the-fly area", 'geosmsg', proj_dict, ncols, nlines, area_extent) self.area = area return area
def _memmap_data(self, shape, dtype): # For reading the image data, unzip_context is faster than generic_open with utils.unzip_context(self.filename) as fn: return np.memmap(fn, mode='r', offset=self.mda['total_header_length'], dtype=dtype, shape=shape) def _read_file_or_file_like(self, shape, dtype): # filename is likely to be a file-like object, already in memory with utils.generic_open(self.filename, mode="rb") as fp: no_elements = np.prod(shape) fp.seek(self.mda['total_header_length']) return np.frombuffer( fp.read(np.dtype(dtype).itemsize * no_elements), dtype=np.dtype(dtype), count=no_elements.item() ).reshape(shape) def _read_or_memmap_data(self, shape, dtype): # check, if 'filename' is a file on disk, # or a file like obj, possibly residing already in memory try: return self._memmap_data(shape, dtype) except (FileNotFoundError, AttributeError): return self._read_file_or_file_like(shape, dtype)
[docs] def read_band(self, key, info): """Read the data.""" shape = int(np.ceil(self.mda['data_field_length'] / 8.)) if self.mda['number_of_bits_per_pixel'] == 16: dtype = '>u2' shape //= 2 elif self.mda['number_of_bits_per_pixel'] in [8, 10]: dtype = np.uint8 shape = (shape, ) data = self._read_or_memmap_data(shape, dtype) data = da.from_array(data, chunks=shape[0]) if self.mda['number_of_bits_per_pixel'] == 10: data = dec10216(data) data = data.reshape((self.mda['number_of_lines'], self.mda['number_of_columns'])) return data