Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

key should be given by "udf" #438

Open
github-actions bot opened this issue May 9, 2024 · 0 comments
Open

key should be given by "udf" #438

github-actions bot opened this issue May 9, 2024 · 0 comments
Labels

Comments

@github-actions
Copy link
Contributor

github-actions bot commented May 9, 2024

key should be given by "udf"

UDF specifies reducer function

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pywy.graph.graph import Graph
from pywy.graph.traversal import Traversal
from pywy.protobuf.planwriter import MessageWriter
from pywy.orchestrator.operator import Operator
import pywy.orchestrator.operator
import itertools
import collections
import logging
from functools import reduce


# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
    """Entry point for building a Python functional plan.

    Wraps a source (a file path or any iterable) into the first
    DataQuanta of an operator chain.
    """

    def __init__(self, descriptor):
        # Descriptor collects the plan's sources and sinks (project type).
        self.descriptor = descriptor

    def source(self, source):
        """Create the source DataQuanta of a plan.

        :param source: a path to a text file (iterated line by line) or
            any iterable object.
        :return: a DataQuanta wrapping a "source" Operator.
        """
        # isinstance is the idiomatic type check (also covers str subclasses).
        if isinstance(source, str):
            # NOTE(review): the file handle stays open for the lifetime of
            # the iterator; it is not closed here by design.
            source_ori = open(source, "r")
        else:
            source_ori = source
        return DataQuanta(
            Operator(
                operator_type="source",
                udf=source,
                iterator=iter(source_ori),
                previous=[],
                python_exec=False
            ),
            descriptor=self.descriptor
        )


# Wraps an operation over an iterable
class DataQuanta:
    """A node in the Python functional plan.

    Wraps an Operator plus the plan descriptor; every transformation
    returns a new DataQuanta chained to the current one.
    """

    def __init__(self, operator=None, descriptor=None):
        self.operator = operator
        self.descriptor = descriptor
        # Register plan endpoints so later traversal can find them.
        if self.operator.is_source():
            self.descriptor.add_source(self.operator)
        if self.operator.is_sink():
            self.descriptor.add_sink(self.operator)

    # Operational Functions
    def filter(self, udf):
        """Keep only the elements for which ``udf`` returns a truthy value."""
        def func(iterator):
            return filter(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="filter",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def flatmap(self, udf):
        """Apply ``udf`` to each element and flatten the results one level."""
        def func(iterator):
            # chain.from_iterable flattens one level, lazily and at C speed.
            yield from itertools.chain.from_iterable(map(udf, iterator))

        return DataQuanta(
            Operator(
                operator_type="flatmap",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def group_by(self, udf):
        """Group elements by the key computed by ``udf``.

        The input is sorted by the same key first, because
        itertools.groupby only groups *consecutive* equal-keyed elements.
        """
        def func(iterator):
            # Fixed: the key is given by "udf" (previous code referenced the
            # unimported stdlib `operator` module and ignored the UDF).
            return itertools.groupby(sorted(iterator, key=udf), key=udf)

        return DataQuanta(
            Operator(
                operator_type="group_by",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def map(self, udf):
        """Apply ``udf`` to every element."""
        def func(iterator):
            return map(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="map",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # Key specifies pivot dimensions
    # UDF specifies reducer function
    def reduce_by_key(self, keys, udf):
        """Reduce elements sharing the same pivot key(s) with ``udf``.

        Each key is stored as an operator parameter; the reduction itself
        is executed on the Java side (hence ``python_exec=False``).
        """
        op = Operator(
            operator_type="reduce_by_key",
            udf=udf,
            previous=[self.operator],
            python_exec=False
        )

        # Lazy %-style args keep this cheap when DEBUG is disabled.
        logging.debug("reduce_by_key keys (%d): %s", len(keys), keys)
        for i, key in enumerate(keys):
            # TODO maybe would be better just leave the number as key
            # Dimension indices are 1-based on the consumer side.
            op.set_parameter("dimension|" + str(i + 1), key)

        return DataQuanta(
            op,
            descriptor=self.descriptor
        )

    def reduce(self, udf):
        """Fold the whole stream into a single value with ``udf``."""
        def func(iterator):
            return reduce(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="reduce",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def sink(self, path, end="\n"):
        """Terminal operator: write each element to ``path``, one per line."""
        def consume(iterator):
            with open(path, 'w') as f:
                for x in iterator:
                    f.write(str(x) + end)

        def func(iterator):
            consume(iterator)

        return DataQuanta(
            Operator(
                operator_type="sink",

                udf=path,
                # To execute directly in the driver use udf=func instead.

                previous=[self.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def sort(self, udf):
        """Sort the stream using ``udf`` as the sort key."""
        def func(iterator):
            return sorted(iterator, key=udf)

        return DataQuanta(
            Operator(
                operator_type="sort",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # This function allow the union to be performed by Python
    # Nevertheless, current configuration runs it over Java
    def union(self, other):
        """Concatenate this stream with ``other``'s stream."""
        def func(iterator):
            return itertools.chain(iterator, other.operator.getIterator())

        return DataQuanta(
            Operator(
                operator_type="union",
                udf=func,
                previous=[self.operator, other.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def __run(self, consumer):
        # Drive the operator's iterator through the given consumer.
        consumer(self.operator.getIterator())

    # Execution Functions
    def console(self, end="\n"):
        """Execute the plan locally and print every element."""
        def consume(iterator):
            for x in iterator:
                print(x, end=end)

        self.__run(consume)

    # Only for debugging purposes!
    # To execute the plan directly in the program driver
    def execute(self):
        """Debug-only local execution; must be called on a sink operator."""
        # logging.warn is deprecated; warning() is the supported spelling.
        logging.warning("DEBUG Execution")
        logging.info("Reminder to swap SINK UDF value from path to func")
        if self.operator.is_sink():
            # Log predecessor only after confirming we have one (sinks do).
            logging.debug(self.operator.previous[0].operator_type)
            logging.debug(self.operator.operator_type)
            logging.debug(self.operator.udf)
            logging.debug(len(self.operator.previous))
            self.operator.udf(self.operator.previous[0].getIterator())
        else:
            logging.error("Plan must call execute from SINK type of operator")
            raise RuntimeError("Plan must call execute from SINK type of operator")

    # Converts Python Functional Plan to valid Wayang Plan
    def to_wayang_plan(self):
        """Translate the chained plan into a Wayang plan and submit it
        to the Wayang REST API."""
        sinks = self.descriptor.get_sinks()
        if not sinks:
            return

        graph = Graph()
        graph.populate(self.descriptor.get_sinks())

        # Uncomment to check the Graph built
        # graph.print_adjlist()

        # Function to be consumed by Traverse.
        # Separates the Python plan into a list of pipelines; a pipeline is
        # cut at boundary operators and closed at sinks.
        def define_pipelines(node1, current_pipeline, collection):
            def store_unique(pipe_to_insert):
                # Keep only one copy of pipelines that hold the same nodes.
                for pipe in collection:
                    if equivalent_lists(pipe, pipe_to_insert):
                        return
                collection.append(pipe_to_insert)

            def equivalent_lists(l1, l2):
                # Order-insensitive equality of the two node lists.
                return collections.Counter(l1) == collections.Counter(l2)

            if not current_pipeline:
                current_pipeline = [node1]

            elif node1.operator.is_boundary():
                store_unique(current_pipeline.copy())
                current_pipeline.clear()
                current_pipeline.append(node1)

            else:
                current_pipeline.append(node1)

            if node1.operator.sink:
                store_unique(current_pipeline.copy())
                current_pipeline.clear()

            return current_pipeline

        # Works over the graph
        trans = Traversal(
            graph=graph,
            origin=self.descriptor.get_sources(),
            # UDF always will receive:
            # x: a Node object,
            # y: an object representing the result of the last iteration,
            # z: a collection to store final results inside your UDF
            udf=define_pipelines
        )

        # Gets the results of the traverse process
        collected_stages = trans.get_collected_data()

        # Passing the Stages to a Wayang message writer
        writer = MessageWriter()
        # Stage is composed of class Node objects
        for number, stage in enumerate(collected_stages, start=1):
            logging.info("///")
            logging.info("stage%s", number)
            writer.process_pipeline(stage)

        writer.set_dependencies()

        # Uses a file to provide the plan
        # writer.write_message(self.descriptor)

        # Send the plan to Wayang REST api directly
        writer.send_message(self.descriptor)

c1f2ed0fa73244efb915223a8c6334a93055dc1c

@github-actions github-actions bot added the todo label May 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

0 participants