diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 69697d244..c5f9d7878 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -556,7 +556,6 @@ jobs: env: CSP_TEST_SKIP_EXAMPLES: "1" - ################################# #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# #~~~~~~~~~|##########|~~~~~~~~~~# @@ -625,6 +624,79 @@ jobs: run: make test if: ${{ contains( 'numpy', matrix.package )}} + ########################### + #~~~~~~~~~~~~~~~~~~~~~~~~~# + #~~~~~~|#############|~~~~# + #~~~~~~|#|~~~~~~~/##/~~~~~# + #~~~~~~|#|~~~~~/##/~~~~~~~# + #~~~~~~~~~~~~/##/~~~~~~~~~# + #~~~~~~~~~~/##/~~~~~~~~~~~# + #~~~~~~~~/##/~~~~~~~~~~~~~# + #~~~~~~/##/~~~~~~~~~~~~~~~# + #~~~~~~~~~~~~~~~~~~~~~~~~~# + # Test Service Adapters # + #~~~~~~~~~~~~~~~~~~~~~~~~~# + test_adapters: + needs: + - initialize + - build + + strategy: + matrix: + os: + - ubuntu-20.04 + python-version: + - 3.9 + adapter: + - kafka + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Set up Python ${{ matrix.python-version }} + uses: ./.github/actions/setup-python + with: + version: '${{ matrix.python-version }}' + + - name: Install python dependencies + run: make requirements + + - name: Install test dependencies + shell: bash + run: sudo apt-get install graphviz + + # Download artifact + - name: Download wheel + uses: actions/download-artifact@v4 + with: + name: csp-dist-${{ runner.os }}-${{ runner.arch }}-${{ matrix.python-version }} + + - name: Install wheel + run: python -m pip install -U *manylinux2014*.whl --target . + + - name: Spin up adapter service + run: make dockerup ADAPTER=${{ matrix.adapter }} DOCKERARGS="--wait --wait-timeout 30" + + - name: Wait a few seconds after images have been spun up + run: sleep 30 + + # Run tests + - name: Setup test flags + shell: bash + run: echo "CSP_TEST_$( echo ${{ matrix.adapter }} | awk '{print toupper($0)}' )=1" >> $GITHUB_ENV + + - name: Python Test Steps + run: make test TEST_ARGS="-k ${{ matrix.adapter }}" + + - name: Spin down adapter service + run: make dockerdown ADAPTER=${{ matrix.adapter }} + if: ${{ always() }} + ############################# #~~~~~~~~~~~~~~~~~~~~~~~~~~~# #~~~~~~|#############|~~~~~~# diff --git a/Makefile b/Makefile index 2b0177ef3..e28922648 100644 --- a/Makefile +++ b/Makefile @@ -98,7 +98,8 @@ tests: test .PHONY: dockerup dockerps dockerdown initpodmanmac ADAPTER := kafka -DOCKER := podman +DOCKER_COMPOSE := docker compose # or podman-compose +DOCKERARGS := initpodmanmac: podman machine stop @@ -106,13 +107,13 @@ initpodmanmac: podman machine start dockerup: ## spin up docker compose services for adapter testing - $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d + $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml up -d $(DOCKERARGS) dockerps: ## spin up docker compose services for adapter testing - $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml ps + $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml ps dockerdown: ## spin up docker compose services for adapter testing - $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml down + $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml down ########### # VERSION # diff --git a/ci/kafka/docker-compose.yml b/ci/kafka/docker-compose.yml index d2674945f..e1e359800 100644 --- a/ci/kafka/docker-compose.yml +++ b/ci/kafka/docker-compose.yml @@ -87,84 +87,6 @@ services: CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR - control-center: - image: confluentinc/cp-enterprise-control-center:7.5.3 - hostname: control-center - container_name: control-center - depends_on: - - broker - - schema-registry - - connect - - ksqldb-server - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' - CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' - CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" - CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 - CONFLUENT_METRICS_TOPIC_REPLICATION: 1 - PORT: 9021 - - ksqldb-server: - image: confluentinc/cp-ksqldb-server:7.5.3 - hostname: ksqldb-server - container_name: ksqldb-server - depends_on: - - broker - - connect - ports: - - "8088:8088" - environment: - KSQL_CONFIG_DIR: "/etc/ksql" - KSQL_BOOTSTRAP_SERVERS: "broker:29092" - KSQL_HOST_NAME: ksqldb-server - KSQL_LISTENERS: "http://0.0.0.0:8088" - KSQL_CACHE_MAX_BYTES_BUFFERING: 0 - KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" - KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" - KSQL_KSQL_CONNECT_URL: "http://connect:8083" - KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 - KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' - KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' - - # ksqldb-cli: - # image: confluentinc/cp-ksqldb-cli:7.5.3 - # container_name: ksqldb-cli - # depends_on: - # - broker - # - connect - # - ksqldb-server - # entrypoint: /bin/sh - # tty: true - - # ksql-datagen: - # image: confluentinc/ksqldb-examples:7.5.3 - # hostname: ksql-datagen - # container_name: ksql-datagen - # depends_on: - # - ksqldb-server - # - broker - # - schema-registry - # - connect - # command: "bash -c 'echo Waiting for Kafka to be ready... && \ - # cub kafka-ready -b broker:29092 1 40 && \ - # echo Waiting for Confluent Schema Registry to be ready... && \ - # cub sr-ready schema-registry 8081 40 && \ - # echo Waiting a few seconds for topic creation to finish... && \ - # sleep 11 && \ - # tail -f /dev/null'" - # environment: - # KSQL_CONFIG_DIR: "/etc/ksql" - # STREAMS_BOOTSTRAP_SERVERS: broker:29092 - # STREAMS_SCHEMA_REGISTRY_HOST: schema-registry - # STREAMS_SCHEMA_REGISTRY_PORT: 8081 - rest-proxy: image: confluentinc/cp-kafka-rest:7.5.3 depends_on: @@ -178,4 +100,28 @@ services: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" - KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' \ No newline at end of file + KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' + + # Uncomment for a helpful UI + # control-center: + # image: confluentinc/cp-enterprise-control-center:7.5.3 + # hostname: control-center + # container_name: control-center + # depends_on: + # - broker + # - schema-registry + # - connect + # ports: + # - "9021:9021" + # environment: + # CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' + # CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' + # CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" + # CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" + # CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + # CONTROL_CENTER_REPLICATION_FACTOR: 1 + # CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 + # CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 + # CONFLUENT_METRICS_TOPIC_REPLICATION: 1 + # PORT: 9021 + diff --git a/csp/adapters/kafka.py b/csp/adapters/kafka.py index 6871004e3..324a355fc 100644 --- a/csp/adapters/kafka.py +++ b/csp/adapters/kafka.py @@ -73,7 +73,7 @@ def __init__( consumer_properties = { "group.id": group_id, - # To get end of parition notification for live / not live flag + # To get end of partition notification for live / not live flag "enable.partition.eof": "true", } diff --git a/csp/tests/adapters/conftest.py b/csp/tests/adapters/conftest.py index 774295861..6a2ea1160 100644 --- a/csp/tests/adapters/conftest.py +++ b/csp/tests/adapters/conftest.py @@ -12,6 +12,6 @@ def kafkabroker(): def kafkaadapter(kafkabroker): group_id = "group.id123" _kafkaadapter = KafkaAdapterManager( - broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"} + broker=kafkabroker, group_id=group_id ) return _kafkaadapter diff --git a/csp/tests/adapters/test_kafka.py b/csp/tests/adapters/test_kafka.py index 36d67614c..f828f1df2 100644 --- a/csp/tests/adapters/test_kafka.py +++ b/csp/tests/adapters/test_kafka.py @@ -79,11 +79,10 @@ def graph(count: int): } topic = f"test.metadata.{os.getpid()}" - _precreate_topic(topic) subKey = "foo" pubKey = ["mapped_a", "mapped_b", "mapped_c"] - c = csp.count(csp.timer(timedelta(seconds=0.1))) + c = csp.count(csp.timer(timedelta(seconds=1))) t = csp.sample(c, csp.const("foo")) pubStruct = MetaPubData.collectts( @@ -104,15 +103,17 @@ def graph(count: int): ) csp.add_graph_output("sub_data", sub_data) - # csp.print('sub', sub_data) + csp.print('sub', sub_data) # Wait for at least count ticks and until we get a live tick - done_flag = csp.count(sub_data) >= count - done_flag = csp.and_(done_flag, sub_data.mapped_live is True) + done_flag = csp.and_(csp.count(sub_data) >= count, sub_data.mapped_live == True) # noqa: E712 stop = csp.filter(done_flag, done_flag) csp.stop_engine(stop) - count = 5 - results = csp.run(graph, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True) + # warm up the topic + results = csp.run(graph, 1, starttime=datetime.utcnow(), endtime=timedelta(seconds=3), realtime=True) + + # now send some live in + results = csp.run(graph, 5, starttime=datetime.utcnow(), endtime=timedelta(seconds=20), realtime=True) assert len(results["sub_data"]) >= 5 print(results) for result in results["sub_data"]: @@ -120,6 +121,9 @@ def graph(count: int): assert result[1].mapped_offset >= 0 assert result[1].mapped_live is not None assert result[1].mapped_timestamp < datetime.utcnow() + # first record should be non live + assert results["sub_data"][0][1].mapped_live is False + # last record should be live assert results["sub_data"][-1][1].mapped_live @pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests") @@ -145,8 +149,7 @@ def graph(symbols: list, count: int): struct_field_map = {"b": "b2", "i": "i2", "d": "d2", "s": "s2", "dt": "dt2"} done_flags = [] - topic = f"mktdata.{os.getpid()}" - _precreate_topic(topic) + for symbol in symbols: kafkaadapter.publish(msg_mapper, topic, symbol, b, field_map="b") kafkaadapter.publish(msg_mapper, topic, symbol, i, field_map="i") @@ -183,10 +186,12 @@ def graph(symbols: list, count: int): stop = csp.filter(stop, stop) csp.stop_engine(stop) + topic = f"mktdata.{os.getpid()}" + _precreate_topic(topic) symbols = ["AAPL", "MSFT"] count = 100 results = csp.run( - graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True + graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True ) for symbol in symbols: pub = results[f"pall_{symbol}"] @@ -212,7 +217,7 @@ def pub_graph(): csp.stop_engine(stop) # csp.print('pub', struct) - csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True) + csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True) # grab start/end times def get_times_graph(): @@ -232,7 +237,7 @@ def get_times_graph(): # csp.print('sub', data) # csp.print('status', kafkaadapter.status()) - all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)[ + all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)[ "data" ] min_time = all_data[0][1].dt @@ -258,7 +263,7 @@ def get_data(start_offset, expected_count): KafkaStartOffset.EARLIEST, 10, starttime=datetime.utcnow(), - endtime=timedelta(seconds=30), + endtime=timedelta(seconds=10), realtime=True, )["data"] # print(res) @@ -276,7 +281,7 @@ def get_data(start_offset, expected_count): assert len(res) == 0 res = csp.run( - get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=30), realtime=True + get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=10), realtime=True )["data"] assert len(res) == 10 @@ -287,12 +292,12 @@ def get_data(start_offset, expected_count): stime = all_data[2][1].dt + timedelta(milliseconds=1) expected = [x for x in all_data if x[1].dt >= stime] res = csp.run( - get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True + get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True )["data"] assert len(res) == len(expected) res = csp.run( - get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=30), realtime=True + get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=10), realtime=True )["data"] assert len(res) == len(expected) @@ -314,8 +319,6 @@ def graph(symbols: list, count: int): msg_mapper = RawBytesMessageMapper() done_flags = [] - topic = f"test_str.{os.getpid()}" - _precreate_topic(topic) for symbol in symbols: topic = f"test_str.{os.getpid()}" kafkaadapter.publish(msg_mapper, topic, symbol, d) @@ -356,10 +359,13 @@ def graph(symbols: list, count: int): stop = csp.filter(stop, stop) csp.stop_engine(stop) + topic = f"test_str.{os.getpid()}" + _precreate_topic(topic) + symbols = ["AAPL", "MSFT"] count = 10 results = csp.run( - graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True + graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True ) # print(results) for symbol in symbols: diff --git a/csp/tests/adapters/test_status.py b/csp/tests/adapters/test_status.py index 66cd41dd6..4b34be9cd 100644 --- a/csp/tests/adapters/test_status.py +++ b/csp/tests/adapters/test_status.py @@ -14,7 +14,7 @@ class SubData(csp.Struct): a: bool -class TestStatus: +class TestStatusKafka: @pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests") def test_basic(self, kafkaadapter): topic = f"csp.unittest.{os.getpid()}"