Setup CI for adapter with service dependencies
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
timkpaine committed May 3, 2024
1 parent a32cef3 commit 4eca056
Showing 7 changed files with 131 additions and 106 deletions.
74 changes: 73 additions & 1 deletion .github/workflows/build.yml
@@ -556,7 +556,6 @@ jobs:
env:
CSP_TEST_SKIP_EXAMPLES: "1"

-
#################################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
#~~~~~~~~~|##########|~~~~~~~~~~#
@@ -625,6 +624,79 @@ jobs:
run: make test
if: ${{ contains( 'numpy', matrix.package )}}

+ ###########################
+ #~~~~~~~~~~~~~~~~~~~~~~~~~#
+ #~~~~~~|#############|~~~~#
+ #~~~~~~|#|~~~~~~~/##/~~~~~#
+ #~~~~~~|#|~~~~~/##/~~~~~~~#
+ #~~~~~~~~~~~~/##/~~~~~~~~~#
+ #~~~~~~~~~~/##/~~~~~~~~~~~#
+ #~~~~~~~~/##/~~~~~~~~~~~~~#
+ #~~~~~~/##/~~~~~~~~~~~~~~~#
+ #~~~~~~~~~~~~~~~~~~~~~~~~~#
+ # Test Service Adapters #
+ #~~~~~~~~~~~~~~~~~~~~~~~~~#
+ test_adapters:
+ needs:
+ - initialize
+ - build
+
+ strategy:
+ matrix:
+ os:
+ - ubuntu-20.04
+ python-version:
+ - 3.9
+ adapter:
+ - kafka
+
+ runs-on: ${{ matrix.os }}
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ submodules: recursive
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: ./.github/actions/setup-python
+ with:
+ version: '${{ matrix.python-version }}'
+
+ - name: Install python dependencies
+ run: make requirements
+
+ - name: Install test dependencies
+ shell: bash
+ run: sudo apt-get install graphviz
+
+ # Download artifact
+ - name: Download wheel
+ uses: actions/download-artifact@v4
+ with:
+ name: csp-dist-${{ runner.os }}-${{ runner.arch }}-${{ matrix.python-version }}
+
+ - name: Install wheel
+ run: python -m pip install -U *manylinux2014*.whl --target .
+
+ - name: Spin up adapter service
+ run: make dockerup ADAPTER=${{ matrix.adapter }} DOCKERARGS="--wait --wait-timeout 30"
+
+ - name: Wait a few seconds after images have been spun up
+ run: sleep 30
+
+ # Run tests
+ - name: Setup test flags
+ shell: bash
+ run: echo "CSP_TEST_$( echo ${{ matrix.adapter }} | awk '{print toupper($0)}' )=1" >> $GITHUB_ENV
+
+ - name: Python Test Steps
+ run: make test TEST_ARGS="-k ${{ matrix.adapter }}"
+
+ - name: Spin down adapter service
+ run: make dockerdown ADAPTER=${{ matrix.adapter }}
+ if: ${{ always() }}
+
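The `Setup test flags` step above writes `CSP_TEST_KAFKA=1` (the matrix adapter name upper-cased) into `GITHUB_ENV`, and the test suite keys off that variable. A minimal sketch of the gating pattern, mirroring the `skipif` marker that appears in csp/tests/adapters/test_kafka.py further down (the test name and body here are hypothetical):

```python
import os

import pytest


# Skipped unless CI (or a developer) has exported CSP_TEST_KAFKA=1,
# i.e. unless the kafka compose services are actually running.
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_kafka_smoke():
    ...
```

`make test TEST_ARGS="-k kafka"` then narrows the run to tests whose names match the adapter, via pytest's `-k` keyword filter.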
#############################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~#
#~~~~~~|#############|~~~~~~#
9 changes: 5 additions & 4 deletions Makefile
@@ -98,21 +98,22 @@ tests: test

.PHONY: dockerup dockerps dockerdown initpodmanmac
ADAPTER := kafka
- DOCKER := podman
+ DOCKER_COMPOSE := docker compose # or podman-compose
+ DOCKERARGS :=

initpodmanmac:
podman machine stop
podman machine set --cpus 4 --memory 8096
podman machine start

dockerup: ## spin up docker compose services for adapter testing
- $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d
+ $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml up -d $(DOCKERARGS)

dockerps: ## list docker compose services for adapter testing
- $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml ps
+ $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml ps

dockerdown: ## spin down docker compose services for adapter testing
- $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml down
+ $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml down

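For reference, the CI invocation `make dockerup ADAPTER=kafka DOCKERARGS="--wait --wait-timeout 30"` now expands to `docker compose -f ci/kafka/docker-compose.yml up -d --wait --wait-timeout 30`, so the target blocks until the compose services are running (or healthy, where a healthcheck is defined) rather than returning immediately.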
###########
# VERSION #
104 changes: 25 additions & 79 deletions ci/kafka/docker-compose.yml
@@ -87,84 +87,6 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

- control-center:
- image: confluentinc/cp-enterprise-control-center:7.5.3
- hostname: control-center
- container_name: control-center
- depends_on:
- - broker
- - schema-registry
- - connect
- - ksqldb-server
- ports:
- - "9021:9021"
- environment:
- CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
- CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
- CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
- CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
- CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
- CONTROL_CENTER_REPLICATION_FACTOR: 1
- CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
- CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
- CONFLUENT_METRICS_TOPIC_REPLICATION: 1
- PORT: 9021
-
- ksqldb-server:
- image: confluentinc/cp-ksqldb-server:7.5.3
- hostname: ksqldb-server
- container_name: ksqldb-server
- depends_on:
- - broker
- - connect
- ports:
- - "8088:8088"
- environment:
- KSQL_CONFIG_DIR: "/etc/ksql"
- KSQL_BOOTSTRAP_SERVERS: "broker:29092"
- KSQL_HOST_NAME: ksqldb-server
- KSQL_LISTENERS: "http://0.0.0.0:8088"
- KSQL_CACHE_MAX_BYTES_BUFFERING: 0
- KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
- KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
- KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
- KSQL_KSQL_CONNECT_URL: "http://connect:8083"
- KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
- KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
- KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
-
- # ksqldb-cli:
- # image: confluentinc/cp-ksqldb-cli:7.5.3
- # container_name: ksqldb-cli
- # depends_on:
- # - broker
- # - connect
- # - ksqldb-server
- # entrypoint: /bin/sh
- # tty: true
-
- # ksql-datagen:
- # image: confluentinc/ksqldb-examples:7.5.3
- # hostname: ksql-datagen
- # container_name: ksql-datagen
- # depends_on:
- # - ksqldb-server
- # - broker
- # - schema-registry
- # - connect
- # command: "bash -c 'echo Waiting for Kafka to be ready... && \
- # cub kafka-ready -b broker:29092 1 40 && \
- # echo Waiting for Confluent Schema Registry to be ready... && \
- # cub sr-ready schema-registry 8081 40 && \
- # echo Waiting a few seconds for topic creation to finish... && \
- # sleep 11 && \
- # tail -f /dev/null'"
- # environment:
- # KSQL_CONFIG_DIR: "/etc/ksql"
- # STREAMS_BOOTSTRAP_SERVERS: broker:29092
- # STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
- # STREAMS_SCHEMA_REGISTRY_PORT: 8081
-
rest-proxy:
image: confluentinc/cp-kafka-rest:7.5.3
depends_on:
@@ -178,4 +100,28 @@ services:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
- KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
\ No newline at end of file
+ KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
+
+ # Uncomment for a helpful UI
+ # control-center:
+ # image: confluentinc/cp-enterprise-control-center:7.5.3
+ # hostname: control-center
+ # container_name: control-center
+ # depends_on:
+ # - broker
+ # - schema-registry
+ # - connect
+ # ports:
+ # - "9021:9021"
+ # environment:
+ # CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
+ # CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
+ # CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
+ # CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
+ # CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
+ # CONTROL_CENTER_REPLICATION_FACTOR: 1
+ # CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
+ # CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
+ # CONFLUENT_METRICS_TOPIC_REPLICATION: 1
+ # PORT: 9021
+
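Net effect of the changes to this file: the heavyweight Control Center and ksqlDB services are dropped from the default stack (Control Center survives above as a commented-out, opt-in block for local debugging), leaving broker, schema registry, connect, and REST proxy, presumably so that `docker compose up -d --wait` can come up inside the CI wait window.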
2 changes: 1 addition & 1 deletion csp/adapters/kafka.py
@@ -73,7 +73,7 @@ def __init__(

consumer_properties = {
"group.id": group_id,
- # To get end of parition notification for live / not live flag
+ # To get end of partition notification for live / not live flag
"enable.partition.eof": "true",
}

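Context for the setting itself: with librdkafka's `enable.partition.eof` enabled, the consumer reports an event whenever it reaches the current end of a partition; the adapter uses that notification to mark subscribed data as live rather than replayed, which is the `mapped_live` flag the metadata test below asserts on.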
2 changes: 1 addition & 1 deletion csp/tests/adapters/conftest.py
@@ -12,6 +12,6 @@ def kafkabroker():
def kafkaadapter(kafkabroker):
group_id = "group.id123"
_kafkaadapter = KafkaAdapterManager(
- broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
+ broker=kafkabroker, group_id=group_id
)
return _kafkaadapter
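Dropping `rd_kafka_conf_options={"allow.auto.create.topics": "true"}` means topics are no longer auto-created client-side; the tests below instead call `_precreate_topic` explicitly before each run. A rough sketch of how a test consumes this fixture (the test name and body are hypothetical; the real tests are in test_kafka.py):

```python
# Hypothetical illustration only: pytest injects the KafkaAdapterManager
# built by the kafkaadapter fixture above; real tests wire publish/subscribe
# graphs through it, as in csp/tests/adapters/test_kafka.py.
def test_uses_adapter(kafkaadapter):
    assert kafkaadapter is not None
```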
44 changes: 25 additions & 19 deletions csp/tests/adapters/test_kafka.py
@@ -79,11 +79,10 @@ def graph(count: int):
}

topic = f"test.metadata.{os.getpid()}"
- _precreate_topic(topic)
subKey = "foo"
pubKey = ["mapped_a", "mapped_b", "mapped_c"]

- c = csp.count(csp.timer(timedelta(seconds=0.1)))
+ c = csp.count(csp.timer(timedelta(seconds=1)))
t = csp.sample(c, csp.const("foo"))

pubStruct = MetaPubData.collectts(
@@ -104,22 +103,27 @@ def graph(count: int):
)

csp.add_graph_output("sub_data", sub_data)
- # csp.print('sub', sub_data)
+ csp.print('sub', sub_data)
# Wait for at least count ticks and until we get a live tick
- done_flag = csp.count(sub_data) >= count
- done_flag = csp.and_(done_flag, sub_data.mapped_live is True)
+ done_flag = csp.and_(csp.count(sub_data) >= count, sub_data.mapped_live == True) # noqa: E712
stop = csp.filter(done_flag, done_flag)
csp.stop_engine(stop)

- count = 5
- results = csp.run(graph, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
+ # warm up the topic
+ results = csp.run(graph, 1, starttime=datetime.utcnow(), endtime=timedelta(seconds=3), realtime=True)

+ # now send some live in
+ results = csp.run(graph, 5, starttime=datetime.utcnow(), endtime=timedelta(seconds=20), realtime=True)
assert len(results["sub_data"]) >= 5
print(results)
for result in results["sub_data"]:
assert result[1].mapped_partition >= 0
assert result[1].mapped_offset >= 0
assert result[1].mapped_live is not None
assert result[1].mapped_timestamp < datetime.utcnow()
+ # first record should be non live
+ assert results["sub_data"][0][1].mapped_live is False
+ # last record should be live
+ assert results["sub_data"][-1][1].mapped_live

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
Expand All @@ -145,8 +149,7 @@ def graph(symbols: list, count: int):
struct_field_map = {"b": "b2", "i": "i2", "d": "d2", "s": "s2", "dt": "dt2"}

done_flags = []
topic = f"mktdata.{os.getpid()}"
_precreate_topic(topic)
+
for symbol in symbols:
kafkaadapter.publish(msg_mapper, topic, symbol, b, field_map="b")
kafkaadapter.publish(msg_mapper, topic, symbol, i, field_map="i")
@@ -183,10 +186,12 @@ def graph(symbols: list, count: int):
stop = csp.filter(stop, stop)
csp.stop_engine(stop)

topic = f"mktdata.{os.getpid()}"
_precreate_topic(topic)
symbols = ["AAPL", "MSFT"]
count = 100
results = csp.run(
- graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
+ graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
)
for symbol in symbols:
pub = results[f"pall_{symbol}"]
Expand All @@ -212,7 +217,7 @@ def pub_graph():
csp.stop_engine(stop)
# csp.print('pub', struct)

- csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
+ csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)

# grab start/end times
def get_times_graph():
Expand All @@ -232,7 +237,7 @@ def get_times_graph():
# csp.print('sub', data)
# csp.print('status', kafkaadapter.status())

- all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)[
+ all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)[
"data"
]
min_time = all_data[0][1].dt
Expand All @@ -258,7 +263,7 @@ def get_data(start_offset, expected_count):
KafkaStartOffset.EARLIEST,
10,
starttime=datetime.utcnow(),
- endtime=timedelta(seconds=30),
+ endtime=timedelta(seconds=10),
realtime=True,
)["data"]
# print(res)
@@ -276,7 +281,7 @@ def get_data(start_offset, expected_count):
assert len(res) == 0

res = csp.run(
- get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=30), realtime=True
+ get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=10), realtime=True
)["data"]
assert len(res) == 10

@@ -287,12 +292,12 @@ def get_data(start_offset, expected_count):
stime = all_data[2][1].dt + timedelta(milliseconds=1)
expected = [x for x in all_data if x[1].dt >= stime]
res = csp.run(
- get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
+ get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
)["data"]
assert len(res) == len(expected)

res = csp.run(
- get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=30), realtime=True
+ get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=10), realtime=True
)["data"]
assert len(res) == len(expected)

Expand All @@ -314,8 +319,6 @@ def graph(symbols: list, count: int):
msg_mapper = RawBytesMessageMapper()

done_flags = []
topic = f"test_str.{os.getpid()}"
_precreate_topic(topic)
for symbol in symbols:
topic = f"test_str.{os.getpid()}"
kafkaadapter.publish(msg_mapper, topic, symbol, d)
@@ -356,10 +359,13 @@ def graph(symbols: list, count: int):
stop = csp.filter(stop, stop)
csp.stop_engine(stop)

topic = f"test_str.{os.getpid()}"
_precreate_topic(topic)

symbols = ["AAPL", "MSFT"]
count = 10
results = csp.run(
- graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
+ graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
)
# print(results)
for symbol in symbols:
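The recurring pattern in this file: `_precreate_topic(topic)` moves out of the graph-building helpers to the call site just before `csp.run`, so topics exist before any run starts (matching the removal of client-side topic auto-creation in conftest.py), and the real-time `endtime` windows shrink from 30 seconds to 10 (3 for the warm-up pass), keeping the suite well inside the CI job's window.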
(diff for the seventh changed file not loaded)
