From 4595000eabf3ffc7c954791d2c63a29d3f052841 Mon Sep 17 00:00:00 2001 From: qishipengqsp Date: Thu, 19 Sep 2024 22:33:33 +0800 Subject: [PATCH] wip --- .../generation/generators/ActivityGenerator.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala index d4e93ea6..185bf5ac 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala @@ -173,23 +173,23 @@ class ActivityGenerator()(implicit spark: SparkSession) loanRDD: RDD[Loan], accountRDD: RDD[Account] ): (RDD[Loan]) = { - val sampledAccounts = spark.sparkContext.broadcast( - accountRDD + val numPartition = loanRDD.getNumPartitions + loanRDD.mapPartitionsWithIndex((index, loans) => { + val sampledAccounts = accountRDD .sample( withReplacement = false, - DatagenParams.loanInvolvedAccountsFraction, + DatagenParams.loanInvolvedAccountsFraction / numPartition, sampleRandom.nextLong() ) .collect() .toList - ) + .asJava - loanRDD.mapPartitionsWithIndex((index, loans) => { val loanSubEvents = new LoanActivitiesEvents loanSubEvents .afterLoanApplied( loans.toList.asJava, - sampledAccounts.value.asJava, + sampledAccounts, index ) .iterator()