From 1b366132184d3de1b24cc5854e7ab4397d7f03c5 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Mon, 13 May 2024 23:04:16 -0400 Subject: [PATCH] stac-simple-example project --- build.sbt | 11 +++ .../scala/geotrellis/example/MainAkka.scala | 76 +++++++++++++++++++ .../scala/geotrellis/example/MainCats.scala | 68 +++++++++++++++++ .../scala/geotrellis/example/package.scala | 74 ++++++++++++++++++ 4 files changed, 229 insertions(+) create mode 100644 stac-simple-example/src/main/scala/geotrellis/example/MainAkka.scala create mode 100644 stac-simple-example/src/main/scala/geotrellis/example/MainCats.scala create mode 100644 stac-simple-example/src/main/scala/geotrellis/example/package.scala diff --git a/build.sbt b/build.sbt index a4273e17..959e4d9c 100644 --- a/build.sbt +++ b/build.sbt @@ -402,6 +402,17 @@ lazy val bench = project .settings(noPublishSettings) .enablePlugins(JmhPlugin) +lazy val `stac-simple-example` = project + .settings(commonSettings) + .settings(noPublishSettings) + .settings( + libraryDependencies ++= Seq( + "com.azavea.geotrellis" %% "geotrellis-stac" % "4.6.0", + "com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % "3.9.6", + "com.softwaremill.sttp.client3" %% "akka-http-backend" % "3.9.6" + ) + ) + def priorTo213(scalaVersion: String): Boolean = CrossVersion.partialVersion(scalaVersion) match { case Some((2, minor)) if minor < 13 => true diff --git a/stac-simple-example/src/main/scala/geotrellis/example/MainAkka.scala b/stac-simple-example/src/main/scala/geotrellis/example/MainAkka.scala new file mode 100644 index 00000000..a7b167b6 --- /dev/null +++ b/stac-simple-example/src/main/scala/geotrellis/example/MainAkka.scala @@ -0,0 +1,76 @@ +package geotrellis.example + +import cats.data.NonEmptyList +import cats.effect.unsafe.IORuntime +import cats.syntax.functor._ +import cats.syntax.nested._ +import cats.syntax.option._ +import com.azavea.stac4s.api.client.{SearchFilters, SttpStacClient} +import geotrellis.proj4.WebMercator +import geotrellis.raster.effects.MosaicRasterSourceIO +import geotrellis.raster.{MosaicRasterSource, StringName} +import geotrellis.stac.raster.{StacAssetRasterSource, StacItemAsset} +import geotrellis.vector.Extent +import sttp.client3.UriContext + +import scala.concurrent.duration.DurationInt +import scala.concurrent.Await + +object MainAkka { + // async context is good for client + import scala.concurrent.ExecutionContext.Implicits.global + + def main(args: Array[String]): Unit = { + val searchFilters = SearchFilters() + val limit = 10000 // max items length if filter result is too wide + val assetName = "b0".r + val withGDAL: Boolean = false + val defaultCRS = WebMercator + val parallelMosaicEnabled = false + val collectionName = StringName("aviris-classic") + val extent = Extent(0, 0, 180, 180) + val stacCatalogURI = uri"http://localhost:9090/" + + import sttp.client3.akkahttp._ + val backend = AkkaHttpBackend() + val client = SttpStacClient(backend, stacCatalogURI) + + val source = for { + items <- client + .search(searchFilters) + .take(limit) + .compileToFutureList + sources = items.flatMap { item => + item.assets + .select(assetName) + .map(itemAsset => StacAssetRasterSource(StacItemAsset(itemAsset.withGDAL(withGDAL), item))) + } + } yield sources match { + case head :: Nil => head.some + case head :: tail => + val commonCrs = if (sources.flatMap(_.asset.crs).distinct.size == 1) head.crs else defaultCRS + val reprojectedSources = NonEmptyList.of(head, tail: _*).map(_.reproject(commonCrs)) + val attributes = reprojectedSources.toList.attributesByName + + val mosaicRasterSource = + if (parallelMosaicEnabled) + MosaicRasterSourceIO.instance(reprojectedSources, commonCrs, collectionName, attributes)(IORuntime.global) + else + MosaicRasterSource.instance(reprojectedSources, commonCrs, collectionName, attributes) + + mosaicRasterSource.some + case _ => None + } + + val result = source.nested + .map(_.read(extent)) + .value + .map(_.flatten) + .map { + case Some(raster) => println(s"raster.extent: ${raster.extent}") + case None => println(s"no rasters found for $extent") + } + + Await.ready(result, 10.seconds) + } +} diff --git a/stac-simple-example/src/main/scala/geotrellis/example/MainCats.scala b/stac-simple-example/src/main/scala/geotrellis/example/MainCats.scala new file mode 100644 index 00000000..2df2b146 --- /dev/null +++ b/stac-simple-example/src/main/scala/geotrellis/example/MainCats.scala @@ -0,0 +1,68 @@ +package geotrellis.example + +import cats.data.NonEmptyList +import cats.effect.{ExitCode, IO, IOApp} +import com.azavea.stac4s.api.client.{SearchFilters, SttpStacClient} +import geotrellis.proj4.WebMercator +import geotrellis.raster.{MosaicRasterSource, StringName} +import geotrellis.raster.effects.MosaicRasterSourceIO +import geotrellis.stac.raster.{StacAssetRasterSource, StacItemAsset} +import geotrellis.vector.Extent +import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend +import sttp.client3.UriContext +import cats.syntax.option._ +import cats.syntax.nested._ +import cats.syntax.functor._ + +object MainCats extends IOApp { + + def run(args: List[String]): IO[ExitCode] = { + val searchFilters = SearchFilters() + val limit = 10000 // max items length if filter result is too wide + val assetName = "b0".r + val withGDAL: Boolean = false + val defaultCRS = WebMercator + val parallelMosaicEnabled = false + val collectionName = StringName("aviris-classic") + val extent = Extent(0, 0, 180, 180) + val stacCatalogURI = uri"http://localhost:9090/" + + AsyncHttpClientCatsBackend + .resource[IO]() + .use { backend => + val client = SttpStacClient(backend, stacCatalogURI) + for { + items <- client.search(searchFilters).take(limit).compile.toList + sources = items.flatMap { item => + item.assets + .select(assetName) + .map(itemAsset => StacAssetRasterSource(StacItemAsset(itemAsset.withGDAL(withGDAL), item))) + } + } yield sources match { + case head :: Nil => head.some + case head :: tail => + val commonCrs = if (sources.flatMap(_.asset.crs).distinct.size == 1) head.crs else defaultCRS + val reprojectedSources = NonEmptyList.of(head, tail: _*).map(_.reproject(commonCrs)) + val attributes = reprojectedSources.toList.attributesByName + + val mosaicRasterSource = + if (parallelMosaicEnabled) + MosaicRasterSourceIO.instance(reprojectedSources, commonCrs, collectionName, attributes)(runtime) + else + MosaicRasterSource.instance(reprojectedSources, commonCrs, collectionName, attributes) + + mosaicRasterSource.some + case _ => None + } + } + .nested + .map(_.read(extent)) + .value + .map(_.flatten) + .map { + case Some(raster) => println(s"raster.extent: ${raster.extent}") + case None => println(s"no rasters found for $extent") + } + .as(ExitCode.Success) + } +} diff --git a/stac-simple-example/src/main/scala/geotrellis/example/package.scala b/stac-simple-example/src/main/scala/geotrellis/example/package.scala new file mode 100644 index 00000000..aecd624e --- /dev/null +++ b/stac-simple-example/src/main/scala/geotrellis/example/package.scala @@ -0,0 +1,74 @@ +package geotrellis + +import cats.{~>, Foldable, FunctorFilter} +import cats.data.NonEmptyList +import cats.effect.IO +import cats.effect.unsafe.IORuntime +import cats.syntax.foldable._ +import com.azavea.stac4s.StacAsset +import geotrellis.proj4.CRS +import geotrellis.raster.{EmptyName, GridExtent, MosaicRasterSource, RasterSource, SourceName, StringName} +import geotrellis.raster.geotiff.GeoTiffPath + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.matching.Regex + +package object example { + implicit class AssetsMapOps(private val assets: Map[String, StacAsset]) extends AnyVal { + def select(selector: Regex): Option[StacAsset] = assets.find { case (k, _) => selector.findFirstIn(k).nonEmpty }.map(_._2) + } + + implicit class StacAssetOps(private val self: StacAsset) extends AnyVal { + def hrefGDAL(withGDAL: Boolean): String = if (withGDAL) s"gdal+${self.href}" else s"${GeoTiffPath.PREFIX}${self.href}" + def withGDAL(withGDAL: Boolean): StacAsset = self.copy(href = hrefGDAL(withGDAL)) + } + + implicit class RasterSourcesQueryOps[G[_]: Foldable: FunctorFilter, T <: RasterSource](private val self: G[T]) { + def attributesByName: Map[String, String] = + self.foldMap { rs => + rs.name match { + case StringName(sn) => rs.attributes.map { case (k, v) => s"$sn-$k" -> v } + case EmptyName => rs.attributes + } + } + } + + implicit class MosaicRasterSourceOps(private val self: MosaicRasterSource.type) extends AnyVal { + def instance( + sourcesList: NonEmptyList[RasterSource], + targetCRS: CRS, + sourceName: SourceName, + stacAttributes: Map[String, String] + ): MosaicRasterSource = { + val combinedExtent = sourcesList.map(_.extent).toList.reduce(_ combine _) + val minCellSize = sourcesList.map(_.cellSize).toList.maxBy(_.resolution) + val combinedGridExtent = GridExtent[Long](combinedExtent, minCellSize) + + new MosaicRasterSource { + val sources: NonEmptyList[RasterSource] = sourcesList + val crs: CRS = targetCRS + def gridExtent: GridExtent[Long] = combinedGridExtent + val name: SourceName = sourceName + + override val attributes = stacAttributes + } + } + + def instance(sourcesList: NonEmptyList[RasterSource], targetCRS: CRS, stacAttributes: Map[String, String]): MosaicRasterSource = + instance(sourcesList, targetCRS, EmptyName, stacAttributes) + } + + // format: off + /** + * Ugly shims: + * 1. search via Futures backend and produce futures + * 2. map into IO to compile fs2.Stream + * 3. convert it back to Future[List[T]] + */ + // format: on + implicit class FS2StreamFutureOps[A](private val self: fs2.Stream[Future, A]) extends AnyVal { + def toStreamIO: fs2.Stream[IO, A] = self.translate(λ[Future ~> IO](future => IO.fromFuture(IO(future)))) + def compileToFutureList(implicit ec: ExecutionContext): Future[List[A]] = + Future(toStreamIO.compile.toList.unsafeRunSync()(IORuntime.global)) + } +}