Interactive MSQ profile (Dart) #17139

Open
gianm opened this issue Sep 24, 2024 · 4 comments
Labels
Area - MSQ (For multi stage queries - https://github.com/apache/druid/issues/12262), Design Review, Proposal

Comments

@gianm
Contributor

gianm commented Sep 24, 2024

Motivation

Druid 24 included a task-based multi-stage query engine proposed in #12262. This has proved useful for DML (REPLACE, INSERT) and querying directly from deep storage.

This proposal is to introduce the natural next evolution: an interactive "profile" of the engine. The same engine is configured to run interactively, including changes such as:

The main purpose of this engine is to provide a way to run queries that are too lightweight for the task-based MSQ engine to make sense, but too heavyweight for the standard native query engine to make sense. A good example would be a GROUP BY with an intermediate resultset of hundreds of millions of rows. In general this engine would specialize in the sort of midweight, ad-hoc queries that are common in the data warehousing world. I believe with some additional work it would also be possible to run lightweight, high QPS queries competitively with the standard native query engine.

Proposed changes

Name

In the initial PR I used the name dart for this profile of the engine. Darts are lightweight and go fast, which are good qualities in an interactive query engine. It even has a possible backronym: "Distributed Asynchronous Runtime Topology".

API

Initially I'm proposing an API that is compatible with the SQL query API, to make it easy to try out the new engine.

To issue a query, POST to /druid/v2/sql/dart/ the same form of JSON payload that would be accepted by /druid/v2/sql/. Results are also in the same format. This is a synchronous API, although internally the engine is asynchronous, so it would be possible to introduce an asynchronous API later on.

To issue a query and also return a report with stages, counters, etc., POST to /druid/v2/sql/dart/?fullReport. This is like an EXPLAIN ANALYZE. The report is in the same format as the reports generated by the task-based engine.

To see a list of running queries (a feature that the native engine does not have), GET /druid/v2/sql/dart/.

To cancel a query, DELETE /druid/v2/sql/dart/{sqlQueryId}.

To check if the engine is enabled, GET /druid/v2/sql/dart/enabled (returns 200 or 404).
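
The endpoints above can be exercised with a small client. The following is a minimal sketch, not part of the proposal: the Broker address (http://localhost:8082), the absence of authentication, and all helper names are assumptions made for illustration. Only the paths, the HTTP methods, and the fullReport parameter come from the text above.

```python
import json
import urllib.parse
import urllib.request

# Assumption: address of a Broker with Dart enabled; adjust for your cluster.
BROKER = "http://localhost:8082"
DART_PATH = "/druid/v2/sql/dart/"


def run_query_request(sql, full_report=False, broker=BROKER):
    """Build the POST that runs a query; the payload has the /druid/v2/sql/ form."""
    url = broker + DART_PATH + ("?fullReport" if full_report else "")
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def list_queries_request(broker=BROKER):
    """Build the GET that lists running queries."""
    return urllib.request.Request(broker + DART_PATH, method="GET")


def cancel_query_request(sql_query_id, broker=BROKER):
    """Build the DELETE that cancels the query with the given sqlQueryId."""
    quoted = urllib.parse.quote(sql_query_id, safe="")
    return urllib.request.Request(broker + DART_PATH + quoted, method="DELETE")


def enabled_request(broker=BROKER):
    """Build the GET that checks whether the engine is enabled."""
    return urllib.request.Request(broker + DART_PATH + "enabled", method="GET")


def send(request):
    """Send a prepared request and return (status code, raw response body)."""
    with urllib.request.urlopen(request) as response:
        return response.status, response.read()
```

Note that urllib.request.urlopen raises urllib.error.HTTPError for a 404 response, so a caller that checks the enabled endpoint this way would need to catch that exception rather than inspect the status code.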

Servers and resource management

Controllers run on Brokers (one per query) and the workers run on Historicals. Resource management would be bare-bones in the initial version, limited to simple controls on the number of concurrent queries that can execute on each server.

On Brokers, there are three configs:

  • druid.msq.dart.enabled = true to enable Dart.
  • druid.msq.dart.controller.concurrentQueries provides a limit to the number of query controllers that can run concurrently on that Broker. Additional controllers beyond this number queue up. Default is 1.
  • druid.msq.dart.query.context.targetPartitionsPerWorker sets the number of partitions per worker to create during a shuffle. Generally this should be set to the number of threads available on workers, so they can process shuffled data fully multithreaded.

Brokers only run controllers, so they do not need meaningful CPU or memory resources beyond what is needed to gather partition statistics for global sorts. (And anyway, I'd like to use fewer global sorts in the future; see "Future work" around hashLocalSort.)

On Historicals, there are three configs:

  • druid.msq.dart.enabled = true to enable Dart.
  • druid.msq.dart.worker.concurrentQueries provides a limit to the number of query workers that can run concurrently on that Historical. Default is equal to the number of merge buffers, because each query needs one merge buffer. Ideally this should be set to something equal to, or larger than, the sum of the concurrentQueries setting on all Brokers.
  • druid.msq.dart.worker.heapFraction provides a limit to the amount of heap used across all Dart queries. The default is 0.35, or 35% of heap.
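
The sizing guidance for druid.msq.dart.worker.concurrentQueries (at least the sum of the Broker-side limits) can be written as a quick check. This is only an illustrative sketch; the function name and the example numbers are hypothetical.

```python
def worker_limit_is_sufficient(broker_limits, worker_limit):
    """Check the sizing guidance for one Historical.

    broker_limits: druid.msq.dart.controller.concurrentQueries for each Broker.
    worker_limit:  druid.msq.dart.worker.concurrentQueries on the Historical.
    Returns True when the Historical can serve every controller at once.
    """
    return worker_limit >= sum(broker_limits)


# Hypothetical cluster: two Brokers allowing 4 controllers each.
print(worker_limit_is_sufficient([4, 4], worker_limit=8))
print(worker_limit_is_sufficient([4, 4], worker_limit=6))
```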

The initial version does not run on realtime tasks, meaning realtime data is not included in queries.

Resource management is very simple in the initial version. It works like this in the version that is headed for Druid 31:

  • Concurrency: limit on concurrent queries at the Broker and at each Historical, given by server configuration.
  • Broker HTTP threads: each query currently ties up an HTTP thread. But it doesn't necessarily need to; this is only happening because of hooking into the existing SqlResource code. The engine is internally async.
  • Historical HTTP threads: not tied up; the Broker-to-Historical protocol is async.
  • Memory: each concurrently-running query gets one merge buffer and one slice of heap. Each query gets the same size of heap slice. Standard processing buffers are not used.
  • CPU: each query can use all available CPUs in a fine-grained way. Because memory is allocated to the query, not to a processing thread, it is possible to time-slice the processing pool more finely than with the standard query engine.
  • Disk: usage is currently uncontrolled; it is possible to fill up local disk with a heavyweight enough query.
  • No timeouts, priorities, or lanes. (yet.)
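
To make the memory rule concrete, here is a rough sketch of the per-query heap slice. It assumes that the slice is simply the Dart heap budget (heap size times druid.msq.dart.worker.heapFraction) divided evenly by druid.msq.dart.worker.concurrentQueries; the proposal says only that all slices are the same size, so the exact formula is an assumption, as are the function name and the example numbers.

```python
def heap_slice_bytes(heap_bytes, concurrent_queries, heap_fraction=0.35):
    """Estimate the heap slice for one query.

    Assumption: the Dart heap budget is split evenly across the configured
    number of concurrent queries. The 0.35 default matches heapFraction.
    """
    if concurrent_queries < 1:
        raise ValueError("concurrent_queries must be at least 1")
    return int(heap_bytes * heap_fraction) // concurrent_queries


# Hypothetical Historical: 16 GiB heap, 4 concurrent Dart queries.
slice_bytes = heap_slice_bytes(16 * 1024**3, concurrent_queries=4)
print(f"{slice_bytes / 1024**2:.0f} MiB per query")
```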

I expect this will evolve over time. The "Future work" section includes thoughts on how resource management could evolve.

Operational impact

None if the engine is disabled or if queries are not being issued. If queries are being issued, on Historicals, Dart queries use the same merge buffers and processing pool as regular native queries, so would potentially conflict with other queries that need those resources. They also use up to 35% of heap space if actually running.

On Brokers, Dart queries use the same HTTP threads as regular native queries, and could conflict there as well.

The API and all configuration parameters should be considered experimental and subject to breaking changes in upcoming Druid releases, as the initial version of the feature evolves. The ability for Dart queries to function properly in a mixed-version environment (such as during a rolling update) is also not guaranteed for these initial experimental releases. Nevertheless, this would have no impact on regular queries.

Future work

Some thoughts on future work items.

System:

  • The task-based profile always pulls data from deep storage, and the initial version of the interactive profile always uses locally pre-cached data. Some hybrid is a clear next move.
  • Include realtime data, using workers running on realtime tasks.
  • Graceful shutdown of Historicals (currently, the async nature of the API means that when a Historical is TERMed, it exits immediately, even if a query is in flight).

API:

  • JDBC support.
  • Add a way to specify Dart as the engine for /druid/v2/sql/statements.

Resource management:

  • Set the "concurrentQueries" and "targetPartitionsPerWorker" parameters automatically based on available resources. We should allow user-supplied configuration but it should not be required to get a good baseline level of performance.
  • Implement timeouts, priorities, and lanes.
  • Allow queries to burst up to an "attic" of additional memory. (Currently all queries get the same amount of memory, and need to use disk when they run out.)
  • Automatic reprioritization or relaning of queries based on runtime characteristics. Experience tells us that it is difficult for users to set good priorities on all queries before they are issued. There is a need for the system to determine the appropriate priority on its own. I think this will need to involve some degree of canceling or suspending, and then restarting, expensive queries in some cases.
  • Release workers when they are no longer needed. (Currently workers are held even if not all are needed for future stages.)
  • Controls on disk usage.

Performance items:

  • Multithread hashLocalSort shuffles. Currently only one partition is sorted at a time, even on a multithreaded worker. This is the main reason the initial version is using globalSort so much, even though globalSort involves more overhead on the controller.
  • Use hashLocalSort for aggregation rather than globalSort, once it's multithreaded, to reduce dependency on the controller and on statistics gathering.
  • Aggregate (combine) opportunistically during sorting.
  • For aggregation, use per-thread hash tables that persist across segments. Currently each segment is processed with its own hash table, then the contents of the hash tables are sorted. It would be better to continue the hash aggregation as far as possible.
  • For aggregation, use a global hash table to fully aggregate any data that fit in memory, rather than always using a sort for data from different threads.
  • Improve maximum possible QPS by reducing overheads on simple queries like SELECT COUNT(*) FROM tbl.
  • Add QueryMetrics to reports for better insight into performance.
gianm added the labels Design Review, Proposal, and Area - MSQ (For multi stage queries - https://github.com/apache/druid/issues/12262) on Sep 24, 2024
gianm added a commit to gianm/druid that referenced this issue Sep 24, 2024
This patch adds a profile of MSQ named "Dart" that runs on Brokers and
Historicals, and which is compatible with the standard SQL query API.
For more high-level description, and notes on future work, refer to apache#17139.

This patch contains the following changes, grouped into packages.

Controller (org.apache.druid.msq.dart.controller):

The controller runs on Brokers. Main classes are,

- DartSqlResource, which serves /druid/v2/sql/dart/.
- DartSqlEngine and DartQueryMaker, the entry points from SQL that actually
  run the MSQ controller code.
- DartControllerContext, which configures the MSQ controller.
- DartMessageRelays, which sets up relays (see "message relays" below) to read
  messages from workers' DartControllerClients.
- DartTableInputSpecSlicer, which assigns work based on a TimelineServerView.

Worker (org.apache.druid.msq.dart.worker)

The worker runs on Historicals. Main classes are,

- DartWorkerResource, which supplies the regular MSQ WorkerResource, plus
  Dart-specific APIs.
- DartWorkerRunner, which runs MSQ worker code.
- DartWorkerContext, which configures the MSQ worker.
- DartProcessingBuffersProvider, which provides processing buffers from
  sliced-up merge buffers.
- DartDataSegmentProvider, which provides segments from the Historical's
  local cache.

Message relays (org.apache.druid.messages):

To avoid the need for Historicals to contact Brokers during a query, which
would create opportunities for queries to get stuck, all connections are
opened from Broker to Historical. This is made possible by a message relay
system, where the relay server (worker) has an outbox of messages.

The relay client (controller) connects to the outbox and retrieves messages.
Code for this system lives in the "server" package to keep it separate from
the MSQ extension and make it easier to maintain. The worker-to-controller
ControllerClient is implemented using message relays.

Other changes:

- Controller: Added the method "hasWorker". Used by the ControllerMessageListener
  to notify the appropriate controllers when a worker fails.
- WorkerResource: No longer tries to respond more than once in the
  "httpGetChannelData" API. This comes up when a response due to resolved future
  is ready at about the same time as a timeout occurs.
- MSQTaskQueryMaker: Refactor to separate out some useful functions for reuse
  in DartQueryMaker.
- SqlEngine: Add "queryContext" to "resultTypeForSelect" and "resultTypeForInsert".
  This allows the DartSqlEngine to modify result format based on whether a "fullReport"
  context parameter is set.
- LimitedOutputStream: New utility class. Used when in "fullReport" mode.
- TimelineServerView: Add getDruidServerMetadata as a performance optimization.
- CliHistorical: Add SegmentWrangler, so it can query inline data, lookups, etc.
- ServiceLocation: Add "fromUri" method, relocating some code from ServiceClientImpl.
- FixedServiceLocator: New locator for a fixed set of service locations. Useful for
  URI locations.
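
The message relay idea described in the commit message above (the worker keeps an outbox, and the controller connects to it and retrieves messages) can be sketched as follows. This is a toy, in-process illustration only: the class and method names are hypothetical and are not the classes in org.apache.druid.messages, and a direct method call stands in for the Broker-to-Historical HTTP request.

```python
import threading


class Outbox:
    """Worker-side buffer of messages destined for one controller."""

    def __init__(self):
        self._messages = []
        self._lock = threading.Lock()

    def post(self, message):
        """Called by the worker; it never opens a connection to the controller."""
        with self._lock:
            self._messages.append(message)

    def read_from(self, offset):
        """Return the messages at or after offset, plus the next offset to request."""
        with self._lock:
            batch = self._messages[offset:]
            return batch, offset + len(batch)


class RelayClient:
    """Controller-side reader; the controller always initiates the call."""

    def __init__(self, outbox, listener):
        self._outbox = outbox
        self._listener = listener
        self._offset = 0

    def poll_once(self):
        """Fetch any new messages, hand them to the listener, return the count."""
        batch, self._offset = self._outbox.read_from(self._offset)
        for message in batch:
            self._listener(message)
        return len(batch)


received = []
outbox = Outbox()
client = RelayClient(outbox, received.append)
outbox.post("stage 0 results ready")
outbox.post("counters update")
client.poll_once()
print(received)
```

Because the reader tracks its own offset, repeated polls deliver each message once, and the worker side never needs to know how to reach the controller.
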
@LakshSingla
Contributor

LakshSingla commented Sep 25, 2024

I am still reviewing the changes, so it might be moot once I complete the review:

they do not need meaningful CPU or memory resources beyond what is needed to gather partition statistics for global sorts

The controller can consume up to a maximum of 300MB of heap space for the collection of statistics, and this maximum seems a lot given that it will be per query, if compared with the default value of something like maxSubqueryBytes.

For reference, a broker with 20 GB of heap space, and 50 max concurrent queries and no lookups would allocate 0.5 * 1/50 * (20GB) of heap space per query for inlining results which is 200MB, while each Dart query can theoretically take up to 300MB. This disparity will be greater for brokers with smaller heap sizes. Do we require some limiting for the brokers at the moment (like we do with subqueries) or would we take this up once we start tuning concurrency?

@gianm
Contributor Author

gianm commented Sep 25, 2024

The controller can consume up to a maximum of 300MB of heap space for the collection of statistics, and this maximum seems a lot given that it will be per query, if compared with the default value of something like maxSubqueryBytes.

For reference, a broker with 20 GB of heap space, and 50 max concurrent queries and no lookups would allocate 0.5 * 1/50 * (20GB) of heap space per query for inlining results which is 200MB, while each Dart query can theoretically take up to 300MB. This disparity will be greater for brokers with smaller heap sizes. Do we require some limiting for the brokers at the moment (like we do with subqueries) or would we take this up once we start tuning concurrency?

The heap space used by partition statistics is capped to 15% of the overall Broker heap, and is split across all possible controllers based on the maximum Dart concurrency. So a Broker with 20 GB of heap and 50 max concurrent Dart queries would use at most 60 MB for partition statistics per controller. Check out DartControllerMemoryManagementModule for where this is set up.
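
The arithmetic behind both figures in this exchange can be checked with a short sketch. The function name is hypothetical and decimal units (1 GB = 1000 MB) are assumed; the fractions, heap size, and concurrency are the ones quoted above.

```python
MB = 1000**2
GB = 1000**3


def per_query_budget_bytes(heap_bytes, heap_fraction, max_concurrent_queries):
    """Split a fraction of the Broker heap evenly across concurrent queries."""
    return heap_bytes * heap_fraction / max_concurrent_queries


heap = 20 * GB
concurrency = 50

# Partition statistics: capped at 15% of heap, split across Dart controllers.
stats_mb = per_query_budget_bytes(heap, 0.15, concurrency) / MB

# Subquery inlining example from the question: 0.5 * 1/50 * 20 GB.
subquery_mb = per_query_budget_bytes(heap, 0.5, concurrency) / MB

print(f"partition statistics: {stats_mb:.0f} MB per controller")
print(f"subquery inlining:    {subquery_mb:.0f} MB per query")
```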

Another thing is that I expect we won't be gathering partition statistics for most queries for very long. When the following two future-work items are complete, then globalSort (and the statistics gathering) will only be needed when a query has an ORDER BY without a LIMIT at the outer level. Anything else would be able to use hash partitioning with local sorting, skipping stats gathering:

  • Multithread hashLocalSort shuffles. Currently only one partition is sorted at a time, even on a multithreaded worker. This is the main reason the initial version is using globalSort so much, even though globalSort involves more overhead on the controller.
  • Use hashLocalSort for aggregation rather than globalSort, once it's multithreaded, to reduce dependency on the controller and on statistics gathering.

gianm added a commit that referenced this issue Oct 1, 2024
@vogievetsky
Contributor

There are a number of web console changes that are spread out between #17132, #17135, and #17147 but that were all developed together with the Dart engine as the prime motivation. Primarily the stage detail view has been updated.

[Screenshot: the updated stage detail view in the web console]

Notably:

  • There is a visualization column that is added to showcase the structure of the stage dependency graph
  • The timing column has been improved to visualize a Gantt chart, letting you see at a glance which stages were running concurrently (stages running concurrently is itself a new feature!)
  • There is a new column showing the CPU timing counters that are now available on MSQ task and Dart queries.
  • The output column has been condensed from two columns to afford extra space for the other additions.
  • Last but not least, there is a new tooltip with additional data that you will see when hovering over some of the UI elements (seen in the screenshot showing the clustering in the Output column of Stage4).

kfaraz pushed a commit to kfaraz/druid that referenced this issue Oct 4, 2024
@gianm
Contributor Author

gianm commented Oct 16, 2024

I've been benchmarking a version of this that is similar to the one that will be included in Druid 31 (as the experimental Dart engine). So far it looks promising; the bottom line from what I've seen is:

  • Dart outperforms native on individual heavyweight queries (high cardinality groupbys + distinct counts, large subqueries). This is mostly because of being able to fully parallelize through the entire query. On one query I tested below, the outperformance was nearly 7x (284s -> 42s) on an 8-core machine (my laptop).
  • Native and Dart have similar performance on individual lightweight queries.
  • Native outperforms Dart on high concurrency lightweight queries.

These results are in line with the initial focus on more complex, multi-stage queries. The performance on lightweight queries is better than I expected, and I think shows that if we put some work into it then the new engine could handle the whole spectrum of workloads.

These findings are from some test queries on the NYC Taxi dataset. There are benchmarks floating around that use the following four queries, which are all pretty lightweight:

  • Q1: SELECT cab_type, COUNT(*) FROM trips GROUP BY cab_type
  • Q2: SELECT passenger_count, AVG(total_amount) FROM trips GROUP BY passenger_count
  • Q3: SELECT passenger_count, FLOOR(__time TO YEAR), COUNT(*) FROM trips GROUP BY passenger_count, FLOOR(__time TO YEAR)
  • Q4: SELECT passenger_count, FLOOR(__time TO YEAR), trip_distance, COUNT(*) FROM trips GROUP BY passenger_count, FLOOR(__time TO YEAR), trip_distance ORDER BY FLOOR(__time TO YEAR), COUNT(*) DESC

I added two more, exercising high-cardinality groupbys and count-distincts:

  • Q5: SELECT ROUND(pickup_latitude, 3), ROUND(pickup_longitude, 3), ROUND(dropoff_latitude, 3), ROUND(dropoff_longitude, 3), COUNT(*) FROM trips GROUP BY 1, 2, 3, 4 ORDER BY 5 DESC LIMIT 1000
  • Q6: SELECT ROUND(pickup_latitude, 3), ROUND(pickup_longitude, 3), ROUND(dropoff_latitude, 3), ROUND(dropoff_longitude, 3), COUNT(DISTINCT trip_id) FROM trips GROUP BY 1, 2, 3, 4 ORDER BY 5 DESC LIMIT 1000

I also added a "needle-haystack" query, which selects just two rows matching a highly selective filter:

  • Q7: SELECT COUNT(*) from trips_60m where dropoff = '0101000020E6100000000000E0F87E52C00000008090564440'

I loaded an abbreviated version of this dataset on my laptop (60 million rows) and ran the test queries on both native and Dart. The results were (in ms):

  • Q1: 79 (native), 91 (Dart) [median of 5] <-- similar speeds
  • Q2: 312 (native), 324 (Dart) [median of 5] <-- similar speeds
  • Q3: 185 (native), 414 (Dart) [median of 5] <-- native faster
  • Q4: 611 (native), 1312 (Dart) [median of 5] <-- native faster
  • Q5: 37223 (native), 25110 (Dart) [median of 3] <-- Dart faster
  • Q6: 284314 (native), 42681 (Dart) [just 1] <-- Dart faster
  • Q7: 6 (native), 7 (Dart) [median of 100] <-- similar speeds
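
For readers who want the ratios rather than the raw timings, the sketch below derives them from the numbers listed above. Here "speedup" is defined as native time divided by Dart time, so values above 1 mean Dart was faster; that definition and the variable names are choices made for this illustration.

```python
# (native_ms, dart_ms) as reported in the list above.
timings_ms = {
    "Q1": (79, 91),
    "Q2": (312, 324),
    "Q3": (185, 414),
    "Q4": (611, 1312),
    "Q5": (37223, 25110),
    "Q6": (284314, 42681),
    "Q7": (6, 7),
}

# Ratio above 1.0 means Dart finished faster than native.
speedups = {
    query: native_ms / dart_ms
    for query, (native_ms, dart_ms) in timings_ms.items()
}

for query, ratio in speedups.items():
    print(f"{query}: Dart speedup over native = {ratio:.2f}x")
```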

I looked into the factors driving the different performance on Q3, Q4, Q5, and Q6:

  • Q3: Dart is slower because it's missing the timestampResultField + timestampResultFieldGranularity optimization that native has, which swaps out timestamp_floor for granularity. This is activated for native for the FLOOR(__time TO YEAR).
  • Q4: Dart is slower mostly because of the same missing FLOOR(__time TO YEAR) optimization as Q3. A smaller factor comes from the slightly larger resultset here (82,418 final results; 508,581 intermediate results before cross-segment merge). With the native engine, the cross-segment merge uses a single concurrent hash table. With Dart, the cross-segment merge uses a sort. The concurrent hash table performs better and wins over the sort. [one of the "future work" items is relevant to this.]
  • Q5: Some spilling and disk-based sort is needed with both engines. Dart parallelizes through the entire query, whereas native gets bottlenecked on merging the spill files.
  • Q6: Again, spilling is required with both engines. Due to the COUNT(DISTINCT trip_id), there's also a large subquery. Dart parallelizes through the entire query and handles it nearly 7x faster. Native gets bottlenecked at processing the subquery.
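
The difference between the two cross-segment merge strategies mentioned for Q4 can be illustrated with a toy example. This is a single-threaded sketch over in-memory lists, not either engine's implementation: the function names and sample data are hypothetical, and a plain dictionary stands in for the concurrent hash table.

```python
from itertools import groupby
from operator import itemgetter


def merge_with_hash_table(partials):
    """Combine per-segment (key, count) rows by probing one shared hash table."""
    merged = {}
    for partial in partials:
        for key, count in partial:
            merged[key] = merged.get(key, 0) + count
    return merged


def merge_with_sort(partials):
    """Combine per-segment (key, count) rows by sorting, then summing runs."""
    rows = sorted(
        (row for partial in partials for row in partial),
        key=itemgetter(0),
    )
    return {
        key: sum(count for _, count in group)
        for key, group in groupby(rows, key=itemgetter(0))
    }


# Hypothetical partial aggregates from three segments.
segments = [
    [("yellow", 5), ("green", 2)],
    [("green", 1), ("yellow", 4)],
    [("yellow", 1)],
]
print(merge_with_hash_table(segments))
print(merge_with_sort(segments))
```

Both functions produce the same totals. The hash-based version touches each row once, while the sort-based version pays for ordering all intermediate rows first, which is consistent with the hash table doing better when the intermediate resultset is modest in size.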

Finally, I ran a concurrency test with the "needle-haystack" Q7, using 50 threads issuing 500 queries each, back-to-back as fast as they could. The system was configured with druid.msq.dart.controller.concurrentQueries = 25, druid.msq.dart.worker.concurrentQueries = 25, and druid.query.scheduler.numThreads = 25, so both native and Dart got to run 25 concurrent queries. The results were 1529 QPS (native), 730 QPS (Dart). There are various areas of potential improvement for this kind of workload, such as reducing the number of controller<->worker messages transferred during a query, processing multiple segments in a single frame processor, logging more quietly, etc.
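
A load test of this shape (N threads, each issuing M queries back-to-back) can be sketched as below. This is not the harness used for the numbers above; the function name is hypothetical, and send_query is a placeholder for whatever callable issues one query and waits for its result.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def measure_qps(send_query, threads=50, queries_per_thread=500):
    """Run queries back-to-back on several threads; return (completed, qps)."""

    def worker(_thread_index):
        # Each thread issues its queries sequentially, as fast as it can.
        for _ in range(queries_per_thread):
            send_query()
        return queries_per_thread

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=threads) as pool:
        completed = sum(pool.map(worker, range(threads)))
    elapsed = time.perf_counter() - start
    return completed, completed / elapsed


if __name__ == "__main__":
    # Stand-in for a real query: sleep 1 ms instead of calling the Broker.
    total, qps = measure_qps(lambda: time.sleep(0.001), threads=4, queries_per_thread=25)
    print(f"{total} queries, {qps:.0f} QPS")
```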

gianm changed the title from "Interactive MSQ profile" to "Interactive MSQ profile (Dart)" on Oct 18, 2024