Skip to content

Commit

Permalink
merge person activities and company activities (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp authored Sep 19, 2024
1 parent 9cd63ff commit 2fbf67a
Show file tree
Hide file tree
Showing 22 changed files with 301 additions and 451 deletions.
5 changes: 4 additions & 1 deletion scripts/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ def run_local(
**({'spark.shuffle.spill.compress': 'true'}),
**({'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'}),
**({'spark.executor.extraJavaOptions': '-XX:+UseG1GC'}),
**({'spark.driver.maxResultSize': '5g'}),
**({'spark.driver.maxResultSize': '0'}),
**({'spark.memory.offHeap.enabled': 'true'}),
**({'spark.memory.offHeap.size': '100g'}),
**({'spark.storage.memoryFraction': '0'}),
# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
**spark_conf
Expand Down
15 changes: 8 additions & 7 deletions scripts/run_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,24 @@ echo "start: " `date`
# --conf "spark.dynamicAllocation.enabled=true" \
# --conf "spark.dynamicAllocation.minExecutors=1" \
# --conf "spark.dynamicAllocation.maxExecutors=10" \

# --conf "spark.yarn.maximizeResourceAllocation=true" \
# --conf "spark.memory.offHeap.enabled=true" \
# --conf "spark.memory.offHeap.size=100g" \
time spark-submit --master spark://finbench-large-00:7077 \
--class ldbc.finbench.datagen.LdbcDatagen \
--num-executors 2 \
--conf "spark.default.parallelism=640" \
--conf "spark.default.parallelism=800" \
--conf "spark.network.timeout=100000" \
--conf "spark.shuffle.compress=true" \
--conf "spark.shuffle.spill.compress=true" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.driver.memory=200g" \
--conf "spark.driver.maxResultSize=5g" \
--conf "spark.executor.memory=300g" \
--conf "spark.executor.memoryOverheadFactor=0.2" \
--conf "spark.driver.memory=100g" \
--conf "spark.driver.maxResultSize=0" \
--conf "spark.executor.memory=400g" \
--conf "spark.executor.memoryOverheadFactor=0.5" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 30 \
--scale-factor 100 \
--output-dir ${OUTPUT_DIR}

echo "End: " `date`
2 changes: 1 addition & 1 deletion scripts/run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ time spark-submit --master local[*] \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 10 \
--output-dir ${OUTPUT_DIR}
--output-dir ${OUTPUT_DIR}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static void createCompanyGuaranteeCompany(RandomGeneratorFarm farm, Compa
toCompany, creationDate, 0, false,
"business associate", comment);
fromCompany.getGuaranteeSrc().add(companyGuaranteeCompany);
toCompany.getGuaranteeDst().add(companyGuaranteeCompany);
}

public long getFromCompanyId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static void createPersonGuaranteePerson(RandomGeneratorFarm farm, Person
PersonGuaranteePerson personGuaranteePerson =
new PersonGuaranteePerson(fromPerson, toPerson, creationDate, 0, false, relation, comment);
fromPerson.getGuaranteeSrc().add(personGuaranteePerson);
toPerson.getGuaranteeDst().add(personGuaranteePerson);
}

public long getFromPersonId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.List;
import java.util.Random;
import ldbc.finbench.datagen.entities.edges.CompanyApplyLoan;
import ldbc.finbench.datagen.entities.edges.CompanyGuaranteeCompany;
import ldbc.finbench.datagen.entities.edges.CompanyOwnAccount;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.entities.nodes.Company;
import ldbc.finbench.datagen.entities.nodes.Loan;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
import ldbc.finbench.datagen.generation.generators.AccountGenerator;
import ldbc.finbench.datagen.generation.generators.LoanGenerator;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

public class CompanyActivitiesEvent implements Serializable {
private final RandomGeneratorFarm randomFarm;
private final Random randIndex;

public CompanyActivitiesEvent() {
randomFarm = new RandomGeneratorFarm();
randIndex = new Random(DatagenParams.defaultSeed);
}

private void resetState(int seed) {
randomFarm.resetRandomGenerators(seed);
randIndex.setSeed(seed);
}

public List<Company> companyActivities(List<Company> companies, AccountGenerator accountGenerator,
LoanGenerator loanGenerator, int blockId) {
resetState(blockId);
accountGenerator.resetState(blockId);

Random numAccRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_ACCOUNTS_PER_COMPANY);

Random pickCompanyGuaRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_COMPANY_GUARANTEE);
Random numGuaranteesRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_GUARANTEES_PER_COMPANY);

Random pickCompanyLoanRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_COMPANY_FOR_LOAN);
Random numLoansRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_LOANS_PER_COMPANY);
Random dateRand = randomFarm.get(RandomGeneratorFarm.Aspect.COMPANY_APPLY_LOAN_DATE);

for (Company from : companies) {
// register accounts
int numAccounts = numAccRand.nextInt(DatagenParams.maxAccountsPerOwner);
for (int i = 0; i < Math.max(1, numAccounts); i++) {
Account account = accountGenerator.generateAccount(from.getCreationDate(), "company", blockId);
CompanyOwnAccount.createCompanyOwnAccount(randomFarm, from, account, account.getCreationDate());
}
// guarantee other companies
if (pickCompanyGuaRand.nextDouble() < DatagenParams.companyGuaranteeFraction) {
int numGuarantees = numGuaranteesRand.nextInt(DatagenParams.maxTargetsToGuarantee);
for (int i = 0; i < Math.max(1, numGuarantees); i++) {
Company to = companies.get(randIndex.nextInt(companies.size()));
if (from.canGuarantee(to)) {
CompanyGuaranteeCompany.createCompanyGuaranteeCompany(randomFarm, from, to);
}
}
}
// apply loans
if (pickCompanyLoanRand.nextDouble() < DatagenParams.companyLoanFraction) {
int numLoans = numLoansRand.nextInt(DatagenParams.maxLoans);
for (int i = 0; i < Math.max(1, numLoans); i++) {
long applyDate = Dictionaries.dates.randomCompanyToLoanDate(dateRand, from);
Loan to = loanGenerator.generateLoan(applyDate, "company", blockId);
CompanyApplyLoan.createCompanyApplyLoan(randomFarm, applyDate, from, to);
}
}
}

return companies;
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.List;
import java.util.Random;
import ldbc.finbench.datagen.entities.edges.PersonApplyLoan;
import ldbc.finbench.datagen.entities.edges.PersonGuaranteePerson;
import ldbc.finbench.datagen.entities.edges.PersonOwnAccount;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.entities.nodes.Loan;
import ldbc.finbench.datagen.entities.nodes.Person;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
import ldbc.finbench.datagen.generation.generators.AccountGenerator;
import ldbc.finbench.datagen.generation.generators.LoanGenerator;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

public class PersonActivitiesEvent implements Serializable {
private final RandomGeneratorFarm randomFarm;
private final Random randIndex;

public PersonActivitiesEvent() {
randomFarm = new RandomGeneratorFarm();
randIndex = new Random(DatagenParams.defaultSeed);
}

private void resetState(int seed) {
randomFarm.resetRandomGenerators(seed);
randIndex.setSeed(seed);
}

// Generate accounts, guarantees, and loans for persons
public List<Person> personActivities(List<Person> persons, AccountGenerator accountGenerator,
LoanGenerator loanGenerator, int blockId) {
resetState(blockId);
accountGenerator.resetState(blockId);

Random numAccRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_ACCOUNTS_PER_PERSON);

Random pickPersonGuaRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_PERSON_GUARANTEE);
Random numGuaranteesRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_GUARANTEES_PER_PERSON);

Random pickPersonLoanRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_PERSON_LOAN);
Random numLoansRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_LOANS_PER_PERSON);
Random dateRand = randomFarm.get(RandomGeneratorFarm.Aspect.PERSON_APPLY_LOAN_DATE);

for (Person from : persons) {
// register accounts
int numAccounts = numAccRand.nextInt(DatagenParams.maxAccountsPerOwner);
for (int i = 0; i < Math.max(1, numAccounts); i++) {
Account to = accountGenerator.generateAccount(from.getCreationDate(), "person", blockId);
PersonOwnAccount.createPersonOwnAccount(randomFarm, from, to, to.getCreationDate());
}
// guarantee other persons
if (pickPersonGuaRand.nextDouble() < DatagenParams.personGuaranteeFraction) {
int numGuarantees = numGuaranteesRand.nextInt(DatagenParams.maxTargetsToGuarantee);
for (int i = 0; i < Math.max(1, numGuarantees); i++) {
Person to = persons.get(randIndex.nextInt(persons.size()));
if (from.canGuarantee(to)) {
PersonGuaranteePerson.createPersonGuaranteePerson(randomFarm, from, to);
}
}
}
// apply loans
if (pickPersonLoanRand.nextDouble() < DatagenParams.personLoanFraction) {
int numLoans = numLoansRand.nextInt(DatagenParams.maxLoans);
for (int i = 0; i < Math.max(1, numLoans); i++) {
long applyDate = Dictionaries.dates.randomPersonToLoanDate(dateRand, from);
Loan to = loanGenerator.generateLoan(applyDate, "person", blockId);
PersonApplyLoan.createPersonApplyLoan(randomFarm, applyDate, from, to);
}
}
}

return persons;
}
}
Loading

0 comments on commit 2fbf67a

Please sign in to comment.