#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""The HRIT base reader tests package."""
import os
import unittest
from datetime import datetime
from io import BytesIO
from tempfile import NamedTemporaryFile, gettempdir
from unittest import mock
import numpy as np
from satpy.readers import FSFile
from satpy.readers.hrit_base import HRITFileHandler, decompress, get_xritdecompress_cmd, get_xritdecompress_outfile
class TestHRITDecompress(unittest.TestCase):
    """Test the on-the-fly decompression."""

    # Environment variable read by the decompression helpers under test.
    ENV_VAR = 'XRIT_DECOMPRESS_PATH'

    def _xrit_path(self, path):
        """Return a context manager that temporarily sets ``ENV_VAR`` to *path*.

        ``mock.patch.dict`` puts ``os.environ`` back in its previous state on
        exit, even when the body raises or an assertion fails, so a failing
        test cannot leak the variable into other tests.
        """
        return mock.patch.dict(os.environ, {self.ENV_VAR: path})

    def test_xrit_cmd(self):
        """Test running the xrit decompress command."""
        # A made-up path is expected to be rejected with an IOError.
        with self._xrit_path('/path/to/my/bin'):
            self.assertRaises(IOError, get_xritdecompress_cmd)

        # The temporary directory (a directory, not a file) is rejected too.
        with self._xrit_path(gettempdir()):
            self.assertRaises(IOError, get_xritdecompress_cmd)

        # An existing file is accepted and returned unchanged.
        with NamedTemporaryFile() as fd:
            fname = fd.name
            with self._xrit_path(fname):
                res = get_xritdecompress_cmd()

        self.assertEqual(fname, res)

    def test_xrit_outfile(self):
        """Test the right decompression filename is used."""
        stdout = [b"Decompressed file: bla.__\n"]
        outfile = get_xritdecompress_outfile(stdout)
        self.assertEqual(outfile, b'bla.__')

    @mock.patch('satpy.readers.hrit_base.Popen')
    def test_decompress(self, popen):
        """Test decompression works."""
        # Fake a successful run of the external tool and its reported output.
        popen.return_value.returncode = 0
        popen.return_value.communicate.return_value = [b"Decompressed file: bla.__\n"]

        with NamedTemporaryFile() as fd:
            with self._xrit_path(fd.name):
                res = decompress('bla.C_')

        self.assertEqual(res, os.path.join('.', 'bla.__'))
def new_get_hd(instance, hdr_info):
    """Generate some metadata.

    Stand-in for ``HRITFileHandler._get_hd`` (it is patched in as ``_get_hd``
    by the file handler test fixture): instead of reading a header it installs
    a fixed metadata dictionary on the handler.

    Args:
        instance: Object receiving the metadata; its ``mda`` attribute is
            replaced with a freshly built dictionary on every call.
        hdr_info: Ignored. Accepted only so the signature matches the method
            being replaced.
    """
    instance.mda = {
        'spectral_channel_id': 1,
        'number_of_bits_per_pixel': 10,
        'projection_parameters': {'a': 6378169.00,
                                  'b': 6356583.80,
                                  'h': 35785831.00,
                                  'SSP_longitude': 0.0},
        'orbital_parameters': {},
        'total_header_length': 12,
    }
class TestHRITFileHandler(unittest.TestCase):
    """Test the HRITFileHandler."""

    @mock.patch('satpy.readers.hrit_base.np.fromfile')
    def setUp(self, fromfile):
        """Set up the hrit file handler for testing.

        The handler is built without touching the filesystem: ``np.fromfile``
        and both ``open`` calls are mocked and ``_get_hd`` is replaced by
        ``new_get_hd``. The image geometry and projection metadata used by the
        tests are then written into ``self.reader.mda``.
        """
        opener = mock.mock_open()
        fromfile.return_value = np.array(
            [(1, 2)],
            dtype=[('total_header_length', int), ('hdr_id', int)])

        with mock.patch('satpy.readers.hrit_base.open', opener, create=True) as newopen, \
                mock.patch('satpy.readers.utils.open', opener, create=True) as utilopen, \
                mock.patch.object(HRITFileHandler, '_get_hd', new=new_get_hd):
            newopen.return_value.__enter__.return_value.tell.return_value = 1
            # 8192 zero bytes of fake file content for the mocked read().
            utilopen.return_value.__enter__.return_value.read.return_value = bytes(8192)
            self.reader = HRITFileHandler(
                'filename',
                {'platform_shortname': 'MSG3',
                 'start_time': datetime(2016, 3, 3, 0, 0)},
                {'filetype': 'info'},
                [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()])

        ncols = 3712
        nlines = 464
        nbits = 10
        # Only the metadata dictionary is touched from here on, so the patches
        # above are no longer needed.
        self.reader.mda.update({
            'number_of_bits_per_pixel': nbits,
            'number_of_lines': nlines,
            'number_of_columns': ncols,
            'data_field_length': nlines * ncols * nbits,
            'cfac': 5,
            'lfac': 5,
            'coff': 10,
            'loff': 10,
            'projection_parameters': {'a': 6378169.0,
                                      'b': 6356583.8,
                                      'h': 35785831.0,
                                      'SSP_longitude': 44},
        })

    def _image_shape(self):
        """Return ``(lines, columns)`` as configured in the reader metadata."""
        mda = self.reader.mda
        return mda['number_of_lines'], mda['number_of_columns']

    def _random_packed_data(self, extra_bytes=0):
        """Return random ``uint8`` values sized for one packed image.

        The length is lines * columns * bits-per-pixel / 8, taken from the
        reader metadata, plus *extra_bytes* (used to make room for a header).
        """
        nlines, ncols = self._image_shape()
        nbits = self.reader.mda['number_of_bits_per_pixel']
        n_bytes = nlines * ncols * nbits // 8 + extra_bytes
        return np.random.randint(0, 256, size=n_bytes, dtype=np.uint8)

    def test_get_xy_from_linecol(self):
        """Test get_xy_from_linecol."""
        # (line/column, expected coordinate) for offsets (10, 10), factors (5, 5).
        cases = ((0, -131072), (10, 0), (20, 131072))
        for linecol, expected in cases:
            x__, y__ = self.reader.get_xy_from_linecol(linecol, linecol, (10, 10), (5, 5))
            self.assertEqual(expected, x__)
            self.assertEqual(expected, y__)

    def test_get_area_extent(self):
        """Test getting the area extent."""
        res = self.reader.get_area_extent((20, 20), (10, 10), (5, 5), 33)
        exp = (-71717.44995740513, -71717.44995740513,
               79266.655216079365, 79266.655216079365)
        self.assertTupleEqual(res, exp)

    def test_get_area_def(self):
        """Test getting an area definition."""
        from pyresample.utils import proj4_radius_parameters
        area = self.reader.get_area_def('VIS06')
        proj_dict = area.proj_dict
        a, b = proj4_radius_parameters(proj_dict)
        self.assertEqual(a, 6378169.0)
        self.assertEqual(b, 6356583.8)
        self.assertEqual(proj_dict['h'], 35785831.0)
        self.assertEqual(proj_dict['lon_0'], 44.0)
        self.assertEqual(proj_dict['proj'], 'geos')
        self.assertEqual(proj_dict['units'], 'm')
        self.assertEqual(area.area_extent,
                         (-77771774058.38356, -77771774058.38356,
                          30310525626438.438, 3720765401003.719))

    @mock.patch('satpy.readers.hrit_base.np.memmap')
    def test_read_band_filepath(self, memmap):
        """Test reading a single band from a filepath."""
        memmap.return_value = self._random_packed_data()
        res = self.reader.read_band('VIS006', None)
        self.assertEqual(res.compute().shape, self._image_shape())

    @mock.patch('satpy.readers.FSFile.open')
    def test_read_band_FSFile(self, fsfile_open):
        """Test reading a single band from a FSFile."""
        self.reader.filename = FSFile(self.reader.filename)  # convert str to FSFile
        # The stream holds the header bytes followed by the packed image data.
        header_length = self.reader.mda['total_header_length']
        fsfile_open.return_value = BytesIO(
            self._random_packed_data(extra_bytes=header_length).tobytes())
        res = self.reader.read_band('VIS006', None)
        self.assertEqual(res.compute().shape, self._image_shape())