feat: avoid concurrent writes to jena #1577

Merged · 23 commits · Jul 12, 2023
acceptance-tests/src/test/resources/application.conf (0 additions, 4 deletions)

@@ -47,7 +47,3 @@ services {
     api-url = "http://localhost:9004/knowledge-graph"
   }
 }
-
-projects-tokens {
-  db-port = 49998
-}
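With all of the acceptance-test databases now served by one shared Postgres container on the default 5432 port (see the new PostgresDB object further down), the dedicated projects-tokens port mapping becomes obsolete.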
@@ -86,7 +86,7 @@ class CommitHistoryChangesSpec

       sleep((10 seconds).toMillis)

-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 5)

       eventually {
         EventLog.findEvents(project.id, events.EventStatus.TriplesStore).toSet shouldBe newCommits.toList.toSet
@@ -69,7 +69,7 @@ class CommitSyncFlowsSpec extends AcceptanceSpec with ApplicationServices with T
         .status shouldBe Accepted

       And("relevant commit events are processed")
-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 0)

       Then("the non missed events should be in the Triples Store")
       eventually {
@@ -83,7 +83,7 @@ class CommitSyncFlowsSpec extends AcceptanceSpec with ApplicationServices with T
       EventLog.forceCategoryEventTriggering(CategoryName("COMMIT_SYNC"), project.id)

       And("commit events for the missed event are created and processed")
-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 5)

       Then("triples for both of the project's commits should be in the Triples Store")
       eventually {
@@ -59,7 +59,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv
         .status shouldBe Accepted

       And("commit events are processed")
-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 5)

       Then(s"all the events should get the $TriplesStore status in the Event Log")
       EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(TriplesStore)
@@ -89,7 +89,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv
         .status shouldBe Accepted

       And("relevant commit events are processed")
-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 5)

       And(s"all the events should get the $GenerationNonRecoverableFailure status in the Event Log")
       EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(GenerationNonRecoverableFailure)
@@ -151,7 +151,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv
         .status shouldBe Accepted

       And("relevant commit events are processed")
-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 5)

       Then(s"all the events should get the $TransformationNonRecoverableFailure status in the Event Log")
       EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(TransformationNonRecoverableFailure)
@@ -87,7 +87,7 @@ class ProjectReProvisioningSpec extends AcceptanceSpec with ApplicationServices

       Then("the old data in the TS should be replaced with the new")
       sleep((10 seconds).toMillis)
-      `wait for events to be processed`(project.id, user.accessToken)
+      `wait for events to be processed`(project.id, user.accessToken, 5)

       eventually {
         knowledgeGraphClient
@@ -18,26 +18,22 @@

 package io.renku.graph.acceptancetests.db

-import cats.Applicative
 import cats.data.{Kleisli, NonEmptyList}
 import cats.effect.IO._
 import cats.effect.unsafe.IORuntime
 import cats.effect.{IO, Resource}
 import cats.syntax.all._
-import com.dimafeng.testcontainers.FixedHostPortGenericContainer
-import io.renku.db.{DBConfigProvider, PostgresContainer, SessionResource}
+import io.renku.db.{DBConfigProvider, SessionResource}
 import io.renku.eventlog._
 import io.renku.events.CategoryName
 import io.renku.graph.model.events.{CommitId, EventId, EventStatus}
 import io.renku.graph.model.projects
 import io.renku.graph.model.projects.GitLabId
 import natchez.Trace.Implicits.noop
 import org.typelevel.log4cats.Logger
 import skunk._
 import skunk.codec.text.varchar
 import skunk.implicits._

-import scala.collection.immutable
 import scala.util.Try

 object EventLog extends TypeSerializers {
@@ -95,18 +91,6 @@ object EventLog extends TypeSerializers {
   private lazy val dbConfig: DBConfigProvider.DBConfig[EventLogDB] =
     new EventLogDbConfigProvider[Try].get().fold(throw _, identity)

-  private lazy val postgresContainer = FixedHostPortGenericContainer(
-    imageName = PostgresContainer.image,
-    env = immutable.Map("POSTGRES_USER" -> dbConfig.user.value,
-                        "POSTGRES_PASSWORD" -> dbConfig.pass.value,
-                        "POSTGRES_DB" -> dbConfig.name.value
-    ),
-    exposedPorts = Seq(dbConfig.port.value),
-    exposedHostPort = dbConfig.port.value,
-    exposedContainerPort = dbConfig.port.value,
-    command = Seq(s"-p ${dbConfig.port.value}")
-  )
-
   def removeGlobalCommitSyncRow(projectId: GitLabId)(implicit ioRuntime: IORuntime): Unit = execute { session =>
     val command: Command[projects.GitLabId] = sql"""
       DELETE FROM subscription_category_sync_time
@@ -116,19 +100,11 @@ object EventLog extends TypeSerializers {
   }

   def startDB()(implicit logger: Logger[IO]): IO[Unit] = for {
-    _ <- Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start()))
+    _ <- PostgresDB.startPostgres
+    _ <- PostgresDB.initializeDatabase(dbConfig)
     _ <- logger.info("event_log DB started")
   } yield ()

   private lazy val sessionResource: Resource[IO, SessionResource[IO, EventLogDB]] =
-    Session
-      .pooled(
-        host = postgresContainer.host,
-        port = dbConfig.port.value,
-        database = dbConfig.name.value,
-        user = dbConfig.user.value,
-        password = Some(dbConfig.pass.value),
-        max = dbConfig.connectionPool.value
-      )
-      .map(new SessionResource(_))
+    PostgresDB.sessionPoolResource(dbConfig)
 }
@@ -0,0 +1,131 @@ (new file: io.renku.graph.acceptancetests.db.PostgresDB)
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.graph.acceptancetests.db

import cats.effect._
import com.dimafeng.testcontainers.FixedHostPortGenericContainer
import eu.timepit.refined.auto._
import io.renku.db.DBConfigProvider.DBConfig
import io.renku.db.{PostgresContainer, SessionResource}
import skunk.codec.all._
import skunk.implicits._
import skunk._
import natchez.Trace.Implicits.noop
import org.typelevel.log4cats.Logger

import scala.concurrent.duration._

object PostgresDB {
  private[this] val starting: Ref[IO, Boolean] = Ref.unsafe[IO, Boolean](false)

  private val dbConfig = DBConfig[Any](
    host = "localhost",
    port = 5432,
    name = "postgres",
    user = "at",
    pass = "at",
    connectionPool = 8
  )

  private val postgresContainer = {
    val cfg = dbConfig
    FixedHostPortGenericContainer(
      imageName = PostgresContainer.image,
      env = Map(
        "POSTGRES_USER"     -> cfg.user.value,
        "POSTGRES_PASSWORD" -> cfg.pass.value
      ),
      exposedPorts = Seq(cfg.port.value),
      exposedHostPort = cfg.port.value,
      exposedContainerPort = cfg.port.value,
      command = Seq(s"-p ${cfg.port.value}")
    )
  }

  // the Ref guards against concurrent start-ups: only the first caller boots
  // the container; everyone else just waits for connectivity
  def startPostgres(implicit L: Logger[IO]) =
    starting.getAndUpdate(_ => true).flatMap {
      case false =>
        IO.unlessA(postgresContainer.container.isRunning)(
          IO(postgresContainer.start()) *> waitForPostgres *> L.info(
            "PostgreSQL database started"
          )
        )

      case true =>
        waitForPostgres
    }

  // polls with `SELECT 1` every 500ms, giving up after 100 attempts
  private def waitForPostgres =
    fs2.Stream
      .awakeDelay[IO](0.5.seconds)
      .evalMap { _ =>
        Session
          .single[IO](
            host = dbConfig.host,
            port = dbConfig.port,
            user = dbConfig.user.value,
            password = Some(dbConfig.pass.value),
            database = dbConfig.name.value
          )
          .use(_.unique(sql"SELECT 1".query(int4)))
          .attempt
      }
      .map(_.fold(_ => 1, _ => 0))
      .take(100)
      .find(_ == 0)
      .compile
      .drain

  // port and pool size always come from the shared container's config;
  // only database name, user and password come from the service's config
  def sessionPool(dbCfg: DBConfig[_]): Resource[IO, Resource[IO, Session[IO]]] =
    Session
      .pooled[IO](
        host = postgresContainer.host,
        port = dbConfig.port.value,
        database = dbCfg.name.value,
        user = dbCfg.user.value,
        password = Some(dbCfg.pass.value),
        max = dbConfig.connectionPool.value
      )

  def sessionPoolResource[A](dbCfg: DBConfig[_]): Resource[IO, SessionResource[IO, A]] =
    sessionPool(dbCfg).map(new SessionResource[IO, A](_))

  def initializeDatabase(cfg: DBConfig[_]): IO[Unit] = {
    val session = Session.single[IO](
      host = dbConfig.host,
      port = dbConfig.port,
      user = dbConfig.user.value,
      password = Some(dbConfig.pass.value),
      database = dbConfig.name.value
    )

    // note: it would be simpler to use the default user created with the
    // container, but that would first require refactoring how the
    // configuration is loaded. Currently every service re-reads it from the
    // file, so this creates the users expected by the given config instead
    val createRole: Command[Void] =
      sql"create role #${cfg.user.value} with password '#${cfg.pass.value}' superuser login".command
    val createDatabase: Command[Void] = sql"create database #${cfg.name.value}".command
    val grants:         Command[Void] = sql"grant all on database #${cfg.name.value} to #${cfg.user.value}".command

    session.use { s =>
      s.execute(createRole) *>
        s.execute(createDatabase) *>
        s.execute(grants)
    }.void
  }
}
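The `starting` Ref makes the boot race-safe: the first service test to arrive flips the flag and starts the container, while every later caller falls straight into the `SELECT 1` polling loop. A minimal sketch of how the three DB start-ups can now run concurrently against the one container; the harness object and the Slf4jLogger wiring are assumptions, not part of this PR:

import cats.effect.{IO, IOApp}
import cats.syntax.all._
import io.renku.graph.acceptancetests.db.{EventLog, TokenRepository, TriplesStore}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object StartAllDbs extends IOApp.Simple {
  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  def run: IO[Unit] =
    // all three race towards PostgresDB.startPostgres, but the container is
    // booted exactly once; each service then runs its own initializeDatabase
    // against the shared instance (TriplesStore.start() also boots Jena)
    (EventLog.startDB(), TokenRepository.startDB(), TriplesStore.start()).parTupled.void
}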
@@ -18,11 +18,8 @@

 package io.renku.graph.acceptancetests.db

-import cats.Applicative
 import cats.effect.IO
-import cats.effect.unsafe.IORuntime
-import com.dimafeng.testcontainers.FixedHostPortGenericContainer
-import io.renku.db.{DBConfigProvider, PostgresContainer}
+import io.renku.db.DBConfigProvider
 import io.renku.tokenrepository.repository.{ProjectsTokensDB, ProjectsTokensDbConfigProvider}
 import org.typelevel.log4cats.Logger

@@ -33,20 +30,9 @@ object TokenRepository {
   private lazy val dbConfig: DBConfigProvider.DBConfig[ProjectsTokensDB] =
     new ProjectsTokensDbConfigProvider[Try].get().fold(throw _, identity)

-  private lazy val postgresContainer = FixedHostPortGenericContainer(
-    imageName = PostgresContainer.image,
-    env = Map("POSTGRES_USER" -> dbConfig.user.value,
-              "POSTGRES_PASSWORD" -> dbConfig.pass.value,
-              "POSTGRES_DB" -> dbConfig.name.value
-    ),
-    exposedPorts = Seq(dbConfig.port.value),
-    exposedHostPort = dbConfig.port.value,
-    exposedContainerPort = dbConfig.port.value,
-    command = Seq(s"-p ${dbConfig.port.value}")
-  )
-
-  def startDB()(implicit ioRuntime: IORuntime, logger: Logger[IO]): IO[Unit] = for {
-    _ <- Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start()))
+  def startDB()(implicit logger: Logger[IO]): IO[Unit] = for {
+    _ <- PostgresDB.startPostgres
+    _ <- PostgresDB.initializeDatabase(dbConfig)
     _ <- logger.info("projects_tokens DB started")
   } yield ()
 }
@@ -18,20 +18,32 @@

 package io.renku.graph.acceptancetests.db

 import cats.{Applicative, Monad}
-import cats.effect.{IO, Temporal}
+import cats.effect.{IO, Resource, Temporal}
+import eu.timepit.refined.auto._
+import io.renku.db.DBConfigProvider
+import io.renku.triplesgenerator.TgLockDB.SessionResource
+import io.renku.triplesgenerator.{TgLockDB, TgLockDbConfigProvider}
 import io.renku.triplesstore._
 import org.typelevel.log4cats.Logger

 import scala.concurrent.duration._
+import scala.util.Try

 object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDataset {

   protected override val jenaRunMode: JenaRunMode = JenaRunMode.FixedPortContainer(3030)

+  private val dbConfig: DBConfigProvider.DBConfig[TgLockDB] =
+    new TgLockDbConfigProvider[Try].get().fold(throw _, identity)
+
+  lazy val sessionResource: Resource[IO, SessionResource[IO]] =
+    PostgresDB.sessionPoolResource(dbConfig)
+
   def start()(implicit logger: Logger[IO]): IO[Unit] = for {
     _ <- Applicative[IO].unlessA(isRunning)(IO(container.start()))
+    _ <- PostgresDB.startPostgres
+    _ <- PostgresDB.initializeDatabase(dbConfig)
     _ <- waitForReadiness
     _ <- logger.info("Triples Store started")
   } yield ()
@@ -40,4 +52,5 @@ object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDat

   private def waitForReadiness(implicit logger: Logger[IO]): IO[Unit] =
     Monad[IO].whileM_(IO(!isRunning))(logger.info("Waiting for TS") >> (Temporal[IO] sleep (500 millis)))
+
 }
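The locking itself lives in TgLockDB on the triples-generator side and is not shown in this diff; the acceptance tests only gain a session pool into that database. For orientation, a hypothetical sketch of the pattern such a Postgres-backed lock enables, written with skunk like the code above; the object, function name, lock key and retry cadence are all assumptions, not TgLockDB's actual implementation:

import cats.effect.{IO, Resource}
import scala.concurrent.duration._
import skunk._
import skunk.codec.all._
import skunk.implicits._

object JenaWriteLock {

  // hold a Postgres advisory lock for the duration of a triples-store write,
  // so only one writer touches Jena at a time
  def withJenaWriteLock[A](sessions: Resource[IO, Session[IO]], lockKey: Long)(write: IO[A]): IO[A] =
    sessions.use { s =>
      val tryLock = sql"SELECT pg_try_advisory_lock(#${lockKey.toString})".query(bool)
      val unlock  = sql"SELECT pg_advisory_unlock(#${lockKey.toString})".query(bool)

      // poll until the lock is granted; advisory locks are session-scoped,
      // so the unlock must run on the same session that acquired the lock
      def acquire: IO[Unit] =
        s.unique(tryLock).flatMap {
          case true  => IO.unit
          case false => IO.sleep(250.millis) >> acquire
        }

      acquire.bracket(_ => write)(_ => s.unique(unlock).void)
    }
}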
@@ -20,6 +20,7 @@ package io.renku.graph.acceptancetests.flows

 import cats.data.NonEmptyList
 import cats.effect.unsafe.IORuntime
+import cats.syntax.all._
 import io.renku.events.CategoryName
 import io.renku.generators.Generators.Implicits._
 import io.renku.graph.acceptancetests.data
@@ -74,15 +75,20 @@ trait TSProvisioning
       sleep((5 seconds).toMillis)
     }

-    `wait for events to be processed`(project.id, accessToken)
+    `wait for events to be processed`(project.id, accessToken, 5)
   }

-  def `wait for events to be processed`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion =
+  def `wait for events to be processed`(
+      projectId:        projects.GitLabId,
+      accessToken:      AccessToken,
+      expectedMinTotal: Int
+  ): Assertion =
     eventually {
       val response = webhookServiceClient.`GET projects/:id/events/status`(projectId, accessToken)
       response.status shouldBe Ok
       response.jsonBody.hcursor.downField("activated").as[Boolean].value shouldBe true
       response.jsonBody.hcursor.downField("progress").downField("percentage").as[Double].value shouldBe 100d
+      response.jsonBody.hcursor.downField("progress").downField("total").as[Int].value should be >= expectedMinTotal
     }

   def `check hook cannot be found`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion = eventually {
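The extra argument hardens the wait: with no events registered for the project yet, `progress.percentage` can presumably already read 100, so each caller now also states the minimum `progress.total` it expects before the assertion may pass. Illustrative readings of the two thresholds used across this PR:

// threshold mirrors what each flow is expected to produce:
`wait for events to be processed`(project.id, user.accessToken, 5) // provisioning must have registered events
`wait for events to be processed`(project.id, user.accessToken, 0) // a re-sync may legitimately add none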
@@ -492,7 +492,7 @@ class DatasetsResourcesSpec
       gitLabStub.setupProject(project, commitId)
       mockCommitDataOnTripleGenerator(project, toPayloadJsonLD(project), commitId)
       `data in the Triples Store`(project, commitId, creator.accessToken)
-      `wait for events to be processed`(project.id, creator.accessToken)
+      `wait for events to be processed`(project.id, creator.accessToken, 5)

       When("an authenticated and authorised user fetches dataset details through GET knowledge-graph/datasets/:id")
       val detailsResponse =
@@ -24,7 +24,7 @@ import org.scalatest.time.{Minutes, Second, Span}

 trait AcceptanceTestPatience extends AbstractPatienceConfiguration {

   implicit override val patienceConfig: PatienceConfig = PatienceConfig(
-    timeout  = scaled(Span(4, Minutes)),
+    timeout  = scaled(Span(6, Minutes)),
     interval = scaled(Span(1, Second))
   )
 }
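The patience timeout grows from 4 to 6 minutes, presumably headroom for start-up now funnelling through a single shared Postgres container and for TS writes no longer running concurrently.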