Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add desc for flink sql #100

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Add the dependency to your pom.xml.

## Example

To write data into Nebula Graph using Flink.
To write data into NebulaGraph using Flink.
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
Expand Down Expand Up @@ -63,6 +63,142 @@ DataStream<Row> dataStream = playerSource.map(row -> {
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula")
```

To read data from NebulaGraph using Flink.
```
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
.setMetaAddress("127.0.0.1:9559")
.build();
storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
.setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
dataStreamSource.map(row -> {
List<ValueWrapper> values = row.getValues();
Row record = new Row(15);
record.setField(0, values.get(0).asLong());
record.setField(1, values.get(1).asString());
record.setField(2, values.get(2).asString());
record.setField(3, values.get(3).asLong());
record.setField(4, values.get(4).asLong());
record.setField(5, values.get(5).asLong());
record.setField(6, values.get(6).asLong());
record.setField(7, values.get(7).asDate());
record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr());
record.setField(9, values.get(9).asLong());
record.setField(10, values.get(10).asBoolean());
record.setField(11, values.get(11).asDouble());
record.setField(12, values.get(12).asDouble());
record.setField(13, values.get(13).asTime().getUTCTimeStr());
record.setField(14, values.get(14).asGeography());
return record;
}).print();
env.execute("NebulaStreamSource");
```

To operate Schema and data using Flink SQL.

1. create graph space
```
NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog(
"NebulaCatalog",
"default",
"root",
"nebula",
"127.0.0.1:9559",
"127.0.0.1:9669");

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog);
tableEnv.useCatalog(CATALOG_NAME);

String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`"
+ " COMMENT 'space 1'"
+ " WITH ("
+ " 'partition_num' = '100',"
+ " 'replica_factor' = '3',"
+ " 'vid_type' = 'FIXED_STRING(10)'"
+ ")";
tableEnv.executeSql(createDataBase);
```
2. create tag
```
tableEnvironment.executeSql("CREATE TABLE `person` ("
+ " vid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
+ " col3 BIGINT,"
+ " col4 BIGINT,"
+ " col5 BIGINT,"
+ " col6 BIGINT,"
+ " col7 DATE,"
+ " col8 TIMESTAMP,"
+ " col9 BIGINT,"
+ " col10 BOOLEAN,"
+ " col11 DOUBLE,"
+ " col12 DOUBLE,"
+ " col13 TIME,"
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'data-type' = 'vertex',"
+ " 'graph-space' = 'flink_test',"
+ " 'label-name' = 'person'"
+ ")"
);
```
3. create edge
```
tableEnvironment.executeSql("CREATE TABLE `friend` ("
+ " sid BIGINT,"
+ " did BIGINT,"
+ " rid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
+ " col3 BIGINT,"
+ " col4 BIGINT,"
+ " col5 BIGINT,"
+ " col6 BIGINT,"
+ " col7 DATE,"
+ " col8 TIMESTAMP,"
+ " col9 BIGINT,"
+ " col10 BOOLEAN,"
+ " col11 DOUBLE,"
+ " col12 DOUBLE,"
+ " col13 TIME,"
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'graph-space' = 'flink_test',"
+ " 'label-name' = 'friend',"
+ " 'data-type'='edge',"
+ " 'src-id-index'='0',"
+ " 'dst-id-index'='1',"
+ " 'rank-id-index'='2'"
+ ")"
);
```
4. query edge data and insert into another edge type
```
Table table = tableEnvironment.sqlQuery("SELECT * FROM `friend`");
table.executeInsert("`friend_sink`").await();
```

## Version match

There are the version correspondence between Nebula Flink Connector and Nebula:
Expand Down
Loading