Skip to content

Commit

Permalink
simplify transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp committed Sep 16, 2024
1 parent d05e55d commit 3de5e79
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class DatagenParams {
public static double transferLimitProCorrelated = 0.0;
public static long transferMaxAmount = 0;
public static String transferGenerationMode;
public static int transferShuffleTimes;
public static double accountWithdrawFraction = 0.0;
public static int maxWithdrawals = 0;
public static long withdrawMaxAmount = 0;
Expand Down Expand Up @@ -113,7 +112,6 @@ public static void readConf(DatagenConfiguration conf) {
transferLimitProCorrelated = doubleConf(conf, "transfer.limitProCorrelated");
transferMaxAmount = longConf(conf, "transfer.maxAmount");
transferGenerationMode = stringConf(conf, "transfer.generationMode");
transferShuffleTimes = intConf(conf, "transfer.shuffleTimes");

accountWithdrawFraction = doubleConf(conf, "withdraw.accountWithdrawFraction");
maxWithdrawals = intConf(conf, "withdraw.maxWithdrawals");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
public class TransferEvent implements Serializable {
private final RandomGeneratorFarm randomFarm;
private final DegreeDistribution multiplicityDist;
private final double partRatio;

public TransferEvent() {
this.partRatio = 1.0 / DatagenParams.transferShuffleTimes;
randomFarm = new RandomGeneratorFarm();
multiplicityDist = DatagenParams.getTransferMultiplicityDistribution();
multiplicityDist.initialize();
Expand All @@ -37,15 +35,9 @@ private LinkedList<Integer> getIndexList(int size) {
// Generation to parts will mess up the average degree(make it bigger than expected) caused by ceiling operations.
// Also, it will mess up the long tail range of powerlaw distribution of degrees caused by 1 rounded to 2.
// See the plot drawn by check_transfer.py for more details.
public List<Transfer> transferPart(List<Account> accounts, int blockId) {
public List<Transfer> transfer(List<Account> accounts, int blockId) {
resetState(blockId);

// scale to percentage
for (Account account : accounts) {
account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio));
account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio));
}

List<Transfer> transfers = new LinkedList<>();
LinkedList<Integer> availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds

Expand Down Expand Up @@ -77,8 +69,6 @@ public List<Transfer> transferPart(List<Account> accounts, int blockId) {
System.out.println("[Transfer] All accounts skipped for " + from.getAccountId());
break; // end loop if all accounts are skipped
}
// System.out.println("Loop for " + from.getAccountId() + ", skippedCount: " + skippedCount + ", "
// + "availableToAccountIds " + availableToAccountIds.size());
}
}
return transfers;
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/params_default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ transfer.baseProbCorrelated:0.99
transfer.limitProCorrelated:0.5
transfer.maxAmount:10000000
transfer.generationMode:loose
transfer.shuffleTimes:3
transfer.shuffleTimes:1

withdraw.accountWithdrawFraction:0.3
withdraw.maxWithdrawals:30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,13 @@ class ActivityGenerator()(implicit spark: SparkSession)
def transferEvent(accountRDD: RDD[Account]): RDD[Transfer] = {
val transferEvent = new TransferEvent

Array
.fill(DatagenParams.transferShuffleTimes) {
accountRDD
.repartition(accountRDD.getNumPartitions)
.mapPartitionsWithIndex((index, accounts) => {
transferEvent
.transferPart(accounts.toList.asJava, index)
.iterator()
.asScala
})
}
.reduce(_ union _)
accountRDD
.mapPartitionsWithIndex((index, accounts) => {
transferEvent
.transfer(accounts.toList.asJava, index)
.iterator()
.asScala
})
}

// TODO: rewrite it with account centric
Expand Down

0 comments on commit 3de5e79

Please sign in to comment.