From a7571b03b15a9613936a6f331fccfaa81f2c3ea2 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Thu, 30 Mar 2023 08:42:34 +0200 Subject: [PATCH 1/8] fix: stream closing on clone to be raising repo exception --- .../triplesgeneration/renkulog/Commands.scala | 3 ++- .../triplesgeneration/renkulog/GitSpec.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/Commands.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/Commands.scala index dbd61a9ff0..e927a67e65 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/Commands.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/Commands.scala @@ -214,7 +214,8 @@ private object Commands { "The requested URL returned error: 504", "fatal: error reading section header 'shallow-info'", "Error in the HTTP2 framing layer", - "remote: The project you were looking for could not be found or you don't have permission to view it." + "remote: The project you were looking for could not be found or you don't have permission to view it.", + "was not closed cleanly before end of the underlying stream" ) private lazy val relevantError: PartialFunction[Throwable, F[Either[ProcessingRecoverableError, Unit]]] = { case ShelloutException(result) => diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/GitSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/GitSpec.scala index bbaf35f812..6452e44d43 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/GitSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/triplesgeneration/renkulog/GitSpec.scala @@ -105,7 +105,8 @@ class GitSpec extends AnyWordSpec with IOSpec with MockFactory with should.Match "The requested URL returned error: 504", "Error in the HTTP2 framing layer", "remote: The project you were looking for could not be found or you don't have permission to view it.", - "error: RPC failed\nfatal: error reading section header 'shallow-info'" + "error: RPC failed\nfatal: error reading section header 'shallow-info'", + "error: RPC failed; curl 18 HTTP/2 stream 5 was not closed cleanly before end of the underlying stream" ) foreach { error => s"return ProcessingNonRecoverableError.MalformedRepository if command fails with a message containing '$error'" in new TestCase { From cde731fd3e69636a5eeee738de3e4342a7cfb3ce Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Thu, 30 Mar 2023 11:20:36 +0200 Subject: [PATCH 2/8] chore: kg chart to allow specifying replicas --- .../renku-graph/templates/knowledge-graph-deployment.yaml | 2 +- helm-chart/renku-graph/values.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/helm-chart/renku-graph/templates/knowledge-graph-deployment.yaml b/helm-chart/renku-graph/templates/knowledge-graph-deployment.yaml index d1b4796105..3045986926 100644 --- a/helm-chart/renku-graph/templates/knowledge-graph-deployment.yaml +++ b/helm-chart/renku-graph/templates/knowledge-graph-deployment.yaml @@ -8,7 +8,7 @@ metadata: release: {{ .Release.Name }} heritage: {{ .Release.Service }} spec: - replicas: 1 + replicas: {{ .Values.knowledgeGraph.replicas }} strategy: type: Recreate selector: diff --git a/helm-chart/renku-graph/values.yaml b/helm-chart/renku-graph/values.yaml index 661482dc6a..37332fb6e5 100644 --- a/helm-chart/renku-graph/values.yaml +++ b/helm-chart/renku-graph/values.yaml @@ -128,6 +128,7 @@ triplesGenerator: renkuPythonDevVersion: knowledgeGraph: + replicas: 1 image: repository: renku/knowledge-graph tag: 'latest' From 9387ab243127dc2cbe1f94fe29a5486a6667efef Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Thu, 30 Mar 2023 12:10:39 +0200 Subject: [PATCH 3/8] fix: flaky ConcurrentProcessExecutorSpec --- .../events/consumers/ConcurrentProcessExecutorSpec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/graph-commons/src/test/scala/io/renku/events/consumers/ConcurrentProcessExecutorSpec.scala b/graph-commons/src/test/scala/io/renku/events/consumers/ConcurrentProcessExecutorSpec.scala index 1273c2c3c5..8a3c2710ac 100644 --- a/graph-commons/src/test/scala/io/renku/events/consumers/ConcurrentProcessExecutorSpec.scala +++ b/graph-commons/src/test/scala/io/renku/events/consumers/ConcurrentProcessExecutorSpec.scala @@ -50,9 +50,11 @@ class ConcurrentProcessExecutorSpec val nonSchedulableValue = ints(processesCount.value + 1).generateOne val nonSchedulable = executor.tryExecuting(slowProcess(nonSchedulableValue)) - (schedulable :+ nonSchedulable).parSequence + schedulable.parSequence .unsafeRunSync() - .distinct should contain theSameElementsAs Set(Accepted, Busy) + .distinct + .toSet shouldBe Set(Accepted) + nonSchedulable.unsafeRunSync() shouldBe Busy eventually { executionRegister.get.unsafeRunSync() should contain theSameElementsAs schedulableValues From 827305352bcb5b9e702a34faa579d35da4b69c23 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 31 Mar 2023 14:27:11 +0200 Subject: [PATCH 4/8] refactor: PersonViewedProject functionality moved to its own package --- ...LUserViewedProject.scala => Project.scala} | 7 +-- .../collector/{projects => }/ontology.scala | 2 +- .../Encoder.scala} | 28 ++---------- .../PersonViewedProjectPersister.scala | 13 +++--- .../viewings/collector/persons/model.scala | 30 +++++++++++++ .../viewings/collector/projects/Encoder.scala | 44 +++++++++++++++++++ .../projects/activated/EventPersister.scala | 2 +- .../projects/viewed/EventPersister.scala | 6 ++- .../PersonViewedProjectPersisterSpec.scala | 28 ++++++------ .../projects/viewed/EventPersisterSpec.scala | 7 ++- .../projects/ViewingRemoverSpec.scala | 5 ++- .../ontology/OntologyGenerator.scala | 2 +- 12 files changed, 114 insertions(+), 60 deletions(-) rename entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/{projects/viewed/GLUserViewedProject.scala => Project.scala} (75%) rename entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/{projects => }/ontology.scala (97%) rename entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/{projects/ProjectViewingEncoder.scala => persons/Encoder.scala} (65%) rename entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/{projects/viewed => persons}/PersonViewedProjectPersister.scala (94%) create mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala create mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/Encoder.scala rename entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/{projects/viewed => persons}/PersonViewedProjectPersisterSpec.scala (87%) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/GLUserViewedProject.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala similarity index 75% rename from entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/GLUserViewedProject.scala rename to entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala index cc5ffebf35..70cd5eb413 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/GLUserViewedProject.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala @@ -16,11 +16,8 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.projects -package viewed +package io.renku.entities.viewings.collector import io.renku.graph.model.projects -import ProjectViewingEncoder.Project -import io.renku.triplesgenerator.api.events.UserId -private final case class GLUserViewedProject(userId: UserId, project: Project, date: projects.DateViewed) +private final case class Project(id: projects.ResourceId, path: projects.Path) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/ontology.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala similarity index 97% rename from entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/ontology.scala rename to entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala index 8335a0caaa..6477e7de0a 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/ontology.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.projects +package io.renku.entities.viewings.collector import io.renku.graph.model.Schemas.{renku, xsd} import io.renku.graph.model.entities.Project diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/ProjectViewingEncoder.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala similarity index 65% rename from entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/ProjectViewingEncoder.scala rename to entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala index 1fe9d73e68..73b102b059 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/ProjectViewingEncoder.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala @@ -16,25 +16,14 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.projects +package io.renku.entities.viewings.collector +package persons -import io.renku.graph.model.{persons, projects, GraphClass} +import io.renku.graph.model.GraphClass import io.renku.jsonld._ import io.renku.jsonld.syntax._ -private object ProjectViewingEncoder { - - final case class ProjectViewing(projectId: projects.ResourceId, dateViewed: projects.DateViewed) - - final case class Project(id: projects.ResourceId, path: projects.Path) - - final case class PersonViewedProject(userId: persons.ResourceId, project: Project, dateViewed: projects.DateViewed) - - def encode(viewing: ProjectViewing): NamedGraph = - NamedGraph.fromJsonLDsUnsafe( - GraphClass.ProjectViewedTimes.id, - viewing.asJsonLD - ) +private object Encoder { def encode(viewing: PersonViewedProject): NamedGraph = NamedGraph.fromJsonLDsUnsafe( @@ -42,15 +31,6 @@ private object ProjectViewingEncoder { viewing.asJsonLD ) - private implicit lazy val projectViewingEncoder: JsonLDEncoder[ProjectViewing] = - JsonLDEncoder.instance { case ProjectViewing(entityId, date) => - JsonLD.entity( - entityId.asEntityId, - EntityTypes of ProjectViewedTimeOntology.classType, - ProjectViewedTimeOntology.dataViewedProperty.id -> date.asJsonLD - ) - } - private implicit lazy val personViewedProjectEncoder: JsonLDEncoder[PersonViewedProject] = JsonLDEncoder.instance { case ev @ PersonViewedProject(userId, _, _) => JsonLD.entity( diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/PersonViewedProjectPersister.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala similarity index 94% rename from entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/PersonViewedProjectPersister.scala rename to entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala index d26d98eca7..3bdd89b23a 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/PersonViewedProjectPersister.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala @@ -16,8 +16,7 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.projects -package viewed +package io.renku.entities.viewings.collector.persons import cats.MonadThrow import io.renku.graph.model.{persons, projects} @@ -48,7 +47,7 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli import io.renku.triplesstore.client.syntax._ import io.renku.triplesstore.SparqlQuery.Prefixes import tsClient._ - import ProjectViewingEncoder._ + import Encoder._ override def persist(event: GLUserViewedProject): F[Unit] = findPersonId(event.userId) >>= { @@ -66,7 +65,7 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli private def userResourceIdByGLId(glId: persons.GitLabId) = SparqlQuery.ofUnsafe( - show"${categoryName.show.toLowerCase}: find user id by glid", + show"${GraphClass.PersonViewings}: find user id by glid", Prefixes of (renku -> "renku", schema -> "schema"), sparql"""|SELECT DISTINCT ?id |WHERE { @@ -81,7 +80,7 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli ) private def userResourceIdByEmail(email: persons.Email) = SparqlQuery.ofUnsafe( - show"${categoryName.show.toLowerCase}: find user id by email", + show"${GraphClass.PersonViewings}: find user id by email", Prefixes of (renku -> "renku", schema -> "schema"), sparql"""|SELECT DISTINCT ?id |WHERE { @@ -113,7 +112,7 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli ): F[Option[projects.DateViewed]] = queryExpecting { SparqlQuery.ofUnsafe( - show"${categoryName.show.toLowerCase}: find date", + show"${GraphClass.PersonViewings}: find project viewed date", Prefixes of renku -> "renku", sparql"""|SELECT (MAX(?date) AS ?mostRecentDate) |WHERE { @@ -139,7 +138,7 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli private def deleteOldViewedDate(personId: persons.ResourceId, projectId: projects.ResourceId): F[Unit] = updateWithNoResult( SparqlQuery.ofUnsafe( - show"${categoryName.show.toLowerCase}: delete", + show"${GraphClass.PersonViewings}: delete", Prefixes of renku -> "renku", sparql"""|DELETE { | GRAPH ${GraphClass.PersonViewings.id} { diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala new file mode 100644 index 0000000000..714d591668 --- /dev/null +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.entities.viewings.collector +package persons + +import io.renku.graph.model.{persons, projects} +import io.renku.triplesgenerator.api.events.UserId + +private[collector] final case class GLUserViewedProject(userId: UserId, project: Project, date: projects.DateViewed) + +private[collector] final case class PersonViewedProject(userId: persons.ResourceId, + project: Project, + dateViewed: projects.DateViewed +) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/Encoder.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/Encoder.scala new file mode 100644 index 0000000000..2b528b56b7 --- /dev/null +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/Encoder.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.entities.viewings.collector +package projects + +import io.renku.graph.model.{projects, GraphClass} +import io.renku.jsonld._ +import io.renku.jsonld.syntax._ + +private object Encoder { + + final case class ProjectViewing(projectId: projects.ResourceId, dateViewed: projects.DateViewed) + + def encode(viewing: ProjectViewing): NamedGraph = + NamedGraph.fromJsonLDsUnsafe( + GraphClass.ProjectViewedTimes.id, + viewing.asJsonLD + ) + + private implicit lazy val projectViewingEncoder: JsonLDEncoder[ProjectViewing] = + JsonLDEncoder.instance { case ProjectViewing(entityId, date) => + JsonLD.entity( + entityId.asEntityId, + EntityTypes of ProjectViewedTimeOntology.classType, + ProjectViewedTimeOntology.dataViewedProperty.id -> date.asJsonLD + ) + } +} diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/activated/EventPersister.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/activated/EventPersister.scala index 3b58bf25db..b5782939f6 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/activated/EventPersister.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/activated/EventPersister.scala @@ -42,7 +42,7 @@ private class EventPersisterImpl[F[_]: MonadThrow](tsClient: TSClient[F]) extend import io.renku.graph.model.{projects, GraphClass} import io.renku.graph.model.Schemas._ import io.renku.jsonld.syntax._ - import ProjectViewingEncoder._ + import Encoder._ import io.renku.triplesstore.ResultsDecoder._ import io.renku.triplesstore.SparqlQuery import io.renku.triplesstore.client.syntax._ diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala index 77e0a20b1e..ebb36bdc8f 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala @@ -16,7 +16,8 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.projects +package io.renku.entities.viewings.collector +package projects package viewed import cats.effect.Async @@ -25,6 +26,7 @@ import cats.MonadThrow import io.renku.triplesgenerator.api.events.ProjectViewedEvent import io.renku.triplesstore._ import org.typelevel.log4cats.Logger +import persons.{GLUserViewedProject, PersonViewedProjectPersister} private[viewings] trait EventPersister[F[_]] { def persist(event: ProjectViewedEvent): F[Unit] @@ -52,7 +54,7 @@ private[viewings] class EventPersisterImpl[F[_]: MonadThrow]( import io.renku.triplesstore.client.syntax._ import io.renku.triplesstore.SparqlQuery.Prefixes import tsClient.{queryExpecting, updateWithNoResult, upload} - import ProjectViewingEncoder._ + import Encoder._ override def persist(event: ProjectViewedEvent): F[Unit] = findProjectId(event) >>= { diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/PersonViewedProjectPersisterSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala similarity index 87% rename from entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/PersonViewedProjectPersisterSpec.scala rename to entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala index e6710a33d5..00a3c8dc20 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/PersonViewedProjectPersisterSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala @@ -16,12 +16,12 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.projects -package viewed +package io.renku.entities.viewings.collector.persons import cats.effect.IO import cats.syntax.all._ import eu.timepit.refined.auto._ +import io.renku.entities.viewings.collector import io.renku.generators.Generators.{fixed, timestamps, timestampsNotInTheFuture} import io.renku.generators.Generators.Implicits._ import io.renku.graph.model._ @@ -60,7 +60,7 @@ class PersonViewedProjectPersisterSpec upload(to = projectsDataset, project) val dateViewed = projectViewedDates(project.dateCreated.value).generateOne - val event = GLUserViewedProject(userId, toEncoderProject(project), dateViewed) + val event = GLUserViewedProject(userId, toCollectorProject(project), dateViewed) persister.persist(event).unsafeRunSync() shouldBe () @@ -78,7 +78,7 @@ class PersonViewedProjectPersisterSpec upload(to = projectsDataset, project) val dateViewed = projectViewedDates(project.dateCreated.value).generateOne - val event = GLUserViewedProject(userId, toEncoderProject(project), dateViewed) + val event = collector.persons.GLUserViewedProject(userId, toCollectorProject(project), dateViewed) persister.persist(event).unsafeRunSync() shouldBe () @@ -97,7 +97,7 @@ class PersonViewedProjectPersisterSpec upload(to = projectsDataset, project) val dateViewed = projectViewedDates(project.dateCreated.value).generateOne - val event = GLUserViewedProject(userId, toEncoderProject(project), dateViewed) + val event = collector.persons.GLUserViewedProject(userId, toCollectorProject(project), dateViewed) persister.persist(event).unsafeRunSync() shouldBe () @@ -120,7 +120,7 @@ class PersonViewedProjectPersisterSpec upload(to = projectsDataset, project) val dateViewed = projectViewedDates(project.dateCreated.value).generateOne - val event = GLUserViewedProject(userId, toEncoderProject(project), dateViewed) + val event = collector.persons.GLUserViewedProject(userId, toCollectorProject(project), dateViewed) persister.persist(event).unsafeRunSync() shouldBe () @@ -145,11 +145,13 @@ class PersonViewedProjectPersisterSpec upload(to = projectsDataset, project1, project2) val project1DateViewed = projectViewedDates(project1.dateCreated.value).generateOne - val project1Event = GLUserViewedProject(userId, toEncoderProject(project1), project1DateViewed) + val project1Event = + collector.persons.GLUserViewedProject(userId, toCollectorProject(project1), project1DateViewed) persister.persist(project1Event).unsafeRunSync() shouldBe () val project2DateViewed = projectViewedDates(project2.dateCreated.value).generateOne - val project2Event = GLUserViewedProject(userId, toEncoderProject(project2), project2DateViewed) + val project2Event = + collector.persons.GLUserViewedProject(userId, toCollectorProject(project2), project2DateViewed) persister.persist(project2Event).unsafeRunSync() shouldBe () findAllViewings shouldBe Set( @@ -172,9 +174,9 @@ class PersonViewedProjectPersisterSpec val project = generateProjectWithCreator(userIds.generateOne) - val event = GLUserViewedProject(userIds.generateOne, - toEncoderProject(project), - projectViewedDates(project.dateCreated.value).generateOne + val event = collector.persons.GLUserViewedProject(userIds.generateOne, + toCollectorProject(project), + projectViewedDates(project.dateCreated.value).generateOne ) persister.persist(event).unsafeRunSync() shouldBe () @@ -232,6 +234,6 @@ class PersonViewedProjectPersisterSpec date: projects.DateViewed ) - private def toEncoderProject(project: entities.Project) = - ProjectViewingEncoder.Project(project.resourceId, project.path) + private def toCollectorProject(project: entities.Project) = + collector.Project(project.resourceId, project.path) } diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala index 748bfbdd02..76e9589bda 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala @@ -22,6 +22,8 @@ package viewed import cats.effect.IO import cats.syntax.all._ import eu.timepit.refined.auto._ +import io.renku.entities.viewings.collector +import io.renku.entities.viewings.collector.persons.{GLUserViewedProject, PersonViewedProjectPersister} import io.renku.generators.Generators.{timestamps, timestampsNotInTheFuture} import io.renku.generators.Generators.Implicits._ import io.renku.graph.model.{projects, GraphClass} @@ -134,10 +136,7 @@ class EventPersisterSpec event.maybeUserId.map(userId => (personViewingPersister.persist _) .expects( - GLUserViewedProject(userId, - ProjectViewingEncoder.Project(project.resourceId, project.path), - event.dateViewed - ) + GLUserViewedProject(userId, collector.Project(project.resourceId, project.path), event.dateViewed) ) .returning(returning) ) diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/deletion/projects/ViewingRemoverSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/deletion/projects/ViewingRemoverSpec.scala index 22787dd85b..6c8d3e27fc 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/deletion/projects/ViewingRemoverSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/deletion/projects/ViewingRemoverSpec.scala @@ -21,7 +21,8 @@ package deletion.projects import cats.effect.IO import cats.syntax.all._ -import collector.projects.viewed.{EventPersisterImpl, PersonViewedProjectPersister} +import collector.persons.PersonViewedProjectPersister +import collector.projects.viewed.EventPersisterImpl import eu.timepit.refined.auto._ import io.renku.generators.Generators.Implicits._ import io.renku.generators.Generators.fixed @@ -32,8 +33,8 @@ import io.renku.interpreters.TestLogger import io.renku.jsonld.syntax._ import io.renku.logging.TestSparqlQueryTimeRecorder import io.renku.testtools.IOSpec -import io.renku.triplesgenerator.api.events.Generators._ import io.renku.triplesgenerator.api.events.{ProjectViewingDeletion, UserId} +import io.renku.triplesgenerator.api.events.Generators._ import io.renku.triplesstore._ import io.renku.triplesstore.client.syntax._ import io.renku.triplesstore.SparqlQuery.Prefixes diff --git a/knowledge-graph/src/main/scala/io/renku/knowledgegraph/ontology/OntologyGenerator.scala b/knowledge-graph/src/main/scala/io/renku/knowledgegraph/ontology/OntologyGenerator.scala index 4eaaf8c3dd..2c77b300d9 100644 --- a/knowledge-graph/src/main/scala/io/renku/knowledgegraph/ontology/OntologyGenerator.scala +++ b/knowledge-graph/src/main/scala/io/renku/knowledgegraph/ontology/OntologyGenerator.scala @@ -20,7 +20,7 @@ package io.renku.knowledgegraph.ontology import cats.data.NonEmptyList import io.renku.entities.searchgraphs.SearchInfoOntology -import io.renku.entities.viewings.collector.projects.{PersonViewingOntology, ProjectViewedTimeOntology} +import io.renku.entities.viewings.collector.{PersonViewingOntology, ProjectViewedTimeOntology} import io.renku.graph.model.Schemas import io.renku.graph.model.entities.{CompositePlan, Project} import io.renku.jsonld.JsonLD From 84716b890b3233185888d2e0dae44c1b321eca48 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 31 Mar 2023 14:31:04 +0200 Subject: [PATCH 5/8] feat: ontology for the PersonViewedDataset --- .../viewings/collector/ontology.scala | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala index 6477e7de0a..182f36d6cf 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/ontology.scala @@ -19,7 +19,7 @@ package io.renku.entities.viewings.collector import io.renku.graph.model.Schemas.{renku, xsd} -import io.renku.graph.model.entities.Project +import io.renku.graph.model.entities.{Dataset, Project} import io.renku.jsonld.ontology.{Class, DataProperties, DataProperty, ObjectProperties, ObjectProperty, Type} import io.renku.jsonld.Property @@ -38,10 +38,12 @@ object PersonViewingOntology { val classType: Property = renku / "PersonViewing" val viewedProjectProperty: Property = renku / "viewedProject" + val viewedDatasetProperty: Property = renku / "viewedDataset" lazy val typeDef: Type = Type.Def( Class(classType), - ObjectProperty(viewedProjectProperty, PersonViewedProjectOntology.typeDef) + ObjectProperty(viewedProjectProperty, PersonViewedProjectOntology.typeDef), + ObjectProperty(viewedDatasetProperty, PersonViewedDatasetOntology.typeDef) ) } @@ -61,3 +63,20 @@ object PersonViewedProjectOntology { ) ) } + +object PersonViewedDatasetOntology { + + val classType: Property = renku / "ViewedDataset" + val datasetProperty: Property = renku / "dataset" + val dateViewedProperty: DataProperty.Def = DataProperty(renku / "dateViewed", xsd / "dateTime") + + lazy val typeDef: Type = Type.Def( + Class(classType), + ObjectProperties( + ObjectProperty(datasetProperty, Dataset.Ontology.typeDef) + ), + DataProperties( + dateViewedProperty + ) + ) +} From cf391752b3a631b78887f8daf5bfbc6f83288dea Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 31 Mar 2023 17:35:24 +0200 Subject: [PATCH 6/8] feat: persisting PersonViewedDataset events --- .../entities/viewings/collector/Project.scala | 23 -- .../collector/datasets/DSInfoFinder.scala | 91 +++++++ .../collector/datasets/EventUploader.scala | 39 ++- .../collector/datasets/ProjectFinder.scala | 81 ------ .../viewings/collector/persons/Encoder.scala | 25 ++ .../collector/persons/PersonFinder.scala | 93 +++++++ .../PersonViewedDatasetPersister.scala | 127 +++++++++ .../PersonViewedProjectPersister.scala | 51 +--- .../viewings/collector/persons/model.scala | 13 +- .../projects/viewed/EventPersister.scala | 2 +- ...inderSpec.scala => DSInfoFinderSpec.scala} | 23 +- .../datasets/EventUploaderSpec.scala | 65 ++++- .../PersonViewedDatasetPersisterSpec.scala | 244 ++++++++++++++++++ .../PersonViewedProjectPersisterSpec.scala | 5 +- .../projects/viewed/EventPersisterSpec.scala | 2 +- 15 files changed, 697 insertions(+), 187 deletions(-) delete mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala create mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinder.scala delete mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/ProjectFinder.scala create mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonFinder.scala create mode 100644 entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersister.scala rename entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/{ProjectFinderSpec.scala => DSInfoFinderSpec.scala} (71%) create mode 100644 entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersisterSpec.scala diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala deleted file mode 100644 index 70cd5eb413..0000000000 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/Project.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2023 Swiss Data Science Center (SDSC) - * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and - * Eidgenössische Technische Hochschule Zürich (ETHZ). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.renku.entities.viewings.collector - -import io.renku.graph.model.projects - -private final case class Project(id: projects.ResourceId, path: projects.Path) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinder.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinder.scala new file mode 100644 index 0000000000..dd71bc965b --- /dev/null +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinder.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.entities.viewings.collector +package datasets + +import cats.effect.Async +import cats.syntax.all._ +import cats.MonadThrow +import eu.timepit.refined.auto._ +import io.renku.entities.viewings.collector.persons.Dataset +import io.renku.graph.model.{datasets, projects} +import io.renku.triplesstore.{ProjectsConnectionConfig, SparqlQueryTimeRecorder, TSClient} +import org.typelevel.log4cats.Logger + +private trait DSInfoFinder[F[_]] { + def findDSInfo(identifier: datasets.Identifier): F[Option[DSInfo]] +} + +private final case class DSInfo(projectPath: projects.Path, dataset: Dataset) + +private object DSInfoFinder { + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[DSInfoFinder[F]] = + ProjectsConnectionConfig[F]().map(TSClient[F](_)).map(new DSInfoFinderImpl[F](_)) +} + +private class DSInfoFinderImpl[F[_]: MonadThrow](tsClient: TSClient[F]) extends DSInfoFinder[F] { + + import io.circe.Decoder + import io.renku.graph.model.Schemas.{renku, schema} + import io.renku.triplesstore.{ResultsDecoder, SparqlQuery} + import io.renku.triplesstore.SparqlQuery.Prefixes + import io.renku.triplesstore.client.syntax._ + import io.renku.triplesstore.ResultsDecoder._ + import tsClient._ + + override def findDSInfo(identifier: datasets.Identifier): F[Option[DSInfo]] = + findProjects(identifier) + .map(findOldestProject) + .map(_.map { case (path, _, dataset) => DSInfo(path, dataset) }) + + private type Row = (projects.Path, projects.DateCreated, Dataset) + + private def findProjects(identifier: datasets.Identifier): F[List[Row]] = queryExpecting( + SparqlQuery + .ofUnsafe( + show"${categoryName.show.toLowerCase}: find projects", + Prefixes of (schema -> "schema", renku -> "renku"), + sparql"""|SELECT ?path ?dateCreated ?dsId ?dsIdentifier + |WHERE { + | GRAPH ?projectId { + | BIND (${identifier.asObject} AS ?dsIdentifier) + | ?dsId a schema:Dataset; + | schema:identifier ?dsIdentifier. + | ?projectId a schema:Project; + | renku:projectPath ?path; + | schema:dateCreated ?dateCreated. + | } + |} + |""".stripMargin + ) + )(rowsDecoder) + + private lazy val rowsDecoder: Decoder[List[Row]] = ResultsDecoder[List, Row] { implicit cur => + import io.renku.tinytypes.json.TinyTypeDecoders._ + for { + path <- extract[projects.Path]("path") + date <- extract[projects.DateCreated]("dateCreated") + dsId <- extract[datasets.ResourceId]("dsId") + dsIdentifier <- extract[datasets.Identifier]("dsIdentifier") + } yield (path, date, Dataset(dsId, dsIdentifier)) + } + + private lazy val findOldestProject: List[Row] => Option[Row] = + _.sortBy(_._2).headOption +} diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/EventUploader.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/EventUploader.scala index 959a567b16..5ced87af65 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/EventUploader.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/EventUploader.scala @@ -16,12 +16,14 @@ * limitations under the License. */ -package io.renku.entities.viewings.collector.datasets +package io.renku.entities.viewings.collector +package datasets import cats.effect.Async import cats.syntax.all._ import cats.MonadThrow import cats.data.OptionT +import io.renku.entities.viewings.collector.persons.{GLUserViewedDataset, PersonViewedDatasetPersister} import io.renku.entities.viewings.collector.projects.viewed.EventPersister import io.renku.graph.model.projects import io.renku.triplesgenerator.api.events.{DatasetViewedEvent, ProjectViewedEvent, UserId} @@ -34,22 +36,39 @@ private trait EventUploader[F[_]] { private object EventUploader { def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[EventUploader[F]] = - (ProjectFinder[F], EventPersister[F]) - .mapN(new EventUploaderImpl[F](_, _)) + (DSInfoFinder[F], EventPersister[F], PersonViewedDatasetPersister[F]) + .mapN(new EventUploaderImpl[F](_, _, _)) } private class EventUploaderImpl[F[_]: MonadThrow]( - projectFinder: ProjectFinder[F], - eventPersister: EventPersister[F] + dsInfoFinder: DSInfoFinder[F], + projectViewedEventPersister: EventPersister[F], + personViewedDatasetPersister: PersonViewedDatasetPersister[F] ) extends EventUploader[F] { - import eventPersister._ - import projectFinder._ + import dsInfoFinder._ override def upload(event: DatasetViewedEvent): F[Unit] = - OptionT(findProject(event.identifier)) - .map(ProjectViewedEvent(_, projects.DateViewed(event.dateViewed.value), event.maybeUserId.map(UserId(_)))) - .semiflatMap(persist) + OptionT(findDSInfo(event.identifier)) + .semiflatTap(persistProjectViewedEvent(_, event)) + .semiflatTap(persistPersonViewedDataset(_, event)) .value .void + + private def persistProjectViewedEvent(dsInfo: DSInfo, event: DatasetViewedEvent) = + projectViewedEventPersister.persist( + ProjectViewedEvent(dsInfo.projectPath, + projects.DateViewed(event.dateViewed.value), + event.maybeUserId.map(UserId(_)) + ) + ) + + private def persistPersonViewedDataset(dsInfo: DSInfo, event: DatasetViewedEvent) = + event.maybeUserId match { + case None => ().pure[F] + case Some(glId) => + personViewedDatasetPersister.persist( + GLUserViewedDataset(UserId(glId), dsInfo.dataset, event.dateViewed) + ) + } } diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/ProjectFinder.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/ProjectFinder.scala deleted file mode 100644 index 242d5a8eec..0000000000 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/datasets/ProjectFinder.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2023 Swiss Data Science Center (SDSC) - * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and - * Eidgenössische Technische Hochschule Zürich (ETHZ). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.renku.entities.viewings.collector.datasets - -import cats.effect.Async -import cats.syntax.all._ -import cats.MonadThrow -import eu.timepit.refined.auto._ -import io.renku.graph.model.{datasets, projects} -import io.renku.triplesstore.{ProjectsConnectionConfig, SparqlQueryTimeRecorder, TSClient} -import org.typelevel.log4cats.Logger - -private trait ProjectFinder[F[_]] { - def findProject(identifier: datasets.Identifier): F[Option[projects.Path]] -} - -private object ProjectFinder { - def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[ProjectFinder[F]] = - ProjectsConnectionConfig[F]().map(TSClient[F](_)).map(new ProjectFinderImpl[F](_)) -} - -private class ProjectFinderImpl[F[_]: MonadThrow](tsClient: TSClient[F]) extends ProjectFinder[F] { - - import io.circe.Decoder - import io.renku.graph.model.Schemas.{renku, schema} - import io.renku.triplesstore.{ResultsDecoder, SparqlQuery} - import io.renku.triplesstore.SparqlQuery.Prefixes - import io.renku.triplesstore.client.syntax._ - import io.renku.triplesstore.ResultsDecoder._ - import tsClient._ - - override def findProject(identifier: datasets.Identifier): F[Option[projects.Path]] = - findProjects(identifier) - .map(findOldestProject) - - private type Row = (projects.Path, projects.DateCreated) - - private def findProjects(identifier: datasets.Identifier): F[List[Row]] = queryExpecting( - SparqlQuery - .ofUnsafe( - show"${categoryName.show.toLowerCase}: find projects", - Prefixes of (schema -> "schema", renku -> "renku"), - s"""|SELECT ?path ?dateCreated - |WHERE { - | GRAPH ?projectId { - | ?id a schema:Dataset; - | schema:identifier ${identifier.asObject.asSparql.sparql}. - | ?projectId a schema:Project; - | renku:projectPath ?path; - | schema:dateCreated ?dateCreated. - | } - |} - |""".stripMargin - ) - )(rowsDecoder) - - private lazy val rowsDecoder: Decoder[List[Row]] = - ResultsDecoder[List, Row] { implicit cur => - import io.renku.tinytypes.json.TinyTypeDecoders._ - (extract[projects.Path]("path") -> extract[projects.DateCreated]("dateCreated")).bisequence - } - - private lazy val findOldestProject: List[Row] => Option[projects.Path] = - _.sortBy(_._2).headOption.map(_._1) -} diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala index 73b102b059..ec38f9c484 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/Encoder.scala @@ -31,6 +31,12 @@ private object Encoder { viewing.asJsonLD ) + def encode(viewing: PersonViewedDataset): NamedGraph = + NamedGraph.fromJsonLDsUnsafe( + GraphClass.PersonViewings.id, + viewing.asJsonLD + ) + private implicit lazy val personViewedProjectEncoder: JsonLDEncoder[PersonViewedProject] = JsonLDEncoder.instance { case ev @ PersonViewedProject(userId, _, _) => JsonLD.entity( @@ -49,4 +55,23 @@ private object Encoder { PersonViewedProjectOntology.dateViewedProperty.id -> date.asJsonLD ) } + + private implicit lazy val personViewedDatasetEncoder: JsonLDEncoder[PersonViewedDataset] = + JsonLDEncoder.instance { case ev @ PersonViewedDataset(userId, _, _) => + JsonLD.entity( + userId.asEntityId, + EntityTypes of PersonViewingOntology.classType, + PersonViewingOntology.viewedDatasetProperty -> ev.asJsonLD(viewedDatasetEncoder) + ) + } + + private lazy val viewedDatasetEncoder: JsonLDEncoder[PersonViewedDataset] = + JsonLDEncoder.instance { case PersonViewedDataset(userId, Dataset(id, identifier), date) => + JsonLD.entity( + EntityId.of(s"$userId/datasets/$identifier"), + EntityTypes of PersonViewedProjectOntology.classType, + PersonViewedDatasetOntology.datasetProperty -> id.asJsonLD, + PersonViewedDatasetOntology.dateViewedProperty.id -> date.asJsonLD + ) + } } diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonFinder.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonFinder.scala new file mode 100644 index 0000000000..e6b57e412d --- /dev/null +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonFinder.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.entities.viewings.collector.persons + +import cats.MonadThrow +import io.renku.graph.model.entities.Person +import io.renku.graph.model.persons +import io.renku.triplesgenerator.api.events.UserId +import io.renku.triplesstore.TSClient + +private trait PersonFinder[F[_]] { + def findPersonId(userId: UserId): F[Option[persons.ResourceId]] +} + +private object PersonFinder { + def apply[F[_]: MonadThrow](tsClient: TSClient[F]): PersonFinder[F] = + new PersonFinderImpl[F](tsClient) +} + +private class PersonFinderImpl[F[_]](tsClient: TSClient[F]) extends PersonFinder[F] { + + import cats.syntax.all._ + import eu.timepit.refined.auto._ + import io.circe.Decoder + import io.renku.graph.model.GraphClass + import io.renku.graph.model.Schemas._ + import io.renku.triplesstore.{ResultsDecoder, SparqlQuery} + import io.renku.triplesstore.ResultsDecoder._ + import io.renku.triplesstore.client.syntax._ + import io.renku.triplesstore.SparqlQuery.Prefixes + import tsClient._ + + override def findPersonId(userId: UserId): F[Option[persons.ResourceId]] = + queryExpecting[Option[persons.ResourceId]] { + userId.fold( + userResourceIdByGLId, + userResourceIdByEmail + ) + }(idDecoder) + + private def userResourceIdByGLId(glId: persons.GitLabId) = + SparqlQuery.ofUnsafe( + show"${GraphClass.PersonViewings}: find user id by glid", + Prefixes of (renku -> "renku", schema -> "schema"), + sparql"""|SELECT DISTINCT ?id + |WHERE { + | GRAPH ${GraphClass.Persons.id} { + | ?id schema:sameAs ?sameAsId. + | ?sameAsId schema:additionalType ${Person.gitLabSameAsAdditionalType.asTripleObject}; + | schema:identifier ${glId.asObject} + | } + |} + |LIMIT 1 + |""".stripMargin + ) + + private def userResourceIdByEmail(email: persons.Email) = + SparqlQuery.ofUnsafe( + show"${GraphClass.PersonViewings}: find user id by email", + Prefixes of (renku -> "renku", schema -> "schema"), + sparql"""|SELECT DISTINCT ?id + |WHERE { + | GRAPH ${GraphClass.Persons.id} { + | ?id schema:email ${email.asObject} + | } + |} + |LIMIT 1 + |""".stripMargin + ) + + private lazy val idDecoder: Decoder[Option[persons.ResourceId]] = ResultsDecoder[Option, persons.ResourceId] { + Decoder.instance[persons.ResourceId] { implicit cur => + import io.renku.tinytypes.json.TinyTypeDecoders._ + extract[persons.ResourceId]("id") + } + } +} diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersister.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersister.scala new file mode 100644 index 0000000000..90c7b1bd0e --- /dev/null +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersister.scala @@ -0,0 +1,127 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.entities.viewings.collector.persons + +import cats.MonadThrow +import cats.effect.Async +import cats.syntax.all._ +import io.renku.graph.model.{datasets, persons} +import io.renku.triplesstore.{ProjectsConnectionConfig, SparqlQueryTimeRecorder, TSClient} +import org.typelevel.log4cats.Logger + +private[viewings] trait PersonViewedDatasetPersister[F[_]] { + def persist(event: GLUserViewedDataset): F[Unit] +} + +private[viewings] object PersonViewedDatasetPersister { + + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[PersonViewedDatasetPersister[F]] = + ProjectsConnectionConfig[F]() + .map(TSClient[F](_)) + .map(apply[F](_)) + + def apply[F[_]: MonadThrow](tsClient: TSClient[F]): PersonViewedDatasetPersister[F] = + new PersonViewedDatasetPersisterImpl[F](tsClient, PersonFinder(tsClient)) +} + +private class PersonViewedDatasetPersisterImpl[F[_]: MonadThrow](tsClient: TSClient[F], personFinder: PersonFinder[F]) + extends PersonViewedDatasetPersister[F] { + + import cats.syntax.all._ + import eu.timepit.refined.auto._ + import io.circe.Decoder + import io.renku.graph.model.GraphClass + import io.renku.graph.model.Schemas._ + import io.renku.jsonld.syntax._ + import io.renku.triplesstore.{ResultsDecoder, SparqlQuery} + import io.renku.triplesstore.ResultsDecoder._ + import io.renku.triplesstore.client.syntax._ + import io.renku.triplesstore.SparqlQuery.Prefixes + import tsClient._ + import Encoder._ + import personFinder._ + + override def persist(event: GLUserViewedDataset): F[Unit] = + findPersonId(event.userId) >>= { + case None => ().pure[F] + case Some(personId) => persistIfOlderOrNone(personId, event) + } + + private def persistIfOlderOrNone(personId: persons.ResourceId, event: GLUserViewedDataset) = + findStoredDate(personId, event.dataset.id) >>= { + case None => insert(personId, event) + case Some(date) if date < event.date => + deleteOldViewedDate(personId, event.dataset.id) >> insert(personId, event) + case _ => ().pure[F] + } + + private def findStoredDate(personId: persons.ResourceId, + datasetId: datasets.ResourceId + ): F[Option[datasets.DateViewed]] = + queryExpecting { + SparqlQuery.ofUnsafe( + show"${GraphClass.PersonViewings}: find dataset viewed date", + Prefixes of renku -> "renku", + sparql"""|SELECT (MAX(?date) AS ?mostRecentDate) + |WHERE { + | GRAPH ${GraphClass.PersonViewings.id} { + | BIND (${personId.asEntityId} AS ?personId) + | ?personId renku:viewedDataset ?viewingId. + | ?viewingId renku:dataset ${datasetId.asEntityId}; + | renku:dateViewed ?date. + | } + |} + |GROUP BY ?id + |""".stripMargin + ) + }(dateDecoder) + + private lazy val dateDecoder: Decoder[Option[datasets.DateViewed]] = ResultsDecoder[Option, datasets.DateViewed] { + Decoder.instance[datasets.DateViewed] { implicit cur => + import io.renku.tinytypes.json.TinyTypeDecoders._ + extract[datasets.DateViewed]("mostRecentDate") + } + } + + private def deleteOldViewedDate(personId: persons.ResourceId, datasetId: datasets.ResourceId): F[Unit] = + updateWithNoResult( + SparqlQuery.ofUnsafe( + show"${GraphClass.PersonViewings}: delete", + Prefixes of renku -> "renku", + sparql"""|DELETE { + | GRAPH ${GraphClass.PersonViewings.id} { + | ?viewingId ?p ?o + | } + |} + |WHERE { + | GRAPH ${GraphClass.PersonViewings.id} { + | ${personId.asEntityId} renku:viewedDataset ?viewingId. + | ?viewingId renku:dataset ${datasetId.asEntityId}; + | ?p ?o. + | } + |} + |""".stripMargin + ) + ) + + private def insert(personId: persons.ResourceId, event: GLUserViewedDataset): F[Unit] = + upload( + encode(PersonViewedDataset(personId, event.dataset, event.date)) + ) +} diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala index 3bdd89b23a..888fd78c7c 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersister.scala @@ -20,8 +20,6 @@ package io.renku.entities.viewings.collector.persons import cats.MonadThrow import io.renku.graph.model.{persons, projects} -import io.renku.graph.model.entities.Person -import io.renku.triplesgenerator.api.events.UserId import io.renku.triplesstore.TSClient private[viewings] trait PersonViewedProjectPersister[F[_]] { @@ -30,10 +28,10 @@ private[viewings] trait PersonViewedProjectPersister[F[_]] { private[viewings] object PersonViewedProjectPersister { def apply[F[_]: MonadThrow](tsClient: TSClient[F]): PersonViewedProjectPersister[F] = - new PersonViewedProjectPersisterImpl[F](tsClient) + new PersonViewedProjectPersisterImpl[F](tsClient, PersonFinder(tsClient)) } -private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSClient[F]) +private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSClient[F], personFinder: PersonFinder[F]) extends PersonViewedProjectPersister[F] { import cats.syntax.all._ @@ -48,6 +46,7 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli import io.renku.triplesstore.SparqlQuery.Prefixes import tsClient._ import Encoder._ + import personFinder._ override def persist(event: GLUserViewedProject): F[Unit] = findPersonId(event.userId) >>= { @@ -55,50 +54,6 @@ private class PersonViewedProjectPersisterImpl[F[_]: MonadThrow](tsClient: TSCli case Some(personId) => persistIfOlderOrNone(personId, event) } - private def findPersonId(userId: UserId) = - queryExpecting[Option[persons.ResourceId]] { - userId.fold( - userResourceIdByGLId, - userResourceIdByEmail - ) - }(idDecoder) - - private def userResourceIdByGLId(glId: persons.GitLabId) = - SparqlQuery.ofUnsafe( - show"${GraphClass.PersonViewings}: find user id by glid", - Prefixes of (renku -> "renku", schema -> "schema"), - sparql"""|SELECT DISTINCT ?id - |WHERE { - | GRAPH ${GraphClass.Persons.id} { - | ?id schema:sameAs ?sameAsId. - | ?sameAsId schema:additionalType ${Person.gitLabSameAsAdditionalType.asTripleObject}; - | schema:identifier ${glId.asObject} - | } - |} - |LIMIT 1 - |""".stripMargin - ) - private def userResourceIdByEmail(email: persons.Email) = - SparqlQuery.ofUnsafe( - show"${GraphClass.PersonViewings}: find user id by email", - Prefixes of (renku -> "renku", schema -> "schema"), - sparql"""|SELECT DISTINCT ?id - |WHERE { - | GRAPH ${GraphClass.Persons.id} { - | ?id schema:email ${email.asObject} - | } - |} - |LIMIT 1 - |""".stripMargin - ) - - private lazy val idDecoder: Decoder[Option[persons.ResourceId]] = ResultsDecoder[Option, persons.ResourceId] { - Decoder.instance[persons.ResourceId] { implicit cur => - import io.renku.tinytypes.json.TinyTypeDecoders._ - extract[persons.ResourceId]("id") - } - } - private def persistIfOlderOrNone(personId: persons.ResourceId, event: GLUserViewedProject) = findStoredDate(personId, event.project.id) >>= { case None => insert(personId, event) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala index 714d591668..32827bbe93 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/persons/model.scala @@ -19,7 +19,7 @@ package io.renku.entities.viewings.collector package persons -import io.renku.graph.model.{persons, projects} +import io.renku.graph.model.{datasets, persons, projects} import io.renku.triplesgenerator.api.events.UserId private[collector] final case class GLUserViewedProject(userId: UserId, project: Project, date: projects.DateViewed) @@ -28,3 +28,14 @@ private[collector] final case class PersonViewedProject(userId: persons.Reso project: Project, dateViewed: projects.DateViewed ) + +private[collector] final case class Project(id: projects.ResourceId, path: projects.Path) + +private[collector] final case class GLUserViewedDataset(userId: UserId, dataset: Dataset, date: datasets.DateViewed) + +private[collector] final case class PersonViewedDataset(userId: persons.ResourceId, + dataset: Dataset, + dateViewed: datasets.DateViewed +) + +private[collector] final case class Dataset(id: datasets.ResourceId, identifier: datasets.Identifier) diff --git a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala index ebb36bdc8f..9af4a5c314 100644 --- a/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala +++ b/entities-viewings-collector/src/main/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersister.scala @@ -26,7 +26,7 @@ import cats.MonadThrow import io.renku.triplesgenerator.api.events.ProjectViewedEvent import io.renku.triplesstore._ import org.typelevel.log4cats.Logger -import persons.{GLUserViewedProject, PersonViewedProjectPersister} +import persons.{GLUserViewedProject, PersonViewedProjectPersister, Project} private[viewings] trait EventPersister[F[_]] { def persist(event: ProjectViewedEvent): F[Unit] diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/ProjectFinderSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinderSpec.scala similarity index 71% rename from entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/ProjectFinderSpec.scala rename to entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinderSpec.scala index 489ec84470..7af530fa97 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/ProjectFinderSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/DSInfoFinderSpec.scala @@ -20,6 +20,7 @@ package io.renku.entities.viewings.collector.datasets import cats.effect.IO import cats.syntax.all._ +import io.renku.entities.viewings.collector import io.renku.generators.Generators.Implicits._ import io.renku.graph.model.entities import io.renku.graph.model.testentities._ @@ -31,7 +32,7 @@ import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec import org.scalatest.OptionValues -class ProjectFinderSpec +class DSInfoFinderSpec extends AnyWordSpec with should.Matchers with IOSpec @@ -39,18 +40,19 @@ class ProjectFinderSpec with ProjectsDataset with OptionValues { - "findProject" should { + "findDSInfo" should { - "return path of the project where the non-modified DS with the given identifier exists" in new TestCase { + "return the Project info of the project where the non-modified DS with the given identifier exists" in new TestCase { val ds -> project = anyRenkuProjectEntities .addDataset(datasetEntities(provenanceNonModified)) .generateOne - .map(_.to[entities.Project]) + .bimap(_.to[entities.Dataset[entities.Dataset.Provenance]], _.to[entities.Project]) upload(to = projectsDataset, project) - finder.findProject(ds.identification.identifier).unsafeRunSync().value shouldBe project.path + finder.findDSInfo(ds.identification.identifier).unsafeRunSync().value shouldBe + DSInfo(project.path, toCollectorDataset(ds)) } "return path of the project where the modified DS with the given identifier exists" in new TestCase { @@ -62,7 +64,8 @@ class ProjectFinderSpec upload(to = projectsDataset, project) - finder.findProject(modifiedDS.identification.identifier).unsafeRunSync().value shouldBe project.path + finder.findDSInfo(modifiedDS.identification.identifier).unsafeRunSync().value shouldBe + DSInfo(project.path, toCollectorDataset(modifiedDS.to[entities.Dataset[entities.Dataset.Provenance.Modified]])) } "return path of the parent project where the DS with the given identifier exists" in new TestCase { @@ -80,13 +83,17 @@ class ProjectFinderSpec val ds = project.datasets.headOption.value - finder.findProject(ds.identification.identifier).unsafeRunSync().value shouldBe parentProject.path + finder.findDSInfo(ds.identification.identifier).unsafeRunSync().value shouldBe + DSInfo(parentProject.path, toCollectorDataset(parentProject.datasets.headOption.value)) } } private trait TestCase { private implicit val logger: TestLogger[IO] = TestLogger[IO]() private implicit val sqtr: SparqlQueryTimeRecorder[IO] = TestSparqlQueryTimeRecorder[IO].unsafeRunSync() - val finder = new ProjectFinderImpl[IO](TSClient[IO](projectsDSConnectionInfo)) + val finder = new DSInfoFinderImpl[IO](TSClient[IO](projectsDSConnectionInfo)) } + + private def toCollectorDataset(ds: entities.Dataset[entities.Dataset.Provenance]) = + collector.persons.Dataset(ds.resourceId, ds.identification.identifier) } diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/EventUploaderSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/EventUploaderSpec.scala index 8d5aa96d67..d4697627bf 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/EventUploaderSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/datasets/EventUploaderSpec.scala @@ -20,12 +20,15 @@ package io.renku.entities.viewings.collector package datasets import cats.syntax.all._ +import io.renku.entities.viewings.collector +import io.renku.entities.viewings.collector.persons.{GLUserViewedDataset, PersonViewedDatasetPersister} import io.renku.generators.Generators.Implicits._ import projects.viewed.EventPersister import io.renku.graph.model.{datasets, projects} -import io.renku.graph.model.RenkuTinyTypeGenerators.projectPaths +import io.renku.graph.model.RenkuTinyTypeGenerators.{datasetIdentifiers, datasetResourceIds, personGitLabIds, projectPaths} +import io.renku.triplesgenerator.api.events.{DatasetViewedEvent, ProjectViewedEvent, UserId} import io.renku.triplesgenerator.api.events.Generators.datasetViewedEvents -import io.renku.triplesgenerator.api.events.{ProjectViewedEvent, UserId} +import org.scalacheck.Gen import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec @@ -37,16 +40,42 @@ class EventUploaderSpec extends AnyWordSpec with should.Matchers with MockFactor "upload" should { + "find the project which the event DS viewing should be accounted for, " + + "store a relevant ProjectViewedEvent and " + + "a UserViewedDataset event if userId given" in new TestCase { + + val event = datasetViewedEvents.generateOne.copy(maybeUserId = personGitLabIds.generateSome) + + val dsInfo = dsInfos.generateOne + givenProjectFinding(event.identifier, returning = dsInfo.some.pure[Try]) + + givenEventPersisting( + ProjectViewedEvent(dsInfo.projectPath, + projects.DateViewed(event.dateViewed.value), + event.maybeUserId.map(UserId(_)) + ), + returning = ().pure[Try] + ) + + givenPersistingUserViewedEvent(event, dsInfo, returning = ().pure[Try]) + + uploader.upload(event).success.value shouldBe () + } + "find the project which the event DS viewing should be accounted for " + - "and store a relevant ProjectViewedEvent" in new TestCase { + "and store a relevant ProjectViewedEvent " + + "without sending a UserViewedDataset event if userId not given" in new TestCase { - val event = datasetViewedEvents.generateOne + val event = datasetViewedEvents.generateOne.copy(maybeUserId = None) - val path = projectPaths.generateOne - givenProjectFinding(event.identifier, returning = path.some.pure[Try]) + val dsInfo = dsInfos.generateOne + givenProjectFinding(event.identifier, returning = dsInfo.some.pure[Try]) givenEventPersisting( - ProjectViewedEvent(path, projects.DateViewed(event.dateViewed.value), event.maybeUserId.map(UserId(_))), + ProjectViewedEvent(dsInfo.projectPath, + projects.DateViewed(event.dateViewed.value), + event.maybeUserId.map(UserId(_)) + ), returning = ().pure[Try] ) @@ -65,14 +94,26 @@ class EventUploaderSpec extends AnyWordSpec with should.Matchers with MockFactor private trait TestCase { - private val projectFinder = mock[ProjectFinder[Try]] - private val eventPersister = mock[EventPersister[Try]] - val uploader = new EventUploaderImpl[Try](projectFinder, eventPersister) + private val projectFinder = mock[DSInfoFinder[Try]] + private val eventPersister = mock[EventPersister[Try]] + private val personViewedDatasetPersister = mock[PersonViewedDatasetPersister[Try]] + val uploader = new EventUploaderImpl[Try](projectFinder, eventPersister, personViewedDatasetPersister) - def givenProjectFinding(identifier: datasets.Identifier, returning: Try[Option[projects.Path]]) = - (projectFinder.findProject _).expects(identifier).returning(returning) + def givenProjectFinding(identifier: datasets.Identifier, returning: Try[Option[DSInfo]]) = + (projectFinder.findDSInfo _).expects(identifier).returning(returning) def givenEventPersisting(event: ProjectViewedEvent, returning: Try[Unit]) = (eventPersister.persist _).expects(event).returning(returning) + + def givenPersistingUserViewedEvent(event: DatasetViewedEvent, dsInfo: DSInfo, returning: Try[Unit]) = + event.maybeUserId + .map(UserId(_)) + .map(GLUserViewedDataset(_, dsInfo.dataset, event.dateViewed)) + .map((personViewedDatasetPersister.persist _).expects(_).returning(returning)) } + + private lazy val dsInfos: Gen[DSInfo] = + (projectPaths -> + (datasetResourceIds -> datasetIdentifiers).mapN(collector.persons.Dataset)) + .mapN(DSInfo) } diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersisterSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersisterSpec.scala new file mode 100644 index 0000000000..9086f92314 --- /dev/null +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedDatasetPersisterSpec.scala @@ -0,0 +1,244 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.entities.viewings.collector.persons + +import cats.effect.IO +import cats.syntax.all._ +import eu.timepit.refined.auto._ +import io.renku.entities.viewings.collector +import io.renku.generators.Generators.{fixed, timestamps, timestampsNotInTheFuture} +import io.renku.generators.Generators.Implicits._ +import io.renku.graph.model._ +import io.renku.graph.model.testentities._ +import io.renku.graph.model.Schemas.renku +import io.renku.interpreters.TestLogger +import io.renku.logging.TestSparqlQueryTimeRecorder +import io.renku.testtools.IOSpec +import io.renku.triplesgenerator.api.events.Generators.userIds +import io.renku.triplesgenerator.api.events.UserId +import io.renku.triplesstore._ +import io.renku.triplesstore.client.syntax._ +import io.renku.triplesstore.SparqlQuery.Prefixes +import org.scalatest.OptionValues +import org.scalatest.matchers.should +import org.scalatest.wordspec.AnyWordSpec + +import java.time.Instant + +class PersonViewedDatasetPersisterSpec + extends AnyWordSpec + with should.Matchers + with OptionValues + with IOSpec + with InMemoryJenaForSpec + with ProjectsDataset { + + "persist" should { + + "insert the given GLUserViewedDataset to the TS if it doesn't exist yet " + + "case with a user identified with GitLab id" in new TestCase { + + val userId = UserId(personGitLabIds.generateOne) + val dataset -> project = generateProjectWithCreator(userId) + + upload(to = projectsDataset, project) + + val dateViewed = datasetViewedDates(dataset.provenance.date.instant).generateOne + val event = GLUserViewedDataset(userId, toCollectorDataset(dataset), dateViewed) + + persister.persist(event).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set( + ViewingRecord(project.maybeCreator.value.resourceId, dataset.resourceId, dateViewed) + ) + } + + "insert the given GLUserViewedDataset to the TS if it doesn't exist yet " + + "case with a user identified with email" in new TestCase { + + val userId = UserId(personEmails.generateOne) + val dataset -> project = generateProjectWithCreator(userId) + + upload(to = projectsDataset, project) + + val dateViewed = datasetViewedDates(dataset.provenance.date.instant).generateOne + val event = GLUserViewedDataset(userId, toCollectorDataset(dataset), dateViewed) + + persister.persist(event).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set( + ViewingRecord(project.maybeCreator.value.resourceId, dataset.resourceId, dateViewed) + ) + } + + "update the date for the user and ds from the GLUserViewedDataset " + + "if an event for the ds already exists in the TS " + + "and the date from the new event is newer than this in the TS" in new TestCase { + + val userId = userIds.generateOne + val dataset -> project = generateProjectWithCreator(userId) + + upload(to = projectsDataset, project) + + val dateViewed = datasetViewedDates(dataset.provenance.date.instant).generateOne + val event = GLUserViewedDataset(userId, toCollectorDataset(dataset), dateViewed) + + persister.persist(event).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set( + ViewingRecord(project.maybeCreator.value.resourceId, dataset.resourceId, dateViewed) + ) + + val newDate = timestampsNotInTheFuture(butYoungerThan = event.date.value).generateAs(datasets.DateViewed) + + persister.persist(event.copy(date = newDate)).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set(ViewingRecord(project.maybeCreator.value.resourceId, dataset.resourceId, newDate)) + } + + "do nothing if the event date is older than the date in the TS" in new TestCase { + + val userId = userIds.generateOne + val dataset -> project = generateProjectWithCreator(userId) + + upload(to = projectsDataset, project) + + val dateViewed = datasetViewedDates(dataset.provenance.date.instant).generateOne + val event = GLUserViewedDataset(userId, toCollectorDataset(dataset), dateViewed) + + persister.persist(event).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set( + ViewingRecord(project.maybeCreator.value.resourceId, dataset.resourceId, dateViewed) + ) + + val newDate = timestamps(max = event.date.value.minusSeconds(1)).generateAs(datasets.DateViewed) + + persister.persist(event.copy(date = newDate)).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set(ViewingRecord(project.maybeCreator.value.resourceId, dataset.resourceId, dateViewed)) + } + + "update the date for the user and project from the GLUserViewedProject " + + "and leave other user viewings if they exist" in new TestCase { + + val userId = userIds.generateOne + val dataset1 -> project1 = generateProjectWithCreator(userId) + val dataset2 -> project2 = generateProjectWithCreator(userId) + + upload(to = projectsDataset, project1, project2) + + val dataset1DateViewed = datasetViewedDates(dataset1.provenance.date.instant).generateOne + val dataset1Event = + collector.persons.GLUserViewedDataset(userId, toCollectorDataset(dataset1), dataset1DateViewed) + persister.persist(dataset1Event).unsafeRunSync() shouldBe () + + val dataset2DateViewed = datasetViewedDates(dataset2.provenance.date.instant).generateOne + val dataset2Event = + collector.persons.GLUserViewedDataset(userId, toCollectorDataset(dataset2), dataset2DateViewed) + persister.persist(dataset2Event).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set( + ViewingRecord(project1.maybeCreator.value.resourceId, dataset1.resourceId, dataset1DateViewed), + ViewingRecord(project2.maybeCreator.value.resourceId, dataset2.resourceId, dataset2DateViewed) + ) + + val newDate = + timestampsNotInTheFuture(butYoungerThan = dataset1Event.date.value).generateAs(datasets.DateViewed) + + persister.persist(dataset1Event.copy(date = newDate)).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set( + ViewingRecord(project1.maybeCreator.value.resourceId, dataset1.resourceId, newDate), + ViewingRecord(project2.maybeCreator.value.resourceId, dataset2.resourceId, dataset2DateViewed) + ) + } + + "do nothing if the given event is for a non-existing user" in new TestCase { + + val dataset -> _ = generateProjectWithCreator(userIds.generateOne) + + val event = collector.persons.GLUserViewedDataset(userIds.generateOne, + toCollectorDataset(dataset), + datasetViewedDates(dataset.provenance.date.instant).generateOne + ) + + persister.persist(event).unsafeRunSync() shouldBe () + + findAllViewings shouldBe Set.empty + } + } + + private trait TestCase { + private implicit val logger: TestLogger[IO] = TestLogger[IO]() + private implicit val sqtr: SparqlQueryTimeRecorder[IO] = TestSparqlQueryTimeRecorder[IO].unsafeRunSync() + private val tsClient = TSClient[IO](projectsDSConnectionInfo) + val persister = new PersonViewedDatasetPersisterImpl[IO](tsClient, PersonFinder(tsClient)) + } + + private def generateProjectWithCreator(userId: UserId) = { + + val creator = userId + .fold( + glId => personEntities(maybeGitLabIds = fixed(glId.some)).map(removeOrcidId), + email => personEntities(withoutGitLabId, maybeEmails = fixed(email.some)).map(removeOrcidId) + ) + .generateSome + + anyRenkuProjectEntities + .map(replaceProjectCreator(creator)) + .addDataset(datasetEntities(provenanceInternal)) + .generateOne + .bimap( + _.to[entities.Dataset[entities.Dataset.Provenance.Internal]], + _.to[entities.Project] + ) + } + + private def findAllViewings = + runSelect( + on = projectsDataset, + SparqlQuery.of( + "test find user project viewings", + Prefixes of renku -> "renku", + sparql"""|SELECT ?id ?datasetId ?date + |FROM ${GraphClass.PersonViewings.id} { + | ?id renku:viewedDataset ?viewingId. + | ?viewingId renku:dataset ?datasetId; + | renku:dateViewed ?date. + |} + |""".stripMargin + ) + ).unsafeRunSync() + .map(row => + ViewingRecord(persons.ResourceId(row("id")), + datasets.ResourceId(row("datasetId")), + datasets.DateViewed(Instant.parse(row("date"))) + ) + ) + .toSet + + private case class ViewingRecord(userId: persons.ResourceId, + datasetId: datasets.ResourceId, + date: datasets.DateViewed + ) + + private def toCollectorDataset(ds: entities.Dataset[entities.Dataset.Provenance]) = + collector.persons.Dataset(ds.resourceId, ds.identification.identifier) +} diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala index 00a3c8dc20..0e33d84c22 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/persons/PersonViewedProjectPersisterSpec.scala @@ -188,7 +188,8 @@ class PersonViewedProjectPersisterSpec private trait TestCase { private implicit val logger: TestLogger[IO] = TestLogger[IO]() private implicit val sqtr: SparqlQueryTimeRecorder[IO] = TestSparqlQueryTimeRecorder[IO].unsafeRunSync() - val persister = new PersonViewedProjectPersisterImpl[IO](TSClient[IO](projectsDSConnectionInfo)) + private val tsClient = TSClient[IO](projectsDSConnectionInfo) + val persister = new PersonViewedProjectPersisterImpl[IO](tsClient, PersonFinder(tsClient)) } private def generateProjectWithCreator(userId: UserId) = { @@ -235,5 +236,5 @@ class PersonViewedProjectPersisterSpec ) private def toCollectorProject(project: entities.Project) = - collector.Project(project.resourceId, project.path) + collector.persons.Project(project.resourceId, project.path) } diff --git a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala index 76e9589bda..266f428183 100644 --- a/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala +++ b/entities-viewings-collector/src/test/scala/io/renku/entities/viewings/collector/projects/viewed/EventPersisterSpec.scala @@ -136,7 +136,7 @@ class EventPersisterSpec event.maybeUserId.map(userId => (personViewingPersister.persist _) .expects( - GLUserViewedProject(userId, collector.Project(project.resourceId, project.path), event.dateViewed) + GLUserViewedProject(userId, collector.persons.Project(project.resourceId, project.path), event.dateViewed) ) .returning(returning) ) From 99e5ee04421c436398aa33bd2dad341acb27b7b2 Mon Sep 17 00:00:00 2001 From: RenkuBot <53332360+RenkuBot@users.noreply.github.com> Date: Sat, 1 Apr 2023 09:09:03 +0200 Subject: [PATCH 7/8] chore: Update sentry-logback from 6.16.0 to 6.17.0 (#1418) Co-authored-by: RenkuBot --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9f20eb346b..e577971cef 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -30,7 +30,7 @@ object Dependencies { val scalamock = "5.2.0" val scalatest = "3.2.15" val scalatestScalacheck = "3.2.2.0" - val sentryLogback = "6.16.0" + val sentryLogback = "6.17.0" val skunk = "0.5.1" val swaggerParser = "2.1.13" val testContainersScala = "0.40.14" From d530be9d950450235de5e553a1a3d5ae0be8796d Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Mon, 3 Apr 2023 15:09:23 +0200 Subject: [PATCH 8/8] fix: Throttler not to die losing semaphore permit --- .../scala/io/renku/control/Throttler.scala | 77 +++++++----------- .../io/renku/http/client/RestClient.scala | 20 +++-- .../io/renku/control/ThrottlerSpec.scala | 80 ++++++++++-------- .../io/renku/http/client/RestClientSpec.scala | 81 ++++++++++--------- 4 files changed, 131 insertions(+), 127 deletions(-) diff --git a/graph-commons/src/main/scala/io/renku/control/Throttler.scala b/graph-commons/src/main/scala/io/renku/control/Throttler.scala index bc4a010dd4..d124a955f8 100644 --- a/graph-commons/src/main/scala/io/renku/control/Throttler.scala +++ b/graph-commons/src/main/scala/io/renku/control/Throttler.scala @@ -19,75 +19,60 @@ package io.renku.control import cats.MonadThrow +import cats.effect._ import cats.effect.kernel.Clock import cats.effect.std.Semaphore -import cats.effect.{Concurrent, Ref, Temporal} import cats.syntax.all._ -import java.util.concurrent.TimeUnit import scala.concurrent.duration._ trait Throttler[F[_], +ThrottlingTarget] { - def acquire(): F[Unit] - def release(): F[Unit] + def throttle[O](value: F[O]): F[O] } -final class StandardThrottler[F[_]: MonadThrow: Temporal: Clock, ThrottlingTarget] private[control] ( - rateLimit: RateLimit[ThrottlingTarget], - semaphore: Semaphore[F], - workersStartTimes: Ref[F, List[Long]] +final class StandardThrottler[F[_]: MonadThrow: Async: Clock, ThrottlingTarget] private[control] ( + rateLimit: RateLimit[ThrottlingTarget], + semaphore: Semaphore[F] ) extends Throttler[F, ThrottlingTarget] { - private val MinTimeGap = (rateLimit.per.multiplierFor(NANOSECONDS) / rateLimit.items.value).toLong - private val NextAttemptSleep = FiniteDuration(MinTimeGap / 10, TimeUnit.NANOSECONDS) + private val MinTimeGap = (rateLimit.per.multiplierFor(NANOSECONDS) / rateLimit.items.value).toLong + private val NextAttemptSleep = FiniteDuration(MinTimeGap / 10, NANOSECONDS) + private val previousStartedAt = Ref.unsafe[F, Long](0L) - override def acquire(): F[Unit] = for { - _ <- semaphore.acquire - startTimes <- workersStartTimes.get - now <- Clock[F].monotonic - _ <- verifyThroughput(startTimes, now.toNanos) - } yield () + override def throttle[O](value: F[O]): F[O] = + waitIfTooFast >> value - private def verifyThroughput(startTimes: List[Long], now: Long) = - if (notTooEarly(startTimes, now)) for { - _ <- workersStartTimes.modify(old => (startTimes :+ now) -> old) - _ <- semaphore.release - } yield () - else - for { - _ <- semaphore.release - _ <- Temporal[F] sleep NextAttemptSleep - _ <- acquire() - } yield () - - private def notTooEarly(startTimes: List[Long], now: Long): Boolean = { - val (_, durations) = (startTimes.tail :+ now).foldLeft(startTimes.head -> List.empty[Long]) { - case ((previous, durationsSoFar), current) => - current -> (durationsSoFar :+ (current - previous)) + private def waitIfTooFast: F[Unit] = + semaphore.tryPermit.use[Unit] { + case true => + waitIfTooEarlyForNext >> + updateStartedAt() + case false => + Temporal[F].delayBy(waitIfTooFast, NextAttemptSleep) } - durations.forall(_ >= MinTimeGap) - } + private def waitIfTooEarlyForNext: F[Unit] = + (previousStartedAt.get -> now) + .flatMapN { + case (previousStartedAt, now) if (now - previousStartedAt) >= MinTimeGap => ().pure[F] + case _ => Temporal[F].delayBy(waitIfTooEarlyForNext, NextAttemptSleep) + } + + private def updateStartedAt() = + now >>= previousStartedAt.set - override def release(): F[Unit] = for { - _ <- semaphore.acquire - _ <- workersStartTimes.modify(old => old.tail -> old) - _ <- semaphore.release - } yield () + private def now = Clock[F].monotonic.map(_.toNanos) } object Throttler { - def apply[F[_]: Concurrent: Temporal: Clock, ThrottlingTarget]( + def apply[F[_]: Concurrent: Async: Clock, ThrottlingTarget]( rateLimit: RateLimit[ThrottlingTarget] - ): F[Throttler[F, ThrottlingTarget]] = for { - semaphore <- Semaphore[F](1) - workersStartTimes <- Clock[F].monotonic flatMap (now => Ref.of(List(now.toNanos))) - } yield new StandardThrottler[F, ThrottlingTarget](rateLimit, semaphore, workersStartTimes) + ): F[Throttler[F, ThrottlingTarget]] = + Semaphore[F](1).map(new StandardThrottler[F, ThrottlingTarget](rateLimit, _)) def noThrottling[F[_]: MonadThrow]: Throttler[F, Nothing] = new Throttler[F, Nothing] { - override def acquire(): F[Unit] = ().pure[F] - override def release(): F[Unit] = ().pure[F] + override def throttle[O](value: F[O]): F[O] = value } } diff --git a/graph-commons/src/main/scala/io/renku/http/client/RestClient.scala b/graph-commons/src/main/scala/io/renku/http/client/RestClient.scala index 4d8f55f7a4..d1b4199bd6 100644 --- a/graph-commons/src/main/scala/io/renku/http/client/RestClient.scala +++ b/graph-commons/src/main/scala/io/renku/http/client/RestClient.scala @@ -33,14 +33,14 @@ import io.renku.http.client.RestClientError._ import io.renku.logging.ExecutionTimeRecorder import io.renku.tinytypes.ByteArrayTinyType import io.renku.tinytypes.contenttypes.ZippedContent +import org.http4s._ import org.http4s.AuthScheme.Bearer import org.http4s.Credentials.Token import org.http4s.Status.BadRequest -import org.http4s._ import org.http4s.client.{Client, ConnectionFailure} import org.http4s.ember.client.EmberClientBuilder import org.http4s.ember.core.EmberException -import org.http4s.headers.{Authorization, `Content-Disposition`, `Content-Type`} +import org.http4s.headers.{`Content-Disposition`, `Content-Type`, Authorization} import org.http4s.multipart.{Multiparts, Part} import org.typelevel.ci._ import org.typelevel.log4cats.Logger @@ -109,11 +109,9 @@ abstract class RestClient[F[_]: Async: Logger, ThrottlingTarget]( request: HttpRequest[F] )(mapResponse: ResponseMapping[ResultType]): F[ResultType] = httpClientBuilder.build.use { httpClient => - for { - _ <- throttler.acquire() - callResult <- measureExecutionTime(callRemote(httpClient, request, mapResponse, attempt = 1), request) - _ <- throttler.release() - } yield callResult + throttler.throttle { + measureExecutionTime(callRemote(httpClient, request, mapResponse, attempt = 1), request) + } } private def httpClientBuilder: EmberClientBuilder[F] = { @@ -181,7 +179,7 @@ abstract class RestClient[F[_]: Async: Logger, ThrottlingTarget]( mapResponse: ResponseMapping[T], attempt: Int ): PartialFunction[Throwable, F[T]] = { - case error: RestClientError => throttler.release() >> error.raiseError[F, T] + case error: RestClientError => error.raiseError[F, T] case ConnectionError(exception) if attempt <= maxRetries.value => for { _ <- Logger[F].warn(LogMessage(request.request, s"timed out -> retrying attempt $attempt", exception)) @@ -189,12 +187,12 @@ abstract class RestClient[F[_]: Async: Logger, ThrottlingTarget]( result <- callRemote(httpClient, request, mapResponse, attempt + 1) } yield result case ConnectionError(exception) if attempt > maxRetries.value => - throttler.release() >> ConnectivityException(LogMessage(request.request, exception), exception).raiseError[F, T] + ConnectivityException(LogMessage(request.request, exception), exception).raiseError[F, T] case NonFatal(exception) => - throttler.release() >> ClientException(LogMessage(request.request, exception), exception).raiseError[F, T] + ClientException(LogMessage(request.request, exception), exception).raiseError[F, T] } - object ConnectionError { + private object ConnectionError { def unapply(ex: Throwable): Option[Throwable] = ex match { case _: ConnectionFailure | _: ConnectException | _: SocketException | _: UnknownHostException => diff --git a/graph-commons/src/test/scala/io/renku/control/ThrottlerSpec.scala b/graph-commons/src/test/scala/io/renku/control/ThrottlerSpec.scala index 5216e0f80a..7521186151 100644 --- a/graph-commons/src/test/scala/io/renku/control/ThrottlerSpec.scala +++ b/graph-commons/src/test/scala/io/renku/control/ThrottlerSpec.scala @@ -19,6 +19,7 @@ package io.renku.control import cats.effect._ +import cats.effect.std.CountDownLatch import cats.syntax.all._ import eu.timepit.refined.auto._ import io.renku.control.RateLimitUnit._ @@ -26,45 +27,49 @@ import io.renku.testtools.IOSpec import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec -import java.util.concurrent.ConcurrentHashMap import scala.concurrent.duration._ -import scala.jdk.CollectionConverters._ class ThrottlerSpec extends AnyWordSpec with IOSpec with should.Matchers { - "Throttler" should { + "throttle" should { "enforce processing with throughput not greater than demanded" in new TestCase { val tasksNumber = 20 + val rate = RateLimit[ThrottlingTarget](10L, per = Second) val startTime = { for { - throttler <- Throttler[IO, ThrottlingTarget](RateLimit(10L, per = Second)) + throttler <- Throttler[IO, ThrottlingTarget](rate) startTime <- Clock[IO].monotonic - _ <- processConcurrently(tasksNumber, use = throttler) + latch <- processConcurrently(tasksNumber, use = throttler) + _ <- latch.await } yield startTime }.unsafeRunSync() - val startDelays = tasksStartDelays(startTime.toMillis) - startDelays.sum / startDelays.size should be >= 100L + val avgDelay = tasksStartDelays(startTime.toMillis).tail.sum / (tasksNumber - 1) + avgDelay should be >= 100L + avgDelay should be <= 300L totalTasksStartDelay(startTime.toMillis) should be > (tasksNumber * 100L) } "not sequence work items but process them in parallel" in new TestCase { - val tasksNumber = 20 + val tasksNumber = 20 + val rate = RateLimit[ThrottlingTarget](200L, per = Second) + val taskProcessingTime = 500 millis val startTime = { for { - throttler <- Throttler[IO, ThrottlingTarget](RateLimit(200L, per = Second)) + throttler <- Throttler[IO, ThrottlingTarget](rate) startTime <- Clock[IO].monotonic - _ <- processConcurrently(tasksNumber, use = throttler, taskProcessingTime = Some(1000 millis)) + latch <- processConcurrently(tasksNumber, use = throttler, taskProcessingTime = taskProcessingTime.some) + _ <- latch.await } yield startTime }.unsafeRunSync() - totalTasksStartDelay(startTime.toMillis) should be < (tasksNumber * 1000L) + totalTasksStartDelay(startTime.toMillis) should be < (tasksNumber * taskProcessingTime.toMillis) } } @@ -77,7 +82,8 @@ class ThrottlerSpec extends AnyWordSpec with IOSpec with should.Matchers { val startTime = { for { startTime <- Clock[IO].monotonic - _ <- processConcurrently(tasksNumber, use = Throttler.noThrottling[IO]) + latch <- processConcurrently(tasksNumber, use = Throttler.noThrottling[IO]) + _ <- latch.await } yield startTime }.unsafeRunSync() @@ -91,37 +97,45 @@ class ThrottlerSpec extends AnyWordSpec with IOSpec with should.Matchers { private trait TestCase { - val register = new ConcurrentHashMap[String, Long]() + private val register = Ref.unsafe[IO, List[Long]](List.empty) def processConcurrently[ThrottlingTarget](tasks: Int, use: Throttler[IO, ThrottlingTarget], taskProcessingTime: Option[FiniteDuration] = None - ) = ((1 to tasks) map (useThrottledResource(_, use, taskProcessingTime))).toList.parSequence + ): IO[CountDownLatch[IO]] = + CountDownLatch[IO](tasks) + .flatTap { latch => + (1 to tasks).toList.map(_ => useThrottledResource(use, latch, taskProcessingTime)).parSequence + } - private def useThrottledResource[Target](name: Int, - throttler: Throttler[IO, Target], + private def useThrottledResource[Target](throttler: Throttler[IO, Target], + latch: CountDownLatch[IO], taskProcessingTime: Option[FiniteDuration] - ): IO[Unit] = for { - _ <- throttler.acquire() - greenLight <- Clock[IO].monotonic - _ <- register.put(name.toString, greenLight.toMillis).pure[IO] - _ <- taskProcessingTime.map(Temporal[IO].sleep) getOrElse IO.unit - _ <- throttler.release() - } yield () + ): IO[Unit] = + throttler.throttle { + noteProcessingStartTime >> + latch.release >> + taskProcessingTime.map(Temporal[IO].sleep).getOrElse(IO.unit) + } + + private def noteProcessingStartTime = + Clock[IO].monotonic >>= (scheduledAt => register.update(times => scheduledAt.toMillis :: times)) def tasksStartDelays(startTime: Long): Seq[Long] = - register.asScala.values - .map(greenLight => greenLight - startTime) - .toList - .sorted - .foldLeft(List.empty[Long]) { case (diffs, item) => - diffs :+ item - diffs.sum - } + register.get + .map( + _.reverse.foldLeft(List.empty[Long], startTime) { case ((diffs, previousStart), item) => + (item - previousStart :: diffs) -> item + } + ) + .map(_._1) + .unsafeRunSync() + .reverse def totalTasksStartDelay(startTime: Long): Long = - register.asScala.values - .map(greenLight => greenLight - startTime) - .sum + register.get + .map(_.map(processingStartTime => processingStartTime - startTime).sum) + .unsafeRunSync() } private trait ThrottlingTarget diff --git a/graph-commons/src/test/scala/io/renku/http/client/RestClientSpec.scala b/graph-commons/src/test/scala/io/renku/http/client/RestClientSpec.scala index aa828807f0..9a2f7bdc37 100644 --- a/graph-commons/src/test/scala/io/renku/http/client/RestClientSpec.scala +++ b/graph-commons/src/test/scala/io/renku/http/client/RestClientSpec.scala @@ -18,7 +18,7 @@ package io.renku.http.client -import cats.effect.IO +import cats.effect.{IO, Ref} import cats.syntax.all._ import com.github.tomakehurst.wiremock.client.WireMock._ import com.github.tomakehurst.wiremock.http.Fault @@ -30,8 +30,8 @@ import io.circe.{Decoder, DecodingFailure, Json} import io.prometheus.client.Histogram import io.renku.config.ServiceUrl import io.renku.control.Throttler -import io.renku.generators.Generators.Implicits._ import io.renku.generators.Generators._ +import io.renku.generators.Generators.Implicits._ import io.renku.http.client.RestClientError._ import io.renku.interpreters.TestLogger import io.renku.interpreters.TestLogger.Level.Warn @@ -41,13 +41,14 @@ import io.renku.testtools.IOSpec import io.renku.tinytypes.ByteArrayTinyType import io.renku.tinytypes.TestTinyTypes.ByteArrayTestType import io.renku.tinytypes.contenttypes.ZippedContent +import org.http4s.{multipart => _, _} import org.http4s.MediaType._ import org.http4s.Method.{GET, POST} import org.http4s.circe.jsonOf -import org.http4s.{multipart => _, _} import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.OptionValues import org.typelevel.log4cats.Logger import java.net.ConnectException @@ -62,7 +63,8 @@ class RestClientSpec with IOSpec with ExternalServiceStubbing with MockFactory - with should.Matchers { + with should.Matchers + with OptionValues { "send" should { @@ -74,27 +76,27 @@ class RestClientSpec .willReturn(ok("1")) } - verifyThrottling() - client.callRemote(mapResponseToInt).unsafeRunSync() shouldBe 1 + verifyThrottling() + logger.loggedOnly(Warn(s"GET $hostUrl/resource finished${executionTimeRecorder.executionTimeInfo}")) } "succeed returning value calculated with the given response mapping rules " + - "and do not measure execution time if Time Recorder not given" in new TestCase { + "and do not measure execution time if TimeRecorder not given" in new TestCase { stubFor { get("/resource") .willReturn(ok("1")) } - verifyThrottling() - override val client = new TestRestClient(hostUrl, throttler, maybeTimeRecorder = None) client.callRemote(mapResponseToInt).unsafeRunSync() shouldBe 1 + verifyThrottling() + logger.expectNoLogs() } @@ -106,11 +108,11 @@ class RestClientSpec .willReturn(ok("1")) } - verifyThrottling() - val requestName: String Refined NonEmpty = "some request" client.callRemote(requestName).unsafeRunSync() shouldBe 1 + verifyThrottling() + logger.loggedOnly(Warn(s"$requestName finished${executionTimeRecorder.executionTimeInfo}")) } @@ -121,12 +123,12 @@ class RestClientSpec .willReturn(ok("1")) } - verifyThrottling() - val requestName: String Refined NonEmpty = "some request" client.callRemote(requestName).unsafeRunSync() shouldBe 1 - val Some(sample) = histogram.collect().asScala.flatMap(_.samples.asScala).lastOption + verifyThrottling() + + val sample = histogram.collect().asScala.flatMap(_.samples.asScala).last sample.value should be >= 0d sample.labelNames.asScala should contain only histogramLabel.value sample.labelValues.asScala should contain only requestName.value @@ -139,13 +141,13 @@ class RestClientSpec .willReturn(ok("1")) } - verifyThrottling() - override val histogram = Histogram.build("histogram", "help").create() client.callRemote(mapResponseToInt).unsafeRunSync() shouldBe 1 - val Some(sample) = histogram.collect().asScala.flatMap(_.samples.asScala).lastOption + verifyThrottling() + + val sample = histogram.collect().asScala.flatMap(_.samples.asScala).last sample.value should be >= 0d sample.labelNames.asScala shouldBe empty sample.labelValues.asScala shouldBe empty @@ -162,11 +164,11 @@ class RestClientSpec ) } - verifyThrottling() - intercept[UnexpectedResponseException] { client.callRemote(mapResponseToInt).unsafeRunSync() }.getMessage shouldBe s"GET $hostUrl/resource returned ${Status.NotFound}; body: some body" + + verifyThrottling() } "fail if remote responds with an empty body and status which doesn't match the response mapping rules" in new TestCase { @@ -176,11 +178,11 @@ class RestClientSpec .willReturn(noContent()) } - verifyThrottling() - intercept[UnexpectedResponseException] { client.callRemote(mapResponseToInt).unsafeRunSync() }.getMessage shouldBe s"GET $hostUrl/resource returned ${Status.NoContent}; body: " + + verifyThrottling() } "fail if remote responds with a BAD_REQUEST and it's not mapped in the given response mapping rules" in new TestCase { @@ -191,11 +193,11 @@ class RestClientSpec .willReturn(aResponse.withStatus(Status.BadRequest.code).withBody(responseBody)) } - verifyThrottling() - intercept[BadRequestException] { client.callRemote(mapResponseToInt).unsafeRunSync() }.getMessage shouldBe s"GET $hostUrl/resource returned ${Status.BadRequest}; body: $responseBody" + + verifyThrottling() } "fail if remote responds with a body which causes exception during mapping" in new TestCase { @@ -205,12 +207,12 @@ class RestClientSpec .willReturn(ok("non int")) } - verifyThrottling() - val exception = intercept[MappingException] { client.callRemote(mapResponseToInt).unsafeRunSync() } + verifyThrottling() + exception.getMessage shouldBe s"""GET $hostUrl/resource returned ${Status.Ok}; error: For input string: "non int"""" exception.getCause shouldBe a[NumberFormatException] } @@ -223,8 +225,6 @@ class RestClientSpec .willReturn(okJson(jsonBody)) } - verifyThrottling() - val customDecodingFailure = nonEmptyStrings().generateOne implicit val decoder: Decoder[Boolean] = Decoder.instance(_ => DecodingFailure(customDecodingFailure, Nil).asLeft) implicit val entityDecoder: EntityDecoder[IO, Boolean] = jsonOf[IO, Boolean] @@ -237,12 +237,15 @@ class RestClientSpec client.callRemote(mapResponseToBoolean).unsafeRunSync() } + verifyThrottling() + exception.getMessage should startWith(s"""GET $hostUrl/resource returned ${Status.Ok}; error: """) exception.getMessage should include(s" $jsonBody") exception.getMessage should endWith(s" $customDecodingFailure") } "fail after retrying if there is a persistent connectivity problem" in { + implicit val logger: TestLogger[IO] = TestLogger[IO]() val exceptionMessage = "Connection refused" @@ -269,12 +272,12 @@ class RestClientSpec .willReturn(aResponse withFault fault) } - verifyThrottling() - val exception = intercept[ConnectivityException] { client.callRemote(mapResponseToInt).unsafeRunSync() } + verifyThrottling() + val causeMessage = exception.getCause.getMessage logger.loggedOnly( @@ -364,27 +367,31 @@ class RestClientSpec .willReturn(ok("1")) } - verifyThrottling() - client.callMultipartEndpoint(jsonPart, textPart, zippedPart).unsafeRunSync() shouldBe 1 + verify { postRequestedFor(urlEqualTo("/resource")) } + + verifyThrottling() } } private trait TestCase { val histogramLabel: String Refined NonEmpty = "label" - val histogram = Histogram.build("histogram", "help").labelNames(histogramLabel.value).create() - val throttler = mock[Throttler[IO, Any]] + val histogram = Histogram.build("histogram", "help").labelNames(histogramLabel.value).create() + private val throttlerUsed = Ref.unsafe[IO, Boolean](false) + val throttler = new Throttler[IO, Any] { + override def throttle[O](value: IO[O]): IO[O] = throttlerUsed.set(true) >> value + } implicit val logger: TestLogger[IO] = TestLogger[IO]() val executionTimeRecorder = TestExecutionTimeRecorder[IO](Some(histogram)) val client = new TestRestClient(hostUrl, throttler, Some(executionTimeRecorder)) - def verifyThrottling() = inSequence { - (throttler.acquire _).expects().returning(IO.unit) - (throttler.release _).expects().returning(IO.unit) - } + def verifyThrottling() = + withClue("throttler called:") { + throttlerUsed.get.unsafeRunSync() shouldBe true + } } private lazy val hostUrl = ServiceUrl(externalServiceBaseUrl)