Skip to content

Commit

Permalink
#510 Use the SQL schema query generator when adding metadata to data frames.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Nov 12, 2024
1 parent e28db14 commit b04783d
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.reader.model.{OffsetInfoParser, TableReaderJdbcConfig}
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.utils.{ConfigUtils, JdbcNativeUtils, JdbcSparkUtils, TimeUtils}

import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDate}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -214,8 +213,11 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

if (isDataQuery && jdbcReaderConfig.enableSchemaMetadata) {
log.info(s"Reading JDBC metadata from the query: $sql")
JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, sql) { (connection, jdbcMetadata) =>
val schemaQuery = tableOpt match {
case Some(table) => sqlGen.getSchemaQuery(table, Seq.empty)
case _ => JdbcSparkUtils.getSchemaQuery(sql)
}
JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, schemaQuery) { (connection, jdbcMetadata) =>
val schemaWithMetadata = JdbcSparkUtils.addMetadataFromJdbc(df.schema, jdbcMetadata)
val schemaWithColumnDescriptions = tableOpt match {
case Some(table) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}

if (jdbcReaderConfig.enableSchemaMetadata) {
JdbcSparkUtils.withJdbcMetadata(jdbcConfig, sql) { (connection, _) =>
val schemaQuery = tableOpt match {
case Some(table) => sqlGen.getSchemaQuery(table, Seq.empty)
case _ => JdbcSparkUtils.getSchemaQuery(sql)
}
JdbcSparkUtils.withJdbcMetadata(jdbcConfig, schemaQuery) { (connection, _) =>
val schemaWithColumnDescriptions = tableOpt match {
case Some(table) =>
log.info(s"Reading JDBC metadata descriptions the table: $table")
Expand Down Expand Up @@ -129,7 +133,9 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}

if (jdbcReaderConfig.enableSchemaMetadata) {
JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, sql) { (connection, _) =>
val schemaQuery = sqlGen.getSchemaQuery(tableName, columns)

JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, schemaQuery) { (connection, _) =>
log.info(s"Reading JDBC metadata descriptions the table: $tableName")
df = spark.createDataFrame(df.rdd,
JdbcSparkUtils.addColumnDescriptionsFromJdbc(df.schema, sqlGen.unquote(tableName), connection))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,36 +179,35 @@ object JdbcSparkUtils {
* metadata of the query.
*
* @param jdbcConfig a JDBC configuration.
* @param nativeQuery a SQL query in the dialect native to the database.
* @param schemaQuery a SQL query in the dialect native to the database which does not return records.
* @param action the action to execute on a connection + resultset metadata.
*/
def withJdbcMetadata(jdbcConfig: JdbcConfig,
nativeQuery: String)
schemaQuery: String)
(action: (Connection, ResultSetMetaData) => Unit): Unit = {
val (url, connection) = JdbcNativeUtils.getConnection(jdbcConfig)
val (_, connection) = JdbcNativeUtils.getConnection(jdbcConfig)

connection.setAutoCommit(false)

/** If not filtered out, some JDBC drivers will try to receive all data before closing the result set.
* ToDo Fix this properly using SQL generators by adding a generator for schema query. */
val q = if (nativeQuery.toLowerCase.contains(" where ")) {
nativeQuery + " AND 0=1"
} else {
nativeQuery + " WHERE 0=1"
}

log.info(s"Successfully connected to JDBC URL: $url")
log.info(s"Getting metadata for: $q")
log.info(s"Getting metadata for: $schemaQuery")

try {
withMetadataResultSet(connection, q) { rs =>
withMetadataResultSet(connection, schemaQuery) { rs =>
action(connection, rs.getMetaData)
}
} finally {
connection.close()
}
}

/**
  * Turns a data query into a schema-only query by appending an always-false
  * predicate (`0=1`), so executing it yields result set metadata without
  * transferring any rows.
  *
  * Note: the `" where "` substring check is a heuristic — NOTE(review): a
  * query whose WHERE keyword is not surrounded by single spaces (e.g. a
  * newline before WHERE) would get a second WHERE clause; confirm callers
  * only pass single-line queries.
  *
  * @param sql a SQL query in the dialect native to the database.
  * @return the same query with `0=1` added to (or as) its WHERE clause.
  */
private[core] def getSchemaQuery(sql: String): String = {
  // Extend an existing filter with AND; otherwise introduce a WHERE clause.
  val connector = if (sql.toLowerCase.contains(" where ")) "AND" else "WHERE"
  s"$sql $connector 0=1"
}

/**
* Executes a query against a JDBC connection and allows running an action on the result set.
* Handles the closure of created objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,22 @@ class JdbcSparkUtilsSuite extends AnyWordSpec with BeforeAndAfterAll with SparkT
}
}

// Unit tests for JdbcSparkUtils.getSchemaQuery: the generated schema query
// must never return rows, so a '0=1' predicate is appended or AND-ed in.
"getSchemaQuery" should {
  "work for queries without where clause" in {
    // No WHERE clause present — one must be introduced.
    assert(JdbcSparkUtils.getSchemaQuery("SELECT * FROM A") == "SELECT * FROM A WHERE 0=1")
  }

  "work for queries with where clause" in {
    // Existing WHERE clause — the false predicate is AND-ed to it.
    assert(JdbcSparkUtils.getSchemaQuery("SELECT * FROM A WHERE X>0") == "SELECT * FROM A WHERE X>0 AND 0=1")
  }
}

"withMetadataResultSet" should {
"provide the resultset object for the query" in {
JdbcSparkUtils.withMetadataResultSet(getConnection, s"SELECT * FROM ${RdbExampleTable.Company.tableName}") { rs =>
Expand Down

0 comments on commit b04783d

Please sign in to comment.