diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d3d8486483b524b01fb7acbb96c96d92008783c2..a1c9ef0365ab99f0e82df33b4261a115e0b1d5da 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -11,6 +11,7 @@ variables: PYTHON_IMG: python:3.12-slim PACKAGE_INSTALL_EXTRAS: "[test]" + PIP_RASTERIO_USED: true PIP_PACKAGE_URL: "https://upload.pypi.org/legacy/" TWINE_USERNAME: __token__ TWINE_PASSWORD: ${PYPI_TOKEN} diff --git a/doc/index.md b/doc/index.md index 8f24758b316af7ecd53221f176929a82251e864b..145c17eff5ba38ed67dcc6e93dc4226f9f7c9ced 100644 --- a/doc/index.md +++ b/doc/index.md @@ -14,7 +14,7 @@ </a> </p> -**Theia-dumper** enables to publish Spatio Temporal Assets Catalogs (STAC) on the THEIA-MTP geospatial data center. +**Theia-dumper** enables to share Spatio Temporal Assets Catalogs (STAC) on the THEIA-MTP geospatial data center. If spatioreferenced raster assets are not in COG format, Theia-dumper generates COG before publishing catalogs. ## Installation and requirements diff --git a/pyproject.toml b/pyproject.toml index a23c8e67abc050da05be42cf86bb164becedce46..bca49850570888758e9bff555edd02c11118ce36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,9 @@ dependencies = [ "requests", "click", "rich", + "rasterio", "rio-cogeo", + "rio_stac", ] license = { text = "Apache-2.0" } classifiers = [ @@ -46,7 +48,7 @@ pretty = true exclude = ["doc", "venv", ".venv"] [tool.pylint] -disable = "W1203,R0903,E0401,W0622,C0116,C0115,W0718,W0719" +disable = "W1203,R0903,E0401,W0622,C0116,C0115,W0718,W0719,W0718,R0902,R0913,R0917" [tool.pylint.MASTER] ignore-paths = '^.venv' diff --git a/tests/test_upload.py b/tests/test_upload.py index e417f67eae408ae30a7f2337734976984397278a..46a8aa9c8055403494591a9c34d1f445b69eb051 100755 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -4,6 +4,7 @@ import os import shutil import tempfile from datetime import datetime +import urllib.request import pystac import pystac_client @@ -11,7 +12,7 @@ import requests import utils import pytest -from theia_dumper import stac +from theia_dumper import stac, raster utils.set_secret_key_env() @@ -19,7 +20,7 @@ DEFAULT_COL_HREF = "http://hello.fr/collections/collection-for-tests" STAC_EP = "https://stacapi-cdos.apps.okd.crocc.meso.umontpellier.fr" IMAGE_HREF = ( "https://gitlab.orfeo-toolbox.org/orfeotoolbox/" - "otb/-/raw/develop/Data/Input/SP67_FR_subset_1.tif" + "otb/-/raw/develop/Data/Input/Capitole_Rasterization.tif" ) COL_ID = "collection-for-theia-dumper-tests" items_ids = ["item_1", "item_2"] @@ -81,12 +82,22 @@ def remote_col_test(expected_bbox): for i in col.get_items(): assets = i.get_assets().values() for asset in assets: - assert stac.asset_exists(asset.href) + asset_url_signed = stac.asset_exists(asset.href) + assert asset_url_signed + + urllib.request.urlretrieve(asset_url_signed, "/tmp/a.tif") + assert raster.is_cog("/tmp/a.tif") def create_item(item_id: str): """Create a STAC item.""" coordinates = COORDS1 if item_id == "item_1" else COORDS2 + + # Check is COG + assert not raster.is_cog(RASTER_FILE1) + assert not raster.is_cog(RASTER_FILE2) + assert not raster.is_cog(RASTER_FILE3) + item = pystac.Item( id=item_id, geometry={ diff --git a/theia_dumper/__init__.py b/theia_dumper/__init__.py index f56342c67a448f37bb4dc963c59b6040c9101c50..8a4d18dc2c188bdd37cfe96b0b666b2ed10b0c7b 100644 --- a/theia_dumper/__init__.py +++ b/theia_dumper/__init__.py @@ -1,3 +1,3 @@ """Theia dumper package.""" -__version__ = "0.2.3" +__version__ = "0.2.4" diff --git a/theia_dumper/cli.py b/theia_dumper/cli.py index bca8f44550d101fc124ef64411125e197c0f35f6..cab3084bff73001664a831316d51addc66fc7296 100644 --- a/theia_dumper/cli.py +++ b/theia_dumper/cli.py @@ -4,14 +4,15 @@ import os import tempfile import subprocess import click + +from . import diff from .stac import ( - StacUploadTransactionsHandler, StacTransactionsHandler, + StacUploadTransactionsHandler, DEFAULT_S3_EP, DEFAULT_STAC_EP, DEFAULT_S3_STORAGE, ) -from . import diff @click.group() @@ -47,12 +48,20 @@ def theia_dumper() -> None: default=False, help="Overwrite assets if already existing", ) +@click.option( + "--keep_cog_dir", + help="Set a directory to keep converted COG files", + type=str, + nargs=1, + default=None, +) def publish( stac_obj_path: str, stac_endpoint: str, storage_endpoint: str, storage_bucket: str, overwrite: bool, + keep_cog_dir: str | None, ): """Publish a STAC object (collection or item collection).""" StacUploadTransactionsHandler( @@ -61,6 +70,7 @@ def publish( storage_endpoint=storage_endpoint, storage_bucket=storage_bucket, assets_overwrite=overwrite, + keep_cog_dir=keep_cog_dir, ).load_and_publish(stac_obj_path) diff --git a/theia_dumper/raster.py b/theia_dumper/raster.py new file mode 100644 index 0000000000000000000000000000000000000000..0e0e864d125e82525d03cf6e4a92a92f39f53b96 --- /dev/null +++ b/theia_dumper/raster.py @@ -0,0 +1,348 @@ +"""Raster analysis and parsing module.""" + +import os +from datetime import datetime +from rasterio import warp, features, open as ropen, crs, errors # type:ignore +from rio_stac.stac import bbox_to_geom, get_projection_info # type:ignore +import numpy +import rasterio +from rio_cogeo import cog_translate, cog_validate +from pystac.asset import Asset +from pystac.item import Item +from pystac.extensions.projection import AssetProjectionExtension +from pystac.extensions.raster import RasterBand, RasterExtension, Statistics +from pystac.extensions.timestamps import ItemTimestampsExtension +from pystac.errors import ExtensionNotImplemented + +from .logger import logger + +EPSG_4326 = crs.CRS.from_epsg(4326) # pylint: disable=c-extension-no-member + + +class NoSpatialLayerException(Exception): + """No spatial layer exception.""" + + +class ScalesInputFormatException(Exception): + """No spatial layer exception.""" + + +class OffsetsInputFormatException(Exception): + """No spatial layer exception.""" + + +class Info: + """Grabs raster information.""" + + def __init__(self, raster_file): + """Init Info class. + + Args: + raster_file: str + + Returns: + Info class + + """ + self.raster_file = raster_file + with ropen(self.raster_file) as src: + bbox = src.bounds + geom = bbox_to_geom(bbox) + # Reproject the geometry to "epsg:4326" + geom = warp.transform_geom(src.crs, EPSG_4326, geom) + self.bbox = features.bounds(geom) + self.geom = bbox_to_geom(self.bbox) + self.meta = src.meta + self.gsd = src.res[0] + self.proj_ext_info = get_projection_info(src) + self.nodata = src.nodata + self.area_or_point = src.tags().get("AREA_OR_POINT", "").lower() + self.bands = src.indexes + self.stats = None + + def band_info(self, band: int): + """Get band info. + + Args: + band: band index + + Returns: + band metadata and band statistics + + """ + if band <= 0: + raise ValueError('The "band" parameter value starts at 1') + + with ropen(self.raster_file) as src_dst: + md = { + "data_type": src_dst.dtypes[band - 1], + "scale": src_dst.scales[band - 1], + "offset": src_dst.offsets[band - 1], + } + if self.area_or_point: + md["sampling"] = self.area_or_point + + # If the Nodata is not set we don't forward it. + if src_dst.nodata is not None: + if numpy.isnan(src_dst.nodata): + md["nodata"] = "nan" + elif numpy.isposinf(src_dst.nodata): + md["nodata"] = "inf" + elif numpy.isneginf(src_dst.nodata): + md["nodata"] = "-inf" + else: + md["nodata"] = src_dst.nodata + + if src_dst.units[band - 1] is not None: + md["unit"] = src_dst.units[band - 1] + + stats = {} + try: + if not self.stats: + self.stats = src_dst.stats(approx=True) + statistics = self.stats[band - 1] + stats.update( + { + "mean": statistics.mean, + "minimum": statistics.min, + "maximum": statistics.max, + "stddev": statistics.std, + } + ) + except rasterio.errors.StatisticsError as e: + logger.warning("Unable to compute relevant statistics: %s", e) + + return md, stats + + +def raster2cog( + src_raster: str, + dst_raster: str, +): + """Convert a raster to Cloud Optimized Geotiff. + + Args: + src_raster: source raster file path + dst_raster: destination raster file path + """ + profile = { + "driver": "GTiff", + "interleave": "pixel", + "tiled": True, + "blockxsize": 512, + "blockysize": 512, + "compress": "DEFLATE", + "BIGTIFF": "IF_SAFER", + } + + config = { + "GDAL_NUM_THREADS": os.environ.get("GDAL_NUM_THREADS") or "ALL_CPUS", + "GDAL_TIFF_INTERNAL_MASK": True, + "GDAL_TIFF_OVR_BLOCKSIZE": "512", + } + + # Change output COG filename extension if not .tif + if not dst_raster.lower().endswith(".tif"): + pre, _ = os.path.splitext(dst_raster) + dst_raster = f"{pre}.tif" + + cog_translate( + source=src_raster, + dst_path=dst_raster, + dst_kwargs=profile, + config=config, + web_optimized=False, + progress_out=None, + in_memory=False, + allow_intermediate_compression=True, + ) + + +def is_cog(src_raster: str) -> bool: + """Check if the raster is in COG format. + + Args: + src_raster: source raster file path + + Returns: + True if the raster is a COG, else False + """ + is_valid, _, _ = cog_validate(src_raster, quiet=True, strict=True) + return is_valid + + +def is_raster(src_filepath: str) -> bool: + """Check if the file is a raster. + + Args: + src_filepath: input file path + + Returns: + True if the provided file is a raster + """ + try: + Info(src_filepath) + return True + except (errors.RasterioIOError, errors.CRSError): + pass + return False + + +def convert_to_cog( + local_filename: str, + keep_cog_dir: str | None, +) -> str: + """Convert raster to COG in a tmp directory. + + Args: + local_filename: input file path + + Returns: + file path of the converted raster + """ + if keep_cog_dir: + tmpcog = os.path.join(keep_cog_dir, os.path.basename(local_filename)) + if not os.path.exists(keep_cog_dir): + os.makedirs(keep_cog_dir) + else: + tmpcog = os.path.join( + os.path.dirname(local_filename), "TMPCOG", os.path.basename(local_filename) + ) + if not os.path.exists(os.path.dirname(tmpcog)): + os.makedirs(os.path.dirname(tmpcog)) + if os.path.exists(tmpcog) and keep_cog_dir: + return tmpcog + raster2cog(local_filename, tmpcog) + return tmpcog + + +def apply_proj_extension(asset: Asset): + """Apply projection extension. + + Args: + asset: cog raster asset + + Returns: + None + """ + # Projection + proj_ext = AssetProjectionExtension.ext(asset, add_if_missing=True) + try: + info = Info(raster_file=asset.href) + except NoSpatialLayerException: + logger.warning("Failed to retrieve spatial info for %s", asset.href) + proj_ext_args = info.proj_ext_info + logger.debug("Projection extension args: %s", proj_ext_args) + if proj_ext_args is not None: + logger.debug("Applying projection extension") + proj_ext.apply(**proj_ext_args) + + +def get_args_for_raster_ext(asset_path: str): + """This function returns the arguments, as a dict, for the raster extension. + + Args: + asset_path: asset path + + Returns: + Dict of arguments for the `RasterExtension.ext(...).apply()` + function. + """ + try: + raster_info = Info(asset_path) + bands = [] + for band in raster_info.bands: + md, stats = raster_info.band_info(band=band) + + raster_stats = Statistics.create(**stats) + bands.append(RasterBand.create(statistics=raster_stats, **md)) + return {"bands": bands} + except (rasterio.errors.RasterioIOError, rasterio.errors.CRSError) as err: + logger.warning( + "Failed to retrieve raster information for %s (%s). " + "Maybe it is expected because the file is not a raster.", + err, + asset_path, + ) + return None + + +def _merge_raster_bands(b1: RasterBand, b2: RasterBand) -> RasterBand: + """Merge two RasterBand instances. + + Args: + b1: band 1 + b2: band 2 + + Returns: + Merged RasterBand instance + """ + args = {} + for band in [b1, b2]: + for argn in band.to_dict().keys(): + args[argn] = getattr(band, argn) + return RasterBand.create(**args) + + +def apply_raster_extension(asset: Asset): + """Apply raster extension. + + Args: + asset: cog raster asset + + Returns: + None + """ + # Raster + # For the raster extension, we merge RasterBand arguments from + # (1) what is implemented in the `task`, and (2) stacflow built-in + # properties (stats, nodata, etc) + raster_ext_args = get_args_for_raster_ext(asset.href) + logger.debug("Built-in raster extension args: %s", raster_ext_args) + if raster_ext_args is not None: + logger.debug("Applying raster extension") + try: + raster_ext = RasterExtension.ext(asset) + if raster_ext.bands: + builtin_bands = raster_ext_args["bands"] + + # When a raster extension has already been applied to + # the asset, we want to keep the information that is + # already there. + logger.debug( + "Found an existing raster extension for this asset. Merging %s and %s", + [band.to_dict() for band in builtin_bands], + [band.to_dict() for band in raster_ext.bands], + ) + raster_ext_args = { + "bands": [ + _merge_raster_bands(b1, b2) + for b1, b2 in zip( + builtin_bands, # built-in + raster_ext.bands, # in `task` + ) + ] + } + logger.debug("Result: %s", raster_ext_args) + else: + logger.debug("No raster extension properties for asset") + except ExtensionNotImplemented: + logger.debug("No existing raster extension found. Creating.") + raster_ext = RasterExtension.ext(asset, add_if_missing=True) + + raster_ext.apply(**raster_ext_args) + + +def apply_published_extension(item: Item): + """Apply published date extension. + + Args: + asset: cog raster asset + + Returns: + None + """ + # Timestamps + logger.debug("Apply timestamp extension for published date") + ts_ext = ItemTimestampsExtension.ext(item, add_if_missing=True) + ts_ext.apply(published=datetime.now()) diff --git a/theia_dumper/stac.py b/theia_dumper/stac.py index 60c8fa01062a543880e6698ff74d185e64557828..92afc083fe7426007727b5254f006e6b20923efb 100644 --- a/theia_dumper/stac.py +++ b/theia_dumper/stac.py @@ -2,6 +2,7 @@ import os import re +import shutil import json from dataclasses import dataclass from typing import List, cast @@ -11,12 +12,16 @@ import dinamis_sdk import pystac import pystac_client import requests -from pystac import Collection, Item, ItemCollection +from requests.exceptions import HTTPError from requests.adapters import HTTPAdapter, Retry +from pystac import Collection, Item, ItemCollection from rich.pretty import pretty_repr +from . import raster from .logger import logger +# logger.setLevel("DEBUG") + DEFAULT_STAC_EP = "https://stacapi-cdos.apps.okd.crocc.meso.umontpellier.fr" DEFAULT_S3_EP = "https://s3-data.meso.umontpellier.fr" DEFAULT_S3_STORAGE = "sm1-gdc-ext" @@ -30,6 +35,14 @@ class UnconsistentCollectionIDs(Exception): """Inconsistent STAC collection exception.""" +class UnconsistentAssetNaming(Exception): + """Inconsistent Asset Naming exception.""" + + +class LogException(Exception): + """Inconsistent Asset Naming exception.""" + + def _check_naming_is_compliant(s: str, allow_dot=False, allow_slash=False): _s = re.sub(r"[-|_]", r"", s) if allow_slash: @@ -37,7 +50,9 @@ def _check_naming_is_compliant(s: str, allow_dot=False, allow_slash=False): if allow_dot: _s = re.sub(r"\.", r"", _s) if not _s.isalnum(): - raise Exception(f"{_s} does not only contain alphanumeric or - or _ chars") + raise UnconsistentAssetNaming( + f"{_s} does not only contain alphanumeric or - or _ chars" + ) def create_session(): @@ -74,7 +89,7 @@ def asset_exists(asset_url: str) -> None | str: res = sess.get(asset_url_signed, stream=True) if res.status_code == 200: logger.info("Asset %s already exists.", asset_url) - return asset_url + return asset_url_signed return None @@ -87,7 +102,7 @@ def post_or_put(url: str, data: dict): if resp.status_code == 409: # Exists, so update - logger.info(f"Item at {url} already exists, doing a PUT") + logger.info("Item at %s already exists, doing a PUT", url) resp = sess.put( f"{url}/{data['id']}", json=data, @@ -100,10 +115,10 @@ def post_or_put(url: str, data: dict): try: resp.raise_for_status() - except Exception as e: + except HTTPError as e: try: logger.error("Server returned: %s", pretty_repr(resp.json())) - except Exception: + except LogException: logger.error("Server returned: %s", resp.text) raise e @@ -279,6 +294,7 @@ class StacUploadTransactionsHandler(StacTransactionsHandler): storage_endpoint: str storage_bucket: str assets_overwrite: bool + keep_cog_dir: str | None = None def publish_item_and_push_assets(self, item: Item, assets_root_dir: str): """Publish an item and push all its assets. @@ -291,6 +307,8 @@ class StacUploadTransactionsHandler(StacTransactionsHandler): self.storage_endpoint, f"{self.storage_bucket}/{item.collection_id}/" ) + logger.debug("Itemid = %s", item.id) + _check_naming_is_compliant(self.storage_bucket) _check_naming_is_compliant(item.id) for _, asset in item.assets.items(): @@ -309,12 +327,30 @@ class StacUploadTransactionsHandler(StacTransactionsHandler): ) logger.debug("Target file: %s", target_url) + # Add raster metadata to asset + logger.debug("Updating assets metadata for rasters...") + if raster.is_raster(local_filename): + raster.apply_proj_extension(asset) + raster.apply_raster_extension(asset) + # Skip when target file exists and overwrite is not enabled if not self.assets_overwrite: if asset_href := asset_exists(target_url): asset.href = asset_href continue + # Check is_cog, converts if not + cogconv = False + if raster.is_raster(local_filename): + if not raster.is_cog(local_filename): + orig_filename = local_filename + local_filename = raster.convert_to_cog( + orig_filename, + keep_cog_dir=self.keep_cog_dir, + ) + + cogconv = True + # Upload file logger.info("Uploading %s to %s...", local_filename, target_url) try: @@ -327,6 +363,16 @@ class StacUploadTransactionsHandler(StacTransactionsHandler): logger.debug("Updating assets HREFs ...") asset.href = target_url + # Delete temp cog + if cogconv and not self.keep_cog_dir: + logger.debug("Deleting temporary COG ...") + shutil.rmtree(os.path.dirname(local_filename)) + local_filename = orig_filename + + # Add published metadata to item + logger.debug("Updating item metadata ...") + raster.apply_published_extension(item) + # Push item self.publish_item(item=item)