From ac2d3e4e00d2b34ab581311b28599423612f6dd4 Mon Sep 17 00:00:00 2001 From: Brar Piening Date: Tue, 13 Sep 2022 18:32:11 +0200 Subject: [PATCH] Add NpgsqlConnection ReloadTypesAsync() Possibly fixes #4369 but will probably be replaced by a more general approach. --- src/Npgsql/NpgsqlConnection.cs | 85 ++++++++++++++++++++++------ src/Npgsql/PublicAPI.Unshipped.txt | 1 + test/Npgsql.Tests/ConnectionTests.cs | 27 ++++++--- test/Npgsql.Tests/ReaderTests.cs | 10 ++-- test/Npgsql.Tests/TestUtil.cs | 22 +++++++ 5 files changed, 114 insertions(+), 31 deletions(-) diff --git a/src/Npgsql/NpgsqlConnection.cs b/src/Npgsql/NpgsqlConnection.cs index 23fca29424..ad1aa21aaf 100644 --- a/src/Npgsql/NpgsqlConnection.cs +++ b/src/Npgsql/NpgsqlConnection.cs @@ -1163,7 +1163,7 @@ async Task BeginBinaryImport(string copyFromCommand, bool throw new ArgumentException("Must contain a COPY FROM STDIN command!", nameof(copyFromCommand)); CheckReady(); - var connector = StartBindingScope(ConnectorBindingScope.Copy); + var connector = await StartBindingScope(ConnectorBindingScope.Copy, async); LogMessages.StartingBinaryImport(connector.LoggingConfiguration.CopyLogger, connector.Id); // no point in passing a cancellationToken here, as we register the cancellation in the Init method @@ -1217,7 +1217,7 @@ async Task BeginBinaryExport(string copyToCommand, bool as throw new ArgumentException("Must contain a COPY TO STDOUT command!", nameof(copyToCommand)); CheckReady(); - var connector = StartBindingScope(ConnectorBindingScope.Copy); + var connector = await StartBindingScope(ConnectorBindingScope.Copy, async); LogMessages.StartingBinaryExport(connector.LoggingConfiguration.CopyLogger, connector.Id); // no point in passing a cancellationToken here, as we register the cancellation in the Init method @@ -1277,7 +1277,7 @@ async Task BeginTextImport(string copyFromCommand, bool async, Cance throw new ArgumentException("Must contain a COPY FROM STDIN command!", nameof(copyFromCommand)); CheckReady(); - var connector = StartBindingScope(ConnectorBindingScope.Copy); + var connector = await StartBindingScope(ConnectorBindingScope.Copy, async); LogMessages.StartingTextImport(connector.LoggingConfiguration.CopyLogger, connector.Id); // no point in passing a cancellationToken here, as we register the cancellation in the Init method @@ -1338,7 +1338,7 @@ async Task BeginTextExport(string copyToCommand, bool async, Cancell throw new ArgumentException("Must contain a COPY TO STDOUT command!", nameof(copyToCommand)); CheckReady(); - var connector = StartBindingScope(ConnectorBindingScope.Copy); + var connector = await StartBindingScope(ConnectorBindingScope.Copy, async); LogMessages.StartingTextExport(connector.LoggingConfiguration.CopyLogger, connector.Id); // no point in passing a cancellationToken here, as we register the cancellation in the Init method @@ -1399,7 +1399,7 @@ async Task BeginRawBinaryCopy(string copyCommand, bool asyn throw new ArgumentException("Must contain a COPY TO STDOUT OR COPY FROM STDIN command!", nameof(copyCommand)); CheckReady(); - var connector = StartBindingScope(ConnectorBindingScope.Copy); + var connector = await StartBindingScope(ConnectorBindingScope.Copy, async); LogMessages.StartingRawCopy(connector.LoggingConfiguration.CopyLogger, connector.Id); // no point in passing a cancellationToken here, as we register the cancellation in the Init method @@ -1807,10 +1807,36 @@ async ValueTask StartBindingScopeAsync() } } + /// + /// Synchronously starts a binding scope + /// + /// The to start + /// The to start + /// + /// A representing the connector to use within the started binding scope + /// + internal ValueTask StartBindingScope(ConnectorBindingScope scope, bool async) + => StartBindingScope(scope, NpgsqlTimeout.Infinite, async, CancellationToken.None); + + /// + /// Synchronously starts a binding scope + /// + /// The to start + /// A connector to use within the started binding scope + /// + /// Warning: Never use this in async methods when multiplexing because it may block and cause a deadlock. + /// internal NpgsqlConnector StartBindingScope(ConnectorBindingScope scope) - => StartBindingScope(scope, NpgsqlTimeout.Infinite, async: false, CancellationToken.None) - .GetAwaiter().GetResult(); + => StartBindingScope(scope, async: false).GetAwaiter().GetResult(); + /// + /// Synchronously starts a temporary binding scope + /// + /// A connector to execute the temporary commands + /// An which ends the temporary binding scope when it is disposed + /// + /// Warning: Never use this in async methods when multiplexing because it may block and cause a deadlock. + /// internal EndScopeDisposable StartTemporaryBindingScope(out NpgsqlConnector connector) { connector = StartBindingScope(ConnectorBindingScope.Temporary); @@ -2042,6 +2068,9 @@ public void UnprepareAll() /// public void ReloadTypes() { + if (Settings.Multiplexing) + throw new NotSupportedException(); + CheckReady(); using var scope = StartTemporaryBindingScope(out var connector); connector.LoadDatabaseInfo( @@ -2053,17 +2082,41 @@ public void ReloadTypes() // Increment the change counter on the global type mapper. This will make conn.Open() pick up the // new DatabaseInfo and set up a new connection type mapper TypeMapping.GlobalTypeMapper.Instance.RecordChange(); + } - if (Settings.Multiplexing) + /// + public async Task ReloadTypesAsync() + { + CheckReady(); + var connector = await StartBindingScope(ConnectorBindingScope.Temporary, true); + try + { + await connector.LoadDatabaseInfo( + forceReload: true, + NpgsqlTimeout.Infinite, + async: true, + CancellationToken.None); + + // Increment the change counter on the global type mapper. This will make conn.Open() pick up the + // new DatabaseInfo and set up a new connection type mapper + TypeMapping.GlobalTypeMapper.Instance.RecordChange(); + + if (Settings.Multiplexing) + { + var multiplexingTypeMapper = ((MultiplexingDataSource)NpgsqlDataSource).MultiplexingTypeMapper!; + Debug.Assert(multiplexingTypeMapper == connector.TypeMapper, + "A connector must reference the exact same TypeMapper the MultiplexingConnectorPool does"); + // It's very probable that we've called ReloadTypes on the different connection than + // the MultiplexingConnectorPool references. + // Which means, we have to explicitly call Reset after we change the connector's DatabaseInfo to reload type mappings. + multiplexingTypeMapper.Connector.DatabaseInfo = connector.TypeMapper.DatabaseInfo; + multiplexingTypeMapper.Reset(); + } + + } + finally { - var multiplexingTypeMapper = ((MultiplexingDataSource)NpgsqlDataSource).MultiplexingTypeMapper!; - Debug.Assert(multiplexingTypeMapper == connector.TypeMapper, - "A connector must reference the exact same TypeMapper the MultiplexingConnectorPool does"); - // It's very probable that we've called ReloadTypes on the different connection than - // the MultiplexingConnectorPool references. - // Which means, we have to explicitly call Reset after we change the connector's DatabaseInfo to reload type mappings. - multiplexingTypeMapper.Connector.DatabaseInfo = connector.TypeMapper.DatabaseInfo; - multiplexingTypeMapper.Reset(); + EndBindingScope(ConnectorBindingScope.Temporary); } } diff --git a/src/Npgsql/PublicAPI.Unshipped.txt b/src/Npgsql/PublicAPI.Unshipped.txt index 5e26f00162..6b65275b82 100644 --- a/src/Npgsql/PublicAPI.Unshipped.txt +++ b/src/Npgsql/PublicAPI.Unshipped.txt @@ -1,4 +1,5 @@ #nullable enable +Npgsql.NpgsqlConnection.ReloadTypesAsync() -> System.Threading.Tasks.Task! Npgsql.NpgsqlLoggingConfiguration static Npgsql.NpgsqlLoggingConfiguration.InitializeLogging(Microsoft.Extensions.Logging.ILoggerFactory! loggerFactory, bool parameterLoggingEnabled = false) -> void *REMOVED*Npgsql.NpgsqlConnection.Settings.get -> Npgsql.NpgsqlConnectionStringBuilder! diff --git a/test/Npgsql.Tests/ConnectionTests.cs b/test/Npgsql.Tests/ConnectionTests.cs index 4f5f31225f..8376447a42 100644 --- a/test/Npgsql.Tests/ConnectionTests.cs +++ b/test/Npgsql.Tests/ConnectionTests.cs @@ -1039,8 +1039,7 @@ public void Clone() } [Test, IssueLink("https://github.com/npgsql/npgsql/issues/824")] - [NonParallelizable] - public async Task ReloadTypes() + public async Task ReloadTypes([Values]bool async) { if (IsMultiplexing) return; @@ -1049,19 +1048,29 @@ public async Task ReloadTypes() using (var conn = await OpenConnectionAsync(connectionString)) using (var conn2 = await OpenConnectionAsync(connectionString)) { - Assert.That(await conn.ExecuteScalarAsync("SELECT EXISTS (SELECT * FROM pg_type WHERE typname='reload_types_enum')"), - Is.False); - await conn.ExecuteNonQueryAsync("CREATE TYPE pg_temp.reload_types_enum AS ENUM ('First', 'Second')"); - Assert.That(() => conn.TypeMapper.MapEnum(), Throws.Exception.TypeOf()); - conn.ReloadTypes(); - conn.TypeMapper.MapEnum(); + await using var tmpEnum = await CreateEnum(conn, out var enumName, out _); + Assert.That(() => conn.TypeMapper.MapEnum(enumName), Throws.Exception.TypeOf()); + if (async) + await conn.ReloadTypesAsync(); + else + { + if (IsMultiplexing) + { + Assert.That(() => conn.ReloadTypes(), Throws.InvalidOperationException); + return; + } + // ReSharper disable once MethodHasAsyncOverload + conn.ReloadTypes(); + } + + conn.TypeMapper.MapEnum(enumName); // Make sure conn2 picks up the new type after a pooled close var connId = conn2.ProcessID; conn2.Close(); conn2.Open(); Assert.That(conn2.ProcessID, Is.EqualTo(connId), "Didn't get the same connector back"); - conn2.TypeMapper.MapEnum(); + conn2.TypeMapper.MapEnum(enumName); } } enum ReloadTypesEnum { First, Second }; diff --git a/test/Npgsql.Tests/ReaderTests.cs b/test/Npgsql.Tests/ReaderTests.cs index b149a2c5f2..52f4250f3e 100644 --- a/test/Npgsql.Tests/ReaderTests.cs +++ b/test/Npgsql.Tests/ReaderTests.cs @@ -362,18 +362,17 @@ public async Task GetDataTypeName(string typeName, string? normalizedName = null Assert.That(reader.GetDataTypeName(0), Is.EqualTo(normalizedName)); } - [Test] + [Test, IssueLink("https://github.com/npgsql/npgsql/issues/4369")] public async Task GetDataTypeName_enum() { var csb = new NpgsqlConnectionStringBuilder(ConnectionString) { - MaxPoolSize = 1 + MaxPoolSize = 1, }; await using var conn = await OpenConnectionAsync(csb); await using var _ = await GetTempTypeName(conn, out var typeName); await conn.ExecuteNonQueryAsync($"CREATE TYPE {typeName} AS ENUM ('one')"); - await Task.Yield(); // TODO: fix multiplexing deadlock bug - conn.ReloadTypes(); + await conn.ReloadTypesAsync(); await using var cmd = new NpgsqlCommand($"SELECT 'one'::{typeName}", conn); await using var reader = await cmd.ExecuteReaderAsync(Behavior); await reader.ReadAsync(); @@ -390,8 +389,7 @@ public async Task GetDataTypeName_domain() await using var conn = await OpenConnectionAsync(csb); await using var _ = await GetTempTypeName(conn, out var typeName); await conn.ExecuteNonQueryAsync($"CREATE DOMAIN {typeName} AS VARCHAR(10)"); - await Task.Yield(); // TODO: fix multiplexing deadlock bug - conn.ReloadTypes(); + await conn.ReloadTypesAsync(); await using var cmd = new NpgsqlCommand($"SELECT 'one'::{typeName}", conn); await using var reader = await cmd.ExecuteReaderAsync(Behavior); await reader.ReadAsync(); diff --git a/test/Npgsql.Tests/TestUtil.cs b/test/Npgsql.Tests/TestUtil.cs index 52145eade4..f62d23d550 100644 --- a/test/Npgsql.Tests/TestUtil.cs +++ b/test/Npgsql.Tests/TestUtil.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Npgsql.NameTranslation; using NUnit.Framework; namespace Npgsql.Tests @@ -189,6 +190,27 @@ internal static Task CreateTempSchema(NpgsqlConnection conn, o TaskContinuationOptions.OnlyOnRanToCompletion); } + + /// + /// Creates a schema with a unique name, usable for a single test, and returns an to + /// drop it at the end of the test. + /// + internal static Task CreateEnum(NpgsqlConnection conn, out string enumName, out string[] enumValues, bool parallelizable = true, INpgsqlNameTranslator? nameTranslator = null) + where TEnum : struct, Enum + { + nameTranslator ??= new NpgsqlSnakeCaseNameTranslator(); + var enumType = typeof(TEnum); + enumName = nameTranslator.TranslateTypeName(enumType.Name); + enumValues = Enum.GetNames(enumType).Select(n => nameTranslator.TranslateMemberName(n)).ToArray(); + if (parallelizable) + enumName += Interlocked.Increment(ref _tempTypeCounter); + return conn.ExecuteNonQueryAsync($"DROP TYPE IF EXISTS {enumName} CASCADE; CREATE TYPE {enumName} AS ENUM ('{string.Join("', '", enumValues)}')") + .ContinueWith( + (_, name) => new DatabaseObjectDropper(conn, (string)name!, "TYPE"), + enumName, + TaskContinuationOptions.OnlyOnRanToCompletion); + } + /// /// Generates a unique table name, usable for a single test, and drops it if it already exists. /// Actual creation of the table is the responsibility of the caller.