diff --git a/setup.cfg b/setup.cfg index 4a639876..47f09686 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,4 +7,4 @@ line_length = 88 lines_between_types = 1 combine_as_imports = True default_section = THIRDPARTY -known_first_party = functions_framework, google.cloud.functions +known_first_party = functions_framework diff --git a/setup.py b/setup.py index d25934bb..566a2425 100644 --- a/setup.py +++ b/setup.py @@ -46,16 +46,19 @@ ], keywords="functions-framework", packages=find_packages(where="src"), - namespace_packages=["google", "google.cloud"], package_dir={"": "src"}, python_requires=">=3.5, <4", install_requires=[ + "grpcio==1.54.2", "flask>=1.0,<3.0", "click>=7.0,<9.0", - "watchdog>=1.0.0,<2.0.0", + "uvicorn>=0.22.0", "gunicorn>=19.2.0,<21.0; platform_system!='Windows'", "cloudevents>=1.2.0,<2.0.0", - "dapr>=1.6.0", + "dapr>=1.10.0", + "aiohttp==3.8.4", + "dapr-ext-grpc>=1.10.0", + "dapr-ext-fastapi>=1.10.0" ], entry_points={ "console_scripts": [ diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index a8b18e19..136f540a 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,359 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import functools -import io -import json -import logging -import os.path -import pathlib -import sys - -import cloudevents.exceptions as cloud_exceptions -import flask -import werkzeug - -from cloudevents.http import from_http, is_binary - -from functions_framework import _function_registry, event_conversion -from functions_framework.background_event import BackgroundEvent -from functions_framework.exceptions import ( - EventConversionException, - FunctionsFrameworkException, - MissingSourceException, -) -from google.cloud.functions.context import Context -from openfunction.dapr_output_middleware import dapr_output_middleware -from openfunction.async_server import AsyncApp - -MAX_CONTENT_LENGTH = 10 * 1024 * 1024 - -_FUNCTION_STATUS_HEADER_FIELD = "X-Google-Status" -_CRASH = "crash" - -_CLOUDEVENT_MIME_TYPE = "application/cloudevents+json" - - -class _LoggingHandler(io.TextIOWrapper): - """Logging replacement for stdout and stderr in GCF Python 3.7.""" - - def __init__(self, level, stderr=sys.stderr): - io.TextIOWrapper.__init__(self, io.StringIO(), encoding=stderr.encoding) - self.level = level - self.stderr = stderr - - def write(self, out): - payload = dict(severity=self.level, message=out.rstrip("\n")) - return self.stderr.write(json.dumps(payload) + "\n") - - -def cloud_event(func): - """Decorator that registers cloudevent as user function signature type.""" - _function_registry.REGISTRY_MAP[ - func.__name__ - ] = _function_registry.CLOUDEVENT_SIGNATURE_TYPE - - @functools.wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper - - -def http(func): - """Decorator that registers http as user function signature type.""" - _function_registry.REGISTRY_MAP[ - func.__name__ - ] = _function_registry.HTTP_SIGNATURE_TYPE - - @functools.wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper - - -def setup_logging(): - logging.getLogger().setLevel(logging.INFO) - info_handler = logging.StreamHandler(sys.stdout) - info_handler.setLevel(logging.NOTSET) - info_handler.addFilter(lambda record: record.levelno <= logging.INFO) - logging.getLogger().addHandler(info_handler) - - warn_handler = logging.StreamHandler(sys.stderr) - warn_handler.setLevel(logging.WARNING) - logging.getLogger().addHandler(warn_handler) - - -def setup_logging_level(debug): - if debug: - logging.getLogger().setLevel(logging.DEBUG) - - -def _http_view_func_wrapper(function, request): - def view_func(path): - return function(request._get_current_object()) - - return view_func - - -def _run_cloud_event(function, request): - data = request.get_data() - event = from_http(request.headers, data) - function(event) - - -def _cloud_event_view_func_wrapper(function, request): - def view_func(path): - ce_exception = None - event = None - try: - event = from_http(request.headers, request.get_data()) - except ( - cloud_exceptions.MissingRequiredFields, - cloud_exceptions.InvalidRequiredFields, - ) as e: - ce_exception = e - - if not ce_exception: - function(event) - return "OK" - - # Not a CloudEvent. Try converting to a CloudEvent. - try: - function(event_conversion.background_event_to_cloud_event(request)) - except EventConversionException as e: - flask.abort( - 400, - description=( - "Function was defined with FUNCTION_SIGNATURE_TYPE=cloudevent but" - " parsing CloudEvent failed and converting from background event to" - f" CloudEvent also failed.\nGot HTTP headers: {request.headers}\nGot" - f" data: {request.get_data()}\nGot CloudEvent exception: {repr(ce_exception)}" - f"\nGot background event conversion exception: {repr(e)}" - ), - ) - return "OK" - - return view_func - - -def _event_view_func_wrapper(function, request): - def view_func(path): - if event_conversion.is_convertable_cloud_event(request): - # Convert this CloudEvent to the equivalent background event data and context. - data, context = event_conversion.cloud_event_to_background_event(request) - function(data, context) - elif is_binary(request.headers): - # Support CloudEvents in binary content mode, with data being the - # whole request body and context attributes retrieved from request - # headers. - data = request.get_data() - context = Context( - eventId=request.headers.get("ce-eventId"), - timestamp=request.headers.get("ce-timestamp"), - eventType=request.headers.get("ce-eventType"), - resource=request.headers.get("ce-resource"), - ) - function(data, context) - else: - # This is a regular CloudEvent - event_data = event_conversion.marshal_background_event_data(request) - if not event_data: - flask.abort(400) - event_object = BackgroundEvent(**event_data) - data = event_object.data - context = Context(**event_object.context) - function(data, context) - - return "OK" - - return view_func - - -def _configure_app(app, function, signature_type, func_context): - # Mount the function at the root. Support GCF's default path behavior - # Modify the url_map and view_functions directly here instead of using - # add_url_rule in order to create endpoints that route all methods - if signature_type == _function_registry.HTTP_SIGNATURE_TYPE: - app.url_map.add( - werkzeug.routing.Rule("/", defaults={"path": ""}, endpoint="run") - ) - app.url_map.add(werkzeug.routing.Rule("/robots.txt", endpoint="error")) - app.url_map.add(werkzeug.routing.Rule("/favicon.ico", endpoint="error")) - app.url_map.add(werkzeug.routing.Rule("/", endpoint="run")) - app.view_functions["run"] = _http_view_func_wrapper(function, flask.request) - app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") - app.after_request(read_request) - app.after_request(dapr_output_middleware(func_context)) - elif signature_type == _function_registry.BACKGROUNDEVENT_SIGNATURE_TYPE: - app.url_map.add( - werkzeug.routing.Rule( - "/", defaults={"path": ""}, endpoint="run", methods=["POST"] - ) - ) - app.url_map.add( - werkzeug.routing.Rule("/", endpoint="run", methods=["POST"]) - ) - app.view_functions["run"] = _event_view_func_wrapper(function, flask.request) - # Add a dummy endpoint for GET / - app.url_map.add(werkzeug.routing.Rule("/", endpoint="get", methods=["GET"])) - app.view_functions["get"] = lambda: "" - elif signature_type == _function_registry.CLOUDEVENT_SIGNATURE_TYPE: - app.url_map.add( - werkzeug.routing.Rule( - "/", defaults={"path": ""}, endpoint=signature_type, methods=["POST"] - ) - ) - app.url_map.add( - werkzeug.routing.Rule( - "/", endpoint=signature_type, methods=["POST"] - ) - ) - - app.view_functions[signature_type] = _cloud_event_view_func_wrapper( - function, flask.request - ) - else: - raise FunctionsFrameworkException( - "Invalid signature type: {signature_type}".format( - signature_type=signature_type - ) - ) - - -def read_request(response): - """ - Force the framework to read the entire request before responding, to avoid - connection errors when returning prematurely. - """ - - flask.request.get_data() - return response - - -def crash_handler(e): - """ - Return crash header to allow logging 'crash' message in logs. - """ - return str(e), 500, {_FUNCTION_STATUS_HEADER_FIELD: _CRASH} - -def create_async_app(target=None, source=None, func_context=None, debug=False): - target = _function_registry.get_function_target(target) - source = _function_registry.get_function_source(source) - - if not os.path.exists(source): - raise MissingSourceException( - "File {source} that is expected to define function doesn't exist".format( - source=source - ) - ) - - source_module, spec = _function_registry.load_function_module(source) - spec.loader.exec_module(source_module) - - function = _function_registry.get_user_function(source, source_module, target) - - setup_logging_level(debug) - - async_app = AsyncApp(func_context) - async_app.bind(function) - - return async_app.app - - -def create_app(target=None, source=None, signature_type=None, func_context=None, debug=False): - target = _function_registry.get_function_target(target) - source = _function_registry.get_function_source(source) - - # Set the template folder relative to the source path - # Python 3.5: join does not support PosixPath - template_folder = str(pathlib.Path(source).parent / "templates") - - if not os.path.exists(source): - raise MissingSourceException( - "File {source} that is expected to define function doesn't exist".format( - source=source - ) - ) - - source_module, spec = _function_registry.load_function_module(source) - - # Create the application - _app = flask.Flask(target, template_folder=template_folder) - _app.config["MAX_CONTENT_LENGTH"] = MAX_CONTENT_LENGTH - _app.register_error_handler(500, crash_handler) - global errorhandler - errorhandler = _app.errorhandler - - # Handle legacy GCF Python 3.7 behavior - if os.environ.get("ENTRY_POINT"): - os.environ["FUNCTION_NAME"] = os.environ.get("K_SERVICE", target) - _app.make_response_original = _app.make_response - - def handle_none(rv): - if rv is None: - rv = "OK" - return _app.make_response_original(rv) - - _app.make_response = handle_none - - # Handle log severity backwards compatibility - sys.stdout = _LoggingHandler("INFO", sys.stderr) - sys.stderr = _LoggingHandler("ERROR", sys.stderr) - setup_logging() - - setup_logging_level(debug) - - # Execute the module, within the application context - with _app.app_context(): - spec.loader.exec_module(source_module) - - # Get the configured function signature type - signature_type = _function_registry.get_func_signature_type(target, signature_type) - function = _function_registry.get_user_function(source, source_module, target) - - _configure_app(_app, function, signature_type, func_context) - - return _app - - -class LazyWSGIApp: - """ - Wrap the WSGI app in a lazily initialized wrapper to prevent initialization - at import-time - """ - - def __init__(self, target=None, source=None, signature_type=None, func_context=None, debug=False): - # Support HTTP frameworks which support WSGI callables. - # Note: this ability is currently broken in Gunicorn 20.0, and - # environment variables should be used for configuration instead: - # https://github.com/benoitc/gunicorn/issues/2159 - self.target = target - self.source = source - self.signature_type = signature_type - self.func_context = func_context - self.debug = debug - - # Placeholder for the app which will be initialized on first call - self.app = None - - def __call__(self, *args, **kwargs): - if not self.app: - self.app = create_app(self.target, self.source, self.signature_type, self.func_context, self.debug) - return self.app(*args, **kwargs) - - -app = LazyWSGIApp() - - -class DummyErrorHandler: - def __init__(self): - pass - - def __call__(self, *args, **kwargs): - return self - - -errorhandler = DummyErrorHandler() diff --git a/src/functions_framework/__main__.py b/src/functions_framework/__main__.py index 5f2e710c..3676cfdd 100644 --- a/src/functions_framework/__main__.py +++ b/src/functions_framework/__main__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from functions_framework._cli import _cli _cli(prog_name="python -m functions_framework") diff --git a/src/functions_framework/_cli.py b/src/functions_framework/_cli.py index 9699a5c2..0dfc3795 100644 --- a/src/functions_framework/_cli.py +++ b/src/functions_framework/_cli.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,44 +11,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import os - import click -from functions_framework import create_app, create_async_app -from functions_framework._http import create_server from functions_framework import _function_registry +from functions_framework.runner import Runner + @click.command() @click.option("--target", envvar="FUNCTION_TARGET", type=click.STRING, required=True) @click.option("--source", envvar="FUNCTION_SOURCE", type=click.Path(), default=None) -@click.option( - "--signature-type", - envvar="FUNCTION_SIGNATURE_TYPE", - type=click.Choice(["http", "event", "cloudevent"]), - default="http", -) @click.option("--host", envvar="HOST", type=click.STRING, default="0.0.0.0") @click.option("--port", envvar="PORT", type=click.INT, default=8080) @click.option("--debug", envvar="DEBUG", is_flag=True) @click.option("--dry-run", envvar="DRY_RUN", is_flag=True) -def _cli(target, source, signature_type, host, port, debug, dry_run): - context = _function_registry.get_openfunction_context(None) - - # determine if async or knative - if context and context.is_runtime_async(): - app = create_async_app(target, source, context, debug) - if dry_run: - run_dry(target, host, port) - else: - app.run(context.port) - else: - app = create_app(target, source, signature_type, context, debug) - if dry_run: - run_dry(target, host, port) - else: - create_server(app, debug).run(host, port) +def _cli(target, source, host, port, debug, dry_run): + # fetch the context + context = _function_registry.get_openfunction_context('') + + runner = Runner(context, target, source, host, port, debug, dry_run) + runner.run() def run_dry(target, host, port): diff --git a/src/functions_framework/_function_registry.py b/src/functions_framework/_function_registry.py index fc3a39f6..4759b78f 100644 --- a/src/functions_framework/_function_registry.py +++ b/src/functions_framework/_function_registry.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,19 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. import importlib.util +import inspect +import json import os import sys import types -import json +from functions_framework.context.function_context import FunctionContext +from functions_framework.context.user_context import UserContext from functions_framework.exceptions import ( InvalidConfigurationException, + InvalidFunctionSignatureException, InvalidTargetTypeException, MissingTargetException, ) -from openfunction.function_context import FunctionContext - DEFAULT_SOURCE = os.path.realpath("./main.py") FUNCTION_SIGNATURE_TYPE = "FUNCTION_SIGNATURE_TYPE" @@ -38,6 +40,14 @@ REGISTRY_MAP = {} +# Default function signature rule. +def __function_signature_rule__(context: UserContext): + pass + + +FUNCTION_SIGNATURE_RULE = inspect.signature(__function_signature_rule__) + + def get_user_function(source, source_module, target): """Returns user function, raises exception for invalid function.""" # Extract the target function from the source file @@ -56,6 +66,15 @@ def get_user_function(source, source_module, target): source=source, target=target, target_type=type(function) ) ) + + if FUNCTION_SIGNATURE_RULE != inspect.signature(function): + raise InvalidFunctionSignatureException( + "The function defined in file {source} as {target} needs to be of " + "function signature {signature}, but got {target_signature}".format( + source=source, target=target, signature=FUNCTION_SIGNATURE_RULE, + target_signature=inspect.signature(function)) + ) + return function diff --git a/src/functions_framework/background_event.py b/src/functions_framework/background_event.py deleted file mode 100644 index be01960b..00000000 --- a/src/functions_framework/background_event.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class BackgroundEvent(object): - """BackgroundEvent is an event passed to GCF background event functions. - - Background event functions take data and context as parameters, both of - which this class represents. By contrast, CloudEvent functions take a - single CloudEvent object as their parameter. This class does not represent - CloudEvents. - """ - - # Supports v1beta1, v1beta2, and v1 event formats. - def __init__( - self, - context=None, - data="", - eventId="", - timestamp="", - eventType="", - resource="", - **kwargs, - ): - self.context = context - if not self.context: - self.context = { - "eventId": eventId, - "timestamp": timestamp, - "eventType": eventType, - "resource": resource, - } - self.data = data diff --git a/src/google/cloud/__init__.py b/src/functions_framework/constants.py similarity index 72% rename from src/google/cloud/__init__.py rename to src/functions_framework/constants.py index 72a55585..df946087 100644 --- a/src/google/cloud/__init__.py +++ b/src/functions_framework/constants.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,12 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +DEFAULT_DAPR_APP_PORT = 50051 +DEFAULT_HTTP_APP_PORT = 8080 -try: - import pkg_resources +DAPR_BINDING_TYPE = "bindings" +DAPR_PUBSUB_TYPE = "pubsub" - pkg_resources.declare_namespace(__name__) -except ImportError: - import pkgutil - - __path__ = pkgutil.extend_path(__path__, __name__) +DEFAULT_DATA_CONTENT_TYPE = "application/json" diff --git a/src/google/cloud/functions_v1/__init__.py b/src/functions_framework/context/__init__.py similarity index 92% rename from src/google/cloud/functions_v1/__init__.py rename to src/functions_framework/context/__init__.py index 6913f02e..136f540a 100644 --- a/src/google/cloud/functions_v1/__init__.py +++ b/src/functions_framework/context/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/functions_framework/context/function_context.py b/src/functions_framework/context/function_context.py new file mode 100644 index 00000000..687c9603 --- /dev/null +++ b/src/functions_framework/context/function_context.py @@ -0,0 +1,148 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from functions_framework.constants import DAPR_BINDING_TYPE, DAPR_PUBSUB_TYPE + + +class FunctionContext(object): + """OpenFunction's serving context.""" + + def __init__(self, name="", version="", dapr_triggers=None, http_trigger=None, + inputs=None, outputs=None, states=None, + pre_hooks=None, post_hooks=None, tracing=None, port=0): + self.name = name + self.version = version + self.dapr_triggers = dapr_triggers + self.http_trigger = http_trigger + self.inputs = inputs + self.outputs = outputs + self.states = states + self.pre_hooks = pre_hooks + self.post_hooks = post_hooks + self.tracing = tracing + self.port = port + + @staticmethod + def from_json(json_dct): + name = json_dct.get('name') + version = json_dct.get('version') + inputs_map = json_dct.get('inputs') + outputs_map = json_dct.get('outputs') + _dapr_triggers = json_dct.get('triggers', {}).get('dapr', []) + http_trigger = json_dct.get('triggers', {}).get('http', None) + states = json_dct.get('states', {}) + pre_hooks = json_dct.get('pre_hooks', []) + post_hooks = json_dct.get('post_hooks', []) + tracing = json_dct.get('tracing', {}) + port = json_dct.get('port', 0) + + inputs = None + if inputs_map: + inputs = {} + for k, v in inputs_map.items(): + _input = Component.from_json(v) + inputs[k] = _input + + outputs = None + if outputs_map: + outputs = {} + for k, v in outputs_map.items(): + output = Component.from_json(v) + outputs[k] = output + + dapr_triggers = [] + for trigger in _dapr_triggers: + dapr_triggers.append(DaprTrigger.from_json(trigger)) + + if http_trigger: + http_trigger = HTTPRoute.from_json(http_trigger) + + return FunctionContext(name, version, dapr_triggers, http_trigger, + inputs, outputs, states, pre_hooks, post_hooks, tracing, port) + + +class Component(object): + """Components for inputs and outputs.""" + + def __init__(self, component_name="", component_type="", topic="", metadata=None, operation=""): + self.topic = topic + self.component_name = component_name + self.component_type = component_type + self.metadata = metadata + self.operation = operation + + def get_type(self): + type_split = self.component_type.split(".") + if len(type_split) > 1: + t = type_split[0] + if t == DAPR_BINDING_TYPE or t == DAPR_PUBSUB_TYPE: + return t + + return "" + + def __str__(self): + return "{component_name: %s, component_type: %s, topic: %s, metadata: %s, operation: %s}" % ( + self.component_name, + self.component_type, + self.topic, + self.metadata, + self.operation + ) + + @staticmethod + def from_json(json_dct): + topic = json_dct.get('topic', '') + component_name = json_dct.get('componentName', '') + metadata = json_dct.get('metadata') + component_type = json_dct.get('componentType', '') + operation = json_dct.get('operation', '') + return Component(component_name, component_type, topic, metadata, operation) + + +class HTTPRoute(object): + """HTTP route.""" + + def __init__(self, port=""): + self.port = port + + def __str__(self): + return "{port: %s}" % ( + self.port + ) + + @staticmethod + def from_json(json_dct): + port = json_dct.get('port', '') + return HTTPRoute(port) + + +class DaprTrigger(object): + + def __init__(self, name, component_type, topic): + self.name = name + self.component_type = component_type + self.topic = topic + + def __str__(self): + return "{name: %s, component_type: %s, topic: %s}" % ( + self.name, + self.component_type, + self.topic + ) + + @staticmethod + def from_json(json_dct): + name = json_dct.get('name', '') + component_type = json_dct.get('type', '') + topic = json_dct.get('topic') + return DaprTrigger(name, component_type, topic) diff --git a/src/functions_framework/context/runtime_context.py b/src/functions_framework/context/runtime_context.py new file mode 100644 index 00000000..d69afdd9 --- /dev/null +++ b/src/functions_framework/context/runtime_context.py @@ -0,0 +1,55 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from functions_framework.context.function_context import ( + Component, + DaprTrigger, + FunctionContext, + HTTPRoute, +) + + +class RuntimeContext: + """Context for runtime.""" + + def __init__(self, context: FunctionContext = None, logger=None): + self.context = context + self.logger = logger + + def __init_logger(self): + if self.logger: + self.logger.name = __name__ + + def has_http_trigger(self): + """Check if the function has http trigger.""" + return self.context and self.context.http_trigger + + def get_dapr_triggers(self) -> [DaprTrigger]: + """Get dapr trigger.""" + if self.context: + return self.context.dapr_triggers + else: + return [] + + def get_http_trigger(self) -> HTTPRoute: + """Get http trigger.""" + if self.context: + return self.context.http_trigger + else: + return None + + def get_outputs(self) -> [Component]: + if self.context and self.context.outputs: + return self.context.outputs + else: + return [] diff --git a/src/functions_framework/context/user_context.py b/src/functions_framework/context/user_context.py new file mode 100644 index 00000000..4d543d42 --- /dev/null +++ b/src/functions_framework/context/user_context.py @@ -0,0 +1,83 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import copy +import json + +from dapr.clients import DaprClient + +from functions_framework import constants +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.exceptions import exception_handler +from functions_framework.openfunction.function_out import FunctionOut + + +class UserContext(object): + """Context for user.""" + + def __init__(self, runtime_context: RuntimeContext = None, + binding_request=None, topic_event=None, http_request=None, logger=None): + self.runtime_context = runtime_context + self.logger = logger + self.out = FunctionOut(0, None, "", {}) + self.dapr_client = None + self.__binding_request = binding_request + self.__topic_event = topic_event + self.__http_request = http_request + self.__init_dapr_client() + + def __init_dapr_client(self): + if not self.dapr_client: + self.dapr_client = DaprClient() + + def __init_logger(self): + if self.logger: + self.logger.name = __name__ + + def get_binding_request(self): + return copy.deepcopy(self.__binding_request) + + def get_topic_event(self): + return copy.deepcopy(self.__topic_event) + + def get_http_request(self): + return self.__http_request + + @exception_handler + def send(self, output_name, data): + """Send data to specify output component. + Args: + data: Bytes or str to send. + output_name: A string of designated output name. Only send this output if designated. + Returns: + Response from dapr. + """ + outputs = self.runtime_context.get_outputs() + resp = None + + if not outputs: + raise Exception("No outputs found.") + + if output_name not in outputs: + raise Exception("No output named {} found.".format(output_name)) + + target = outputs[output_name] + if target.component_type.startswith(constants.DAPR_BINDING_TYPE): + resp = self.dapr_client.invoke_binding(target.component_name, target.operation, data, target.metadata) + elif target.component_type.startswith(constants.DAPR_PUBSUB_TYPE): + data = json.dumps(data) + resp = self.dapr_client.publish_event( + target.component_name, target.topic, data, + data_content_type=constants.DEFAULT_DATA_CONTENT_TYPE, publish_metadata=target.metadata) + + return resp diff --git a/src/functions_framework/event_conversion.py b/src/functions_framework/event_conversion.py deleted file mode 100644 index 28cf2a1b..00000000 --- a/src/functions_framework/event_conversion.py +++ /dev/null @@ -1,346 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import re - -from datetime import datetime -from typing import Any, Optional, Tuple - -from cloudevents.exceptions import MissingRequiredFields -from cloudevents.http import CloudEvent, from_http, is_binary - -from functions_framework.background_event import BackgroundEvent -from functions_framework.exceptions import EventConversionException -from google.cloud.functions.context import Context - -_CLOUD_EVENT_SPEC_VERSION = "1.0" - -# Maps background/legacy event types to their equivalent CloudEvent types. -# For more info on event mappings see -# https://github.com/GoogleCloudPlatform/functions-framework-conformance/blob/master/docs/mapping.md -_BACKGROUND_TO_CE_TYPE = { - "google.pubsub.topic.publish": "google.cloud.pubsub.topic.v1.messagePublished", - "providers/cloud.pubsub/eventTypes/topic.publish": "google.cloud.pubsub.topic.v1.messagePublished", - "google.storage.object.finalize": "google.cloud.storage.object.v1.finalized", - "google.storage.object.delete": "google.cloud.storage.object.v1.deleted", - "google.storage.object.archive": "google.cloud.storage.object.v1.archived", - "google.storage.object.metadataUpdate": "google.cloud.storage.object.v1.metadataUpdated", - "providers/cloud.firestore/eventTypes/document.write": "google.cloud.firestore.document.v1.written", - "providers/cloud.firestore/eventTypes/document.create": "google.cloud.firestore.document.v1.created", - "providers/cloud.firestore/eventTypes/document.update": "google.cloud.firestore.document.v1.updated", - "providers/cloud.firestore/eventTypes/document.delete": "google.cloud.firestore.document.v1.deleted", - "providers/firebase.auth/eventTypes/user.create": "google.firebase.auth.user.v1.created", - "providers/firebase.auth/eventTypes/user.delete": "google.firebase.auth.user.v1.deleted", - "providers/google.firebase.analytics/eventTypes/event.log": "google.firebase.analytics.log.v1.written", - "providers/google.firebase.database/eventTypes/ref.create": "google.firebase.database.ref.v1.created", - "providers/google.firebase.database/eventTypes/ref.write": "google.firebase.database.ref.v1.written", - "providers/google.firebase.database/eventTypes/ref.update": "google.firebase.database.ref.v1.updated", - "providers/google.firebase.database/eventTypes/ref.delete": "google.firebase.database.ref.v1.deleted", - "providers/cloud.storage/eventTypes/object.change": "google.cloud.storage.object.v1.finalized", -} - -# _BACKGROUND_TO_CE_TYPE contains duplicate values for some keys. This set contains the duplicates -# that should be dropped when generating the inverse mapping _CE_TO_BACKGROUND_TYPE -_NONINVERTALBE_CE_TYPES = { - "providers/cloud.pubsub/eventTypes/topic.publish", - "providers/cloud.storage/eventTypes/object.change", -} - -# Maps CloudEvent types to the equivalent background/legacy event types (inverse -# of _BACKGROUND_TO_CE_TYPE) -_CE_TO_BACKGROUND_TYPE = { - v: k for k, v in _BACKGROUND_TO_CE_TYPE.items() if k not in _NONINVERTALBE_CE_TYPES -} - -# CloudEvent service names. -_FIREBASE_AUTH_CE_SERVICE = "firebaseauth.googleapis.com" -_FIREBASE_CE_SERVICE = "firebase.googleapis.com" -_FIREBASE_DB_CE_SERVICE = "firebasedatabase.googleapis.com" -_FIRESTORE_CE_SERVICE = "firestore.googleapis.com" -_PUBSUB_CE_SERVICE = "pubsub.googleapis.com" -_STORAGE_CE_SERVICE = "storage.googleapis.com" - -# Raw pubsub types -_PUBSUB_EVENT_TYPE = "google.pubsub.topic.publish" -_PUBSUB_MESSAGE_TYPE = "type.googleapis.com/google.pubsub.v1.PubsubMessage" - -_PUBSUB_TOPIC_REQUEST_PATH = re.compile(r"projects\/[^/?]+\/topics\/[^/?]+") - -# Maps background event services to their equivalent CloudEvent services. -_SERVICE_BACKGROUND_TO_CE = { - "providers/cloud.firestore/": _FIRESTORE_CE_SERVICE, - "providers/google.firebase.analytics/": _FIREBASE_CE_SERVICE, - "providers/firebase.auth/": _FIREBASE_AUTH_CE_SERVICE, - "providers/google.firebase.database/": _FIREBASE_DB_CE_SERVICE, - "providers/cloud.pubsub/": _PUBSUB_CE_SERVICE, - "providers/cloud.storage/": _STORAGE_CE_SERVICE, - "google.pubsub": _PUBSUB_CE_SERVICE, - "google.storage": _STORAGE_CE_SERVICE, -} - -# Maps CloudEvent service strings to regular expressions used to split a background -# event resource string into CloudEvent resource and subject strings. Each regex -# must have exactly two capture groups: the first for the resource and the second -# for the subject. -_CE_SERVICE_TO_RESOURCE_RE = { - _FIREBASE_CE_SERVICE: re.compile(r"^(projects/[^/]+)/(events/[^/]+)$"), - _FIREBASE_DB_CE_SERVICE: re.compile(r"^projects/_/(instances/[^/]+)/(refs/.+)$"), - _FIRESTORE_CE_SERVICE: re.compile( - r"^(projects/[^/]+/databases/\(default\))/(documents/.+)$" - ), - _STORAGE_CE_SERVICE: re.compile(r"^(projects/[^/]/buckets/[^/]+)/(objects/.+)$"), -} - -# Maps Firebase Auth background event metadata field names to their equivalent -# CloudEvent field names. -_FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE = { - "createdAt": "createTime", - "lastSignedInAt": "lastSignInTime", -} -# Maps Firebase Auth CloudEvent metadata field names to their equivalent -# background event field names (inverse of _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE). -_FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND = { - v: k for k, v in _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE.items() -} - - -def background_event_to_cloud_event(request) -> CloudEvent: - """Converts a background event represented by the given HTTP request into a CloudEvent.""" - event_data = marshal_background_event_data(request) - if not event_data: - raise EventConversionException("Failed to parse JSON") - - event_object = BackgroundEvent(**event_data) - data = event_object.data - context = Context(**event_object.context) - - if context.event_type not in _BACKGROUND_TO_CE_TYPE: - raise EventConversionException( - f'Unable to find CloudEvent equivalent type for "{context.event_type}"' - ) - new_type = _BACKGROUND_TO_CE_TYPE[context.event_type] - - service, resource, subject = _split_resource(context) - source = f"//{service}/{resource}" - - # Handle Pub/Sub events. - if service == _PUBSUB_CE_SERVICE: - if "messageId" not in data: - data["messageId"] = context.event_id - if "publishTime" not in data: - data["publishTime"] = context.timestamp - data = {"message": data} - - # Handle Firebase Auth events. - if service == _FIREBASE_AUTH_CE_SERVICE: - if "metadata" in data: - for old, new in _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE.items(): - if old in data["metadata"]: - data["metadata"][new] = data["metadata"][old] - del data["metadata"][old] - if "uid" in data: - uid = data["uid"] - subject = f"users/{uid}" - - # Handle Firebase DB events. - if service == _FIREBASE_DB_CE_SERVICE: - # The CE source of firebasedatabase CloudEvents includes location information - # that is inferred from the 'domain' field of legacy events. - if "domain" not in event_data: - raise EventConversionException( - "Invalid FirebaseDB event payload: missing 'domain'" - ) - - domain = event_data["domain"] - location = "us-central1" - if domain != "firebaseio.com": - location = domain.split(".")[0] - - resource = f"projects/_/locations/{location}/{resource}" - source = f"//{service}/{resource}" - - metadata = { - "id": context.event_id, - "time": context.timestamp, - "specversion": _CLOUD_EVENT_SPEC_VERSION, - "datacontenttype": "application/json", - "type": new_type, - "source": source, - } - - if subject: - metadata["subject"] = subject - - return CloudEvent(metadata, data) - - -def is_convertable_cloud_event(request) -> bool: - """Is the given request a known CloudEvent that can be converted to background event.""" - if is_binary(request.headers): - event_type = request.headers.get("ce-type") - event_source = request.headers.get("ce-source") - return ( - event_source is not None - and event_type is not None - and event_type in _CE_TO_BACKGROUND_TYPE - ) - return False - - -def _split_ce_source(source) -> Tuple[str, str]: - """Splits a CloudEvent source string into resource and subject components.""" - regex = re.compile(r"\/\/([^/]+)\/(.+)") - match = regex.fullmatch(source) - if not match: - raise EventConversionException("Unexpected CloudEvent source.") - - return match.group(1), match.group(2) - - -def cloud_event_to_background_event(request) -> Tuple[Any, Context]: - """Converts a background event represented by the given HTTP request into a CloudEvent.""" - try: - event = from_http(request.headers, request.get_data()) - data = event.data - service, name = _split_ce_source(event["source"]) - - if event["type"] not in _CE_TO_BACKGROUND_TYPE: - raise EventConversionException( - f'Unable to find background event equivalent type for "{event["type"]}"' - ) - - if service == _PUBSUB_CE_SERVICE: - resource = {"service": service, "name": name, "type": _PUBSUB_MESSAGE_TYPE} - if "message" in data: - data = data["message"] - if "messageId" in data: - del data["messageId"] - if "publishTime" in data: - del data["publishTime"] - elif service == _FIREBASE_AUTH_CE_SERVICE: - resource = name - if "metadata" in data: - for old, new in _FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND.items(): - if old in data["metadata"]: - data["metadata"][new] = data["metadata"][old] - del data["metadata"][old] - elif service == _STORAGE_CE_SERVICE: - resource = { - "name": f"{name}/{event['subject']}", - "service": service, - "type": data["kind"], - } - elif service == _FIREBASE_DB_CE_SERVICE: - name = re.sub("/locations/[^/]+", "", name) - resource = f"{name}/{event['subject']}" - else: - resource = f"{name}/{event['subject']}" - - context = Context( - eventId=event["id"], - timestamp=event["time"], - eventType=_CE_TO_BACKGROUND_TYPE[event["type"]], - resource=resource, - ) - return (data, context) - except (AttributeError, KeyError, TypeError, MissingRequiredFields): - raise EventConversionException( - "Failed to convert CloudEvent to BackgroundEvent." - ) - - -def _split_resource(context: Context) -> Tuple[str, str, str]: - """Splits a background event's resource into a CloudEvent service, resource, and subject.""" - service = "" - resource = "" - if isinstance(context.resource, dict): - service = context.resource.get("service", "") - resource = context.resource["name"] - else: - resource = context.resource - - # If there's no service we'll choose an appropriate one based on the event type. - if not service: - for b_service, ce_service in _SERVICE_BACKGROUND_TO_CE.items(): - if context.event_type.startswith(b_service): - service = ce_service - break - if not service: - raise EventConversionException( - "Unable to find CloudEvent equivalent service " - f"for {context.event_type}" - ) - - # If we don't need to split the resource string then we're done. - if service not in _CE_SERVICE_TO_RESOURCE_RE: - return service, resource, "" - - # Split resource into resource and subject. - match = _CE_SERVICE_TO_RESOURCE_RE[service].fullmatch(resource) - if not match: - raise EventConversionException("Resource regex did not match") - - return service, match.group(1), match.group(2) - - -def marshal_background_event_data(request): - """Marshal the request body of a raw Pub/Sub HTTP request into the schema that is expected of - a background event""" - try: - request_data = request.get_json() - if not _is_raw_pubsub_payload(request_data): - # If this in not a raw Pub/Sub request, return the unaltered request data. - return request_data - return { - "context": { - "eventId": request_data["message"]["messageId"], - "timestamp": request_data["message"].get( - "publishTime", datetime.utcnow().isoformat() + "Z" - ), - "eventType": _PUBSUB_EVENT_TYPE, - "resource": { - "service": _PUBSUB_CE_SERVICE, - "type": _PUBSUB_MESSAGE_TYPE, - "name": _parse_pubsub_topic(request.path), - }, - }, - "data": { - "@type": _PUBSUB_MESSAGE_TYPE, - "data": request_data["message"]["data"], - "attributes": request_data["message"]["attributes"], - }, - } - except (AttributeError, KeyError, TypeError): - raise EventConversionException("Failed to convert Pub/Sub payload to event") - - -def _is_raw_pubsub_payload(request_data) -> bool: - """Does the given request body match the schema of a unmarshalled Pub/Sub request""" - return ( - request_data is not None - and "context" not in request_data - and "subscription" in request_data - and "message" in request_data - and "data" in request_data["message"] - and "messageId" in request_data["message"] - ) - - -def _parse_pubsub_topic(request_path) -> Optional[str]: - match = _PUBSUB_TOPIC_REQUEST_PATH.search(request_path) - if match: - return match.group(0) - else: - # It is possible to configure a Pub/Sub subscription to push directly to this function - # without passing the topic name in the URL path. - return "" diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py index 671a28a4..95ea4d0b 100644 --- a/src/functions_framework/exceptions.py +++ b/src/functions_framework/exceptions.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - class FunctionsFrameworkException(Exception): pass @@ -35,3 +33,17 @@ class MissingTargetException(FunctionsFrameworkException): class EventConversionException(FunctionsFrameworkException): pass + + +class InvalidFunctionSignatureException(FunctionsFrameworkException): + pass + + +def exception_handler(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + return f"An error occurred: {e}" + + return wrapper diff --git a/src/functions_framework/log.py b/src/functions_framework/log.py new file mode 100644 index 00000000..477e0a57 --- /dev/null +++ b/src/functions_framework/log.py @@ -0,0 +1,48 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + + +def initialize_logger(name=None, level=logging.DEBUG): + if not name: + name = __name__ + _logger = logging.getLogger(name) + + # set logger level + _logger.setLevel(level) + + # create file handler + file_handler = logging.FileHandler("function.log") + file_handler.setLevel(level) + + # create console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + + # create formatter + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + + # add formatter to handlers + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + # add handlers to logger + _logger.addHandler(file_handler) + _logger.addHandler(console_handler) + + return _logger + + +# initialize logger +logger = initialize_logger(__name__, logging.INFO) diff --git a/src/google/cloud/functions/__init__.py b/src/functions_framework/openfunction/__init__.py similarity index 92% rename from src/google/cloud/functions/__init__.py rename to src/functions_framework/openfunction/__init__.py index 6913f02e..136f540a 100644 --- a/src/google/cloud/functions/__init__.py +++ b/src/functions_framework/openfunction/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/functions_framework/openfunction/function_out.py b/src/functions_framework/openfunction/function_out.py new file mode 100644 index 00000000..fa833879 --- /dev/null +++ b/src/functions_framework/openfunction/function_out.py @@ -0,0 +1,49 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +class FunctionOut: + def __init__(self, code: int, error, data, metadata): + self.__code = code + self.__error = error + self.__data = data + self.__metadata = metadata + + def __str__(self): + return f"FunctionOut(code={self.__code}, error={self.__error}, data={self.__data}, metadata={self.__metadata})" + + def __repr__(self): + return str(self) + + def set_code(self, code: int): + self.__code = code + + def get_code(self) -> int: + return self.__code + + def set_error(self, error): + self.__error = error + + def get_error(self): + return self.__error + + def set_data(self, data): + self.__data = data + + def get_data(self): + return self.__data + + def set_metadata(self, metadata): + self.__metadata = metadata + + def get_metadata(self): + return self.__metadata diff --git a/src/functions_framework/runner.py b/src/functions_framework/runner.py new file mode 100644 index 00000000..6ad12ea4 --- /dev/null +++ b/src/functions_framework/runner.py @@ -0,0 +1,77 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os + +from dapr.ext.grpc import App + +from functions_framework import _function_registry, log +from functions_framework.context.function_context import FunctionContext +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.exceptions import MissingSourceException +from functions_framework.triggers.dapr_trigger.dapr import DaprTriggerHandler +from functions_framework.triggers.http_trigger.http import HTTPTriggerHandler + + +class Runner: + def __init__(self, context: FunctionContext, target=None, source=None, + host=None, port=None, debug=None, dry_run=None): + self.target = target + self.source = source + self.context = context + self.user_function = None + self.request = None + self.host = host + self.port = port + self.debug = debug + self.dry_run = dry_run + self.logger = None + self.load_user_function() + self.init_logger() + + def load_user_function(self): + _target = _function_registry.get_function_target(self.target) + _source = _function_registry.get_function_source(self.source) + + if not os.path.exists(_source): + raise MissingSourceException( + "File {source} that is expected to define function doesn't exist".format( + source=_source + ) + ) + + source_module, spec = _function_registry.load_function_module(_source) + spec.loader.exec_module(source_module) + + self.user_function = _function_registry.get_user_function(_source, source_module, _target) + + def init_logger(self): + level = logging.INFO + if self.debug: + level = logging.DEBUG + self.logger = log.initialize_logger(__name__, level) + + def run(self): + # convert to runtime context + runtime_context = RuntimeContext(self.context, self.logger) + + _trigger = runtime_context.get_http_trigger() + if _trigger: + http_trigger = HTTPTriggerHandler(self.context.port, _trigger, self.source, self.target, self.user_function) + http_trigger.start(runtime_context, logger=self.logger) + + _triggers = runtime_context.get_dapr_triggers() + if _triggers: + dapr_trigger = DaprTriggerHandler(self.context.port, _triggers, self.user_function) + dapr_trigger.start(runtime_context, logger=self.logger) diff --git a/src/google/cloud/functions_v1beta2/__init__.py b/src/functions_framework/triggers/__init__.py similarity index 92% rename from src/google/cloud/functions_v1beta2/__init__.py rename to src/functions_framework/triggers/__init__.py index 6913f02e..136f540a 100644 --- a/src/google/cloud/functions_v1beta2/__init__.py +++ b/src/functions_framework/triggers/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/google/cloud/functions/context.py b/src/functions_framework/triggers/dapr_trigger/__init__.py similarity index 76% rename from src/google/cloud/functions/context.py rename to src/functions_framework/triggers/dapr_trigger/__init__.py index 665d8b29..136f540a 100644 --- a/src/google/cloud/functions/context.py +++ b/src/functions_framework/triggers/dapr_trigger/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,9 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -"""Definition of types used by Cloud Functions in Python..""" - -from google.cloud.functions_v1.context import Context - -__all__ = ["Context"] diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py new file mode 100644 index 00000000..01a88976 --- /dev/null +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -0,0 +1,55 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from copy import deepcopy + +from cloudevents.sdk.event import v1 +from dapr.ext.grpc import App, BindingRequest + +from functions_framework import constants +from functions_framework.context.function_context import DaprTrigger +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.context.user_context import UserContext +from functions_framework.triggers.trigger import TriggerHandler + + +class DaprTriggerHandler(TriggerHandler): + """Handle dapr trigger.""" + def __init__(self, port, triggers: [DaprTrigger] = None, user_function=None): + self.port = port + self.triggers = triggers + self.app = App() + self.user_function = user_function + if self.port == 0: + self.port = constants.DEFAULT_DAPR_APP_PORT + + def start(self, context: RuntimeContext, logger=None): + if not self.triggers: + raise Exception("No triggers specified for DaprTriggerHandler") + + for trigger in self.triggers: + if trigger.component_type.startswith("bindings"): + @self.app.binding(trigger.name) + def binding_handler(request: BindingRequest): + rt_ctx = deepcopy(context) + user_ctx = UserContext(runtime_context=rt_ctx, binding_request=request, logger=logger) + self.user_function(user_ctx) + + if trigger.component_type.startswith("pubsub"): + @self.app.subscribe(pubsub_name=trigger.name, topic=trigger.topic) + def topic_handler(event: v1.Event): + rt_ctx = deepcopy(context) + user_ctx = UserContext(runtime_context=rt_ctx, topic_event=event, logger=logger) + self.user_function(user_ctx) + + self.app.run(self.port) diff --git a/src/functions_framework/triggers/http_trigger/__init__.py b/src/functions_framework/triggers/http_trigger/__init__.py new file mode 100644 index 00000000..f60f2693 --- /dev/null +++ b/src/functions_framework/triggers/http_trigger/__init__.py @@ -0,0 +1,217 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import io +import json +import logging +import os.path +import pathlib +import sys + +from copy import deepcopy + +import flask +import werkzeug + +from functions_framework import _function_registry +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.context.user_context import UserContext +from functions_framework.exceptions import MissingSourceException + +_FUNCTION_STATUS_HEADER_FIELD = "X-OpenFunction-Status" +_CRASH = "crash" + + +class _LoggingHandler(io.TextIOWrapper): + """Logging replacement for stdout and stderr in GCF Python 3.7.""" + + def __init__(self, level, stderr=sys.stderr): + io.TextIOWrapper.__init__(self, io.StringIO(), encoding=stderr.encoding) + self.level = level + self.stderr = stderr + + def write(self, out): + payload = dict(severity=self.level, message=out.rstrip("\n")) + return self.stderr.write(json.dumps(payload) + "\n") + + +def cloud_event(func): + """Decorator that registers cloudevent as user function signature type.""" + _function_registry.REGISTRY_MAP[ + func.__name__ + ] = _function_registry.CLOUDEVENT_SIGNATURE_TYPE + + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +def http(func): + """Decorator that registers http as user function signature type.""" + _function_registry.REGISTRY_MAP[ + func.__name__ + ] = _function_registry.HTTP_SIGNATURE_TYPE + + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +def setup_logging(): + logging.getLogger().setLevel(logging.INFO) + info_handler = logging.StreamHandler(sys.stdout) + info_handler.setLevel(logging.NOTSET) + info_handler.addFilter(lambda record: record.levelno <= logging.INFO) + logging.getLogger().addHandler(info_handler) + + warn_handler = logging.StreamHandler(sys.stderr) + warn_handler.setLevel(logging.WARNING) + logging.getLogger().addHandler(warn_handler) + + +def _http_view_func_wrapper(function, runtime_context: RuntimeContext, request, logger): + @functools.wraps(function) + def view_func(path): + rt_ctx = deepcopy(runtime_context) + user_ctx = UserContext(runtime_context=rt_ctx, http_request=request, logger=logger) + return function(user_ctx) + + return view_func + + +def _configure_app(wsgi_app, runtime_context: RuntimeContext, function, logger): + wsgi_app.url_map.add( + werkzeug.routing.Rule("/", defaults={"path": ""}, endpoint="run") + ) + wsgi_app.url_map.add(werkzeug.routing.Rule("/robots.txt", endpoint="error")) + wsgi_app.url_map.add(werkzeug.routing.Rule("/favicon.ico", endpoint="error")) + wsgi_app.url_map.add(werkzeug.routing.Rule("/", endpoint="run")) + wsgi_app.view_functions["run"] = _http_view_func_wrapper(function, runtime_context, flask.request, logger) + wsgi_app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") + wsgi_app.after_request(read_request) + + +def read_request(response): + """ + Force the framework to read the entire request before responding, to avoid + connection errors when returning prematurely. Skipped on streaming responses + as these may continue to operate on the request after they are returned. + """ + + if not response.is_streamed: + flask.request.get_data() + + return response + + +def crash_handler(e): + """ + Return crash header to allow logging 'crash' message in logs. + """ + return str(e), 500, {_FUNCTION_STATUS_HEADER_FIELD: _CRASH} + + +def create_app(runtime_context: RuntimeContext = None, target=None, source=None, logger=None): + _target = _function_registry.get_function_target(target) + _source = _function_registry.get_function_source(source) + + # Set the template folder relative to the source path + # Python 3.5: join does not support PosixPath + template_folder = str(pathlib.Path(_source).parent / "templates") + + if not os.path.exists(_source): + raise MissingSourceException( + "File {source} that is expected to define function doesn't exist".format( + source=_source + ) + ) + + source_module, spec = _function_registry.load_function_module(_source) + + # Create the application + _app = flask.Flask(_target, template_folder=template_folder) + _app.register_error_handler(500, crash_handler) + global errorhandler + errorhandler = _app.errorhandler + + # Handle legacy GCF Python 3.7 behavior + if os.environ.get("ENTRY_POINT"): + os.environ["FUNCTION_NAME"] = os.environ.get("K_SERVICE", _target) + _app.make_response_original = _app.make_response + + def handle_none(rv): + if rv is None: + rv = "OK" + return _app.make_response_original(rv) + + _app.make_response = handle_none + + # Handle log severity backwards compatibility + sys.stdout = _LoggingHandler("INFO", sys.stderr) + sys.stderr = _LoggingHandler("ERROR", sys.stderr) + setup_logging() + + # Execute the module, within the application context + with _app.app_context(): + spec.loader.exec_module(source_module) + + # Get the configured function signature type + function = _function_registry.get_user_function(_source, source_module, _target) + + _configure_app(_app, runtime_context, function, logger) + + return _app + + +class LazyWSGIApp: + """ + Wrap the WSGI app in a lazily initialized wrapper to prevent initialization + at import-time + """ + + def __init__(self, target=None, source=None, signature_type=None): + # Support HTTP frameworks which support WSGI callables. + # Note: this ability is currently broken in Gunicorn 20.0, and + # environment variables should be used for configuration instead: + # https://github.com/benoitc/gunicorn/issues/2159 + self.target = target + self.source = source + self.signature_type = signature_type + + # Placeholder for the app which will be initialized on first call + self.app = None + + def __call__(self, *args, **kwargs): + if not self.app: + self.app = create_app(self.target, self.source, self.signature_type) + return self.app(*args, **kwargs) + + +app = LazyWSGIApp() + + +class DummyErrorHandler: + def __init__(self): + pass + + def __call__(self, *args, **kwargs): + return self + + +errorhandler = DummyErrorHandler() diff --git a/src/functions_framework/_http/__init__.py b/src/functions_framework/triggers/http_trigger/_http/__init__.py similarity index 81% rename from src/functions_framework/_http/__init__.py rename to src/functions_framework/triggers/http_trigger/_http/__init__.py index ca9b0f5c..6f7694b3 100644 --- a/src/functions_framework/_http/__init__.py +++ b/src/functions_framework/triggers/http_trigger/_http/__init__.py @@ -11,8 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from functions_framework._http.flask import FlaskApplication +from functions_framework.triggers.http_trigger._http.flask import FlaskApplication class HTTPServer: @@ -25,7 +24,9 @@ def __init__(self, app, debug, **options): self.server_class = FlaskApplication else: try: - from functions_framework._http.gunicorn import GunicornApplication + from functions_framework.triggers.http_trigger._http.gunicorn import ( + GunicornApplication, + ) self.server_class = GunicornApplication except ImportError as e: @@ -39,4 +40,4 @@ def run(self, host, port): def create_server(wsgi_app, debug, **options): - return HTTPServer(wsgi_app, debug, **options) + return HTTPServer(wsgi_app, debug, **options) \ No newline at end of file diff --git a/src/functions_framework/_http/flask.py b/src/functions_framework/triggers/http_trigger/_http/flask.py similarity index 98% rename from src/functions_framework/_http/flask.py rename to src/functions_framework/triggers/http_trigger/_http/flask.py index b2edf563..8cc5987d 100644 --- a/src/functions_framework/_http/flask.py +++ b/src/functions_framework/triggers/http_trigger/_http/flask.py @@ -22,4 +22,4 @@ def __init__(self, app, host, port, debug, **options): self.options = options def run(self): - self.app.run(self.host, self.port, debug=self.debug, **self.options) + self.app.run(self.host, self.port, debug=self.debug, **self.options) \ No newline at end of file diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/triggers/http_trigger/_http/gunicorn.py similarity index 97% rename from src/functions_framework/_http/gunicorn.py rename to src/functions_framework/triggers/http_trigger/_http/gunicorn.py index 25fdb790..f522b67f 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/triggers/http_trigger/_http/gunicorn.py @@ -20,7 +20,7 @@ def __init__(self, app, host, port, debug, **options): self.options = { "bind": "%s:%s" % (host, port), "workers": 1, - "threads": 8, + "threads": 1024, "timeout": 0, "loglevel": "error", "limit_request_line": 0, diff --git a/src/functions_framework/triggers/http_trigger/http.py b/src/functions_framework/triggers/http_trigger/http.py new file mode 100644 index 00000000..4d35e67a --- /dev/null +++ b/src/functions_framework/triggers/http_trigger/http.py @@ -0,0 +1,46 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uvicorn + +from functions_framework import constants +from functions_framework.context.function_context import HTTPRoute +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.triggers.http_trigger import create_app +from functions_framework.triggers.http_trigger._http import create_server +from functions_framework.triggers.trigger import TriggerHandler + + +class HTTPTriggerHandler(TriggerHandler): + """Handle http trigger.""" + def __init__(self, port, trigger: HTTPRoute, source=None, target=None, user_function=None, debug=False): + self.port = trigger.port if trigger.port else port + self.source = source + self.target = target + self.trigger = trigger + self.hostname = trigger.hostname + self.route_rules = trigger.rules + self.user_function = user_function + self.debug = debug + if self.port == 0: + self.port = constants.DEFAULT_HTTP_APP_PORT + + def start(self, context: RuntimeContext, logger=None): + if not self.trigger: + raise Exception("No trigger specified for HTTPTriggerHandler") + + app = create_app(context, self.target, self.source, logger) + create_server(app, self.debug).run("0.0.0.0", self.port) + + + diff --git a/src/google/__init__.py b/src/functions_framework/triggers/trigger.py similarity index 68% rename from src/google/__init__.py rename to src/functions_framework/triggers/trigger.py index 72a55585..8edf9076 100644 --- a/src/google/__init__.py +++ b/src/functions_framework/triggers/trigger.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,12 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from abc import ABC, abstractmethod -try: - import pkg_resources +from functions_framework.context.runtime_context import RuntimeContext - pkg_resources.declare_namespace(__name__) -except ImportError: - import pkgutil - __path__ = pkgutil.extend_path(__path__, __name__) +class TriggerHandler(ABC): + @abstractmethod + def start(self, context: RuntimeContext): + pass + diff --git a/src/google/cloud/functions_v1/context.py b/src/google/cloud/functions_v1/context.py deleted file mode 100644 index 12670867..00000000 --- a/src/google/cloud/functions_v1/context.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Definition of the context type used by Cloud Functions in Python.""" - - -class Context(object): - """Context passed to background functions.""" - - def __init__(self, eventId="", timestamp="", eventType="", resource=""): - self.event_id = eventId - self.timestamp = timestamp - self.event_type = eventType - self.resource = resource - - def __str__(self): - return "{event_id: %s, timestamp: %s, event_type: %s, resource: %s}" % ( - self.event_id, - self.timestamp, - self.event_type, - self.resource, - ) diff --git a/src/google/cloud/functions_v1beta2/context.py b/src/google/cloud/functions_v1beta2/context.py deleted file mode 100644 index 12670867..00000000 --- a/src/google/cloud/functions_v1beta2/context.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Definition of the context type used by Cloud Functions in Python.""" - - -class Context(object): - """Context passed to background functions.""" - - def __init__(self, eventId="", timestamp="", eventType="", resource=""): - self.event_id = eventId - self.timestamp = timestamp - self.event_type = eventType - self.resource = resource - - def __str__(self): - return "{event_id: %s, timestamp: %s, event_type: %s, resource: %s}" % ( - self.event_id, - self.timestamp, - self.event_type, - self.resource, - ) diff --git a/src/openfunction/__init__.py b/src/openfunction/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/openfunction/async_server.py b/src/openfunction/async_server.py deleted file mode 100644 index c94916ad..00000000 --- a/src/openfunction/async_server.py +++ /dev/null @@ -1,32 +0,0 @@ -from dapr.ext.grpc import App, BindingRequest -from cloudevents.sdk.event import v1 -from openfunction.function_context import OPEN_FUNC_BINDING, OPEN_FUNC_TOPIC -from openfunction.function_runtime import OpenFunctionRuntime - -class AsyncApp(object): - """Init async server with dapr server.""" - - def __init__(self, func_context): - """Inits async server. - Args: - func_context: OpenFunction context - """ - self.ctx = OpenFunctionRuntime.parse(func_context) - self.app = App() - - - def bind(self, function): - """Bind user function with input binding/subscription. - Args: - function: user function - """ - for input in self.ctx.inputs.values(): - type = input.get_type() - if type == OPEN_FUNC_BINDING: - @self.app.binding(input.component_name) - def binding(request: BindingRequest): - function(self.ctx, request.data) - elif type == OPEN_FUNC_TOPIC: - @self.app.subscribe(pubsub_name=input.component_name, topic=input.uri, metadata=input.metadata) - def mytopic(event: v1.Event) -> None: - function(self.ctx, bytes(event.data)) \ No newline at end of file diff --git a/src/openfunction/dapr_output_middleware.py b/src/openfunction/dapr_output_middleware.py deleted file mode 100644 index f7d59134..00000000 --- a/src/openfunction/dapr_output_middleware.py +++ /dev/null @@ -1,19 +0,0 @@ -import logging - -from openfunction.function_runtime import OpenFunctionRuntime - -def dapr_output_middleware(context): - """Flask middleware for output binding.""" - def dapr_output_middleware(response): - if not context or not context.outputs or not context.is_runtime_knative(): - return response - - runtime = OpenFunctionRuntime.parse(context) - resp = runtime.send(response.get_data(True)) - - for key, value in resp.items(): - logging.debug("Dapr result for %s: %s", key, value.text()) - - return response - - return dapr_output_middleware \ No newline at end of file diff --git a/src/openfunction/function_context.py b/src/openfunction/function_context.py deleted file mode 100644 index 7be2a9d0..00000000 --- a/src/openfunction/function_context.py +++ /dev/null @@ -1,85 +0,0 @@ -OPEN_FUNC_BINDING = "bindings" -OPEN_FUNC_TOPIC = "pubsub" - -KNATIVE_RUNTIME_TYPE = "knative" -ASYNC_RUNTIME_TYPE = "async" - - -class FunctionContext(object): - """OpenFunction's serving context.""" - - def __init__(self, name="", version="", runtime="", inputs=None, outputs=None, port=8080): - self.name = name - self.version = version - self.runtime = runtime - self.inputs = inputs - self.outputs = outputs - self.port = port - - def is_runtime_async(self): - return self.runtime.lower() == ASYNC_RUNTIME_TYPE - - def is_runtime_knative(self): - return self.runtime.lower() == KNATIVE_RUNTIME_TYPE - - @staticmethod - def from_json(json_dct): - name = json_dct.get('name') - version = json_dct.get('version') - runtime = json_dct.get('runtime') - inputs_list = json_dct.get('inputs') - outputs_list = json_dct.get('outputs') - - inputs = None - if inputs_list: - inputs = {} - for k, v in inputs_list.items(): - input = Component.from_json(v) - inputs[k] = input - - outputs = None - if outputs_list: - outputs = {} - for k, v in outputs_list.items(): - output = Component.from_json(v) - outputs[k] = output - - return FunctionContext(name, version, runtime, inputs, outputs) - - -class Component(object): - """Components for inputs and outputs.""" - - def __init__(self, uri="", componentName="", componentType="", metadata=None, operation=""): - self.uri = uri - self.component_name = componentName - self.component_type = componentType - self.metadata = metadata - self.operation = operation - - def get_type(self): - type_split = self.component_type.split(".") - if len(type_split) > 1: - t = type_split[0] - if t == OPEN_FUNC_BINDING or t == OPEN_FUNC_TOPIC: - return t - - return "" - - def __str__(self): - return "{uri: %s, component_name: %s, component_type: %s, operation: %s, metadata: %s}" % ( - self.uri, - self.component_name, - self.component_type, - self.operation, - self.metadata - ) - - @staticmethod - def from_json(json_dct): - uri = json_dct.get('uri', '') - component_name = json_dct.get('componentName', '') - metadata = json_dct.get('metadata') - component_type = json_dct.get('componentType', '') - operation = json_dct.get('operation', '') - return Component(uri, component_name, component_type, metadata, operation) \ No newline at end of file diff --git a/src/openfunction/function_runtime.py b/src/openfunction/function_runtime.py deleted file mode 100644 index f09efcab..00000000 --- a/src/openfunction/function_runtime.py +++ /dev/null @@ -1,85 +0,0 @@ -from abc import abstractmethod -import os - -from dapr.clients import DaprGrpcClient -from dapr.conf import settings - -from openfunction.function_context import FunctionContext - -DAPR_GRPC_PORT = "DAPR_GRPC_PORT" -OPEN_FUNC_BINDING = "bindings" -OPEN_FUNC_TOPIC = "pubsub" - - -class OpenFunctionRuntime(object): - """OpenFunction Runtime.""" - - def __init__(self, context=None): - """Inits OpenFunction Runtime. - Args: - context: OpenFunction context - """ - self.context = context - - def __getattribute__(self, item): - try: - target = object.__getattribute__(self, item) - return target - except AttributeError: - target = object.__getattribute__(self, "context") - return getattr(target, item) - - def set_dapr_grpc_port(self): - port = os.environ.get(DAPR_GRPC_PORT) - if port: - settings.DAPR_GRPC_PORT = port - - @staticmethod - def parse(context: FunctionContext): - return DaprRuntime(context) - - @abstractmethod - def send(self, data, output): - """send data to certain ouput binding or pubsub topic""" - - -class DaprRuntime(OpenFunctionRuntime): - """Dapr runtime derived from OpenFunctionRuntime.""" - - def __init__(self, context=None): - """Inits Dapr Runtime. - Args: - context: OpenFunction context - """ - super().__init__(context) - super().set_dapr_grpc_port() - - self.client = DaprGrpcClient() - - def send(self, data, output=None): - """Inits Dapr Runtime. - Args: - data: Bytes or str to send. - output: A string of designated output name. Only send this output if designated. - Returns: - A dict mapping keys to the corresponding dapr response. - """ - outputs = self.context.outputs - filtered_outputs = {} - responses = {} - - if output and output in outputs: - filtered_outputs[output] = outputs[output] - else: - filtered_outputs = outputs - - for key, value in filtered_outputs.items(): - type = value.get_type() - if type == OPEN_FUNC_BINDING: - resp = self.client.invoke_binding(value.component_name, value.operation, data, value.metadata) - elif type == OPEN_FUNC_TOPIC: - resp = self.client.publish_event(value.component_name, value.uri, data, value.metadata) - responses[key] = resp - - return responses - diff --git a/tests/test_async.py b/tests/test_async.py index 3a13a4a1..edd321e1 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -1,5 +1,4 @@ import random -import pathlib import json import subprocess import threading @@ -10,8 +9,8 @@ from paho.mqtt import client as mqtt_client -from openfunction.function_context import FunctionContext -from openfunction.async_server import AsyncApp +from functions_framework.openfunction import FunctionContext +from functions_framework.openfunction import AsyncApp TEST_PAYLOAD = {"data": "hello world"} APP_ID="async.dapr" diff --git a/tests/test_binding.py b/tests/test_binding.py index 0f9219b0..4ae9f906 100644 --- a/tests/test_binding.py +++ b/tests/test_binding.py @@ -6,7 +6,7 @@ import pytest from functions_framework import create_app -from openfunction.function_context import FunctionContext +from functions_framework.openfunction import FunctionContext TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" TEST_RESPONSE = "Hello world!" diff --git a/tests/test_convert.py b/tests/test_convert.py index 0d41d5ed..6128cd5d 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -22,7 +22,7 @@ from functions_framework import event_conversion from functions_framework.exceptions import EventConversionException -from google.cloud.functions.context import Context +from google_origin.cloud.functions.context import Context TEST_DATA_DIR = pathlib.Path(__file__).resolve().parent / "test_data" @@ -31,11 +31,11 @@ "context": { "eventId": "1215011316659232", "timestamp": "2020-05-18T12:13:19Z", - "eventType": "google.pubsub.topic.publish", + "eventType": "google_origin.pubsub.topic.publish", "resource": { "service": "pubsub.googleapis.com", "name": "projects/sample-project/topics/gcf-test", - "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", }, }, "data": { @@ -71,7 +71,7 @@ "id": "1215011316659232", "source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", "time": "2020-05-18T12:13:19Z", - "type": "google.cloud.pubsub.topic.v1.messagePublished", + "type": "google_origin.cloud.pubsub.topic.v1.messagePublished", "datacontenttype": "application/json", "data": { "message": { @@ -104,17 +104,17 @@ def raw_pubsub_request(): def marshalled_pubsub_request(): return { "data": { - "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "@type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", "data": "eyJmb28iOiJiYXIifQ==", "attributes": {"test": "123"}, }, "context": { "eventId": "1215011316659232", - "eventType": "google.pubsub.topic.publish", + "eventType": "google_origin.pubsub.topic.publish", "resource": { "name": "projects/sample-project/topics/gcf-test", "service": "pubsub.googleapis.com", - "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", }, "timestamp": "2021-04-17T07:21:18.249Z", }, @@ -277,7 +277,7 @@ def test_marshal_background_event_data_bad_request(): ) def test_split_resource(background_resource): context = Context( - eventType="google.storage.object.finalize", resource=background_resource + eventType="google_origin.storage.object.finalize", resource=background_resource ) service, resource, subject = event_conversion._split_resource(context) assert service == "storage.googleapis.com" @@ -320,7 +320,7 @@ def test_split_resource_no_resource_regex_match(): "type": "storage#object", } context = Context( - eventType="google.storage.object.finalize", resource=background_resource + eventType="google_origin.storage.object.finalize", resource=background_resource ) with pytest.raises(EventConversionException) as exc_info: event_conversion._split_resource(context) @@ -412,25 +412,25 @@ def test_pubsub_emulator_request_with_invalid_message( "ce_event_type, ce_source, expected_type, expected_resource", [ ( - "google.firebase.database.ref.v1.written", + "google_origin.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/instances/my-project-id", - "providers/google.firebase.database/eventTypes/ref.write", + "providers/google_origin.firebase.database/eventTypes/ref.write", "projects/_/instances/my-project-id/my/subject", ), ( - "google.cloud.pubsub.topic.v1.messagePublished", + "google_origin.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", - "google.pubsub.topic.publish", + "google_origin.pubsub.topic.publish", { "service": "pubsub.googleapis.com", "name": "projects/sample-project/topics/gcf-test", - "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", }, ), ( - "google.cloud.storage.object.v1.finalized", + "google_origin.cloud.storage.object.v1.finalized", "//storage.googleapis.com/projects/_/buckets/some-bucket", - "google.storage.object.finalize", + "google_origin.storage.object.finalize", { "service": "storage.googleapis.com", "name": "projects/_/buckets/some-bucket/my/subject", @@ -438,19 +438,19 @@ def test_pubsub_emulator_request_with_invalid_message( }, ), ( - "google.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", "providers/firebase.auth/eventTypes/user.create", "projects/my-project-id", ), ( - "google.firebase.database.ref.v1.written", + "google_origin.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/locations/us-central1/instances/my-project-id", - "providers/google.firebase.database/eventTypes/ref.write", + "providers/google_origin.firebase.database/eventTypes/ref.write", "projects/_/instances/my-project-id/my/subject", ), ( - "google.cloud.firestore.document.v1.written", + "google_origin.cloud.firestore.document.v1.written", "//firestore.googleapis.com/projects/project-id/databases/(default)", "providers/cloud.firestore/eventTypes/document.write", "projects/project-id/databases/(default)/my/subject", @@ -480,7 +480,7 @@ def test_cloud_event_to_legacy_event_with_pubsub_message_payload( create_ce_headers, ): headers = create_ce_headers( - "google.cloud.pubsub.topic.v1.messagePublished", + "google_origin.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", ) data = { @@ -494,7 +494,7 @@ def test_cloud_event_to_legacy_event_with_pubsub_message_payload( (res_data, res_context) = event_conversion.cloud_event_to_background_event(req) - assert res_context.event_type == "google.pubsub.topic.publish" + assert res_context.event_type == "google_origin.pubsub.topic.publish" assert res_data == {"data": "fizzbuzz"} @@ -502,7 +502,7 @@ def test_cloud_event_to_legacy_event_with_firebase_auth_ce( create_ce_headers, ): headers = create_ce_headers( - "google.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", ) data = { @@ -530,7 +530,7 @@ def test_cloud_event_to_legacy_event_with_firebase_auth_ce_empty_metadata( create_ce_headers, ): headers = create_ce_headers( - "google.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", ) data = {"metadata": {}, "uid": "my-id"} @@ -569,7 +569,7 @@ def test_cloud_event_to_legacy_event_with_invalid_event( exception_message, ): headers = create_ce_headers( - "google.firebase.database.ref.v1.written", + "google_origin.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/instances/my-project-id", ) for k, v in header_overrides.items(): diff --git a/tests/test_functions.py b/tests/test_functions.py index 69931b44..4d72f1e8 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -573,14 +573,14 @@ def test_errorhandler(monkeypatch): @pytest.mark.parametrize( "event_type", [ - "google.cloud.firestore.document.v1.written", - "google.cloud.pubsub.topic.v1.messagePublished", - "google.cloud.storage.object.v1.finalized", - "google.cloud.storage.object.v1.metadataUpdated", - "google.firebase.analytics.log.v1.written", - "google.firebase.auth.user.v1.created", - "google.firebase.auth.user.v1.deleted", - "google.firebase.database.ref.v1.written", + "google_origin.cloud.firestore.document.v1.written", + "google_origin.cloud.pubsub.topic.v1.messagePublished", + "google_origin.cloud.storage.object.v1.finalized", + "google_origin.cloud.storage.object.v1.metadataUpdated", + "google_origin.firebase.analytics.log.v1.written", + "google_origin.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.deleted", + "google_origin.firebase.database.ref.v1.written", ], ) def tests_cloud_to_background_event_client( @@ -598,7 +598,7 @@ def tests_cloud_to_background_event_client( def tests_cloud_to_background_event_client_invalid_source( background_event_client, create_ce_headers, tempfile_payload ): - headers = create_ce_headers("google.cloud.firestore.document.v1.written") + headers = create_ce_headers("google_origin.cloud.firestore.document.v1.written") headers["ce-source"] = "invalid" resp = background_event_client.post("/", headers=headers, json=tempfile_payload) diff --git a/tests/test_functions/background_load_error/main.py b/tests/test_functions/background_load_error/main.py index d9db3c71..4fef385b 100644 --- a/tests/test_functions/background_load_error/main.py +++ b/tests/test_functions/background_load_error/main.py @@ -23,7 +23,7 @@ def function(event, context): Args: event: The event data which triggered this background function. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ # Syntax error: an extra closing parenthesis in the line below. print('foo')) diff --git a/tests/test_functions/background_missing_dependency/main.py b/tests/test_functions/background_missing_dependency/main.py index 3050adfc..2d8685f3 100644 --- a/tests/test_functions/background_missing_dependency/main.py +++ b/tests/test_functions/background_missing_dependency/main.py @@ -25,7 +25,7 @@ def function(event, context): Args: event: The event data which triggered this background function. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ del event del context diff --git a/tests/test_functions/background_multiple_entry_points/main.py b/tests/test_functions/background_multiple_entry_points/main.py index 56b1a73f..4a2a85eb 100644 --- a/tests/test_functions/background_multiple_entry_points/main.py +++ b/tests/test_functions/background_multiple_entry_points/main.py @@ -45,7 +45,7 @@ def myFunctionFoo( event: The event data (as dictionary) which triggered this background function. Must contain entries for 'value' and 'filename' keys in the data dictionary. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ fun("myFunctionFoo", event) @@ -62,7 +62,7 @@ def myFunctionBar( event: The event data (as dictionary) which triggered this background function. Must contain entries for 'value' and 'filename' keys in the data dictionary. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ fun("myFunctionBar", event) diff --git a/tests/test_functions/background_trigger/main.py b/tests/test_functions/background_trigger/main.py index 842c4889..14996857 100644 --- a/tests/test_functions/background_trigger/main.py +++ b/tests/test_functions/background_trigger/main.py @@ -27,7 +27,7 @@ def function( event: The event data (as dictionary) which triggered this background function. Must contain entries for 'value' and 'filename' keys in the data dictionary. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ filename = event["filename"] value = event["value"] diff --git a/tests/test_functions/cloud_events/converted_background_event.py b/tests/test_functions/cloud_events/converted_background_event.py index 9264251d..c6fab38b 100644 --- a/tests/test_functions/cloud_events/converted_background_event.py +++ b/tests/test_functions/cloud_events/converted_background_event.py @@ -30,7 +30,7 @@ def function(cloud_event): """ data = { "message": { - "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "@type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", "attributes": { "attr1": "attr1-value", }, @@ -45,7 +45,7 @@ def function(cloud_event): and cloud_event.data == data and cloud_event["source"] == "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test" - and cloud_event["type"] == "google.cloud.pubsub.topic.v1.messagePublished" + and cloud_event["type"] == "google_origin.cloud.pubsub.topic.v1.messagePublished" and cloud_event["time"] == "2020-09-29T11:32:00.000Z" )