diff --git a/pyms/flask/app/create_app.py b/pyms/flask/app/create_app.py index eac66b2..6a462d6 100644 --- a/pyms/flask/app/create_app.py +++ b/pyms/flask/app/create_app.py @@ -72,6 +72,7 @@ def example(): Current services are swagger, request, tracer, metrics """ + config_resource = CONFIG_BASE services: List[str] = [] application = Flask @@ -79,6 +80,7 @@ def example(): request: Optional[DriverService] = None tracer: Optional[DriverService] = None metrics: Optional[DriverService] = None + opentelemetry: Optional[DriverService] = None _singleton = True def __init__(self, *args, **kwargs): @@ -105,7 +107,9 @@ def init_services(self) -> None: """ services_resources = ServicesResource() for service_name, service in services_resources.get_services(): - if service_name not in self.services or not getattr(self, service_name, False): + if service_name not in self.services or not getattr( + self, service_name, False + ): self.services.append(service_name) setattr(self, service_name, service) @@ -150,7 +154,7 @@ def init_logger(self) -> None: :return: """ self.application.logger = logger - os.environ['WERKZEUG_RUN_MAIN'] = "true" + os.environ["WERKZEUG_RUN_MAIN"] = "true" formatter = CustomJsonFormatter() formatter.add_service_name(self.application.config["APP_NAME"]) @@ -172,11 +176,16 @@ def init_app(self) -> Flask: :return: None """ if self.swagger: - application = self.swagger.init_app(config=self.config.to_flask(), path=self.path) + application = self.swagger.init_app( + config=self.config.to_flask(), path=self.path + ) else: check_package_exists("flask") - application = Flask(__name__, static_folder=os.path.join(self.path, 'static'), - template_folder=os.path.join(self.path, 'templates')) + application = Flask( + __name__, + static_folder=os.path.join(self.path, "static"), + template_folder=os.path.join(self.path, "templates"), + ) application.root_path = self.path @@ -192,11 +201,32 @@ def init_metrics(self) -> None: if self.metrics: self.application.register_blueprint(self.metrics.metrics_blueprint) self.metrics.add_logger_handler( - self.application.logger, - self.application.config["APP_NAME"] + self.application.logger, self.application.config["APP_NAME"] ) self.metrics.monitor(self.application.config["APP_NAME"], self.application) + def init_opentelemetry(self) -> None: + if self.opentelemetry: + if self.opentelemetry.config.metrics.enabled: + # Set metrics backend + self.opentelemetry.set_metrics_backend() + # Set the metrics blueprint + # DISCLAIMER this endpoint may be only necessary with prometheus client + self.application.register_blueprint(self.opentelemetry.blueprint) + # Set instrumentations + if self.opentelemetry.config.metrics.instrumentations.flask: + self.opentelemetry.monitor( + self.application.config["APP_NAME"], self.application + ) + if self.opentelemetry.config.metrics.instrumentations.logger: + self.opentelemetry.add_logger_handler( + self.application.logger, self.application.config["APP_NAME"] + ) + if self.opentelemetry.config.tracing.enabled: + self.opentelemetry.set_tracing_backend() + if self.opentelemetry.config.logging.enabled: + self.opentelemetry.set_logging_backend() + def reload_conf(self): self.delete_services() self.config.reload() @@ -230,7 +260,11 @@ def create_app(self) -> Flask: self.init_metrics() - logger.debug("Started app with PyMS and this services: {}".format(self.services)) + self.init_opentelemetry() + + logger.debug( + "Started app with PyMS and this services: {}".format(self.services) + ) return self.application diff --git a/pyms/flask/services/opentelemetry.py b/pyms/flask/services/opentelemetry.py new file mode 100644 index 0000000..a2cf7ec --- /dev/null +++ b/pyms/flask/services/opentelemetry.py @@ -0,0 +1,133 @@ +import logging +import time +from typing import Text + +from flask import Blueprint, Response, request +from pyms.flask.services.driver import DriverService + +from opentelemetry import metrics +from opentelemetry.exporter.prometheus import PrometheusMetricsExporter +from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder +from opentelemetry.sdk.metrics.export.controller import PushController +from prometheus_client import generate_latest + +# TODO set sane defaults +# https://github.com/python-microservices/pyms/issues/218 +# TODO validate config +# https://github.com/python-microservices/pyms/issues/219 +PROMETHEUS_CLIENT = "prometheus" + + +class FlaskMetricsWrapper: + def __init__(self, app_name: str, meter: MeterProvider): + self.app_name = app_name + # TODO add Histogram support for flask when available + # https://github.com/open-telemetry/opentelemetry-python/issues/1255 + self.flask_request_latency = meter.create_metric( + "http_server_requests_seconds", + "Flask Request Latency", + "http_server_requests_seconds", + float, + ValueRecorder, + ("service", "method", "uri", "status"), + ) + self.flask_request_count = meter.create_metric( + "http_server_requests_count", + "Flask Request Count", + "http_server_requests_count", + int, + Counter, + ["service", "method", "uri", "status"], + ) + + def before_request(self): # pylint: disable=R0201 + request.start_time = time.time() + + def after_request(self, response: Response) -> Response: + if hasattr(request.url_rule, "rule"): + path = request.url_rule.rule + else: + path = request.path + request_latency = time.time() - request.start_time + labels = { + "service": self.app_name, + "method": str(request.method), + "uri": path, + "status": str(response.status_code), + } + + self.flask_request_latency.record(request_latency, labels) + self.flask_request_count.add(1, labels) + + return response + + +class Service(DriverService): + """ + Adds [OpenTelemetry](https://opentelemetry.io/) metrics using the [Opentelemetry Client Library](https://opentelemetry-python.readthedocs.io/en/latest/exporter/). + """ + + config_resource: Text = "opentelemetry" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.blueprint = Blueprint("opentelemetry", __name__) + self.serve_metrics() + + def set_metrics_backend(self): + # Set meter provider + metrics.set_meter_provider(MeterProvider()) + self.meter = metrics.get_meter(__name__) + if self.config.metrics.backend.lower() == PROMETHEUS_CLIENT: + exporter = PrometheusMetricsExporter() + else: + pass + # Create the push controller that will update the metrics when the + # interval is met + PushController(self.meter, exporter, self.config.metrics.interval) + + def set_tracing_backend(self): + pass + + def set_logging_backend(self): + pass + + def monitor(self, app_name, app): + metric = FlaskMetricsWrapper(app_name, self.meter) + app.before_request(metric.before_request) + app.after_request(metric.after_request) + + def serve_metrics(self): + @self.blueprint.route("/metrics", methods=["GET"]) + def metrics(): # pylint: disable=unused-variable + return Response( + generate_latest(), + mimetype="text/print()lain", + content_type="text/plain; charset=utf-8", + ) + + def add_logger_handler( + self, logger: logging.Logger, service_name: str + ) -> logging.Logger: + logger.addHandler(MetricsLogHandler(service_name, self.meter)) + return logger + + +class MetricsLogHandler(logging.Handler): + """A LogHandler that exports logging metrics for OpenTelemetry.""" + + def __init__(self, app_name: str, meter: MeterProvider): + super().__init__() + self.app_name = str(app_name) + self.logger_total_messages = meter.create_metric( + "logger_messages_total", + "Count of log entries by service and level.", + "logger_messages_total", + int, + Counter, + ["service", "level"], + ) + + def emit(self, record) -> None: + labels = {"service": self.app_name, "level": record.levelname} + self.logger_total_messages.add(1, labels) diff --git a/setup.py b/setup.py index 36c788b..1b7967e 100644 --- a/setup.py +++ b/setup.py @@ -52,6 +52,11 @@ 'prometheus_client>=0.8.0', ] +install_opentelemetry_requires = [ + 'opentelemetry-exporter-prometheus>=0.14b0', + 'opentelemetry-sdk>=0.14b0', +] + install_tests_requires = [ 'requests-mock>=1.8.0', 'coverage>=5.3',