From 503ba7ca3828c57454c731b55c7843ffe69b20de Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 26 Jul 2023 00:10:46 -0500 Subject: [PATCH] Subscription "hints" (#37) * Add hints to Subscription CRD * Add Subscription hints and default values * License select files under Apache 2 * Rename magic template variable "sql" to "pipeline.sql" * Limit hints to sink resources --- bin/hoptimator | 2 +- deploy/samples/subscriptions.yaml | 3 +- deploy/subscriptions.crd.yaml | 5 ++ .../linkedin/hoptimator/catalog/Resource.java | 69 ++++++++++--------- hoptimator-cli/run.sh | 3 +- .../src/main/resources/SqlJob.yaml.template | 6 +- .../hoptimator/catalog/kafka/KafkaTopic.java | 7 +- .../catalog/kafka/RawKafkaSchemaFactory.java | 5 +- .../main/resources/KafkaTopic.yaml.template | 8 +-- .../resources/KafkaTopicAcl.yaml.template | 6 +- .../hoptimator/models/V1alpha1Acl.java | 2 +- .../hoptimator/models/V1alpha1AclList.java | 2 +- .../hoptimator/models/V1alpha1AclSpec.java | 2 +- .../models/V1alpha1AclSpecResource.java | 2 +- .../hoptimator/models/V1alpha1AclStatus.java | 2 +- .../hoptimator/models/V1alpha1KafkaTopic.java | 2 +- .../models/V1alpha1KafkaTopicList.java | 2 +- .../models/V1alpha1KafkaTopicSpec.java | 2 +- .../V1alpha1KafkaTopicSpecClientConfigs.java | 2 +- .../V1alpha1KafkaTopicSpecConfigMapRef.java | 2 +- .../models/V1alpha1KafkaTopicStatus.java | 2 +- .../models/V1alpha1Subscription.java | 2 +- .../models/V1alpha1SubscriptionList.java | 2 +- .../models/V1alpha1SubscriptionSpec.java | 44 +++++++++++- .../models/V1alpha1SubscriptionStatus.java | 2 +- hoptimator-operator/build.gradle | 1 + .../operator/RequestEnvironment.java | 14 ---- .../subscription/SubscriptionEnvironment.java | 50 ++++++++++++++ .../subscription/SubscriptionReconciler.java | 30 ++++++-- .../linkedin/hoptimator/planner/Pipeline.java | 41 +++++++++-- .../hoptimator/planner/PipelineRel.java | 11 +-- .../linkedin/hoptimator/planner/SqlJob.java | 2 +- 32 files changed, 233 insertions(+), 102 deletions(-) delete mode 100644 hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/RequestEnvironment.java create mode 100644 hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java diff --git a/bin/hoptimator b/bin/hoptimator index dfd34d2..46d4602 100755 --- a/bin/hoptimator +++ b/bin/hoptimator @@ -1,3 +1,3 @@ #!/bin/sh -kubectl exec -it hoptimator -c hoptimator -- ./hoptimator --isolation=TRANSACTION_NONE -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" -nn hoptimator "$@" +kubectl exec -it hoptimator -c hoptimator -- ./hoptimator -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" "$@" diff --git a/deploy/samples/subscriptions.yaml b/deploy/samples/subscriptions.yaml index 39cfe83..010ebe9 100644 --- a/deploy/samples/subscriptions.yaml +++ b/deploy/samples/subscriptions.yaml @@ -6,4 +6,5 @@ metadata: spec: sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand" database: RAWKAFKA - + hints: + numPartitions: "2" diff --git a/deploy/subscriptions.crd.yaml b/deploy/subscriptions.crd.yaml index 87fcbaf..258f9ce 100644 --- a/deploy/subscriptions.crd.yaml +++ b/deploy/subscriptions.crd.yaml @@ -38,6 +38,11 @@ spec: database: description: The database in which to create the output/sink table. type: string + hints: + description: Hints to adapters, which may disregard them. + type: object + additionalProperties: + type: string required: - sql - database diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java index ba18a36..e08fe68 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java @@ -97,7 +97,11 @@ public Set keys() { /** Render this Resource using the given TemplateFactory */ public String render(TemplateFactory templateFactory) { - return templateFactory.get(this).render(this); + try { + return templateFactory.get(this).render(this); + } catch (Exception e) { + throw new RuntimeException("Error rendering " + template, e); + } } public String render(Template template) { @@ -148,53 +152,51 @@ public interface Environment { Environment EMPTY = new SimpleEnvironment(); Environment PROCESS = new ProcessEnvironment(); - String get(String key); + String getOrDefault(String key, String defaultValue); } /** Basic Environment implementation */ public static class SimpleEnvironment implements Environment { - private final Map vars; - - public SimpleEnvironment(Map vars) { - this.vars = vars; - } + private final Map vars = new HashMap<>(); public SimpleEnvironment() { - this(new HashMap<>()); } - public void export(String property, String value) { + protected void export(String property, String value) { vars.put(property, value); } - public SimpleEnvironment(Properties properties) { - this.vars = new HashMap<>(); - for (Map.Entry entry : properties.entrySet()) { - this.vars.put(entry.getKey().toString(), entry.getValue().toString()); - } + protected void exportAll(Map properties) { + vars.putAll(properties); } public SimpleEnvironment with(String key, String value) { Map newVars = new HashMap<>(); newVars.putAll(vars); newVars.put(key, value); - return new SimpleEnvironment(newVars); + return new SimpleEnvironment(){{ + exportAll(newVars); + }}; } @Override - public String get(String key) { - if (!vars.containsKey(key)) { + public String getOrDefault(String key, String defaultValue) { + if (defaultValue == null && !vars.containsKey(key)) { throw new IllegalArgumentException("No variable '" + key + "' found in the environment"); } - return vars.get(key); + return vars.getOrDefault(key, defaultValue); } } - /** Returns "{{key}}" for any key */ + /** Returns "{{key}}" for any key without a default */ public static class DummyEnvironment implements Environment { @Override - public String get(String key) { - return "{{" + key + "}}"; + public String getOrDefault(String key, String defaultValue) { + if (defaultValue != null) { + return defaultValue; + } else { + return "{{" + key + "}}"; + } } } @@ -202,11 +204,14 @@ public String get(String key) { public static class ProcessEnvironment implements Environment { @Override - public String get(String key) { + public String getOrDefault(String key, String defaultValue) { String value = System.getenv(key); if (value == null) { value = System.getProperty(key); } + if (value == null) { + value = defaultValue; + } if (value == null) { throw new IllegalArgumentException("Missing system property `" + key + "`"); } @@ -222,11 +227,12 @@ public interface Template { /** * Replaces `{{var}}` in a template file with the corresponding variable. * - * Resource-scoped variables take precedence over Environment-scoped variables. + * Resource-scoped variables take precedence over Environment-scoped + * variables. Default values can supplied with `{{var:default}}`. * - * If `var` contains multiple lines, the behavior depends on context; specifically, - * whether the pattern appears within a list or comment (prefixed with `-` or `#`). - * For example, if the template includes: + * If `var` contains multiple lines, the behavior depends on context; + * specifically, whether the pattern appears within a list or comment + * (prefixed with `-` or `#`). For example, if the template includes: * * - {{var}} * @@ -235,8 +241,8 @@ public interface Template { * - value line 1 * - value line 2 * - * To avoid this behavior (and just get a multiline string), use one of YAML's multiline - * markers, e.g. + * To avoid this behavior (and just get a multiline string), use one of + * YAML's multiline markers, e.g. * * - | * {{var}} @@ -255,7 +261,7 @@ public SimpleTemplate(Environment env, String template) { @Override public String render(Resource resource) { StringBuffer sb = new StringBuffer(); - Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*\\}\\}"); + Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}"); Matcher m = p.matcher(template); while (m.find()) { String prefix = m.group(1); @@ -263,9 +269,10 @@ public String render(Resource resource) { prefix = ""; } String key = m.group(2); - String value = resource.getOrDefault(key, () -> env.get(key)); + String defaultValue = m.group(4); + String value = resource.getOrDefault(key, () -> env.getOrDefault(key, defaultValue)); if (value == null) { - throw new IllegalArgumentException("No value for key " + key); + throw new IllegalArgumentException(template + " has no value for key " + key + "."); } String quotedPrefix = Matcher.quoteReplacement(prefix); String quotedValue = Matcher.quoteReplacement(value); diff --git a/hoptimator-cli/run.sh b/hoptimator-cli/run.sh index 3c26943..a056a78 100755 --- a/hoptimator-cli/run.sh +++ b/hoptimator-cli/run.sh @@ -5,5 +5,6 @@ java \ --add-opens java.base/java.util=ALL-UNNAMED \ --add-opens java.base/java.time=ALL-UNNAMED \ -classpath "/opt/plugins/*/lib/*:./hoptimator-cli-all.jar" \ + -Dorg.slf4j.simpleLogger.defaultLogLevel=error \ $JAVA_OPTS \ - com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator "$@" + com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator --isolation=TRANSACTION_NONE "$@" diff --git a/hoptimator-flink-adapter/src/main/resources/SqlJob.yaml.template b/hoptimator-flink-adapter/src/main/resources/SqlJob.yaml.template index 06c5aaf..d2621e8 100644 --- a/hoptimator-flink-adapter/src/main/resources/SqlJob.yaml.template +++ b/hoptimator-flink-adapter/src/main/resources/SqlJob.yaml.template @@ -1,8 +1,8 @@ apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: - namespace: {{namespace}} - name: {{name}}-flink-job + namespace: {{pipeline.namespace}} + name: {{pipeline.name}}-flink-job spec: image: docker.io/library/hoptimator-flink-runner imagePullPolicy: Never @@ -21,7 +21,7 @@ spec: job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - {{sql}} + - {{pipeline.sql}} jarURI: local:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java index c980690..e91ac07 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java @@ -7,12 +7,9 @@ import java.util.Map; class KafkaTopic extends Resource { - public KafkaTopic(String name, Integer numPartitions, - Map clientOverrides) { + public KafkaTopic(String topicName, Map clientOverrides) { super("KafkaTopic"); - export("name", name); - export("numPartitions", Optional.ofNullable(numPartitions) - .map(x -> Integer.toString(x)).orElse("null")); + export("topicName", topicName); export("clientOverrides", clientOverrides); } } diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java index 7b96599..7b02cba 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java @@ -44,11 +44,12 @@ public Schema create(SchemaPlus parentSchema, String name, Map o }; ConfigProvider topicConfigProvider = ConfigProvider.from(clientConfig); TableResolver resolver = x -> rowType.rel(); - Integer numPartitions = (Integer) operand.get("numPartitions"); + ResourceProvider resources = ResourceProvider.empty() - .with(x -> new KafkaTopic(x, numPartitions, topicConfigProvider.config(x))) + .with(x -> new KafkaTopic(x, topicConfigProvider.config(x))) .readWith(x -> new KafkaTopicAcl(x, principal, "Read")) .writeWith(x -> new KafkaTopicAcl(x, principal, "Write")); + Database database = new Database(name, tableLister, resolver, connectorConfigProvider, resources); return new DatabaseSchema(database); diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template index 8b4df55..0e8b35e 100644 --- a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template +++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template @@ -1,10 +1,10 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: KafkaTopic metadata: - name: {{name}} - namespace: {{namespace}} + name: {{topicName}} + namespace: {{pipeline.namespace}} spec: - topicName: {{name}} - numPartitions: {{numPartitions}} + topicName: {{topicName}} + numPartitions: {{numPartitions:null}} clientOverrides: {{clientOverrides}} diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template index 1e31162..9d1297b 100644 --- a/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template +++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template @@ -1,11 +1,11 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: Acl metadata: - name: {{name}}-acl-{{id}} - namespace: {{namespace}} + name: {{topicName}}-acl-{{id}} + namespace: {{pipeline.namespace}} spec: resource: kind: KafkaTopic - name: {{name}} + name: {{topicName}} method: {{method}} principal: {{principal}} diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java index 78c2c25..3e86c93 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java @@ -31,7 +31,7 @@ * Access control rule (colloquially, an Acl) */ @ApiModel(description = "Access control rule (colloquially, an Acl)") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java index 24178fc..0165ef5 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java @@ -32,7 +32,7 @@ * AclList is a list of Acl */ @ApiModel(description = "AclList is a list of Acl") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java index b873b09..cf872bb 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java @@ -29,7 +29,7 @@ * A set of related ACL rules. */ @ApiModel(description = "A set of related ACL rules.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1AclSpec { /** * The resource access method. diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java index 430e70a..b14592a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java @@ -28,7 +28,7 @@ * The resource being controlled. */ @ApiModel(description = "The resource being controlled.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1AclSpecResource { public static final String SERIALIZED_NAME_KIND = "kind"; @SerializedName(SERIALIZED_NAME_KIND) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java index 9b38615..aa7b6ac 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java @@ -28,7 +28,7 @@ * Status, as set by the operator. */ @ApiModel(description = "Status, as set by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1AclStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java index 6838048..8422cbd 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java @@ -31,7 +31,7 @@ * Kafka Topic */ @ApiModel(description = "Kafka Topic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java index f4429ac..5847d75 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java @@ -32,7 +32,7 @@ * KafkaTopicList is a list of KafkaTopic */ @ApiModel(description = "KafkaTopicList is a list of KafkaTopic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java index 8a3588e..d1baaa7 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java @@ -33,7 +33,7 @@ * Desired Kafka topic configuration. */ @ApiModel(description = "Desired Kafka topic configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1KafkaTopicSpec { public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs"; @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java index 27536ba..1a4f08a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java @@ -28,7 +28,7 @@ /** * V1alpha1KafkaTopicSpecClientConfigs */ -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecClientConfigs { public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef"; @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java index 5e999aa..f6e312a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java @@ -28,7 +28,7 @@ * Reference to a ConfigMap to use for AdminClient configuration. */ @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecConfigMapRef { public static final String SERIALIZED_NAME_NAME = "name"; @SerializedName(SERIALIZED_NAME_NAME) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java index 4cc046d..d4ff69f 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java @@ -28,7 +28,7 @@ * Current state of the topic. */ @ApiModel(description = "Current state of the topic.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1KafkaTopicStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java index 013d6f6..00fb8f1 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java index 85ca00b..7d97385 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java index 807b842..1145306 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java @@ -23,17 +23,24 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) private String database; + public static final String SERIALIZED_NAME_HINTS = "hints"; + @SerializedName(SERIALIZED_NAME_HINTS) + private Map hints = null; + public static final String SERIALIZED_NAME_SQL = "sql"; @SerializedName(SERIALIZED_NAME_SQL) private String sql; @@ -61,6 +68,37 @@ public void setDatabase(String database) { } + public V1alpha1SubscriptionSpec hints(Map hints) { + + this.hints = hints; + return this; + } + + public V1alpha1SubscriptionSpec putHintsItem(String key, String hintsItem) { + if (this.hints == null) { + this.hints = new HashMap<>(); + } + this.hints.put(key, hintsItem); + return this; + } + + /** + * Hints to adapters, which may disregard them. + * @return hints + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Hints to adapters, which may disregard them.") + + public Map getHints() { + return hints; + } + + + public void setHints(Map hints) { + this.hints = hints; + } + + public V1alpha1SubscriptionSpec sql(String sql) { this.sql = sql; @@ -93,12 +131,13 @@ public boolean equals(Object o) { } V1alpha1SubscriptionSpec v1alpha1SubscriptionSpec = (V1alpha1SubscriptionSpec) o; return Objects.equals(this.database, v1alpha1SubscriptionSpec.database) && + Objects.equals(this.hints, v1alpha1SubscriptionSpec.hints) && Objects.equals(this.sql, v1alpha1SubscriptionSpec.sql); } @Override public int hashCode() { - return Objects.hash(database, sql); + return Objects.hash(database, hints, sql); } @@ -107,6 +146,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("class V1alpha1SubscriptionSpec {\n"); sb.append(" database: ").append(toIndentedString(database)).append("\n"); + sb.append(" hints: ").append(toIndentedString(hints)).append("\n"); sb.append(" sql: ").append(toIndentedString(sql)).append("\n"); sb.append("}"); return sb.toString(); diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java index 0d2355f..9a62fca 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java @@ -30,7 +30,7 @@ * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-operator/build.gradle b/hoptimator-operator/build.gradle index 3767b77..320ff64 100644 --- a/hoptimator-operator/build.gradle +++ b/hoptimator-operator/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation libs.kubernetesExtendedClient implementation libs.slf4jApi implementation libs.commonsCli + implementation libs.avro testImplementation libs.junit testImplementation libs.assertj diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/RequestEnvironment.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/RequestEnvironment.java deleted file mode 100644 index 76be695..0000000 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/RequestEnvironment.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.linkedin.hoptimator.operator; - -import io.kubernetes.client.extended.controller.reconciler.Request; - -import com.linkedin.hoptimator.catalog.Resource; - -/** Exposes variables to resource templates */ -public class RequestEnvironment extends Resource.SimpleEnvironment { - - public RequestEnvironment(Request request) { - export("namespace", request.getNamespace()); - export("name", request.getName()); - } -} diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java new file mode 100644 index 0000000..f47aeaf --- /dev/null +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java @@ -0,0 +1,50 @@ +package com.linkedin.hoptimator.operator.subscription; + +import io.kubernetes.client.extended.controller.reconciler.Request; + +import com.linkedin.hoptimator.catalog.AvroConverter; +import com.linkedin.hoptimator.catalog.Resource; +import com.linkedin.hoptimator.planner.Pipeline; + +import java.util.Map; +import java.util.Collections; + +/** + * Exposes Subscription variables to resource templates. + * + * Variables have a `pipeline.` prefix (even though they come from the + * Subscription object), because the planner is unaware of Subscriptions. + * For example, the CLI constructs pipelines without any corresponding + * Subscription object. In future, we may have additional K8s objects + * that result in pipelines. + * + * The exported variables include: + * + * - `pipeline.namespace`, the K8s namespace where the pipeline should be + * deployed. This is a recommendation -- templates may elect to ignore it. + * - `pipeline.name`, a unique name for the pipeline. Templates can use this + * as a basis for deriving K8s object names, Kafka topic names, etc. The + * name is guaranteed to be a valid K8s object name, e.g. `my-subscription`. + * - `pipeline.avroSchema`, an Avro schema for the pipeline's output type. + * + * In addition, any "hints" in the Subscription object (`.spec.hints`) are + * exported as-is. These can be used to provide optional properties to + * templates. When using such hints in a template, ensure that you provide a + * default value, e.g. `{{numPartitions:null}``, since they will usually be + * missing. + */ +public class SubscriptionEnvironment extends Resource.SimpleEnvironment { + + public SubscriptionEnvironment(String namespace, String name, Pipeline pipeline, + Map hints) { + exportAll(hints); + export("pipeline.namespace", namespace); + export("pipeline.name", name); + export("pipeline.avroSchema", AvroConverter.avro("com.linkedin.hoptimator", "OutputRecord", + pipeline.outputType()).toString(false)); + } + + public SubscriptionEnvironment(String namespace, String name, Pipeline pipeline) { + this(namespace, name, pipeline, Collections.emptyMap()); + } +} diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index 9e8d731..c2271d4 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -7,7 +7,6 @@ import com.linkedin.hoptimator.models.V1alpha1SubscriptionStatus; import com.linkedin.hoptimator.operator.Operator; import com.linkedin.hoptimator.operator.ConfigAssembler; -import com.linkedin.hoptimator.operator.RequestEnvironment; import com.linkedin.hoptimator.planner.HoptimatorPlanner; import com.linkedin.hoptimator.planner.Pipeline; import com.linkedin.hoptimator.planner.PipelineRel; @@ -55,8 +54,6 @@ public Result reconcile(Request request) { log.info("Reconciling request {}", request); String name = request.getName(); String namespace = request.getNamespace(); - RequestEnvironment env = new RequestEnvironment(request); - Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(env); Result result = new Result(true, operator.pendingRetryDuration()); try { @@ -87,9 +84,32 @@ public Result reconcile(Request request) { log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql()); Pipeline pipeline = pipeline(object); - status.setResources(pipeline.resources().stream() + SubscriptionEnvironment env = new SubscriptionEnvironment(namespace, name, pipeline); + SubscriptionEnvironment sinkEnv = new SubscriptionEnvironment(namespace, name, pipeline, + object.getSpec().getHints()); + Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(env); + Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(sinkEnv); + + // Render resources related to all source tables. + List upstreamResources = pipeline.upstreamResources().stream() .map(x -> x.render(templateFactory)) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + + // Render the SQL job + String sqlJob = pipeline.sqlJob().render(templateFactory); + + // Render resources related to the sink table. For these resources, we pass along any + // "hints" as part of the environment. + List downstreamResources = pipeline.downstreamResources().stream() + .map(x -> x.render(sinkTemplateFactory)) + .collect(Collectors.toList()); + + List combined = new ArrayList<>(); + combined.addAll(upstreamResources); + combined.add(sqlJob); + combined.addAll(downstreamResources); + + status.setResources(combined); status.setSql(object.getSpec().getSql()); status.setReady(null); // null indicates that pipeline needs to be deployed diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java index c1bb9c1..9082b4b 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java @@ -3,6 +3,7 @@ import org.apache.calcite.rel.type.RelDataType; import com.linkedin.hoptimator.catalog.Resource; +import com.linkedin.hoptimator.catalog.ResourceProvider; import java.util.Collection; import java.util.List; @@ -13,11 +14,16 @@ * */ public class Pipeline { - private final Collection resources; + private final Collection upstreamResources; + private final Collection downstreamResources; + private final SqlJob sqlJob; private final RelDataType outputType; - public Pipeline(Collection resources, RelDataType outputType) { - this.resources = resources; + public Pipeline(Collection upstreamResources, SqlJob sqlJob, + Collection downstreamResources, RelDataType outputType) { + this.upstreamResources = upstreamResources; + this.sqlJob = sqlJob; + this.downstreamResources = downstreamResources; this.outputType = outputType; } @@ -25,14 +31,37 @@ public RelDataType outputType() { return outputType; } + /** The SQL job which writes to the sink. */ + public SqlJob sqlJob() { + return sqlJob; + } + + /** Resources upstream of the SQL job, corresponding to all sources. */ + public Collection upstreamResources() { + return upstreamResources; + } + + /** Resources downstream of the SQL job, corresponding to the sink. */ + public Collection downstreamResources() { + return downstreamResources; + } + + /** All Resources in the pipeline, including the SQL job and sink. */ public Collection resources() { - return resources; + // We re-use ResourceProvider here for its source->sink relationships + ResourceProvider resourceProvider = ResourceProvider + .from(upstreamResources) + .to(sqlJob) + .toAll(x -> downstreamResources); + + // All resources are now "provided", so we can pass null here: + return resourceProvider.resources(null); } /** Render all resources as one big YAML stream */ public String render(Resource.TemplateFactory templateFactory) { StringBuilder sb = new StringBuilder(); - for (Resource resource : resources) { + for (Resource resource : resources()) { sb.append(templateFactory.get(resource).render(resource)); sb.append("\n---\n"); // yaml resource separator } @@ -43,7 +72,7 @@ public String render(Resource.TemplateFactory templateFactory) { public String mermaid() { StringBuilder sb = new StringBuilder(); sb.append("flowchart\n"); - Map> grouped = resources.stream() + Map> grouped = resources().stream() .collect(Collectors.groupingBy(x -> x.template())); grouped.forEach((k, v) -> { sb.append(" subgraph " + k + "\n"); diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java index 5c9039d..1ce2cb6 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java @@ -94,15 +94,8 @@ public Pipeline pipeline(HopTable sink) { /** Combine SQL and any Resources into a Pipeline */ public Pipeline pipeline(HopTable sink, SqlDialect sqlDialect) { - // We re-use ResourceProvider here for its source->sink relationships - ResourceProvider resourceProvider = ResourceProvider.from(resources) - .to(new SqlJob(insertInto(sink).sql(sqlDialect))) - .toAll(x -> sink.writeResources()); - - // All resources are now "provided", so we can pass null here: - Collection resourcesAndJob = resourceProvider.resources(null); - - return new Pipeline(resourcesAndJob, rowType()); + SqlJob sqlJob = new SqlJob(insertInto(sink).sql(sqlDialect)); + return new Pipeline(resources, sqlJob, sink.writeResources(), rowType()); } } } diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java index 54cbe89..fcd9124 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java @@ -16,6 +16,6 @@ public class SqlJob extends Resource { public SqlJob(String sql) { super("SqlJob"); - export("sql", sql); + export("pipeline.sql", sql); } }