Skip to content

Commit

Permalink
Fail the job when validation fails and ms.work.unit.min.units > 0
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Li committed Oct 25, 2021
1 parent e99da96 commit f09fc39
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,33 @@ public Long getMillis(State state) {
}
};

JsonArrayProperties MSTAGE_WATERMARK = new JsonArrayProperties("ms.watermark");
JsonArrayProperties MSTAGE_WATERMARK = new JsonArrayProperties("ms.watermark") {
@Override
public boolean isValid(State state) {
final List<String> types = Lists.newArrayList("unit", "datetime");
if (super.isValid(state) && !isBlank(state)) {
// Derived fields should meet general JsonArray configuration requirements
// and contain only JsonObject items that each has a "name" element and a "formula" element
JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class);
if (value.size() == 0) {
return false;
}
if (!value.get(0).isJsonObject()) {
return false;
}

if (!value.get(0).getAsJsonObject().has(KEY_WORD_NAME) || !value.get(0).getAsJsonObject().has(KEY_WORD_TYPE)) {
return false;
}

String type = value.get(0).getAsJsonObject().get(KEY_WORD_TYPE).getAsString();
if (types.stream().noneMatch(t -> t.equalsIgnoreCase(type))) {
return false;
}
}
return super.isValid(state);
}
};
JsonArrayProperties MSTAGE_WATERMARK_GROUPS = new JsonArrayProperties("ms.watermark.groups");

// default: 0, minimum: 0, maximum: -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public List<WorkUnit> getWorkunits(SourceState state) {
jobKeys.logUsage(state);
if (!jobKeys.validate(state)) {
LOG.error("Some parameters are invalid, job will do nothing until they are fixed.");
if (MSTAGE_WORK_UNIT_MIN_UNITS.get(state) > 0) {
throw new RuntimeException(String.format(EXCEPTION_WORK_UNIT_MINIMUM, MSTAGE_WORK_UNIT_MIN_UNITS.get(state), 0));
}
return new ArrayList<>();
}

Expand Down

0 comments on commit f09fc39

Please sign in to comment.