Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add tracing with OpenTelemetry #932

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 165 additions & 34 deletions Tiltfile
Copy link
Contributor Author

@meobilivang meobilivang May 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My Python environment runs the Black formatter by default, so it reformatted the whole file 😗

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ tools_bin = "./hack/tools/bin"
kubectl_cmd = "./hack/tools/bin/kubectl"
kind_cmd = "./hack/tools/bin/kind"

#Add tools to path
# Add tools to path
os.putenv("PATH", os.getenv("PATH") + ":" + tools_bin)

update_settings(k8s_upsert_timeout_secs = 60) # on first tilt up, often can take longer than 30 seconds
update_settings(
k8s_upsert_timeout_secs = 60,
) # on first tilt up, often can take longer than 30 seconds

# set defaults
settings = {
Expand All @@ -26,10 +28,12 @@ settings = {
keys = ["GCP_B64ENCODED_CREDENTIALS"]

# global settings
settings.update(read_json(
"tilt-settings.json",
default = {},
))
settings.update(
read_json(
"tilt-settings.json",
default = {},
),
)

if settings.get("trigger_mode") == "manual":
trigger_mode(TRIGGER_MODE_MANUAL)
Expand All @@ -43,33 +47,64 @@ if "default_registry" in settings:
# deploy CAPI
def deploy_capi():
version = settings.get("capi_version")
capi_uri = "https://github.com/kubernetes-sigs/cluster-api/releases/download/{}/cluster-api-components.yaml".format(version)
cmd = "curl -sSL {} | {} | {} apply -f -".format(capi_uri, envsubst_cmd, kubectl_cmd)
capi_uri = "https://github.com/kubernetes-sigs/cluster-api/releases/download/{}/cluster-api-components.yaml".format(
version,
)
cmd = "curl -sSL {} | {} | {} apply -f -".format(
capi_uri,
envsubst_cmd,
kubectl_cmd,
)
local(cmd, quiet = True)
if settings.get("extra_args"):
extra_args = settings.get("extra_args")
if extra_args.get("core"):
core_extra_args = extra_args.get("core")
if core_extra_args:
for namespace in ["capi-system"]:
patch_args_with_extra_args(namespace, "capi-controller-manager", core_extra_args)
patch_args_with_extra_args(
namespace,
"capi-controller-manager",
core_extra_args,
)
if extra_args.get("kubeadm-bootstrap"):
kb_extra_args = extra_args.get("kubeadm-bootstrap")
if kb_extra_args:
patch_args_with_extra_args("capi-kubeadm-bootstrap-system", "capi-kubeadm-bootstrap-controller-manager", kb_extra_args)
patch_args_with_extra_args(
"capi-kubeadm-bootstrap-system",
"capi-kubeadm-bootstrap-controller-manager",
kb_extra_args,
)

def patch_args_with_extra_args(namespace, name, extra_args):
args_str = str(local("{} get deployments {} -n {} -o jsonpath={{.spec.template.spec.containers[0].args}}".format(kubectl_cmd, name, namespace)))
args_str = str(
local(
"{} get deployments {} -n {} -o jsonpath={{.spec.template.spec.containers[0].args}}".format(
kubectl_cmd,
name,
namespace,
),
),
)
args_to_add = [arg for arg in extra_args if arg not in args_str]
if args_to_add:
args = args_str[1:-1].split()
args.extend(args_to_add)
patch = [{
"op": "replace",
"path": "/spec/template/spec/containers/0/args",
"value": args,
}]
local("{} patch deployment {} -n {} --type json -p='{}'".format(kubectl_cmd, name, namespace, str(encode_json(patch)).replace("\n", "")))
patch = [
{
"op": "replace",
"path": "/spec/template/spec/containers/0/args",
"value": args,
},
]
local(
"{} patch deployment {} -n {} --type json -p='{}'".format(
kubectl_cmd,
name,
namespace,
str(encode_json(patch)).replace("\n", ""),
),
)

# Users may define their own Tilt customizations in tilt.d. This directory is excluded from git and these files will
# not be checked in to version control.
Expand All @@ -78,9 +113,18 @@ def include_user_tilt_files():
for f in user_tiltfiles:
include(f)

def append_arg_for_container_in_deployment(yaml_stream, name, namespace, contains_image_name, args):
def append_arg_for_container_in_deployment(
yaml_stream,
name,
namespace,
contains_image_name,
args):
for item in yaml_stream:
if item["kind"] == "Deployment" and item.get("metadata").get("name") == name and item.get("metadata").get("namespace") == namespace:
if (
item["kind"] == "Deployment" and
item.get("metadata").get("name") == name and
item.get("metadata").get("namespace") == namespace
):
containers = item.get("spec").get("template").get("spec").get("containers")
for container in containers:
if contains_image_name in container.get("image"):
Expand All @@ -94,7 +138,11 @@ def validate_auth():
substitutions = settings.get("kustomize_substitutions", {})
missing = [k for k in keys if k not in substitutions]
if missing:
fail("missing kustomize_substitutions keys {} in tilt-settings.json".format(missing))
fail(
"missing kustomize_substitutions keys {} in tilt-settings.json".format(
missing,
),
)

tilt_helper_dockerfile_header = """
# Tilt image
Expand All @@ -119,29 +167,51 @@ def capg():
substitutions = settings.get("kustomize_substitutions", {})
os.environ.update(substitutions)

# yaml = str(kustomizesub("./hack/observability")) # build an observable kind deployment by default
yaml = str(kustomizesub("./config/default"))
yaml = str(
kustomizesub("./hack/observability"),
) # build an observable kind deployment by default
# TODO: consider removing
# yaml = str(kustomizesub("./config/default"))

# add extra_args if they are defined
if settings.get("extra_args"):
gcp_extra_args = settings.get("extra_args").get("gcp")
if gcp_extra_args:
yaml_dict = decode_yaml_stream(yaml)
append_arg_for_container_in_deployment(yaml_dict, "capg-controller-manager", "capg-system", "cluster-api-gcp-controller", gcp_extra_args)
append_arg_for_container_in_deployment(
yaml_dict,
"capg-controller-manager",
"capg-system",
"cluster-api-gcp-controller",
gcp_extra_args,
)
yaml = str(encode_yaml_stream(yaml_dict))
yaml = fixup_yaml_empty_arrays(yaml)

# Set up a local_resource build of the provider's manager binary.
local_resource(
"manager",
cmd = 'mkdir -p .tiltbuild;CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags \'-extldflags "-static"\' -o .tiltbuild/manager',
deps = ["api", "cloud", "config", "controllers", "exp", "feature", "pkg", "go.mod", "go.sum", "main.go"],
cmd = "mkdir -p .tiltbuild;CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags '-extldflags \"-static\"' -o .tiltbuild/manager",
deps = [
"api",
"cloud",
"config",
"controllers",
"exp",
"feature",
"pkg",
"go.mod",
"go.sum",
"main.go",
],
)

dockerfile_contents = "\n".join([
tilt_helper_dockerfile_header,
tilt_dockerfile_header,
])
dockerfile_contents = "\n".join(
[
tilt_helper_dockerfile_header,
tilt_dockerfile_header,
],
)

entrypoint = ["sh", "/start.sh", "/manager"]
extra_args = settings.get("extra_args")
Expand All @@ -166,12 +236,62 @@ def capg():

k8s_yaml(blob(yaml))

def observability():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sets up the OTEL Collector and Jaeger using Helm charts.

# Install the OpenTelemetry helm chart
gcp_project_id = os.getenv("GCP_PROJECT_ID", "")

k8s_yaml(
helm(
"./hack/observability/opentelemetry/chart",
name = "opentelemetry-collector",
namespace = "capg-system",
values = ["./hack/observability/opentelemetry/values.yaml"],
# see https://github.com/helm/helm/issues/1987
set = [
"extraEnvs[0].name=GCP_PROJECT_ID",
"extraEnvs[0].value=" + gcp_project_id,
],
),
)

k8s_yaml(
helm(
"./hack/observability/jaeger/chart",
name = "jaeger-all-in-one",
namespace = "capg-system",
set = [
# TODO: consider removing
# "crd.install=false",
# "rbac.create=false",
"resources.limits.cpu=200m",
"resources.limits.memory=256Mi",
],
),
)

k8s_resource(
workload = "jaeger-all-in-one",
new_name = "traces: jaeger-all-in-one",
port_forwards = [
port_forward(16686, name = "View traces", link_path = "/search?service=capg"),
],
labels = ["observability"],
)

k8s_resource(workload = "opentelemetry-collector", labels = ["observability"])

def base64_encode(to_encode):
encode_blob = local("echo '{}' | tr -d '\n' | base64 - | tr -d '\n'".format(to_encode), quiet = True)
encode_blob = local(
"echo '{}' | tr -d '\n' | base64 - | tr -d '\n'".format(to_encode),
quiet = True,
)
return str(encode_blob)

def base64_encode_file(path_to_encode):
encode_blob = local("cat {} | tr -d '\n' | base64 - | tr -d '\n'".format(path_to_encode), quiet = True)
encode_blob = local(
"cat {} | tr -d '\n' | base64 - | tr -d '\n'".format(path_to_encode),
quiet = True,
)
return str(encode_blob)

def read_file_from_path(path_to_read):
Expand All @@ -187,9 +307,18 @@ def kustomizesub(folder):
return yaml

def waitforsystem():
local(kubectl_cmd + " wait --for=condition=ready --timeout=300s pod --all -n capi-kubeadm-bootstrap-system")
local(kubectl_cmd + " wait --for=condition=ready --timeout=300s pod --all -n capi-kubeadm-control-plane-system")
local(kubectl_cmd + " wait --for=condition=ready --timeout=300s pod --all -n capi-system")
local(
kubectl_cmd +
" wait --for=condition=ready --timeout=300s pod --all -n capi-kubeadm-bootstrap-system",
)
local(
kubectl_cmd +
" wait --for=condition=ready --timeout=300s pod --all -n capi-kubeadm-control-plane-system",
)
local(
kubectl_cmd +
" wait --for=condition=ready --timeout=300s pod --all -n capi-system",
)

##############################
# Actual work happens here
Expand All @@ -208,4 +337,6 @@ deploy_capi()

capg()

observability()

waitforsystem()
25 changes: 25 additions & 0 deletions cloud/scope/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/util/flowcontrol"
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-gcp/util/telemetry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -89,6 +90,12 @@ func defaultClientOptions(ctx context.Context, credentialsRef *infrav1.ObjectRef
}

func newComputeService(ctx context.Context, credentialsRef *infrav1.ObjectReference, crClient client.Client) (*compute.Service, error) {

ctx, span := telemetry.Tracer().Start(
ctx, "cloud.clients.newComputeService",
)
defer span.End()

opts, err := defaultClientOptions(ctx, credentialsRef, crClient)
if err != nil {
return nil, fmt.Errorf("getting default gcp client options: %w", err)
Expand All @@ -103,6 +110,12 @@ func newComputeService(ctx context.Context, credentialsRef *infrav1.ObjectRefere
}

func newClusterManagerClient(ctx context.Context, credentialsRef *infrav1.ObjectReference, crClient client.Client) (*container.ClusterManagerClient, error) {

ctx, span := telemetry.Tracer().Start(
ctx, "cloud.clients.newClusterManagerClient",
)
defer span.End()

opts, err := defaultClientOptions(ctx, credentialsRef, crClient)
if err != nil {
return nil, fmt.Errorf("getting default gcp client options: %w", err)
Expand All @@ -117,6 +130,12 @@ func newClusterManagerClient(ctx context.Context, credentialsRef *infrav1.Object
}

func newIamCredentialsClient(ctx context.Context, credentialsRef *infrav1.ObjectReference, crClient client.Client) (*credentials.IamCredentialsClient, error) {

ctx, span := telemetry.Tracer().Start(
ctx, "cloud.clients.newIamCredentialsClient",
)
defer span.End()

opts, err := defaultClientOptions(ctx, credentialsRef, crClient)
if err != nil {
return nil, fmt.Errorf("getting default gcp client options: %w", err)
Expand All @@ -131,6 +150,12 @@ func newIamCredentialsClient(ctx context.Context, credentialsRef *infrav1.Object
}

func newInstanceGroupManagerClient(ctx context.Context, credentialsRef *infrav1.ObjectReference, crClient client.Client) (*computerest.InstanceGroupManagersClient, error) {

ctx, span := telemetry.Tracer().Start(
ctx, "cloud.clients.newInstanceGroupManagerClient",
)
defer span.End()

opts, err := defaultClientOptions(ctx, credentialsRef, crClient)
if err != nil {
return nil, fmt.Errorf("getting default gcp client options: %w", err)
Expand Down
7 changes: 7 additions & 0 deletions cloud/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/utils/pointer"
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-gcp/cloud"
"sigs.k8s.io/cluster-api-provider-gcp/util/telemetry"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -43,6 +44,12 @@ type ClusterScopeParams struct {
// NewClusterScope creates a new Scope from the supplied parameters.
// This is meant to be called for each reconcile iteration.
func NewClusterScope(ctx context.Context, params ClusterScopeParams) (*ClusterScope, error) {

ctx, span := telemetry.Tracer().Start(
ctx, "cloud.clusterScope.NewClusterScope",
)
defer span.End()

if params.Cluster == nil {
return nil, errors.New("failed to generate new scope from nil Cluster")
}
Expand Down
7 changes: 7 additions & 0 deletions cloud/scope/managedcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-gcp/cloud"
infrav1exp "sigs.k8s.io/cluster-api-provider-gcp/exp/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-gcp/util/telemetry"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -44,6 +45,12 @@ type ManagedClusterScopeParams struct {
// NewManagedClusterScope creates a new Scope from the supplied parameters.
// This is meant to be called for each reconcile iteration.
func NewManagedClusterScope(ctx context.Context, params ManagedClusterScopeParams) (*ManagedClusterScope, error) {

ctx, span := telemetry.Tracer().Start(
ctx, "cloud.managedClusterScope.NewManagedClusterScope",
)
defer span.End()

if params.Cluster == nil {
return nil, errors.New("failed to generate new scope from nil Cluster")
}
Expand Down
Loading
Loading