Skip to content

Commit

Permalink
[Fix #467] Adding schema validation
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Tirado Sarti <ftirados@redhat.com>
  • Loading branch information
fjtirado committed Nov 15, 2024
1 parent 4e4f3a0 commit d6e39dd
Show file tree
Hide file tree
Showing 23 changed files with 719 additions and 105 deletions.
6 changes: 5 additions & 1 deletion impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<artifactId>serverlessworkflow-impl</artifactId>
<properties>
<version.org.glassfish.jersey>3.1.9</version.org.glassfish.jersey>
<version.net.thisptr>1.0.1</version.net.thisptr>
<version.net.thisptr>1.1.0</version.net.thisptr>
</properties>
<dependencies>
<dependency>
Expand All @@ -26,6 +26,10 @@
<artifactId>jersey-media-json-jackson</artifactId>
<version>${version.org.glassfish.jersey}</version>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
</dependency>
<dependency>
<groupId>net.thisptr</groupId>
<artifactId>jackson-jq</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,77 @@
*/
package io.serverlessworkflow.impl;

import static io.serverlessworkflow.impl.WorkflowUtils.*;
import static io.serverlessworkflow.impl.json.JsonUtils.*;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.Input;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.TaskItem;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
import io.serverlessworkflow.impl.executors.TaskExecutor;
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.expressions.JQExpressionFactory;
import io.serverlessworkflow.impl.json.JsonUtils;
import io.serverlessworkflow.impl.jsonschema.DefaultSchemaValidatorFactory;
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.resources.DefaultResourceLoader;
import io.serverlessworkflow.resources.ResourceLoader;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class WorkflowDefinition {

private WorkflowDefinition(
Workflow workflow,
TaskExecutorFactory taskFactory,
Collection<WorkflowExecutionListener> listeners) {
Collection<WorkflowExecutionListener> listeners,
WorkflowFactories factories) {
this.workflow = workflow;
this.taskFactory = taskFactory;
this.listeners = listeners;
this.factories = factories;
if (workflow.getInput() != null) {
Input input = workflow.getInput();
this.inputSchemaValidator =
getSchemaValidator(
factories.getValidatorFactory(), schemaToNode(factories, input.getSchema()));
this.inputFilter = buildWorkflowFilter(factories.getExpressionFactory(), input.getFrom());
}
if (workflow.getOutput() != null) {
Output output = workflow.getOutput();
this.outputSchemaValidator =
getSchemaValidator(
factories.getValidatorFactory(), schemaToNode(factories, output.getSchema()));
this.outputFilter = buildWorkflowFilter(factories.getExpressionFactory(), output.getAs());
}
}

private final Workflow workflow;
private final Collection<WorkflowExecutionListener> listeners;
private final TaskExecutorFactory taskFactory;
private final WorkflowFactories factories;
private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
private Optional<WorkflowFilter> inputFilter = Optional.empty();
private Optional<WorkflowFilter> outputFilter = Optional.empty();

private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
new ConcurrentHashMap<>();

public static class Builder {
private final Workflow workflow;
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get();
private ExpressionFactory exprFactory = JQExpressionFactory.get();
private Collection<WorkflowExecutionListener> listeners;
private ResourceLoader resourceLoader = DefaultResourceLoader.get();
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();

private Builder(Workflow workflow) {
this.workflow = workflow;
Expand All @@ -71,13 +104,28 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
return this;
}

public Builder withExpressionFactory(ExpressionFactory factory) {
this.exprFactory = factory;
return this;
}

public Builder withResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
return this;
}

public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) {
this.schemaValidatorFactory = factory;
return this;
}

public WorkflowDefinition build() {
return new WorkflowDefinition(
workflow,
taskFactory,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners));
: Collections.unmodifiableCollection(listeners),
new WorkflowFactories(taskFactory, resourceLoader, exprFactory, schemaValidatorFactory));
}
}

Expand All @@ -86,7 +134,7 @@ public static Builder builder(Workflow workflow) {
}

public WorkflowInstance execute(Object input) {
return new WorkflowInstance(taskFactory, JsonUtils.fromValue(input));
return new WorkflowInstance(JsonUtils.fromValue(input));
}

enum State {
Expand All @@ -101,11 +149,15 @@ public class WorkflowInstance {
private State state;
private WorkflowContext context;

private WorkflowInstance(TaskExecutorFactory factory, JsonNode input) {
private WorkflowInstance(JsonNode input) {
this.output = input;
this.state = State.STARTED;
inputSchemaValidator.ifPresent(v -> v.validate(input));
this.context = WorkflowContext.builder(input).build();
inputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output));
this.state = State.STARTED;
processDo(workflow.getDo());
outputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output));
outputSchemaValidator.ifPresent(v -> v.validate(output));
}

private void processDo(List<TaskItem> tasks) {
Expand All @@ -118,7 +170,7 @@ private void processDo(List<TaskItem> tasks) {
taskExecutors
.computeIfAbsent(
context.position().jsonPointer(),
k -> taskFactory.getTaskExecutor(task.getTask()))
k -> factories.getTaskFactory().getTaskExecutor(task.getTask(), factories))
.apply(context, output);
listeners.forEach(l -> l.onTaskEnded(context.position(), task.getTask()));
context.position().back().back();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.resources.ResourceLoader;

public class WorkflowFactories {

private final TaskExecutorFactory taskFactory;
private final ResourceLoader resourceLoader;
private final ExpressionFactory expressionFactory;
private final SchemaValidatorFactory validatorFactory;

public WorkflowFactories(
TaskExecutorFactory taskFactory,
ResourceLoader loaderFactory,
ExpressionFactory expressionFactory,
SchemaValidatorFactory validatorFactory) {
this.taskFactory = taskFactory;
this.resourceLoader = loaderFactory;
this.expressionFactory = expressionFactory;
this.validatorFactory = validatorFactory;
}

public TaskExecutorFactory getTaskFactory() {
return taskFactory;
}

public ResourceLoader getResourceLoader() {
return resourceLoader;
}

public ExpressionFactory getExpressionFactory() {
return expressionFactory;
}

public SchemaValidatorFactory getValidatorFactory() {
return validatorFactory;
}
}
24 changes: 24 additions & 0 deletions impl/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;

@FunctionalInterface
public interface WorkflowFilter {
JsonNode apply(WorkflowContext workflow, Optional<TaskContext<?>> task, JsonNode node);
}
99 changes: 99 additions & 0 deletions impl/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.ExportAs;
import io.serverlessworkflow.api.types.InputFrom;
import io.serverlessworkflow.api.types.OutputAs;
import io.serverlessworkflow.api.types.SchemaExternal;
import io.serverlessworkflow.api.types.SchemaInline;
import io.serverlessworkflow.api.types.SchemaUnion;
import io.serverlessworkflow.impl.expressions.Expression;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
import io.serverlessworkflow.impl.json.JsonUtils;
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Optional;

public class WorkflowUtils {

private WorkflowUtils() {}

public static Optional<SchemaValidator> getSchemaValidator(
SchemaValidatorFactory validatorFactory, Optional<JsonNode> node) {
return node.map(n -> validatorFactory.getValidator(n));
}

public static Optional<JsonNode> schemaToNode(WorkflowFactories factories, SchemaUnion schema) {
if (schema != null) {
if (schema.getSchemaInline() != null) {
SchemaInline inline = schema.getSchemaInline();
return Optional.of(JsonUtils.mapper().convertValue(inline.getDocument(), JsonNode.class));
} else if (schema.getSchemaExternal() != null) {
SchemaExternal external = schema.getSchemaExternal();
try (InputStream in =
factories.getResourceLoader().loadStatic(external.getResource()).open()) {
return Optional.of(JsonUtils.mapper().readTree(in));
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
}
return Optional.empty();
}

public static Optional<WorkflowFilter> buildWorkflowFilter(
ExpressionFactory exprFactory, InputFrom from) {
return from != null
? Optional.of(buildWorkflowFilter(exprFactory, from.getString(), from.getObject()))
: Optional.empty();
}

public static Optional<WorkflowFilter> buildWorkflowFilter(
ExpressionFactory exprFactory, OutputAs as) {
return as != null
? Optional.of(buildWorkflowFilter(exprFactory, as.getString(), as.getObject()))
: Optional.empty();
}

public static Optional<WorkflowFilter> buildWorkflowFilter(
ExpressionFactory exprFactory, ExportAs as) {
return as != null
? Optional.of(buildWorkflowFilter(exprFactory, as.getString(), as.getObject()))
: Optional.empty();
}

private static WorkflowFilter buildWorkflowFilter(
ExpressionFactory exprFactory, String str, Object object) {
if (str != null) {
Expression expression = exprFactory.getExpression(str);
return expression::eval;
} else {
Object exprObj = ExpressionUtils.buildExpressionObject(object, exprFactory);
return exprObj instanceof Map
? (w, t, n) ->
JsonUtils.fromValue(
ExpressionUtils.evaluateExpressionMap((Map<String, Object>) exprObj, w, t, n))
: (w, t, n) -> JsonUtils.fromValue(object);
}
}
}
Loading

0 comments on commit d6e39dd

Please sign in to comment.