Skip to content
This repository has been archived by the owner on Aug 23, 2020. It is now read-only.

send heartbeat pulses and request tx using heartbeat #1830

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/main/java/com/iota/iri/Iota.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.iota.iri.conf.IotaConfig;
import com.iota.iri.controllers.TipsViewModel;
import com.iota.iri.controllers.TransactionViewModel;
import com.iota.iri.network.HeartbeatPulse;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.TipsRequester;
import com.iota.iri.network.TransactionRequester;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class Iota {

public LocalSnapshotsPersistenceProvider localSnapshotsDb;
public final CacheManager cacheManager;
public final HeartbeatPulse heartbeatPulse;

/**
* Initializes the latest snapshot and then creates all services needed to run an IOTA node.
Expand All @@ -130,7 +132,7 @@ public Iota(IotaConfig configuration, SpentAddressesProvider spentAddressesProvi
TransactionRequester transactionRequester, NeighborRouter neighborRouter,
TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester,
TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb,
CacheManager cacheManager, TransactionSolidifier transactionSolidifier) {
CacheManager cacheManager, TransactionSolidifier transactionSolidifier, HeartbeatPulse heartbeatPulse) {
this.configuration = configuration;

this.ledgerService = ledgerService;
Expand Down Expand Up @@ -160,6 +162,7 @@ public Iota(IotaConfig configuration, SpentAddressesProvider spentAddressesProvi

this.tipsSelector = tipsSelector;
this.cacheManager = cacheManager;
this.heartbeatPulse = heartbeatPulse;
}

private void initDependencies() throws SnapshotException, SpentAddressesException {
Expand Down Expand Up @@ -223,6 +226,8 @@ public void init() throws Exception {
if (transactionPruner != null) {
transactionPruner.start();
}

heartbeatPulse.start();
}

private void rescanDb() throws Exception {
Expand Down Expand Up @@ -270,6 +275,7 @@ public void shutdown() throws Exception {
localSnapshotManager.shutdown();
}

heartbeatPulse.shutdown();
tipsRequester.shutdown();
txPipeline.shutdown();
neighborRouter.shutdown();
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/com/iota/iri/MainInjectionConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import com.iota.iri.cache.impl.CacheManagerImpl;
import com.iota.iri.conf.IotaConfig;
import com.iota.iri.controllers.TipsViewModel;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.TipsRequester;
import com.iota.iri.network.TransactionRequester;
import com.iota.iri.network.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.service.API;
import com.iota.iri.service.ledger.LedgerService;
Expand Down Expand Up @@ -100,6 +98,12 @@ LatestMilestoneTracker provideLatestMilestoneTracker(Tangle tangle, SnapshotProv
return new LatestMilestoneTrackerImpl(tangle, snapshotProvider, milestoneService, milestoneSolidifier, configuration);
}

@Singleton
@Provides
HeartbeatPulse providerHeartbeatPulse(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider){
return new HeartbeatPulseImpl(neighborRouter, snapshotProvider);
}

@Singleton
@Provides
LatestSolidMilestoneTracker provideLatestSolidMilestoneTracker(Tangle tangle, SnapshotProvider snapshotProvider,
Expand Down Expand Up @@ -171,21 +175,21 @@ TipSelector provideTipSelector(Tangle tangle, SnapshotProvider snapshotProvider,
@Singleton
@Provides
Iota provideIota(SpentAddressesProvider spentAddressesProvider, SpentAddressesService spentAddressesService,
SnapshotProvider snapshotProvider, SnapshotService snapshotService,
@Nullable LocalSnapshotManager localSnapshotManager, MilestoneService milestoneService,
LatestMilestoneTracker latestMilestoneTracker, LatestSolidMilestoneTracker latestSolidMilestoneTracker,
SeenMilestonesRetriever seenMilestonesRetriever, LedgerService ledgerService,
@Nullable TransactionPruner transactionPruner, MilestoneSolidifier milestoneSolidifier,
BundleValidator bundleValidator, Tangle tangle, TransactionValidator transactionValidator,
TransactionRequester transactionRequester, NeighborRouter neighborRouter,
TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester,
TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb,
CacheManager cacheManager, TransactionSolidifier transactionSolidifier) {
SnapshotProvider snapshotProvider, SnapshotService snapshotService,
@Nullable LocalSnapshotManager localSnapshotManager, MilestoneService milestoneService,
LatestMilestoneTracker latestMilestoneTracker, LatestSolidMilestoneTracker latestSolidMilestoneTracker,
SeenMilestonesRetriever seenMilestonesRetriever, LedgerService ledgerService,
@Nullable TransactionPruner transactionPruner, MilestoneSolidifier milestoneSolidifier,
BundleValidator bundleValidator, Tangle tangle, TransactionValidator transactionValidator,
TransactionRequester transactionRequester, NeighborRouter neighborRouter,
TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester,
TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb,
CacheManager cacheManager, TransactionSolidifier transactionSolidifier, HeartbeatPulse heartbeatPulse) {
return new Iota(configuration, spentAddressesProvider, spentAddressesService, snapshotProvider, snapshotService,
localSnapshotManager, milestoneService, latestMilestoneTracker, latestSolidMilestoneTracker,
seenMilestonesRetriever, ledgerService, transactionPruner, milestoneSolidifier, bundleValidator, tangle,
transactionValidator, transactionRequester, neighborRouter, transactionProcessingPipeline,
tipsRequester, tipsViewModel, tipsSelector, localSnapshotsDb, cacheManager, transactionSolidifier);
tipsRequester, tipsViewModel, tipsSelector, localSnapshotsDb, cacheManager, transactionSolidifier, heartbeatPulse);
}

@Singleton
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/iota/iri/network/HeartbeatPulse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.iota.iri.network;

/**
* A background worker that sends {@link com.iota.iri.network.protocol.Heartbeat}s to neighbors.
*/
public interface HeartbeatPulse {
/**
* Starts the background worker that calls {@link #sendHeartbeat()} rhythmically.
*/
void start();

/**
* Stops the background worker that sends out heartbeats.
*/
void shutdown();

/**
* Sends {@link com.iota.iri.network.protocol.Heartbeat} to all neighbors.
*/
void sendHeartbeat();
}
62 changes: 62 additions & 0 deletions src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.iota.iri.network;

import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.network.protocol.Heartbeat;
import com.iota.iri.service.snapshot.SnapshotProvider;
import com.iota.iri.utils.thread.DedicatedScheduledExecutorService;
import com.iota.iri.utils.thread.SilentScheduledExecutorService;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Implementation of {@link HeartbeatPulse} interface.
*/
public class HeartbeatPulseImpl implements HeartbeatPulse {

private NeighborRouter neighborRouter;
private SnapshotProvider snapshotProvider;

/**
* The rate in milliseconds, at which heartbeats are sent out.
*/
private static final int HEARTBEAT_RATE_MILLIS = 60000;

/**
* Holds a reference to the manager of the background worker.
*/
private final SilentScheduledExecutorService executorService = new DedicatedScheduledExecutorService(
"Heartbeat Pulse");

public HeartbeatPulseImpl(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.neighborRouter = neighborRouter;
this.snapshotProvider = snapshotProvider;
}

@Override
public void start() {
executorService.silentScheduleWithFixedDelay(this::sendHeartbeat, 0, HEARTBEAT_RATE_MILLIS,
TimeUnit.MILLISECONDS);
}

@Override
public void shutdown() {
executorService.shutdownNow();
}

@Override
public void sendHeartbeat() {
int lastSolidMilestoneIndex = snapshotProvider.getLatestSnapshot().getIndex();
// The first solid milestone in DB w/o pruning.
int firstSolidMilestoneIndex = 0; // TODO: how do we get this?

Heartbeat heartbeat = new Heartbeat();
heartbeat.setFirstSolidMilestoneIndex(firstSolidMilestoneIndex);
heartbeat.setLastSolidMilestoneIndex(lastSolidMilestoneIndex);

Map<String, Neighbor> currentlyConnectedNeighbors = neighborRouter.getConnectedNeighbors();
for (Neighbor neighbor : currentlyConnectedNeighbors.values()) {
neighborRouter.gossipHeartbeatTo(neighbor, heartbeat);
}
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/iota/iri/network/NeighborRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl;
import com.iota.iri.network.protocol.Heartbeat;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,6 +97,14 @@ public interface NeighborRouter {
void gossipTransactionTo(Neighbor neighbor, TransactionViewModel tvm, boolean useHashOfTVM)
throws Exception;

/**
* Gossips the given heartbeat to the given neighbor.
*
* @param neighbor The {@link Neighbor} to gossip the heartbeat to
* @param heartbeat The {@link Heartbeat} to gossip
*/
void gossipHeartbeatTo(Neighbor neighbor, Heartbeat heartbeat);

/**
* Shut downs the {@link NeighborRouter} and all currently open connections.
*/
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/iota/iri/network/NeighborRouterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl;
import com.iota.iri.network.protocol.Handshake;
import com.iota.iri.network.protocol.Heartbeat;
import com.iota.iri.network.protocol.Protocol;
import com.iota.iri.utils.Converter;

Expand Down Expand Up @@ -909,6 +910,13 @@ public void gossipTransactionTo(Neighbor neighbor, TransactionViewModel tvm, boo
neighbor.getMetrics().incrSentTransactionsCount();
}

@Override
public void gossipHeartbeatTo(Neighbor neighbor, Heartbeat heartbeat) {
ByteBuffer packet = Protocol.createHeartbeatPacket(heartbeat);
neighbor.send(packet);
neighbor.getMetrics().incrSentHeartbeatCount();
}

@Override
public void shutdown() {
shutdown.set(true);
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/iota/iri/network/neighbor/NeighborMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,16 @@ public interface NeighborMetrics {
* @return the number of packets dropped from the neighbor's send queue
*/
long incrDroppedSendPacketsCount();

/**
* Increments the sent heartbeat count
* @return The number of heartbeat that have been sent
*/
long incrSentHeartbeatCount();

/**
* Gets the heartbeat count
* @return The heartbeat count
*/
long getHeartbeatCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class NeighborMetricsImpl implements NeighborMetrics {
private AtomicLong sentTxsCount = new AtomicLong();
private AtomicLong newTxsCount = new AtomicLong();
private AtomicLong droppedSendPacketsCount = new AtomicLong();
private AtomicLong heartbeatCount = new AtomicLong();

@Override
public long getAllTransactionsCount() {
Expand Down Expand Up @@ -86,4 +87,14 @@ public long getDroppedSendPacketsCount() {
public long incrDroppedSendPacketsCount() {
return droppedSendPacketsCount.incrementAndGet();
}

@Override
public long incrSentHeartbeatCount() {
return heartbeatCount.incrementAndGet();
}

@Override
public long getHeartbeatCount() {
return heartbeatCount.get();
}
}
11 changes: 10 additions & 1 deletion src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.iota.iri.controllers.TransactionViewModel;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.network.protocol.Heartbeat;
import com.iota.iri.service.validation.TransactionSolidifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,7 +53,15 @@ public ProcessingContext process(ProcessingContext ctx) {
continue;
}
try {
neighborRouter.gossipTransactionTo(neighbor, tvm);
boolean shouldGossip = true;
//neighbor supports STING. Else fall backwards
if(neighbor.getProtocolVersion() >= 2){
Heartbeat heartbeat = neighbor.heartbeat();
shouldGossip = tvm.snapshotIndex() >= heartbeat.getFirstSolidMilestoneIndex() && tvm.snapshotIndex() <= heartbeat.getLastSolidMilestoneIndex();
}
if(shouldGossip){
neighborRouter.gossipTransactionTo(neighbor, tvm);
}
} catch (Exception e) {
log.error(e.getMessage());
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/iota/iri/network/protocol/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class Protocol {
public final static byte[] SUPPORTED_PROTOCOL_VERSIONS = {
/* supports protocol version(s): 1 */
(byte) 0b00000001,
(byte) 0b00000011,
};
/**
* The amount of bytes dedicated for the message type in the packet header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* <p>
* Creates a manager for the local snapshots, that takes care of automatically creating local snapshots when the defined
Expand Down