diff --git a/README.md b/README.md index f374b0b..bec6446 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Add the dependency to your pom.xml. ## Example -To write data into Nebula Graph using Flink. +To write data into NebulaGraph using Flink. ``` StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder() @@ -63,6 +63,142 @@ DataStream dataStream = playerSource.map(row -> { dataStream.addSink(nebulaSinkFunction); env.execute("write nebula") ``` + +To read data from NebulaGraph using Flink. +``` + NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder() + .setMetaAddress("127.0.0.1:9559") + .build(); + storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider) + .setExecutionOptions(vertexExecutionOptions); + DataStreamSource dataStreamSource = env.addSource(sourceFunction); + dataStreamSource.map(row -> { + List values = row.getValues(); + Row record = new Row(15); + record.setField(0, values.get(0).asLong()); + record.setField(1, values.get(1).asString()); + record.setField(2, values.get(2).asString()); + record.setField(3, values.get(3).asLong()); + record.setField(4, values.get(4).asLong()); + record.setField(5, values.get(5).asLong()); + record.setField(6, values.get(6).asLong()); + record.setField(7, values.get(7).asDate()); + record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr()); + record.setField(9, values.get(9).asLong()); + record.setField(10, values.get(10).asBoolean()); + record.setField(11, values.get(11).asDouble()); + record.setField(12, values.get(12).asDouble()); + record.setField(13, values.get(13).asTime().getUTCTimeStr()); + record.setField(14, values.get(14).asGeography()); + return record; + }).print(); + env.execute("NebulaStreamSource"); +``` + +To operate Schema and data using Flink SQL. + +1. create graph space +``` + NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog( + "NebulaCatalog", + "default", + "root", + "nebula", + "127.0.0.1:9559", + "127.0.0.1:9669"); + + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + + tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog); + tableEnv.useCatalog(CATALOG_NAME); + + String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`" + + " COMMENT 'space 1'" + + " WITH (" + + " 'partition_num' = '100'," + + " 'replica_factor' = '3'," + + " 'vid_type' = 'FIXED_STRING(10)'" + + ")"; + tableEnv.executeSql(createDataBase); +``` +2. create tag +``` + tableEnvironment.executeSql("CREATE TABLE `person` (" + + " vid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 BIGINT," + + " col4 BIGINT," + + " col5 BIGINT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '127.0.0.1:9559'," + + " 'graph-address' = '127.0.0.1:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'data-type' = 'vertex'," + + " 'graph-space' = 'flink_test'," + + " 'label-name' = 'person'" + + ")" + ); +``` +3. create edge +``` + tableEnvironment.executeSql("CREATE TABLE `friend` (" + + " sid BIGINT," + + " did BIGINT," + + " rid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 BIGINT," + + " col4 BIGINT," + + " col5 BIGINT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '127.0.0.1:9559'," + + " 'graph-address' = '127.0.0.1:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'graph-space' = 'flink_test'," + + " 'label-name' = 'friend'," + + " 'data-type'='edge'," + + " 'src-id-index'='0'," + + " 'dst-id-index'='1'," + + " 'rank-id-index'='2'" + + ")" + ); +``` +4. query edge data and insert into another edge type +``` + Table table = tableEnvironment.sqlQuery("SELECT * FROM `friend`"); + table.executeInsert("`friend_sink`").await(); +``` + ## Version match There are the version correspondence between Nebula Flink Connector and Nebula: