Skip to content

Commit

Permalink
EventQueue rewrite. Real world doesn't match wiki docu in the slighte…
Browse files Browse the repository at this point in the history
…st... sorry about that!
  • Loading branch information
cinderblocks committed Feb 5, 2023
1 parent 27789b8 commit 3b78c40
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 95 deletions.
188 changes: 103 additions & 85 deletions LibreMetaverse/Capabilities/EventQueueClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2006-2016, openmetaverse.co
* Copyright (c) 2022, Sjofn LLC.
* Copyright (c) 2022-2023, Sjofn LLC.
* All rights reserved.
*
* - Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -38,23 +38,24 @@ namespace OpenMetaverse.Http
public class EventQueueClient
{
/// <summary>For exponential backoff on error.</summary>
public const int REQUEST_BACKOFF_SECONDS = 15 * 1000; // 15 seconds start
public const int REQUEST_BACKOFF_SECONDS_INC = 5 * 1000; // 5 seconds increase
public const int REQUEST_BACKOFF_SECONDS_MAX = 5 * 60 * 1000; // 5 minutes
private const int REQUEST_BACKOFF_SECONDS = 15 * 1000; // 15 seconds start
private const int REQUEST_BACKOFF_SECONDS_INC = 5 * 1000; // 5 seconds increase
private const int REQUEST_BACKOFF_SECONDS_MAX = 5 * 60 * 1000; // 5 minutes

public delegate void ConnectedCallback();
public delegate void EventCallback(string eventName, OSDMap body);

public ConnectedCallback OnConnected;
public EventCallback OnEvent;

public bool Running => _Running;
public bool Running { get; private set; }

private readonly Uri _Address;
private readonly Simulator _Simulator;
private readonly CancellationTokenSource _queueCts;
private bool _Dead;
private OSD _AckPayload;

protected readonly Uri _Address;
protected readonly Simulator _Simulator;
protected bool _Dead;
protected bool _Running;
private CancellationTokenSource _QueueCts;

/// <summary>Number of times we've received an unknown CAPS exception in series.</summary>
private int _errorCount;
Expand All @@ -63,25 +64,71 @@ public EventQueueClient(Uri eventQueueLocation, Simulator sim)
{
_Address = eventQueueLocation;
_Simulator = sim;
_queueCts = new CancellationTokenSource();
}

public void Start()
{
_Dead = false;
_Running = true;

// Create an EventQueueGet request
OSDMap payload = new OSDMap {["ack"] = new OSD(), ["done"] = OSD.FromBoolean(false)};
Running = true;

_QueueCts = new CancellationTokenSource();
CancellationTokenSource timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(_QueueCts.Token, timeoutCts.Token))
Task.Factory.StartNew(() =>
{
Task req = _Simulator.Client.HttpCapsClient.PostRequestAsync(_Address, OSDFormat.Xml, payload,
linkedCts.Token,
RequestCompletedHandler, null, ConnectedResponseHandler);
}
// Initial request
_AckPayload = new OSDMap { ["ack"] = new OSD(), ["done"] = OSD.FromBoolean(false) };
var initialTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(_queueCts.Token);
initialTimeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
Task initialReq = _Simulator.Client.HttpCapsClient.PostRequestAsync(_Address, OSDFormat.Xml, _AckPayload,
initialTimeoutCts.Token, RequestCompletedHandler, null, ConnectedResponseHandler);
initialReq.Wait(initialTimeoutCts.Token);
initialTimeoutCts.Dispose();
while (Running && !_queueCts.Token.IsCancellationRequested)
{
if (_errorCount > 0)
{ // Exponentially back off, so we don't hammer the CPU
while (!_queueCts.Token.IsCancellationRequested)
{
_queueCts.Token.WaitHandle.WaitOne(
Math.Min(REQUEST_BACKOFF_SECONDS + _errorCount * REQUEST_BACKOFF_SECONDS_INC,
REQUEST_BACKOFF_SECONDS_MAX));
}
}
// Resume the connection.
if (!_queueCts.Token.IsCancellationRequested)
{
using (CancellationTokenSource timeoutCts =
CancellationTokenSource.CreateLinkedTokenSource(_queueCts.Token))
{
timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
try
{
Task req = _Simulator.Client.HttpCapsClient.PostRequestAsync(_Address, OSDFormat.Xml,
_AckPayload, timeoutCts.Token, RequestCompletedHandler);
req.Wait(timeoutCts.Token);
}
catch (OperationCanceledException)
{
/*no-op*/
}
}
}
// If the event queue is dead at this point, turn it off since
// that was the last thing we want to do
if (_Dead)
{
Running = false;
Logger.DebugLog($"Sent event queue shutdown message for {_Simulator}");
}
}
}, _queueCts.Token); // TOOD: TaskCreationOptions.LongRunning?

}

public void Stop(bool immediate)
Expand All @@ -90,17 +137,17 @@ public void Stop(bool immediate)

if (immediate)
{
_Running = false;
Running = false;
}

_QueueCts.Cancel();
_queueCts.Cancel();
}

void ConnectedResponseHandler(HttpResponseMessage response)
{
if (!response.IsSuccessStatusCode) { return; }

_Running = true;
Running = true;

// The event queue is starting up for the first time
if (OnConnected != null)
Expand All @@ -126,56 +173,57 @@ void RequestCompletedHandler(HttpResponseMessage response, byte[] responseData,
}
else
{
Logger.Log($"Got an unparseable response from {_Simulator} event queue: \"" +
Logger.Log($"Received an invalid response from {_Simulator} event queue: \"" +
System.Text.Encoding.UTF8.GetString(responseData) + "\"", Helpers.LogLevel.Warning);
}
}
else if (error != null)
else if (response != null && error != null)
{
#region Error handling

switch (response.StatusCode)
{
case HttpStatusCode.NotFound:
case HttpStatusCode.Gone:
Logger.Log($"Closing event queue at {_Simulator} due to missing caps URI", Helpers.LogLevel.Info);
Logger.Log($"Closing event queue at {_Simulator} due to missing caps URI", Helpers.LogLevel.Info, _Simulator.Client);

_Running = false;
Running = false;
_Dead = true;
break;
case (HttpStatusCode)499: // weird error returned occasionally, ignore for now
// I believe this is the timeout error invented by LL for LSL HTTP-out requests (gwyneth 20220413)
Logger.Log($"Possible HTTP-out timeout error from {_Simulator}, no need to continue", Helpers.LogLevel.Debug);
Logger.Log($"Possible HTTP-out timeout error from {_Simulator}, no need to continue", Helpers.LogLevel.Debug, _Simulator.Client);

_Running = false;
Running = false;
_Dead = true;
break;
case HttpStatusCode.InternalServerError:
// As per LL's instructions, we ought to consider this a
// 'request to close client' (gwyneth 20220413)
Logger.Log($"Grid sent a {response.StatusCode} at {_Simulator}, closing connection", Helpers.LogLevel.Debug);
Logger.Log($"Grid sent a {response.StatusCode} at {_Simulator}, closing connection", Helpers.LogLevel.Debug, _Simulator.Client);

// ... but do we happen to have an InnerException? Log it!
if (error.InnerException != null)
{
// unravel the whole inner error message, so we finally figure out what it is!
// (gwyneth 20220414)
Logger.Log($"Unrecognized internal caps exception from {_Simulator}: '{error.InnerException.Message}'", Helpers.LogLevel.Warning);
Logger.Log($"\nMessage ---\n{error.Message}", Helpers.LogLevel.Warning);
Logger.Log($"Unrecognized internal caps exception from {_Simulator}: '{error.InnerException.Message}'",
Helpers.LogLevel.Warning, _Simulator.Client);
Logger.Log($"\nMessage ---\n{error.Message}", Helpers.LogLevel.Warning, _Simulator.Client);
if (error.Data.Count > 0)
{
Logger.Log(" Extra details:", Helpers.LogLevel.Warning);
foreach (DictionaryEntry de in error.Data)
{
Logger.Log(String.Format(" Key: {0,-20} Value: '{1}'",
de.Key, de.Value),
Helpers.LogLevel.Warning);
Helpers.LogLevel.Warning, _Simulator.Client);
}
}
// but we'll nevertheless close this connection (gwyneth 20220414)
}

_Running = false;
Running = false;
_Dead = true;
break;
case HttpStatusCode.BadGateway:
Expand All @@ -188,7 +236,8 @@ void RequestCompletedHandler(HttpResponseMessage response, byte[] responseData,
// Note: if this condition persists, it _might_ be the grid trying to request
// that the client closes the connection, as per LL's specs (gwyneth 20220414)
Logger.Log($"Grid sent a Bad Gateway Error at {_Simulator}; " +
$"probably a time-out from the grid's EventQueue server (normal) -- ignoring and continuing", Helpers.LogLevel.Debug);
$"probably a time-out from the grid's EventQueue server (normal) -- ignoring and continuing",
Helpers.LogLevel.Debug, _Simulator.Client);
break;
default:
++_errorCount;
Expand All @@ -197,86 +246,55 @@ void RequestCompletedHandler(HttpResponseMessage response, byte[] responseData,
if (response.StatusCode != HttpStatusCode.OK)
{
Logger.Log($"Unrecognized caps connection problem from {_Simulator}: {response.StatusCode} {response.ReasonPhrase}",
Helpers.LogLevel.Warning);
Helpers.LogLevel.Warning, _Simulator.Client);
}
else if (error.InnerException != null)
{
// see comment above (gwyneth 20220414)
Logger.Log($"Unrecognized internal caps exception from {_Simulator}: '{error.InnerException.Message}'", Helpers.LogLevel.Warning);
Logger.Log($"Message ---\n{error.Message}", Helpers.LogLevel.Warning);
Logger.Log($"Unrecognized internal caps exception from {_Simulator}: '{error.InnerException.Message}'",
Helpers.LogLevel.Warning, _Simulator.Client);
Logger.Log($"Message ---\n{error.Message}", Helpers.LogLevel.Warning, _Simulator.Client);
if (error.Data.Count > 0)
{
Logger.Log(" Extra details:", Helpers.LogLevel.Warning);
Logger.Log(" Extra details:", Helpers.LogLevel.Warning, _Simulator.Client);
foreach (DictionaryEntry de in error.Data)
{
Logger.Log(string.Format(" Key: {0,-20} Value: {1}",
"'" + de.Key + "'", de.Value),
Helpers.LogLevel.Warning);
Helpers.LogLevel.Warning, _Simulator.Client);
}
}
}
else
{
Logger.Log($"Unrecognized caps exception from {_Simulator}: {error.Message}",
Helpers.LogLevel.Warning);
Helpers.LogLevel.Warning, _Simulator.Client);
}
break;
} // end switch

#endregion Error handling
}
else if (error != null)
{
++_errorCount;
Logger.Log($"Exception thrown from {_Simulator} event queue: " +
$"{(error.InnerException != null ? error.InnerException.Message : error.Message)}", Helpers.LogLevel.Warning, _Simulator.Client);
}
else
{
++_errorCount;

Logger.Log($"No response from {_Simulator} event queue but no reported error either", Helpers.LogLevel.Warning);
Logger.Log($"No response from {_Simulator} event queue with no reported error", Helpers.LogLevel.Warning, _Simulator.Client);
}

#pragma warning disable CS0164 // This label has not been referenced
HandlingDone:

#region Resume the connection

if (_Running)

if (Running)
{
CancellationToken queueCancelToken = _QueueCts.Token;
OSDMap payload = new OSDMap();
if (ack != 0) { payload["ack"] = OSD.FromInteger(ack); }
else { payload["ack"] = new OSD(); }
payload["done"] = OSD.FromBoolean(_Dead);

if (_errorCount > 0)
{ // Exponentially back off, so we don't hammer the CPU
while (!queueCancelToken.IsCancellationRequested)
{
queueCancelToken.WaitHandle.WaitOne(
Math.Min(REQUEST_BACKOFF_SECONDS + _errorCount * REQUEST_BACKOFF_SECONDS_INC,
REQUEST_BACKOFF_SECONDS_MAX));
}
}
// Resume the connection.
if (!queueCancelToken.IsCancellationRequested)
{
CancellationTokenSource timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(queueCancelToken, timeoutCts.Token))
{
Task req = _Simulator.Client.HttpCapsClient.PostRequestAsync(_Address, OSDFormat.Xml,
payload, queueCancelToken, RequestCompletedHandler);
}
}

// If the event queue is dead at this point, turn it off since
// that was the last thing we want to do
if (_Dead)
{
_Running = false;
Logger.DebugLog($"Sent event queue shutdown message for {_Simulator}");
}
_AckPayload = payload;
}
#pragma warning restore CS0164 // This label has not been referenced

#endregion Resume the connection

#region Handle incoming events

Expand Down
27 changes: 17 additions & 10 deletions LibreMetaverse/Capabilities/HttpCapsClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Sjofn, LLC.
* Copyright (c) 2022-2023, Sjofn, LLC.
* All rights reserved.
*
* - Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -234,19 +234,26 @@ public async Task DeleteRequestAsync(Uri uri, OSDFormat format, OSD payload, Can
private async Task SendRequestAsync(HttpRequestMessage request, CancellationToken? cancellationToken,
DownloadCompleteHandler completeHandler, DownloadProgressHandler progressHandler, ConnectedHandler connectedHandler)
{
using (var response = (cancellationToken.HasValue)
? await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken.Value)
: await SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
try
{
if (!response.IsSuccessStatusCode)
using (var response = (cancellationToken.HasValue)
? await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken.Value)
: await SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
{
completeHandler?.Invoke(response, null,
new HttpRequestException(response.StatusCode + " " + response.ReasonPhrase));
}
if (!response.IsSuccessStatusCode)
{
completeHandler?.Invoke(response, null,
new HttpRequestException(response.StatusCode + " " + response.ReasonPhrase));
}

connectedHandler?.Invoke(response);
connectedHandler?.Invoke(response);

await ProcessResponseAsync(response, cancellationToken, completeHandler, progressHandler);
await ProcessResponseAsync(response, cancellationToken, completeHandler, progressHandler);
}
}
catch (HttpRequestException ex)
{
completeHandler?.Invoke(null, null, ex);
}
}

Expand Down

0 comments on commit 3b78c40

Please sign in to comment.