From 21229ed4e5b4b04a663b30e4fba838db7a92a19d Mon Sep 17 00:00:00 2001 From: Tim Paine <3105306+timkpaine@users.noreply.github.com> Date: Sat, 3 Feb 2024 15:58:28 -0500 Subject: [PATCH] Setup CI for adapter with service dependencies Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com> --- .github/workflows/build.yml | 66 +++++++++- Makefile | 13 +- ci/kafka/docker-compose.yml | 193 ++++-------------------------- conda/dev-environment-unix.yml | 1 - conda/dev-environment-win.yml | 1 - csp/adapters/kafka.py | 14 +-- csp/tests/adapters/conftest.py | 5 +- csp/tests/adapters/kafka_utils.py | 23 ++-- csp/tests/adapters/test_kafka.py | 57 +++++---- csp/tests/adapters/test_status.py | 9 +- pyproject.toml | 2 - 11 files changed, 148 insertions(+), 236 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index eac564769..33f3ea913 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -636,7 +636,6 @@ jobs: env: CSP_TEST_SKIP_EXAMPLES: "1" - #################################################### #..................................................# #..|########|..|########|..../####\....|########|..# @@ -736,7 +735,68 @@ jobs: ########################################################################################################### # Test Service Adapters # ########################################################################################################### - # Coming soon! + test_adapters: + needs: + - initialize + - build + + strategy: + matrix: + os: + - ubuntu-24.04 + python-version: + - 3.11 + adapter: + - kafka + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Set up Python ${{ matrix.python-version }} + uses: ./.github/actions/setup-python + with: + version: '${{ matrix.python-version }}' + + - name: Install python dependencies + run: make requirements + + - name: Install test dependencies + shell: bash + run: sudo apt-get install graphviz + + # Download artifact + - name: Download wheel + uses: actions/download-artifact@v4 + with: + name: csp-dist-${{ runner.os }}-${{ runner.arch }}-${{ matrix.python-version }} + + - name: Install wheel + run: | + python -m pip install -U *manylinux*.whl + python -m pip install -U --no-deps *manylinux*.whl --target . + + - name: Spin up adapter service + run: make dockerup ADAPTER=${{ matrix.adapter }} DOCKERARGS="--wait --wait-timeout 30" + + - name: Wait a few seconds after docker images have been spun up + run: sleep 30 + + # Run tests + - name: Setup test flags + shell: bash + run: echo "CSP_TEST_$( echo ${{ matrix.adapter }} | awk '{print toupper($0)}' )=1" >> $GITHUB_ENV + + - name: Python Test Steps + run: make test-py TEST_ARGS="-k ${{ matrix.adapter }}" + + - name: Spin down adapter service + run: make dockerdown ADAPTER=${{ matrix.adapter }} + if: ${{ always() }} ############################################################################################ #..........................................................................................# @@ -751,7 +811,6 @@ jobs: ############################################################################################ # Upload Release Artifacts # ############################################################################################ - # only publish artifacts on tags, but otherwise this always runs # Note this whole workflow only triggers on release tags (e.g. 
"v0.1.0") publish_release_artifacts: @@ -763,6 +822,7 @@ jobs: - test - test_sdist - test_dependencies + - test_adapters if: startsWith(github.ref, 'refs/tags/v') runs-on: ubuntu-24.04 diff --git a/Makefile b/Makefile index 6c8bf66f0..c572f7644 100644 --- a/Makefile +++ b/Makefile @@ -105,7 +105,8 @@ tests: test .PHONY: dockerup dockerps dockerdown initpodmanmac ADAPTER := kafka -DOCKER := podman +DOCKER_COMPOSE := docker compose # or podman-compose +DOCKERARGS := initpodmanmac: podman machine stop @@ -113,13 +114,13 @@ initpodmanmac: podman machine start dockerup: ## spin up docker compose services for adapter testing - $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d + $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml up -d $(DOCKERARGS) -dockerps: ## spin up docker compose services for adapter testing - $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml ps +dockerps: ## get status of current docker compose services + $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml ps -dockerdown: ## spin up docker compose services for adapter testing - $(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml down +dockerdown: ## spin down docker compose services for adapter testing + $(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml down ########### # VERSION # diff --git a/ci/kafka/docker-compose.yml b/ci/kafka/docker-compose.yml index d2674945f..7a0b99da9 100644 --- a/ci/kafka/docker-compose.yml +++ b/ci/kafka/docker-compose.yml @@ -1,181 +1,40 @@ -# https://docs.confluent.io/platform/current/platform-quickstart.html -# https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.5.3-post/cp-all-in-one-kraft/docker-compose.yml +# https://github.com/conduktor/kafka-stack-docker-compose --- -version: '2' +version: '2.1' + services: - zookeeper: - image: confluentinc/cp-zookeeper:7.5.3 - hostname: zookeeper - container_name: zookeeper + zoo1: + image: confluentinc/cp-zookeeper:7.3.2 + hostname: zoo1 + container_name: zoo1 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zoo1:2888:3888 - broker: - image: confluentinc/cp-server:7.5.3 - hostname: broker - container_name: broker - depends_on: - - zookeeper + kafka1: + image: confluentinc/cp-kafka:7.3.2 + hostname: kafka1 + container_name: kafka1 ports: - "9092:9092" - - "9101:9101" + - "29092:29092" + - "9999:9999" environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9101 - KAFKA_JMX_HOSTNAME: localhost - KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: 
http://schema-registry:8081 - CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 - CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 - CONFLUENT_METRICS_ENABLE: 'true' - CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - TOPIC_AUTO_CREATE: 'true' - - schema-registry: - image: confluentinc/cp-schema-registry:7.5.3 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - - connect: - image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0 - hostname: connect - container_name: connect - depends_on: - - broker - - schema-registry - ports: - - "8083:8083" - environment: - CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' - CONNECT_REST_ADVERTISED_HOST_NAME: connect - CONNECT_GROUP_ID: compose-connect-group - CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 - CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter - CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 - # CLASSPATH required due to CC-2422 - CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.3.jar - CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" - CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" - CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" - CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR - - control-center: - image: confluentinc/cp-enterprise-control-center:7.5.3 - hostname: control-center - container_name: control-center - depends_on: - - broker - - schema-registry - - connect - - ksqldb-server - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' - CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' - CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" - CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 - CONFLUENT_METRICS_TOPIC_REPLICATION: 1 - PORT: 9021 - - ksqldb-server: - image: confluentinc/cp-ksqldb-server:7.5.3 - hostname: ksqldb-server - container_name: ksqldb-server - depends_on: - - broker - - connect - ports: - - "8088:8088" - environment: - KSQL_CONFIG_DIR: "/etc/ksql" - KSQL_BOOTSTRAP_SERVERS: "broker:29092" - KSQL_HOST_NAME: ksqldb-server - KSQL_LISTENERS: "http://0.0.0.0:8088" - KSQL_CACHE_MAX_BYTES_BUFFERING: 0 - KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" - KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" - KSQL_KSQL_CONNECT_URL: "http://connect:8083" - 
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 - KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' - KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' - - # ksqldb-cli: - # image: confluentinc/cp-ksqldb-cli:7.5.3 - # container_name: ksqldb-cli - # depends_on: - # - broker - # - connect - # - ksqldb-server - # entrypoint: /bin/sh - # tty: true - - # ksql-datagen: - # image: confluentinc/ksqldb-examples:7.5.3 - # hostname: ksql-datagen - # container_name: ksql-datagen - # depends_on: - # - ksqldb-server - # - broker - # - schema-registry - # - connect - # command: "bash -c 'echo Waiting for Kafka to be ready... && \ - # cub kafka-ready -b broker:29092 1 40 && \ - # echo Waiting for Confluent Schema Registry to be ready... && \ - # cub sr-ready schema-registry 8081 40 && \ - # echo Waiting a few seconds for topic creation to finish... && \ - # sleep 11 && \ - # tail -f /dev/null'" - # environment: - # KSQL_CONFIG_DIR: "/etc/ksql" - # STREAMS_BOOTSTRAP_SERVERS: broker:29092 - # STREAMS_SCHEMA_REGISTRY_HOST: schema-registry - # STREAMS_SCHEMA_REGISTRY_PORT: 8081 - - rest-proxy: - image: confluentinc/cp-kafka-rest:7.5.3 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_JMX_PORT: 9999 + KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1} + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - - broker - - schema-registry - ports: - - 8082:8082 - hostname: rest-proxy - container_name: rest-proxy - environment: - KAFKA_REST_HOST_NAME: rest-proxy - KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' - KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" - KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' \ No newline at end of file + - zoo1 diff --git a/conda/dev-environment-unix.yml b/conda/dev-environment-unix.yml index bacc1dfed..83a16cdc8 100644 --- a/conda/dev-environment-unix.yml +++ b/conda/dev-environment-unix.yml @@ -15,7 +15,6 @@ dependencies: - flex - graphviz - gtest - - httpx>=0.20,<1 - isort>=5,<6 - libarrow=16 - libboost>=1.80.0 diff --git a/conda/dev-environment-win.yml b/conda/dev-environment-win.yml index a27d172ce..add285e57 100644 --- a/conda/dev-environment-win.yml +++ b/conda/dev-environment-win.yml @@ -13,7 +13,6 @@ dependencies: - exprtk - graphviz - gtest - - httpx>=0.20,<1 - isort>=5,<6 - libarrow=16 - libboost>=1.80.0 diff --git a/csp/adapters/kafka.py b/csp/adapters/kafka.py index d2cf97f31..d3c34a241 100644 --- a/csp/adapters/kafka.py +++ b/csp/adapters/kafka.py @@ -6,18 +6,12 @@ import csp from csp import ts from csp.adapters.status import Status -from csp.adapters.utils import ( - BytesMessageProtoMapper, - DateTimeType, - JSONTextMessageMapper, - MsgMapper, - RawBytesMessageMapper, - RawTextMessageMapper, -) +from csp.adapters.utils import MsgMapper from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def from csp.lib import _kafkaadapterimpl -_ = BytesMessageProtoMapper, DateTimeType, JSONTextMessageMapper, RawBytesMessageMapper, RawTextMessageMapper +__all__ = ("KafkaStatusMessageType", "KafkaStartOffset", "KafkaAdapterManager") + T = TypeVar("T") @@ -73,7 +67,7 @@ def __init__( consumer_properties = { "group.id": group_id, - # To get end of parition notification for live / not live flag + # To get end of partition notification for live / not live flag "enable.partition.eof": "true", } diff --git a/csp/tests/adapters/conftest.py b/csp/tests/adapters/conftest.py index 774295861..4fe5a7eaa 100644 --- a/csp/tests/adapters/conftest.py +++ 
b/csp/tests/adapters/conftest.py @@ -5,13 +5,12 @@ @pytest.fixture(scope="module", autouse=True) def kafkabroker(): + # Defined in ci/kafka/docker-compose.yml return "localhost:9092" @pytest.fixture(scope="module", autouse=True) def kafkaadapter(kafkabroker): group_id = "group.id123" - _kafkaadapter = KafkaAdapterManager( - broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"} - ) + _kafkaadapter = KafkaAdapterManager(broker=kafkabroker, group_id=group_id) return _kafkaadapter diff --git a/csp/tests/adapters/kafka_utils.py b/csp/tests/adapters/kafka_utils.py index 98544cd34..e4b1955b8 100644 --- a/csp/tests/adapters/kafka_utils.py +++ b/csp/tests/adapters/kafka_utils.py @@ -1,13 +1,16 @@ -import httpx +from datetime import datetime, timedelta +import csp +from csp.adapters.utils import DateTimeType, JSONTextMessageMapper -def _precreate_topic(topic): +__all__ = ("_precreate_topic",) + + +def _precreate_topic(adapter, topic): """Since we test against confluent kafka, just use the kafka rest addon""" - rest_broker = "http://localhost:8082" - cluster_info = httpx.get(f"{rest_broker}/v3/clusters") - cluster_id = cluster_info.json()["data"][0]["cluster_id"] - resp = httpx.post(f"{rest_broker}/v3/clusters/{cluster_id}/topics", json={"topic_name": topic}) - if resp.status_code != 201 and "already exists" not in resp.content.decode("utf8"): - raise Exception( - f"Could not create topic {topic} on cluster {rest_broker}/v3/clusters/{cluster_id}/topics - received {resp.content}" - ) + + def g(): + msg_mapper = JSONTextMessageMapper(datetime_type=DateTimeType.UINT64_MICROS) + adapter.publish(msg_mapper, topic, "foo", csp.const("test"), field_map="a") + + csp.run(g, starttime=datetime.utcnow(), endtime=timedelta(), realtime=True) diff --git a/csp/tests/adapters/test_kafka.py b/csp/tests/adapters/test_kafka.py index 36d67614c..334d6cb43 100644 --- a/csp/tests/adapters/test_kafka.py +++ b/csp/tests/adapters/test_kafka.py @@ -4,14 +4,8 @@ import csp from csp import ts -from csp.adapters.kafka import ( - DateTimeType, - JSONTextMessageMapper, - KafkaAdapterManager, - KafkaStartOffset, - RawBytesMessageMapper, - RawTextMessageMapper, -) +from csp.adapters.kafka import KafkaAdapterManager, KafkaStartOffset +from csp.adapters.utils import DateTimeType, JSONTextMessageMapper, RawBytesMessageMapper, RawTextMessageMapper from .kafka_utils import _precreate_topic @@ -64,6 +58,8 @@ class MetaSubData(csp.Struct): class TestKafka: @pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests") def test_metadata(self, kafkaadapter): + topic = f"test.metadata.{os.getpid()}" + def graph(count: int): msg_mapper = JSONTextMessageMapper(datetime_type=DateTimeType.UINT64_MICROS) @@ -78,12 +74,10 @@ def graph(count: int): "timestamp": "mapped_timestamp", } - topic = f"test.metadata.{os.getpid()}" - _precreate_topic(topic) subKey = "foo" pubKey = ["mapped_a", "mapped_b", "mapped_c"] - c = csp.count(csp.timer(timedelta(seconds=0.1))) + c = csp.count(csp.timer(timedelta(seconds=1))) t = csp.sample(c, csp.const("foo")) pubStruct = MetaPubData.collectts( @@ -104,15 +98,15 @@ def graph(count: int): ) csp.add_graph_output("sub_data", sub_data) - # csp.print('sub', sub_data) + # Wait for at least count ticks and until we get a live tick - done_flag = csp.count(sub_data) >= count - done_flag = csp.and_(done_flag, sub_data.mapped_live is True) + done_flag = csp.and_(csp.count(sub_data) >= count, sub_data.mapped_live == True) # noqa: E712 stop = 
csp.filter(done_flag, done_flag) csp.stop_engine(stop) - count = 5 - results = csp.run(graph, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True) + # warm up the topic + _precreate_topic(kafkaadapter, topic) + results = csp.run(graph, 5, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True) assert len(results["sub_data"]) >= 5 print(results) for result in results["sub_data"]: @@ -120,6 +114,9 @@ def graph(count: int): assert result[1].mapped_offset >= 0 assert result[1].mapped_live is not None assert result[1].mapped_timestamp < datetime.utcnow() + # first record should be non live + assert results["sub_data"][0][1].mapped_live is False + # last record should be live assert results["sub_data"][-1][1].mapped_live @pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests") @@ -145,8 +142,7 @@ def graph(symbols: list, count: int): struct_field_map = {"b": "b2", "i": "i2", "d": "d2", "s": "s2", "dt": "dt2"} done_flags = [] - topic = f"mktdata.{os.getpid()}" - _precreate_topic(topic) + for symbol in symbols: kafkaadapter.publish(msg_mapper, topic, symbol, b, field_map="b") kafkaadapter.publish(msg_mapper, topic, symbol, i, field_map="i") @@ -183,10 +179,12 @@ def graph(symbols: list, count: int): stop = csp.filter(stop, stop) csp.stop_engine(stop) + topic = f"mktdata.{os.getpid()}" + _precreate_topic(kafkaadapter, topic) symbols = ["AAPL", "MSFT"] count = 100 results = csp.run( - graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True + graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True ) for symbol in symbols: pub = results[f"pall_{symbol}"] @@ -198,7 +196,7 @@ def graph(symbols: list, count: int): @pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests") def test_start_offsets(self, kafkaadapter, kafkabroker): topic = f"test_start_offsets.{os.getpid()}" - _precreate_topic(topic) + _precreate_topic(kafkaadapter, topic) msg_mapper = JSONTextMessageMapper(datetime_type=DateTimeType.UINT64_MICROS) count = 10 @@ -212,7 +210,7 @@ def pub_graph(): csp.stop_engine(stop) # csp.print('pub', struct) - csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True) + csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True) # grab start/end times def get_times_graph(): @@ -232,7 +230,7 @@ def get_times_graph(): # csp.print('sub', data) # csp.print('status', kafkaadapter.status()) - all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)[ + all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)[ "data" ] min_time = all_data[0][1].dt @@ -258,7 +256,7 @@ def get_data(start_offset, expected_count): KafkaStartOffset.EARLIEST, 10, starttime=datetime.utcnow(), - endtime=timedelta(seconds=30), + endtime=timedelta(seconds=10), realtime=True, )["data"] # print(res) @@ -276,7 +274,7 @@ def get_data(start_offset, expected_count): assert len(res) == 0 res = csp.run( - get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=30), realtime=True + get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=10), realtime=True )["data"] assert len(res) == 10 @@ -287,12 +285,12 @@ def get_data(start_offset, expected_count): stime = all_data[2][1].dt + timedelta(milliseconds=1) 
expected = [x for x in all_data if x[1].dt >= stime] res = csp.run( - get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True + get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True )["data"] assert len(res) == len(expected) res = csp.run( - get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=30), realtime=True + get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=10), realtime=True )["data"] assert len(res) == len(expected) @@ -314,8 +312,6 @@ def graph(symbols: list, count: int): msg_mapper = RawBytesMessageMapper() done_flags = [] - topic = f"test_str.{os.getpid()}" - _precreate_topic(topic) for symbol in symbols: topic = f"test_str.{os.getpid()}" kafkaadapter.publish(msg_mapper, topic, symbol, d) @@ -356,10 +352,13 @@ def graph(symbols: list, count: int): stop = csp.filter(stop, stop) csp.stop_engine(stop) + topic = f"test_str.{os.getpid()}" + _precreate_topic(kafkaadapter, topic) + symbols = ["AAPL", "MSFT"] count = 10 results = csp.run( - graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True + graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True ) # print(results) for symbol in symbols: diff --git a/csp/tests/adapters/test_status.py b/csp/tests/adapters/test_status.py index 66cd41dd6..f009834e8 100644 --- a/csp/tests/adapters/test_status.py +++ b/csp/tests/adapters/test_status.py @@ -4,8 +4,9 @@ import csp from csp import ts -from csp.adapters.kafka import DateTimeType, JSONTextMessageMapper, KafkaStatusMessageType +from csp.adapters.kafka import KafkaStatusMessageType from csp.adapters.status import Level +from csp.adapters.utils import DateTimeType, JSONTextMessageMapper from .kafka_utils import _precreate_topic @@ -14,7 +15,7 @@ class SubData(csp.Struct): a: bool -class TestStatus: +class TestStatusKafka: @pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests") def test_basic(self, kafkaadapter): topic = f"csp.unittest.{os.getpid()}" @@ -41,8 +42,8 @@ def graph(): done_flag = csp.count(status) == 1 csp.stop_engine(done_flag) - _precreate_topic(topic) - results = csp.run(graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True) + _precreate_topic(kafkaadapter, topic) + results = csp.run(graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=5), realtime=True) status = results["status"][0][1] assert status.status_code == KafkaStatusMessageType.MSG_RECV_ERROR assert status.level == Level.ERROR diff --git a/pyproject.toml b/pyproject.toml index e9178160e..0c02f7dc8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,6 @@ develop = [ "graphviz", "pillow", # adapters - "httpx>=0.20,<1", # kafka "polars", # parquet "psutil", # test_engine/test_history "sqlalchemy", # db @@ -94,7 +93,6 @@ test = [ "pytest-asyncio", "pytest-cov", "pytest-sugar", - "httpx>=0.20,<1", "polars", "psutil", "requests",
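
Local testing sketch (illustrative, not part of the patch): the reworked _precreate_topic
helper drops the httpx/REST-proxy call and instead warms a topic by publishing a single
message through the adapter itself, relying on the broker's topic auto-creation (enabled
by default in Kafka, and explicit in the old compose file). A minimal sketch of the same
pattern, assuming the compose stack from ci/kafka/docker-compose.yml is up (make dockerup
ADAPTER=kafka) and the broker is reachable on localhost:9092; the topic and group id
names below are hypothetical placeholders:

    from datetime import datetime, timedelta

    import csp
    from csp.adapters.kafka import KafkaAdapterManager
    from csp.adapters.utils import DateTimeType, JSONTextMessageMapper

    # Broker address matches the EXTERNAL listener in ci/kafka/docker-compose.yml
    adapter = KafkaAdapterManager(broker="localhost:9092", group_id="example.group")

    def warmup():
        # Publish one const tick so the broker auto-creates the topic,
        # mirroring _precreate_topic in csp/tests/adapters/kafka_utils.py
        msg_mapper = JSONTextMessageMapper(datetime_type=DateTimeType.UINT64_MICROS)
        adapter.publish(msg_mapper, "example.topic", "foo", csp.const("test"), field_map="a")

    # A zero-length realtime run is enough to flush the single publish
    csp.run(warmup, starttime=datetime.utcnow(), endtime=timedelta(), realtime=True)

With the topic warmed, CSP_TEST_KAFKA=1 make test-py TEST_ARGS="-k kafka" exercises the
gated tests locally, and make dockerdown ADAPTER=kafka tears the stack down, mirroring
the CI job added above.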