Skip to content

Commit

Permalink
Enable adapters to specify read vs write resources (#33)
Browse files Browse the repository at this point in the history
* Add !mermaid command

* Add read/write Resources and Kafka ACL controller

* Add insert-into syntax to yaml and mermaid commands

* Tweak Acl CRD

* Fix ACL template

* License select files under Apache 2

* Ignore case for CLI commands, h/t @hshukla

* Javadocs on public Resources, h/t @hshukla
  • Loading branch information
ryannedolan authored Jul 21, 2023
1 parent c064a16 commit da4bdb0
Show file tree
Hide file tree
Showing 42 changed files with 1,474 additions and 140 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ deploy-dev-environment:
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f "https://strimzi.io/examples/latest/kafka/kafka-ephemeral-single.yaml" -n kafka
kubectl apply -f ./deploy/dev

deploy-samples: deploy
Expand Down
105 changes: 105 additions & 0 deletions deploy/acls.crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# CustomResourceDefinition for Acl: a declarative access-control rule managed
# by the hoptimator operator (e.g. rendered into Kafka ACLs by a controller).
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: acls.hoptimator.linkedin.com
spec:
  group: hoptimator.linkedin.com
  names:
    kind: Acl
    listKind: AclList
    plural: acls
    singular: acl
    shortNames:
    - acl
  preserveUnknownFields: false
  scope: Namespaced
  versions:
  - name: v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        description: Access control rule (colloquially, an Acl)
        type: object
        properties:
          apiVersion:
            type: string
          kind:
            type: string
          metadata:
            type: object
          spec:
            description: A set of related ACL rules.
            type: object
            properties:
              resource:
                description: The resource being controlled.
                type: object
                properties:
                  kind:
                    description: The kind of resource being controlled.
                    type: string
                  name:
                    description: The name of the resource being controlled.
                    type: string
              method:
                description: The resource access method.
                type: string
                # Mix of Kafka-style operations (Alter..Write), HTTP verbs
                # (Post..Options), and Rest.li batch methods (GetAll..BatchPartialDelete).
                enum:
                - Alter
                - Create
                - Delete
                - Describe
                - Read
                - Write
                - Post
                - Put
                - Get
                - Head
                - Patch
                - Trace
                - Options
                - GetAll
                - BatchGet
                - BatchCreate
                - BatchUpdate
                - PartialUpdate
                - BatchDelete
                - BatchPartialDelete
              principal:
                description: The principal being allowed access. Format depends on principal type.
                type: string
            required:
            - resource
            - method
            - principal
          status:
            description: Status, as set by the operator.
            type: object
            properties:
              ready:
                description: Whether the ACL rule has been applied.
                type: boolean
              message:
                description: Human-readable status message.
                type: string
    # Lets the operator patch .status independently of .spec.
    subresources:
      status: {}
    # Extra columns shown by `kubectl get acls`.
    additionalPrinterColumns:
    - name: PRINCIPAL
      type: string
      description: A user, service, group, etc.
      jsonPath: .spec.principal
    - name: METHOD
      type: string
      description: A resource access method, e.g. Get.
      jsonPath: .spec.method
    - name: KIND
      type: string
      description: The resource being accessed.
      jsonPath: .spec.resource.kind
    - name: RESOURCE
      type: string
      description: The resource being accessed.
      jsonPath: .spec.resource.name

52 changes: 52 additions & 0 deletions deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) 2023, LinkedIn
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based on examples at:
# https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka


# Single-broker Strimzi Kafka cluster "one" for the dev environment.
# Ephemeral storage: all data is lost when pods restart — dev/test only.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: one
  namespace: kafka
spec:
  kafka:
    version: 3.4.0
    replicas: 1
    listeners:
    # Plaintext listener for in-cluster clients.
    - name: plain
      port: 9092
      type: internal
      tls: false
    # TLS listener on the conventional secure port.
    - name: tls
      port: 9093
      type: internal
      tls: true
    config:
      # Replication factors of 1 because there is only a single broker.
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
      # Dev convenience: don't lock out clients before Acl rules exist.
      allow.everyone.if.no.acl.found: true
    storage:
      type: ephemeral
    # Simple (AclAuthorizer) authorization so Acl resources can be exercised.
    authorization:
      type: simple
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
18 changes: 18 additions & 0 deletions deploy/dev/mysql.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# Copyright (c) 2023, LinkedIn
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based on examples at:
# https://debezium.io/documentation/reference/stable/operations/kubernetes.html


apiVersion: v1
kind: Service
metadata:
Expand Down
6 changes: 3 additions & 3 deletions deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ metadata:
name: hoptimator-operator
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics", "subscriptions"]
verbs: ["get", "watch", "list", "create"]
resources: ["acls", "kafkatopics", "subscriptions"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics/status", "subscriptions/status"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status"]
verbs: ["get", "patch"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
Expand Down
24 changes: 24 additions & 0 deletions deploy/samples/acls.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Sample Acl resources: grant the anonymous user Read and Write access
# to the "test-sink" KafkaTopic (matches etc/integration-tests.sql).
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Acl
metadata:
  name: sample-kafka-acl-1
spec:
  resource:
    kind: KafkaTopic
    name: test-sink
  method: Read
  principal: User:ANONYMOUS

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Acl
metadata:
  name: sample-kafka-acl-2
spec:
  resource:
    kind: KafkaTopic
    name: test-sink
  method: Write
  principal: User:ANONYMOUS

2 changes: 1 addition & 1 deletion deploy/samples/kafkatopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ metadata:
spec:
topicName: test-sink
clientOverrides:
bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092

5 changes: 4 additions & 1 deletion etc/integration-tests.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

!set maxWidth 80
!table
!schemas
!table

-- built-in bounded tables
SELECT * FROM DATAGEN.PERSON;
Expand All @@ -20,3 +20,6 @@ SELECT * FROM RAWKAFKA."products" LIMIT 1;
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

-- test mermaid and yaml commands
!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@ public Database(String name, TableLister tableLister, TableResolver tableResolve
}

/** Convenience constructor for simple connector-based tables */
public Database(String name, TableLister tableLister, TableResolver tableResolver, ConfigProvider configProvider) {
this(name, tableLister, tableResolver, TableFactory.connector(configProvider));
public Database(String name, TableLister lister, TableResolver resolver,
ConfigProvider configs) {
this(name, lister, resolver, TableFactory.connector(configs));
}

/** Convenience constructor for simple connector-based tables */
public Database(String name, TableLister tableLister, TableResolver tableResolver, ConfigProvider configProvider, ResourceProvider resourceProvider) {
this(name, tableLister, tableResolver, TableFactory.connector(configProvider, resourceProvider));
/** Convenience constructor for simple connector-based tables with resources */
public Database(String name, TableLister lister, TableResolver resolver,
ConfigProvider configs, ResourceProvider resources) {
this(name, lister, resolver, TableFactory.connector(configs, resources));
}

/** Convenience constructor for a list of connector-based tables */
public Database(String name, Collection<String> tables, TableResolver tableResolver, ConfigProvider configProvider) {
this(name, tables, tableResolver, TableFactory.connector(configProvider));
public Database(String name, Collection<String> tables, TableResolver resolver,
ConfigProvider configs) {
this(name, tables, resolver, TableFactory.connector(configs));
}

/** Convenience constructor for a static list of tables */
public Database(String name, Collection<String> tables, TableResolver tableResolver, TableFactory tableFactory) {
this(name, () -> tables, tableResolver, tableFactory);
public Database(String name, Collection<String> tables, TableResolver resolver, TableFactory tableFactory) {
this(name, () -> tables, resolver, tableFactory);
}

/** Convenience constructor for a static table map */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,36 @@ public class HopTable extends AbstractTable implements ScriptImplementor, Transl
private final String database;
private final String name;
private final RelDataType rowType;
private final Collection<Resource> resources;
private final Collection<Resource> readResources;
private final Collection<Resource> writeResources;
private final ScriptImplementor implementor;

public HopTable(String database, String name, RelDataType rowType, Collection<Resource> resources,
public HopTable(String database, String name, RelDataType rowType,
Collection<Resource> readResources, Collection<Resource> writeResources,
ScriptImplementor implementor) {
this.database = database;
this.name = name;
this.rowType = rowType;
this.resources = resources;
this.readResources = readResources;
this.writeResources = writeResources;
this.implementor = implementor;
}

/** Convenience constructor for HopTables that only need a connector config. */
public HopTable(String database, String name, RelDataType rowType, Collection<Resource> resources,
public HopTable(String database, String name, RelDataType rowType,
Collection<Resource> readResources, Collection<Resource> writeResources,
Map<String, String> connectorConfig) {
this(database, name, rowType, resources,
this(database, name, rowType, readResources, writeResources,
new ScriptImplementor.ConnectorImplementor(database, name, rowType, connectorConfig));
}

/** Convenience constructor for HopTables that only need a connector config. */
public HopTable(String database, String name, RelDataType rowType, Collection<Resource> resources,
Map<String, String> connectorConfig) {
this(database, name, rowType, resources, resources, connectorConfig);
}


/** Convenience constructor for HopTables that only need a connector config. */
public HopTable(String database, String name, RelDataType rowType,
Map<String, String> connectorConfig) {
Expand All @@ -64,8 +75,14 @@ public RelDataType rowType() {
return rowType;
}

public Collection<Resource> resources() {
return resources;
/** Resources needed when reading from the table */
public Collection<Resource> readResources() {
return readResources;
}

/** Resources needed when writing to the table */
public Collection<Resource> writeResources() {
return writeResources;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ public abstract class Resource {
private final SortedMap<String, Supplier<String>> properties = new TreeMap<>();
private final List<Resource> inputs = new ArrayList<>();

/** A Resource which should be rendered with the given template */
public Resource(String template) {
this.template = template;
export("id", () -> id());
}

/** Copy constructor */
Expand All @@ -48,10 +50,16 @@ public Resource(Resource other) {
this.inputs.addAll(other.inputs);
}

/** The name of the template to render this Resource with */
public String template() {
return template;
}

/** A reasonably unique ID, based on a hash of the exported properties. */
public String id() {
return Integer.toHexString(hashCode());
}

/** Export a computed value to the template */
protected void export(String key, Supplier<String> supplier) {
properties.put(key, supplier);
Expand Down Expand Up @@ -110,13 +118,18 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[ template: " + template() + " ");
for (Map.Entry<String, Supplier<String>> entry : properties.entrySet()) {
if (entry.getKey().equals("id")) {
// special case for "id" to avoid recursion
continue;
}
String value = entry.getValue().get();
if (value != null && !value.isEmpty()) {
sb.append(entry.getKey());
sb.append(":");
sb.append(entry.getValue().get());
sb.append(" ");
if (value == null || value.isEmpty()) {
continue;
}
sb.append(entry.getKey());
sb.append(":");
sb.append(entry.getValue().get());
sb.append(" ");
}
sb.append("]");
return sb.toString();
Expand Down
Loading

0 comments on commit da4bdb0

Please sign in to comment.