Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
487 changes: 487 additions & 0 deletions docs/SessionPool_Exception_Handling.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/Apache.IoTDB.Data/IoTDBDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool
_command = IoTDBCommand;
_closeConnection = closeConnection;
_fieldCount = dataSet.GetColumnNames().Count;
_hasRows = dataSet.RowCount() > 0;
_recordsAffected = dataSet.RowCount();
_hasRows = dataSet.CurrentBatchRowCount() > 0;
_recordsAffected = -1; // Total row count is unknown; use -1 per ADO.NET convention

_closed = _closeConnection;
_metas = dataSet.GetColumnNames();
Expand Down
62 changes: 32 additions & 30 deletions src/Apache.IoTDB/ConcurrentClientQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace Apache.IoTDB
public class ConcurrentClientQueue
{
public ConcurrentQueue<Client> ClientQueue { get; }
internal IPoolDiagnosticReporter DiagnosticReporter { get; set; }

public ConcurrentClientQueue(List<Client> clients)
{
Expand All @@ -47,50 +48,51 @@ public void Return(Client client)
Monitor.Exit(ClientQueue);
Thread.Sleep(0);
}
int _ref = 0;
public void AddRef()
{
lock (this)
{
_ref++;
}
}
public int GetRef()
{
return _ref;
}
public void RemoveRef()
{
lock (this)
{
_ref--;
}
}
private int _ref = 0;
public void AddRef() => Interlocked.Increment(ref _ref);
public int GetRef() => Volatile.Read(ref _ref);
public void RemoveRef() => Interlocked.Decrement(ref _ref);
public int Timeout { get; set; } = 10;
public Client Take()
{
Client client = null;
Monitor.Enter(ClientQueue);
while (true)
try
{
bool timeout = false;
if (ClientQueue.IsEmpty)
while (true)
{
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
}
ClientQueue.TryDequeue(out client);
bool timeout = false;
if (ClientQueue.IsEmpty)
{
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
}
ClientQueue.TryDequeue(out client);

if (client != null || timeout)
{
break;
if (client != null || timeout)
{
break;
}
}
}
Monitor.Exit(ClientQueue);
finally
{
Monitor.Exit(ClientQueue);
}
if (client == null)
{
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
var reasonPhrase = $"Connection pool is empty and wait time out({Timeout}s)";
if (DiagnosticReporter != null)
{
throw DiagnosticReporter.BuildDepletionException(reasonPhrase);
}
throw new TimeoutException(reasonPhrase);
}
return client;
}
}

internal interface IPoolDiagnosticReporter
{
SessionPoolDepletedException BuildDepletionException(string reasonPhrase);
}
}
13 changes: 13 additions & 0 deletions src/Apache.IoTDB/DataStructure/SessionDataSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,20 @@ public SessionDataSet(
public IReadOnlyList<string> GetColumnNames() => _rpcDataSet._columnNameList;
public IReadOnlyList<string> GetColumnTypes() => _rpcDataSet._columnTypeList;

/// <summary>
/// Gets the number of rows in the current fetched batch (tsBlock).
/// Note: This is NOT the total row count of the query result. Use HasNext() to check for more data.
/// </summary>
/// <returns>The number of rows in the current batch.</returns>
public int CurrentBatchRowCount() => _rpcDataSet._tsBlockSize;

/// <summary>
/// Gets the number of rows in the current fetched batch.
/// </summary>
/// <returns>The number of rows in the current batch.</returns>
[Obsolete("Use CurrentBatchRowCount() instead. This method returns batch size, not total row count.")]
public int RowCount() => _rpcDataSet._tsBlockSize;

public void ShowTableNames()
{
IReadOnlyList<string> columns = GetColumnNames();
Expand Down
52 changes: 52 additions & 0 deletions src/Apache.IoTDB/PoolHealthMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using System.Threading;

namespace Apache.IoTDB
{
/// <summary>
/// Encapsulates real-time health statistics for connection pool monitoring.
/// Thread-safe implementation for concurrent access patterns.
/// </summary>
internal class PoolHealthMetrics
{
private int _reconnectionFailureTally;
private readonly int _configuredMaxSize;

public PoolHealthMetrics(int configuredMaxSize)
{
_configuredMaxSize = configuredMaxSize;
}

public void IncrementReconnectionFailures()
{
Interlocked.Increment(ref _reconnectionFailureTally);
}

public void ResetAllCounters()
{
Interlocked.Exchange(ref _reconnectionFailureTally, 0);
}

public int GetReconnectionFailureTally() => Volatile.Read(ref _reconnectionFailureTally);

public int GetConfiguredMaxSize() => _configuredMaxSize;
}
}
41 changes: 41 additions & 0 deletions src/Apache.IoTDB/ReconnectionFailedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using System;
using Thrift;

namespace Apache.IoTDB
{
/// <summary>
/// Exception thrown when all reconnection attempts to the server have failed.
/// This exception is used internally to distinguish reconnection failures from other errors.
/// </summary>
internal class ReconnectionFailedException : TException
{
internal ReconnectionFailedException(string message)
: base(message, null)
{
}

internal ReconnectionFailedException(string message, Exception innerException)
: base(message, innerException)
{
}
}
}
Loading
Loading