From e58efcb049b812152d761b361fbf56e2721c2cb0 Mon Sep 17 00:00:00 2001 From: Dan Chevalier Date: Tue, 19 Dec 2023 15:36:22 +0000 Subject: [PATCH] Connecting DTD and DTD_impl together. This change adds the happy path for DTD_impl and DTD being able to: - register streams - register serviceMethods - postEvents to streams - call serviceMethods Change-Id: I73865071745ef19a4493f86714e0855930243dd5 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/341700 Reviewed-by: Ben Konyi Reviewed-by: Kenzie Davisson Commit-Queue: Dan Chevalier --- pkg/dds/lib/src/client.dart | 13 +- pkg/dtd/analysis_options.yaml | 1 + pkg/dtd/example/dtd_example.dart | 3 + pkg/dtd/example/dtd_service_example.dart | 68 ++++++++ pkg/dtd/example/dtd_stream_example.dart | 28 +++ pkg/dtd/lib/src/dart_tooling_daemon.dart | 6 +- pkg/dtd/lib/src/dtd_connection.dart | 188 +++++++++++++++++---- pkg/dtd/pubspec.yaml | 1 + pkg/dtd_impl/analysis_options.yaml | 1 + pkg/dtd_impl/bin/constants.dart | 19 +++ pkg/dtd_impl/bin/dart_tooling_daemon.dart | 197 ++++++++++++++++++++++ pkg/dtd_impl/bin/dtd_client.dart | 183 ++++++++++++++++++++ pkg/dtd_impl/bin/dtd_client_manager.dart | 18 ++ pkg/dtd_impl/bin/dtd_server.dart | 34 +--- pkg/dtd_impl/bin/dtd_stream_manager.dart | 50 ++++++ pkg/dtd_impl/bin/rpc_error_codes.dart | 63 +++++++ pkg/dtd_impl/pubspec.yaml | 16 +- 17 files changed, 821 insertions(+), 68 deletions(-) create mode 100644 pkg/dtd/example/dtd_service_example.dart create mode 100644 pkg/dtd/example/dtd_stream_example.dart create mode 100644 pkg/dtd_impl/bin/constants.dart create mode 100644 pkg/dtd_impl/bin/dart_tooling_daemon.dart create mode 100644 pkg/dtd_impl/bin/dtd_client.dart create mode 100644 pkg/dtd_impl/bin/dtd_client_manager.dart create mode 100644 pkg/dtd_impl/bin/dtd_stream_manager.dart create mode 100644 pkg/dtd_impl/bin/rpc_error_codes.dart diff --git a/pkg/dds/lib/src/client.dart b/pkg/dds/lib/src/client.dart index 9cda5d06a873..a8d7b3baf247 100644 --- a/pkg/dds/lib/src/client.dart +++ b/pkg/dds/lib/src/client.dart @@ -92,8 +92,7 @@ class DartDevelopmentServiceClient { return await _clientPeer.sendRequest(method, parameters); } - /// Registers handlers for JSON RPC methods which need to be intercepted by - /// DDS as well as fallback request forwarder. + /// Registers handlers for JSON RPC method endpoints. void _registerJsonRpcMethods() { _clientPeer.registerMethod('streamListen', (parameters) async { final streamId = parameters['streamId'].asString; @@ -107,12 +106,22 @@ class DartDevelopmentServiceClient { return RPCResponses.success; }); + /// jrpc endpoint for cancelling a stream. + /// + /// Parameters: + /// 'streamId': the stream to be cancelled. _clientPeer.registerMethod('streamCancel', (parameters) async { final streamId = parameters['streamId'].asString; await dds.streamManager.streamCancel(this, streamId); return RPCResponses.success; }); + /// jrpc endpoint for posting an event to a stream. + /// + /// Parameters: + /// 'eventKind': the kind of event being sent. + /// 'data': the data being sent over the stream. + /// 'stream: the stream that is being posted to. _clientPeer.registerMethod('postEvent', (parameters) async { final eventKind = parameters['eventKind'].asString; final eventData = parameters['eventData'].asMap; diff --git a/pkg/dtd/analysis_options.yaml b/pkg/dtd/analysis_options.yaml index 7e0b773bfef0..3bdaf584e752 100644 --- a/pkg/dtd/analysis_options.yaml +++ b/pkg/dtd/analysis_options.yaml @@ -4,3 +4,4 @@ linter: rules: - avoid_void_async - unawaited_futures + - require_trailing_commas diff --git a/pkg/dtd/example/dtd_example.dart b/pkg/dtd/example/dtd_example.dart index 212447f171d7..6330e1b8b842 100644 --- a/pkg/dtd/example/dtd_example.dart +++ b/pkg/dtd/example/dtd_example.dart @@ -5,6 +5,9 @@ import 'package:dtd/dtd.dart'; void main() { + // TODO(@danchevalier) make simple testing services for ide's to know that + // things are working. + // TODO(@danchevalier): make this example meaningful DartToolingDaemon.connect(Uri.parse('wss://127.0.0.1:12345')); } diff --git a/pkg/dtd/example/dtd_service_example.dart b/pkg/dtd/example/dtd_service_example.dart new file mode 100644 index 000000000000..e75b26c1b179 --- /dev/null +++ b/pkg/dtd/example/dtd_service_example.dart @@ -0,0 +1,68 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:dtd/dtd.dart'; + +//Extension side +class Bar extends DTDResponse { + late String baz; + late int bazCount; + late String bazDescription; + + // ignore: use_super_parameters + Bar.fromDTDResponse(DTDResponse response) : super.fromDTDResponse(response) { + baz = result['baz'] as String; + bazCount = result['bazCount'] as int; + bazDescription = result['bazDescription'] as String; + } + + @override + String toString() { + return 'Bar(baz:$baz, bazCount:$bazCount, bazDescription:$bazDescription)'; + } +} + +extension FooServiceExtension on DTDConnection { + Future barExtension() async { + final result = await call( + 'Foo', + 'bar', + params: { + 'baz': 'the baz', + 'bazCount': 1, + 'bazDescription': 'there is one baz', + }, + ); + return Bar.fromDTDResponse(result); + } +} + +void main(List args) async { + final url = args[0]; // pass the url as a param to the example + final fooService = await DartToolingDaemon.connect(Uri.parse('ws://$url')); + final client = await DartToolingDaemon.connect(Uri.parse('ws://$url')); + + await fooService.registerService( + 'Foo', + 'bar', + (params) async { + final baz = params['baz'].value; + final bazCount = params['bazCount'].value; + final bazDescription = params['bazDescription'].value; + final result = { + 'type': 'Bar', + 'baz': baz, + 'bazCount': bazCount, + 'bazDescription': bazDescription, + }; + return result; + }, + ); + final response = await client.barExtension(); + final bar = Bar.fromDTDResponse(response); + + print('Got a bar response: $bar'); +} diff --git a/pkg/dtd/example/dtd_stream_example.dart b/pkg/dtd/example/dtd_stream_example.dart new file mode 100644 index 000000000000..1a6f61a73ffd --- /dev/null +++ b/pkg/dtd/example/dtd_stream_example.dart @@ -0,0 +1,28 @@ +import 'package:dtd/dtd.dart'; + +void main(List args) async { + final url = args[0]; // pass the url as a param to the example + final clientA = await DartToolingDaemon.connect(Uri.parse('ws://$url')); + final clientB = await DartToolingDaemon.connect(Uri.parse('ws://$url')); + + clientA.onEvent('Foo').listen((event) { + print('A Received $event from Foo Stream'); + }); + clientB.onEvent('Foo').listen((event) { + print('B Received $event from Foo Stream'); + }); + + await clientA.streamListen('Foo'); + await clientB.streamListen('Foo'); + + clientA.postEvent('Foo', 'kind1', {'event': 1}); + + clientB.postEvent('Foo', 'kind2', {'event': 2}); + + // delayed so the Daemon connection is still up by the time the events come + // back. + await Future.delayed(const Duration(seconds: 10)); + + await clientA.close(); + await clientB.close(); +} diff --git a/pkg/dtd/lib/src/dart_tooling_daemon.dart b/pkg/dtd/lib/src/dart_tooling_daemon.dart index f78b2d727993..65c70aa1fb36 100644 --- a/pkg/dtd/lib/src/dart_tooling_daemon.dart +++ b/pkg/dtd/lib/src/dart_tooling_daemon.dart @@ -3,13 +3,15 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; - import 'package:web_socket_channel/web_socket_channel.dart'; import 'dtd_connection.dart'; +// TODO(@danchevalier) make sure that there is html documentation just like the +// analysis server. + abstract class DartToolingDaemon { - // TODO(@danchevalier) + // TODO(@danchevalier) Dart Docs static Future connect(Uri uri) async { final channel = WebSocketChannel.connect(uri); return DTDConnection(channel); diff --git a/pkg/dtd/lib/src/dtd_connection.dart b/pkg/dtd/lib/src/dtd_connection.dart index 99d1a2e572f2..6f5b89d94a19 100644 --- a/pkg/dtd/lib/src/dtd_connection.dart +++ b/pkg/dtd/lib/src/dtd_connection.dart @@ -2,43 +2,94 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; +import 'dart:convert'; + +import 'package:json_rpc_2/json_rpc_2.dart'; import 'package:stream_channel/stream_channel.dart'; +typedef DTDServiceCallback = Future> Function( + Parameters params, +); + +// TODO(danchevalier): add a serviceMethodIsAvailable experience. it will listen +// to a stream that announces servicemethods getting registered and +// unregistered. The state can then be presented as a listenable so that clients +// can gate their behaviour on a serviceMethod going up/down. + +// TODO(danchevalier) dart docs class DTDConnection { - DTDConnection(this._connectionChannel) { - // TODO(@danchevalier); + late final Peer _clientPeer; + late final Future _done; + final _subscribedStreamControllers = >{}; + + DTDConnection(this._connectionChannel) + : _clientPeer = Peer(_connectionChannel.cast()) { + _clientPeer.registerMethod('streamNotify', (Parameters params) { + final streamId = params['streamId'].value as String; + final event = params['event']; + final eventKind = event['eventKind'].value as String; + final eventData = event['eventData'].value as Map; + final timestamp = event['timestamp'].value as int; + + _subscribedStreamControllers[streamId]?.add( + DTDEvent(streamId, eventKind, eventData, timestamp), + ); + }); + + _done = _clientPeer.listen(); } /// Terminates the connection with the Dart Tooling Daemon. - Future close() async { - // TODO(@danchevalier) - return; - } + Future close() => _clientPeer.close(); - // TODO(@danchevalier) /// A `Future` that completes when the connection with the Dart Tooling Daemon /// is terminated. - Future get done async => Future.value(); + Future get done => _done; /// Returns the current list of services available. Future> getRegisteredServices() async { - // TODO(@danchevalier) - return Future.value([]); + return await _clientPeer.sendRequest( + 'getRegisteredServices', + ) as List; } /// Returns the current list of streams with active subscribers. Future> getRegisteredStreams() async { - // TODO(@danchevalier) - return Future.value([]); + return await _clientPeer.sendRequest( + 'getRegisteredStreams', + ) as List; + } + + Future registerService( + String service, + String method, + DTDServiceCallback callback, + ) async { + final combinedName = '$service.$method'; + await _clientPeer.sendRequest('registerService', { + 'service': service, + 'method': method, + }); + + _clientPeer.registerMethod( + combinedName, + callback, + ); } /// Subscribes this client to events posted on [streamId]. /// /// If this client is already subscribed to [streamId], an exception will be /// thrown. - Future streamListen(String streamId) async { + Future streamListen(String streamId) { // TODO(@danchevalier) - return; + return _clientPeer.sendRequest( + 'streamListen', + { + 'streamId': streamId, + }, + ); } /// Cancel the subscription to [streamId]. @@ -48,9 +99,14 @@ class DTDConnection { /// /// If this client was not subscribed to [streamId], an exception will be /// thrown. - Future streamCancel(Stream streamId) async { + Future streamCancel(Stream streamId) { // TODO(@danchevalier) - return; + return _clientPeer.sendRequest( + 'streamCancel', + { + 'streamId': streamId, + }, + ); } /// Creates a `Stream` for events received on [streamId]. @@ -59,15 +115,30 @@ class DTDConnection { /// events aren't dropped. [streamListen(streamId)] must be called before any /// events will appear on the returned stream. Stream onEvent(String streamId) { - // TODO(@danchevalier) - return const Stream.empty(); + return _subscribedStreamControllers + .putIfAbsent( + streamId, + StreamController.new, + ) + .stream; } /// Posts an [DTDEvent] with [eventData] to [streamId]. /// /// If no clients are subscribed to [streamId], the event will be dropped. - void postEvent(String streamId, Map eventData) { - // TODO(@danchevalier) + void postEvent( + String streamId, + String eventKind, + Map eventData, + ) { + _clientPeer.sendRequest( + 'postEvent', + { + 'streamId': streamId, + 'eventKind': eventKind, + 'eventData': eventData, + }, + ); } /// Invokes a service with the name `serviceName.methodName`. @@ -80,27 +151,86 @@ class DTDConnection { /// /// If the parameters included in [params] are invalid, an exception will be /// thrown. - Future call( + Future call( String serviceName, String methodName, { Map? params, }) async { - // TODO(@danchevalier) - // ignore: null_argument_to_non_null_type - return Future.value(); + final json = await _clientPeer.sendRequest( + '$serviceName.$methodName', + params ?? {}, + ) as Map; + + final type = json['type'] as String?; + if (type == null) { + throw DTDConnectionException.callResponseMissingType(json); + } + + //TODO(danchevalier): Find out how to get access to the id. + return DTDResponse('-1', type, json); } // ignore: unused_field final StreamChannel _connectionChannel; } -abstract class DTDResponse { - String get id; +class DTDResponse { + DTDResponse(this._id, this._type, this._result); + + DTDResponse.fromDTDResponse(DTDResponse other) + : this( + other.id, + other.type, + other.result, + ); + final String _id; + final String _type; + final Map _result; + + String get id => _id; - String get type; + String get type => _type; - Map get json; + Map get result => _result; } // TODO(@danchevalier): is this how event should be done? -abstract class DTDEvent {} +class DTDEvent { + DTDEvent(this.stream, this.kind, this.data, this.timestamp); + String stream; + int timestamp; + String kind; + Map data; + + @override + String toString() { + return jsonEncode({ + 'stream': stream, + 'timestamp': timestamp, + 'kind': kind, + 'data': data, + }); + } +} + +class DTDConnectionException implements Exception { + static const int callParamsMissingTypeError = 1; + + /// The response to a call method is missing the top level type parameter. + factory DTDConnectionException.callResponseMissingType( + Map json, + ) { + return DTDConnectionException._( + callParamsMissingTypeError, + 'call received an invalid response, ' + "it is missing the 'type' param. Got: $json", + ); + } + DTDConnectionException._(this.errorCode, this.message); + + @override + String toString() => 'DTDConnectionException: $message'; + + final int errorCode; + final String message; +} diff --git a/pkg/dtd/pubspec.yaml b/pkg/dtd/pubspec.yaml index 447a6a243f7e..deb9d6ec3f5e 100644 --- a/pkg/dtd/pubspec.yaml +++ b/pkg/dtd/pubspec.yaml @@ -7,6 +7,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: + json_rpc_2: ^3.0.2 stream_channel: ^2.1.2 web_socket_channel: ^2.4.0 diff --git a/pkg/dtd_impl/analysis_options.yaml b/pkg/dtd_impl/analysis_options.yaml index d63aed71e411..46c327593039 100644 --- a/pkg/dtd_impl/analysis_options.yaml +++ b/pkg/dtd_impl/analysis_options.yaml @@ -4,3 +4,4 @@ linter: rules: - avoid_void_async - unawaited_futures + - require_trailing_commas diff --git a/pkg/dtd_impl/bin/constants.dart b/pkg/dtd_impl/bin/constants.dart new file mode 100644 index 000000000000..381d9ff0d412 --- /dev/null +++ b/pkg/dtd_impl/bin/constants.dart @@ -0,0 +1,19 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +abstract class RPCResponses { + static const success = { + 'type': 'Success', + }; + + static const collectedSentinel = { + 'type': 'Sentinel', + 'kind': 'Collected', + 'valueAsString': '', + }; +} + +// Give connections time to reestablish before considering them closed. +// Required to reestablish connections killed by UberProxy. +const sseKeepAlive = Duration(seconds: 30); diff --git a/pkg/dtd_impl/bin/dart_tooling_daemon.dart b/pkg/dtd_impl/bin/dart_tooling_daemon.dart new file mode 100644 index 000000000000..77b155c31494 --- /dev/null +++ b/pkg/dtd_impl/bin/dart_tooling_daemon.dart @@ -0,0 +1,197 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart'; +import 'package:shelf/shelf.dart'; +import 'package:sse/server/sse_handler.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:shelf_web_socket/shelf_web_socket.dart'; +import 'package:shelf/shelf_io.dart' as io; + +import 'constants.dart'; +import 'dtd_client.dart'; +import 'dtd_client_manager.dart'; +import 'dtd_stream_manager.dart'; + +/// A service that facilitates communication between dart tools. +class DartToolingDaemon { + DartToolingDaemon._({ + bool ipv6 = false, + bool shouldLogRequests = false, + }) : _ipv6 = ipv6, + _shouldLogRequests = shouldLogRequests { + streamManager = DTDStreamManager(this); + clientManager = DTDClientManager(); + } + static const _kSseHandlerPath = '\$debugHandler'; + + /// Manages the streams for the current [DartToolingDaemon] service. + late final DTDStreamManager streamManager; + + /// Manages the connected clients of the current [DartToolingDaemon] service. + late final ClientManager clientManager; + + final bool _ipv6; + late HttpServer _server; + final bool _shouldLogRequests; + + /// The uri of the current [DartToolingDaemon] service. + Uri? get uri => _uri; + Uri? _uri; + + Future _startService() async { + final host = + (_ipv6 ? InternetAddress.loopbackIPv6 : InternetAddress.loopbackIPv4) + .host; + var port = 0; + + // Start the DTD server. Run in an error Zone to ensure that asynchronous + // exceptions encountered during request handling are handled, as exceptions + // thrown during request handling shouldn't take down the entire service. + late String errorMessage; + final tmpServer = await runZonedGuarded( + () async { + Future startServer() async { + try { + return await io.serve(_handlers().handler, host, port); + } on SocketException catch (e) { + errorMessage = e.message; + if (e.osError != null) { + errorMessage += ' (${e.osError!.message})'; + } + errorMessage += ': ${e.address?.host}:${e.port}'; + return null; + } + } + + return await startServer(); + }, + (error, stack) { + if (_shouldLogRequests) { + print('Asynchronous error: $error\n$stack'); + } + }, + ); + if (tmpServer == null) { + throw DartToolingDaemonException.connectionIssue(errorMessage); + } + _server = tmpServer; + + _uri = Uri( + scheme: 'http', + host: host, + port: _server.port, + path: '/', + ); + } + + /// Starts a [DartToolingDaemon] service. + /// + /// Set [ipv6] to true to have the service use ipv6 instead of ipv4. + /// + /// Set [shouldLogRequests] to true to enable logging. + static Future startService({ + bool ipv6 = false, + bool shouldLogRequests = false, + }) async { + final dtd = DartToolingDaemon._( + ipv6: ipv6, + shouldLogRequests: shouldLogRequests, + ); + await dtd._startService(); + return dtd; + } + + // Attempt to upgrade HTTP requests to a websocket before processing them as + // standard HTTP requests. The websocket handler will fail quickly if the + // request doesn't appear to be a websocket upgrade request. + Cascade _handlers() { + return Cascade().add(_webSocketHandler()).add(_sseHandler()); + } + + Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) { + final client = DTDClient.fromWebSocket( + this, + ws, + ); + clientManager.addClient(client); + }); + + Handler _sseHandler() { + final handler = SseHandler( + Uri.parse('/$_kSseHandlerPath'), + keepAlive: sseKeepAlive, + ); + + handler.connections.rest.listen((sseConnection) { + final client = DTDClient.fromSSEConnection( + this, + sseConnection, + ); + clientManager.addClient(client); + }); + + return handler.handler; + } +} + +// TODO(danchevalier): clean up these exceptions so they are more relevant to +// DTD. Also add docs to the factories that remain. +class DartToolingDaemonException implements Exception { + // TODO(danchevalier): add a relevant dart doc here + static const int existingDtdInstanceError = 1; + + /// Set when the connection to the remote VM service terminates unexpectedly + /// during Dart Development Service startup. + static const int failedToStartError = 2; + + /// Set when a connection error has occurred after startup. + static const int connectionError = 3; + + factory DartToolingDaemonException.existingDtdInstance( + String message, { + Uri? dtdUri, + }) { + return ExistingDTDImplException._(message, dtdUri: dtdUri); + } + + factory DartToolingDaemonException.failedToStart() { + return DartToolingDaemonException._( + failedToStartError, + 'Failed to start Dart Development Service', + ); + } + + factory DartToolingDaemonException.connectionIssue(String message) { + return DartToolingDaemonException._(connectionError, message); + } + + DartToolingDaemonException._(this.errorCode, this.message); + + @override + String toString() => 'DartDevelopmentServiceException: $message'; + + final int errorCode; + final String message; +} + +class ExistingDTDImplException extends DartToolingDaemonException { + ExistingDTDImplException._( + String message, { + this.dtdUri, + }) : super._( + DartToolingDaemonException.existingDtdInstanceError, + message, + ); + + /// The URI of the existing DTD instance, if available. + /// + /// This URL is the base HTTP URI such as `http://127.0.0.1:1234/AbcDefg=/`, + /// not the WebSocket URI (which can be obtained by mapping the scheme to + /// `ws` (or `wss`) and appending `ws` to the path segments). + final Uri? dtdUri; +} diff --git a/pkg/dtd_impl/bin/dtd_client.dart b/pkg/dtd_impl/bin/dtd_client.dart new file mode 100644 index 000000000000..7e036df8df1a --- /dev/null +++ b/pkg/dtd_impl/bin/dtd_client.dart @@ -0,0 +1,183 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart'; +import 'package:sse/server/sse_handler.dart'; +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; +import 'package:stream_channel/stream_channel.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +import 'constants.dart'; +import 'dart_tooling_daemon.dart'; +import 'rpc_error_codes.dart'; + +/// Represents a client that is connected to a DTD service. +class DTDClient extends Client { + final StreamChannel connection; + late json_rpc.Peer _clientPeer; + final DartToolingDaemon dtd; + late final Future _done; + + Future get done => _done; + + DTDClient.fromWebSocket( + DartToolingDaemon dtd, + WebSocketChannel ws, + ) : this._( + dtd, + ws, + ); + + DTDClient.fromSSEConnection( + DartToolingDaemon dtd, + SseConnection sse, + ) : this._( + dtd, + sse, + ); + + DTDClient._( + this.dtd, + this.connection, + ) { + _clientPeer = json_rpc.Peer( + connection.cast(), + strictProtocolChecks: false, + ); + _registerJsonRpcMethods(); + _done = listen(); + } + + @override + Future close() => _clientPeer.close(); + + @override + Future sendRequest({ + required String method, + dynamic parameters, + }) async { + if (_clientPeer.isClosed) { + return; + } + + return await _clientPeer.sendRequest(method, parameters.asMap); + } + + @override + void streamNotify(String streamId, Object eventData) { + _clientPeer.sendNotification('streamNotify', eventData); + } + + /// Start receiving JSON RPC requests from the client. + /// + /// Returned future completes when the peer is closed. + Future listen() => _clientPeer.listen().then( + (_) => dtd.streamManager.onClientDisconnect(this), + ); + + /// Registers handlers for the Dart Tooling Daemon JSON RPC method endpoints. + void _registerJsonRpcMethods() { + // TODO(danchevalier): do a once over of all methods and ensure that we have + // all necessary validations. + _clientPeer.registerMethod('streamListen', _streamListen); + _clientPeer.registerMethod('streamCancel', _streamCancel); + _clientPeer.registerMethod('postEvent', _postEvent); + _clientPeer.registerMethod('registerService', _registerService); + _clientPeer.registerMethod('getRegisteredStreams', _getRegisteredStreams); + + // Handle service extension invocations. + _clientPeer.registerFallback(_fallback); + } + + /// jrpc endpoint for cancelling a stream. + /// + /// Parameters: + /// 'streamId': the stream to be cancelled. + _streamListen(parameters) async { + final streamId = parameters['streamId'].asString; + await dtd.streamManager.streamListen( + this, + streamId, + ); + return RPCResponses.success; + } + + /// jrpc endpoint for stopping listening to a stream. + /// + /// Parameters: + /// 'streamId': the stream that the client would like to stop listening to. + _streamCancel(parameters) async { + final streamId = parameters['streamId'].asString; + await dtd.streamManager.streamCancel(this, streamId); + return RPCResponses.success; + } + + /// jrpc endpoint for posting an event to a stream. + /// + /// Parameters: + /// 'eventKind': the kind of event being sent. + /// 'data': the data being sent over the stream. + /// 'stream: the stream that is being posted to. + _postEvent(parameters) async { + final eventKind = parameters['eventKind'].asString; + final eventData = parameters['eventData'].asMap; + final stream = parameters['streamId'].asString; + dtd.streamManager.postEventHelper(stream, eventKind, eventData); + return RPCResponses.success; + } + + /// jrpc endpoint for registering a service to the tooling daemon. + /// + /// Parameters: + /// 'service': the name of the service that is being registered to. + /// 'method': the name of the method that is being registered on the service. + _registerService(parameters) { + final serviceName = parameters['service'].asString; + final method = parameters['method'].asString; + final combinedName = '$serviceName.$method'; + + // TODO(danchevalier): enforce only one client can register methods to a + // service. + if (services.containsKey(combinedName)) { + throw RpcErrorCodes.buildRpcException( + RpcErrorCodes.kServiceAlreadyRegistered, + ); + } + services[combinedName] = method; + return RPCResponses.success; + } + + _getRegisteredStreams(parameters) { + // TODO(danchevalier) implement this. + return []; + } + + /// jrpc fallback handler. + /// + /// Handles all service method calls that will be forwarded to the respective + /// client which registered that service method. + _fallback(parameters) async { + // Lookup the client associated with the service extension's namespace. + // If the client exists and that client has registered the specified + // method, forward the request to that client. + final serviceMethod = parameters.method; + + final client = dtd.clientManager.findFirstClientThatHandlesService( + serviceMethod, + ); + if (client == null) { + throw json_rpc.RpcException( + RpcErrorCodes.kMethodNotFound, + 'Unknown service method: $serviceMethod', + ); + } + + return await client.sendRequest( + method: serviceMethod, + parameters: parameters, + ); + } +} diff --git a/pkg/dtd_impl/bin/dtd_client_manager.dart b/pkg/dtd_impl/bin/dtd_client_manager.dart new file mode 100644 index 000000000000..6e135050be18 --- /dev/null +++ b/pkg/dtd_impl/bin/dtd_client_manager.dart @@ -0,0 +1,18 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart'; + +import 'dtd_client.dart'; + +/// Used for keeping track and managing clients that are connected to a given +/// service. +class DTDClientManager extends ClientManager { + @override + void addClient(Client client) { + client as DTDClient; + super.addClient(client); + client.done.then((_) => removeClient(client)); + } +} diff --git a/pkg/dtd_impl/bin/dtd_server.dart b/pkg/dtd_impl/bin/dtd_server.dart index ea89a5cc44d6..5adce39a03ff 100644 --- a/pkg/dtd_impl/bin/dtd_server.dart +++ b/pkg/dtd_impl/bin/dtd_server.dart @@ -2,38 +2,14 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'dart:io'; - -import 'package:shelf/shelf.dart'; -import 'package:shelf/shelf_io.dart'; -import 'package:shelf_router/shelf_router.dart'; - -// Configure routes. -final _router = Router() - ..get('/', _rootHandler) - ..get('/echo/', _echoHandler); - -Response _rootHandler(Request req) { - return Response.ok('Hello, World!\n'); -} - -Response _echoHandler(Request request) { - final message = request.params['message']; - return Response.ok('$message\n'); -} +import 'dart_tooling_daemon.dart'; void main(List args) async { - // Use any available host or container IP (usually `0.0.0.0`). - final ip = InternetAddress.anyIPv4; - - // Configure a pipeline that logs requests. - final handler = - Pipeline().addMiddleware(logRequests()).addHandler(_router.call); + final dartToolingDaemon = await DartToolingDaemon.startService( + shouldLogRequests: true, + ); // TODO(@danchevalier): turn off logging - // For running in containers, we respect the PORT environment variable. - final port = int.parse(Platform.environment['PORT'] ?? '8080'); - final server = await serve(handler, ip, port); print( - 'The Dart Tooling Daemon is listening on ${server.address.host}:${server.port}', + 'The Dart Tooling Daemon is listening on ${dartToolingDaemon.uri?.host}:${dartToolingDaemon.uri?.port}', ); } diff --git a/pkg/dtd_impl/bin/dtd_stream_manager.dart b/pkg/dtd_impl/bin/dtd_stream_manager.dart new file mode 100644 index 000000000000..f7b04ce67223 --- /dev/null +++ b/pkg/dtd_impl/bin/dtd_stream_manager.dart @@ -0,0 +1,50 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart'; + +import 'dart_tooling_daemon.dart'; + +/// Manages state related to stream subscriptions made by [DTDClient]s. +class DTDStreamManager extends StreamManager { + DTDStreamManager(this.dtd); + + final DartToolingDaemon dtd; + + /// Send an event to the [stream]. + /// + /// [stream] must be a registered custom stream (i.e., not a stream specified + /// as part of the VM service protocol). + /// + /// If [stream] is not a registered custom stream, an [RPCError] with code + /// [kCustomStreamDoesNotExist] will be thrown. + /// + /// If [stream] is a core stream, an [RPCError] with code + /// [kCoreStreamNotAllowed] will be thrown. + void postEventHelper( + String stream, + String eventKind, + Map eventData, + ) { + super.postEvent( + stream, + { + 'streamId': stream, + 'event': { + 'timestamp': DateTime.now().millisecondsSinceEpoch, + 'eventData': eventData, + 'eventKind': eventKind, + }, + }, + ); + } + + /// Send `streamNotify` notifications to clients subscribed to `streamId`. + void streamNotify( + String streamId, + Map data, + ) { + super.postEvent(streamId, data); + } +} diff --git a/pkg/dtd_impl/bin/rpc_error_codes.dart b/pkg/dtd_impl/bin/rpc_error_codes.dart new file mode 100644 index 000000000000..c0dadc721c3d --- /dev/null +++ b/pkg/dtd_impl/bin/rpc_error_codes.dart @@ -0,0 +1,63 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; + +// TODO(danchevalier): get this from DDS instead. +abstract class RpcErrorCodes { + static json_rpc.RpcException buildRpcException(int code, {dynamic data}) { + return json_rpc.RpcException( + code, + errorMessages[code]!, + data: data, + ); + } + + // These error codes must be kept in sync with those in vm/json_stream.h and + // vmservice.dart. + // static const kParseError = -32700; + // static const kInvalidRequest = -32600; + static const kMethodNotFound = -32601; + + static const kInvalidParams = -32602; + // static const kInternalError = -32603; + + // static const kExtensionError = -32000; + + static const kFeatureDisabled = 100; + + // static const kCannotAddBreakpoint = 102; + static const kStreamAlreadySubscribed = 103; + static const kStreamNotSubscribed = 104; + + // static const kIsolateMustBeRunnable = 105; + static const kIsolateMustBePaused = 106; + // static const kCannotResume = 107; + // static const kIsolateIsReloading = 108; + // static const kIsolateReloadBarred = 109; + // static const kIsolateMustHaveReloaded = 110; + static const kServiceAlreadyRegistered = 111; + static const kServiceDisappeared = 112; + static const kExpressionCompilationError = 113; + + // static const kInvalidTimelineRequest = 114; + static const kCustomStreamDoesNotExist = 130; + static const kCoreStreamNotAllowed = 131; + + // Experimental (used in private rpcs). + // static const kFileSystemAlreadyExists = 1001; + // static const kFileSystemDoesNotExist = 1002; + // static const kFileDoesNotExist = 1003; + + static const errorMessages = { + kFeatureDisabled: 'Feature is disabled', + kStreamAlreadySubscribed: 'Stream already subscribed', + kStreamNotSubscribed: 'Stream not subscribed', + kServiceAlreadyRegistered: 'Service already registered', + kServiceDisappeared: 'Service has disappeared', + kExpressionCompilationError: 'Expression compilation error', + kCustomStreamDoesNotExist: 'Custom stream does not exist', + kCoreStreamNotAllowed: 'Core streams are not allowed', + }; +} diff --git a/pkg/dtd_impl/pubspec.yaml b/pkg/dtd_impl/pubspec.yaml index 8e3f3ab4564d..9a9cc9d1610c 100644 --- a/pkg/dtd_impl/pubspec.yaml +++ b/pkg/dtd_impl/pubspec.yaml @@ -4,17 +4,21 @@ repository: https://github.com/dart-lang/sdk/tree/main/pkg/dtd_impl publish_to: none environment: - sdk: ^3.3.0-152.0.dev + sdk: ">=3.0.0 <4.0.0" # Use 'any' constraints here; we get our versions from the DEPS file. dependencies: + dart_service_protocol_shared: any + json_rpc_2: any shelf: any - shelf_router: any + shelf_web_socket: any + sse: any + stream_channel: any + web_socket_channel: any + # We use 'any' version constraints here as we get our package versions from # the dart-lang/sdk repo's DEPS file. Note that this is a special case; the # best practice for packages is to specify their compatible version ranges. # See also https://dart.dev/tools/pub/dependencies. -# dev_dependencies: -# http: any -# lints: any -# test: any +dev_dependencies: + lints: any