diff --git a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala index 40500b54..c04429cb 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala @@ -94,17 +94,21 @@ class ActivityGenerator()(implicit spark: SparkSession) DatagenParams.companyInvestedFraction, sampleRandom.nextLong() ) - .mapPartitionsWithIndex((index, targets) => { + .mapPartitionsWithIndex { (index, targets) => personInvestEvent.resetState(index) companyInvestEvent.resetState(index) personInvestEvent .personInvestPartition(persons.value.asJava, targets.toList.asJava) companyInvestEvent - .companyInvestPartition(companies.value.asJava, targets.toList.asJava) - targets.map(target => target.scaleInvestmentRatios()) - }) - - companyRDD + .companyInvestPartition( + companies.value.asJava, + targets.toList.asJava + ) + targets.map { target => + target.scaleInvestmentRatios() + target + } + } } def signInEvent(