From da4bdb0deb0408406ad8c1bb4bba3b13723a247b Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Fri, 21 Jul 2023 16:56:55 -0500 Subject: [PATCH] Enable adapters to specify read vs write resources (#33) * Add !mermaid command * Add read/write Resources and Kafka ACL controller * Add insert-into syntax to yaml and mermaid commands * Tweak Acl CRD * Fix ACL template * License select files under Apache 2 * Ignore case for CLI commands, h/t @hshukla * Javadocs on public Resources, h/t @hshukla --- Makefile | 1 - deploy/acls.crd.yaml | 105 ++++++++ deploy/dev/kafka.yaml | 52 ++++ deploy/dev/mysql.yaml | 18 ++ deploy/rbac.yaml | 6 +- deploy/samples/acls.yaml | 24 ++ deploy/samples/kafkatopics.yaml | 2 +- etc/integration-tests.sql | 5 +- .../linkedin/hoptimator/catalog/Database.java | 21 +- .../linkedin/hoptimator/catalog/HopTable.java | 31 ++- .../linkedin/hoptimator/catalog/Resource.java | 23 +- .../hoptimator/catalog/ResourceProvider.java | 92 ++++++- .../hoptimator/catalog/TableFactory.java | 12 +- .../linkedin/hoptimator/HoptimatorCliApp.java | 147 ++++++----- hoptimator-cli/src/main/resources/intro.txt | 16 +- .../hoptimator/catalog/kafka/KafkaTopic.java | 2 +- .../catalog/kafka/KafkaTopicAcl.java | 13 + .../catalog/kafka/RawKafkaSchemaFactory.java | 9 +- .../kafka/KafkaControllerProvider.java | 24 +- .../kafka/KafkaTopicAclReconciler.java | 134 ++++++++++ .../main/resources/KafkaTopic.yaml.template | 2 +- .../resources/KafkaTopicAcl.yaml.template | 11 + hoptimator-models/generate-models.sh | 1 + .../hoptimator/models/V1alpha1Acl.java | 219 ++++++++++++++++ .../hoptimator/models/V1alpha1AclList.java | 195 ++++++++++++++ .../hoptimator/models/V1alpha1AclSpec.java | 239 ++++++++++++++++++ ...cRef.java => V1alpha1AclSpecResource.java} | 51 +++- .../hoptimator/models/V1alpha1AclStatus.java | 129 ++++++++++ .../hoptimator/models/V1alpha1KafkaTopic.java | 2 +- .../models/V1alpha1KafkaTopicList.java | 2 +- .../models/V1alpha1KafkaTopicSpec.java | 2 +- .../V1alpha1KafkaTopicSpecClientConfigs.java | 2 +- .../V1alpha1KafkaTopicSpecConfigMapRef.java | 2 +- .../models/V1alpha1KafkaTopicStatus.java | 2 +- .../models/V1alpha1Subscription.java | 2 +- .../models/V1alpha1SubscriptionList.java | 2 +- .../models/V1alpha1SubscriptionSpec.java | 2 +- .../models/V1alpha1SubscriptionStatus.java | 2 +- .../subscription/SubscriptionReconciler.java | 2 +- .../linkedin/hoptimator/planner/Pipeline.java | 1 + .../hoptimator/planner/PipelineRel.java | 5 +- test-model.yaml | 2 +- 42 files changed, 1474 insertions(+), 140 deletions(-) create mode 100644 deploy/acls.crd.yaml create mode 100644 deploy/dev/kafka.yaml create mode 100644 deploy/samples/acls.yaml create mode 100644 hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java create mode 100644 hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java create mode 100644 hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java rename hoptimator-models/src/main/java/com/linkedin/hoptimator/models/{V1alpha1SubscriptionStatusKafkaTopicRef.java => V1alpha1AclSpecResource.java} (60%) create mode 100644 
hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java diff --git a/Makefile b/Makefile index b1a2ddc..85d937a 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,6 @@ deploy-dev-environment: helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io - kubectl apply -f "https://strimzi.io/examples/latest/kafka/kafka-ephemeral-single.yaml" -n kafka kubectl apply -f ./deploy/dev deploy-samples: deploy diff --git a/deploy/acls.crd.yaml b/deploy/acls.crd.yaml new file mode 100644 index 0000000..0c55ef1 --- /dev/null +++ b/deploy/acls.crd.yaml @@ -0,0 +1,105 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: acls.hoptimator.linkedin.com +spec: + group: hoptimator.linkedin.com + names: + kind: Acl + listKind: AclList + plural: acls + singular: acl + shortNames: + - acl + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + description: Access control rule (colloquially, an Acl) + type: object + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + description: A set of related ACL rules. + type: object + properties: + resource: + description: The resource being controlled. + type: object + properties: + kind: + description: The kind of resource being controlled. + type: string + name: + description: The name of the resource being controlled. + type: string + method: + description: The resource access method. + type: string + enum: + - Alter + - Create + - Delete + - Describe + - Read + - Write + - Post + - Put + - Get + - Head + - Patch + - Trace + - Options + - GetAll + - BatchGet + - BatchCreate + - BatchUpdate + - PartialUpdate + - BatchDelete + - BatchPartialDelete + principal: + description: The principal being allowed access. Format depends on principal type. + type: string + required: + - resource + - method + - principal + status: + description: Status, as set by the operator. + type: object + properties: + ready: + description: Whether the ACL rule has been applied. + type: boolean + message: + description: Human-readable status message. + type: string + subresources: + status: {} + additionalPrinterColumns: + - name: PRINCIPAL + type: string + description: A user, service, group, etc. + jsonPath: .spec.principal + - name: METHOD + type: string + description: A resource access method, e.g. Get. + jsonPath: .spec.method + - name: KIND + type: string + description: The resource being accessed. + jsonPath: .spec.resource.kind + - name: RESOURCE + type: string + description: The resource being accessed. + jsonPath: .spec.resource.name + diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml new file mode 100644 index 0000000..aad9907 --- /dev/null +++ b/deploy/dev/kafka.yaml @@ -0,0 +1,52 @@ +# Copyright (c) 2023, LinkedIn +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# Based on examples at: +# https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka + + +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: one + namespace: kafka +spec: + kafka: + version: 3.4.0 + replicas: 1 + listeners: + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + inter.broker.protocol.version: "3.4" + allow.everyone.if.no.acl.found: true + storage: + type: ephemeral + authorization: + type: simple + zookeeper: + replicas: 3 + storage: + type: ephemeral diff --git a/deploy/dev/mysql.yaml b/deploy/dev/mysql.yaml index 1df9e42..06b7f16 100644 --- a/deploy/dev/mysql.yaml +++ b/deploy/dev/mysql.yaml @@ -1,3 +1,21 @@ +# Copyright (c) 2023, LinkedIn +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Based on examples at: +# https://debezium.io/documentation/reference/stable/operations/kubernetes.html + + apiVersion: v1 kind: Service metadata: diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 9d097a1..bb3785b 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -5,10 +5,10 @@ metadata: name: hoptimator-operator rules: - apiGroups: ["hoptimator.linkedin.com"] - resources: ["kafkatopics", "subscriptions"] - verbs: ["get", "watch", "list", "create"] + resources: ["acls", "kafkatopics", "subscriptions"] + verbs: ["get", "watch", "list", "update", "create"] - apiGroups: ["hoptimator.linkedin.com"] - resources: ["kafkatopics/status", "subscriptions/status"] + resources: ["kafkatopics/status", "subscriptions/status", "acls/status"] verbs: ["get", "patch"] - apiGroups: ["flink.apache.org"] resources: ["flinkdeployments"] diff --git a/deploy/samples/acls.yaml b/deploy/samples/acls.yaml new file mode 100644 index 0000000..a981a2a --- /dev/null +++ b/deploy/samples/acls.yaml @@ -0,0 +1,24 @@ +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: Acl +metadata: + name: sample-kafka-acl-1 +spec: + resource: + kind: KafkaTopic + name: test-sink + method: Read + principal: User:ANONYMOUS + +--- + +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: Acl +metadata: + name: sample-kafka-acl-2 +spec: + resource: + kind: KafkaTopic + name: test-sink + method: Write + principal: User:ANONYMOUS + diff --git a/deploy/samples/kafkatopics.yaml b/deploy/samples/kafkatopics.yaml index 8cffbe7..08de5ed 100644 --- a/deploy/samples/kafkatopics.yaml +++ b/deploy/samples/kafkatopics.yaml @@ -5,5 +5,5 @@ metadata: spec: topicName: test-sink clientOverrides: - bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092 + bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092 diff --git a/etc/integration-tests.sql b/etc/integration-tests.sql index 1b43225..d107fd8 100644 --- a/etc/integration-tests.sql +++ 
b/etc/integration-tests.sql @@ -1,7 +1,7 @@ !set maxWidth 80 -!table !schemas +!table -- built-in bounded tables SELECT * FROM DATAGEN.PERSON; @@ -20,3 +20,6 @@ SELECT * FROM RAWKAFKA."products" LIMIT 1; !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON SELECT * FROM RAWKAFKA."test-sink" LIMIT 5; +-- test mermaid and yaml commands +!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON +!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Database.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Database.java index 4c635ee..b1397e5 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Database.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Database.java @@ -22,23 +22,26 @@ public Database(String name, TableLister tableLister, TableResolver tableResolve } /** Convenience constructor for simple connector-based tables */ - public Database(String name, TableLister tableLister, TableResolver tableResolver, ConfigProvider configProvider) { - this(name, tableLister, tableResolver, TableFactory.connector(configProvider)); + public Database(String name, TableLister lister, TableResolver resolver, + ConfigProvider configs) { + this(name, lister, resolver, TableFactory.connector(configs)); } - /** Convenience constructor for simple connector-based tables */ - public Database(String name, TableLister tableLister, TableResolver tableResolver, ConfigProvider configProvider, ResourceProvider resourceProvider) { - this(name, tableLister, tableResolver, TableFactory.connector(configProvider, resourceProvider)); + /** Convenience constructor for simple connector-based tables with resources */ + public Database(String name, TableLister lister, TableResolver resolver, + ConfigProvider configs, ResourceProvider resources) { + this(name, lister, resolver, TableFactory.connector(configs, resources)); } /** Convenience constructor for a list of connector-based tables */ - public Database(String name, Collection tables, TableResolver tableResolver, ConfigProvider configProvider) { - this(name, tables, tableResolver, TableFactory.connector(configProvider)); + public Database(String name, Collection tables, TableResolver resolver, + ConfigProvider configs) { + this(name, tables, resolver, TableFactory.connector(configs)); } /** Convenience constructor for a static list of tables */ - public Database(String name, Collection tables, TableResolver tableResolver, TableFactory tableFactory) { - this(name, () -> tables, tableResolver, tableFactory); + public Database(String name, Collection tables, TableResolver resolver, TableFactory tableFactory) { + this(name, () -> tables, resolver, tableFactory); } /** Convenience constructor for a static table map */ diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java index 385e1e1..29a6e3c 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java @@ -27,25 +27,36 @@ public class HopTable extends AbstractTable implements ScriptImplementor, Transl private final String database; private final String name; private final RelDataType rowType; - private final Collection resources; + 
private final Collection readResources; + private final Collection writeResources; private final ScriptImplementor implementor; - public HopTable(String database, String name, RelDataType rowType, Collection resources, + public HopTable(String database, String name, RelDataType rowType, + Collection readResources, Collection writeResources, ScriptImplementor implementor) { this.database = database; this.name = name; this.rowType = rowType; - this.resources = resources; + this.readResources = readResources; + this.writeResources = writeResources; this.implementor = implementor; } /** Convenience constructor for HopTables that only need a connector config. */ - public HopTable(String database, String name, RelDataType rowType, Collection resources, + public HopTable(String database, String name, RelDataType rowType, + Collection readResources, Collection writeResources, Map connectorConfig) { - this(database, name, rowType, resources, + this(database, name, rowType, readResources, writeResources, new ScriptImplementor.ConnectorImplementor(database, name, rowType, connectorConfig)); } + /** Convenience constructor for HopTables that only need a connector config. */ + public HopTable(String database, String name, RelDataType rowType, Collection resources, + Map connectorConfig) { + this(database, name, rowType, resources, resources, connectorConfig); + } + + /** Convenience constructor for HopTables that only need a connector config. */ public HopTable(String database, String name, RelDataType rowType, Map connectorConfig) { @@ -64,8 +75,14 @@ public RelDataType rowType() { return rowType; } - public Collection resources() { - return resources; + /** Resources needed when reading from the table */ + public Collection readResources() { + return readResources; + } + + /** Resources needed when writing to the table */ + public Collection writeResources() { + return writeResources; } @Override diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java index e40d517..ba18a36 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java @@ -37,8 +37,10 @@ public abstract class Resource { private final SortedMap> properties = new TreeMap<>(); private final List inputs = new ArrayList<>(); + /** A Resource which should be rendered with the given template */ public Resource(String template) { this.template = template; + export("id", () -> id()); } /** Copy constructor */ @@ -48,10 +50,16 @@ public Resource(Resource other) { this.inputs.addAll(other.inputs); } + /** The name of the template to render this Resource with */ public String template() { return template; } + /** A reasonably unique ID, based on a hash of the exported properties. 
*/ + public String id() { + return Integer.toHexString(hashCode()); + } + /** Export a computed value to the template */ protected void export(String key, Supplier supplier) { properties.put(key, supplier); @@ -110,13 +118,18 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("[ template: " + template() + " "); for (Map.Entry> entry : properties.entrySet()) { + if (entry.getKey().equals("id")) { + // special case for "id" to avoid recursion + continue; + } String value = entry.getValue().get(); - if (value != null && !value.isEmpty()) { - sb.append(entry.getKey()); - sb.append(":"); - sb.append(entry.getValue().get()); - sb.append(" "); + if (value == null || value.isEmpty()) { + continue; } + sb.append(entry.getKey()); + sb.append(":"); + sb.append(entry.getValue().get()); + sb.append(" "); } sb.append("]"); return sb.toString(); diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java index 36ad1f4..1ba8f5f 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java @@ -18,10 +18,25 @@ public interface ResourceProvider { /** Resources for the given table */ Collection resources(String tableName); + /** Resources required when reading from the table */ + default Collection readResources(String tableName) { + return resources(tableName).stream() + .filter(x -> !(x instanceof WriteResource)) + .collect(Collectors.toList()); + } + + /** Resources required when writing to the table */ + default Collection writeResources(String tableName) { + return resources(tableName).stream() + .filter(x -> !(x instanceof ReadResource)) + .collect(Collectors.toList()); + } + /** - * Establishes a source->sink relationship between this ResourceProvider and a sink Resource. + * Establishes a source->sink relationship between ResourceProviders. * - * All leaf-node Resources provided by this ResourceProvider will become inputs to the sink. + * All leaf-node Resources provided by this ResourceProvider will become sources. All nodes + * provided by the given ResourceProvider will be sinks. * * e.g. *
@@ -47,10 +62,17 @@ default ResourceProvider toAll(ResourceProvider sink) {
       sources.removeAll(sources.stream().flatMap(y -> y.inputs().stream())
         .collect(Collectors.toList()));
 
+      // remove all read/write-only upstream Resources
+      sources.removeAll(sources.stream()
+        .filter(y -> y instanceof ReadResource || y instanceof WriteResource)
+        .collect(Collectors.toList()));
+
       // link all sources to all sinks       
       sink.resources(x).forEach(y -> {
         combined.add(new Resource(y) {{
-          sources.forEach(z -> input(z));
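+          // don't wire upstream sources into read/write-only sink Resources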
+          if (!(y instanceof ReadResource || y instanceof WriteResource)) {
+            sources.forEach(z -> input(z));
+          }
         }});
       });
 
@@ -58,10 +80,12 @@ default ResourceProvider toAll(ResourceProvider sink) {
     };
   }
 
+  /** Provide a sink resource. */
   default ResourceProvider to(Resource resource) {
     return toAll(x -> Collections.singleton(resource));
   }
 
+  /** Provide a sink resource. */
   default ResourceProvider to(Function<String, Resource> resourceFunc) {
     return toAll(x -> Collections.singleton(resourceFunc.apply(x)));
   }
@@ -76,14 +100,60 @@ default ResourceProvider withAll(ResourceProvider resourceProvider) {
     };
   }
 
+  /** Provide a resource. */
   default ResourceProvider with(Resource resource) {
     return withAll(x -> Collections.singleton(resource));
   }
 
+  /** Provide a resource. */
   default ResourceProvider with(Function<String, Resource> resourceFunc) {
     return withAll(x -> Collections.singleton(resourceFunc.apply(x)));
   }
 
+  /** Provide the given resources, but only when the table needs to be read from. */
+  default ResourceProvider readWithAll(ResourceProvider readResourceProvider) {
+    return x -> {
+      List<Resource> combined = new ArrayList<>();
+      combined.addAll(resources(x));
+      combined.addAll(readResourceProvider.resources(x).stream()
+        .map(y -> new ReadResource(y))
+        .collect(Collectors.toList()));
+      return combined;
+    };
+  }
+
+  /** Provide the given resources, but only when the table needs to be written to. */
+  default ResourceProvider writeWithAll(ResourceProvider writeResourceProvider) {
+    return x -> {
+      List<Resource> combined = new ArrayList<>();
+      combined.addAll(resources(x));
+      combined.addAll(writeResourceProvider.resources(x).stream()
+        .map(y -> new WriteResource(y))
+        .collect(Collectors.toList()));
+      return combined;
+    };
+  }
+
+  /** Provide the given resource, but only when the table needs to be read from. */
+  default ResourceProvider readWith(Function<String, Resource> resourceFunc) {
+    return readWithAll(x -> Collections.singleton(resourceFunc.apply(x)));
+  }
+
+  /** Provide the given resource, but only when the table needs to be read from. */
+  default ResourceProvider readWith(Resource resource) {
+    return readWithAll(x -> Collections.singleton(resource));
+  }
+
+  /** Provide the given resource, but only when the table needs to be written to. */
+  default ResourceProvider writeWith(Function<String, Resource> resourceFunc) {
+    return writeWithAll(x -> Collections.singleton(resourceFunc.apply(x)));
+  }
+
+  /** Provide the given resource, but only when the table needs to be written to. */
+  default ResourceProvider writeWith(Resource resource) {
+    return writeWithAll(x -> Collections.singleton(resource));
+  }
+
   static ResourceProvider empty() {
     return x -> Collections.emptyList();
   }
@@ -95,4 +165,20 @@ static ResourceProvider from(Collection resources) {
   static ResourceProvider from(Resource resource) {
     return x -> Collections.singleton(resource);
   }
+
+  /** A Resource that should only be provided when reading from the table. */
+  static class ReadResource extends Resource {
+
+    public ReadResource(Resource resource) {
+      super(resource);
+    }
+  }
+
+  /** A Resource that should only be provided when writing to the table. */
+  static class WriteResource extends Resource {
+
+    public WriteResource(Resource resource) {
+      super(resource);
+    }
+  }
 }
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java
index 32602b0..dbe41b3 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java
@@ -9,14 +9,13 @@ public interface TableFactory {
   HopTable table(String database, String name, RelDataType rowType);
 
   /** Construct a ConnectorFactory */
-  static ConnectorFactory connector(ConfigProvider configProvider) {
-    return new ConnectorFactory(configProvider);
+  static ConnectorFactory connector(ConfigProvider configs) {
+    return new ConnectorFactory(configs);
   }
 
   /** Construct a ConnectorFactory */
-  static ConnectorFactory connector(ConfigProvider configProvider,
-      ResourceProvider resourceProvider) {
-    return new ConnectorFactory(configProvider, resourceProvider);
+  static ConnectorFactory connector(ConfigProvider configs, ResourceProvider resources) {
+    return new ConnectorFactory(configs, resources);
   }
 
   /** A TableFactory which is implemented as a simple connector. */
@@ -35,7 +34,8 @@ public ConnectorFactory(ConfigProvider configProvider) {
 
     @Override
     public HopTable table(String database, String name, RelDataType rowType) {
-      return new HopTable(database, name, rowType, resourceProvider.resources(name),
+      return new HopTable(database, name, rowType, resourceProvider.readResources(name),
+        resourceProvider.writeResources(name),
         ScriptImplementor.empty().connector(database, name, rowType, configProvider.config(name)));
     }
   }
diff --git a/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java b/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java
index f23bf73..ec94e41 100644
--- a/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java
+++ b/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java
@@ -55,6 +55,38 @@ protected int run(String[] args) throws IOException {
     sqlline.updateCommandHandlers(commandHandlers);
     return sqlline.begin(args, null, true).ordinal();
   }
+
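+  /** Parses "insert into DATABASE.TABLE SELECT ..." into its database, table, and query parts (crude; see TODO below). */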
+  private static InsertInto parseInsertInto(String s) {
+      String sql = s.trim();
+      if (!startsWith(sql, "insert into ")) {
+        throw new IllegalArgumentException("Expected insert into ... ");
+      }
+      String[] parts = sql.substring("insert into ".length()).split("(?i)SELECT"); // case insensitive
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Expected ... SELECT ...");
+      }
+      String[] parts2 = parts[0].split("\\.");
+      if (parts2.length != 2) {
+        throw new IllegalArgumentException("Expected ... DATABASE.TABLE ...");
+      }
+      // TODO unquote correctly
+      String database = parts2[0].replaceAll("[\\\"']", "").trim();
+      String table = parts2[1].replaceAll("[\\\"']", "").trim();
+      String query = "SELECT " + parts[1];
+      return new InsertInto(database, table, query);
+  }
+
+  private static class InsertInto {
+    private final String database;
+    private final String table;
+    private final String query;
+
+    InsertInto(String database, String table, String query) {
+      this.database = database;
+      this.table = table;
+      this.query = query;
+    }
+  }
  
   private class AvroCommandHandler implements CommandHandler {
 
@@ -76,11 +108,11 @@ public String getHelpText() {
     @Override
     public String matches(String line) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("avro")) {
+      if (startsWith(sql, "avro")) {
         sql = sql.substring("avro".length() + 1);
         return sql;
       }
@@ -91,11 +123,11 @@ public String matches(String line) {
     @Override
     public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("avro")) {
+      if (startsWith(sql, "avro")) {
         sql = sql.substring("avro".length() + 1);
       }
 
@@ -143,12 +175,12 @@ public String getHelpText() {
     @Override
     public String matches(String line) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("yaml")) {
-        sql = sql.substring("yaml".length() + 1);
+      if (startsWith(sql, "yaml insert into ")) {
+        sql = sql.substring("yaml insert into ".length());
         return sql;
       }
 
@@ -158,22 +190,23 @@ public String matches(String line) {
     @Override
     public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("yaml")) {
+      if (startsWith(sql, "yaml")) {
         sql = sql.substring("yaml".length() + 1);
       }
 
       String connectionUrl = sqlline.getConnectionMetadata().getUrl();
       try {
+        InsertInto insertInto = parseInsertInto(sql);
         HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
-        PipelineRel plan = planner.pipeline(sql);
+        PipelineRel plan = planner.pipeline(insertInto.query);
         PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
-        HopTable outputTable = new HopTable("PIPELINE", "SINK", plan.getRowType(),
-          Collections.singletonMap("connector", "dummy"));
-        Pipeline pipeline = impl.pipeline(outputTable);
+        HopTable sink = planner.database(insertInto.database)
+          .makeTable(insertInto.table, impl.rowType());
+        Pipeline pipeline = impl.pipeline(sink);
         // TODO provide generated avro schema to environment
         Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(new Resource.DummyEnvironment());
         sqlline.output(pipeline.render(templateFactory));
@@ -216,11 +249,11 @@ public String getHelpText() {
     @Override
     public String matches(String line) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("pipeline")) {
+      if (startsWith(sql, "pipeline")) {
         sql = sql.substring("pipeline".length() + 1);
         return sql;
       }
@@ -231,25 +264,26 @@ public String matches(String line) {
     @Override
     public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("pipeline")) {
+      if (startsWith(sql, "pipeline")) {
         sql = sql.substring("pipeline".length() + 1);
       }
 
       String connectionUrl = sqlline.getConnectionMetadata().getUrl();
       try {
+        InsertInto insertInto = parseInsertInto(sql);
         HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
-        PipelineRel plan = planner.pipeline(sql);
+        PipelineRel plan = planner.pipeline(insertInto.query);
         sqlline.output("PLAN:");
         sqlline.output(plan.explain());
         PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
         sqlline.output("SQL:");
-        HopTable outputTable = new HopTable("PIPELINE", "SINK", plan.getRowType(),
-          Collections.singletonMap("connector", "dummy"));
-        sqlline.output(impl.insertInto(outputTable).sql(MysqlSqlDialect.DEFAULT));
+        HopTable sink = planner.database(insertInto.database)
+          .makeTable(insertInto.table, impl.rowType());
+        sqlline.output(impl.insertInto(sink).sql(MysqlSqlDialect.DEFAULT));
         dispatchCallback.setToSuccess();
       } catch (Exception e) {
         sqlline.error(e.toString());
@@ -289,11 +323,11 @@ public String getHelpText() {
     @Override
     public String matches(String line) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("check")) {
+      if (startsWith(sql, "check")) {
         sql = sql.substring("check".length() + 1);
         return sql;
       }
@@ -304,11 +338,11 @@ public String matches(String line) {
     @Override
     public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("check")) {
+      if (startsWith(sql, "check")) {
         sql = sql.substring("check".length() + 1);
       }
 
@@ -414,11 +448,11 @@ public String getHelpText() {
     @Override
     public String matches(String line) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("insert into")) {
+      if (startsWith(sql, "insert into")) {
         sql = sql.substring("insert into".length() + 1);
         return sql;
       }
@@ -429,34 +463,21 @@ public String matches(String line) {
     @Override
     public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("insert into")) {
-        sql = sql.substring("insert into".length() + 1);
-      }
-
       String connectionUrl = sqlline.getConnectionMetadata().getUrl();
       try {
-        String[] parts = sql.split("(?i)SELECT"); // case insensitive
-        if (parts.length != 2) {
-          throw new IllegalArgumentException("Expected ... SELECT ...");
-        }
-        String[] parts2 = parts[0].split("\\.");
-        if (parts2.length != 2) {
-          throw new IllegalArgumentException("Expected ... DATABASE.TABLE ...");
-        }
-        // TODO unquote correctly
-        String database = parts2[0].replaceAll("[\\\"']", "").trim();
-        String table = parts2[1].replaceAll("[\\\"']", "").trim();
-        String query = parts[1];
-
-        HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
-        PipelineRel plan = planner.pipeline("SELECT " + query);
+        InsertInto insertInto = parseInsertInto(sql);
+        HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl,
+          new Properties());
+        PipelineRel plan = planner.pipeline(insertInto.query);
         PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
-        HopTable sink = planner.database(database).makeTable(table, impl.rowType());
-        String pipelineSql = impl.insertInto(sink).sql(MysqlSqlDialect.DEFAULT) + "\nSELECT 'SUCCESS';";
+        HopTable sink = planner.database(insertInto.database)
+          .makeTable(insertInto.table, impl.rowType());
+        String pipelineSql = impl.insertInto(sink).sql(MysqlSqlDialect.DEFAULT)
+          + "\nSELECT 'SUCCESS';";
         FlinkIterable iterable = new FlinkIterable(pipelineSql);
         Iterator iter = iterable.field(0).iterator();
         if (iter.hasNext()) {
@@ -505,7 +526,7 @@ public String getHelpText() {
 
     @Override
     public String matches(String line) {
-      if (line.startsWith("!intro") || line.startsWith("intro")) {
+      if (startsWith(line, "!intro") || startsWith(line, "intro")) {
         return line;
       } else {
         return null;
@@ -551,12 +572,12 @@ public String getHelpText() {
     @Override
     public String matches(String line) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("mermaid")) {
-        sql = sql.substring("mermaid".length() + 1);
+      if (startsWith(sql, "mermaid insert into ")) {
+        sql = sql.substring("mermaid insert into ".length());
         return sql;
       }
 
@@ -566,12 +587,12 @@ public String matches(String line) {
     @Override
     public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = line;
-      if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
+      if (startsWith(sql, SqlLine.COMMAND_PREFIX)) {
         sql = sql.substring(1);
       }
 
-      if (sql.startsWith("mermaid")) {
-        sql = sql.substring("mermaid".length() + 1);
+      if (startsWith(sql, "mermaid ")) {
+        sql = sql.substring("mermaid ".length());
       }
 
       //remove semicolon from query if present
@@ -581,12 +602,13 @@ public void execute(String line, DispatchCallback dispatchCallback) {
 
       String connectionUrl = sqlline.getConnectionMetadata().getUrl();
       try {
+        InsertInto insertInto = parseInsertInto(sql);
         HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
-        PipelineRel plan = planner.pipeline(sql);
+        PipelineRel plan = planner.pipeline(insertInto.query);
         PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
-        HopTable outputTable = new HopTable("PIPELINE", "SINK", plan.getRowType(),
-          Collections.singletonMap("connector", "dummy"));
-        Pipeline pipeline = impl.pipeline(outputTable);
+        HopTable sink = planner.database(insertInto.database)
+          .makeTable(insertInto.table, impl.rowType());
+        Pipeline pipeline = impl.pipeline(sink);
         sqlline.output(pipeline.mermaid());
         dispatchCallback.setToSuccess();
       } catch (Exception e) {
@@ -605,4 +627,9 @@ public boolean echoToFile() {
       return false;
     }
   }
+
+  // case-insensitive prefix match
+  static boolean startsWith(String s, String prefix) {
+    return s.regionMatches(true, 0, prefix, 0, prefix.length());
+  }
 }
diff --git a/hoptimator-cli/src/main/resources/intro.txt b/hoptimator-cli/src/main/resources/intro.txt
index 4cd9ab5..8c1700d 100644
--- a/hoptimator-cli/src/main/resources/intro.txt
+++ b/hoptimator-cli/src/main/resources/intro.txt
@@ -37,19 +37,17 @@ Try the following queries:
   > SELECT * FROM DATAGEN.PERSON;
   > SELECT AGE FROM DATAGEN.PERSON p, DATAGEN.COMPANY c WHERE c.CEO = p.NAME;
 
-  -- Hoptimator understands and infers Avro schemas:
-  > !avro SELECT * FROM DATAGEN.PERSON
-  > !avro SELECT AGE FROM DATAGEN.PERSON p, DATAGEN.COMPANY c WHERE c.CEO = p.NAME
+  -- For testing purposes, you can run pipelines locally:
+  > !insert into RAWKAFKA."test-sink" SELECT * FROM DATAGEN.PERSON
 
-  -- For debugging purposes, you may wish to see the pipeline implementation:
+  -- There are several commands for debugging purposes:
   > !pipeline SELECT * FROM DATAGEN.PERSON
   > !pipeline SELECT AGE FROM DATAGEN.PERSON p, DATAGEN.COMPANY c WHERE c.CEO = p.NAME
+  > !avro SELECT AGE FROM DATAGEN.PERSON p, DATAGEN.COMPANY c WHERE c.CEO = p.NAME
+  > !yaml insert into RAWKAFKA."test-sink" SELECT * FROM DATAGEN.PERSON
+  > !mermaid insert into RAWKAFKA."test-sink" SELECT * FROM DATAGEN.PERSON
 
-  -- Pipelines can be deployed to Kubernetes by generating YAML:
-  > !yaml SELECT * FROM DATAGEN.PERSON
-  > !yaml SELECT AGE FROM DATAGEN.PERSON p, DATAGEN.COMPANY c WHERE c.CEO = p.NAME
-
-  -- Good luck!
+  -- Happy hopping!
   > !help
 
 
diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java
index fe50e4b..c980690 100644
--- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java
@@ -6,7 +6,7 @@
 import java.util.Collections;
 import java.util.Map;
 
-public class KafkaTopic extends Resource {
+class KafkaTopic extends Resource {
   public KafkaTopic(String name, Integer numPartitions,
       Map<String, String> clientOverrides) {
     super("KafkaTopic");
diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java
new file mode 100644
index 0000000..56054f1
--- /dev/null
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java
@@ -0,0 +1,13 @@
+package com.linkedin.hoptimator.catalog.kafka;
+
+import com.linkedin.hoptimator.catalog.Resource;
+
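+/** An ACL rule granting a principal an access method (e.g. Read or Write) on a Kafka topic. */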
+class KafkaTopicAcl extends Resource {
+  public KafkaTopicAcl(String topicName, String principal, String method) {
+    super("KafkaTopicAcl");
+    export("topicName", topicName);
+    export("principal", principal);
+    export("method", method);
+  }
+}
+
diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
index f12193f..7b96599 100644
--- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
@@ -24,6 +24,7 @@ public class RawKafkaSchemaFactory implements SchemaFactory {
   @Override
   @SuppressWarnings("unchecked")
   public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+    String principal = (String) operand.getOrDefault("principal", "User:ANONYMOUS");
     Map<String, String> clientConfig = (Map<String, String>) operand.get("clientConfig");
     DataType.Struct rowType = DataType.struct()
       .with("PAYLOAD", DataType.VARCHAR_NULL)
@@ -44,10 +45,12 @@ public Schema create(SchemaPlus parentSchema, String name, Map o
     ConfigProvider topicConfigProvider = ConfigProvider.from(clientConfig);
     TableResolver resolver = x -> rowType.rel();
     Integer numPartitions = (Integer) operand.get("numPartitions");
-    ResourceProvider resourceProvider = x -> Collections.singleton(new KafkaTopic(x,
-      numPartitions, topicConfigProvider.config(x)));
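+    // provision the topic itself unconditionally; grant Read/Write ACLs only when the table is actually read from or written to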
+    ResourceProvider resources = ResourceProvider.empty()
+      .with(x -> new KafkaTopic(x, numPartitions, topicConfigProvider.config(x)))
+      .readWith(x -> new KafkaTopicAcl(x, principal, "Read"))
+      .writeWith(x -> new KafkaTopicAcl(x, principal, "Write"));
     Database database = new Database(name, tableLister, resolver, connectorConfigProvider,
-      resourceProvider);
+      resources);
     return new DatabaseSchema(database);
   }
 }
diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java
index 8a72eb2..7d6515a 100644
--- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java
@@ -2,6 +2,8 @@
 
 import com.linkedin.hoptimator.operator.ControllerProvider;
 import com.linkedin.hoptimator.operator.Operator;
+import com.linkedin.hoptimator.models.V1alpha1Acl;
+import com.linkedin.hoptimator.models.V1alpha1AclList;
 import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
 import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
 
@@ -9,8 +11,8 @@
 import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
 import io.kubernetes.client.extended.controller.reconciler.Reconciler;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 
 /** Provides a Controller plugin for KafkaTopics. */
 public class KafkaControllerProvider implements ControllerProvider {
@@ -20,14 +22,26 @@ public Collection controllers(Operator operator) {
     operator.registerApi("KafkaTopic", "kafkatopic", "kafkatopics", "hoptimator.linkedin.com",
       "v1alpha1", V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);
 
-    Reconciler reconciler = new KafkaTopicReconciler(operator);
-    Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
-      .withReconciler(reconciler)
+    // N.B. this shared CRD may be re-registered by other ControllerProviders
+    operator.registerApi("Acl", "acl", "acls", "hoptimator.linkedin.com",
+      "v1alpha1", V1alpha1Acl.class, V1alpha1AclList.class);
+
+    Reconciler topicReconciler = new KafkaTopicReconciler(operator);
+    Controller topicController = ControllerBuilder.defaultBuilder(operator.informerFactory())
+      .withReconciler(topicReconciler)
       .withName("kafka-topic-controller")
       .withWorkerCount(1)
       .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaTopic.class, x).build())
       .build();
 
-    return Collections.singleton(controller);
+    Reconciler topicAclReconciler = new KafkaTopicAclReconciler(operator);
+    Controller topicAclController = ControllerBuilder.defaultBuilder(operator.informerFactory())
+      .withReconciler(topicAclReconciler)
+      .withName("kafka-topic-acl-controller")
+      .withWorkerCount(1)
+      .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Acl.class, x).build())
+      .build();
+
+    return Arrays.asList(new Controller[]{topicController, topicAclController});
   }
 }
diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java
new file mode 100644
index 0000000..aa643ad
--- /dev/null
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java
@@ -0,0 +1,134 @@
+package com.linkedin.hoptimator.operator.kafka;
+
+import com.linkedin.hoptimator.operator.Operator;
+import com.linkedin.hoptimator.operator.ConfigAssembler;
+import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
+import com.linkedin.hoptimator.models.V1alpha1Acl;
+import com.linkedin.hoptimator.models.V1alpha1AclSpec;
+
+import io.kubernetes.client.extended.controller.reconciler.Reconciler;
+import io.kubernetes.client.extended.controller.reconciler.Request;
+import io.kubernetes.client.extended.controller.reconciler.Result;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1OwnerReference;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.clients.admin.AdminClient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class KafkaTopicAclReconciler implements Reconciler {
+  private final static Logger log = LoggerFactory.getLogger(KafkaTopicAclReconciler.class);
+  private final static String ACL = "hoptimator.linkedin.com/v1alpha1/Acl";
+  private final static String KAFKATOPIC = "hoptimator.linkedin.com/v1alpha1/KafkaTopic";
+
+  private final Operator operator;
+
+  public KafkaTopicAclReconciler(Operator operator) {
+    this.operator = operator;
+  }
+
+  @Override
+  public Result reconcile(Request request) {
+    log.info("Reconciling request {}", request);
+    String name = request.getName();
+    String namespace = request.getNamespace();
+
+    try {
+      V1alpha1Acl object = operator.fetch(ACL, namespace, name);
+
+      if (object == null) {
+        log.info("Object {}/{} deleted. Skipping.", namespace, name);
+        return new Result(false);
+      }
+
+      String targetKind = object.getSpec().getResource().getKind();
+
+      if (!targetKind.equals("KafkaTopic")) {
+        log.info("Not a KafkaTopic Acl. Skipping.");
+      }
+
+      V1alpha1AclSpec.MethodEnum method = object.getSpec().getMethod();
+      AclOperation operation;
+      switch (method) {
+      case READ:
+        operation = AclOperation.READ;
+        break;
+      case WRITE:
+        operation = AclOperation.WRITE;
+        break;
+      default:
+        log.info("Unsupported KafkaTopic Acl operation {}. Skipping.", method);
+        return new Result(false);
+      }
+      
+      String targetName = object.getSpec().getResource().getName();
+      String principal = object.getSpec().getPrincipal();
+
+      V1alpha1KafkaTopic target = operator.fetch(KAFKATOPIC, namespace, targetName);
+
+      if (target == null) {
+        log.info("Target KafkaTopic {}/{} not found. Retrying.", namespace, targetName);
+        return new Result(true, operator.failureRetryDuration());
+      }
+
+      // assemble AdminClient config
+      ConfigAssembler assembler = new ConfigAssembler(operator);
+      list(target.getSpec().getClientConfigs()).forEach(x ->
+        assembler.addRef(namespace, x.getConfigMapRef().getName()));
+      map(target.getSpec().getClientOverrides()).forEach((k, v) -> assembler.addOverride(k, v));
+      Properties properties = assembler.assembleProperties();
+      log.info("Using AdminClient config: {}", properties);
+
+      AdminClient admin = AdminClient.create(properties);
+      try {
+        log.info("Creating KafkaTopic Acl for {}...", target.getSpec().getTopicName());
+        AclBinding binding = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
+          target.getSpec().getTopicName(), PatternType.LITERAL), new AccessControlEntry(
+          principal, "*", operation, AclPermissionType.ALLOW));
+        admin.createAcls(Collections.singleton(binding)).all().get();
+        log.info("Granted {} {} access to {}.", principal, method, target.getSpec().getTopicName());
+      } finally {
+        admin.close();
+      }
+    } catch (Exception e) {
+      log.error("Encountered exception while reconciling KafkaTopic Acl {}/{}", namespace, name, e);
+      return new Result(true, operator.failureRetryDuration());
+    }
+    log.info("Done reconciling {}/{}", namespace, name);
+    return new Result(false);
+  }
+
+  private static <T> List<T> list(List<T> maybeNull) {
+    if (maybeNull == null) {
+      return Collections.emptyList();
+    } else {
+      return maybeNull;
+    }
+  }
+
+  private static <K, V> Map<K, V> map(Map<K, V> maybeNull) {
+    if (maybeNull == null) {
+      return Collections.emptyMap();
+    } else {
+      return maybeNull;
+    }
+  }
+}
+
diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template
index f434b1c..8b4df55 100644
--- a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template
+++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template
@@ -1,7 +1,7 @@
 apiVersion: hoptimator.linkedin.com/v1alpha1
 kind: KafkaTopic
 metadata:
-  name: {{name}}-kafka-topic
+  name: {{name}}
   namespace: {{namespace}}
 spec:
   topicName: {{name}}
diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template
new file mode 100644
index 0000000..1e31162
--- /dev/null
+++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template
@@ -0,0 +1,11 @@
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: Acl
+metadata:
+  name: {{name}}-acl-{{id}}
+  namespace: {{namespace}}
+spec:
+  resource:
+    kind: KafkaTopic
+    name: {{name}}
+  method: {{method}}
+  principal: {{principal}} 
diff --git a/hoptimator-models/generate-models.sh b/hoptimator-models/generate-models.sh
index e48a97e..473697f 100755
--- a/hoptimator-models/generate-models.sh
+++ b/hoptimator-models/generate-models.sh
@@ -10,6 +10,7 @@ docker run \
   --network host \
   ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
   /generate.sh -o "$(pwd)/hoptimator-models" -n "" -p "com.linkedin.hoptimator" \
+  -u "$(pwd)/deploy/acls.crd.yaml" \
   -u "$(pwd)/deploy/kafkatopics.crd.yaml" \
   -u "$(pwd)/deploy/subscriptions.crd.yaml" \
   && echo "done."
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java
new file mode 100644
index 0000000..78c2c25
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java
@@ -0,0 +1,219 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1AclSpec;
+import com.linkedin.hoptimator.models.V1alpha1AclStatus;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Access control rule (colloquially, an Acl)
+ */
+@ApiModel(description = "Access control rule (colloquially, an Acl)")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ObjectMeta metadata = null;
+
+  public static final String SERIALIZED_NAME_SPEC = "spec";
+  @SerializedName(SERIALIZED_NAME_SPEC)
+  private V1alpha1AclSpec spec;
+
+  public static final String SERIALIZED_NAME_STATUS = "status";
+  @SerializedName(SERIALIZED_NAME_STATUS)
+  private V1alpha1AclStatus status;
+
+
+  public V1alpha1Acl apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1Acl kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1Acl metadata(V1ObjectMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ObjectMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ObjectMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  public V1alpha1Acl spec(V1alpha1AclSpec spec) {
+    
+    this.spec = spec;
+    return this;
+  }
+
+   /**
+   * Get spec
+   * @return spec
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1AclSpec getSpec() {
+    return spec;
+  }
+
+
+  public void setSpec(V1alpha1AclSpec spec) {
+    this.spec = spec;
+  }
+
+
+  public V1alpha1Acl status(V1alpha1AclStatus status) {
+    
+    this.status = status;
+    return this;
+  }
+
+   /**
+   * Get status
+   * @return status
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1AclStatus getStatus() {
+    return status;
+  }
+
+
+  public void setStatus(V1alpha1AclStatus status) {
+    this.status = status;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1Acl v1alpha1Acl = (V1alpha1Acl) o;
+    return Objects.equals(this.apiVersion, v1alpha1Acl.apiVersion) &&
+        Objects.equals(this.kind, v1alpha1Acl.kind) &&
+        Objects.equals(this.metadata, v1alpha1Acl.metadata) &&
+        Objects.equals(this.spec, v1alpha1Acl.spec) &&
+        Objects.equals(this.status, v1alpha1Acl.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, kind, metadata, spec, status);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1Acl {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("    spec: ").append(toIndentedString(spec)).append("\n");
+    sb.append("    status: ").append(toIndentedString(status)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java
new file mode 100644
index 0000000..24178fc
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java
@@ -0,0 +1,195 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1Acl;
+import io.kubernetes.client.openapi.models.V1ListMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AclList is a list of Acl
+ */
+@ApiModel(description = "AclList is a list of Acl")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_ITEMS = "items";
+  @SerializedName(SERIALIZED_NAME_ITEMS)
+  private List<V1alpha1Acl> items = new ArrayList<>();
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ListMeta metadata = null;
+
+
+  public V1alpha1AclList apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1AclList items(List<V1alpha1Acl> items) {
+    
+    this.items = items;
+    return this;
+  }
+
+  public V1alpha1AclList addItemsItem(V1alpha1Acl itemsItem) {
+    this.items.add(itemsItem);
+    return this;
+  }
+
+   /**
+   * List of acls. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md
+   * @return items
+  **/
+  @ApiModelProperty(required = true, value = "List of acls. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md")
+
+  public List<V1alpha1Acl> getItems() {
+    return items;
+  }
+
+
+  public void setItems(List<V1alpha1Acl> items) {
+    this.items = items;
+  }
+
+
+  public V1alpha1AclList kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1AclList metadata(V1ListMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ListMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ListMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclList v1alpha1AclList = (V1alpha1AclList) o;
+    return Objects.equals(this.apiVersion, v1alpha1AclList.apiVersion) &&
+        Objects.equals(this.items, v1alpha1AclList.items) &&
+        Objects.equals(this.kind, v1alpha1AclList.kind) &&
+        Objects.equals(this.metadata, v1alpha1AclList.metadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, items, kind, metadata);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclList {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    items: ").append(toIndentedString(items)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java
new file mode 100644
index 0000000..b873b09
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java
@@ -0,0 +1,239 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1AclSpecResource;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * A set of related ACL rules.
+ */
+@ApiModel(description = "A set of related ACL rules.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclSpec {
+  /**
+   * The resource access method.
+   */
+  @JsonAdapter(MethodEnum.Adapter.class)
+  public enum MethodEnum {
+    ALTER("Alter"),
+    
+    CREATE("Create"),
+    
+    DELETE("Delete"),
+    
+    DESCRIBE("Describe"),
+    
+    READ("Read"),
+    
+    WRITE("Write"),
+    
+    POST("Post"),
+    
+    PUT("Put"),
+    
+    GET("Get"),
+    
+    HEAD("Head"),
+    
+    PATCH("Patch"),
+    
+    TRACE("Trace"),
+    
+    OPTIONS("Options"),
+    
+    GETALL("GetAll"),
+    
+    BATCHGET("BatchGet"),
+    
+    BATCHCREATE("BatchCreate"),
+    
+    BATCHUPDATE("BatchUpdate"),
+    
+    PARTIALUPDATE("PartialUpdate"),
+    
+    BATCHDELETE("BatchDelete"),
+    
+    BATCHPARTIALDELETE("BatchPartialDelete");
+
+    private String value;
+
+    MethodEnum(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(value);
+    }
+
+    public static MethodEnum fromValue(String value) {
+      for (MethodEnum b : MethodEnum.values()) {
+        if (b.value.equals(value)) {
+          return b;
+        }
+      }
+      throw new IllegalArgumentException("Unexpected value '" + value + "'");
+    }
+
+    public static class Adapter extends TypeAdapter<MethodEnum> {
+      @Override
+      public void write(final JsonWriter jsonWriter, final MethodEnum enumeration) throws IOException {
+        jsonWriter.value(enumeration.getValue());
+      }
+
+      @Override
+      public MethodEnum read(final JsonReader jsonReader) throws IOException {
+        String value =  jsonReader.nextString();
+        return MethodEnum.fromValue(value);
+      }
+    }
+  }
+
+  public static final String SERIALIZED_NAME_METHOD = "method";
+  @SerializedName(SERIALIZED_NAME_METHOD)
+  private MethodEnum method;
+
+  public static final String SERIALIZED_NAME_PRINCIPAL = "principal";
+  @SerializedName(SERIALIZED_NAME_PRINCIPAL)
+  private String principal;
+
+  public static final String SERIALIZED_NAME_RESOURCE = "resource";
+  @SerializedName(SERIALIZED_NAME_RESOURCE)
+  private V1alpha1AclSpecResource resource;
+
+
+  public V1alpha1AclSpec method(MethodEnum method) {
+    
+    this.method = method;
+    return this;
+  }
+
+   /**
+   * The resource access method.
+   * @return method
+  **/
+  @ApiModelProperty(required = true, value = "The resource access method.")
+
+  public MethodEnum getMethod() {
+    return method;
+  }
+
+
+  public void setMethod(MethodEnum method) {
+    this.method = method;
+  }
+
+
+  public V1alpha1AclSpec principal(String principal) {
+    
+    this.principal = principal;
+    return this;
+  }
+
+   /**
+   * The principal being allowed access. Format depends on principal type.
+   * @return principal
+  **/
+  @ApiModelProperty(required = true, value = "The principal being allowed access. Format depends on principal type.")
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+
+  public void setPrincipal(String principal) {
+    this.principal = principal;
+  }
+
+
+  public V1alpha1AclSpec resource(V1alpha1AclSpecResource resource) {
+    
+    this.resource = resource;
+    return this;
+  }
+
+   /**
+   * Get resource
+   * @return resource
+  **/
+  @ApiModelProperty(required = true, value = "")
+
+  public V1alpha1AclSpecResource getResource() {
+    return resource;
+  }
+
+
+  public void setResource(V1alpha1AclSpecResource resource) {
+    this.resource = resource;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclSpec v1alpha1AclSpec = (V1alpha1AclSpec) o;
+    return Objects.equals(this.method, v1alpha1AclSpec.method) &&
+        Objects.equals(this.principal, v1alpha1AclSpec.principal) &&
+        Objects.equals(this.resource, v1alpha1AclSpec.resource);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(method, principal, resource);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclSpec {\n");
+    sb.append("    method: ").append(toIndentedString(method)).append("\n");
+    sb.append("    principal: ").append(toIndentedString(principal)).append("\n");
+    sb.append("    resource: ").append(toIndentedString(resource)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
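
Aside (not part of the patch itself): the generated MethodEnum above mirrors the CRD's method enum one-to-one, and fromValue() rejects anything outside that list. A minimal, self-contained sketch of the round-trip; the class name and the "Browse" value are invented for illustration:

    // Illustrative sketch only (not part of this patch): round-tripping the CRD's
    // method strings through the generated enum.
    import com.linkedin.hoptimator.models.V1alpha1AclSpec.MethodEnum;

    public class MethodEnumDemo {
      public static void main(String[] args) {
        MethodEnum read = MethodEnum.fromValue("Read");   // one of the CRD enum values
        System.out.println(read.getValue());              // prints "Read"

        try {
          MethodEnum.fromValue("Browse");                 // hypothetical value, not in the CRD
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());             // "Unexpected value 'Browse'"
        }
      }
    }
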
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatusKafkaTopicRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java
similarity index 60%
rename from hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatusKafkaTopicRef.java
rename to hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java
index 4ce3482..430e70a 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatusKafkaTopicRef.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java
@@ -25,28 +25,55 @@
 import java.io.IOException;
 
 /**
- * For KafkaTopic subscriptions, the KafkaTopic where the data can be consumed.
+ * The resource being controlled.
  */
-@ApiModel(description = "For KafkaTopic subscriptions, the KafkaTopic where the data can be consumed.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-15T21:24:30.587Z[Etc/UTC]")
-public class V1alpha1SubscriptionStatusKafkaTopicRef {
+@ApiModel(description = "The resource being controlled.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclSpecResource {
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
   public static final String SERIALIZED_NAME_NAME = "name";
   @SerializedName(SERIALIZED_NAME_NAME)
   private String name;
 
 
-  public V1alpha1SubscriptionStatusKafkaTopicRef name(String name) {
+  public V1alpha1AclSpecResource kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * The kind of resource being controlled.
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The kind of resource being controlled.")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1AclSpecResource name(String name) {
     
     this.name = name;
     return this;
   }
 
    /**
-   * The name of the KafkaTopic rescource.
+   * The name of the resource being controlled.
    * @return name
   **/
   @javax.annotation.Nullable
-  @ApiModelProperty(value = "The name of the KafkaTopic rescource.")
+  @ApiModelProperty(value = "The name of the resource being controlled.")
 
   public String getName() {
     return name;
@@ -66,20 +93,22 @@ public boolean equals(Object o) {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    V1alpha1SubscriptionStatusKafkaTopicRef v1alpha1SubscriptionStatusKafkaTopicRef = (V1alpha1SubscriptionStatusKafkaTopicRef) o;
-    return Objects.equals(this.name, v1alpha1SubscriptionStatusKafkaTopicRef.name);
+    V1alpha1AclSpecResource v1alpha1AclSpecResource = (V1alpha1AclSpecResource) o;
+    return Objects.equals(this.kind, v1alpha1AclSpecResource.kind) &&
+        Objects.equals(this.name, v1alpha1AclSpecResource.name);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(name);
+    return Objects.hash(kind, name);
   }
 
 
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("class V1alpha1SubscriptionStatusKafkaTopicRef {\n");
+    sb.append("class V1alpha1AclSpecResource {\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
     sb.append("    name: ").append(toIndentedString(name)).append("\n");
     sb.append("}");
     return sb.toString();
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java
new file mode 100644
index 0000000..9b38615
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java
@@ -0,0 +1,129 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Status, as set by the operator.
+ */
+@ApiModel(description = "Status, as set by the operator.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclStatus {
+  public static final String SERIALIZED_NAME_MESSAGE = "message";
+  @SerializedName(SERIALIZED_NAME_MESSAGE)
+  private String message;
+
+  public static final String SERIALIZED_NAME_READY = "ready";
+  @SerializedName(SERIALIZED_NAME_READY)
+  private Boolean ready;
+
+
+  public V1alpha1AclStatus message(String message) {
+    
+    this.message = message;
+    return this;
+  }
+
+   /**
+   * Human-readable status message.
+   * @return message
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Human-readable status message.")
+
+  public String getMessage() {
+    return message;
+  }
+
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+
+  public V1alpha1AclStatus ready(Boolean ready) {
+    
+    this.ready = ready;
+    return this;
+  }
+
+   /**
+   * Whether the ACL rule has been applied.
+   * @return ready
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Whether the ACL rule has been applied.")
+
+  public Boolean getReady() {
+    return ready;
+  }
+
+
+  public void setReady(Boolean ready) {
+    this.ready = ready;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclStatus v1alpha1AclStatus = (V1alpha1AclStatus) o;
+    return Objects.equals(this.message, v1alpha1AclStatus.message) &&
+        Objects.equals(this.ready, v1alpha1AclStatus.ready);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(message, ready);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclStatus {\n");
+    sb.append("    message: ").append(toIndentedString(message)).append("\n");
+    sb.append("    ready: ").append(toIndentedString(ready)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
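
Aside (not part of the patch itself): taken together, the Acl, AclSpec, AclSpecResource, and AclStatus models are plain fluent builders, so an Acl object can be assembled in memory exactly as the CRD describes it. A minimal sketch, with the principal, topic name, and namespace invented for illustration:

    // Illustrative sketch only (not part of this patch). Builds the object graph
    // corresponding to a hoptimator.linkedin.com/v1alpha1 Acl resource; the principal
    // "User:alice" and the names below are made-up examples.
    import com.linkedin.hoptimator.models.V1alpha1Acl;
    import com.linkedin.hoptimator.models.V1alpha1AclSpec;
    import com.linkedin.hoptimator.models.V1alpha1AclSpecResource;
    import io.kubernetes.client.openapi.models.V1ObjectMeta;

    public class AclModelDemo {
      public static void main(String[] args) {
        V1alpha1Acl acl = new V1alpha1Acl()
          .apiVersion("hoptimator.linkedin.com/v1alpha1")
          .kind("Acl")
          .metadata(new V1ObjectMeta().name("alice-reads-my-topic").namespace("kafka"))
          .spec(new V1alpha1AclSpec()
            .method(V1alpha1AclSpec.MethodEnum.READ)
            .principal("User:alice")
            .resource(new V1alpha1AclSpecResource()
              .kind("KafkaTopic")
              .name("my-topic")));

        // toString() pretty-prints the nested spec, which is handy when debugging reconcilers.
        System.out.println(acl);
      }
    }
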
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
index 51f9516..6838048 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
@@ -31,7 +31,7 @@
  * Kafka Topic
  */
 @ApiModel(description = "Kafka Topic")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
index cac3ee8..f4429ac 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
@@ -32,7 +32,7 @@
  * KafkaTopicList is a list of KafkaTopic
  */
 @ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
index 57c3eef..8a3588e 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
@@ -33,7 +33,7 @@
  * Desired Kafka topic configuration.
  */
 @ApiModel(description = "Desired Kafka topic configuration.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1KafkaTopicSpec {
   public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
   @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
index 1e9ba00..27536ba 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
@@ -28,7 +28,7 @@
 /**
  * V1alpha1KafkaTopicSpecClientConfigs
  */
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1KafkaTopicSpecClientConfigs {
   public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
   @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
index 03fdb91..5e999aa 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
@@ -28,7 +28,7 @@
  * Reference to a ConfigMap to use for AdminClient configuration.
  */
 @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1KafkaTopicSpecConfigMapRef {
   public static final String SERIALIZED_NAME_NAME = "name";
   @SerializedName(SERIALIZED_NAME_NAME)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
index 3113c19..4cc046d 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
@@ -28,7 +28,7 @@
  * Current state of the topic.
  */
 @ApiModel(description = "Current state of the topic.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1KafkaTopicStatus {
   public static final String SERIALIZED_NAME_MESSAGE = "message";
   @SerializedName(SERIALIZED_NAME_MESSAGE)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
index 47ef54c..013d6f6 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
@@ -31,7 +31,7 @@
  * Hoptimator Subscription
  */
 @ApiModel(description = "Hoptimator Subscription")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
index d81833f..85ca00b 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
@@ -32,7 +32,7 @@
  * SubscriptionList is a list of Subscription
  */
 @ApiModel(description = "SubscriptionList is a list of Subscription")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
index 265b040..807b842 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
@@ -28,7 +28,7 @@
  * Subscription spec
  */
 @ApiModel(description = "Subscription spec")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1SubscriptionSpec {
   public static final String SERIALIZED_NAME_DATABASE = "database";
   @SerializedName(SERIALIZED_NAME_DATABASE)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
index b13acec..0d2355f 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
@@ -30,7 +30,7 @@
  * Filled in by the operator.
  */
 @ApiModel(description = "Filled in by the operator.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
 public class V1alpha1SubscriptionStatus {
   public static final String SERIALIZED_NAME_MESSAGE = "message";
   @SerializedName(SERIALIZED_NAME_MESSAGE)
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
index 918ca5c..9e8d731 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
@@ -144,7 +144,7 @@ Pipeline pipeline(V1alpha1Subscription object) throws Exception {
 
     // Create an output/sink table using the subscription name, and add it to the pipeline.
     HopTable sink = planner.database(database).makeTable(name, impl.rowType());
-    log.info("Implementing sink table {}.{} with {} resources.", database, name, sink.resources().size());
+    log.info("Implementing sink table {}.{} with {} resources.", database, name, sink.writeResources().size());
     impl.implement(sink);
 
     return impl.pipeline(sink);
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java
index 1c9c70d..c1bb9c1 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java
@@ -51,6 +51,7 @@ public String mermaid() {
         String description = x.keys().stream()
           .filter(k2 -> x.property(k2) != null)
           .filter(k2 -> !x.property(k2).isEmpty())
+          .filter(k2 -> !"id".equals(k2))
           .map(k2 -> k2 + ": " + sanitize(x.property(k2)))
           .collect(Collectors.joining("\n"));
         sb.append("  " + id(x) + "[\"" + description + "\"]\n");
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java
index 878a740..5c9039d 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java
@@ -84,7 +84,7 @@ public ScriptImplementor insertInto(HopTable sink) {
     /** Add any resources, SQL, DDL etc required to access the table. */
     public void implement(HopTable table) {
       script = script.database(table.database()).with(table);
-      table.resources().forEach(x -> resource(x));
+      table.readResources().forEach(x -> resource(x));
     }
 
     /** Combine SQL and any Resources into a Pipeline, using ANSI dialect */
@@ -96,7 +96,8 @@ public Pipeline pipeline(HopTable sink) {
     public Pipeline pipeline(HopTable sink, SqlDialect sqlDialect) {
       // We re-use ResourceProvider here for its source->sink relationships
       ResourceProvider resourceProvider = ResourceProvider.from(resources)
-        .to(new SqlJob(insertInto(sink).sql(sqlDialect)));
+        .to(new SqlJob(insertInto(sink).sql(sqlDialect)))
+        .toAll(x -> sink.writeResources());
 
       // All resources are now "provided", so we can pass null here:
       Collection<Resource> resourcesAndJob = resourceProvider.resources(null);
diff --git a/test-model.yaml b/test-model.yaml
index 084ee11..df6c158 100644
--- a/test-model.yaml
+++ b/test-model.yaml
@@ -11,7 +11,7 @@ schemas:
   factory: com.linkedin.hoptimator.catalog.kafka.RawKafkaSchemaFactory
   operand:
     clientConfig:
-      bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
+      bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092
       group.id: hoptimator-test
       auto.offset.reset: earliest