Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduled batch supervisor #17353

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

abhishekrb19
Copy link
Contributor

@abhishekrb19 abhishekrb19 commented Oct 15, 2024

This change introduces a scheduled batch supervisor in Druid. The supervisor periodically wakes up to submit an MSQ ingest query, allowing users to automate batch ingestion directly within Druid. Think of it as simple batch task workflows natively integrated into Druid, though it doesn't replace more sophisticated workflow management systems like Apache Airflow. This is an experimental feature.

Summary of changes:

The scheduled_batch supervisor can be configured as follows:

{
    "type": "scheduled_batch",
    "schedulerConfig": {
        "type": "unix",
        "schedule": "*/5 * * * *"
    },
    "spec": {
        "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
    },
    "suspended": false
}

The supervisor will submit the REPLACE sql query repeatedly every 5 minutes. The supervisor supports two types of cron scheduler configurations:

  1. Unix cron syntax:
    • Type must be set to unix.
    • Follows the standard Unix cron format, e.g., */5 * * * * to schedule the SQL task every 5 minutes.
    • Supports macro expressions such as @daily, @hourly, @monthly, etc.
  2. Quartz cron syntax:
    • Type must be set to quartz.
    • Offers more flexibility and control for scheduling tasks.
    • Example: 0 0 0 ? 3,6,9,12 MON-FRI to schedule tasks at midnight on weekdays during March, June, September, and December.

Key points:

  • User can specify the query along with any context in the spec. This structure is identical to what the MSQ task engine accepts.
  • Currently, the batch supervisor will repeatedly submit the DML SQL present in the spec as-is on its schedule.
  • This can be useful for recurring batch ingestion or reindexing tasks.
  • There is no parameterization or "change detection" support for input sources yet, but some simple mechanisms are planned for the future.
  • Users can configure multiple scheduled batch supervisors for the same datasource.
  • Only DML ingestion queries are currently supported by the scheduled batch supervisor.
  • Users can suspend, resume and terminate the scheduled batch supervisor. Suspending the supervisor stops new tasks from being scheduled.

Some implementation details:

  • The indexing-service module now depends on the druid-sql module. This allows the scheduled batch supervisor, running on the Overlord, to communicate with the Broker to:
    a. Validate and parse the user-specified query.
    b. Submit MSQ queries to the /druid/v2/sql/task/ endpoint.
  • A ScheduledBatchScheduler is injected in the Overlord, which is responsible for scheduling and unscheduling all scheduled batch instances.
  • A BrokerClient implementation has been added, leveraging the ServiceClient functionality.
  • The SqlTaskStatus and its unit test SqlTaskStatusTest have been moved from the msq module to the sql module so it can be reused by the BrokerClient implementation in the sql module.
  • Added ExplainPlanInformation class, which is used to deserialize the explain plan response from the Broker.

The status API response for the supervisor contains the scheduler state along with active and completed tasks:

curl -X GET http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/status
{
  "supervisorId": "scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd",
  "status": "SCHEDULER_RUNNING",
  "lastTaskSubmittedTime": "2024-10-18T17:35:00.000Z",
  "nextTaskSubmissionTime": "2024-10-18T17:40:00.000Z",
  "timeUntilNextTaskSubmission": "PT122.431S",
  "activeTasks": {
    "query-6a6f14e1-809a-4832-8b71-7128a23165e9": {
      "id": "query-6a6f14e1-809a-4832-8b71-7128a23165e9",
      "status": "RUNNING",
      "duration": -1,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    }
  },
  "completedTasks": {
    "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3": {
      "id": "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3",
      "status": "SUCCESS",
      "duration": 503,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    },
    "query-6badb07f-6ed4-49ce-bb70-8a13be53876b": {
      "id": "query-6badb07f-6ed4-49ce-bb70-8a13be53876b",
      "status": "SUCCESS",
      "duration": 518,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    }
  }
}

TODOs in this PR:

  • Track metrics of submitted/active and completed tasks per supervisor.
  • Add javadocs for the new classes
  • Clean up some interfaces.
  • More test coverage.

Future Improvements:

  • Scheduler State Persistence: This feature doesn't yet persist the scheduler state. Adding persistence will improve robustness (e.g., preventing missed jobs during Overlord restarts).
  • Task Limits: There is currently no limit on the maximum number of tasks allowed by the scheduled batch supervisor. We may want to consider this in the future.
  • The Supervisors page in the Druid web-console should change a bit to accommodate this change (more broadly non-streaming supervisors).

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - Querying Area - Dependencies Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Oct 15, 2024
@abhishekrb19 abhishekrb19 marked this pull request as draft October 15, 2024 21:49
public ScheduledBatchScheduler(
final TaskMaster taskMaster,
final ScheduledExecutorFactory executorFactory,
final ServiceEmitter emitter,

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'emitter' is never used.
// do nothing
}

public enum State implements SupervisorStateManager.State

Check notice

Code scanning / CodeQL

Class has same name as super class Note

State has the same name as its supertype
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager$State
.
assertEquals(spec.getDataSources(), observedSpec.getDataSources());
}
catch (Exception e) {
throw DruidException.defensive(e, "Error while performing serde of spec[%s].", spec);

Check notice

Code scanning / CodeQL

Use of default toString() Note test

Default toString(): ScheduledBatchSupervisorSpec inherits toString() from Object, and so is not suitable for printing.
@a2l007
Copy link
Contributor

a2l007 commented Oct 17, 2024

This sounds exciting @abhishekrb19 ! Sorry I haven't reviewed the code yet but are there extension points to modify/add the ingest query during each supervisor run? Or will the same ingest query be executed during every run?

@abhishekrb19 abhishekrb19 marked this pull request as ready for review October 18, 2024 18:30
@abhishekrb19
Copy link
Contributor Author

I have updated the PR description to include details on the design and implementation. While the TODOs noted in the summary still need to be addressed, the changes are generally ready for review.

@abhishekrb19
Copy link
Contributor Author

This sounds exciting @abhishekrb19 ! Sorry I haven't reviewed the code yet but are there extension points to modify/add the ingest query during each supervisor run? Or will the same ingest query be executed during every run?

@a2l007, thanks for taking a look! Yeah, the same ingest query will be repeatedly submitted during the supervisor's scheduled runs. We plan to add simple change detection or templating mechanisms for some common use cases, which will be introduced in a future patch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants