Skip to content

Commit

Permalink
Subscription "hints" (#37)
Browse files Browse the repository at this point in the history
* Add hints to Subscription CRD

* Add Subscription hints and default values

* License select files under Apache 2

* Rename magic template variable "sql" to "pipeline.sql"

* Limit hints to sink resources
  • Loading branch information
ryannedolan authored Jul 26, 2023
1 parent da4bdb0 commit 503ba7c
Show file tree
Hide file tree
Showing 32 changed files with 233 additions and 102 deletions.
2 changes: 1 addition & 1 deletion bin/hoptimator
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/sh

kubectl exec -it hoptimator -c hoptimator -- ./hoptimator --isolation=TRANSACTION_NONE -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" -nn hoptimator "$@"
kubectl exec -it hoptimator -c hoptimator -- ./hoptimator -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" "$@"
3 changes: 2 additions & 1 deletion deploy/samples/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ metadata:
spec:
sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
database: RAWKAFKA

hints:
numPartitions: "2"
5 changes: 5 additions & 0 deletions deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ spec:
database:
description: The database in which to create the output/sink table.
type: string
hints:
description: Hints to adapters, which may disregard them.
type: object
additionalProperties:
type: string
required:
- sql
- database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public Set<String> keys() {

/** Render this Resource using the given TemplateFactory */
public String render(TemplateFactory templateFactory) {
return templateFactory.get(this).render(this);
try {
return templateFactory.get(this).render(this);
} catch (Exception e) {
throw new RuntimeException("Error rendering " + template, e);
}
}

public String render(Template template) {
Expand Down Expand Up @@ -148,65 +152,66 @@ public interface Environment {
Environment EMPTY = new SimpleEnvironment();
Environment PROCESS = new ProcessEnvironment();

String get(String key);
String getOrDefault(String key, String defaultValue);
}

/** Basic Environment implementation */
public static class SimpleEnvironment implements Environment {
private final Map<String, String> vars;

public SimpleEnvironment(Map<String, String> vars) {
this.vars = vars;
}
private final Map<String, String> vars = new HashMap<>();

public SimpleEnvironment() {
this(new HashMap<>());
}

public void export(String property, String value) {
protected void export(String property, String value) {
vars.put(property, value);
}

public SimpleEnvironment(Properties properties) {
this.vars = new HashMap<>();
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
this.vars.put(entry.getKey().toString(), entry.getValue().toString());
}
protected void exportAll(Map<String, String> properties) {
vars.putAll(properties);
}

public SimpleEnvironment with(String key, String value) {
Map<String, String> newVars = new HashMap<>();
newVars.putAll(vars);
newVars.put(key, value);
return new SimpleEnvironment(newVars);
return new SimpleEnvironment(){{
exportAll(newVars);
}};
}

@Override
public String get(String key) {
if (!vars.containsKey(key)) {
public String getOrDefault(String key, String defaultValue) {
if (defaultValue == null && !vars.containsKey(key)) {
throw new IllegalArgumentException("No variable '" + key + "' found in the environment");
}
return vars.get(key);
return vars.getOrDefault(key, defaultValue);
}
}

/** Returns "{{key}}" for any key */
/** Returns "{{key}}" for any key without a default */
public static class DummyEnvironment implements Environment {
@Override
public String get(String key) {
return "{{" + key + "}}";
public String getOrDefault(String key, String defaultValue) {
if (defaultValue != null) {
return defaultValue;
} else {
return "{{" + key + "}}";
}
}
}

/** Provides access to the process's environment variables */
public static class ProcessEnvironment implements Environment {

@Override
public String get(String key) {
public String getOrDefault(String key, String defaultValue) {
String value = System.getenv(key);
if (value == null) {
value = System.getProperty(key);
}
if (value == null) {
value = defaultValue;
}
if (value == null) {
throw new IllegalArgumentException("Missing system property `" + key + "`");
}
Expand All @@ -222,11 +227,12 @@ public interface Template {
/**
* Replaces `{{var}}` in a template file with the corresponding variable.
*
* Resource-scoped variables take precedence over Environment-scoped variables.
* Resource-scoped variables take precedence over Environment-scoped
* variables. Default values can supplied with `{{var:default}}`.
*
* If `var` contains multiple lines, the behavior depends on context; specifically,
* whether the pattern appears within a list or comment (prefixed with `-` or `#`).
* For example, if the template includes:
* If `var` contains multiple lines, the behavior depends on context;
* specifically, whether the pattern appears within a list or comment
* (prefixed with `-` or `#`). For example, if the template includes:
*
* - {{var}}
*
Expand All @@ -235,8 +241,8 @@ public interface Template {
* - value line 1
* - value line 2
*
* To avoid this behavior (and just get a multiline string), use one of YAML's multiline
* markers, e.g.
* To avoid this behavior (and just get a multiline string), use one of
* YAML's multiline markers, e.g.
*
* - |
* {{var}}
Expand All @@ -255,17 +261,18 @@ public SimpleTemplate(Environment env, String template) {
@Override
public String render(Resource resource) {
StringBuffer sb = new StringBuffer();
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*\\}\\}");
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}");
Matcher m = p.matcher(template);
while (m.find()) {
String prefix = m.group(1);
if (prefix == null) {
prefix = "";
}
String key = m.group(2);
String value = resource.getOrDefault(key, () -> env.get(key));
String defaultValue = m.group(4);
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, defaultValue));
if (value == null) {
throw new IllegalArgumentException("No value for key " + key);
throw new IllegalArgumentException(template + " has no value for key " + key + ".");
}
String quotedPrefix = Matcher.quoteReplacement(prefix);
String quotedValue = Matcher.quoteReplacement(value);
Expand Down
3 changes: 2 additions & 1 deletion hoptimator-cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ java \
--add-opens java.base/java.util=ALL-UNNAMED \
--add-opens java.base/java.time=ALL-UNNAMED \
-classpath "/opt/plugins/*/lib/*:./hoptimator-cli-all.jar" \
-Dorg.slf4j.simpleLogger.defaultLogLevel=error \
$JAVA_OPTS \
com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator "$@"
com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator --isolation=TRANSACTION_NONE "$@"
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: {{namespace}}
name: {{name}}-flink-job
namespace: {{pipeline.namespace}}
name: {{pipeline.name}}-flink-job
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
Expand All @@ -21,7 +21,7 @@ spec:
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{sql}}
- {{pipeline.sql}}
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
import java.util.Map;

class KafkaTopic extends Resource {
public KafkaTopic(String name, Integer numPartitions,
Map<String, String> clientOverrides) {
public KafkaTopic(String topicName, Map<String, String> clientOverrides) {
super("KafkaTopic");
export("name", name);
export("numPartitions", Optional.ofNullable(numPartitions)
.map(x -> Integer.toString(x)).orElse("null"));
export("topicName", topicName);
export("clientOverrides", clientOverrides);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
};
ConfigProvider topicConfigProvider = ConfigProvider.from(clientConfig);
TableResolver resolver = x -> rowType.rel();
Integer numPartitions = (Integer) operand.get("numPartitions");

ResourceProvider resources = ResourceProvider.empty()
.with(x -> new KafkaTopic(x, numPartitions, topicConfigProvider.config(x)))
.with(x -> new KafkaTopic(x, topicConfigProvider.config(x)))
.readWith(x -> new KafkaTopicAcl(x, principal, "Read"))
.writeWith(x -> new KafkaTopicAcl(x, principal, "Write"));

Database database = new Database(name, tableLister, resolver, connectorConfigProvider,
resources);
return new DatabaseSchema(database);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaTopic
metadata:
name: {{name}}
namespace: {{namespace}}
name: {{topicName}}
namespace: {{pipeline.namespace}}
spec:
topicName: {{name}}
numPartitions: {{numPartitions}}
topicName: {{topicName}}
numPartitions: {{numPartitions:null}}
clientOverrides:
{{clientOverrides}}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Acl
metadata:
name: {{name}}-acl-{{id}}
namespace: {{namespace}}
name: {{topicName}}-acl-{{id}}
namespace: {{pipeline.namespace}}
spec:
resource:
kind: KafkaTopic
name: {{name}}
name: {{topicName}}
method: {{method}}
principal: {{principal}}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Loading

0 comments on commit 503ba7c

Please sign in to comment.