Handle doubly connected operators correctly in the stage assignment #44

Open
sekruse opened this issue Apr 6, 2017 · 2 comments
Member

sekruse commented Apr 6, 2017

The following code

    @Test
    public void testTightBroadcast() {
        RheemContext rheemContext = new RheemContext().with(Java.basicPlugin());
        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(rheemContext);

        LoadCollectionDataQuantaBuilder<Integer> inputDataQuanta =
                javaPlanBuilder.loadCollection(Arrays.asList(1, 2, 3));
        Collection<Integer> result = inputDataQuanta
                .map(x -> x + 1)
                .withBroadcast(inputDataQuanta, "inputDataQuanta")
                .collect();

        Assert.assertEquals(RheemCollections.asSet(2, 3, 4), RheemCollections.asSet(result));
    }

produces this error

Caused by: java.lang.AssertionError
	at org.qcri.rheem.core.optimizer.enumeration.StageAssignmentTraversal.<init>(StageAssignmentTraversal.java:92)
	at org.qcri.rheem.core.optimizer.enumeration.StageAssignmentTraversal.assignStages(StageAssignmentTraversal.java:112)
	at org.qcri.rheem.core.plan.executionplan.ExecutionPlan.createFrom(ExecutionPlan.java:222)
	at org.qcri.rheem.core.api.Job.createInitialExecutionPlan(Job.java:382)
	at org.qcri.rheem.core.api.Job.doExecute(Job.java:247)
	... 37 more

Apparently, this problem appears because inputDataQuanta and the map call are connected twice: via the regular data flow and via the broadcast. If one inserts a map(x->x) before the map call or before broadcasting, the example works fine.

The above test reproduces the bug, which should be fixed.

sekruse added the bug label Apr 6, 2017
Member Author

sekruse commented Apr 6, 2017

It seems that the ExecutionTaskFlow is already assembled incorrectly, as the following log message suggests:

[WARN] org.qcri.rheem.core.optimizer.enumeration.ExecutionTaskFlow - T[JavaMap[1+1->1, id=415b0b49]] has missing input channels among [null, CollectionChannel[T[JavaCollectionSource[0->1, id=4f49f6af]]->[T[JavaMap[1+1->1, id=415b0b49]]]]].

In this instance, only the broadcast is registered correctly, but the regular map input is missing.

Member Author

sekruse commented Apr 6, 2017

I started working on this issue in branch rheem-44.

I pinpointed that both the optimizer and the executor assume that each input channel is fed into each operator only once. The code above breaks this assumption. I added changes to make the optimizer aware that an input channel may be consumed twice. However, the test above still fails during execution (in the maintenance of the execution lineage). I am stopping work on this for now for two reasons:

  1. I feel that there are many more potential problems with consuming a channel twice. If we do not fix all of them, we might run into bugs later that are even harder to spot and reproduce (e.g., in the re-optimization).
  2. The issue is too specific and the work-around too easy to justify putting a lot of effort in.

So, please feel free to pick up this issue if you feel like it. 😉
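
For anyone picking this up, here is a toy model of the assumption described above. It is not Rheem code: the class `DoublyConnectedInputs`, the methods `wireSingle` and `wireMulti`, and the use of plain strings as channels are all made up for illustration. It only sketches how bookkeeping that stores one input index per channel loses an input when the same channel feeds an operator twice, and how storing a list of indices avoids that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not Rheem code): one channel wired into two inputs of the same operator. */
public class DoublyConnectedInputs {

    /** Assumes a channel feeds an operator at most once, so it keeps one input index per channel. */
    static String[] wireSingle(List<String> channelPerInput) {
        Map<String, Integer> inputIndexByChannel = new HashMap<>();
        for (int i = 0; i < channelPerInput.size(); i++) {
            // A second put for the same channel overwrites the first index.
            inputIndexByChannel.put(channelPerInput.get(i), i);
        }
        String[] inputs = new String[channelPerInput.size()];
        inputIndexByChannel.forEach((channel, index) -> inputs[index] = channel);
        return inputs;
    }

    /** Remembers every input index that a channel feeds. */
    static String[] wireMulti(List<String> channelPerInput) {
        Map<String, List<Integer>> inputIndicesByChannel = new HashMap<>();
        for (int i = 0; i < channelPerInput.size(); i++) {
            inputIndicesByChannel
                    .computeIfAbsent(channelPerInput.get(i), c -> new ArrayList<>())
                    .add(i);
        }
        String[] inputs = new String[channelPerInput.size()];
        inputIndicesByChannel.forEach(
                (channel, indices) -> indices.forEach(i -> inputs[i] = channel));
        return inputs;
    }

    public static void main(String[] args) {
        // Input 0 stands for the regular map input, input 1 for the broadcast;
        // both are fed by the same source channel, as in the test above.
        List<String> sameChannelTwice = Arrays.asList("sourceChannel", "sourceChannel");
        System.out.println(Arrays.toString(wireSingle(sameChannelTwice))); // [null, sourceChannel]
        System.out.println(Arrays.toString(wireMulti(sameChannelTwice)));  // [sourceChannel, sourceChannel]
    }
}
```

The first output has the same shape as the `[null, CollectionChannel[...]]` in the warning above: only the last registered input survives. Whether Rheem's optimizer and executor lose the input in exactly this way is an assumption; the sketch only shows the general failure pattern.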
