Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for OpenTelemetry in worker #1058

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 122 additions & 30 deletions protobuf/src/proto/FunctionRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ message StreamingMessage {
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;


// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;

Expand Down Expand Up @@ -85,6 +86,13 @@ message StreamingMessage {

// Host gets the list of function load responses
FunctionLoadResponseCollection function_load_response_collection = 32;

// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;

// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;

}
}

Expand Down Expand Up @@ -120,7 +128,7 @@ message WorkerInitRequest {

// Worker responds with the result of initializing itself
message WorkerInitResponse {
// NOT USED
// PROPERTY NOT USED
// TODO: Remove from protobuf during next breaking change release
string worker_version = 1;

Expand Down Expand Up @@ -173,7 +181,7 @@ message StatusResult {
repeated RpcLog logs = 3;
}

// NOT USED
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

Expand All @@ -187,7 +195,7 @@ message WorkerTerminate {
message FileChangeEventRequest {
// Types of File change operations (See link for more info: https://msdn.microsoft.com/en-us/library/t6xf43e0(v=vs.110).aspx)
enum Type {
Unknown = 0;
Unknown = 0;
Created = 1;
Deleted = 2;
Changed = 4;
Expand Down Expand Up @@ -237,8 +245,26 @@ message FunctionEnvironmentReloadRequest {
}

message FunctionEnvironmentReloadResponse {
enum CapabilitiesUpdateStrategy {
// overwrites existing values and appends new ones
// ex. worker init: {A: foo, B: bar} + env reload: {A:foo, B: foo, C: foo} -> {A: foo, B: foo, C: foo}
merge = 0;
// existing capabilities are cleared and new capabilities are applied
// ex. worker init: {A: foo, B: bar} + env reload: {A:foo, C: foo} -> {A: foo, C: foo}
replace = 1;
}
// After specialization, worker sends capabilities & metadata.
// Worker metadata captured for telemetry purposes
WorkerMetadata worker_metadata = 1;

// A map of worker supported features/capabilities
map<string, string> capabilities = 2;

// Status of the response
StatusResult result = 3;

// If no strategy is defined, the host will default to merge
CapabilitiesUpdateStrategy capabilities_update_strategy = 4;
}

// Tell the out-of-proc worker to close any shared memory maps it allocated for given invocation
Expand Down Expand Up @@ -322,10 +348,13 @@ message RpcFunctionMetadata {
// A flag indicating if managed dependency is enabled or not
bool managed_dependency_enabled = 14;

// The optional function execution retry strategy to use on invocation failures.
RpcRetryOptions retry_options = 15;

// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
map<string,string> Properties = 16;
map<string,string> properties = 16;
}

// Host tells worker it is ready to receive metadata
Expand Down Expand Up @@ -369,14 +398,14 @@ message InvocationRequest {

// Host sends ActivityId, traceStateString and Tags from host
message RpcTraceContext {
// This corresponds to Activity.Current?.Id
string trace_parent = 1;
// This corresponds to Activity.Current?.Id
string trace_parent = 1;

// This corresponds to Activity.Current?.TraceStateString
string trace_state = 2;
// This corresponds to Activity.Current?.TraceStateString
string trace_state = 2;

// This corresponds to Activity.Current?.Tags
map<string, string> attributes = 3;
// This corresponds to Activity.Current?.Tags
map<string, string> attributes = 3;
}

// Host sends retry context for a function invocation
Expand All @@ -396,8 +425,8 @@ message InvocationCancel {
// Unique id for invocation
string invocation_id = 2;

// Time period before force shutdown
google.protobuf.Duration grace_period = 1; // could also use absolute time
// PROPERTY NOT USED
google.protobuf.Duration grace_period = 1;
}

// Worker responds with status of Invocation
Expand All @@ -415,6 +444,15 @@ message InvocationResponse {
StatusResult result = 3;
}

message WorkerWarmupRequest {
// Full path of worker.config.json location
string worker_directory = 1;
}

message WorkerWarmupResponse {
StatusResult result = 1;
}

// Used to encapsulate data which could be a variety of types
message TypedData {
oneof data {
Expand All @@ -429,6 +467,8 @@ message TypedData {
CollectionString collection_string = 9;
CollectionDouble collection_double = 10;
CollectionSInt64 collection_sint64 = 11;
ModelBindingData model_binding_data = 12;
CollectionModelBindingData collection_model_binding_data = 13;
}
}

Expand Down Expand Up @@ -496,20 +536,20 @@ message ParameterBinding {

// Used to describe a given binding on load
message BindingInfo {
// Indicates whether it is an input or output binding (or a fancy inout binding)
enum Direction {
in = 0;
out = 1;
inout = 2;
}

// Indicates the type of the data for the binding
enum DataType {
undefined = 0;
string = 1;
binary = 2;
stream = 3;
}
// Indicates whether it is an input or output binding (or a fancy inout binding)
enum Direction {
in = 0;
out = 1;
inout = 2;
}

// Indicates the type of the data for the binding
enum DataType {
undefined = 0;
string = 1;
binary = 2;
stream = 3;
}

// Type of binding (e.g. HttpTrigger)
string type = 2;
Expand All @@ -518,6 +558,9 @@ message BindingInfo {
Direction direction = 3;

DataType data_type = 4;

// Properties for binding metadata
map<string, string> properties = 5;
}

// Used to send logs back to the Host
Expand Down Expand Up @@ -582,13 +625,13 @@ message RpcException {
// Textual message describing the exception
string message = 2;

// Worker specifies whether exception is a user exception,
// for purpose of application insights logging. Defaults to false.
// Worker specifies whether exception is a user exception,
// for purpose of application insights logging. Defaults to false.
bool is_user_exception = 4;

// Type of exception. If it's a user exception, the type is passed along to app insights.
// Otherwise, it's ignored for now.
string type = 5;
string type = 5;
}

// Http cookie type. Note that only name and value are used for Http requests
Expand Down Expand Up @@ -647,3 +690,52 @@ message RpcHttp {
map<string,NullableString> nullable_params = 21;
map<string,NullableString> nullable_query = 22;
}

// Message representing Microsoft.Azure.WebJobs.ParameterBindingData
// Used for hydrating SDK-type bindings in out-of-proc workers
message ModelBindingData
{
// The version of the binding data content
string version = 1;

// The extension source of the binding data
string source = 2;

// The content type of the binding data content
string content_type = 3;

// The binding data content
bytes content = 4;
}

// Used to encapsulate collection model_binding_data
message CollectionModelBindingData {
repeated ModelBindingData model_binding_data = 1;
}

// Retry policy which the worker sends the host when the worker indexes
// a function.
message RpcRetryOptions
{
// The retry strategy to use. Valid values are fixed delay or exponential backoff.
enum RetryStrategy
{
exponential_backoff = 0;
fixed_delay = 1;
}

// The maximum number of retries allowed per function execution.
// -1 means to retry indefinitely.
int32 max_retry_count = 2;

// The delay that's used between retries when you're using a fixed delay strategy.
google.protobuf.Duration delay_interval = 3;

// The minimum retry delay when you're using an exponential backoff strategy
google.protobuf.Duration minimum_interval = 4;

// The maximum retry delay when you're using an exponential backoff strategy
google.protobuf.Duration maximum_interval = 5;

RetryStrategy retry_strategy = 6;
}
33 changes: 1 addition & 32 deletions src/DurableSDK/PowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Collections.ObjectModel;
using System.Linq;
using System.Management.Automation;
using System.Reflection.Metadata;
using Microsoft.Azure.Functions.PowerShellWorker.PowerShell;
using Microsoft.Azure.Functions.PowerShellWorker.Utility;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using Newtonsoft.Json;
using LogLevel = WebJobs.Script.Grpc.Messages.RpcLog.Types.Level;

internal class PowerShellServices : IPowerShellServices
{
Expand Down Expand Up @@ -43,36 +41,7 @@ public PowerShellServices(PowerShell pwsh, ILogger logger)

public bool isExternalDurableSdkLoaded()
{
// Search for the external DF SDK in the current session
var matchingModules = _pwsh.AddCommand(Utils.GetModuleCmdletInfo)
.AddParameter("FullyQualifiedName", Utils.ExternalDurableSdkName)
.InvokeAndClearCommands<PSModuleInfo>();

// If we get at least one result, we know the external SDK was imported
var numCandidates = matchingModules.Count();
var isModuleInCurrentSession = numCandidates > 0;

if (isModuleInCurrentSession)
{
var candidatesInfo = matchingModules.Select(module => string.Format(
PowerShellWorkerStrings.FoundExternalDurableSdkInSession, module.Name, module.Version, module.Path));
var externalSDKModuleInfo = string.Join('\n', candidatesInfo);

if (numCandidates > 1)
{
// If there's more than 1 result, there may be runtime conflicts
// warn user of potential conflicts
_logger.Log(isUserOnlyLog: false, LogLevel.Warning, String.Format(
PowerShellWorkerStrings.MultipleExternalSDKsInSession,
numCandidates, Utils.ExternalDurableSdkName, externalSDKModuleInfo));
}
else
{
// a single external SDK is in session. Report its metadata
_logger.Log(isUserOnlyLog: false, LogLevel.Trace, externalSDKModuleInfo);
}
}
return isModuleInCurrentSession;
return PowerShellModuleDetector.IsPowerShellModuleLoaded(_pwsh, _logger, Utils.ExternalDurableSdkName);
}

public void EnableExternalDurableSDK()
Expand Down
20 changes: 20 additions & 0 deletions src/Logging/LoggingEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;

namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
{
internal class LoggingEventHandler
{
private Action<string, string, Exception> _eventHandler = (a, b, c) => { };

public void Subscribe(Action<string, string, Exception> handler)
{
_eventHandler = handler;
}

public void LogToHandlers(string level, string message, Exception exception = null)
{
_eventHandler(level, message, exception);
}
}
}
3 changes: 3 additions & 0 deletions src/Logging/RpcLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal class RpcLogger : ILogger
private readonly MessagingStream _msgStream;
private string _invocationId;
private string _requestId;
public LoggingEventHandler outputLogHandler = new LoggingEventHandler();

internal RpcLogger(MessagingStream msgStream)
{
Expand Down Expand Up @@ -55,6 +56,8 @@ public void Log(bool isUserOnlyLog, LogLevel logLevel, string message, Exception
};

_msgStream.Write(logMessage);

outputLogHandler.LogToHandlers(logLevel.ToString(), message, exception);
}
else
{
Expand Down
16 changes: 16 additions & 0 deletions src/OpenTelemetry/IPowerShellServicesForOpenTelemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.Azure.Functions.PowerShellWorker.OpenTelemetry
{
internal interface IPowerShellServicesForOpenTelemetry
{
bool? IsModuleLoaded();
void AddStartOpenTelemetryInvocationCommand(OpenTelemetryInvocationContext otelContext);
void StopOpenTelemetryInvocation(OpenTelemetryInvocationContext otelContext, bool invokeCommands);
void StartFunctionsLoggingListener(bool invokeCommands);
}
}
Loading
Loading