Skip to content

Commit

Permalink
stac-simple-example project
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed May 14, 2024
1 parent dfc62f5 commit 1b36613
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 0 deletions.
11 changes: 11 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,17 @@ lazy val bench = project
.settings(noPublishSettings)
.enablePlugins(JmhPlugin)

lazy val `stac-simple-example` = project
.settings(commonSettings)
.settings(noPublishSettings)
.settings(
libraryDependencies ++= Seq(
"com.azavea.geotrellis" %% "geotrellis-stac" % "4.6.0",
"com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % "3.9.6",
"com.softwaremill.sttp.client3" %% "akka-http-backend" % "3.9.6"
)
)

def priorTo213(scalaVersion: String): Boolean =
CrossVersion.partialVersion(scalaVersion) match {
case Some((2, minor)) if minor < 13 => true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package geotrellis.example

import cats.data.NonEmptyList
import cats.effect.unsafe.IORuntime
import cats.syntax.functor._
import cats.syntax.nested._
import cats.syntax.option._
import com.azavea.stac4s.api.client.{SearchFilters, SttpStacClient}
import geotrellis.proj4.WebMercator
import geotrellis.raster.effects.MosaicRasterSourceIO
import geotrellis.raster.{MosaicRasterSource, StringName}
import geotrellis.stac.raster.{StacAssetRasterSource, StacItemAsset}
import geotrellis.vector.Extent
import sttp.client3.UriContext

import scala.concurrent.duration.DurationInt
import scala.concurrent.Await

object MainAkka {
// async context is good for client
import scala.concurrent.ExecutionContext.Implicits.global

def main(args: Array[String]): Unit = {
val searchFilters = SearchFilters()
val limit = 10000 // max items length if filter result is too wide
val assetName = "b0".r
val withGDAL: Boolean = false
val defaultCRS = WebMercator
val parallelMosaicEnabled = false
val collectionName = StringName("aviris-classic")
val extent = Extent(0, 0, 180, 180)
val stacCatalogURI = uri"http://localhost:9090/"

import sttp.client3.akkahttp._
val backend = AkkaHttpBackend()
val client = SttpStacClient(backend, stacCatalogURI)

val source = for {
items <- client
.search(searchFilters)
.take(limit)
.compileToFutureList
sources = items.flatMap { item =>
item.assets
.select(assetName)
.map(itemAsset => StacAssetRasterSource(StacItemAsset(itemAsset.withGDAL(withGDAL), item)))
}
} yield sources match {
case head :: Nil => head.some
case head :: tail =>
val commonCrs = if (sources.flatMap(_.asset.crs).distinct.size == 1) head.crs else defaultCRS
val reprojectedSources = NonEmptyList.of(head, tail: _*).map(_.reproject(commonCrs))
val attributes = reprojectedSources.toList.attributesByName

val mosaicRasterSource =
if (parallelMosaicEnabled)
MosaicRasterSourceIO.instance(reprojectedSources, commonCrs, collectionName, attributes)(IORuntime.global)
else
MosaicRasterSource.instance(reprojectedSources, commonCrs, collectionName, attributes)

mosaicRasterSource.some
case _ => None
}

val result = source.nested
.map(_.read(extent))
.value
.map(_.flatten)
.map {
case Some(raster) => println(s"raster.extent: ${raster.extent}")
case None => println(s"no rasters found for $extent")
}

Await.ready(result, 10.seconds)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package geotrellis.example

import cats.data.NonEmptyList
import cats.effect.{ExitCode, IO, IOApp}
import com.azavea.stac4s.api.client.{SearchFilters, SttpStacClient}
import geotrellis.proj4.WebMercator
import geotrellis.raster.{MosaicRasterSource, StringName}
import geotrellis.raster.effects.MosaicRasterSourceIO
import geotrellis.stac.raster.{StacAssetRasterSource, StacItemAsset}
import geotrellis.vector.Extent
import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend
import sttp.client3.UriContext
import cats.syntax.option._
import cats.syntax.nested._
import cats.syntax.functor._

object MainCats extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
val searchFilters = SearchFilters()
val limit = 10000 // max items length if filter result is too wide
val assetName = "b0".r
val withGDAL: Boolean = false
val defaultCRS = WebMercator
val parallelMosaicEnabled = false
val collectionName = StringName("aviris-classic")
val extent = Extent(0, 0, 180, 180)
val stacCatalogURI = uri"http://localhost:9090/"

AsyncHttpClientCatsBackend
.resource[IO]()
.use { backend =>
val client = SttpStacClient(backend, stacCatalogURI)
for {
items <- client.search(searchFilters).take(limit).compile.toList
sources = items.flatMap { item =>
item.assets
.select(assetName)
.map(itemAsset => StacAssetRasterSource(StacItemAsset(itemAsset.withGDAL(withGDAL), item)))
}
} yield sources match {
case head :: Nil => head.some
case head :: tail =>
val commonCrs = if (sources.flatMap(_.asset.crs).distinct.size == 1) head.crs else defaultCRS
val reprojectedSources = NonEmptyList.of(head, tail: _*).map(_.reproject(commonCrs))
val attributes = reprojectedSources.toList.attributesByName

val mosaicRasterSource =
if (parallelMosaicEnabled)
MosaicRasterSourceIO.instance(reprojectedSources, commonCrs, collectionName, attributes)(runtime)
else
MosaicRasterSource.instance(reprojectedSources, commonCrs, collectionName, attributes)

mosaicRasterSource.some
case _ => None
}
}
.nested
.map(_.read(extent))
.value
.map(_.flatten)
.map {
case Some(raster) => println(s"raster.extent: ${raster.extent}")
case None => println(s"no rasters found for $extent")
}
.as(ExitCode.Success)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package geotrellis

import cats.{~>, Foldable, FunctorFilter}
import cats.data.NonEmptyList
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import cats.syntax.foldable._
import com.azavea.stac4s.StacAsset
import geotrellis.proj4.CRS
import geotrellis.raster.{EmptyName, GridExtent, MosaicRasterSource, RasterSource, SourceName, StringName}
import geotrellis.raster.geotiff.GeoTiffPath

import scala.concurrent.{ExecutionContext, Future}
import scala.util.matching.Regex

package object example {
implicit class AssetsMapOps(private val assets: Map[String, StacAsset]) extends AnyVal {
def select(selector: Regex): Option[StacAsset] = assets.find { case (k, _) => selector.findFirstIn(k).nonEmpty }.map(_._2)
}

implicit class StacAssetOps(private val self: StacAsset) extends AnyVal {
def hrefGDAL(withGDAL: Boolean): String = if (withGDAL) s"gdal+${self.href}" else s"${GeoTiffPath.PREFIX}${self.href}"
def withGDAL(withGDAL: Boolean): StacAsset = self.copy(href = hrefGDAL(withGDAL))
}

implicit class RasterSourcesQueryOps[G[_]: Foldable: FunctorFilter, T <: RasterSource](private val self: G[T]) {
def attributesByName: Map[String, String] =
self.foldMap { rs =>
rs.name match {
case StringName(sn) => rs.attributes.map { case (k, v) => s"$sn-$k" -> v }
case EmptyName => rs.attributes
}
}
}

implicit class MosaicRasterSourceOps(private val self: MosaicRasterSource.type) extends AnyVal {
def instance(
sourcesList: NonEmptyList[RasterSource],
targetCRS: CRS,
sourceName: SourceName,
stacAttributes: Map[String, String]
): MosaicRasterSource = {
val combinedExtent = sourcesList.map(_.extent).toList.reduce(_ combine _)
val minCellSize = sourcesList.map(_.cellSize).toList.maxBy(_.resolution)
val combinedGridExtent = GridExtent[Long](combinedExtent, minCellSize)

new MosaicRasterSource {
val sources: NonEmptyList[RasterSource] = sourcesList
val crs: CRS = targetCRS
def gridExtent: GridExtent[Long] = combinedGridExtent
val name: SourceName = sourceName

override val attributes = stacAttributes
}
}

def instance(sourcesList: NonEmptyList[RasterSource], targetCRS: CRS, stacAttributes: Map[String, String]): MosaicRasterSource =
instance(sourcesList, targetCRS, EmptyName, stacAttributes)
}

// format: off
/**
* Ugly shims:
* 1. search via Futures backend and produce futures
* 2. map into IO to compile fs2.Stream
* 3. convert it back to Future[List[T]]
*/
// format: on
implicit class FS2StreamFutureOps[A](private val self: fs2.Stream[Future, A]) extends AnyVal {
def toStreamIO: fs2.Stream[IO, A] = self.translate(λ[Future ~> IO](future => IO.fromFuture(IO(future))))
def compileToFutureList(implicit ec: ExecutionContext): Future[List[A]] =
Future(toStreamIO.compile.toList.unsafeRunSync()(IORuntime.global))
}
}

0 comments on commit 1b36613

Please sign in to comment.