Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dart grpc client #3144

Merged
merged 2 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions src/client/desktop_gui/lib/grpc_client.dart
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clean and tidy implementation.

Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import 'dart:io';

import 'package:async/async.dart';
import 'package:basics/int_basics.dart';
import 'package:built_collection/built_collection.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:grpc/grpc.dart';

import 'ffi.dart';
import 'generated/multipass.pbgrpc.dart';

typedef Status = InstanceStatus_Status;
typedef VmInfo = InfoReply_Info;

final grpcClientProvider = Provider(
(_) {
final address = getServerAddress();
final certPair = getCertPair();

var channelCredentials = CustomChannelCredentials(
authority: 'localhost',
certificate: certPair.cert,
certificateKey: certPair.key,
);

return GrpcClient(RpcClient(ClientChannel(
address.scheme == InternetAddressType.unix.name.toLowerCase()
? InternetAddress(address.path, type: InternetAddressType.unix)
: address.host,
port: address.port,
options: ChannelOptions(credentials: channelCredentials),
)));
},
);

final vmInfosStreamProvider = StreamProvider(
(ref) => ref.watch(grpcClientProvider).infoStream(),
);

final vmInfosProvider = Provider(
(ref) => ref.watch(vmInfosStreamProvider).valueOrNull ?? const [],
);

Map<String, Status> infosToStatusMap(Iterable<VmInfo> infos) {
return {for (final info in infos) info.name: info.instanceStatus.status};
}

final vmStatusesProvider = Provider(
(ref) => infosToStatusMap(ref.watch(vmInfosProvider)).build(),
);

final vmNamesProvider = Provider(
(ref) => ref.watch(vmStatusesProvider).keys.toBuiltSet(),
);

class GrpcClient {
final RpcClient _client;

GrpcClient(this._client);

ResponseStream<LaunchReply> launch(LaunchRequest request) {
return _client.launch(Stream.value(request), options: CallOptions());
}

Future<StartReply?> start(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = StartRequest()..instanceNames = instanceNames;
return _client.start(Stream.value(request)).firstOrNull;
}

Future<StopReply?> stop(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = StopRequest()..instanceNames = instanceNames;
return _client.stop(Stream.value(request)).firstOrNull;
}

Future<SuspendReply?> suspend(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = SuspendRequest()..instanceNames = instanceNames;
return _client.suspend(Stream.value(request)).firstOrNull;
}

Future<RestartReply?> restart(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = RestartRequest()..instanceNames = instanceNames;
return _client.restart(Stream.value(request)).firstOrNull;
}

Future<DeleteReply?> delete(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = DeleteRequest()..instanceNames = instanceNames;
return _client.delet(Stream.value(request)).firstOrNull;
}

Future<RecoverReply?> recover(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = RecoverRequest()..instanceNames = instanceNames;
return _client.recover(Stream.value(request)).firstOrNull;
}

Future<DeleteReply?> purge(Iterable<String> names) async {
final instanceNames = InstanceNames()..instanceName.addAll(names);
final request = DeleteRequest()
..instanceNames = instanceNames
..purge = true;
return _client.delet(Stream.value(request)).firstOrNull;
}

Stream<List<VmInfo>> infoStream() async* {
// this is to de-duplicate errors received from the stream
Object? lastError;
await for (final _ in Stream.periodic(1.seconds)) {
try {
final reply = await _client.info(Stream.value(InfoRequest())).last;
yield reply.info;
lastError = null;
} catch (error, stackTrace) {
if (error != lastError) yield* Stream.error(error, stackTrace);
lastError = error;
}
}
}

Future<List<FindReply_ImageInfo>> find() {
final request = FindRequest()
..showImages = true
..showBlueprints = true;
return _client.find(Stream.value(request)).single.then((r) => r.imagesInfo);
}

Future<String> version() {
final request = VersionRequest();
return _client
.version(Stream.value(request))
.single
.then((reply) => reply.version);
}
}

class CustomChannelCredentials extends ChannelCredentials {
final List<int> certificateChain;
final List<int> certificateKey;

CustomChannelCredentials({
required List<int> certificate,
required this.certificateKey,
String? authority,
}) : certificateChain = certificate,
super.secure(
certificates: certificate,
authority: authority,
onBadCertificate: allowBadCertificates);

@override
SecurityContext get securityContext {
final ctx = super.securityContext!;
ctx.useCertificateChainBytes(certificateChain);
ctx.usePrivateKeyBytes(certificateKey);
return ctx;
}
}
36 changes: 24 additions & 12 deletions src/client/desktop_gui/lib/main.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

import 'ffi.dart';
import 'grpc_client.dart';

void main() {
runApp(const MaterialApp(home: MyHomePage()));
runApp(const ProviderScope(child: MaterialApp(home: MyHomePage())));
}

class MyHomePage extends StatefulWidget {
class MyHomePage extends ConsumerWidget {
const MyHomePage({super.key});

@override
State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
int _counter = 0;
Widget _buildInfosColumn(Iterable<VmInfo> infos) {
return Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: infos
.map((info) => Text('${info.name} ${info.instanceStatus.status}'))
.toList(),
);
}

@override
Widget build(BuildContext context) {
Widget build(BuildContext context, WidgetRef ref) {
final instances = ref.watch(vmInfosStreamProvider).when(
data: _buildInfosColumn,
error: (error, _) => Text('$error'),
loading: () => const Text('Loading...'),
);
return Scaffold(
body: Center(child: Text('$_counter')),
floatingActionButton: FloatingActionButton(
onPressed: () => setState(() => _counter++),
body: Column(
mainAxisAlignment: MainAxisAlignment.spaceBetween,
crossAxisAlignment: CrossAxisAlignment.start,
children: [instances, Text(multipassVersion)],
),
);
}
Expand Down
Loading