Skip to content

Commit

Permalink
Merge pull request #83 from data-catering/http-validation
Browse files Browse the repository at this point in the history
Add in data validation for HTTP requests and responses
  • Loading branch information
pflooky authored Dec 6, 2024
2 parents e301fc7 + 5f32c7b commit 827305d
Show file tree
Hide file tree
Showing 57 changed files with 1,029 additions and 807 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.github.datacatering.datacaterer.api

import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.ValidationHelper.cleanColumnName
import io.github.datacatering.datacaterer.api.connection.{ConnectionTaskBuilder, FileBuilder}
import io.github.datacatering.datacaterer.api.model.ConditionType.ConditionType
import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_AVG, AGGREGATION_COUNT, AGGREGATION_MAX, AGGREGATION_MIN, AGGREGATION_STDDEV, AGGREGATION_SUM, DEFAULT_VALIDATION_JOIN_TYPE, DEFAULT_VALIDATION_WEBHOOK_HTTP_DATA_SOURCE_NAME, VALIDATION_COLUMN_NAME_COUNT_BETWEEN, VALIDATION_COLUMN_NAME_COUNT_EQUAL, VALIDATION_COLUMN_NAME_MATCH_ORDER, VALIDATION_COLUMN_NAME_MATCH_SET, VALIDATION_PREFIX_JOIN_EXPRESSION, VALIDATION_UNIQUE}
Expand Down Expand Up @@ -164,7 +165,7 @@ case class ValidationBuilder(validation: Validation = ExpressionValidation(), op
* @return ColumnValidationBuilder
*/
def col(column: String): ColumnValidationBuilder = {
  // Quote each dot-separated part of the column name (via cleanColumnName) so
  // nested field references such as "body.id" are parsed as struct accessors
  // rather than a single literal column name containing dots.
  ColumnValidationBuilder(this, cleanColumnName(column))
}

/**
Expand Down Expand Up @@ -932,4 +933,8 @@ case class CombinationPreFilterBuilder(
}.mkString(" ")
)
}
}
}

object ValidationHelper {
  /**
   * Wrap each dot-separated part of a column name in backticks so that nested
   * field references (e.g. "body.id" becomes "`body`.`id`") are interpreted as
   * struct accessors instead of one literal column name containing dots.
   *
   * Any backtick already present inside a part is escaped by doubling it,
   * following Spark SQL's quoted-identifier rules, so the resulting reference
   * stays parseable even for unusual column names.
   *
   * @param column raw column name, optionally dot-separated for nested fields
   * @return the column name with every part backtick-quoted
   */
  def cleanColumnName(column: String): String =
    column.split("\\.").map(part => s"`${part.replace("`", "``")}`").mkString(".")
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object Constants {
lazy val STATIC = "static"
lazy val CLUSTERING_POSITION = "clusteringPos"
lazy val METADATA_IDENTIFIER = "metadataIdentifier"
lazy val VALIDATION_IDENTIFIER = "validationIdentifier"
lazy val FIELD_LABEL = "label"
lazy val IS_PII = "isPII"
lazy val HTTP_PARAMETER_TYPE = "httpParamType"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD
import scala.language.implicitConversions

case class Plan(
name: String = "Default plan",
name: String = "default_plan",
description: String = "Data generation plan",
tasks: List[TaskSummary] = List(),
sinkOptions: Option[SinkOptions] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
test("Can create basic configuration with defaults") {
val result = DataCatererConfigurationBuilder().build

assert(result.flagsConfig == FlagsConfig())
assert(result.foldersConfig == FoldersConfig())
assert(result.metadataConfig == MetadataConfig())
assert(result.generationConfig == GenerationConfig())
assertResult(FlagsConfig())(result.flagsConfig)
assertResult(FoldersConfig())(result.foldersConfig)
assertResult(MetadataConfig())(result.metadataConfig)
assertResult(GenerationConfig())(result.generationConfig)
assert(result.connectionConfigByName.isEmpty)
assert(result.runtimeConfig.size == 16)
assert(result.master == "local[*]")
assertResult(16)(result.runtimeConfig.size)
assertResult("local[*]")(result.master)
}

test("Can create postgres connection configuration") {
Expand All @@ -27,14 +27,14 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_postgres"))
val config = result("my_postgres")
assert(config("url") == DEFAULT_POSTGRES_URL)
assert(config("user") == DEFAULT_POSTGRES_USERNAME)
assert(config("password") == DEFAULT_POSTGRES_PASSWORD)
assert(config("format") == "jdbc")
assert(config("driver") == "org.postgresql.Driver")
assertResult(DEFAULT_POSTGRES_URL)(config("url"))
assertResult(DEFAULT_POSTGRES_USERNAME)(config("user"))
assertResult(DEFAULT_POSTGRES_PASSWORD)(config("password"))
assertResult("jdbc")(config("format"))
assertResult("org.postgresql.Driver")(config("driver"))
}

test("Can create postgres connection with custom configuration") {
Expand All @@ -43,12 +43,12 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_postgres"))
val config = result("my_postgres")
assert(config.size == 6)
assert(config("url") == "jdbc:postgresql://localhost:5432/customer")
assert(config("stringtype") == "undefined")
assertResult(6)(config.size)
assertResult("jdbc:postgresql://localhost:5432/customer")(config("url"))
assertResult("undefined")(config("stringtype"))
}

test("Can create mysql connection configuration") {
Expand All @@ -57,14 +57,14 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_mysql"))
val config = result("my_mysql")
assert(config("url") == DEFAULT_MYSQL_URL)
assert(config("user") == DEFAULT_MYSQL_USERNAME)
assert(config("password") == DEFAULT_MYSQL_PASSWORD)
assert(config("format") == "jdbc")
assert(config("driver") == "com.mysql.cj.jdbc.Driver")
assertResult(DEFAULT_MYSQL_URL)(config("url"))
assertResult(DEFAULT_MYSQL_USERNAME)(config("user"))
assertResult(DEFAULT_MYSQL_PASSWORD)(config("password"))
assertResult("jdbc")(config("format"))
assertResult("com.mysql.cj.jdbc.Driver")(config("driver"))
}

test("Can create cassandra connection configuration") {
Expand All @@ -73,14 +73,14 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_cassandra"))
val config = result("my_cassandra")
assert(config("spark.cassandra.connection.host") == "cassandraserver")
assert(config("spark.cassandra.connection.port") == "9042")
assert(config("spark.cassandra.auth.username") == DEFAULT_CASSANDRA_USERNAME)
assert(config("spark.cassandra.auth.password") == DEFAULT_CASSANDRA_PASSWORD)
assert(config("format") == "org.apache.spark.sql.cassandra")
assertResult("cassandraserver")(config("spark.cassandra.connection.host"))
assertResult("9042")(config("spark.cassandra.connection.port"))
assertResult(DEFAULT_CASSANDRA_USERNAME)(config("spark.cassandra.auth.username"))
assertResult(DEFAULT_CASSANDRA_PASSWORD)(config("spark.cassandra.auth.password"))
assertResult("org.apache.spark.sql.cassandra")(config("format"))
}

test("Can create solace connection configuration") {
Expand All @@ -89,16 +89,16 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_solace"))
val config = result("my_solace")
assert(config("url") == DEFAULT_SOLACE_URL)
assert(config("user") == DEFAULT_SOLACE_USERNAME)
assert(config("password") == DEFAULT_SOLACE_PASSWORD)
assert(config("format") == "jms")
assert(config("vpnName") == DEFAULT_SOLACE_VPN_NAME)
assert(config("connectionFactory") == DEFAULT_SOLACE_CONNECTION_FACTORY)
assert(config("initialContextFactory") == DEFAULT_SOLACE_INITIAL_CONTEXT_FACTORY)
assertResult(DEFAULT_SOLACE_URL)(config("url"))
assertResult(DEFAULT_SOLACE_USERNAME)(config("user"))
assertResult(DEFAULT_SOLACE_PASSWORD)(config("password"))
assertResult("jms")(config("format"))
assertResult(DEFAULT_SOLACE_VPN_NAME)(config("vpnName"))
assertResult(DEFAULT_SOLACE_CONNECTION_FACTORY)(config("connectionFactory"))
assertResult(DEFAULT_SOLACE_INITIAL_CONTEXT_FACTORY)(config("initialContextFactory"))
}

test("Can create kafka connection configuration") {
Expand All @@ -107,11 +107,11 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_kafka"))
val config = result("my_kafka")
assert(config("kafka.bootstrap.servers") == DEFAULT_KAFKA_URL)
assert(config("format") == "kafka")
assertResult(DEFAULT_KAFKA_URL)(config("kafka.bootstrap.servers"))
assertResult("kafka")(config("format"))
}

test("Can create http connection configuration") {
Expand All @@ -120,11 +120,11 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.connectionConfigByName

assert(result.size == 1)
assertResult(1)(result.size)
assert(result.contains("my_http"))
val config = result("my_http")
assert(config("user") == "user")
assert(config("password") == "pw")
assertResult("user")(config("user"))
assertResult("pw")(config("password"))
}

test("Can enable/disable flags") {
Expand Down Expand Up @@ -165,12 +165,12 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.foldersConfig

assert(result.planFilePath == "/my_plan")
assert(result.taskFolderPath == "/my_task")
assert(result.recordTrackingFolderPath == "/my_record_tracking")
assert(result.validationFolderPath == "/my_validation")
assert(result.generatedReportsFolderPath == "/my_generation_results")
assert(result.generatedPlanAndTaskFolderPath == "/my_generated_plan_tasks")
assertResult("/my_plan")(result.planFilePath)
assertResult("/my_task")(result.taskFolderPath)
assertResult("/my_record_tracking")(result.recordTrackingFolderPath)
assertResult("/my_validation")(result.validationFolderPath)
assertResult("/my_generation_results")(result.generatedReportsFolderPath)
assertResult("/my_generated_plan_tasks")(result.generatedPlanAndTaskFolderPath)
}

test("Can alter metadata configurations") {
Expand All @@ -183,11 +183,11 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.metadataConfig

assert(result.numRecordsFromDataSource == 1)
assert(result.numRecordsForAnalysis == 2)
assert(result.numGeneratedSamples == 3)
assert(result.oneOfMinCount == 100)
assert(result.oneOfDistinctCountVsCountThreshold == 0.3)
assertResult(1)(result.numRecordsFromDataSource)
assertResult(2)(result.numRecordsForAnalysis)
assertResult(3)(result.numGeneratedSamples)
assertResult(100)(result.oneOfMinCount)
assertResult(0.3)(result.oneOfDistinctCountVsCountThreshold)
}

test("Can alter generation configurations") {
Expand All @@ -197,7 +197,7 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
.build
.generationConfig

assert(result.numRecordsPerBatch == 100)
assertResult(100)(result.numRecordsPerBatch)
assert(result.numRecordsPerStep.contains(10))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ class MetadataSourceBuilderTest extends AnyFunSuite {
val result = MetadataSourceBuilder().openApi("localhost:8080").metadataSource

assert(result.isInstanceOf[OpenAPISource])
assert(result.asInstanceOf[OpenAPISource].connectionOptions == Map(SCHEMA_LOCATION -> "localhost:8080"))
assertResult(Map(SCHEMA_LOCATION -> "localhost:8080"))(result.asInstanceOf[OpenAPISource].connectionOptions)
}

test("Can create Great Expectations metadata source") {
val result = MetadataSourceBuilder().greatExpectations("/tmp/expectations").metadataSource

assert(result.isInstanceOf[GreatExpectationsSource])
assert(result.asInstanceOf[GreatExpectationsSource].connectionOptions == Map(GREAT_EXPECTATIONS_FILE -> "/tmp/expectations"))
assertResult(Map(GREAT_EXPECTATIONS_FILE -> "/tmp/expectations"))(result.asInstanceOf[GreatExpectationsSource].connectionOptions)
}

test("Can create Open Data Contract Standard metadata source") {
val result = MetadataSourceBuilder().openDataContractStandard("/tmp/odcs").metadataSource

assert(result.isInstanceOf[OpenDataContractStandardSource])
assert(result.asInstanceOf[OpenDataContractStandardSource].connectionOptions == Map(DATA_CONTRACT_FILE -> "/tmp/odcs"))
assertResult(Map(DATA_CONTRACT_FILE -> "/tmp/odcs"))(result.asInstanceOf[OpenDataContractStandardSource].connectionOptions)
}

test("Can create Open Data Contract Standard metadata source with schema name") {
Expand All @@ -80,7 +80,7 @@ class MetadataSourceBuilderTest extends AnyFunSuite {
val result = MetadataSourceBuilder().dataContractCli("/tmp/datacli").metadataSource

assert(result.isInstanceOf[DataContractCliSource])
assert(result.asInstanceOf[DataContractCliSource].connectionOptions == Map(DATA_CONTRACT_FILE -> "/tmp/datacli"))
assertResult(Map(DATA_CONTRACT_FILE -> "/tmp/datacli"))(result.asInstanceOf[DataContractCliSource].connectionOptions)
}

test("Can create Data Contract CLI metadata source with schema name") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ class PlanBuilderTest extends AnyFunSuite {
.description(desc)
.taskSummaries(taskSummaries)

assert(result.plan.name == name)
assert(result.plan.description == desc)
assert(result.plan.tasks.size == 1)
assert(result.plan.tasks.head == taskSummaries.taskSummary)
assertResult(name)(result.plan.name)
assertResult(desc)(result.plan.description)
assertResult(1)(result.plan.tasks.size)
assertResult(taskSummaries.taskSummary)(result.plan.tasks.head)
}

test("Can implement PlanRun") {
Expand Down Expand Up @@ -73,14 +73,14 @@ class PlanBuilderTest extends AnyFunSuite {
execute(List(t), p, c, List(v))
}

assert(result._tasks.size == 1)
assert(result._tasks.head.name == "my task")
assert(result._tasks.head.steps.head.schema.fields.get.head.name == "account_id")
assertResult(1)(result._tasks.size)
assertResult("my task")(result._tasks.head.name)
assertResult("account_id")(result._tasks.head.steps.head.schema.fields.get.head.name)

assert(result._plan.name == "my plan")
assert(result._plan.tasks.size == 1)
assert(result._plan.tasks.head.name == "my task")
assert(result._plan.tasks.head.dataSourceName == "account_json")
assertResult("my plan")(result._plan.name)
assertResult(1)(result._plan.tasks.size)
assertResult("my task")(result._plan.tasks.head.name)
assertResult("account_json")(result._plan.tasks.head.dataSourceName)
assert(result._plan.tasks.head.enabled)
assert(result._plan.sinkOptions.get.seed.contains("1"))
assert(result._plan.sinkOptions.get.locale.contains("en"))
Expand All @@ -106,26 +106,26 @@ class PlanBuilderTest extends AnyFunSuite {
assert(!result._configuration.flagsConfig.enableSinkMetadata)
assert(result._configuration.flagsConfig.enableSaveReports)
assert(result._configuration.flagsConfig.enableValidation)
assert(result._configuration.connectionConfigByName.size == 2)
assertResult(2)(result._configuration.connectionConfigByName.size)
assert(result._configuration.connectionConfigByName.contains("account_json"))
assert(result._configuration.connectionConfigByName("account_json") == Map("format" -> "json"))
assertResult(Map("format" -> "json"))(result._configuration.connectionConfigByName("account_json"))
assert(result._configuration.connectionConfigByName.contains("txn_db"))
assert(result._configuration.connectionConfigByName("txn_db") == Map("format" -> "postgres"))
assert(result._configuration.runtimeConfig == DataCatererConfiguration().runtimeConfig ++ Map("spark.sql.shuffle.partitions" -> "2"))
assertResult(Map("format" -> "postgres"))(result._configuration.connectionConfigByName("txn_db"))
assertResult(DataCatererConfiguration().runtimeConfig ++ Map("spark.sql.shuffle.partitions" -> "2"))(result._configuration.runtimeConfig)

assert(result._validations.size == 1)
assert(result._validations.head.dataSources.size == 1)
assertResult(1)(result._validations.size)
assertResult(1)(result._validations.head.dataSources.size)
val dataSourceHead = result._validations.head.dataSources.head
assert(dataSourceHead._1 == "account_json")
assert(dataSourceHead._2.size == 1)
assert(dataSourceHead._2.head.validations.size == 1)
assertResult("account_json")(dataSourceHead._1)
assertResult(1)(dataSourceHead._2.size)
assertResult(1)(dataSourceHead._2.head.validations.size)
val validationHead = dataSourceHead._2.head.validations.head.validation
assert(validationHead.description.contains("name is equal to Peter"))
assert(validationHead.errorThreshold.contains(0.1))
assert(validationHead.isInstanceOf[ExpressionValidation])
assert(validationHead.asInstanceOf[ExpressionValidation].expr == "name == 'Peter'")
assert(dataSourceHead._2.head.options == Map("path" -> "test/path/json"))
assert(dataSourceHead._2.head.waitCondition == PauseWaitCondition())
assertResult("name == 'Peter'")(validationHead.asInstanceOf[ExpressionValidation].expr)
assertResult(Map("path" -> "test/path/json"))(dataSourceHead._2.head.options)
assertResult(PauseWaitCondition())(dataSourceHead._2.head.waitCondition)
}

test("Can define random seed and locale that get used across all data generators") {
Expand All @@ -149,7 +149,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result.sinkOptions.isDefined)
val fk = result.sinkOptions.get.foreignKeys
assert(fk.nonEmpty)
assert(fk.size == 1)
assertResult(1)(fk.size)
assert(fk.exists(f => f._1.startsWith("my_json") && f._1.endsWith("account_id") &&
f._2.size == 1 && f._2.head.startsWith("my_csv") && f._2.head.endsWith("account_id")
))
Expand All @@ -162,7 +162,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result2.sinkOptions.isDefined)
val fk2 = result2.sinkOptions.get.foreignKeys
assert(fk2.nonEmpty)
assert(fk2.size == 1)
assertResult(1)(fk2.size)
}

test("Throw runtime exception when foreign key column is not defined in data sources") {
Expand Down Expand Up @@ -196,7 +196,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result.sinkOptions.isDefined)
val fk = result.sinkOptions.get.foreignKeys
assert(fk.nonEmpty)
assert(fk.size == 1)
assertResult(1)(fk.size)
}

test("Don't throw runtime exception when delete foreign key column, defined by SQL, is not defined in data sources") {
Expand All @@ -211,9 +211,9 @@ class PlanBuilderTest extends AnyFunSuite {
assert(result.sinkOptions.isDefined)
val fk = result.sinkOptions.get.foreignKeys
assert(fk.nonEmpty)
assert(fk.size == 1)
assertResult(1)(fk.size)
assert(fk.head._2.isEmpty)
assert(fk.head._3.size == 1)
assertResult(1)(fk.head._3.size)
}

test("Can create a step that will generate records for all combinations") {
Expand Down
Loading

0 comments on commit 827305d

Please sign in to comment.