diff --git a/lib/src/binding_http/http_client.dart b/lib/src/binding_http/http_client.dart index 5be7e8a6..b8327bba 100644 --- a/lib/src/binding_http/http_client.dart +++ b/lib/src/binding_http/http_client.dart @@ -16,6 +16,7 @@ import "../../core.dart"; import "http_config.dart"; import "http_request_method.dart"; import "http_security_exception.dart"; +import "http_subscription.dart"; const _authorizationHeader = "Authorization"; @@ -306,13 +307,24 @@ final class HttpClient extends ProtocolClient @override Future subscribeResource( - Form form, { + AugmentedForm form, { required void Function(Content content) next, void Function(Exception error)? error, required void Function() complete, }) async { - // TODO(JKRhb): implement subscribeResource - throw UnimplementedError(); + if (form.subprotocol != "sse") { + throw const DartWotException( + "Only server-sent events are supported at the moment by dart_wot", + ); + } + + return HttpSseSubscription( + form, + complete, + next: next, + onError: error, + complete: complete, + ); } Future _sendDiscoveryRequest( diff --git a/lib/src/binding_http/http_client_factory.dart b/lib/src/binding_http/http_client_factory.dart index 779f4140..4aa86972 100644 --- a/lib/src/binding_http/http_client_factory.dart +++ b/lib/src/binding_http/http_client_factory.dart @@ -50,18 +50,7 @@ final class HttpClientFactory implements ProtocolClientFactory { @override bool supportsOperation(OperationType operationType, String? subprotocol) { - const unsupportedOperations = [ - OperationType.observeproperty, - OperationType.unobserveproperty, - OperationType.subscribeevent, - OperationType.unsubscribeevent, - ]; - - if (unsupportedOperations.contains(operationType)) { - return false; - } - - if (subprotocol != null) { + if (subprotocol != null && !["sse"].contains(subprotocol)) { return false; } diff --git a/lib/src/binding_http/http_subscription.dart b/lib/src/binding_http/http_subscription.dart new file mode 100644 index 00000000..2cf6196c --- /dev/null +++ b/lib/src/binding_http/http_subscription.dart @@ -0,0 +1,65 @@ +// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// SPDX-License-Identifier: BSD-3-Clause + +import "dart:convert"; + +import "package:sse_channel/sse_channel.dart"; + +import "../../core.dart"; + +/// A [ProtocolSubscription] for supporting server-sent events. +final class HttpSseSubscription extends ProtocolSubscription { + /// Constructor + HttpSseSubscription( + AugmentedForm form, + super._complete, { + required void Function(Content content) next, + void Function(Exception error)? onError, + void Function()? complete, + }) : _active = true, + _sseChannel = SseChannel.connect(form.resolvedHref) { + _sseChannel.stream.listen( + (data) { + if (data is! String) { + return; + } + next(Content( + form.contentType, Stream.fromIterable([utf8.encode(data)]))); + }, + onError: (error) { + if (error is! Exception) { + return; + } + + onError?.call(error); + }, + onDone: complete, + ); + } + + final SseChannel _sseChannel; + + bool _active; + + @override + bool get active => _active; + + @override + Future stop({ + int? formIndex, + Map? uriVariables, + Object? data, + }) async { + if (!_active) { + return; + } + _active = false; + + await _sseChannel.sink.close(); + await super + .stop(formIndex: formIndex, uriVariables: uriVariables, data: data); + } +}