Skip to content

Commit

Permalink
Merge pull request #14 from skbkontur/ConnectionAcquiringMeter
Browse files Browse the repository at this point in the history
Add new metric which indicates new connections acquire rpm
  • Loading branch information
gangstatracer authored Oct 26, 2020
2 parents 0343fcb + 7cdc778 commit 4ce98bc
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using NUnit.Framework;

using SkbKontur.Cassandra.ThriftClient.Core.GenericPool;
using SkbKontur.Cassandra.ThriftClient.Core.Metrics;
using SkbKontur.Cassandra.TimeBasedUuid;

using Vostok.Logging.Abstractions;
Expand All @@ -18,7 +19,7 @@ public class IdleConnectionRemovalFromPoolTest
[Test]
public void TestRemoveConnection()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
var item2 = pool.Acquire();
Expand Down Expand Up @@ -49,7 +50,7 @@ public void TestRemoveConnectionMultiThread()
{
Interlocked.Increment(ref newPoolItemsCreatedCount);
return new Item();
}, new SilentLog()))
}, NoOpMetrics.Instance, new SilentLog()))
{
const int threadCount = 100;
const int initialPoolItemsCount = threadCount - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
using System.Linq;
using System.Threading;

using Moq;

using NUnit.Framework;

using SkbKontur.Cassandra.ThriftClient.Core.GenericPool;
using SkbKontur.Cassandra.ThriftClient.Core.GenericPool.Exceptions;
using SkbKontur.Cassandra.ThriftClient.Core.Metrics;
using SkbKontur.Cassandra.TimeBasedUuid;

using Vostok.Logging.Abstractions;
Expand All @@ -25,7 +28,7 @@ public void AcquireItemFromEmptyPool()
{
factoryInvokeCount++;
return lastFactoryResult = new Item();
}, new SilentLog()))
}, NoOpMetrics.Instance, new SilentLog()))
{
var item = pool.Acquire();
Assert.That(item, Is.EqualTo(lastFactoryResult));
Expand All @@ -36,7 +39,7 @@ public void AcquireItemFromEmptyPool()
[Test]
public void DisposeAndReleaseDeadItemsThroughAcquire()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
item1.IsAlive = false;
Expand All @@ -50,12 +53,12 @@ public void DisposeAndReleaseDeadItemsThroughAcquire()
[Test]
public void DisposeAndReleaseDeadItemsThroughAcquireExists()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
item1.IsAlive = false;
pool.Release(item1);
Assert.That(!pool.TryAcquireExists(out var item2));
Assert.That(!pool.TryAcquireExists(out _));
Assert.That(item1.Disposed);
}
}
Expand All @@ -69,7 +72,7 @@ public void AcquireAndReleaseItemFromEmptyPool()
{
factoryInvokeCount++;
return new Item();
}, new SilentLog()))
}, NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
pool.Release(item1);
Expand All @@ -83,7 +86,7 @@ public void AcquireAndReleaseItemFromEmptyPool()
[Test]
public void TryReleaseItemTwice()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
pool.Release(item1);
Expand All @@ -96,7 +99,7 @@ public void DisposeItemsAfterPoolDisposed()
{
Item item1;
Item item2;
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
item1 = pool.Acquire();
item2 = pool.Acquire();
Expand All @@ -111,32 +114,42 @@ public void DisposeItemsAfterPoolDisposed()
[Test]
public void TestAcquireNew()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
var metricsMock = new Mock<IConnectionPoolMetrics>(MockBehavior.Strict);
metricsMock.Setup(x => x.AcquireNewConnectionContext())
.Returns(NoOpContext.Instance)
.Verifiable();
using (var pool = new Pool<Item>(x => new Item(), metricsMock.Object, new SilentLog()))
{
var item1 = pool.AcquireNew();
pool.Release(item1);
var item2 = pool.AcquireNew();
Assert.That(item2, Is.Not.EqualTo(item1));
metricsMock.Verify(x => x.AcquireNewConnectionContext(), Times.Exactly(2));
}
}

[Test]
public void TestAcquireExists()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
var metricsMock = new Mock<IConnectionPoolMetrics>(MockBehavior.Strict);
metricsMock.Setup(x => x.AcquireNewConnectionContext())
.Returns(NoOpContext.Instance)
.Verifiable();
using (var pool = new Pool<Item>(x => new Item(), metricsMock.Object, new SilentLog()))
{
var item1 = pool.AcquireNew();
pool.Release(item1);
Assert.That(pool.TryAcquireExists(out var item2));
Assert.That(item2, Is.EqualTo(item1));
Assert.That(!pool.TryAcquireExists(out var item3));
Assert.That(!pool.TryAcquireExists(out _));
metricsMock.Verify(x => x.AcquireNewConnectionContext(), Times.Exactly(1));
}
}

[Test]
public void TestRemoveItemFromPool()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
var item2 = pool.Acquire();
Expand All @@ -153,7 +166,7 @@ public void TestRemoveItemFromPool()
[Test]
public void TestTryRemoveReleasedItemFromPool()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
var item2 = pool.Acquire();
Expand All @@ -166,7 +179,7 @@ public void TestTryRemoveReleasedItemFromPool()
[Test]
public void TestTryRemoveItemDoesNotBelongInPool()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var item1 = pool.Acquire();
pool.Release(item1);
Expand All @@ -177,7 +190,7 @@ public void TestTryRemoveItemDoesNotBelongInPool()
[Test]
public void MultiThreadTest()
{
using (var pool = new Pool<Item>(x => new Item(), new SilentLog()))
using (var pool = new Pool<Item>(x => new Item(), NoOpMetrics.Instance, new SilentLog()))
{
var threads = Enumerable
.Range(0, 100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using SkbKontur.Cassandra.ThriftClient.Core.GenericPool;
using SkbKontur.Cassandra.ThriftClient.Core.GenericPool.Exceptions;
using SkbKontur.Cassandra.ThriftClient.Core.Metrics;

using Vostok.Logging.Abstractions;

Expand Down Expand Up @@ -197,8 +198,8 @@ public void TestAcquireOnlyLiveItemsWithDeadNode()
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(
new[] {new ReplicaKey("replica1"), new ReplicaKey("replica2")},
(x, r) => r.Name == "replica2" ?
new Pool<Item>(y => new Item(x, r) {IsAlive = false}, new SilentLog()) :
new Pool<Item>(y => new Item(x, r), new SilentLog())))
new Pool<Item>(y => new Item(x, r) {IsAlive = false}, NoOpMetrics.Instance, new SilentLog()) :
new Pool<Item>(y => new Item(x, r), NoOpMetrics.Instance, new SilentLog())))
{
var itemKey = new ItemKey("key1");

Expand Down Expand Up @@ -232,8 +233,8 @@ public void TestCreationItemsOnDeadReplica()
{
deadNodeAttemptCount++;
return new Item(x, r) {IsAlive = false};
}, new SilentLog()) :
new Pool<Item>(y => new Item(x, r), new SilentLog())))
}, NoOpMetrics.Instance, new SilentLog()) :
new Pool<Item>(y => new Item(x, r), NoOpMetrics.Instance, new SilentLog())))
{
var itemKey = new ItemKey("key1");

Expand Down Expand Up @@ -265,8 +266,8 @@ public void TestCreationItemsAfterDeadReplica()
{
deadNodeAttemptCount++;
return new Item(x, r) {IsAlive = false};
}, new SilentLog()) :
new Pool<Item>(y => new Item(x, r), new SilentLog())))
}, NoOpMetrics.Instance, new SilentLog()) :
new Pool<Item>(y => new Item(x, r), NoOpMetrics.Instance, new SilentLog())))
{
Enumerable.Range(0, 100).ToList().ForEach(x => pool.BadReplica(new ReplicaKey("replica2"))); // Health: 0.01

Expand Down Expand Up @@ -298,9 +299,9 @@ public void TestAcquireNewFromDeadNode()
{
acquireFromDeadNodeCount++;
return new Item(x, r) {IsAlive = false};
}, new SilentLog());
}, NoOpMetrics.Instance, new SilentLog());
}
return new Pool<Item>(y => new Item(x, r), new SilentLog());
return new Pool<Item>(y => new Item(x, r), NoOpMetrics.Instance, new SilentLog());
}))
{
var itemKey = new ItemKey("key1");
Expand Down Expand Up @@ -344,7 +345,7 @@ public void TestAcquireNewFromDeadNode()
[Test]
public void TestAcquireNewWithDeadNodes()
{
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1"), new ReplicaKey("replica2")}, (x, z) => new Pool<Item>(y => new Item(x, z) {IsAlive = false}, new SilentLog())))
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1"), new ReplicaKey("replica2")}, (x, z) => new Pool<Item>(y => new Item(x, z) {IsAlive = false}, NoOpMetrics.Instance, new SilentLog())))
{
Assert.Throws<AllItemsIsDeadExceptions>(() => pool.Acquire(new ItemKey("1")));
Assert.Throws<AllItemsIsDeadExceptions>(() => pool.Acquire(new ItemKey("1")));
Expand All @@ -359,7 +360,7 @@ public void TestAcquireConnectionWithExceptionInOnePool()
if (z.Name == "replica1")
throw new Exception("FakeException");
return new Item(x, z);
}, new SilentLog())))
}, NoOpMetrics.Instance, new SilentLog())))
{
for (var i = 0; i < 1000; i++)
{
Expand All @@ -372,7 +373,7 @@ public void TestAcquireConnectionWithExceptionInOnePool()
[Test]
public void TestTryAcquireConnectionWithExceptionAllPools()
{
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1"), new ReplicaKey("replica2")}, (x, z) => new Pool<Item>(y => { throw new Exception("FakeException"); }, new SilentLog())))
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1"), new ReplicaKey("replica2")}, (x, z) => new Pool<Item>(y => { throw new Exception("FakeException"); }, NoOpMetrics.Instance, new SilentLog())))
{
for (var i = 0; i < 1000; i++)
{
Expand All @@ -396,7 +397,7 @@ public void TestTryAcquireConnectionWithExceptionAllPools()
[Test]
public void TestRemoveUnusedConnection()
{
using (var pool = ReplicaSetPool.Create<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1")}, (x, z) => new Pool<Item>(y => new Item(x, z), new SilentLog()), PoolSettings.CreateDefault(), TimeSpan.FromMilliseconds(100), new SilentLog()))
using (var pool = ReplicaSetPool.Create<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1")}, (x, z) => new Pool<Item>(y => new Item(x, z), NoOpMetrics.Instance, new SilentLog()), PoolSettings.CreateDefault(), TimeSpan.FromMilliseconds(100), new SilentLog()))
{
var item1 = pool.Acquire(null);
var item2 = pool.Acquire(null);
Expand All @@ -417,7 +418,7 @@ public void TestRemoveUnusedConnection()
[Test]
public void TestRemoveAcquiredConnectionFromPool()
{
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1")}, (x, z) => new Pool<Item>(y => new Item(x, z), new SilentLog())))
using (var pool = CreatePool<Item, ItemKey, ReplicaKey>(new[] {new ReplicaKey("replica1")}, (x, z) => new Pool<Item>(y => new Item(x, z), NoOpMetrics.Instance, new SilentLog())))
{
var item1 = pool.Acquire(null);
var item2 = pool.Acquire(null);
Expand Down Expand Up @@ -562,7 +563,7 @@ private static ReplicaSetPool<Item, ItemKey, ReplicaKey> CreateReplicaSetPool(in
.Select(n => string.Format(nameFormat, n))
.Select(x => new ReplicaKey(x))
.ToArray();
var pool = CreatePool<Item, ItemKey, ReplicaKey>(replicas, (x, z) => new Pool<Item>(y => new Item(x, z), new SilentLog()));
var pool = CreatePool<Item, ItemKey, ReplicaKey>(replicas, (x, z) => new Pool<Item>(y => new Item(x, z), NoOpMetrics.Instance, new SilentLog()));
return pool;
}

Expand Down Expand Up @@ -685,7 +686,7 @@ public ReplicaSetPoolManager(int count, PoolSettings poolSettings)
});
replicaSetPool = ReplicaSetPool.Create<Item, ItemKey, ReplicaKey>(
replicaInfos.Values.Select(x => x.Key).ToArray(),
(x, z) => new Pool<Item>(y => CreateReplicaConnection(z, x), new SilentLog()),
(x, z) => new Pool<Item>(y => CreateReplicaConnection(z, x), NoOpMetrics.Instance, new SilentLog()),
poolSettings, new SilentLog());
}

Expand Down
14 changes: 12 additions & 2 deletions Cassandra.ThriftClient/Clusters/CassandraCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using SkbKontur.Cassandra.ThriftClient.Connections;
using SkbKontur.Cassandra.ThriftClient.Core;
using SkbKontur.Cassandra.ThriftClient.Core.GenericPool;
using SkbKontur.Cassandra.ThriftClient.Core.Metrics;
using SkbKontur.Cassandra.ThriftClient.Core.Pools;

using Vostok.Logging.Abstractions;
Expand Down Expand Up @@ -103,7 +104,7 @@ private ReplicaSetPool<IThriftConnection, string, IPEndPoint> CreateFierceConnec

private Pool<IThriftConnection> GetDataConnectionPool(ICassandraClusterSettings settings, IPEndPoint nodeEndpoint, string keyspaceName)
{
var result = new Pool<IThriftConnection>(pool => CreateThriftConnection(nodeEndpoint, keyspaceName, settings.Timeout, settings.Credentials), logger);
var result = CreateConnectionPool(settings, nodeEndpoint, keyspaceName, settings.Timeout);
logger.Debug("Pool for node with endpoint {0} for keyspace '{1}' was created.", nodeEndpoint, keyspaceName);
return result;
}
Expand All @@ -117,11 +118,20 @@ private ThriftConnectionInPoolWrapper CreateThriftConnection(IPEndPoint nodeEndp

private Pool<IThriftConnection> CreateFiercePool(ICassandraClusterSettings settings, IPEndPoint nodeEndpoint, string keyspaceName)
{
var result = new Pool<IThriftConnection>(pool => CreateThriftConnection(nodeEndpoint, keyspaceName, settings.FierceTimeout, settings.Credentials), logger);
var result = CreateConnectionPool(settings, nodeEndpoint, keyspaceName, settings.FierceTimeout);
logger.Debug("Pool for node with endpoint {0} for keyspace '{1}'[Fierce] was created.", nodeEndpoint, keyspaceName);
return result;
}

private Pool<IThriftConnection> CreateConnectionPool(ICassandraClusterSettings settings, IPEndPoint nodeEndpoint, string keyspaceName, int timeout)
{
var connectionPoolMetrics = CommandMetricsFactory.GetConnectionPoolMetrics(settings, nodeEndpoint.Address.ToString().Replace('.', '_'), keyspaceName);
return new Pool<IThriftConnection>(
pool => CreateThriftConnection(nodeEndpoint, keyspaceName, timeout, settings.Credentials),
connectionPoolMetrics,
logger);
}

private readonly ICassandraClusterSettings clusterSettings;
private readonly ILog logger;
private readonly ICommandExecutor<ISimpleCommand> commandExecutor;
Expand Down
11 changes: 9 additions & 2 deletions Cassandra.ThriftClient/Core/GenericPool/Pool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using SkbKontur.Cassandra.ThriftClient.Core.GenericPool.Exceptions;
using SkbKontur.Cassandra.ThriftClient.Core.GenericPool.Utils;
using SkbKontur.Cassandra.ThriftClient.Core.Metrics;
using SkbKontur.Cassandra.TimeBasedUuid;

using Vostok.Logging.Abstractions;
Expand All @@ -17,9 +18,10 @@ namespace SkbKontur.Cassandra.ThriftClient.Core.GenericPool
{
internal class Pool<T> : IDisposable where T : class, IDisposable, ILiveness
{
public Pool(Func<Pool<T>, T> itemFactory, ILog logger)
public Pool(Func<Pool<T>, T> itemFactory, IConnectionPoolMetrics connectionPoolMetrics, ILog logger)
{
this.itemFactory = itemFactory;
this.connectionPoolMetrics = connectionPoolMetrics;
this.logger = logger;
}

Expand Down Expand Up @@ -60,7 +62,11 @@ public void Release(T item)

public T AcquireNew()
{
var result = itemFactory(this);
T result;
using (connectionPoolMetrics.AcquireNewConnectionContext())
{
result = itemFactory(this);
}
MarkItemAsBusy(result);
return result;
}
Expand Down Expand Up @@ -141,6 +147,7 @@ private void MarkItemAsBusy(T result)
private int busyItemCount;
private readonly ReaderWriterLockSlim unusedItemCollectorLock = new ReaderWriterLockSlim();
private readonly Func<Pool<T>, T> itemFactory;
private readonly IConnectionPoolMetrics connectionPoolMetrics;
private readonly ILog logger;
private readonly ConcurrentStack<FreeItemInfo> freeItems = new ConcurrentStack<FreeItemInfo>();
private readonly ConcurrentDictionary<T, object> busyItems = new ConcurrentDictionary<T, object>(ObjectReferenceEqualityComparer<T>.Default);
Expand Down
Loading

0 comments on commit 4ce98bc

Please sign in to comment.