A CQRS implementation of a simple single field ingestion service with a backup/restore from topic function

0xste/pollinate-timestamp-service

pollinate-timestamp-service

Assignment

  1. Create a proof of concept for the following:
  • Create an application with simple API that will be used to submit a curl command to insert the date-timestamp into your database.
    • curl -X POST http://<someip>/api/v1/timestamp/2009-09-28T19:03:12Z
  1. The proof of concept should consist of a single web-application and a single database
    • This has been somewhat changed in this implementation:
      • timestamp-command-service ingests the HTTP request and publishes to Kafka (returns a "ticket" id)
      • timestamp-consumer-service consumes the message from Kafka and writes it to Postgres
      • timestamp-query-service reads from the DB to validate a command has been ingested successfully
  2. Create a GitHub project for this assignment - https://github.com/stefanomantini/pollinate-timestamp-service
  3. Automate the provisioning of your database node and web node with the API you created.
    • See the Getting started section below
  4. A readme file must be populated with your git project detailing your solution and how to run it.
    • This file
  5. Provide details of your high-level design that would meet the requirements outlined above.

Getting started

Notes:

  • Given time constraints, automating Kafka & Postgres in a scalable way was not feasible...
  • The middle ground taken is to deploy a "platform" stack using docker-compose running on the host network
  • The services are deployed alongside the platform stack and make use of the host network for consuming/publishing
  • An initial implementation of the deployment to minikube using ansible has been started under main.yml; however, it would require one of: deploying k8s/postgres as part of the same deployment, networking configuration so the services can communicate on the host network, or DNS setup.
  • My preference would be to add CI that builds the containers (see docker-hub for the automated builds of command, consumer, query) and deploys them through gitlab-ci directly to the cluster.

Deployment

  • Starting the "platform" services i.e. kafka/postgres

  • Checking the platform is up

    • docker-compose ps
  • Further Testing

    • Import the postman collection pollinate-timestmap.postman_collection.json to perform some manual tests
    • A comprehensive acceptance/integration test suite lives in the ./integration directory, run go test ./... to produce output

Development

  • Common Non Functional Requirements

    • It reads in configuration from environment variables
    • It is stateless
    • Exposes a /health endpoint curl '{{host}}:{{port}}/health' --header 'correlationId: 95d5905e-a25a-4932-bc44-aa95563a7a38' to check system status (for use in liveness checks)
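The common requirements above (configuration from environment variables, no local state, a /health endpoint) can be sketched in Go roughly as follows. This is a minimal illustration, not the code in this repository: the variable names HTTP_PORT and KAFKA_BROKERS, the default values and the {"status":"UP"} response body are all assumptions. KAFKA_GROUP_ID is the only variable name taken from this README.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
)

// Config holds settings read from the environment.
// HTTP_PORT and KAFKA_BROKERS are hypothetical names used for illustration.
type Config struct {
	Port         string
	KafkaBrokers string
	KafkaGroupID string
}

// getenv returns the value of key, or fallback when it is unset or empty.
func getenv(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// LoadConfig builds the configuration purely from environment variables,
// so the same container image can run unchanged in any environment.
func LoadConfig() Config {
	return Config{
		Port:         getenv("HTTP_PORT", "7080"),
		KafkaBrokers: getenv("KAFKA_BROKERS", "localhost:9092"),
		KafkaGroupID: getenv("KAFKA_GROUP_ID", "timestamp-consumer-1"),
	}
}

// healthHandler answers liveness checks and echoes the correlationId header.
// The response body shape is an assumption.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("correlationId", r.Header.Get("correlationId"))
	json.NewEncoder(w).Encode(map[string]string{"status": "UP"})
}

// checkHealth calls the handler in-process and returns the status code and body.
func checkHealth(correlationID string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, "/health", nil)
	req.Header.Set("correlationId", correlationID)
	rec := httptest.NewRecorder()
	healthHandler(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	cfg := LoadConfig()
	code, body := checkHealth("95d5905e-a25a-4932-bc44-aa95563a7a38")
	fmt.Println(cfg.Port, code, body)
}
```

In a real service the handler would be registered with http.HandleFunc("/health", healthHandler) and served on the configured port; the in-process call here only keeps the sketch runnable without opening a socket.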
  • timestamp-command-service

    • Non Functional Requirements:
      • This service is written in go
      • An ingress is required for this service
      • An nginx sidecar/api-gateway would be deployed to terminate TLS for this service
    • Functional Requirements:
      • Accepts a HTTP request:
        • Method: POST
        • URL: /api/v1/timestamp/{{RFC3339_TIMESTAMP}}
        • Headers:
          • correlationId: {{UUIDv4}}
        • Example requests
          • curl --location --request POST 'http://localhost:7080/api/v1/timestamp/2006-01-02T15:04:05Z' --header 'correlationId: 52f2665d-c087-42e5-a571-89061021a37c'
          • curl --location --request POST 'http://localhost:7080/app' --header 'correlationId: 52f2665d-c087-42e5-a571-89061021a37c'
        • Example Message published to kafka
          • {"command_id":"add1f52d-7c56-4c9f-b319-1a266c355f6b", "event_timestamp":"2006-01-02T15:04:05Z"}
        • Example HTTP response to client
          • {"id":"add1f52d-7c56-4c9f-b319-1a266c355f6b"}
        • Description:
          • This api accepts a timestamp (or can be left empty and will use the current timestamp)
          • The api is versioned to allow for breaking changes in future
          • The correlationId header is passed from the client for the purposes of tracing, it is logged out to trace spans
          • The returned ID will be used as the DB primary key by the consumer, and will be retrievable via the query-service
          • This service doesn't actually "Write to the DB" directly, but instead forms the first half of the distributed transaction
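As a rough sketch of the command flow described above (parse the RFC 3339 timestamp from the URL, generate a ticket id, publish the message, return the id), the handler could look like the following in Go. It is not the repository's implementation: the Publisher interface and memoryPublisher stand in for a real Kafka producer, the UUID helper is hand-rolled to avoid third-party imports, and the 202/400/503 status codes are assumptions.

```go
package main

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// Command mirrors the example message published to Kafka.
type Command struct {
	CommandID      string `json:"command_id"`
	EventTimestamp string `json:"event_timestamp"`
}

// Publisher is a hypothetical abstraction over the Kafka producer.
type Publisher interface {
	Publish(topic string, cmd Command) error
}

// memoryPublisher records messages in memory instead of sending them to Kafka.
type memoryPublisher struct{ sent []Command }

func (m *memoryPublisher) Publish(topic string, cmd Command) error {
	m.sent = append(m.sent, cmd)
	return nil
}

// newUUIDv4 builds a random (version 4) UUID using only the standard library.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// timestampHandler handles POST /api/v1/timestamp/{timestamp}.
// An empty timestamp falls back to the current time, as described above.
func timestampHandler(pub Publisher) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		raw := strings.TrimPrefix(strings.TrimPrefix(r.URL.Path, "/api/v1/timestamp"), "/")
		ts := time.Now().UTC()
		if raw != "" {
			parsed, err := time.Parse(time.RFC3339, raw)
			if err != nil {
				http.Error(w, "timestamp must be RFC 3339", http.StatusBadRequest)
				return
			}
			ts = parsed
		}
		id, err := newUUIDv4()
		if err != nil {
			http.Error(w, "could not generate id", http.StatusInternalServerError)
			return
		}
		cmd := Command{CommandID: id, EventTimestamp: ts.Format(time.RFC3339)}
		if err := pub.Publish("timestamp.command.v1", cmd); err != nil {
			http.Error(w, "publish failed", http.StatusServiceUnavailable)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted) // assumption: the write is only queued at this point
		json.NewEncoder(w).Encode(map[string]string{"id": id})
	}
}

// submit sends a POST to the handler in-process and returns status and body.
func submit(h http.Handler, path string) (int, string) {
	req := httptest.NewRequest(http.MethodPost, path, nil)
	req.Header.Set("correlationId", "52f2665d-c087-42e5-a571-89061021a37c")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	pub := &memoryPublisher{}
	code, body := submit(timestampHandler(pub), "/api/v1/timestamp/2006-01-02T15:04:05Z")
	fmt.Println(code, body, len(pub.sent))
}
```

Because the handler only publishes and returns the ticket id, the client has to use the query-service to confirm that the record actually reached the DB.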
  • timestamp-consumer-service

    • Non Functional Requirements:
      • This service is written in java
      • This service will create the DB table if not exists
      • To rebuild the DB, it's as simple as changing the consumer group: a new group has no committed offsets, so the consumer re-reads the topic from the start and re-adds all messages to the DB
    • Functional Requirements:
      • Accepts a kafka message:
        • Topic: timestamp.command.v1
        • Headers:
          • correlationId: {{UUIDv4}} (optional)
        • Example message received
          • {"command_id":"add1f52d-7c56-4c9f-b319-1a266c355f6b", "event_timestamp":"2006-01-02T15:04:05Z"}
        • Example DB write
          • INSERT INTO TIMESTAMP_RECORDS (ID, EVENT_TIMESTAMP, CREATED_AT) VALUES ('add1f52d-7c56-4c9f-b319-1a266c355f6b', '2006-01-02T15:04:05Z', '2020-08-31T15:04:05Z')
        • Description:
          • This api accepts a kafka message in json format
          • The api is versioned to allow for breaking changes in future
          • The correlationId header is passed from the client for the purposes of tracing, it is logged out to trace spans
          • The command-id is used as the primary key, and will be retrievable via the query-service
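The consumer itself is written in Java; purely to illustrate the message-to-row mapping described above, here is a small Go sketch that decodes the Kafka payload and prepares a parameterised insert. The ON CONFLICT (ID) DO NOTHING clause is an assumption about how the write could be made idempotent in Postgres, not a statement of what the service does, and decodeCommand and insertArgs are hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Command mirrors the JSON message on the timestamp.command.v1 topic.
type Command struct {
	CommandID      string    `json:"command_id"`
	EventTimestamp time.Time `json:"event_timestamp"`
}

// insertSQL is a parameterised version of the example DB write.
// Assumption: ON CONFLICT (ID) DO NOTHING makes a replayed message a no-op.
const insertSQL = `INSERT INTO TIMESTAMP_RECORDS (ID, EVENT_TIMESTAMP, CREATED_AT)
VALUES ($1, $2, $3) ON CONFLICT (ID) DO NOTHING`

// decodeCommand parses and validates a message payload.
// time.Time unmarshals RFC 3339 strings, so malformed timestamps are rejected.
func decodeCommand(payload []byte) (Command, error) {
	var cmd Command
	if err := json.Unmarshal(payload, &cmd); err != nil {
		return Command{}, fmt.Errorf("decode command: %w", err)
	}
	if cmd.CommandID == "" {
		return Command{}, errors.New("command_id is required")
	}
	if cmd.EventTimestamp.IsZero() {
		return Command{}, errors.New("event_timestamp is required")
	}
	return cmd, nil
}

// insertArgs returns the positional arguments for insertSQL.
func insertArgs(cmd Command, now time.Time) []any {
	return []any{cmd.CommandID, cmd.EventTimestamp.UTC(), now.UTC()}
}

func main() {
	payload := []byte(`{"command_id":"add1f52d-7c56-4c9f-b319-1a266c355f6b","event_timestamp":"2006-01-02T15:04:05Z"}`)
	cmd, err := decodeCommand(payload)
	if err != nil {
		fmt.Println("skip message:", err)
		return
	}
	fmt.Println(insertSQL)
	fmt.Println(insertArgs(cmd, time.Now())...)
}
```

With a real driver, insertSQL and insertArgs would be passed to something like db.Exec from database/sql; that wiring is left out so the sketch needs no external dependencies.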
  • timestamp-query-service

    • Non Functional Requirements:
      • This service is written in java
      • An ingress is required for this service
      • An nginx sidecar/api-gateway would be deployed to terminate TLS for this service
      • This service will create the DB table if not exists
    • Functional Requirements:
      • Accepts a HTTP request:
        • Method: GET
        • URL: /api/v1/timestamp/{{COMMAND_ID}}
        • Headers:
          • correlationId: {{UUIDv4}}
        • Example request
          • curl --location --request GET 'http://localhost:7081/api/v1/timestamp/add1f52d-7c56-4c9f-b319-1a266c355f6b' --header 'correlationId: 52f2665d-c087-42e5-a571-89061021a37c'
        • Example HTTP response to client
          • {"command_id":"add1f52d-7c56-4c9f-b319-1a266c355f6b", "event_timestamp":"2006-01-02T15:04:05Z","created_at":"2020-08-31T15:04:05Z"}
        • Description:
          • This api accepts a command-id and is used to validate a successful write to the api
          • The api is versioned to allow for breaking changes in future
          • The correlationId header is passed from the client for the purposes of tracing, it is logged out to trace spans
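The read side can be sketched in the same way. The query-service is written in Java, so the Go below is only an illustration of the contract: the in-memory Store stands in for the Postgres lookup, and returning 404 for an id that has not (yet) been consumed is an assumption about the behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Record mirrors the example HTTP response of the query-service.
type Record struct {
	CommandID      string `json:"command_id"`
	EventTimestamp string `json:"event_timestamp"`
	CreatedAt      string `json:"created_at"`
}

// Store is a hypothetical stand-in for the TIMESTAMP_RECORDS table.
type Store map[string]Record

// queryHandler handles GET /api/v1/timestamp/{command_id}.
func queryHandler(store Store) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		id := strings.TrimPrefix(r.URL.Path, "/api/v1/timestamp/")
		rec, ok := store[id]
		if !ok {
			// Assumption: unknown or not-yet-consumed ids are reported as 404.
			http.Error(w, "record not found", http.StatusNotFound)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(rec)
	}
}

// lookup calls the handler in-process and returns the status code and body.
func lookup(store Store, id string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, "/api/v1/timestamp/"+id, nil)
	req.Header.Set("correlationId", "52f2665d-c087-42e5-a571-89061021a37c")
	rec := httptest.NewRecorder()
	queryHandler(store).ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	store := Store{"add1f52d-7c56-4c9f-b319-1a266c355f6b": {
		CommandID:      "add1f52d-7c56-4c9f-b319-1a266c355f6b",
		EventTimestamp: "2006-01-02T15:04:05Z",
		CreatedAt:      "2020-08-31T15:04:05Z",
	}}
	fmt.Println(lookup(store, "add1f52d-7c56-4c9f-b319-1a266c355f6b"))
	fmt.Println(lookup(store, "00000000-0000-4000-8000-000000000000"))
}
```

Since the write path is asynchronous, a client that has just received a ticket id may need to retry the lookup briefly before the record becomes visible.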

Disaster Recovery

This solution is predicated on Kafka being highly available and fault tolerant, with failover. My recommendation would be to use a managed service to operate Kafka, as is often done, to reduce the operational burden of ensuring consistency and availability between nodes.

The DR scenario detailed here assumes that the data held in Kafka is the golden record (see architecture diagram for further detail).

  • Background:

    • The postgres DB has been deleted
    • There were records in the DB that were being used by a downstream system
    • The DB is not recoverable
    • Kafka is still Available and consistent
  • Scenario:

    • Setup
      • Create 1001 records in the DB
        • run go test ./... in the ./integration directory
      • Query a record in the DB using the query service (baseline)
        • curl 'localhost:7082/api/v1/timestamp/{{id}}' --header 'correlationId: 85464eff-f6f9-4fe5-ab9d-55a259502077'
    • Scenario
      • Go to PGAdmin:80 and delete all records in the postgres db public schema: TRUNCATE TIMESTAMP_RECORDS
    • Recovery
      • Modify docker-compose.yml to change the value of the timestamp-consumer-service environment variable KAFKA_GROUP_ID to a unique value (for example, increment the number on the end)
      • Notes:
        • This example scenario makes use of the same DB, however would likely be performed on a separate DB, and would work in the same way
        • There are idempotency checks on the DB to ensure a record can't be re-inserted or upserted
        • The KAFKA_GROUP_ID is being used in the instance that
        • All writes must be IDEMPOTENT in the consumer
        • No retries/dead-lettering are considered in this solution: if processing fails, the consumer offset will not be committed and processing will block for all consumers in the same consumer group
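The recovery mechanism above can be modelled without Kafka or Postgres. The Go sketch below is a toy simulation under stated assumptions: Topic stands in for the Kafka topic with per-group committed offsets, a group with no committed offset starts from the beginning (as with auto.offset.reset=earliest), and DB.Insert stands in for an idempotent write keyed on the command id. All type and function names are hypothetical.

```go
package main

import "fmt"

// Message is one command held on the topic (the golden record).
type Message struct{ CommandID, EventTimestamp string }

// Topic models an append-only log plus committed offsets per consumer group.
type Topic struct {
	messages []Message
	offsets  map[string]int
}

// newTopic creates a topic pre-filled with n messages.
func newTopic(n int) *Topic {
	t := &Topic{offsets: map[string]int{}}
	for i := 0; i < n; i++ {
		t.messages = append(t.messages, Message{fmt.Sprintf("id-%d", i), "2006-01-02T15:04:05Z"})
	}
	return t
}

// Poll returns the messages the group has not committed yet.
// A group that has never committed starts at offset 0 (assumed "earliest").
func (t *Topic) Poll(group string) []Message { return t.messages[t.offsets[group]:] }

// Commit advances the group's offset by n messages.
func (t *Topic) Commit(group string, n int) { t.offsets[group] += n }

// DB models the TIMESTAMP_RECORDS table keyed by primary key.
type DB map[string]Message

// Insert is idempotent: an existing id is left untouched and reported as false.
func (db DB) Insert(m Message) bool {
	if _, exists := db[m.CommandID]; exists {
		return false
	}
	db[m.CommandID] = m
	return true
}

// consume processes everything pending for the group and returns the number
// of rows actually inserted. Failure handling is deliberately left out.
func consume(t *Topic, db DB, group string) int {
	batch := t.Poll(group)
	inserted := 0
	for _, m := range batch {
		if db.Insert(m) {
			inserted++
		}
	}
	t.Commit(group, len(batch))
	return inserted
}

func main() {
	topic := newTopic(1001)
	db := DB{}
	fmt.Println("initial load:", consume(topic, db, "consumer-1"))
	db = DB{} // the TRUNCATE step: the DB is lost, the topic is not
	fmt.Println("same group after loss:", consume(topic, db, "consumer-1"))
	fmt.Println("new group after loss:", consume(topic, db, "consumer-2"))
}
```

The middle call inserts nothing because the original group's offsets are already committed, which is why the recovery step changes KAFKA_GROUP_ID rather than simply restarting the consumer. The idempotent insert is what makes a replay into a partially populated DB safe.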

Future Improvements

  • Deployments
    • Migrate away from docker-compose to Helm
    • Following the convention in main.yml to deploy to minikube for dev
  • Build
    • Migrate to GitLab CI
    • Use the k8s integration for building and deploying
  • Logging/Monitoring
    • Use injected correlationId in all logs
    • Make all logs follow same convention
    • Zipkin/Prometheus for tracing/metrics
  • Operational Improvements
    • Implement a spring cloud config server backed by vault
    • Implement service discovery using consul for intra-service comms
    • Harden kafka (probably deploy the confluent operator cluster)
