Skip to content

Commit

Permalink
Changes to schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Remper committed Sep 27, 2018
1 parent 5c43fbd commit 67938f2
Show file tree
Hide file tree
Showing 29 changed files with 551 additions and 740 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void startPipeline(Path input, Configuration parameters) throws Exceptio
.project(1, 2);

reducedTweets
.groupBy(1).reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + "\n" + value2.f1))
.groupBy(0).reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + "\n" + value2.f1))
.output(textOutputFormat).withParameters(parameters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
Expand All @@ -35,13 +36,15 @@ public class ExtractSocialGraph {
private static final String TWEETS_PATH = "tweets-path";

private OutputFormat<Tuple3<Long, Long, Float>> forwardOutputFormat;
private OutputFormat<Tuple3<Long, String, String>> forwardCondensedPGFormat;
private OutputFormat<Tuple3<Long, Long, Float>> backwardOutputFormat;

private void start(Path input, Path output) throws Exception {
final Configuration parameters = new Configuration();
parameters.setString("db.file", output.getPath());

forwardOutputFormat = new PostgresFileSink<Tuple3<Long, Long, Float>>("forward").testFile(parameters);
forwardCondensedPGFormat = new PostgresFileSink<Tuple3<Long, String, String>>("forward.pg").testFile(parameters);
backwardOutputFormat = new PostgresFileSink<Tuple3<Long, Long, Float>>("backward").testFile(parameters);

startPipeline(input, parameters);
Expand Down Expand Up @@ -75,11 +78,17 @@ private void processGraphAndOutput(DataSet<JsonObject> tweets, GraphEmitter drop
.sum(2)
.filter(new GraphFilter(2));

jointEdges
.groupBy(0)
UnsortedGrouping<Tuple3<Long, Long, Integer>> forwardGrouped = jointEdges
.groupBy(0);

forwardGrouped
.reduceGroup(new EdgeNormalizer())
.output(forwardOutputFormat).withParameters(parameters);

forwardGrouped
.reduceGroup(new CondenseAndNormalizeGraph())
.output(forwardCondensedPGFormat).setParallelism(1).withParameters(parameters);

jointEdges
.groupBy(1)
.reduceGroup(new EdgeNormalizer())
Expand Down Expand Up @@ -125,6 +134,45 @@ public void flatMap(JsonObject status, Collector<Tuple3<Long, Long, Integer>> ou
}
}

private static class CondenseAndNormalizeGraph implements GroupReduceFunction<Tuple3<Long, Long, Integer>, Tuple3<Long, String, String>> {
@Override
public void reduce(Iterable<Tuple3<Long, Long, Integer>> edges, Collector<Tuple3<Long, String, String>> out) throws Exception {
int sum = 0;

LinkedList<Tuple3<Long, Long, Integer>> listedEdges = new LinkedList<>();
for (Tuple3<Long, Long, Integer> edge : edges) {
sum += edge.f2;
listedEdges.add(edge);
}

StringBuilder ids = new StringBuilder();
StringBuilder weights = new StringBuilder();
Long id = null;
for (Tuple3<Long, Long, Integer> edge : listedEdges) {
id = edge.f0;
if (ids.length() > 0) {
ids.append(',');
}
ids.append(edge.f1.toString());

if (weights.length() > 0) {
weights.append(',');
}
weights.append(String.valueOf((float) edge.f2 / sum));

}
if (id == null) {
return;
}

out.collect(new Tuple3<>(
id,
"{"+ids.toString()+"}",
"{"+weights.toString()+"}"
));
}
}

private static class EdgeNormalizer implements GroupReduceFunction<Tuple3<Long, Long, Integer>, Tuple3<Long, Long, Float>> {
@Override
public void reduce(Iterable<Tuple3<Long, Long, Integer>> edges, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
Expand Down
16 changes: 4 additions & 12 deletions alignments/src/main/java/eu/fbk/fm/alignments/index/db/Keys.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@


import eu.fbk.fm.alignments.index.db.tables.Alignments;
import eu.fbk.fm.alignments.index.db.tables.KbIndex;
import eu.fbk.fm.alignments.index.db.tables.UserIndex;
import eu.fbk.fm.alignments.index.db.tables.UserObjects;
import eu.fbk.fm.alignments.index.db.tables.UserSg;
import eu.fbk.fm.alignments.index.db.tables.UserText;
import eu.fbk.fm.alignments.index.db.tables.UserTextArr;
import eu.fbk.fm.alignments.index.db.tables.records.AlignmentsRecord;
import eu.fbk.fm.alignments.index.db.tables.records.KbIndexRecord;
import eu.fbk.fm.alignments.index.db.tables.records.UserIndexRecord;
import eu.fbk.fm.alignments.index.db.tables.records.UserObjectsRecord;
import eu.fbk.fm.alignments.index.db.tables.records.UserTextArrRecord;
import eu.fbk.fm.alignments.index.db.tables.records.UserSgRecord;
import eu.fbk.fm.alignments.index.db.tables.records.UserTextRecord;

import javax.annotation.Generated;
Expand Down Expand Up @@ -47,11 +43,9 @@ public class Keys {
// -------------------------------------------------------------------------

public static final UniqueKey<AlignmentsRecord> ALIGNMENTS_PKEY = UniqueKeys0.ALIGNMENTS_PKEY;
public static final UniqueKey<KbIndexRecord> KB_INDEX_PKEY = UniqueKeys0.KB_INDEX_PKEY;
public static final UniqueKey<UserIndexRecord> USER_INDEX_PKEY = UniqueKeys0.USER_INDEX_PKEY;
public static final UniqueKey<UserObjectsRecord> USER_OBJECTS_PKEY = UniqueKeys0.USER_OBJECTS_PKEY;
public static final UniqueKey<UserSgRecord> USER_SG_PKEY = UniqueKeys0.USER_SG_PKEY;
public static final UniqueKey<UserTextRecord> USER_TEXT_PKEY = UniqueKeys0.USER_TEXT_PKEY;
public static final UniqueKey<UserTextArrRecord> USER_TEXT_ARR_PKEY = UniqueKeys0.USER_TEXT_ARR_PKEY;

// -------------------------------------------------------------------------
// FOREIGN KEY definitions
Expand All @@ -64,10 +58,8 @@ public class Keys {

private static class UniqueKeys0 extends AbstractKeys {
public static final UniqueKey<AlignmentsRecord> ALIGNMENTS_PKEY = createUniqueKey(Alignments.ALIGNMENTS, "alignments_pkey", Alignments.ALIGNMENTS.RESOURCE_ID, Alignments.ALIGNMENTS.UID);
public static final UniqueKey<KbIndexRecord> KB_INDEX_PKEY = createUniqueKey(KbIndex.KB_INDEX, "kb_index_pkey", KbIndex.KB_INDEX.KBID);
public static final UniqueKey<UserIndexRecord> USER_INDEX_PKEY = createUniqueKey(UserIndex.USER_INDEX, "user_index_pkey", UserIndex.USER_INDEX.FULLNAME, UserIndex.USER_INDEX.UID);
public static final UniqueKey<UserObjectsRecord> USER_OBJECTS_PKEY = createUniqueKey(UserObjects.USER_OBJECTS, "user_objects_pkey", UserObjects.USER_OBJECTS.UID);
public static final UniqueKey<UserSgRecord> USER_SG_PKEY = createUniqueKey(UserSg.USER_SG, "user_sg_pkey", UserSg.USER_SG.UID);
public static final UniqueKey<UserTextRecord> USER_TEXT_PKEY = createUniqueKey(UserText.USER_TEXT, "user_text_pkey", UserText.USER_TEXT.UID);
public static final UniqueKey<UserTextArrRecord> USER_TEXT_ARR_PKEY = createUniqueKey(UserTextArr.USER_TEXT_ARR, "user_text_arr_pkey", UserTextArr.USER_TEXT_ARR.UID);
}
}
32 changes: 16 additions & 16 deletions alignments/src/main/java/eu/fbk/fm/alignments/index/db/Public.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@


import eu.fbk.fm.alignments.index.db.tables.Alignments;
import eu.fbk.fm.alignments.index.db.tables.KbIndex;
import eu.fbk.fm.alignments.index.db.tables.UserIndex;
import eu.fbk.fm.alignments.index.db.tables.UserObjects;
import eu.fbk.fm.alignments.index.db.tables.UserSg;
import eu.fbk.fm.alignments.index.db.tables.UserText;
import eu.fbk.fm.alignments.index.db.tables.UserTextArr;
import eu.fbk.fm.alignments.index.db.udt.Test;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -20,6 +19,7 @@

import org.jooq.Catalog;
import org.jooq.Table;
import org.jooq.UDT;
import org.jooq.impl.SchemaImpl;


Expand All @@ -36,7 +36,7 @@
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
public class Public extends SchemaImpl {

private static final long serialVersionUID = 275761985;
private static final long serialVersionUID = 2094861583;

/**
* The reference instance of <code>public</code>
Expand All @@ -48,11 +48,6 @@ public class Public extends SchemaImpl {
*/
public final Alignments ALIGNMENTS = eu.fbk.fm.alignments.index.db.tables.Alignments.ALIGNMENTS;

/**
* The table <code>public.kb_index</code>.
*/
public final KbIndex KB_INDEX = eu.fbk.fm.alignments.index.db.tables.KbIndex.KB_INDEX;

/**
* The table <code>public.user_index</code>.
*/
Expand All @@ -73,11 +68,6 @@ public class Public extends SchemaImpl {
*/
public final UserText USER_TEXT = eu.fbk.fm.alignments.index.db.tables.UserText.USER_TEXT;

/**
* The table <code>public.user_text_arr</code>.
*/
public final UserTextArr USER_TEXT_ARR = eu.fbk.fm.alignments.index.db.tables.UserTextArr.USER_TEXT_ARR;

/**
* No further instances allowed
*/
Expand All @@ -104,11 +94,21 @@ public final List<Table<?>> getTables() {
private final List<Table<?>> getTables0() {
return Arrays.<Table<?>>asList(
Alignments.ALIGNMENTS,
KbIndex.KB_INDEX,
UserIndex.USER_INDEX,
UserObjects.USER_OBJECTS,
UserSg.USER_SG,
UserText.USER_TEXT,
UserTextArr.USER_TEXT_ARR);
UserText.USER_TEXT);
}

@Override
public final List<UDT<?>> getUDTs() {
List result = new ArrayList();
result.addAll(getUDTs0());
return result;
}

private final List<UDT<?>> getUDTs0() {
return Arrays.<UDT<?>>asList(
Test.TEST);
}
}
12 changes: 0 additions & 12 deletions alignments/src/main/java/eu/fbk/fm/alignments/index/db/Tables.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@


import eu.fbk.fm.alignments.index.db.tables.Alignments;
import eu.fbk.fm.alignments.index.db.tables.KbIndex;
import eu.fbk.fm.alignments.index.db.tables.UserIndex;
import eu.fbk.fm.alignments.index.db.tables.UserObjects;
import eu.fbk.fm.alignments.index.db.tables.UserSg;
import eu.fbk.fm.alignments.index.db.tables.UserText;
import eu.fbk.fm.alignments.index.db.tables.UserTextArr;

import javax.annotation.Generated;

Expand All @@ -33,11 +31,6 @@ public class Tables {
*/
public static final Alignments ALIGNMENTS = eu.fbk.fm.alignments.index.db.tables.Alignments.ALIGNMENTS;

/**
* The table <code>public.kb_index</code>.
*/
public static final KbIndex KB_INDEX = eu.fbk.fm.alignments.index.db.tables.KbIndex.KB_INDEX;

/**
* The table <code>public.user_index</code>.
*/
Expand All @@ -57,9 +50,4 @@ public class Tables {
* The table <code>public.user_text</code>.
*/
public static final UserText USER_TEXT = eu.fbk.fm.alignments.index.db.tables.UserText.USER_TEXT;

/**
* The table <code>public.user_text_arr</code>.
*/
public static final UserTextArr USER_TEXT_ARR = eu.fbk.fm.alignments.index.db.tables.UserTextArr.USER_TEXT_ARR;
}
29 changes: 29 additions & 0 deletions alignments/src/main/java/eu/fbk/fm/alignments/index/db/UDTs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* This class is generated by jOOQ
*/
package eu.fbk.fm.alignments.index.db;


import eu.fbk.fm.alignments.index.db.udt.Test;

import javax.annotation.Generated;


/**
* Convenience access to all UDTs in public
*/
@Generated(
value = {
"http://www.jooq.org",
"jOOQ version:3.8.4"
},
comments = "This class is generated by jOOQ"
)
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
public class UDTs {

/**
* The type <code>public.test</code>
*/
public static Test TEST = eu.fbk.fm.alignments.index.db.udt.Test.TEST;
}
Loading

0 comments on commit 67938f2

Please sign in to comment.