diff --git a/src/main/java/ldbc/finbench/datagen/generation/DatagenParams.java b/src/main/java/ldbc/finbench/datagen/generation/DatagenParams.java index 9ee76add..e2e6953f 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/DatagenParams.java +++ b/src/main/java/ldbc/finbench/datagen/generation/DatagenParams.java @@ -53,7 +53,6 @@ public class DatagenParams { public static double transferLimitProCorrelated = 0.0; public static long transferMaxAmount = 0; public static String transferGenerationMode; - public static int transferShuffleTimes; public static double accountWithdrawFraction = 0.0; public static int maxWithdrawals = 0; public static long withdrawMaxAmount = 0; @@ -113,7 +112,6 @@ public static void readConf(DatagenConfiguration conf) { transferLimitProCorrelated = doubleConf(conf, "transfer.limitProCorrelated"); transferMaxAmount = longConf(conf, "transfer.maxAmount"); transferGenerationMode = stringConf(conf, "transfer.generationMode"); - transferShuffleTimes = intConf(conf, "transfer.shuffleTimes"); accountWithdrawFraction = doubleConf(conf, "withdraw.accountWithdrawFraction"); maxWithdrawals = intConf(conf, "withdraw.maxWithdrawals"); diff --git a/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java b/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java index a0d02084..6991cc42 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java +++ b/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java @@ -12,10 +12,8 @@ public class TransferEvent implements Serializable { private final RandomGeneratorFarm randomFarm; private final DegreeDistribution multiplicityDist; - private final double partRatio; public TransferEvent() { - this.partRatio = 1.0 / DatagenParams.transferShuffleTimes; randomFarm = new RandomGeneratorFarm(); multiplicityDist = DatagenParams.getTransferMultiplicityDistribution(); multiplicityDist.initialize(); @@ -37,15 +35,9 @@ private LinkedList getIndexList(int size) { // Generation to parts will mess up the average degree(make it bigger than expected) caused by ceiling operations. // Also, it will mess up the long tail range of powerlaw distribution of degrees caused by 1 rounded to 2. // See the plot drawn by check_transfer.py for more details. - public List transferPart(List accounts, int blockId) { + public List transfer(List accounts, int blockId) { resetState(blockId); - // scale to percentage - for (Account account : accounts) { - account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio)); - account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio)); - } - List transfers = new LinkedList<>(); LinkedList availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds @@ -77,8 +69,6 @@ public List transferPart(List accounts, int blockId) { System.out.println("[Transfer] All accounts skipped for " + from.getAccountId()); break; // end loop if all accounts are skipped } - // System.out.println("Loop for " + from.getAccountId() + ", skippedCount: " + skippedCount + ", " - // + "availableToAccountIds " + availableToAccountIds.size()); } } return transfers; diff --git a/src/main/resources/params_default.ini b/src/main/resources/params_default.ini index 6a1bba5b..e3c6498c 100644 --- a/src/main/resources/params_default.ini +++ b/src/main/resources/params_default.ini @@ -24,7 +24,7 @@ transfer.baseProbCorrelated:0.99 transfer.limitProCorrelated:0.5 transfer.maxAmount:10000000 transfer.generationMode:loose -transfer.shuffleTimes:3 +transfer.shuffleTimes:1 withdraw.accountWithdrawFraction:0.3 withdraw.maxWithdrawals:30 diff --git a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala index b010a268..c1e48ebc 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala @@ -202,18 +202,13 @@ class ActivityGenerator()(implicit spark: SparkSession) def transferEvent(accountRDD: RDD[Account]): RDD[Transfer] = { val transferEvent = new TransferEvent - Array - .fill(DatagenParams.transferShuffleTimes) { - accountRDD - .repartition(accountRDD.getNumPartitions) - .mapPartitionsWithIndex((index, accounts) => { - transferEvent - .transferPart(accounts.toList.asJava, index) - .iterator() - .asScala - }) - } - .reduce(_ union _) + accountRDD + .mapPartitionsWithIndex((index, accounts) => { + transferEvent + .transfer(accounts.toList.asJava, index) + .iterator() + .asScala + }) } // TODO: rewrite it with account centric