Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AwsWrapperDataProvider/AwsWrapperConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class AwsWrapperConnection : DbConnection, IWrapper

private bool deferredInitialization = false;

internal ConnectionPluginManager? PluginManager { get; private set; }
public ConnectionPluginManager? PluginManager { get; private set; }

internal Dictionary<string, string>? ConnectionProperties { get; private set; }

Expand Down
12 changes: 10 additions & 2 deletions AwsWrapperDataProvider/Driver/ConnectionPluginManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
using AwsWrapperDataProvider.Driver.HostInfo;
using AwsWrapperDataProvider.Driver.HostListProviders;
using AwsWrapperDataProvider.Driver.Plugins;
using AwsWrapperDataProvider.Driver.Utils;
using AwsWrapperDataProvider.Properties;
using Microsoft.Extensions.Logging;

namespace AwsWrapperDataProvider.Driver;

public class ConnectionPluginManager
{
private readonly Dictionary<string, Delegate> pluginChainDelegates = [];
protected IList<IConnectionPlugin> plugins = [];
protected string[] activePluginCodes = [];
protected IConnectionProvider defaultConnProvider;
protected IConnectionProvider? effectiveConnProvider;
protected ConfigurationProfile? configurationProfile;
Expand Down Expand Up @@ -90,6 +93,7 @@ public void InitConnectionPluginChain(
this.effectiveConnProvider,
props,
this.configurationProfile);
this.activePluginCodes = pluginChainBuilder.GetPluginCodes(this.pluginService, props);
}

private async Task<T> ExecuteWithSubscribedPlugins<T>(
Expand All @@ -104,7 +108,7 @@ private async Task<T> ExecuteWithSubscribedPlugins<T>(
if (!this.pluginChainDelegates.TryGetValue(methodName, out Delegate? del))
{
del = this.MakePluginChainDelegate<T>(methodName);
this.pluginChainDelegates.Add(methodName, del);
this.pluginChainDelegates[methodName] = del;
}

if (del is not PluginChainADONetDelegate<T> pluginChainDelegate)
Expand All @@ -130,7 +134,6 @@ private async Task<T> ExecuteWithSubscribedPlugins<T>(
private PluginChainADONetDelegate<T> MakePluginChainDelegate<T>(string methodName)
{
PluginChainADONetDelegate<T>? pluginChainDelegate = null;

for (int i = this.plugins.Count - 1; i >= 0; i--)
{
IConnectionPlugin plugin = this.plugins[i];
Expand Down Expand Up @@ -248,4 +251,9 @@ public virtual bool AcceptsStrategy(string strategy)
{
return this.defaultConnProvider.AcceptsStrategy(strategy);
}

public bool IsPluginActive(string pluginCode)
{
return this.activePluginCodes.Contains(pluginCode);
}
}
18 changes: 16 additions & 2 deletions AwsWrapperDataProvider/Driver/Dialects/AuroraMySqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

namespace AwsWrapperDataProvider.Driver.Dialects;

public class AuroraMySqlDialect : MySqlDialect
public class AuroraMySqlDialect : MySqlDialect, IBlueGreenDialect
{
private static readonly ILogger<AuroraMySqlDialect> Logger = LoggerUtils.GetLogger<AuroraMySqlDialect>();

Expand All @@ -39,6 +39,10 @@ public class AuroraMySqlDialect : MySqlDialect
private static readonly string IsWriterQuery = "SELECT SERVER_ID FROM information_schema.replica_host_status "
+ "WHERE SESSION_ID = 'MASTER_SESSION_ID' AND SERVER_ID = @@aurora_server_id";

private static readonly string AuroraMySqlBgTopologyExistsQuery = "SELECT 1 AS tmp FROM information_schema.tables WHERE table_schema = 'mysql' AND table_name = 'rds_topology'";

private static readonly string AuroraMySqlBgStatusQuery = "SELECT * FROM mysql.rds_topology";

public override IList<Type> DialectUpdateCandidates { get; } = [
typeof(RdsMultiAzDbClusterMySqlDialect),
];
Expand All @@ -54,7 +58,7 @@ public override async Task<bool> IsDialect(DbConnection connection)
}
catch (Exception ex) when (this.ExceptionHandler.IsSyntaxError(ex))
{
// Syntax error - expected when querying against incorrect dialect
Logger.LogTrace(ex, Resources.Error_CantCheckDialect_Syntax, nameof(AuroraMySqlDialect));
}
catch (Exception ex)
{
Expand Down Expand Up @@ -85,4 +89,14 @@ private HostListProviderSupplier GetHostListProviderSupplier()
NodeIdQuery,
IsReaderQuery);
}

public async Task<bool> IsBlueGreenStatusAvailable(DbConnection connection)
{
return await DialectUtils.CheckExistenceQueries(connection, this.ExceptionHandler, Logger, AuroraMySqlBgTopologyExistsQuery);
}

public string GetBlueGreenStatusQuery()
{
return AuroraMySqlBgStatusQuery;
}
}
18 changes: 17 additions & 1 deletion AwsWrapperDataProvider/Driver/Dialects/AuroraPgDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

using System.Data.Common;
using System.Reflection;
using AwsWrapperDataProvider.Driver.HostListProviders;
using AwsWrapperDataProvider.Driver.HostListProviders.Monitoring;
using AwsWrapperDataProvider.Driver.Utils;
Expand All @@ -21,7 +22,7 @@

namespace AwsWrapperDataProvider.Driver.Dialects;

public class AuroraPgDialect : PgDialect, IAuroraLimitlessDialect
public class AuroraPgDialect : PgDialect, IAuroraLimitlessDialect, IBlueGreenDialect
{
private const string ReaderOrdinal = "aurora_stat_utils";

Expand All @@ -48,6 +49,11 @@ public class AuroraPgDialect : PgDialect, IAuroraLimitlessDialect
private static readonly string IsWriterQuery = "SELECT SERVER_ID FROM pg_catalog.aurora_replica_status() "
+ "WHERE SESSION_ID OPERATOR(pg_catalog.=) 'MASTER_SESSION_ID' AND SERVER_ID OPERATOR(pg_catalog.=) aurora_db_instance_identifier()";

protected static readonly string AuroraPostgreSqlBgTopologyExistsQuery = "SELECT 'pg_catalog.get_blue_green_fast_switchover_metadata'::regproc";

protected static readonly string DriverVersion = "1.0.1";
protected static readonly string AuroraPostgreSqlBgStatusQuery = $"SELECT * FROM pg_catalog.get_blue_green_fast_switchover_metadata('aws_advanced_dotnet_data_provider_wrapper-{DriverVersion}')";

public override IList<Type> DialectUpdateCandidates { get; } = [
typeof(RdsMultiAzDbClusterPgDialect),
];
Expand Down Expand Up @@ -115,5 +121,15 @@ private HostListProviderSupplier GetHostListProviderSupplier()
IsReaderQuery);
}

public async Task<bool> IsBlueGreenStatusAvailable(DbConnection connection)
{
return await DialectUtils.CheckExistenceQueries(connection, this.ExceptionHandler, Logger, AuroraPostgreSqlBgTopologyExistsQuery);
}

public string GetBlueGreenStatusQuery()
{
return AuroraPostgreSqlBgStatusQuery;
}

public string LimitlessRouterEndpointQuery { get => "SELECT router_endpoint, load from aurora_limitless_router_endpoints()"; }
}
49 changes: 49 additions & 0 deletions AwsWrapperDataProvider/Driver/Dialects/DialectUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Data.Common;
using AwsWrapperDataProvider.Driver.Exceptions;
using AwsWrapperDataProvider.Properties;
using Microsoft.Extensions.Logging;

namespace AwsWrapperDataProvider.Driver.Dialects;

public static class DialectUtils
{
public static async Task<bool> CheckExistenceQueries(DbConnection connection, IExceptionHandler exceptionHandler, ILogger logger, string query)
{
try
{
await using var command = connection.CreateCommand();
command.CommandText = query;
await using var reader = await command.ExecuteReaderAsync();
return await reader.ReadAsync();
}
catch (Exception ex) when (exceptionHandler.IsSyntaxError(ex))
{
// Syntax error - expected when querying against incorrect dialect
}
catch (Exception ex)
{
logger.LogTrace(ex, Resources.Error_CantCheckDialect, nameof(AuroraMySqlDialect));
}

return false;
}

public static bool IsBlueGreenConnectionDialect(IDialect dialect)
{
return dialect is IBlueGreenDialect;
}
}
24 changes: 24 additions & 0 deletions AwsWrapperDataProvider/Driver/Dialects/IBlueGreenDialect.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Data.Common;

namespace AwsWrapperDataProvider.Driver.Dialects;

public interface IBlueGreenDialect
{
Task<bool> IsBlueGreenStatusAvailable(DbConnection connection);

string GetBlueGreenStatusQuery();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class RdsMultiAzDbClusterPgDialect : PgDialect
private static readonly ILogger<RdsMultiAzDbClusterPgDialect> Logger = LoggerUtils.GetLogger<RdsMultiAzDbClusterPgDialect>();

private static readonly string TopologyQuery =
$"SELECT id, endpoint, port FROM rds_tools.show_topology('aws_dotnet_driver-{DriverVersion}')";
$"SELECT id, endpoint, port FROM rds_tools.show_topology('aws_advanced_dotnet_data_provider_wrapper-{DriverVersion}')";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we grab the version dynamically here too

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to change in future PR


private static readonly string FetchWriterNodeQuery =
"SELECT multi_az_db_cluster_source_dbi_resource_id FROM rds_tools.multi_az_db_cluster_source_dbi_resource_id()"
Expand Down
19 changes: 17 additions & 2 deletions AwsWrapperDataProvider/Driver/Dialects/RdsMySqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

namespace AwsWrapperDataProvider.Driver.Dialects;

public class RdsMySqlDialect : MySqlDialect
public class RdsMySqlDialect : MySqlDialect, IBlueGreenDialect
{
protected static readonly string RdsMySqlTopologyTableExistsQuery = "SELECT 1 AS tmp FROM information_schema.tables WHERE" +
" table_schema = 'mysql' AND table_name = 'rds_topology'";

protected static readonly string RdsMySqlBgStatusQuery = "SELECT * FROM mysql.rds_topology";

private static readonly ILogger<RdsMySqlDialect> Logger = LoggerUtils.GetLogger<RdsMySqlDialect>();

public override IList<Type> DialectUpdateCandidates { get; } =
Expand Down Expand Up @@ -56,7 +61,7 @@ public override async Task<bool> IsDialect(DbConnection conn)
}
catch (Exception ex) when (this.ExceptionHandler.IsSyntaxError(ex))
{
// Syntax error - expected when querying against incorrect dialect
Logger.LogTrace(ex, Resources.Error_CantCheckDialect_Syntax, nameof(RdsPgDialect));
}
catch (Exception ex)
{
Expand All @@ -65,4 +70,14 @@ public override async Task<bool> IsDialect(DbConnection conn)

return false;
}

public async Task<bool> IsBlueGreenStatusAvailable(DbConnection connection)
{
return await DialectUtils.CheckExistenceQueries(connection, this.ExceptionHandler, Logger, RdsMySqlTopologyTableExistsQuery);
}

public string GetBlueGreenStatusQuery()
{
return RdsMySqlBgStatusQuery;
}
}
18 changes: 16 additions & 2 deletions AwsWrapperDataProvider/Driver/Dialects/RdsPgDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

namespace AwsWrapperDataProvider.Driver.Dialects;

public class RdsPgDialect : PgDialect
public class RdsPgDialect : PgDialect, IBlueGreenDialect
{
internal const string ExtensionsSql = "SELECT (setting LIKE '%rds_tools%') AS rds_tools, "
+ "(setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils "
+ "FROM pg_catalog.pg_settings "
+ "WHERE name OPERATOR(pg_catalog.=) 'rds.extensions'";

protected static readonly string RdsPgTopologyTableExistsQuery = "SELECT 'rds_tools.show_topology'::regproc";

protected static readonly string RdsPgBgStatusQuery = "SELECT * FROM rds_tools.show_topology('aws_advanced_dotnet_data_provider_wrapper')";

private static readonly ILogger<RdsPgDialect> Logger = LoggerUtils.GetLogger<RdsPgDialect>();

public override IList<Type> DialectUpdateCandidates { get; } =
Expand Down Expand Up @@ -58,7 +62,7 @@ public override async Task<bool> IsDialect(DbConnection conn)
}
catch (Exception ex) when (this.ExceptionHandler.IsSyntaxError(ex))
{
// Syntax error - expected when querying against incorrect dialect
Logger.LogTrace(ex, Resources.Error_CantCheckDialect_Syntax, nameof(RdsPgDialect));
}
catch (Exception ex)
{
Expand All @@ -67,4 +71,14 @@ public override async Task<bool> IsDialect(DbConnection conn)

return false;
}

public async Task<bool> IsBlueGreenStatusAvailable(DbConnection connection)
{
return await DialectUtils.CheckExistenceQueries(connection, this.ExceptionHandler, Logger, RdsPgTopologyTableExistsQuery);
}

public string GetBlueGreenStatusQuery()
{
return RdsPgBgStatusQuery;
}
}
7 changes: 7 additions & 0 deletions AwsWrapperDataProvider/Driver/HostInfo/HostSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,11 @@ public override string ToString()
this.LastUpdateTime,
this.HostId);
}

public HostSpec Clone()
{
HostSpec copy = new(this.Host, this.Port, this.HostId, this.Role, this.RawAvailability, this.Weight, this.LastUpdateTime);
copy.AddAlias(this.aliases.Keys.ToArray());
return copy;
}
}
7 changes: 7 additions & 0 deletions AwsWrapperDataProvider/Driver/IPluginService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,11 @@ public interface IPluginService : IExceptionHandlerService
HostSpec GetHostSpecByStrategy(HostRole hostRole, string strategy);

HostSpec GetHostSpecByStrategy(IList<HostSpec> hosts, HostRole hostRole, string strategy);

/// <summary>
/// Checks if a specific plugin type is currently in use.
/// </summary>
/// <param name="pluginCode">The name of the plugin.</param>
/// <returns>True if the plugin is in use, false otherwise.</returns>
bool IsPluginInUse(string pluginCode);
}
5 changes: 5 additions & 0 deletions AwsWrapperDataProvider/Driver/PluginService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ public HostSpec GetHostSpecByStrategy(IList<HostSpec> hosts, HostRole hostRole,
return this.pluginManager.GetHostSpecByStrategy(hosts, hostRole, strategy, this.props);
}

public bool IsPluginInUse(string pluginCode)
{
return this.pluginManager.IsPluginActive(pluginCode);
}

private HostSpec GetCurrentHostSpec()
{
this.currentHostSpec = this.InitialConnectionHostSpec
Expand Down
Loading