Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JetStream support #56

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions src/FishyFlip/ATJetStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// <copyright file="ATJetStream.cs" company="Drastic Actions">
// Copyright (c) Drastic Actions. All rights reserved.
// </copyright>

namespace FishyFlip;

/// <summary>
/// AT JetStream.
/// </summary>
public sealed class ATJetStream : IDisposable
{
private const int ReceiveBufferSize = 32768;
private readonly JsonSerializerOptions jsonSerializerOptions;
private readonly SourceGenerationContext sourceGenerationContext;
private ClientWebSocket client;
private bool disposedValue;
private ILogger? logger;
private Uri instanceUri;

/// <summary>
/// Initializes a new instance of the <see cref="ATJetStream"/> class.
/// </summary>
/// <param name="options"><see cref="ATJetStreamOptions"/>.</param>
public ATJetStream(ATJetStreamOptions options)
{
this.logger = options.Logger;
this.instanceUri = options.Url;
this.client = new ClientWebSocket();
this.jsonSerializerOptions = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
PropertyNameCaseInsensitive = true,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull | JsonIgnoreCondition.WhenWritingDefault,
Converters =
{
new AtUriJsonConverter(),
new AtHandlerJsonConverter(),
new AtDidJsonConverter(),
new EmbedConverter(),
new ATRecordJsonConverter(),
new ATCidConverter(),
new ATWebSocketCommitTypeConverter(),
new ATWebSocketEventConverter(),
},
};
this.sourceGenerationContext = new SourceGenerationContext(this.jsonSerializerOptions);
}

/// <summary>
/// On Connection Updated.
/// </summary>
public event EventHandler<SubscriptionConnectionStatusEventArgs>? OnConnectionUpdated;

/// <summary>
/// On Raw Message Received.
/// </summary>
public event EventHandler<JetStreamRawMessageEventArgs>? OnRawMessageReceived;

/// <summary>
/// On AT WebSocket Record Received.
/// </summary>
public event EventHandler<JetStreamATWebSocketRecordEventArgs>? OnRecordReceived;

/// <inheritdoc/>
void IDisposable.Dispose()
{
this.Dispose(disposing: true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Connect to the JetStream instance via a WebSocket connection.
/// </summary>
/// <param name="wantedCollections">List of collection namespaces (ex. app.bsky.feed.post) you want to receive. Defaults to all.</param>
/// <param name="wantedDids">List of User ATDids to filter for. Defaults to All Repos.</param>
/// <param name="cursor">A unix microseconds timestamp cursor to begin playback from. Set the value from a previous <see cref="ATWebSocketRecord.TimeUs"/> value to start stream from this point. Defaults to live tail.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public Task ConnectAsync(string[]? wantedCollections = default, string[]? wantedDids = default, long cursor = 0, CancellationToken? token = default)
{
var subscribe = "/subscribe?";
if (wantedCollections is not null && wantedCollections.Length > 0)
{
foreach (var collection in wantedCollections)
{
subscribe += $"wantedCollections={collection}&";
}
}

if (wantedDids is not null && wantedDids.Length > 0)
{
foreach (var did in wantedDids)
{
subscribe += $"wantedDids={did}&";
}
}

if (cursor > 0)
{
subscribe += $"cursor={cursor}&";
}

if (subscribe.EndsWith("&"))
{
subscribe = subscribe.Substring(0, subscribe.Length - 1);
}

this.logger?.LogInformation($"WSS: Connecting to {this.instanceUri}{subscribe}");

return this.ConnectAsync(subscribe, token);
}

/// <summary>
/// Connect to the JetStream instance via a WebSocket connection.
/// </summary>
/// <param name="connection">Connection string.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public async Task ConnectAsync(string connection, CancellationToken? token = default)
{
if (this.client.State == WebSocketState.Open)
{
return;
}

if (this.client.State == WebSocketState.Aborted || this.client.State == WebSocketState.Closed)
{
this.client = new ClientWebSocket();
}

var endToken = token ?? CancellationToken.None;
await this.client.ConnectAsync(new Uri($"wss://{this.instanceUri.Host}{connection}"), endToken);
this.logger?.LogInformation($"WSS: Connected to {this.instanceUri}");
this.ReceiveMessages(this.client, endToken).FireAndForgetSafeAsync(this.logger);
this.OnConnectionUpdated?.Invoke(this, new SubscriptionConnectionStatusEventArgs(this.client.State));
}

/// <summary>
/// Close the existing WebSocket connection.
/// </summary>
/// <param name="status">Status for the shutdown. Defaults to <see cref="WebSocketCloseStatus.NormalClosure"/>.</param>
/// <param name="disconnectReason">Reason for the shutdown.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public async Task CloseAsync(WebSocketCloseStatus status = WebSocketCloseStatus.NormalClosure, string disconnectReason = "Client disconnecting", CancellationToken? token = default)
{
var endToken = token ?? CancellationToken.None;
this.logger?.LogInformation($"WSS: Disconnecting");
try
{
await this.client.CloseAsync(status, disconnectReason, endToken);
}
catch (Exception ex)
{
this.logger?.LogError(ex, "Failed to Close WebSocket connection.");
}

this.OnConnectionUpdated?.Invoke(this, new SubscriptionConnectionStatusEventArgs(this.client.State));
}

/// <summary>
/// Dispose.
/// </summary>
public void Dispose()
{
this.Dispose(disposing: true);
GC.SuppressFinalize(this);
}

private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken token)
{
byte[] receiveBuffer = new byte[ReceiveBufferSize];
while (webSocket.State == WebSocketState.Open)
{
try
{
#if NETSTANDARD
var result =
await webSocket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), token);
if (result is not { MessageType: WebSocketMessageType.Text, EndOfMessage: true })
{
continue;
}

byte[] newArray = new byte[result.Count];
Array.Copy(receiveBuffer, 0, newArray, 0, result.Count);
#else
var result =
await webSocket.ReceiveAsync(new Memory<byte>(receiveBuffer), token);
if (result is not { MessageType: WebSocketMessageType.Text, EndOfMessage: true })
{
continue;
}

// Convert result to string
byte[] newArray = new byte[result.Count];
Array.Copy(receiveBuffer, 0, newArray, 0, result.Count);
#endif
var message = Encoding.UTF8.GetString(newArray);
this.OnRawMessageReceived?.Invoke(this, new JetStreamRawMessageEventArgs(message));
Task.Run(() => this.HandleMessage(message)).FireAndForgetSafeAsync(this.logger);
}
catch (OperationCanceledException)
{
this.logger?.LogDebug("WSS: Operation Canceled.");
}
catch (Exception e)
{
this.logger?.LogError(e, "WSS: ATError receiving message.");
}
}

this.OnConnectionUpdated?.Invoke(this, new SubscriptionConnectionStatusEventArgs(webSocket.State));
}

private void HandleMessage(string json)
{
if (string.IsNullOrEmpty(json))
{
this.logger?.LogDebug("WSS: Empty message received.");
return;
}

var atWebSocketRecord = JsonSerializer.Deserialize<ATWebSocketRecord>(json, this.sourceGenerationContext.ATWebSocketRecord);
if (atWebSocketRecord is null)
{
this.logger?.LogError("WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
return;
}

this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord));
}

/// <summary>
/// Dispose.
/// </summary>
/// <param name="disposing">Is Disposing.</param>
private void Dispose(bool disposing)
{
if (!this.disposedValue)
{
if (disposing)
{
this.client.Dispose();
}

this.disposedValue = true;
}
}
}
72 changes: 72 additions & 0 deletions src/FishyFlip/ATJetStreamBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// <copyright file="ATJetStreamBuilder.cs" company="Drastic Actions">
// Copyright (c) Drastic Actions. All rights reserved.
// </copyright>

namespace FishyFlip;

/// <summary>
/// AT JetStream Builder.
/// </summary>
public class ATJetStreamBuilder
{
private readonly ATJetStreamOptions atProtocolOptions;

/// <summary>
/// Initializes a new instance of the <see cref="ATJetStreamBuilder"/> class.
/// </summary>
public ATJetStreamBuilder()
{
this.atProtocolOptions = new ATJetStreamOptions();
}

/// <summary>
/// Initializes a new instance of the <see cref="ATJetStreamBuilder"/> class.
/// </summary>
/// <param name="options">ATJetStreamOptions.</param>
public ATJetStreamBuilder(ATJetStreamOptions options)
{
this.atProtocolOptions = options;
}

/// <summary>
/// Set the instance url to connect to.
/// </summary>
/// <param name="url">Instance Url.</param>
/// <returns><see cref="ATJetStreamBuilder"/>.</returns>
public ATJetStreamBuilder WithInstanceUrl(Uri url)
{
this.atProtocolOptions.Url = url;
return this;
}

/// <summary>
/// Adds a logger.
/// </summary>
/// <param name="logger">Logger.</param>
/// <returns><see cref="ATJetStreamBuilder"/>.</returns>
public ATJetStreamBuilder WithLogger(ILogger? logger)
{
this.atProtocolOptions.Logger = logger;
return this;
}

/// <summary>
/// Returns the ATWebSocketProtocolOptions.
/// </summary>
/// <returns>ATJetStreamBuilder.</returns>
public ATJetStreamOptions BuildOptions()
{
return this.atProtocolOptions;
}

/// <summary>
/// Builds the Protocol.
/// </summary>
/// <returns>The <seealso cref="ATProtocol"/> build with these configs.</returns>
public ATJetStream Build()
{
var options = this.BuildOptions();

return new ATJetStream(options);
}
}
29 changes: 29 additions & 0 deletions src/FishyFlip/ATJetStreamOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// <copyright file="ATJetStreamOptions.cs" company="Drastic Actions">
// Copyright (c) Drastic Actions. All rights reserved.
// </copyright>

namespace FishyFlip;

/// <summary>
/// AT JetStream Options.
/// </summary>
public class ATJetStreamOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="ATJetStreamOptions"/> class.
/// </summary>
public ATJetStreamOptions()
{
this.Url = new Uri("https://jetstream.atproto.tools");
}

/// <summary>
/// Gets the instance Url.
/// </summary>
public Uri Url { get; internal set; }

/// <summary>
/// Gets the logger.
/// </summary>
public ILogger? Logger { get; internal set; }
}
25 changes: 25 additions & 0 deletions src/FishyFlip/Events/JetStreamATWebSocketRecordEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// <copyright file="JetStreamATWebSocketRecordEventArgs.cs" company="Drastic Actions">
// Copyright (c) Drastic Actions. All rights reserved.
// </copyright>

namespace FishyFlip.Events;

/// <summary>
/// JetStream AT WebSocket Record Event Args.
/// </summary>
public class JetStreamATWebSocketRecordEventArgs : EventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="JetStreamATWebSocketRecordEventArgs"/> class.
/// </summary>
/// <param name="record"><see cref="ATWebSocketRecord"/>.</param>
public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record)
{
this.Record = record;
}

/// <summary>
/// Gets the AT WebSocket Record.
/// </summary>
public ATWebSocketRecord Record { get; }
}
Loading