Abandon encoded tuples as task definition in dsk graphs #9969

Open
fjetter opened this issue Feb 16, 2023 · 7 comments · Fixed by #11248 or dask/distributed#8797
Labels
core, discussion (Discussing a topic with no specific actions yet), enhancement (Improve existing functionality or make things work better), highlevelgraph (Issues relating to HighLevelGraphs), needs attention (It's been a while since this was pushed on. Needs attention from the owner or a maintainer), scheduler

Comments

@fjetter
Member

fjetter commented Feb 16, 2023

TLDR

I would like to introduce a Task class (name TBD) instead of using tuples to define our graphs. I believe this can help us in many problem areas such as serialization, stringification, graph composition and analysis, etc.

Motivation

Currently, all task graphs are defined as dictionaries mapping a hashable key to a tuple of things

Example

dsk = {
    "key-1": (func, "a", "b"),
    "key-2": (func, "key-1", "b"),
    "key-3": (func, (func2, "c", "key-1"), "key-2")
}

This example showcases three different ways of defining tasks (see the runnable sketch after this list)

  • key-1: the simplest example. A function that is supposed to be called with a couple of arguments; it translates to an expression like key_1 = func("a", "b").
  • key-2: This task defines a dependency on key-1 by using the literal key value as an argument. [1, 2] It will evaluate to key_2 = func(key_1, "b").
  • key-3: Tasks can even be defined recursively with arbitrary nesting levels. It will evaluate to key_3 = func(func2("c", key_1), key_2).
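
To make the evaluation semantics concrete, here is a small runnable sketch of the example above, using dask.core.get (the minimal reference scheduler shipped with dask) and two stand-in functions chosen only for illustration:

from dask.core import get

def func(x, y):
    return f"{x}+{y}"

def func2(x, y):
    return f"{x}*{y}"

dsk = {
    "key-1": (func, "a", "b"),
    "key-2": (func, "key-1", "b"),
    "key-3": (func, (func2, "c", "key-1"), "key-2"),
}

# get walks the graph, substitutes keys with their computed values and
# evaluates nested tuples recursively
get(dsk, "key-3")  # 'c*a+b+a+b+b', i.e. func(func2("c", key_1), key_2)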

To handle this structure, several utility functions are used throughout the code base to convert strings, generate tokens, or walk the graph recursively. Below are a couple of the important ones (see the snippet after this list)

  • istask inspects an object and guesses that, if its structure is approximately as described above, the object is a task spec
  • get_deps, get_dependencies and keys_in_tasks, which are used to walk the dask graph
  • _execute_task, which is used to execute a task and walk a recursive definition like key-3. Note that very similar functionality is provided by SubgraphCallable.
  • dumps_task, which serializes such a tuple by again recursing through it. The developer has to ensure that this only happens after ``
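
The snippet below shows how much of this is inference by recursion; istask and get_dependencies live in dask.core, and func/dsk refer to the example graph above:

from dask.core import get_dependencies, istask

istask((func, "a", "b"))        # True: a tuple whose first element is callable
istask("key-1")                 # False: a plain literal
get_dependencies(dsk, "key-3")  # {"key-1", "key-2"}, found by recursing into the nested tuple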

The important piece to take away is that we iterate and recurse over almost every iterable we can find in the graph and replace matching keys automatically, multiple times. This is very error prone and, while not particularly performance sensitive [3], it is unnecessary work.

Apart from unnecessarily walking the graph, a lot of the HighLevelGraph and distributed.protocol complexity stems from the attempt to not deserialize payload data or user functions on the scheduler. The reasons for this are plentiful but have also been debated recently (e.g. scheduler environments differ from clients; performance gains from not serializing data unnecessarily). I consider this only a minor motivator for this change, but I think it's a nice-to-have that comes for free.

Proposal

Most, if not all, of the complexity described above could be avoided if we chose a very explicit task representation that is not based on an encoded tuple. This would avoid any semantic confusion and would significantly reduce our need to walk and recurse through the graph.

from distributed.utils_comm import WrappedKey
from typing import Any, Callable

# Note: a typing.NewType cannot be used with isinstance() further down, so Key
# is modelled here as a thin str subclass that explicitly marks a key
# reference (as opposed to a literal string argument)
class Key(str):
    pass

_RemotePlaceholder = object()


class Task:
    func: Callable
    key: Key
    dependencies: dict[int, Key | WrappedKey]
    argspec: list[Any]

    __slots__ = tuple(__annotations__)

    def __init__(self, key: Key, func, args):
        self.key = key
        self.func = func
        self.dependencies = {}
        parsed_args: list[Any] = [None] * len(args)
        for ix, arg in enumerate(args):
            if isinstance(arg, (Key, WrappedKey)):
                self.dependencies[ix] = arg
                parsed_args[ix] = _RemotePlaceholder
            else:
                parsed_args[ix] = arg
        self.argspec = parsed_args

    def pack(self):
        # Serialization could be expressed by just converting between the two
        # objects (`dumps` below stands in for a serializer such as pickle.dumps)
        # ... or this logic would just be implemented in __reduce__
        return PackedTask(
            key=self.key,
            dependencies=self.dependencies,
            # This could be just a pickle.dumps
            runspec=dumps({"func": self.func, "argspec": self.argspec}),
        )

    def __call__(self, *args, **kwargs):
        # resolve the input and match it to the dependencies
        # (match_args_with_dependencies is left out of this sketch; it would
        # substitute the _RemotePlaceholder slots with the resolved values)
        argspec = self.match_args_with_dependencies(*args, **kwargs)
        return self.func(*argspec)

# Note how graph related metadata stays untouched between packing/unpacking. In
# the end, the dask scheduler only cares about the graph metadata and everything
# else is a pass through.
# More complicated versions of this could also curry arguments, e.g. user
# functions or payload data, while keeping others still free for later
# optimizations

class PackedTask:
    key: Key
    runspec: bytes
    dependencies: dict[int, Key]
    __slots__ = tuple(__annotations__)

    def __init__(self, key: Key, runspec: bytes, dependencies: dict[int, Key]):
        self.key = key
        self.runspec = runspec
        self.dependencies = dependencies

    def unpack(self):
        # `loads` is the counterpart of the `dumps` used in Task.pack
        spec = loads(self.runspec)
        return Task(
            key=self.key,
            # We could implement a fast path that
            # is not computing dependencies again
            #
            # dependencies=self.dependencies,
            func=spec["func"],
            args=spec["argspec"],
        )

    @property
    def nbytes(self) -> int:
        # Some utility things could become object properties
        ...
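
For illustration, the example graph from above could be expressed roughly like this with the sketched classes (hypothetical usage of this proposal, not an existing dask API; func is the same stand-in function as before):

dsk = {
    "key-1": Task(Key("key-1"), func, ["a", "b"]),
    "key-2": Task(Key("key-2"), func, [Key("key-1"), "b"]),
}

# Dependencies are now explicit attributes, so no recursive graph walking or
# string matching is needed to discover them
assert dsk["key-2"].dependencies == {0: Key("key-1")}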

I ran a couple of micro benchmarks to estimate the performance impact of using a slotted class instead of tuples and could indeed see a slowdown during graph generation (maybe a factor of 2). However, this appears to be easily amortized by not walking the graph so often, not recursing into everything, not matching strings, etc., since this class exposes everything we'd be interested to know.

As a cherry on top, this would allow us to have an identical definition of a task's runspec in vanilla dask, on the client, the scheduler and the worker. Specifically, we could get rid of all the dumps_function, loads_function, dumps_task, execute_task, etc. in distributed/worker.py.

A migration path would be straightforward: we would convert a legacy task graph to the new representation at the outermost layer, so that all internal systems only ever see the new format. Nested tuples would be converted to SubgraphCallable (in fact, I believe in this world we should make SubgraphCallable a Task, but that's a detail). A rough sketch of such a conversion follows below.
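
The following is only a rough sketch of what that outermost conversion layer might look like, reusing dask.core.istask and the hypothetical Task/Key classes from above (handling of nested tuples and of non-string keys is elided):

from dask.core import istask

def convert_legacy_graph(dsk):
    new_dsk = {}
    for key, value in dsk.items():
        if istask(value):
            func, *args = value
            # Wrap arguments that reference other keys in an explicit Key marker;
            # nested tuples would instead become a SubgraphCallable-like Task
            args = [Key(a) if isinstance(a, str) and a in dsk else a for a in args]
            new_dsk[key] = Task(Key(key), func, args)
        else:
            # Constants / payload data pass through unchanged
            new_dsk[key] = value
    return new_dsk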

I can genuinely only see benefits. This will be a bit of tedious work but I believe it would make working on internals so much easier, particularly around serialization topics.

Thoughts? Am I missing something?

[1] The fact that we're using simple literals to identify dependencies can cause problems for developers if "stringification" is not properly applied, since keys are not necessarily required to be strings until they reach the scheduler. If graph construction is not done properly, this can cause spurious errors, e.g. because the user function receives the non-stringified literal instead of the actual data.
[2] Whenever users provide a key themselves, e.g. in scatter or compute, this can cause dask to resolve dependency structures incorrectly and to confuse literal user arguments with keys.
[3] All of these functions are written efficiently and operate on builtins, so the performance overhead is acceptable, but it is still unnecessary.

cc @rjzamora, @madsbk, @jrbourbeau, @mrocklin, @crusaderky

@github-actions github-actions bot added the needs triage label Feb 16, 2023
@fjetter fjetter added the scheduler, core, discussion, highlevelgraph and enhancement labels and removed the needs triage label Feb 16, 2023
@rjzamora
Member

Thanks for writing this up @fjetter ! I am personally very open to this idea. We certainly spend a lot of time fighting with the repercussions of having such a "lax" task standard. It is certainly nice to be able to say "a Dask graph is just a dictionary of strings and tuples!", but I agree that it doesn't seem "worth it."

I ran a couple of micro benchmarks to estimate the performance impact of using a slotted class instead of tuples and could indeed see a slowdown during graph generation (maybe a factor of 2) but this appears to be easily amortized by just not walking the graph so often and not recursing into everything and matching strings, etc. since this class exposes everything we'd be interested to know.

Makes sense that generation itself will take a bit of a hit. We definitely want to make sure that end-to-end performance improves for real-world applications (or remains reasonably flat) - I have my fingers crossed.

@mrocklin
Member

mrocklin commented Feb 17, 2023 via email

@mrocklin
Member

mrocklin commented Feb 17, 2023 via email

@fjetter
Member Author

fjetter commented Feb 17, 2023

If we truly wanted to avoid deserialization on the scheduler side in the context of HLGs, we'd need to put some thought into how runspec=dumps({"func": self.func, "argspec": self.argspec}) can be expressed better.
I haven't put much thought into this yet, but from my limited experience with HLGs it should be possible. We just need a slightly more sophisticated way of describing the argspec/runspec, which we can do in a follow-up.

Makes sense that generation itself will take a bit of a hit. We definitely want to make sure that end-to-end performance improves for real-world applications (or remains reasonably flat) - I have my fingers crossed.

I did some benchmarks with graphs of ~100k nodes where every node had ~3-5 arguments of which half are actual dependencies. I think this is a rather dense graph and a "bad case" for our current logic. I measured a chain of basically gen_graph + get_deps and it was faster end to end (I somehow misplaced the code I was using yesterday, sorry :( ).

In terms of performance I'm less worried about speed. If anything, we should be slightly careful when it comes to memory usage. The slotted class itself (as defined above) has the same memory overhead as a (four-)tuple. However, the naive implementation above uses a dict to track dependencies. Dicts require "a lot" of memory, and this could add up to a couple hundred MBs for large graphs.
Not a deal breaker, but still something to keep in mind, especially if we put more things into this object.
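
For a rough feel of that overhead, one can compare the container sizes directly (a sketch only; exact byte counts depend on the CPython version and platform):

import sys

legacy_task = (sum, "key-1", "key-2", "c")   # stand-in for a tuple-encoded task
dependencies = {1: "key-1", 2: "key-2"}      # per-task dict as in the naive sketch above

sys.getsizeof(legacy_task)   # tuple: small fixed header plus one pointer per element
sys.getsizeof(dependencies)  # dict: hash table with spare slots, noticeably larger per task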

@crusaderky
Collaborator

dsk = {
    "key-1": (func, "a", "b"),
    "key-2": (func, "key-1", "b"),
    "key-3": (func, (func2, "c", "key-1"), "key-2")
}

For the sake of completeness, this is missing a fourth and last use case, which is constants. A constant is defined as any value that is either not hashable or does not appear in the keys.
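
For example, in the notation above (func being the same stand-in function as before):

dsk = {
    "payload": [1, 2, 3],            # a constant: not a task, just data
    "key-1": (func, "payload", 10),  # "payload" resolves to the list above; 10 is a literal constant
}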

That Task class you defined above looks a lot like SubgraphCallable, minus the actual subgraph.

The important piece to take away is that we are iterating and recursing over almost every iterable we can find in the graph and replace matching keys automatically. Multiple times.

Yeah, I recall using dirty hacks like using a UserList for my payloads to stop dask from iterating over it.

Multiple times. This is very error prone and not particularly performance sensitive [3].

Are we doing it upstream or downstream of optimization and losing the HLG layers? If it's (or if it were to be) upstream of optimization, it would only be once per layer, right? So is performance an actual problem?
Do we have a measure of how much we're spending on this when we call compute for, say, a 100k keys graph?

Important to point out that, if we did this, we'd not be able to remove all complexity related to dealing with tuples any time soon, because of backwards compatibility with third-party dsk's. I mean, today we're still supporting collections with a raw dict as their __dask_graph__ instead of a HighLevelGraph, and the hlg is old enough that we're considering retiring it.

IMHO, this feels like a substantial amount of work for... a nice to have? None of the current design feels on fire to me, but I may be wrong.

@fjetter
Member Author

fjetter commented Feb 17, 2023

That Task class you defined above looks a lot like SubgraphCallable, minus the actual subgraph.

Yes, that's the point. As I stated, SubgraphCallable would be a Task in this approach. The core idea is that we maintain a collection of dependencies as part of the task definition itself and use it instead of walking the graph, which is exactly what SubgraphCallable is already doing (see the sketch below for reference).
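
For reference, a minimal sketch of how SubgraphCallable already declares its external inputs explicitly (using dask.optimization.SubgraphCallable; add is a stand-in function):

from dask.optimization import SubgraphCallable

def add(x, y):
    return x + y

# The subgraph's external inputs ("a", "b") are listed explicitly, much like
# the dependencies attribute on the proposed Task class
sc = SubgraphCallable({"out": (add, "a", "b")}, "out", ["a", "b"])
sc(1, 2)  # 3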

Do we have a measure of how much we're spending on this when we call compute for, say, a 100k keys graph?

I don't have a measure, but performance is only a secondary point; I'm not concerned about it. This is about complexity reduction.

Important to point out that, if we did this, we'd not be able to remove all complexity related to dealing with tuples any time soon, because of backwards compatibility with third-party dsk's.

The existing logic that currently walks the graph can be used to convert to the new format while raising a DeprecationWarning. I agree this is nontrivial work, but it can be rolled out incrementally.

None of the current design feels on fire to me, but I may be wrong.

When dealing with serialization and graph construction/materialization this is a painful issue. Everything I described above comes from pain I encountered while dealing with HLGs, either while writing them old-style or while working on dask/distributed#6028.
Apart from a couple of key collision issues like dask/distributed#3965 / dask/distributed#7551, this is mostly about maintenance and developer velocity.

I also believe that our current serialization protocol could be substantially simplified but I haven't done the work to provide details.

Note: as most people are likely aware, this change would mostly benefit dask/distributed, but a lot of the work would happen in dask/dask, which is why I posted it in this repo.

@fjetter
Member Author

fjetter commented Oct 25, 2024

this isn't entirely done yet
