Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dasithw/otel collector #3

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
APP__ENVIRONMENT=development
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector otlp endpoint (exposed from docker-compose)

OPENAI_API_BASE=
OPENAI_API_KEY=
OPENAI_DEPLOYMENT_ID=
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ coverage.xml

# Environments
.env
.env.*

# VS Code
.vscode/
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: frontend backend both
.PHONY: frontend backend both start-telemetry

frontend:
cd frontend && streamlit run app.py
Expand All @@ -8,3 +8,7 @@ backend:

both:
make -j2 frontend backend

start-telemetry:
-docker compose down
docker compose up -d
29 changes: 28 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
from contextlib import asynccontextmanager
import os
from fastapi.middleware.cors import CORSMiddleware
from services.observability import DEVELOPMENT_MODE, PRODUCTION_MODE, get_logger, initialize_observability, instrument_application
from routers.translate import translate_router
from routers.feedback import feedback_router
from fastapi import FastAPI
from dotenv import load_dotenv
load_dotenv()

# This method controls the lifecycle of the FastAPI app and is used to setup things post process fork
# https://fastapi.tiangolo.com/advanced/events/#use-case
@asynccontextmanager
async def lifespan(app: FastAPI):
app_environment = os.getenv("APP__ENVIRONMENT", "Unspecified")
# do the initialize logic here

app = FastAPI()
if app_environment.lower() == "development":
initialize_observability(DEVELOPMENT_MODE, service_name="AI Translator API", environment=app_environment)
else:
initialize_observability(PRODUCTION_MODE, service_name="AI Translator API", environment=app_environment)

logger = get_logger()
logger.info("Starting API server...")
yield
logger.info("Stopping API server...")


app = FastAPI(
title="AI Translate",
description="This is a simple API that translates text from one language to another",
version="0.1.0",
lifespan=lifespan,
)

# middleware for frontend
app.add_middleware(
Expand All @@ -25,3 +50,5 @@
@app.get("/")
def read_root():
return {"message": "Welcome to the ai translate!"}

instrument_application(app)
232 changes: 232 additions & 0 deletions app/services/observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import logging
import os
import re
from logging import Logger
from typing import Dict, List, Literal

from fastapi import FastAPI

# OpenTelemetry
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import Meter, MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span, Tracer, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio

SENSITIVE_DATA_SPAN_NAME = "sensitive_data_logged"
SENSITIVE_DATA_INDICATOR_ATTRIBUTE_NAME = "contains_sensitive_data"

DEVELOPMENT_MODE = Literal["DEVELOPMENT"]
PRODUCTION_MODE = Literal["PRODUCTION"]

ACTIVE_SERVICE_NAME = "translator_service"

_has_already_init = False
run_mode: Literal["DEVELOPMENT", "PRODUCTION"] = DEVELOPMENT_MODE

_main_tracer: Tracer = None
_main_logger: Logger = logging.getLogger()
_main_meter: Meter = None
log_level = (os.getenv("OTEL_LOG_LEVEL") or "INFO").upper()

# https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/logging/logging.html
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s[%(process)d] - %(levelname)s - %(message)s",
)


def ensure_initialized():
if not _has_already_init:
raise Exception(
"Observability module has not been initialized. Please call initialize_observability()."
)


def is_development() -> bool:
return run_mode == DEVELOPMENT_MODE


def get_tracer():
global _main_tracer
ensure_initialized()
return _main_tracer


def get_logger(name: str | None = None) -> logging.Logger:
logger = logging.getLogger(name)
return logger


def get_meter():
global _main_meter
ensure_initialized()
return _main_meter


def initialize_observability(
mode: Literal["DEVELOPMENT", "PRODUCTION"], service_name: str = "ai.translator", environment: str = "Unspecified"
):
"""Initializes the observability once for the lifetime of the application/process"""
global \
_has_already_init, \
run_mode, \
_main_tracer, \
_main_logger, \
_main_meter, \
ACTIVE_SERVICE_NAME

if _has_already_init:
_main_logger.warning("Attempt made to initialize observability more than once")
return

_has_already_init = True

run_mode = mode
_main_logger.info(f"Initializing the observability with mode: {mode}")
# See this for all the config options using environment variables: https://opentelemetry.io/docs/specs/otel/protocol/exporter/
opentelemetry_exporter_otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")

if opentelemetry_exporter_otlp_endpoint:
_main_logger.info("🚀 Configuring OTLP telemetry")
service_name = os.getenv(
"OTEL_SERVICE_NAME", service_name
) # https://opentelemetry.io/docs/languages/sdk-configuration/general/#otel_service_name
sample_ratio = float(
os.getenv("OTEL_TRACES_SAMPLER_ARG", "1.0")
) # https://opentelemetry-python.readthedocs.io/en/latest/sdk/trace.sampling.html

# setup the instrumentors
resource = Resource.create(
attributes={
"service.name": service_name, # https://opentelemetry.io/docs/specs/semconv/resource/#service
"service.namespace": "ai.translator",
"deployment.environment.name": environment, # https://opentelemetry.io/docs/specs/semconv/resource/deployment-environment/
"process.pid": str(
os.getpid()
), # https://opentelemetry.io/docs/specs/semconv/attributes-registry/process/
}
)

ACTIVE_SERVICE_NAME = service_name

# tracing
trace.set_tracer_provider(
TracerProvider(
resource=resource, sampler=ParentBasedTraceIdRatio(sample_ratio)
)
)
span_processor = BatchSpanProcessor(OTLPSpanExporter())
trace.get_tracer_provider().add_span_processor(span_processor)
_main_tracer = trace.get_tracer_provider().get_tracer(service_name)

# metrics
metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
meter_provider = MeterProvider(
resource=resource, metric_readers=[metric_reader]
)
metrics.set_meter_provider(meter_provider)
_main_meter = metrics.get_meter(service_name)

# logging
logger_provider = LoggerProvider(resource=resource)
batch_log_record_processor = BatchLogRecordProcessor(OTLPLogExporter())
logger_provider.add_log_record_processor(batch_log_record_processor)

handler = LoggingHandler(level=log_level, logger_provider=logger_provider)
# Attach OTLP handler to root logger
logging.getLogger().addHandler(handler)
else:
_main_logger.info(
"🚀 OTLP telemetry exporter not configured (set OTEL_EXPORTER_OTLP_ENDPOINT)"
)
_main_tracer = trace.get_tracer("default")
_main_meter = metrics.get_meter("default")

_main_logger = get_logger()
_main_logger.info("Observability initialization complete")


def mark_span_as_sensitive(span: Span):
span.set_attribute(SENSITIVE_DATA_INDICATOR_ATTRIBUTE_NAME, "true")


def add_sensitive_event(span: Span, event: str, attributes: dict[str, str]):
if not attributes:
attributes = {}

attributes[SENSITIVE_DATA_INDICATOR_ATTRIBUTE_NAME] = "true"
span.add_event(name=event, attributes=attributes)


def log_sensitive_data(
message: str,
attributes: str | Dict | int | List = None,
print_to_console: bool = False,
span_name: str | None = None,
) -> None:
if is_development() and print_to_console:
_main_logger.info(f"{message} - attributes={attributes}")

if not span_name:
span_name = SENSITIVE_DATA_SPAN_NAME

with get_tracer().start_as_current_span(span_name) as span:
if not attributes:
attributes = {}
if isinstance(attributes, dict):
span.set_attributes({k: str(v) for k, v in attributes.items()})
if attributes:
span.set_attribute("event.attributes", str(attributes))

span.set_attribute("message", message)
span.set_attribute(SENSITIVE_DATA_INDICATOR_ATTRIBUTE_NAME, "true")


def convert_to_metric_name(input_string: str) -> str:
"""
Converts a string into a metric name compatible with OpenTelemetry.
# https://opentelemetry.io/docs/specs/otel/metrics/api/#instrument-name-syntax

Args:
input_string (str): The input string to be converted.

Returns:
str: The converted metric name.
"""

# Remove leading and trailing whitespace
input_string = input_string.strip()
# Add leading alpha character
if not re.match(r"^[a-zA-Z]", input_string):
input_string = "A" + input_string
# Replace spaces with underscores
input_string = input_string.replace(" ", "_")
# Remove special characters and non-alphanumeric characters
input_string = re.sub(r"[^a-zA-Z0-9_]", "", input_string)
# Limit the length to 100 characters
input_string = input_string[:100]

return input_string


def instrument_application(app: FastAPI):
_main_logger.info("Setting up OpenTelemetry instrumentation...")
RequestsInstrumentor().instrument()
HTTPXClientInstrumentor().instrument()
OpenAIInstrumentor().instrument()
FastAPIInstrumentor.instrument_app(
app,
http_capture_headers_server_request=[".*"]
)
19 changes: 19 additions & 0 deletions docker-compose.yaml
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we need them to run the app - shall we add them to the devcontainer - so the dc goes in a compose file?

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
services:
lgtm:
image: grafana/otel-lgtm
container_name: otel-lgtm
ports:
- "3000:3000" # LGTM UI

otel-collector:
image: otel/opentelemetry-collector-contrib:0.112.0
container_name: otel-collector
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ${HOST_PROJECT_PATH}/otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "13133:13133" # health_check extension
- "4317:40317" # OTLP gRPC receiver
- "4318:40318" # OTLP http receiver
depends_on:
- lgtm
61 changes: 61 additions & 0 deletions otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:40317
http:
endpoint: 0.0.0.0:40318

exporters:
debug:
verbosity: normal
otlp:
endpoint: "http://lgtm:4317"
tls:
insecure: true

processors:
batch:
transform/redact_special: # This processor will redact any spans and span events with the given regex patterns
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl#grammar
error_mode: ignore
trace_statements:
- context: span # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspan
statements:
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/ottlfuncs#replace_all_patterns
# Redact TFN (Tax File Number) from the attributes
- replace_all_patterns(attributes, "value", "\\b\\d{3}\\s?\\d{3}\\s?\\d{3}\\b", "{redacted}")
- context: spanevent # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspanevent
statements:
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/ottlfuncs#replace_pattern
# Redact TFN (Tax File Number) from the name
- replace_pattern(name, "\\b\\d{3}\\s?\\d{3}\\s?\\d{3}\\b", "{redacted}")
# Redact TFN (Tax File Number) from the attributes
- replace_all_patterns(attributes, "value", "\\b\\d{3}\\s?\\d{3}\\s?\\d{3}\\b", "{redacted}")
log_statements:
- context: log
statements:
- replace_pattern(body.string, "\\b\\d{3}\\s?\\d{3}\\s?\\d{3}\\b", "{redacted}")
- replace_all_patterns(attributes, "value", "\\b\\d{3}\\s?\\d{3}\\s?\\d{3}\\b", "{redacted}")

extensions:
health_check:
pprof:
zpages:

service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [transform/redact_special, batch]
exporters: [debug, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [debug, otlp]
logs:
receivers: [otlp]
processors: [transform/redact_special, batch]
exporters: [debug, otlp]
Loading