From 038a48979ac151e358ae7fe2613d8997b6bf1842 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Tue, 26 Feb 2019 01:05:52 +0900 Subject: [PATCH 01/12] Retry when exception occurred during fetching blocks --- Libplanet/Net/Swarm.cs | 168 +++++++++++++++++++++++++---------------- 1 file changed, 105 insertions(+), 63 deletions(-) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index f26731fcdb4..328171784ba 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -805,87 +805,129 @@ private async Task ProcessBlockHashes( try { - // We assume that the blocks are sorted in order. - Block oldest = blocks.First(); - Block latest = blocks.Last(); - Block tip = blockChain.Tip; + await AppendBlocksAsync( + blockChain, peer, blocks, cancellationToken + ); + _logger.Debug("Append complete."); + } + catch (Exception e) + { + _logger.Error(e, $"Append Failed. exception: {e}"); + throw; + } + } - if (tip == null || latest.Index >= tip.Index) + private async Task AppendBlocksAsync( + BlockChain blockChain, + Peer peer, + List> blocks, + CancellationToken cancellationToken + ) + where T : IAction + { + // We assume that the blocks are sorted in order. 
+ Block oldest = blocks.First(); + Block latest = blocks.Last(); + Block tip = blockChain.Tip; + + if (tip == null || latest.Index >= tip.Index) + { + using (await _blockSyncMutex.LockAsync()) { - using (await _blockSyncMutex.LockAsync()) - { - _logger.Debug("Trying to find branchpoint..."); - BlockLocator locator = blockChain.GetBlockLocator(); - IEnumerable> hashes = - await GetBlockHashesAsync( - peer, locator, oldest.Hash, cancellationToken); - HashDigest branchPoint = hashes.First(); + _logger.Debug("Trying to find branchpoint..."); + BlockLocator locator = blockChain.GetBlockLocator(); + IEnumerable> hashes = + await GetBlockHashesAsync( + peer, locator, oldest.Hash, cancellationToken); + HashDigest branchPoint = hashes.First(); - _logger.Debug( - $"Branchpoint is " + - $"{ByteUtil.Hex(branchPoint.ToByteArray())}" - ); + _logger.Debug( + $"Branchpoint is " + + $"{ByteUtil.Hex(branchPoint.ToByteArray())}" + ); - BlockChain toSync; + BlockChain toSync; - if (tip == null || branchPoint == tip.Hash) - { - _logger.Debug("it doesn't need fork."); - toSync = blockChain; - } + if (tip == null || branchPoint == tip.Hash) + { + _logger.Debug("it doesn't need fork."); + toSync = blockChain; + } + + // FIXME BlockChain.Blocks.ContainsKey() can be very + // expensive. + // we can omit this clause if assume every chain shares + // same genesis block... + else if (!blockChain.Blocks.ContainsKey(branchPoint)) + { + toSync = new BlockChain( + blockChain.Policy, + blockChain.Store); + } + else + { + _logger.Debug("Forking needed. trying to fork..."); + toSync = blockChain.Fork(branchPoint); + _logger.Debug("Forking complete. "); + } - // FIXME BlockChain.Blocks.ContainsKey() can be very - // expensive. - // we can omit this clause if assume every chain shares - // same genesis block... 
- else if (!blockChain.Blocks.ContainsKey(branchPoint)) + _logger.Debug("Trying to fill up previous blocks..."); + + int retry = 3; + while (true) + { + try { - toSync = new BlockChain( - blockChain.Policy, - blockChain.Store); + await FillBlocksAsync( + peer, + toSync, + oldest.PreviousHash, + cancellationToken + ); + + break; } - else + catch (Exception e) { - _logger.Debug("Forking needed. trying to fork..."); - toSync = blockChain.Fork(branchPoint); - _logger.Debug("Forking complete. "); + if (retry > 0) + { + _logger.Error( + e, + "FillBlockAsync() failed. retrying..." + ); + retry--; + } + else + { + throw; + } } + } - _logger.Debug("Trying to fill up previous blocks..."); - await FillBlocksAsync( - peer, toSync, oldest.PreviousHash, cancellationToken - ); - _logger.Debug("Filled up. trying to concatenation..."); + _logger.Debug("Filled up. trying to concatenation..."); - foreach (Block block in blocks) - { - toSync.Append(block); - } + foreach (Block block in blocks) + { + toSync.Append(block); + } - _logger.Debug("Sync is done."); - if (!toSync.Id.Equals(blockChain.Id)) - { - _logger.Debug("trying to swapping chain..."); - blockChain.Swap(toSync); - _logger.Debug("Swapping complete"); - } + _logger.Debug("Sync is done."); + if (!toSync.Id.Equals(blockChain.Id)) + { + _logger.Debug("trying to swapping chain..."); + blockChain.Swap(toSync); + _logger.Debug("Swapping complete"); } } - else - { - _logger.Information( - "Received index is older than current chain's tip." + - " ignored."); - } - - BlockReceived.Set(); - _logger.Debug("Append complete."); } - catch (Exception e) + else { - _logger.Error(e, $"Append Failed. exception: {e}"); - throw; + _logger.Information( + "Received index is older than current chain's tip." 
+ + " ignored."); } + + BlockReceived.Set(); } private async Task FillBlocksAsync( From 3206277dc4a4fb2dca3f1c22e782de94a2ff3d09 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Thu, 28 Feb 2019 15:04:51 +0900 Subject: [PATCH 02/12] Adjust a logging level on Swarm - hide raw message payload in an unittest logs --- Libplanet.Tests/Net/SwarmTest.cs | 2 +- Libplanet/Net/Swarm.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index aef7fc13fd0..7eaebfe632f 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -34,7 +34,7 @@ public class SwarmTest : IDisposable public SwarmTest(ITestOutputHelper output) { Log.Logger = new LoggerConfiguration() - .MinimumLevel.Verbose() + .MinimumLevel.Debug() .Enrich.WithThreadId() .WriteTo.TestOutput(output, outputTemplate: "{Timestamp:HH:mm:ss}[@{Swarm_listenUrl}][{ThreadId}] - {Message}") .CreateLogger() diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 328171784ba..ce5e65ab4d4 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -665,7 +665,7 @@ private async Task ReceiveMessageAsync( continue; } - _logger.Debug($"The raw message[{raw}] has received."); + _logger.Verbose($"The raw message[{raw}] has received."); Message message = Message.Parse(raw, reply: false); _logger.Debug($"The message[{message}] has parsed."); From 848fa4031018e786433e3d97635b9e76818422c8 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Tue, 26 Feb 2019 19:48:39 +0900 Subject: [PATCH 03/12] Fix logger in SwarmTest --- Libplanet.Tests/Net/SwarmTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 7eaebfe632f..76b0e2ed2c5 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -36,7 +36,7 @@ public SwarmTest(ITestOutputHelper output) Log.Logger = new LoggerConfiguration() 
.MinimumLevel.Debug() .Enrich.WithThreadId() - .WriteTo.TestOutput(output, outputTemplate: "{Timestamp:HH:mm:ss}[@{Swarm_listenUrl}][{ThreadId}] - {Message}") + .WriteTo.TestOutput(output, outputTemplate: "{Timestamp:HH:mm:ss}[@{SwarmId}][{ThreadId}] - {Message}") .CreateLogger() .ForContext(); From 2346d5372d63d5fb4694816b5cad5902669a3a75 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Tue, 26 Feb 2019 01:48:37 +0900 Subject: [PATCH 04/12] Boost peer related tests --- Libplanet.Tests/Net/SwarmTest.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 76b0e2ed2c5..9f74c39746b 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -578,7 +578,6 @@ public void CanResolveUrl() private async Task StartAsync( Swarm swarm, BlockChain blockChain, - int millisecondsDistributeInterval = 1500, CancellationToken cancellationToken = default ) where T : IAction @@ -586,7 +585,7 @@ private async Task StartAsync( Task task = Task.Run( async () => await swarm.StartAsync( blockChain, - millisecondsDistributeInterval, + 200, cancellationToken ) ); From 6ebc633f162b2457d0cfd57cd74a42769790072f Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Tue, 26 Feb 2019 20:07:26 +0900 Subject: [PATCH 05/12] Change Swarm._dealers to ConcurrentDictionary. 
--- Libplanet/Net/Swarm.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index ce5e65ab4d4..edfb53399d1 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -1,6 +1,7 @@ using System; using System.Collections; using System.Collections.Async; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Diagnostics; @@ -86,7 +87,7 @@ public Swarm( TxReceived = new AsyncAutoResetEvent(); BlockReceived = new AsyncAutoResetEvent(); - _dealers = new Dictionary(); + _dealers = new ConcurrentDictionary(); _router = new RouterSocket(); _distributeMutex = new AsyncLock(); From fc834b9637a4764669e9a6e76db2e567825d0ac2 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 27 Feb 2019 15:20:36 +0900 Subject: [PATCH 06/12] Shutdown swarm when SwarmTest was disposed. --- Libplanet.Tests/Net/SwarmTest.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 9f74c39746b..0bfe321fc81 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -71,6 +71,11 @@ public void Dispose() _fx1.Dispose(); _fx2.Dispose(); _fx3.Dispose(); + + foreach (Swarm s in _swarms) + { + s.Dispose(); + } } [Fact] From 140244fb6b557958a720d1306c40e1cd05348e5a Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 27 Feb 2019 20:50:26 +0900 Subject: [PATCH 07/12] Add some wait condition to stabilize tests. 
--- Libplanet.Tests/Net/SwarmTest.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 0bfe321fc81..08cfb2350f9 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -473,6 +473,7 @@ public async Task CanBroadcastTx() await EnsureExchange(swarmA, swarmB); await EnsureExchange(swarmA, swarmC); + await EnsureExchange(swarmB, swarmC); await swarmA.BroadcastTxsAsync(new[] { tx }); @@ -530,6 +531,7 @@ public async Task CanBroadcastBlock() await EnsureExchange(swarmA, swarmB); await EnsureExchange(swarmA, swarmC); + await EnsureExchange(swarmB, swarmC); await swarmB.BroadcastBlocksAsync(new[] { chainB.Last() }); From 3f711cdb867525ff223d864ea613857f16ef9b6e Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Thu, 28 Feb 2019 14:50:25 +0900 Subject: [PATCH 08/12] Fix a conflict when stopping and running simultaneously. --- Libplanet.Tests/Net/SwarmTest.cs | 13 +++++++++++++ Libplanet/Net/Swarm.cs | 2 -- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 08cfb2350f9..6b4ba56401e 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -582,6 +582,19 @@ public void CanResolveUrl() s.AsPeer.Urls); } + [Fact] + public async Task CanStopGracefullyWhileStarting() + { + Swarm a = _swarms[0]; + Swarm b = _swarms[1]; + + await StartAsync(b, _blockchains[1]); + await a.AddPeersAsync(new[] { b.AsPeer }); + + Task t = await StartAsync(a, _blockchains[0]); + await Task.WhenAll(a.StopAsync(), t); + } + private async Task StartAsync( Swarm swarm, BlockChain blockChain, diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index edfb53399d1..09c198f4119 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -646,8 +646,6 @@ private async Task ReceiveMessageAsync( BlockChain blockChain, CancellationToken cancellationToken) where T : IAction { - 
CheckStarted(); - while (Running) { try From 10d4b69ffec9aaa5875ff0eaacf0236746e2c656 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Thu, 28 Feb 2019 15:11:03 +0900 Subject: [PATCH 09/12] Fix SwarmTest.CanBroadcastBlock - replace fixed delay with Swarm.BlockReceived event. --- Libplanet.Tests/Net/SwarmTest.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 6b4ba56401e..561f15b6c5e 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -535,7 +535,8 @@ public async Task CanBroadcastBlock() await swarmB.BroadcastBlocksAsync(new[] { chainB.Last() }); - await Task.Delay(5000); + await swarmC.BlockReceived.WaitAsync(); + await swarmA.BlockReceived.WaitAsync(); Assert.Equal(chainB.AsEnumerable(), chainC); @@ -545,7 +546,8 @@ public async Task CanBroadcastBlock() await swarmA.BroadcastBlocksAsync(new[] { chainA.Last() }); - await Task.Delay(5000); + await swarmB.BlockReceived.WaitAsync(); + await swarmC.BlockReceived.WaitAsync(); Assert.Equal(chainA.AsEnumerable(), chainB); Assert.Equal(chainA.AsEnumerable(), chainC); From a31307fc0e24dfc70ff24c023cd3533567f5a067 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Thu, 28 Feb 2019 18:18:11 +0900 Subject: [PATCH 10/12] Add mutex to avoid conflicts surrounding Swarm.Running --- Libplanet/Net/Swarm.cs | 80 +++++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 09c198f4119..5d55da4cf22 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -37,6 +37,7 @@ public partial class Swarm : ICollection, IDisposable private readonly IDictionary _dealers; private readonly TimeSpan _dialTimeout; + private readonly AsyncLock _runningMutex; private readonly AsyncLock _distributeMutex; private readonly AsyncLock _receiveMutex; private readonly AsyncLock _blockSyncMutex; @@ -93,6 +94,7 @@ public Swarm( 
_distributeMutex = new AsyncLock(); _receiveMutex = new AsyncLock(); _blockSyncMutex = new AsyncLock(); + _runningMutex = new AsyncLock(); _ipAddress = ipAddress ?? GetLocalIPAddress(); _listenPort = listenPort; @@ -312,20 +314,23 @@ public async Task StopAsync( CancellationToken cancellationToken = default(CancellationToken)) { _logger.Debug("Stopping..."); - if (Running) + using (await _runningMutex.LockAsync()) { - _removedPeers[AsPeer] = DateTimeOffset.UtcNow; - await DistributeDeltaAsync(false, cancellationToken); - - _router.Dispose(); - foreach (DealerSocket s in _dealers.Values) + if (Running) { - s.Dispose(); - } + _removedPeers[AsPeer] = DateTimeOffset.UtcNow; + await DistributeDeltaAsync(false, cancellationToken); + + _router.Dispose(); + foreach (DealerSocket s in _dealers.Values) + { + s.Dispose(); + } - _dealers.Clear(); + _dealers.Clear(); - Running = false; + Running = false; + } } _logger.Debug("Stopped."); @@ -388,36 +393,39 @@ public async Task StartAsync( try { - Running = true; - foreach (Peer peer in _peers.Keys) + using (await _runningMutex.LockAsync()) { - try + Running = true; + foreach (Peer peer in _peers.Keys) { - Peer replacedPeer = await DialPeerAsync( - peer, - cancellationToken - ); - if (replacedPeer != peer) + try { - _peers[replacedPeer] = _peers[peer]; - _peers.Remove(peer); + Peer replacedPeer = await DialPeerAsync( + peer, + cancellationToken + ); + if (replacedPeer != peer) + { + _peers[replacedPeer] = _peers[peer]; + _peers.Remove(peer); + } + } + catch (TimeoutException e) + { + _logger.Error( + e, + $"TimeoutException occured ({peer})." + ); + continue; + } + catch (IOException e) + { + _logger.Error( + e, + $"IOException occured ({peer})." + ); + continue; } - } - catch (TimeoutException e) - { - _logger.Error( - e, - $"TimeoutException occured ({peer})." - ); - continue; - } - catch (IOException e) - { - _logger.Error( - e, - $"IOException occured ({peer})." 
- ); - continue; } } From 9e82583272ba4af7c851d096e27bf161c1078536 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Thu, 28 Feb 2019 19:05:58 +0900 Subject: [PATCH 11/12] Add changelog. --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index ac9e8193801..118ae80bf76 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,7 @@ Libplanet changelog Version 0.1.1 ------------- -To be released. +- Improved stability of `Swarm` and `SwarmTest`. Version 0.1.0 From 4ada47e983e519b5de7571bab478222b5db1e4e4 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Sat, 2 Mar 2019 15:51:38 +0900 Subject: [PATCH 12/12] Fix travis script checking changelog --- .travis.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5c3088126cc..4b93b483999 100644 --- a/.travis.yml +++ b/.travis.yml @@ -135,12 +135,7 @@ script: elif [[ "$TRAVIS_TAG" != "" ]]; then ! grep -i "to be released" CHANGES.md else - if [[ "$TRAVIS_PULL_REQUEST_SHA" = "" ]]; then - commit_range="$TRAVIS_COMMIT_RANGE" - else - commit_range="$TRAVIS_BRANCH...$TRAVIS_PULL_REQUEST_SHA" - fi - [[ "$(git diff --name-only "$commit_range" | grep CHANGES\.md)" != "" ]] + [[ "$(git diff --name-only "$TRAVIS_COMMIT_RANGE" | grep CHANGES\.md)" != "" ]] fi # Check coding styles