#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Tests for modifiers in modifiers/__init__.py."""
import unittest
from datetime import datetime
from unittest import mock
import dask.array as da
import numpy as np
import pytest
import xarray as xr
from pyresample.geometry import AreaDefinition, StackedAreaDefinition
# from pytest_lazyfixture import lazy_fixture
def _sunz_area_def():
"""Get fake area for testing sunz generation."""
area = AreaDefinition("test", "test", "test",
{"proj": "merc"}, 2, 2,
(-2000, -2000, 2000, 2000))
return area
def _sunz_bigger_area_def():
"""Get area that is twice the size of 'sunz_area_def'."""
bigger_area = AreaDefinition("test", "test", "test",
{"proj": "merc"}, 4, 4,
(-2000, -2000, 2000, 2000))
return bigger_area
def _sunz_stacked_area_def():
"""Get fake stacked area for testing sunz generation."""
area1 = AreaDefinition("test", "test", "test",
{"proj": "merc"}, 2, 1,
(-2000, 0, 2000, 2000))
area2 = AreaDefinition("test", "test", "test",
{"proj": "merc"}, 2, 1,
(-2000, -2000, 2000, 0))
return StackedAreaDefinition(area1, area2)
def _shared_sunz_attrs(area_def):
attrs = {"area": area_def,
"start_time": datetime(2018, 1, 1, 18),
"modifiers": tuple(),
"name": "test_vis"}
return attrs
def _get_ds1(attrs):
ds1 = xr.DataArray(da.ones((2, 2), chunks=2, dtype=np.float64),
attrs=attrs, dims=("y", "x"),
coords={"y": [0, 1], "x": [0, 1]})
return ds1
[docs]
@pytest.fixture(scope="session")
def sunz_ds1():
"""Generate fake dataset for sunz tests."""
attrs = _shared_sunz_attrs(_sunz_area_def())
return _get_ds1(attrs)
[docs]
@pytest.fixture(scope="session")
def sunz_ds1_stacked():
"""Generate fake dataset for sunz tests."""
attrs = _shared_sunz_attrs(_sunz_stacked_area_def())
return _get_ds1(attrs)
[docs]
@pytest.fixture(scope="session")
def sunz_ds2():
"""Generate larger fake dataset for sunz tests."""
attrs = _shared_sunz_attrs(_sunz_bigger_area_def())
ds2 = xr.DataArray(da.ones((4, 4), chunks=2, dtype=np.float64),
attrs=attrs, dims=("y", "x"),
coords={"y": [0, 0.5, 1, 1.5], "x": [0, 0.5, 1, 1.5]})
return ds2
[docs]
@pytest.fixture(scope="session")
def sunz_sza():
"""Generate fake solar zenith angle data array for testing."""
sza = xr.DataArray(
np.rad2deg(np.arccos(da.from_array([[0.0149581333, 0.0146694376], [0.0150812684, 0.0147925727]],
chunks=2))),
attrs={"area": _sunz_area_def()},
dims=("y", "x"),
coords={"y": [0, 1], "x": [0, 1]},
)
return sza
[docs]
class TestSunZenithCorrector:
"""Test case for the zenith corrector."""
[docs]
@pytest.mark.parametrize("as_32bit", [False, True])
def test_basic_default_not_provided(self, sunz_ds1, as_32bit):
"""Test default limits when SZA isn't provided."""
from satpy.modifiers.geometry import SunZenithCorrector
if as_32bit:
sunz_ds1 = sunz_ds1.astype(np.float32)
comp = SunZenithCorrector(name="sza_test", modifiers=tuple())
res = comp((sunz_ds1,), test_attr="test")
np.testing.assert_allclose(res.values, np.array([[22.401667, 22.31777], [22.437503, 22.353533]]),
rtol=1e-6)
assert "y" in res.coords
assert "x" in res.coords
ds1 = sunz_ds1.copy().drop_vars(("y", "x"))
res = comp((ds1,), test_attr="test")
res_np = res.compute()
np.testing.assert_allclose(res_np.values, np.array([[22.401667, 22.31777], [22.437503, 22.353533]]),
rtol=1e-6)
assert res.dtype == res_np.dtype
assert "y" not in res.coords
assert "x" not in res.coords
[docs]
def test_basic_lims_not_provided(self, sunz_ds1):
"""Test custom limits when SZA isn't provided."""
from satpy.modifiers.geometry import SunZenithCorrector
comp = SunZenithCorrector(name="sza_test", modifiers=tuple(), correction_limit=90)
res = comp((sunz_ds1,), test_attr="test")
np.testing.assert_allclose(res.values, np.array([[66.853262, 68.168939], [66.30742, 67.601493]]))
# @pytest.mark.parametrize("data_arr", [lazy_fixture("sunz_ds1"), lazy_fixture("sunz_ds1_stacked")])
# def test_basic_default_provided(self, data_arr, sunz_sza):
# """Test default limits when SZA is provided."""
# from satpy.modifiers.geometry import SunZenithCorrector
# comp = SunZenithCorrector(name="sza_test", modifiers=tuple())
# res = comp((data_arr, sunz_sza), test_attr="test")
# np.testing.assert_allclose(res.values, np.array([[22.401667, 22.31777], [22.437503, 22.353533]]))
#
# @pytest.mark.parametrize("data_arr", [lazy_fixture("sunz_ds1"), lazy_fixture("sunz_ds1_stacked")])
# def test_basic_lims_provided(self, data_arr, sunz_sza):
# """Test custom limits when SZA is provided."""
# from satpy.modifiers.geometry import SunZenithCorrector
# comp = SunZenithCorrector(name="sza_test", modifiers=tuple(), correction_limit=90)
# res = comp((data_arr, sunz_sza), test_attr="test")
# np.testing.assert_allclose(res.values, np.array([[66.853262, 68.168939], [66.30742, 67.601493]]))
[docs]
def test_imcompatible_areas(self, sunz_ds2, sunz_sza):
"""Test sunz correction on incompatible areas."""
from satpy.composites import IncompatibleAreas
from satpy.modifiers.geometry import SunZenithCorrector
comp = SunZenithCorrector(name="sza_test", modifiers=tuple(), correction_limit=90)
with pytest.raises(IncompatibleAreas):
comp((sunz_ds2, sunz_sza), test_attr="test")
[docs]
class TestSunZenithReducer:
"""Test case for the sun zenith reducer."""
[docs]
@classmethod
def setup_class(cls):
"""Initialze SunZenithReducer classes that shall be tested."""
from satpy.modifiers.geometry import SunZenithReducer
cls.default = SunZenithReducer(name="sza_reduction_test_default", modifiers=tuple())
cls.custom = SunZenithReducer(name="sza_reduction_test_custom", modifiers=tuple(),
correction_limit=70, max_sza=95, strength=3.0)
[docs]
def test_default_settings(self, sunz_ds1, sunz_sza):
"""Test default settings with sza data available."""
res = self.default((sunz_ds1, sunz_sza), test_attr="test")
np.testing.assert_allclose(res.values,
np.array([[0.02916261, 0.02839063], [0.02949383, 0.02871911]]),
rtol=1e-5)
[docs]
def test_custom_settings(self, sunz_ds1, sunz_sza):
"""Test custom settings with sza data available."""
res = self.custom((sunz_ds1, sunz_sza), test_attr="test")
np.testing.assert_allclose(res.values,
np.array([[0.01041319, 0.01030033], [0.01046164, 0.01034834]]),
rtol=1e-5)
[docs]
def test_invalid_max_sza(self, sunz_ds1, sunz_sza):
"""Test invalid max_sza with sza data available."""
from satpy.modifiers.geometry import SunZenithReducer
with pytest.raises(ValueError, match="`max_sza` must be defined when using the SunZenithReducer."):
SunZenithReducer(name="sza_reduction_test_invalid", modifiers=tuple(), max_sza=None)
[docs]
class TestNIRReflectance(unittest.TestCase):
"""Test NIR reflectance compositor."""
[docs]
def setUp(self):
"""Set up the test case for the NIRReflectance compositor."""
self.get_lonlats = mock.MagicMock()
self.lons, self.lats = 1, 2
self.get_lonlats.return_value = (self.lons, self.lats)
area = mock.MagicMock(get_lonlats=self.get_lonlats)
self.start_time = 1
self.metadata = {"platform_name": "Meteosat-11",
"sensor": "seviri",
"name": "IR_039",
"area": area,
"start_time": self.start_time}
nir_arr = np.random.random((2, 2))
self.nir = xr.DataArray(da.from_array(nir_arr), dims=["y", "x"])
self.nir.attrs.update(self.metadata)
ir_arr = 100 * np.random.random((2, 2))
self.ir_ = xr.DataArray(da.from_array(ir_arr), dims=["y", "x"])
self.ir_.attrs["area"] = area
self.sunz_arr = 100 * np.random.random((2, 2))
self.sunz = xr.DataArray(da.from_array(self.sunz_arr), dims=["y", "x"])
self.sunz.attrs["standard_name"] = "solar_zenith_angle"
self.sunz.attrs["area"] = area
self.da_sunz = da.from_array(self.sunz_arr)
refl_arr = np.random.random((2, 2))
self.refl = da.from_array(refl_arr)
self.refl_with_co2 = da.from_array(np.random.random((2, 2)))
self.refl_from_tbs = mock.MagicMock()
self.refl_from_tbs.side_effect = self.fake_refl_from_tbs
[docs]
def fake_refl_from_tbs(self, sun_zenith, da_nir, da_tb11, tb_ir_co2=None):
"""Fake refl_from_tbs."""
del sun_zenith, da_nir, da_tb11
if tb_ir_co2 is not None:
return self.refl_with_co2
return self.refl
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_provide_sunz_no_co2(self, calculator, apply_modifier_info, sza):
"""Test NIR reflectance compositor provided only sunz."""
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
sza.return_value = self.da_sunz
from satpy.modifiers.spectral import NIRReflectance
comp = NIRReflectance(name="test")
info = {"modifiers": None}
res = comp([self.nir, self.ir_], optional_datasets=[self.sunz], **info)
assert self.metadata.items() <= res.attrs.items()
assert res.attrs["units"] == "%"
assert res.attrs["sun_zenith_threshold"] is not None
assert np.allclose(res.data, self.refl * 100).compute()
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_no_sunz_no_co2(self, calculator, apply_modifier_info, sza):
"""Test NIR reflectance compositor with minimal parameters."""
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
sza.return_value = self.da_sunz
from satpy.modifiers.spectral import NIRReflectance
comp = NIRReflectance(name="test")
info = {"modifiers": None}
res = comp([self.nir, self.ir_], optional_datasets=[], **info)
# due to copying of DataArrays, self.get_lonlats is not the same as the one that was called
# we must used the area from the final result DataArray
res.attrs["area"].get_lonlats.assert_called_with(chunks=((2,), (2,)), dtype=self.nir.dtype)
sza.assert_called_with(self.start_time, self.lons, self.lats)
self.refl_from_tbs.assert_called_with(self.da_sunz, self.nir.data, self.ir_.data, tb_ir_co2=None)
assert np.allclose(res.data, self.refl * 100).compute()
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_no_sunz_with_co2(self, calculator, apply_modifier_info, sza):
"""Test NIR reflectance compositor provided extra co2 info."""
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
from satpy.modifiers.spectral import NIRReflectance
sza.return_value = self.da_sunz
comp = NIRReflectance(name="test")
info = {"modifiers": None}
co2_arr = np.random.random((2, 2))
co2 = xr.DataArray(da.from_array(co2_arr), dims=["y", "x"])
co2.attrs["wavelength"] = [12.0, 13.0, 14.0]
co2.attrs["units"] = "K"
res = comp([self.nir, self.ir_], optional_datasets=[co2], **info)
self.refl_from_tbs.assert_called_with(self.da_sunz, self.nir.data, self.ir_.data, tb_ir_co2=co2.data)
assert np.allclose(res.data, self.refl_with_co2 * 100).compute()
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_provide_sunz_and_threshold(self, calculator, apply_modifier_info, sza):
"""Test NIR reflectance compositor provided sunz and a sunz threshold."""
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
from satpy.modifiers.spectral import NIRReflectance
sza.return_value = self.da_sunz
comp = NIRReflectance(name="test", sunz_threshold=84.0)
info = {"modifiers": None}
res = comp([self.nir, self.ir_], optional_datasets=[self.sunz], **info)
assert res.attrs["sun_zenith_threshold"] == 84.0
calculator.assert_called_with("Meteosat-11", "seviri", "IR_039",
sunz_threshold=84.0, masking_limit=NIRReflectance.MASKING_LIMIT)
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_sunz_threshold_default_value_is_not_none(self, calculator, apply_modifier_info, sza):
"""Check that sun_zenith_threshold is not None."""
from satpy.modifiers.spectral import NIRReflectance
comp = NIRReflectance(name="test")
info = {"modifiers": None}
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
comp([self.nir, self.ir_], optional_datasets=[self.sunz], **info)
assert comp.sun_zenith_threshold is not None
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_provide_masking_limit(self, calculator, apply_modifier_info, sza):
"""Test NIR reflectance compositor provided sunz and a sunz threshold."""
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
from satpy.modifiers.spectral import NIRReflectance
sza.return_value = self.da_sunz
comp = NIRReflectance(name="test", masking_limit=None)
info = {"modifiers": None}
res = comp([self.nir, self.ir_], optional_datasets=[self.sunz], **info)
assert res.attrs["sun_zenith_masking_limit"] is None
calculator.assert_called_with("Meteosat-11", "seviri", "IR_039",
sunz_threshold=NIRReflectance.TERMINATOR_LIMIT, masking_limit=None)
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_masking_limit_default_value_is_not_none(self, calculator, apply_modifier_info, sza):
"""Check that sun_zenith_threshold is not None."""
from satpy.modifiers.spectral import NIRReflectance
comp = NIRReflectance(name="test")
info = {"modifiers": None}
calculator.return_value = mock.MagicMock(
reflectance_from_tbs=self.refl_from_tbs)
comp([self.nir, self.ir_], optional_datasets=[self.sunz], **info)
assert comp.masking_limit is not None
[docs]
class TestNIREmissivePartFromReflectance(unittest.TestCase):
"""Test the NIR Emissive part from reflectance compositor."""
[docs]
@mock.patch("satpy.modifiers.spectral.sun_zenith_angle")
@mock.patch("satpy.modifiers.NIRReflectance.apply_modifier_info")
@mock.patch("satpy.modifiers.spectral.Calculator")
def test_compositor(self, calculator, apply_modifier_info, sza):
"""Test the NIR emissive part from reflectance compositor."""
from satpy.modifiers.spectral import NIRReflectance
refl_arr = np.random.random((2, 2))
refl = da.from_array(refl_arr)
refl_from_tbs = mock.MagicMock()
refl_from_tbs.return_value = refl
calculator.return_value = mock.MagicMock(reflectance_from_tbs=refl_from_tbs)
emissive_arr = np.random.random((2, 2))
emissive = da.from_array(emissive_arr)
emissive_part = mock.MagicMock()
emissive_part.return_value = emissive
calculator.return_value = mock.MagicMock(emissive_part_3x=emissive_part)
from satpy.modifiers.spectral import NIREmissivePartFromReflectance
comp = NIREmissivePartFromReflectance(name="test", sunz_threshold=86.0)
info = {"modifiers": None}
platform = "NOAA-20"
sensor = "viirs"
chan_name = "M12"
get_lonlats = mock.MagicMock()
lons, lats = 1, 2
get_lonlats.return_value = (lons, lats)
area = mock.MagicMock(get_lonlats=get_lonlats)
nir_arr = np.random.random((2, 2))
nir = xr.DataArray(da.from_array(nir_arr), dims=["y", "x"])
nir.attrs["platform_name"] = platform
nir.attrs["sensor"] = sensor
nir.attrs["name"] = chan_name
nir.attrs["area"] = area
ir_arr = np.random.random((2, 2))
ir_ = xr.DataArray(da.from_array(ir_arr), dims=["y", "x"])
ir_.attrs["area"] = area
sunz_arr = 100 * np.random.random((2, 2))
sunz = xr.DataArray(da.from_array(sunz_arr), dims=["y", "x"])
sunz.attrs["standard_name"] = "solar_zenith_angle"
sunz.attrs["area"] = area
sunz2 = da.from_array(sunz_arr)
sza.return_value = sunz2
res = comp([nir, ir_], optional_datasets=[sunz], **info)
assert res.attrs["sun_zenith_threshold"] == 86.0
assert res.attrs["units"] == "K"
assert res.attrs["platform_name"] == platform
assert res.attrs["sensor"] == sensor
assert res.attrs["name"] == chan_name
calculator.assert_called_with("NOAA-20", "viirs", "M12", sunz_threshold=86.0,
masking_limit=NIRReflectance.MASKING_LIMIT)
[docs]
class TestPSPRayleighReflectance:
"""Test the pyspectral-based Rayleigh correction modifier."""
def _make_data_area(self):
"""Create test area definition and data."""
rows = 3
cols = 5
area = AreaDefinition(
"some_area_name", "On-the-fly area", "geosabii",
{"a": "6378137.0", "b": "6356752.31414", "h": "35786023.0", "lon_0": "-89.5", "proj": "geos", "sweep": "x",
"units": "m"},
cols, rows,
(-5434894.954752679, -5434894.964451744, 5434894.964451744, 5434894.954752679))
data = np.zeros((rows, cols)) + 25
data[1, :] += 25
data[2, :] += 50
data = da.from_array(data, chunks=2)
return area, data
def _create_test_data(self, name, wavelength, resolution):
area, dnb = self._make_data_area()
input_band = xr.DataArray(dnb,
dims=("y", "x"),
attrs={
"platform_name": "Himawari-8",
"calibration": "reflectance", "units": "%", "wavelength": wavelength,
"name": name, "resolution": resolution, "sensor": "ahi",
"start_time": "2017-09-20 17:30:40.800000",
"end_time": "2017-09-20 17:41:17.500000",
"area": area, "ancillary_variables": [],
"orbital_parameters": {
"satellite_nominal_longitude": -89.5,
"satellite_nominal_latitude": 0.0,
"satellite_nominal_altitude": 35786023.4375,
},
})
red_band = xr.DataArray(dnb,
dims=("y", "x"),
attrs={
"platform_name": "Himawari-8",
"calibration": "reflectance", "units": "%", "wavelength": (0.62, 0.64, 0.66),
"name": "B03", "resolution": 500, "sensor": "ahi",
"start_time": "2017-09-20 17:30:40.800000",
"end_time": "2017-09-20 17:41:17.500000",
"area": area, "ancillary_variables": [],
"orbital_parameters": {
"satellite_nominal_longitude": -89.5,
"satellite_nominal_latitude": 0.0,
"satellite_nominal_altitude": 35786023.4375,
},
})
fake_angle_data = da.ones_like(dnb, dtype=np.float32) * 90.0
angle1 = xr.DataArray(fake_angle_data,
dims=("y", "x"),
attrs={
"platform_name": "Himawari-8",
"calibration": "reflectance", "units": "%", "wavelength": wavelength,
"name": "satellite_azimuth_angle", "resolution": resolution, "sensor": "ahi",
"start_time": "2017-09-20 17:30:40.800000",
"end_time": "2017-09-20 17:41:17.500000",
"area": area, "ancillary_variables": [],
})
return input_band, red_band, angle1, angle1, angle1, angle1
[docs]
@pytest.mark.parametrize(
("name", "wavelength", "resolution", "aerosol_type", "reduce_lim_low", "reduce_lim_high", "reduce_strength",
"exp_mean", "exp_unique"),
[
("B01", (0.45, 0.47, 0.49), 1000, "rayleigh_only", 70, 95, 1, 41.540239,
np.array([9.22630464, 10.67844368, 13.58057226, 37.92186549, 40.13822472, 44.66259518,
44.92748445, 45.03917091, 69.5821722, 70.11226943, 71.07352559])),
("B02", (0.49, 0.51, 0.53), 1000, "rayleigh_only", 70, 95, 1, 43.663805,
np.array([13.15770104, 14.26526104, 16.49084485, 40.88633902, 42.60682921, 46.04288,
46.2356062, 46.28276282, 70.92799823, 71.33561614, 72.07001693])),
("B03", (0.62, 0.64, 0.66), 500, "rayleigh_only", 70, 95, 1, 46.916187,
np.array([19.22922328, 19.76884762, 20.91027446, 45.51075967, 46.39925968, 48.10221156,
48.15715058, 48.18698356, 73.01115816, 73.21552816, 73.58666477])),
("B01", (0.45, 0.47, 0.49), 1000, "rayleigh_only", -95, -70, -1, 41.540239,
np.array([9.22630464, 10.67844368, 13.58057226, 37.92186549, 40.13822472, 44.66259518,
44.92748445, 45.03917091, 69.5821722, 70.11226943, 71.07352559])),
]
)
def test_rayleigh_corrector(self, name, wavelength, resolution, aerosol_type, reduce_lim_low, reduce_lim_high,
reduce_strength, exp_mean, exp_unique):
"""Test PSPRayleighReflectance with fake data."""
from satpy.modifiers.atmosphere import PSPRayleighReflectance
ray_cor = PSPRayleighReflectance(name=name, atmosphere="us-standard", aerosol_types=aerosol_type,
reduce_lim_low=reduce_lim_low, reduce_lim_high=reduce_lim_high,
reduce_strength=reduce_strength)
assert ray_cor.attrs["name"] == name
assert ray_cor.attrs["atmosphere"] == "us-standard"
assert ray_cor.attrs["aerosol_types"] == aerosol_type
assert ray_cor.attrs["reduce_lim_low"] == reduce_lim_low
assert ray_cor.attrs["reduce_lim_high"] == reduce_lim_high
assert ray_cor.attrs["reduce_strength"] == reduce_strength
input_band, red_band, *_ = self._create_test_data(name, wavelength, resolution)
res = ray_cor([input_band, red_band])
assert isinstance(res, xr.DataArray)
assert isinstance(res.data, da.Array)
data = res.values
unique = np.unique(data[~np.isnan(data)])
np.testing.assert_allclose(np.nanmean(data), exp_mean, rtol=1e-5)
assert data.shape == (3, 5)
np.testing.assert_allclose(unique, exp_unique, rtol=1e-5)
[docs]
@pytest.mark.parametrize("as_optionals", [False, True])
def test_rayleigh_with_angles(self, as_optionals):
"""Test PSPRayleighReflectance with angles provided."""
from satpy.modifiers.atmosphere import PSPRayleighReflectance
aerosol_type = "rayleigh_only"
ray_cor = PSPRayleighReflectance(name="B01", atmosphere="us-standard", aerosol_types=aerosol_type)
prereqs, opt_prereqs = self._get_angles_prereqs_and_opts(as_optionals)
with mock.patch("satpy.modifiers.atmosphere.get_angles") as get_angles:
res = ray_cor(prereqs, opt_prereqs)
get_angles.assert_not_called()
assert isinstance(res, xr.DataArray)
assert isinstance(res.data, da.Array)
data = res.values
unique = np.unique(data[~np.isnan(data)])
np.testing.assert_allclose(unique, np.array([-75.0, -37.71298492, 31.14350754]), rtol=1e-5)
assert data.shape == (3, 5)
def _get_angles_prereqs_and_opts(self, as_optionals):
wavelength = (0.45, 0.47, 0.49)
resolution = 1000
input_band, red_band, *angles = self._create_test_data("B01", wavelength, resolution)
prereqs = [input_band, red_band]
opt_prereqs = []
if as_optionals:
opt_prereqs = angles
else:
prereqs += angles
return prereqs, opt_prereqs
[docs]
class TestPSPAtmosphericalCorrection(unittest.TestCase):
"""Test the pyspectral-based atmospheric correction modifier."""
[docs]
def test_call(self):
"""Test atmospherical correction."""
from pyresample.geometry import SwathDefinition
from satpy.modifiers import PSPAtmosphericalCorrection
# Patch methods
lons = np.zeros((5, 5))
lons[1, 1] = np.inf
lons = da.from_array(lons, chunks=5)
lats = np.zeros((5, 5))
lats[1, 1] = np.inf
lats = da.from_array(lats, chunks=5)
area = SwathDefinition(lons, lats)
stime = datetime(2020, 1, 1, 12, 0, 0)
orb_params = {
"satellite_actual_altitude": 12345678,
"nadir_longitude": 0.0,
"nadir_latitude": 0.0,
}
band = xr.DataArray(da.zeros((5, 5)),
attrs={"area": area,
"start_time": stime,
"name": "name",
"platform_name": "platform",
"sensor": "sensor",
"orbital_parameters": orb_params},
dims=("y", "x"))
# Perform atmospherical correction
psp = PSPAtmosphericalCorrection(name="dummy")
res = psp(projectables=[band])
res.compute()