Commit 938d68d
wip
qishipengqsp committed Aug 23, 2024
1 parent a0f0231 commit 938d68d
Showing 7 changed files with 29 additions and 27 deletions.
1 change: 1 addition & 0 deletions scripts/run.py
@@ -65,6 +65,7 @@ def run_local(
**({'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'}),
**({'spark.executor.extraJavaOptions': '-XX:+UseG1GC'}),
**({'spark.driver.maxResultSize': '5g'}),
+ # **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
**spark_conf
}
2 changes: 1 addition & 1 deletion scripts/run_local.sh
@@ -5,4 +5,4 @@ OUTPUT_DIR=out

# For more command line arguments, see the main entry for more information at
# src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala
- time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR}
+ time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 1 --output-dir ${OUTPUT_DIR}
src/main/java/ldbc/finbench/datagen/generation/events/CompanyInvestEvent.java
@@ -1,13 +1,10 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import ldbc.finbench.datagen.entities.edges.CompanyInvestCompany;
import ldbc.finbench.datagen.entities.edges.PersonInvestCompany;
import ldbc.finbench.datagen.entities.nodes.Company;
import ldbc.finbench.datagen.entities.nodes.Person;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

@@ -25,11 +22,7 @@ public void resetState(int seed) {
randIndex.setSeed(seed);
}

- public void companyInvest(Company investor, Company target) {
- CompanyInvestCompany.createCompanyInvestCompany(randomFarm, investor, target);
- }
-
- public void companyInvestPartition(List<Company> investors, List<Company> targets) {
+ public List<Company> companyInvestPartition(List<Company> investors, List<Company> targets) {
Random numInvestorsRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUMS_COMPANY_INVEST);
Random chooseInvestorRand = randomFarm.get(RandomGeneratorFarm.Aspect.CHOOSE_COMPANY_INVESTOR);
for (Company target : targets) {
@@ -45,6 +38,7 @@ public void companyInvestPartition(List<Company> investors, List<Company> target
CompanyInvestCompany.createCompanyInvestCompany(randomFarm, investor, target);
}
}
+ return targets;
}

public boolean cannotInvest(Company investor, Company target) {
src/main/java/ldbc/finbench/datagen/generation/events/PersonInvestEvent.java
@@ -23,11 +23,7 @@ public void resetState(int seed) {
randIndex.setSeed(seed);
}

- public void personInvest(Person person, Company target) {
- PersonInvestCompany.createPersonInvestCompany(randomFarm, person, target);
- }
-
- public void personInvestPartition(List<Person> investors, List<Company> targets) {
+ public List<Company> personInvestPartition(List<Person> investors, List<Company> targets) {
Random numInvestorsRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUMS_PERSON_INVEST);
Random chooseInvestorRand = randomFarm.get(RandomGeneratorFarm.Aspect.CHOOSE_PERSON_INVESTOR);
for (Company target : targets) {
@@ -44,6 +40,7 @@ public void personInvestPartition(List<Person> investors, List<Company> targets)
}
System.out.println("[personInvest]: person invest company " + numInvestors + " times");
}
+ return targets;
}

public boolean cannotInvest(Person investor, Company target) {
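
Both event classes make the same change: the single-pair helpers (companyInvest, personInvest) are removed, and the partition-level methods now return the mutated target list instead of void. Returning the list lets a Spark caller turn the result straight back into the iterator a partition-wise stage must produce. A minimal sketch of that pattern, assuming only the public Spark API (Target, investPartition, and the numbers are illustrative, not datagen code):

    import org.apache.spark.sql.SparkSession
    import scala.collection.JavaConverters._

    object ReturnTargetsSketch {
      // Stand-in for a generated node whose state the event mutates in place.
      class Target(var ratio: Double) extends Serializable

      // Shaped like personInvestPartition/companyInvestPartition: mutate every
      // target, then hand back the same java.util.List so the caller can chain.
      def investPartition(targets: java.util.List[Target]): java.util.List[Target] = {
        targets.asScala.foreach(t => t.ratio += 1.0)
        targets
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        val ratios = spark.sparkContext
          .parallelize(Seq.fill(10)(0.0), numSlices = 2)
          .mapPartitionsWithIndex { (index, it) =>
            // The returned list converts straight back into the iterator this
            // closure must yield; no side channel or second pass is needed.
            investPartition(it.map(new Target(_)).toList.asJava).iterator().asScala
          }
          .map(_.ratio)
        println(ratios.sum()) // 10.0
        spark.stop()
      }
    }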
2 changes: 1 addition & 1 deletion src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala
@@ -136,7 +136,7 @@ object LdbcDatagen extends SparkApp with Logging {
format = args.factorFormat
)
log.info("[Main] Starting factoring stage")
- FactorGenerationStage.run(factorArgs)
+ // FactorGenerationStage.run(factorArgs)
}
}
}
ActivityGenerator.scala
@@ -96,18 +96,22 @@ class ActivityGenerator()(implicit spark: SparkSession)
)
.mapPartitionsWithIndex { (index, targets) =>
personInvestEvent.resetState(index)
- companyInvestEvent.resetState(index)
personInvestEvent
.personInvestPartition(persons.value.asJava, targets.toList.asJava)
+ .iterator()
+ .asScala
+ }
+ .mapPartitionsWithIndex { (index, targets) =>
+ companyInvestEvent.resetState(index)
companyInvestEvent
.companyInvestPartition(
companies.value.asJava,
targets.toList.asJava
)
- targets.map { target =>
- target.scaleInvestmentRatios()
- }
+ .iterator()
+ .asScala
}
+ .map(_.scaleInvestmentRatios())
}

def signInEvent(
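
The hunk above splits what was one partition pass running both invest events into two chained mapPartitionsWithIndex passes, each reseeding its own event from the partition index, with the ratio scaling pulled out into a final map. A compact sketch of that shape, with plain Random standing in for RandomGeneratorFarm and arithmetic standing in for edge generation (all names illustrative):

    import org.apache.spark.sql.SparkSession
    import scala.util.Random

    object ChainedPassesSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
        val result = spark.sparkContext
          .parallelize(1 to 8, numSlices = 2)
          .mapPartitionsWithIndex { (index, it) =>  // pass 1, like person invest
            val rng = new Random(index)             // resetState(index) analogue:
            it.map(_ + rng.nextInt(10))             // same partition, same output
          }
          .mapPartitionsWithIndex { (index, it) =>  // pass 2, like company invest
            val rng = new Random(index)             // reseeded independently of pass 1
            it.map(_ + rng.nextInt(10))
          }
          .map(_ * 2)                               // element-wise finish, like
                                                    // .map(_.scaleInvestmentRatios())
        println(result.collect().mkString(", "))    // identical across reruns
        spark.stop()
      }
    }

Both passes are narrow transformations, so Spark pipelines them into a single stage; the split changes seeding and code structure rather than adding a shuffle.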
ActivitySerializer.scala
@@ -341,10 +341,9 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession)
val futures = Seq(
SparkUI.jobAsync("Write person invest", "Write Person Invest") {
val rawPersonInvestCompany = investedCompaniesRDD.flatMap { c =>
- printf(
- "[Invest] Company %d, PersonInvestCompanies count: %d\n",
- c.getCompanyId,
- c.getPersonInvestCompanies.size()
+ log.info(
+ "[Invest] Company " + c.getCompanyId + ", PersonInvestCompanies count: " +
+ c.getPersonInvestCompanies.size()
)
c.getPersonInvestCompanies.asScala.map { pic =>
PersonInvestCompanyRaw(
@@ -356,6 +355,7 @@
)
}
}
+ log.info(
+ "[Invest] PersonInvestCompany count: " + rawPersonInvestCompany
+ .count()
+ )
spark
.createDataFrame(rawPersonInvestCompany)
.write
@@ -365,10 +368,9 @@
},
SparkUI.jobAsync("Write company invest", "Write Company Invest") {
val rawCompanyInvestCompany = investedCompaniesRDD.flatMap { c =>
- printf(
- "[Invest] Company %d, CompanyInvestCompanies count: %d\n",
- c.getCompanyId,
- c.getCompanyInvestCompanies.size()
+ log.info(
+ "[Invest] Company " + c.getCompanyId + ", CompanyInvestCompanies count: " +
+ c.getCompanyInvestCompanies.size()
)
c.getCompanyInvestCompanies.asScala.map { cic =>
CompanyInvestCompanyRaw(
@@ -380,6 +382,10 @@
)
}
}
+ log.info(
+ "[Invest] CompanyInvestCompany count: " + rawCompanyInvestCompany
+ .count()
+ )
spark
.createDataFrame(rawCompanyInvestCompany)
.write
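
Two properties of the serializer change are worth noting. First, these log.info calls sit inside RDD closures, so the per-company lines are emitted on the executors, not in the driver log. Second, the added count() is a separate Spark action from the write, so unless the RDD is cached, the flatMap that builds the rows runs once for the count and again for the write. A hedged sketch of the count-then-write pattern with that recomputation avoided (InvestRow, the numbers, and the output path are illustrative, not project code):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    object CountThenWriteSketch {
      case class InvestRow(companyId: Long, amount: Double)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

        val raw = spark.sparkContext
          .parallelize(1L to 1000L)
          .flatMap(id => Seq(InvestRow(id, id * 0.1))) // stands in for the invest flatMap
          .persist(StorageLevel.MEMORY_AND_DISK)       // computed once, reused below

        println("[Invest] count: " + raw.count())      // action 1 fills the cache
        spark.createDataFrame(raw)                     // action 2 reuses it instead of
          .write                                       // re-running the flatMap
          .mode("overwrite")
          .parquet("/tmp/invest_sketch")

        raw.unpersist()
        spark.stop()
      }
    }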
