Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry;
using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry;
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerConstants;
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -285,6 +286,8 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
{
try
{
await AcquireAppLockAsync(connection, transaction, this._logger, token);

// Update the version number stored in the global state table if necessary before using it.
using (SqlCommand updateTablesPreInvocationCommand = this.BuildUpdateTablesPreInvocation(connection, transaction))
{
Expand Down Expand Up @@ -525,6 +528,8 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
{
try
{
await AcquireAppLockAsync(connection, transaction, this._logger, token);

SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection, transaction);
if (renewLeasesCommand != null)
{
Expand Down Expand Up @@ -634,6 +639,8 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
{
try
{
await AcquireAppLockAsync(connection, transaction, this._logger, token);

// Release the leases held on "_rowsToRelease".
using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction))
{
Expand Down Expand Up @@ -789,8 +796,6 @@ private static SqlChangeOperation GetChangeOperation(IReadOnlyDictionary<string,
private SqlCommand BuildUpdateTablesPreInvocation(SqlConnection connection, SqlTransaction transaction)
{
string updateTablesPreInvocationQuery = $@"
{AppLockStatements}

DECLARE @min_valid_version bigint;
SET @min_valid_version = CHANGE_TRACKING_MIN_VALID_VERSION({this._userTableId});

Expand Down Expand Up @@ -834,8 +839,6 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti
// up regardless since we know it should be processed - no need to check the change version.
// Once a row is successfully processed the LeaseExpirationTime column is set to NULL.
string getChangesQuery = $@"
{AppLockStatements}

DECLARE @last_sync_version bigint;
SELECT @last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
Expand Down Expand Up @@ -882,8 +885,6 @@ private async Task<string> GetLeaseLockedOrMaxAttemptRowCountMessage(SqlConnecti
// * NULL LeaseExpirationTime OR LeaseExpirationTime <= Current Time
// * No attempts remaining (Attempt count = Max attempts)
string getLeaseLockedOrMaxAttemptRowCountQuery = $@"
{AppLockStatements}

DECLARE @last_sync_version bigint;
SELECT @last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
Expand Down Expand Up @@ -948,8 +949,6 @@ private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransa
const string rowDataParameter = "@rowData";
// Create the merge query that will either update the rows that already exist or insert a new one if it doesn't exist
string query = $@"
{AppLockStatements}

WITH {acquireLeasesCte} AS ( SELECT * FROM OPENJSON(@rowData) WITH ({string.Join(",", cteColumnDefinitions)}) )
MERGE INTO {this._bracketedLeasesTableName}
AS ExistingData
Expand Down Expand Up @@ -989,8 +988,6 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection, SqlTransact
return null;
}
string renewLeasesQuery = $@"
{AppLockStatements}

UPDATE {this._bracketedLeasesTableName}
SET {LeasesTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
WHERE {matchCondition};
Expand Down Expand Up @@ -1021,9 +1018,7 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa
const string rowDataParameter = "@rowData";

string releaseLeasesQuery =
$@"{AppLockStatements}

WITH {releaseLeasesCte} AS ( SELECT * FROM OPENJSON(@rowData) WITH ({string.Join(",", cteColumnDefinitions)}) )
$@"WITH {releaseLeasesCte} AS ( SELECT * FROM OPENJSON(@rowData) WITH ({string.Join(",", cteColumnDefinitions)}) )
UPDATE {this._bracketedLeasesTableName}
SET
{LeasesTableChangeVersionColumnName} = cte.{SysChangeVersionColumnName},
Expand Down Expand Up @@ -1053,8 +1048,6 @@ private SqlCommand BuildUpdateTablesPostInvocation(SqlConnection connection, Sql
string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));

string updateTablesPostInvocationQuery = $@"
{AppLockStatements}

DECLARE @current_last_sync_version bigint;
SELECT @current_last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
Expand Down
11 changes: 2 additions & 9 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public async Task StartAsync(CancellationToken cancellationToken)
long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L;
using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
{
await AcquireAppLockAsync(connection, transaction, this._logger, cancellationToken);

createdSchemaDurationMs = await this.CreateSchemaAsync(connection, transaction, cancellationToken);
createGlobalStateTableDurationMs = await this.CreateGlobalStateTableAsync(connection, transaction, cancellationToken);
insertGlobalStateTableRowDurationMs = await this.InsertGlobalStateTableRowAsync(connection, transaction, userTableId, cancellationToken);
Expand Down Expand Up @@ -283,8 +285,6 @@ FROM sys.columns AS c
private async Task<long> CreateSchemaAsync(SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
{
string createSchemaQuery = $@"
{AppLockStatements}

IF SCHEMA_ID(N'{SchemaName}') IS NULL
EXEC ('CREATE SCHEMA {SchemaName}');
";
Expand Down Expand Up @@ -328,8 +328,6 @@ IF SCHEMA_ID(N'{SchemaName}') IS NULL
private async Task<long> CreateGlobalStateTableAsync(SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
{
string createGlobalStateTableQuery = $@"
{AppLockStatements}

IF OBJECT_ID(N'{GlobalStateTableName}', 'U') IS NULL
CREATE TABLE {GlobalStateTableName} (
UserFunctionID char(16) NOT NULL,
Expand Down Expand Up @@ -401,7 +399,6 @@ private async Task<long> InsertGlobalStateTableRowAsync(SqlConnection connection
}

string insertRowGlobalStateTableQuery = $@"
{AppLockStatements}
-- For back compatibility copy the lastSyncVersion from _hostIdFunctionId if it exists.
IF NOT EXISTS (
SELECT * FROM {GlobalStateTableName}
Expand Down Expand Up @@ -456,8 +453,6 @@ private async Task<long> CreateLeasesTableAsync(
// we're actually using the WEBSITE_SITE_NAME one (e.g. leasesTableName is different)
bool shouldMigrateOldLeasesTable = !string.IsNullOrEmpty(oldLeasesTableName) && oldLeasesTableName != leasesTableName;
string createLeasesTableQuery = shouldMigrateOldLeasesTable ? $@"
{AppLockStatements}

IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL
BEGIN
CREATE TABLE {leasesTableName} (
Expand All @@ -479,8 +474,6 @@ INSERT INTO {leasesTableName}
End
" :
$@"
{AppLockStatements}

IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL
CREATE TABLE {leasesTableName} (
{primaryKeysWithTypes},
Expand Down
4 changes: 2 additions & 2 deletions src/TriggerBinding/SqlTriggerMetricsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ private async Task<long> GetUnprocessedChangeCountAsync()
{
try
{
await AcquireAppLockAsync(connection, transaction, this._logger, CancellationToken.None);

using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction, primaryKeyColumns, userTableId))
{
var commandSw = Stopwatch.StartNew();
Expand Down Expand Up @@ -102,8 +104,6 @@ private SqlCommand BuildGetUnprocessedChangesCommand(SqlConnection connection, S
string leasesTableJoinCondition = string.Join(" AND ", primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));
string bracketedLeasesTableName = GetBracketedLeasesTableName(this._userDefinedLeasesTableName, this._userFunctionId, userTableId);
string getUnprocessedChangesQuery = $@"
{AppLockStatements}

DECLARE @last_sync_version bigint;
SELECT @last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
Expand Down
18 changes: 18 additions & 0 deletions src/TriggerBinding/SqlTriggerUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,24 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql
public static class SqlTriggerUtils
{

/// <summary>
/// Acquires an exclusive application lock on the transaction to prevent deadlocks.
/// This should be called once at the beginning of each transaction rather than
/// being included in every individual query, since the lock is transaction-scoped
/// and subsequent acquisitions within the same transaction are no-ops.
/// </summary>
/// <param name="connection">The SQL connection</param>
/// <param name="transaction">The transaction to acquire the lock on</param>
/// <param name="logger">Logger for logging the command</param>
/// <param name="cancellationToken">Cancellation token</param>
internal static async Task AcquireAppLockAsync(SqlConnection connection, SqlTransaction transaction, ILogger logger, CancellationToken cancellationToken)
{
using (var command = new SqlCommand(AppLockStatements, connection, transaction))
{
await command.ExecuteNonQueryAsyncWithLogging(logger, cancellationToken, true);
}
}

/// <summary>
/// Gets the names and types of primary key columns of the user table.
/// </summary>
Expand Down