We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
我自己应用程序中的代码如下: VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace(ennVertexConfig.getGraphSpace()) .setTag(ennVertexConfig.getTagName()) .setIdIndex(ennVertexConfig.getIndex())//vid .setFields(ennVertexConfig.getFields()) .setWriteMode(WriteModeEnum.INSERT) .setPositions(ennVertexConfig.getPositions()) .setBatchSize(ennVertexConfig.getBatchSize()) .build(); NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(graphConnectionProvider, metaConnectionProvider, executionOptions); 从这里我们可以看出来,在创建NebulaVertexBatchOutputFormat的时候就把WriteMode给确定了,但是对于FlinkCDC Source的数据,只有在反序列化成Row格式之后才能知道RowKind的类型,所以当使用 FlinkCDC的MysqSqlSource + NebulaSinkFunction的时候是无法处理删除的数据。 我们从源码中也能看出来,下面是源码: ` @OverRide public void addToBatch(Row record) { NebulaEdge edge = converter.createEdge(record, executionOptions.getPolicy()); if (edge == null) { return; } nebulaEdgeList.add(edge); }
VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace(ennVertexConfig.getGraphSpace()) .setTag(ennVertexConfig.getTagName()) .setIdIndex(ennVertexConfig.getIndex())//vid .setFields(ennVertexConfig.getFields()) .setWriteMode(WriteModeEnum.INSERT) .setPositions(ennVertexConfig.getPositions()) .setBatchSize(ennVertexConfig.getBatchSize()) .build(); NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(graphConnectionProvider, metaConnectionProvider, executionOptions);
@OverRide public void executeBatch(Session session) throws IOException { if (isBatchEmpty()) { return; } NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(), executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(), executionOptions.getPolicy()); // generate the write ngql statement String statement = null; switch (executionOptions.getWriteMode()) { case INSERT: statement = nebulaEdges.getInsertStatement(); break; case UPDATE: statement = nebulaEdges.getUpdateStatement(); break; case DELETE: statement = nebulaEdges.getDeleteStatement(); break; default: throw new IllegalArgumentException("write mode is not supported"); } executeStatement(session, statement); clearBatch(); } ` 然后我自己测试的结果如下:
The text was updated successfully, but these errors were encountered:
补充一下:FlinkCDC的MysqSqlSource + NebulaSinkFunction的时候是无法处理删除的数据。
Sorry, something went wrong.
No branches or pull requests
我自己应用程序中的代码如下:
VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace(ennVertexConfig.getGraphSpace()) .setTag(ennVertexConfig.getTagName()) .setIdIndex(ennVertexConfig.getIndex())//vid .setFields(ennVertexConfig.getFields()) .setWriteMode(WriteModeEnum.INSERT) .setPositions(ennVertexConfig.getPositions()) .setBatchSize(ennVertexConfig.getBatchSize()) .build(); NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(graphConnectionProvider, metaConnectionProvider, executionOptions);
从这里我们可以看出来,在创建NebulaVertexBatchOutputFormat的时候就把WriteMode给确定了,但是对于FlinkCDC Source的数据,只有在反序列化成Row格式之后才能知道RowKind的类型,所以当使用 FlinkCDC的MysqSqlSource + NebulaSinkFunction的时候是无法处理删除的数据。
我们从源码中也能看出来,下面是源码:
`
@OverRide
public void addToBatch(Row record) {
NebulaEdge edge = converter.createEdge(record, executionOptions.getPolicy());
if (edge == null) {
return;
}
nebulaEdgeList.add(edge);
}
@OverRide
public void executeBatch(Session session) throws IOException {
if (isBatchEmpty()) {
return;
}
NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(),
executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(),
executionOptions.getPolicy());
// generate the write ngql statement
String statement = null;
switch (executionOptions.getWriteMode()) {
case INSERT:
statement = nebulaEdges.getInsertStatement();
break;
case UPDATE:
statement = nebulaEdges.getUpdateStatement();
break;
case DELETE:
statement = nebulaEdges.getDeleteStatement();
break;
default:
throw new IllegalArgumentException("write mode is not supported");
}
executeStatement(session, statement);
clearBatch();
}
`
然后我自己测试的结果如下:
The text was updated successfully, but these errors were encountered: