NebulaSinkFunction in nebula-flink-connector cannot recognize the RowKind of Rows from a Flink CDC source #104

Open
fanqiejiang8 opened this issue Sep 13, 2024 · 1 comment

@fanqiejiang8

The code in my own application is as follows:

    VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
            .setGraphSpace(ennVertexConfig.getGraphSpace())
            .setTag(ennVertexConfig.getTagName())
            .setIdIndex(ennVertexConfig.getIndex()) // vid
            .setFields(ennVertexConfig.getFields())
            .setWriteMode(WriteModeEnum.INSERT)
            .setPositions(ennVertexConfig.getPositions())
            .setBatchSize(ennVertexConfig.getBatchSize())
            .build();
    NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
            graphConnectionProvider, metaConnectionProvider, executionOptions);

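As this shows, the WriteMode is fixed at the moment the NebulaVertexBatchOutputFormat is created. For data coming from a Flink CDC source, however, the RowKind is only known after each record has been deserialized into a Row. As a result, the combination of Flink CDC's MySqlSource + NebulaSinkFunction cannot handle deleted rows.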
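To make the mismatch concrete, here is a minimal sketch of the per-record decision the sink would need to make. The nested `RowKind` and `WriteMode` enums are local stand-ins (assumptions for illustration) that mirror the constants of Flink's `org.apache.flink.types.RowKind` and the three modes used in the connector's `switch`; `RowKindRouting` and `toWriteMode` are hypothetical names, not part of the connector.

```java
import java.util.Optional;

/** Hypothetical helper: picks a write mode per record instead of per sink. */
final class RowKindRouting {

    /** Local stand-in mirroring the constants of Flink's RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Local stand-in for the connector's write modes (INSERT, UPDATE, DELETE). */
    enum WriteMode { INSERT, UPDATE, DELETE }

    private RowKindRouting() {
    }

    /**
     * Maps a changelog row kind to a write mode.
     * UPDATE_BEFORE carries the old image of an updated row, so this sketch
     * skips it (empty result) and writes only the UPDATE_AFTER image.
     */
    static Optional<WriteMode> toWriteMode(RowKind kind) {
        switch (kind) {
            case INSERT:
                return Optional.of(WriteMode.INSERT);
            case UPDATE_AFTER:
                return Optional.of(WriteMode.UPDATE);
            case DELETE:
                return Optional.of(WriteMode.DELETE);
            default:
                // UPDATE_BEFORE: nothing to write in this sketch
                return Optional.empty();
        }
    }
}
```

In a real job the kind would come from `row.getKind()` on each incoming Row. Whether UPDATE_AFTER should map to UPDATE or to INSERT is a design choice that depends on the write semantics wanted in the graph.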
The same thing can be seen in the source code, shown below:
```java
@Override
public void addToBatch(Row record) {
    NebulaEdge edge = converter.createEdge(record, executionOptions.getPolicy());
    if (edge == null) {
        return;
    }
    nebulaEdgeList.add(edge);
}

@Override
public void executeBatch(Session session) throws IOException {
    if (isBatchEmpty()) {
        return;
    }
    NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(),
            executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(),
            executionOptions.getPolicy());
    // generate the write ngql statement
    // note: the mode comes from executionOptions, not from each Row's RowKind
    String statement = null;
    switch (executionOptions.getWriteMode()) {
        case INSERT:
            statement = nebulaEdges.getInsertStatement();
            break;
        case UPDATE:
            statement = nebulaEdges.getUpdateStatement();
            break;
        case DELETE:
            statement = nebulaEdges.getDeleteStatement();
            break;
        default:
            throw new IllegalArgumentException("write mode is not supported");
    }
    executeStatement(session, statement);
    clearBatch();
}
```
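One possible direction, sketched here under assumptions rather than taken from the connector, is to record a write mode with every buffered record and split the batch into consecutive runs that share a mode, so that one statement is generated per run and the original event order is preserved (an insert followed by a delete of the same key must not be reordered). `ModeAwareBatch`, `Segment`, `add`, and `drain` are hypothetical names, records are plain strings, and the returned strings are placeholders rather than real nGQL.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batch buffer that keeps the write mode next to each record. */
final class ModeAwareBatch {

    /** Local stand-in for the connector's write modes. */
    enum WriteMode { INSERT, UPDATE, DELETE }

    /** A run of consecutive records that share one write mode. */
    private static final class Segment {
        final WriteMode mode;
        final List<String> records = new ArrayList<>();

        Segment(WriteMode mode) {
            this.mode = mode;
        }
    }

    private final List<Segment> segments = new ArrayList<>();

    /** Appends a record, opening a new segment whenever the mode changes. */
    void add(WriteMode mode, String record) {
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.mode != mode) {
            last = new Segment(mode);
            segments.add(last);
        }
        last.records.add(record);
    }

    /** Returns one placeholder statement per segment, in arrival order, and clears the batch. */
    List<String> drain() {
        List<String> statements = new ArrayList<>();
        for (Segment segment : segments) {
            statements.add(segment.mode + " " + String.join(",", segment.records));
        }
        segments.clear();
        return statements;
    }
}
```

Adding records `a` and `b` as INSERT, then `a` as DELETE, then `c` as INSERT yields three statements in that order, so the delete of `a` still runs after its insert. In the connector, each segment would instead be turned into an insert, update, or delete statement the way the `switch` above does for the whole batch.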
The results of my own test are as follows:

[Screenshot of test results: c5228c338112ebd8bcafe95c26ca54e]

@fanqiejiang8
Author

To add to the above: with Flink CDC's MySqlSource + NebulaSinkFunction, deleted rows cannot be handled.
