Skip to content

Commit

Permalink
Merge pull request #2903 from Graenni/implement_pyPublicDecompWT_hrit…
Browse files Browse the repository at this point in the history
…_base

Implement py public decomp wt hrit base
  • Loading branch information
mraspaud authored Sep 20, 2024
2 parents d3e6fd4 + 862c558 commit 4aed70b
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 145 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ The following people have made contributions to this project:
- Marco Sassi - meteoswiss
- [Stefan Scheiblauer (StefanSnippetCoder)](https://github.com/StefanSnippetCoder)
- [Ronald Scheirer](https://github.com/)
- [Michael Schmutz (Graenni)](https://github.com/Graenni) - Meteotest AG
- [Hauke Schulz (observingClouds)](https://github.com/observingClouds)
- [Jakub Seidl (seidlj)](https://github.com/seidlj)
- [Eysteinn Sigurðsson (eysteinn)](https://github.com/eysteinn)
Expand Down
1 change: 1 addition & 0 deletions continuous_integration/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ dependencies:
- trollimage>=1.24
- pyspectral
- pyorbital
- pyPublicDecompWT
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ msi_safe = ["rioxarray", "bottleneck", "python-geotiepoints", "defusedxml"]
nc_nwcsaf_msg = ["netCDF4 >= 1.1.8"]
sar_c = ["python-geotiepoints >= 1.1.7", "rasterio", "rioxarray", "defusedxml"]
abi_l1b = ["h5netcdf"]
seviri_l1b_hrit = ["pyorbital >= 1.3.1"]
seviri_l1b_hrit = ["pyorbital >= 1.3.1", "pyPublicDecompWT"]
seviri_l1b_native = ["pyorbital >= 1.3.1"]
seviri_l1b_nc = ["pyorbital >= 1.3.1", "netCDF4 >= 1.1.8"]
seviri_l2_bufr = ["eccodes"]
Expand Down
105 changes: 25 additions & 80 deletions satpy/readers/hrit_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,13 @@
This module is the base module for all HRIT-based formats. Here, you will find
the common building blocks for hrit reading.
One of the features here is the on-the-fly decompression of hrit files. It needs
a path to the xRITDecompress binary to be provided through the environment
variable called XRIT_DECOMPRESS_PATH. When compressed hrit files are then
encountered (files finishing with `.C_`), they are decompressed to the system's
temporary directory for reading.
One of the features here is the on-the-fly decompression of hrit files when
compressed hrit files are encountered (files finishing with `.C_`).
"""

import datetime as dt
import logging
import os
from contextlib import contextmanager, nullcontext
from io import BytesIO
from subprocess import PIPE, Popen # nosec B404

import dask
import dask.array as da
Expand All @@ -43,7 +36,6 @@
from pyresample import geometry

import satpy.readers.utils as utils
from satpy import config
from satpy.readers import FSFile
from satpy.readers.eum_base import time_cds_short
from satpy.readers.file_handlers import BaseFileHandler
Expand Down Expand Up @@ -96,61 +88,16 @@
}


def get_xritdecompress_cmd():
"""Find a valid binary for the xRITDecompress command."""
cmd = os.environ.get("XRIT_DECOMPRESS_PATH", None)
if not cmd:
raise IOError("XRIT_DECOMPRESS_PATH is not defined (complete path to xRITDecompress)")

question = ("Did you set the environment variable XRIT_DECOMPRESS_PATH correctly?")
if not os.path.exists(cmd):
raise IOError(str(cmd) + " does not exist!\n" + question)
elif os.path.isdir(cmd):
raise IOError(str(cmd) + " is a directory!\n" + question)

return cmd


def get_xritdecompress_outfile(stdout):
"""Analyse the output of the xRITDecompress command call and return the file."""
outfile = b""
for line in stdout:
try:
k, v = [x.strip() for x in line.split(b":", 1)]
except ValueError:
break
if k == b"Decompressed file":
outfile = v
break

return outfile


def decompress(infile, outdir="."):
"""Decompress an XRIT data file and return the path to the decompressed file.
def decompress(infile):
"""Decompress an XRIT data file and return the decompressed buffer."""
from pyPublicDecompWT import xRITDecompress

It expect to find Eumetsat's xRITDecompress through the environment variable
XRIT_DECOMPRESS_PATH.
"""
cmd = get_xritdecompress_cmd()
infile = os.path.abspath(infile)
cwd = os.getcwd()
os.chdir(outdir)
# decompress in-memory
with open(infile, mode="rb") as fh:
xrit = xRITDecompress()
xrit.decompress(fh.read())

p = Popen([cmd, infile], stdout=PIPE) # nosec B603
stdout = BytesIO(p.communicate()[0])
status = p.returncode
os.chdir(cwd)

if status != 0:
raise IOError("xrit_decompress '%s', failed, status=%d" % (infile, status))

outfile = get_xritdecompress_outfile(stdout)

if not outfile:
raise IOError("xrit_decompress '%s', failed, no output file is generated" % infile)

return os.path.join(outdir, outfile.decode("utf-8"))
return xrit.data()


def get_header_id(fp):
Expand Down Expand Up @@ -343,18 +290,6 @@ def _read_data(filename, mda):
return HRITSegment(filename, mda).read_data()


@contextmanager
def decompressed(filename):
"""Decompress context manager."""
try:
new_filename = decompress(filename, config["tmp_dir"])
except IOError as err:
logger.error("Unpacking failed: %s", str(err))
raise
yield new_filename
os.remove(new_filename)


class HRITSegment:
"""An HRIT segment with data."""

Expand Down Expand Up @@ -389,11 +324,21 @@ def _read_data_from_disk(self):
# For reading the image data, unzip_context is faster than generic_open
dtype, shape = self._get_input_info()
with utils.unzip_context(self.filename) as fn:
with decompressed(fn) if self.compressed else nullcontext(fn) as filename:
return np.fromfile(filename,
offset=self.offset,
dtype=dtype,
count=np.prod(shape))

if self.compressed:
return np.frombuffer(
decompress(fn),
offset=self.offset,
dtype=dtype,
count=np.prod(shape)
)
else:
return np.fromfile(
fn,
offset=self.offset,
dtype=dtype,
count=np.prod(shape)
)

def _read_file_like(self):
# filename is likely to be a file-like object, already in memory
Expand Down
80 changes: 16 additions & 64 deletions satpy/tests/reader_tests/test_hrit_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,75 +21,20 @@
import datetime as dt
import gzip
import os
import unittest
from tempfile import NamedTemporaryFile, gettempdir
from unittest import mock

import numpy as np
import pytest

from satpy.readers import FSFile
from satpy.readers.hrit_base import HRITFileHandler, decompress, get_xritdecompress_cmd, get_xritdecompress_outfile
from satpy.readers.hrit_base import HRITFileHandler
from satpy.tests.utils import RANDOM_GEN

# NOTE:
# The following fixtures are not defined in this file, but are used and injected by Pytest:
# - tmp_path


class TestHRITDecompress(unittest.TestCase):
"""Test the on-the-fly decompression."""

def test_xrit_cmd(self):
"""Test running the xrit decompress command."""
old_env = os.environ.get("XRIT_DECOMPRESS_PATH", None)

os.environ["XRIT_DECOMPRESS_PATH"] = "/path/to/my/bin"
with pytest.raises(IOError, match=".* does not exist!"):
get_xritdecompress_cmd()

os.environ["XRIT_DECOMPRESS_PATH"] = gettempdir()
with pytest.raises(IOError, match=".* is a directory!.*"):
get_xritdecompress_cmd()

with NamedTemporaryFile() as fd:
os.environ["XRIT_DECOMPRESS_PATH"] = fd.name
fname = fd.name
res = get_xritdecompress_cmd()

if old_env is not None:
os.environ["XRIT_DECOMPRESS_PATH"] = old_env
else:
os.environ.pop("XRIT_DECOMPRESS_PATH")

assert fname == res

def test_xrit_outfile(self):
"""Test the right decompression filename is used."""
stdout = [b"Decompressed file: bla.__\n"]
outfile = get_xritdecompress_outfile(stdout)
assert outfile == b"bla.__"

@mock.patch("satpy.readers.hrit_base.Popen")
def test_decompress(self, popen):
"""Test decompression works."""
popen.return_value.returncode = 0
popen.return_value.communicate.return_value = [b"Decompressed file: bla.__\n"]

old_env = os.environ.get("XRIT_DECOMPRESS_PATH", None)

with NamedTemporaryFile() as fd:
os.environ["XRIT_DECOMPRESS_PATH"] = fd.name
res = decompress("bla.C_")

if old_env is not None:
os.environ["XRIT_DECOMPRESS_PATH"] = old_env
else:
os.environ.pop("XRIT_DECOMPRESS_PATH")

assert res == os.path.join(".", "bla.__")


# From a compressed msg hrit file.
# uncompressed data field length 17223680
# compressed data field length 1578312
Expand Down Expand Up @@ -142,18 +87,25 @@ def stub_hrit_file(tmp_path):

def create_stub_hrit(filename, open_fun=open, meta=mda):
"""Create a stub hrit file."""
stub_hrit_data = create_stub_hrit_data(meta)

with open_fun(filename, mode="wb") as fd:
fd.write(stub_hrit_data)
return filename

def create_stub_hrit_data(meta):
"""Create the data for the stub hrit."""
nbits = meta["number_of_bits_per_pixel"]
lines = meta["number_of_lines"]
cols = meta["number_of_columns"]
total_bits = lines * cols * nbits
arr = RANDOM_GEN.integers(0, 256,
size=int(total_bits / 8),
dtype=np.uint8)
with open_fun(filename, mode="wb") as fd:
fd.write(b" " * meta["total_header_length"])
bytes_data = arr.tobytes()
fd.write(bytes_data)
return filename
header_data = b" " * meta["total_header_length"]
bytes_data = arr.tobytes()
stub_hrit_data = header_data + bytes_data
return stub_hrit_data


@pytest.fixture
Expand Down Expand Up @@ -276,10 +228,10 @@ def test_start_end_time(self):
assert self.reader.end_time == self.reader.observation_end_time


def fake_decompress(infile, outdir="."):
def fake_decompress(filename):
"""Fake decompression."""
filename = os.fspath(infile)[:-3]
return create_stub_hrit(filename)
del filename
return create_stub_hrit_data(mda)


class TestHRITFileHandlerCompressed:
Expand Down

0 comments on commit 4aed70b

Please sign in to comment.