Connection level bulk copy metadata caching (#2464)
* Connection level caching for destination col metadata in bulkcopy

* Added new connection property toggle for bulk copy metadata caching

* Added tests

* Updated test

* Updated test p2

* Added comments for test

* Removed dupe of helper in project

* Additional PR reviews

* Changed connection property name to cacheBulkCopyMetadata; Added logging
tkyc authored Jul 5, 2024
1 parent 8f53c02 commit a3a9ca7
Showing 14 changed files with 283 additions and 71 deletions.
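For context, the sketch below shows how the new connection property is intended to be used from application code; the server address, credentials, and table names are placeholders, not part of this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;

public class CachedBulkCopyExample {
    public static void main(String[] args) throws Exception {
        // cacheBulkCopyMetadata=true opts in to connection-level caching of the
        // destination column metadata gathered before a bulk copy operation.
        String url = "jdbc:sqlserver://localhost:1433;databaseName=TestDb;encrypt=true;"
                + "trustServerCertificate=true;user=sa;password={your_password};"
                + "cacheBulkCopyMetadata=true";

        try (Connection conn = DriverManager.getConnection(url);
                Statement stmt = conn.createStatement();
                ResultSet sourceRows = stmt.executeQuery("SELECT * FROM dbo.SourceTable");
                SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(conn)) {
            bulkCopy.setDestinationTableName("dbo.TargetTable");
            // The first write queries sys.columns for the destination metadata and caches it;
            // later bulk copies against the same table and database reuse the cached entry.
            bulkCopy.writeToServer(sourceRows);
        }
    }
}

With the property left at its default (false), every bulk copy operation queries the destination metadata as before.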
@@ -532,4 +532,19 @@ CallableStatement prepareCall(String sql, int nType, int nConcur, int nHold,
* @return flag for using Bulk Copy API for batch insert operations.
*/
boolean getUseBulkCopyForBatchInsert();

/**
* Sets the value of the 'cacheBulkCopyMetadata' connection property.
*
* @param cacheBulkCopyMetadata
* indicates whether the driver should use connection level caching of metadata for bulk copy
*/
void setcacheBulkCopyMetadata(boolean cacheBulkCopyMetadata);

/**
* Returns the value of the 'cacheBulkCopyMetadata' connection property.
*
* @return cacheBulkCopyMetadata boolean value
*/
boolean getcacheBulkCopyMetadata();
}
@@ -1350,4 +1350,19 @@ public interface ISQLServerDataSource extends javax.sql.CommonDataSource {
* @return calcBigDecimalPrecision boolean value
*/
boolean getCalcBigDecimalPrecision();

/**
* Sets the value of the 'cacheBulkCopyMetadata' connection property.
*
* @param cacheBulkCopyMetadata
* indicates whether the driver should use connection level caching of metadata for bulk copy
*/
void setcacheBulkCopyMetadata(boolean cacheBulkCopyMetadata);

/**
* Returns the value of the 'cacheBulkCopyMetadata' connection property.
*
* @return cacheBulkCopyMetadata boolean value
*/
boolean getcacheBulkCopyMetadata();
}
147 changes: 106 additions & 41 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBulkCopy.java
@@ -5,6 +5,8 @@

package com.microsoft.sqlserver.jdbc;

import static com.microsoft.sqlserver.jdbc.SQLServerConnection.BULK_COPY_OPERATION_CACHE;
import static com.microsoft.sqlserver.jdbc.Util.getHashedSecret;
import static java.nio.charset.StandardCharsets.UTF_16LE;
import static java.nio.charset.StandardCharsets.UTF_8;

@@ -17,6 +19,8 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.ResultSet;
@@ -46,6 +50,8 @@
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;

import javax.sql.RowSet;
@@ -263,6 +269,11 @@ class BulkColumnMetaData {
*/
private transient ScheduledFuture<?> timeout;

/**
* Lock guarding population of the shared destination column metadata cache
*/
private static final Lock DESTINATION_COL_METADATA_LOCK = new ReentrantLock();

/**
* The maximum temporal precision we can send when using varchar(precision) in bulkcommand, to send a
* smalldatetime/datetime value.
@@ -1719,61 +1730,115 @@ private void getDestinationMetadata() throws SQLServerException {
}

String escapedDestinationTableName = Util.escapeSingleQuotes(destinationTableName);
String key = null;

if (connection.getcacheBulkCopyMetadata()) {
String databaseName = connection.activeConnectionProperties
.getProperty(SQLServerDriverStringProperty.DATABASE_NAME.toString());
key = getHashedSecret(new String[] {escapedDestinationTableName, databaseName});
destColumnMetadata = BULK_COPY_OPERATION_CACHE.get(key);
}

if (null == destColumnMetadata || destColumnMetadata.isEmpty()) {
if (connection.getcacheBulkCopyMetadata()) {
DESTINATION_COL_METADATA_LOCK.lock();
destColumnMetadata = BULK_COPY_OPERATION_CACHE.get(key);

if (null == destColumnMetadata || destColumnMetadata.isEmpty()) {
try {
setDestinationColumnMetadata(escapedDestinationTableName);

// We are caching the following metadata about the table:
// 1. collation_name
// 2. is_computed
// 3. encryption_type
//
// Using this caching method, 'cacheBulkCopyMetadata', may have unintended consequences if the
// table changes somehow between inserts. For example, if the collation_name changes, the
// driver will not be aware of this and the inserted data will likely be corrupted. In such a
// scenario, we can't detect the change without making an additional metadata query, which would
// defeat the purpose of caching.
BULK_COPY_OPERATION_CACHE.put(key, destColumnMetadata);
} finally {
DESTINATION_COL_METADATA_LOCK.unlock();
}
}

if (loggerExternal.isLoggable(Level.FINER)) {
loggerExternal.finer(this.toString() + " Acquiring existing destination column metadata " +
"from cache for bulk copy");
}

} else {
setDestinationColumnMetadata(escapedDestinationTableName);

if (loggerExternal.isLoggable(Level.FINER)) {
loggerExternal.finer(this.toString() + " cacheBulkCopyMetadata=false - Querying server " +
"for destination column metadata");
}
}
}

destColumnCount = destColumnMetadata.size();
}

private void setDestinationColumnMetadata(String escapedDestinationTableName) throws SQLServerException {
SQLServerResultSet rs = null;
SQLServerStatement stmt = null;
String metaDataQuery = null;

if (null == destColumnMetadata || destColumnMetadata.isEmpty()) {
try {
if (null != destinationTableMetadata) {
rs = (SQLServerResultSet) destinationTableMetadata;
} else {
stmt = (SQLServerStatement) connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, connection.getHoldability(), stmtColumnEncriptionSetting);
try {
if (null != destinationTableMetadata) {
rs = (SQLServerResultSet) destinationTableMetadata;
} else {
stmt = (SQLServerStatement) connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, connection.getHoldability(), stmtColumnEncriptionSetting);

// Get destination metadata
rs = stmt.executeQueryInternal(
"sp_executesql N'SET FMTONLY ON SELECT * FROM " + escapedDestinationTableName + " '");
}
// Get destination metadata
rs = stmt.executeQueryInternal(
"sp_executesql N'SET FMTONLY ON SELECT * FROM " + escapedDestinationTableName + " '");
}

int destColumnMetadataCount = rs.getMetaData().getColumnCount();
destColumnMetadata = new HashMap<>();
destCekTable = rs.getCekTable();
int destColumnMetadataCount = rs.getMetaData().getColumnCount();
destColumnMetadata = new HashMap<>();
destCekTable = rs.getCekTable();

metaDataQuery = "select * from sys.columns where " + "object_id=OBJECT_ID('"
if (!connection.getServerSupportsColumnEncryption()) {
metaDataQuery = "select collation_name, is_computed from sys.columns where " + "object_id=OBJECT_ID('"
+ escapedDestinationTableName + "') " + "order by column_id ASC";
} else {
metaDataQuery = "select collation_name, is_computed, encryption_type from sys.columns where "
+ "object_id=OBJECT_ID('" + escapedDestinationTableName + "') " + "order by column_id ASC";
}

try (SQLServerStatement statementMoreMetadata = (SQLServerStatement) connection.createStatement();
SQLServerResultSet rsMoreMetaData = statementMoreMetadata.executeQueryInternal(metaDataQuery)) {
for (int i = 1; i <= destColumnMetadataCount; ++i) {
if (rsMoreMetaData.next()) {
String bulkCopyEncryptionType = null;
if (connection.getServerSupportsColumnEncryption()) {
bulkCopyEncryptionType = rsMoreMetaData.getString("encryption_type");
}
// Skip computed columns
if (!rsMoreMetaData.getBoolean("is_computed")) {
destColumnMetadata.put(i, new BulkColumnMetaData(rs.getColumn(i),
rsMoreMetaData.getString("collation_name"), bulkCopyEncryptionType));
}
} else {
destColumnMetadata.put(i, new BulkColumnMetaData(rs.getColumn(i)));
try (SQLServerStatement statementMoreMetadata = (SQLServerStatement) connection.createStatement();
SQLServerResultSet rsMoreMetaData = statementMoreMetadata.executeQueryInternal(metaDataQuery)) {
for (int i = 1; i <= destColumnMetadataCount; ++i) {
if (rsMoreMetaData.next()) {
String bulkCopyEncryptionType = null;
if (connection.getServerSupportsColumnEncryption()) {
bulkCopyEncryptionType = rsMoreMetaData.getString("encryption_type");
}
// Skip computed columns
if (!rsMoreMetaData.getBoolean("is_computed")) {
destColumnMetadata.put(i, new BulkColumnMetaData(rs.getColumn(i),
rsMoreMetaData.getString("collation_name"), bulkCopyEncryptionType));
}
} else {
destColumnMetadata.put(i, new BulkColumnMetaData(rs.getColumn(i)));
}
destColumnCount = destColumnMetadata.size();
}
} catch (SQLException e) {
// Unable to retrieve metadata for destination
throw new SQLServerException(SQLServerException.getErrString("R_unableRetrieveColMeta"), e);
} finally {
if (null != rs) {
rs.close();
}
if (null != stmt) {
stmt.close();
}
}
} catch (SQLException e) {
// Unable to retrieve metadata for destination
throw new SQLServerException(SQLServerException.getErrString("R_unableRetrieveColMeta"), e);
} finally {
if (null != rs) {
rs.close();
}
if (null != stmt) {
stmt.close();
}
}
}
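The getDestinationMetadata() change above checks the shared cache without the lock, then re-checks under DESTINATION_COL_METADATA_LOCK before querying and populating it, so only one caller per key pays for the metadata query. Below is a simplified, standalone sketch of that check–lock–recheck pattern; the class and method names are illustrative, not the driver's internals.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class MetadataCacheSketch {
    private static final Map<String, Map<Integer, String>> CACHE = new HashMap<>();
    private static final Lock CACHE_LOCK = new ReentrantLock();

    static Map<Integer, String> getOrLoad(String key) {
        // Unlocked read first: the common case once the entry has been populated.
        Map<Integer, String> metadata = CACHE.get(key);
        if (null == metadata || metadata.isEmpty()) {
            CACHE_LOCK.lock();
            try {
                // Re-check under the lock so only one caller runs the metadata query per key.
                metadata = CACHE.get(key);
                if (null == metadata || metadata.isEmpty()) {
                    metadata = queryDestinationMetadata(key); // stand-in for the sys.columns lookup
                    CACHE.put(key, metadata);
                }
            } finally {
                CACHE_LOCK.unlock();
            }
        }
        return metadata;
    }

    // Placeholder for the real metadata query against the destination table.
    private static Map<Integer, String> queryDestinationMetadata(String key) {
        Map<Integer, String> columns = new HashMap<>();
        columns.put(1, "collation_name/is_computed/encryption_type for " + key);
        return columns;
    }
}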

@@ -137,6 +137,9 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial
/** Current limit for this particular connection. */
private Boolean enablePrepareOnFirstPreparedStatementCall = null;

/** Used for toggling bulk copy metadata caching */
private Boolean cacheBulkCopyMetadata = null;

/** Used for toggling use of sp_prepare */
private String prepareMethod = null;

@@ -1552,6 +1555,8 @@ public static void clearUserTokenCache() {
/** transaction descriptor */
private byte[] transactionDescriptor = new byte[8];

static final HashMap<String, Map<Integer, SQLServerBulkCopy.BulkColumnMetaData>> BULK_COPY_OPERATION_CACHE = new HashMap<>();

/**
* Flag (Yukon and later) set to true whenever a transaction is rolled back..The flag's value is reset to false when
* a new transaction starts or when the autoCommit mode changes.
@@ -3044,6 +3049,12 @@ else if (0 == requestedPacketSize)
useBulkCopyForBatchInsert = isBooleanPropertyOn(sPropKey, sPropValue);
}

sPropKey = SQLServerDriverBooleanProperty.ENABLE_BULK_COPY_CACHE.toString();
sPropValue = activeConnectionProperties.getProperty(sPropKey);
if (null != sPropValue) {
setcacheBulkCopyMetadata(isBooleanPropertyOn(sPropKey, sPropValue));
}

sPropKey = SQLServerDriverStringProperty.SSL_PROTOCOL.toString();
sPropValue = activeConnectionProperties.getProperty(sPropKey);
if (null == sPropValue) {
@@ -8050,6 +8061,20 @@ public void setEnablePrepareOnFirstPreparedStatementCall(boolean value) {
this.enablePrepareOnFirstPreparedStatementCall = value;
}

@Override
public boolean getcacheBulkCopyMetadata() {
if (null == this.cacheBulkCopyMetadata) {
return false;
}

return this.cacheBulkCopyMetadata;
}

@Override
public void setcacheBulkCopyMetadata(boolean value) {
this.cacheBulkCopyMetadata = value;
}

@Override
public String getPrepareMethod() {
if (null == this.prepareMethod) {
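Because the same accessors are declared on the connection interface in the first hunk (presumably ISQLServerConnection, which SQLServerConnection implements above), the effective setting can also be inspected or flipped on an open connection. A brief sketch, assuming the connection comes from this driver; the helper class and method names are hypothetical:

import java.sql.Connection;
import java.sql.SQLException;

import com.microsoft.sqlserver.jdbc.ISQLServerConnection;

final class BulkCopyCacheToggle {
    /** Turns on bulk copy metadata caching for an open mssql-jdbc connection. */
    static boolean enableMetadataCaching(Connection connection) throws SQLException {
        ISQLServerConnection sqlServerConnection = connection.unwrap(ISQLServerConnection.class);
        boolean previous = sqlServerConnection.getcacheBulkCopyMetadata(); // defaults to false when unset
        sqlServerConnection.setcacheBulkCopyMetadata(true);
        return previous;
    }
}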
@@ -562,6 +562,16 @@ public void setEnablePrepareOnFirstPreparedStatementCall(boolean value) {
wrappedConnection.setEnablePrepareOnFirstPreparedStatementCall(value);
}

@Override
public boolean getcacheBulkCopyMetadata() {
return wrappedConnection.getcacheBulkCopyMetadata();
}

@Override
public void setcacheBulkCopyMetadata(boolean value) {
wrappedConnection.setcacheBulkCopyMetadata(value);
}

@Override
public String getPrepareMethod() {
return wrappedConnection.getPrepareMethod();
@@ -959,6 +959,21 @@ public boolean getEnablePrepareOnFirstPreparedStatementCall() {
SQLServerDriverBooleanProperty.ENABLE_PREPARE_ON_FIRST_PREPARED_STATEMENT.toString(), defaultValue);
}

@Override
public void setcacheBulkCopyMetadata(boolean cacheBulkCopyMetadata) {
setBooleanProperty(connectionProps,
SQLServerDriverBooleanProperty.ENABLE_BULK_COPY_CACHE.toString(),
cacheBulkCopyMetadata);
}

@Override
public boolean getcacheBulkCopyMetadata() {
boolean defaultValue = SQLServerDriverBooleanProperty.ENABLE_BULK_COPY_CACHE
.getDefaultValue();
return getBooleanProperty(connectionProps,
SQLServerDriverBooleanProperty.ENABLE_BULK_COPY_CACHE.toString(), defaultValue);
}

@Override
public void setServerPreparedStatementDiscardThreshold(int serverPreparedStatementDiscardThreshold) {
setIntProperty(connectionProps,
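The data source hunk above wires the property through setBooleanProperty/getBooleanProperty, so it can also be configured programmatically rather than in the connection string. A short sketch, assuming these setters land on SQLServerDataSource (which implements the ISQLServerDataSource interface extended above); server and credential values are placeholders:

import java.sql.Connection;
import java.sql.SQLException;

import com.microsoft.sqlserver.jdbc.SQLServerDataSource;

final class DataSourceConfigSketch {
    static Connection openConnection() throws SQLException {
        SQLServerDataSource ds = new SQLServerDataSource();
        ds.setServerName("localhost");        // placeholder connection details
        ds.setPortNumber(1433);
        ds.setDatabaseName("TestDb");
        ds.setUser("sa");
        ds.setPassword("your_password_here");
        ds.setEncrypt("true");
        ds.setTrustServerCertificate(true);
        ds.setcacheBulkCopyMetadata(true);    // new property added in this commit
        return ds.getConnection();
    }
}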
@@ -690,6 +690,7 @@ enum SQLServerDriverBooleanProperty {
XOPEN_STATES("xopenStates", false),
FIPS("fips", false),
ENABLE_PREPARE_ON_FIRST_PREPARED_STATEMENT("enablePrepareOnFirstPreparedStatementCall", SQLServerConnection.DEFAULT_ENABLE_PREPARE_ON_FIRST_PREPARED_STATEMENT_CALL),
ENABLE_BULK_COPY_CACHE("cacheBulkCopyMetadata", false),
USE_BULK_COPY_FOR_BATCH_INSERT("useBulkCopyForBatchInsert", false),
USE_FMT_ONLY("useFmtOnly", false),
SEND_TEMPORAL_DATATYPES_AS_STRING_FOR_BULK_COPY("sendTemporalDataTypesAsStringForBulkCopy", true),
@@ -918,6 +919,9 @@ public final class SQLServerDriver implements java.sql.Driver {
new SQLServerDriverPropertyInfo(SQLServerDriverBooleanProperty.USE_BULK_COPY_FOR_BATCH_INSERT.toString(),
Boolean.toString(SQLServerDriverBooleanProperty.USE_BULK_COPY_FOR_BATCH_INSERT.getDefaultValue()),
false, TRUE_FALSE),
new SQLServerDriverPropertyInfo(SQLServerDriverBooleanProperty.ENABLE_BULK_COPY_CACHE.toString(),
Boolean.toString(SQLServerDriverBooleanProperty.ENABLE_BULK_COPY_CACHE.getDefaultValue()),
false, TRUE_FALSE),
new SQLServerDriverPropertyInfo(SQLServerDriverStringProperty.MSI_CLIENT_ID.toString(),
SQLServerDriverStringProperty.MSI_CLIENT_ID.getDefaultValue(), false, null),
new SQLServerDriverPropertyInfo(SQLServerDriverStringProperty.KEY_VAULT_PROVIDER_CLIENT_ID.toString(),
@@ -14,7 +14,6 @@
import java.net.URI;
import java.net.URISyntaxException;

import java.security.MessageDigest;
import java.text.MessageFormat;

import java.util.Collections;
@@ -57,6 +56,8 @@
import com.microsoft.sqlserver.jdbc.SQLServerConnection.ActiveDirectoryAuthentication;
import com.microsoft.sqlserver.jdbc.SQLServerConnection.SqlFedAuthInfo;

import static com.microsoft.sqlserver.jdbc.Util.getHashedSecret;


class SQLServerMSAL4JUtils {

@@ -507,20 +508,6 @@ private static SQLServerException getCorrectedException(Exception e, String user
}
}

private static String getHashedSecret(String[] secrets) throws SQLServerException {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
for (String secret : secrets) {
if (null != secret) {
md.update(secret.getBytes(java.nio.charset.StandardCharsets.UTF_16LE));
}
}
return new String(md.digest());
} catch (NoSuchAlgorithmException e) {
throw new SQLServerException(SQLServerException.getErrString("R_NoSHA256Algorithm"), e);
}
}

private static class TokenCacheMap {
private ConcurrentHashMap<String, PersistentTokenCacheAccessAspect> tokenCacheMap = new ConcurrentHashMap<>();

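The getHashedSecret helper removed here was moved to Util (note the new static import above) so the bulk copy path can reuse it for cache keys: getDestinationMetadata() hashes the escaped destination table name together with the database name. Below is a standalone sketch of that key derivation under those assumptions; the hex encoding is added here purely for readability, whereas the driver wraps the raw digest bytes in a String.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class CacheKeySketch {
    /** SHA-256 over each non-null part, UTF-16LE encoded, mirroring getHashedSecret. */
    static String hashedKey(String escapedDestinationTableName, String databaseName)
            throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        for (String part : new String[] {escapedDestinationTableName, databaseName}) {
            if (null != part) {
                md.update(part.getBytes(StandardCharsets.UTF_16LE));
            }
        }
        // Hex-encode the digest so the key is printable.
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
}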
@@ -234,6 +234,7 @@ protected Object[][] getContents() {
{"R_socketTimeoutPropertyDescription", "The number of milliseconds to wait before the java.net.SocketTimeoutException is raised."},
{"R_serverPreparedStatementDiscardThresholdPropertyDescription", "The threshold for when to close discarded prepare statements on the server (calling a batch of sp_unprepares). A value of 1 or less will cause sp_unprepare to be called immediately on PreparedStatment close."},
{"R_enablePrepareOnFirstPreparedStatementCallPropertyDescription", "This setting specifies whether a prepared statement is prepared (sp_prepexec) on first use (property=true) or on second after first calling sp_executesql (property=false)."},
{"R_cacheBulkCopyMetadataPropertyDescription", "This setting specifies whether the driver caches the metadata used for bulk copy for batch inserts."},
{"R_statementPoolingCacheSizePropertyDescription", "This setting specifies the size of the prepared statement cache for a connection. A value less than 1 means no cache."},
{"R_gsscredentialPropertyDescription", "Impersonated GSS Credential to access SQL Server."},
{"R_msiClientIdPropertyDescription", "Client Id of User Assigned Managed Identity to be used for generating access token for Azure AD MSI Authentication"},