diff --git a/bin/cbj_hub.dart b/bin/cbj_hub.dart index 49ecf7f8..bfea1822 100644 --- a/bin/cbj_hub.dart +++ b/bin/cbj_hub.dart @@ -3,15 +3,8 @@ import 'dart:io'; import 'package:cbj_hub/application/boot_up/boot_up.dart'; import 'package:cbj_hub/infrastructure/cbj_web_server/cbj_web_server_repository.dart'; import 'package:cbj_hub/infrastructure/mqtt_server/mqtt_server_repository.dart'; -import 'package:cbj_integrations_controller/infrastructure/bindings/binding_repository.dart'; -import 'package:cbj_integrations_controller/infrastructure/local_db/local_db_hive_repository.dart'; import 'package:cbj_integrations_controller/infrastructure/node_red/node_red_repository.dart'; -import 'package:cbj_integrations_controller/infrastructure/room/saved_rooms_repo.dart'; -import 'package:cbj_integrations_controller/infrastructure/routines/routine_repository.dart'; -import 'package:cbj_integrations_controller/infrastructure/saved_devices/saved_devices_repo.dart'; -import 'package:cbj_integrations_controller/infrastructure/scenes/scene_repository.dart'; import 'package:cbj_integrations_controller/infrastructure/shared_variables.dart'; -import 'package:cbj_integrations_controller/infrastructure/system_commands/system_commands_manager_d.dart'; import 'package:cbj_integrations_controller/initialize_integrations_controller.dart'; import 'package:cbj_integrations_controller/injection.dart'; import 'package:network_tools/network_tools.dart'; @@ -26,20 +19,13 @@ Future main(List arguments) async { env: Env.devPc, ); - await BootUp.setup(); + BootUp(); } /// All instances of Repos void setInstancesOfRepos(String projectRootDirectoryPath) { - SystemCommandsManager(); MqttServerRepository(); CbjWebServerRepository(); - SavedRoomsRepo(); - SavedDevicesRepo(); - RoutineCbjRepository(); - HiveRepository(); NodeRedRepository(); - BindingCbjRepository(); - SceneCbjRepository(); - SharedVariables(projectRootDirectoryPath); + SharedVariables().projectRootDirectoryPath = projectRootDirectoryPath; } diff --git a/lib/application/boot_up/boot_up.dart b/lib/application/boot_up/boot_up.dart index 643c2eff..20472a19 100644 --- a/lib/application/boot_up/boot_up.dart +++ b/lib/application/boot_up/boot_up.dart @@ -1,6 +1,4 @@ import 'package:cbj_hub/application/connector/connector.dart'; -import 'package:cbj_integrations_controller/domain/rooms/i_saved_rooms_repo.dart'; -import 'package:cbj_integrations_controller/domain/scene/i_scene_cbj_repository.dart'; import 'package:cbj_integrations_controller/initialize_integrations_controller.dart'; class BootUp { @@ -8,14 +6,8 @@ class BootUp { setup(); } - static Future setup() async { - // Return all saved rooms - await ISavedRoomsRepo.instance.getAllRooms(); - - await ISceneCbjRepository.instance.getAllScenesAsMap(); - + Future setup() async { await setupIntegrationsController(); - Connector.startConnector(); } } diff --git a/lib/application/connector/connector.dart b/lib/application/connector/connector.dart index f831db4a..73d510ec 100644 --- a/lib/application/connector/connector.dart +++ b/lib/application/connector/connector.dart @@ -7,13 +7,13 @@ import 'package:cbj_integrations_controller/domain/room/room_entity.dart'; import 'package:cbj_integrations_controller/domain/room/value_objects_room.dart'; import 'package:cbj_integrations_controller/domain/rooms/i_saved_rooms_repo.dart'; import 'package:cbj_integrations_controller/domain/saved_devices/i_saved_devices_repo.dart'; -import 'package:cbj_integrations_controller/infrastructure/devices/companies_connector_conjector.dart'; +import 'package:cbj_integrations_controller/infrastructure/devices/companies_connector_conjecture.dart'; import 'package:cbj_integrations_controller/infrastructure/gen/cbj_hub_server/protoc_as_dart/cbj_hub_server.pbgrpc.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/device_entity_abstract.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/device_entity_dto_abstract.dart'; +import 'package:cbj_integrations_controller/infrastructure/hub_client/hub_client.dart'; import 'package:cbj_integrations_controller/utils.dart'; import 'package:mqtt_client/mqtt_client.dart'; -import 'package:rxdart/rxdart.dart'; class Connector { static Future startConnector() async { @@ -35,7 +35,7 @@ class Connector { final ISavedDevicesRepo savedDevicesRepo = ISavedDevicesRepo.instance; final Map allDevices = - await savedDevicesRepo.getAllDevices(); + await savedDevicesRepo.getAllDevicesAfterInitialize(); for (final String deviceId in allDevices.keys) { ConnectorStreamToMqtt.toMqttController.add( @@ -53,7 +53,7 @@ class Connector { IMqttServerRepository.instance.sendToApp(); - CompaniesConnectorConjector.updateAllDevicesReposWithDeviceChanges( + CompaniesConnectorConjecture().updateAllDevicesReposWithDeviceChanges( ConnectorDevicesStreamFromMqtt.fromMqttStream, ); @@ -135,19 +135,3 @@ class Connector { } } } - -/// Connect all streams from the internet devices into one stream that will be -/// send to mqtt broker to update devices states -class ConnectorStreamToMqtt { - static StreamController> toMqttController = - StreamController(); - - static Stream> get toMqttStream => - toMqttController.stream.asBroadcastStream(); -} - -/// Connect all streams from the mqtt devices changes into one stream that will -/// be sent to whoever need to be notify of changes -class ConnectorDevicesStreamFromMqtt { - static BehaviorSubject fromMqttStream = BehaviorSubject(); -} diff --git a/lib/domain/app_communication/i_app_communication_repository.dart b/lib/domain/app_communication/i_app_communication_repository.dart deleted file mode 100644 index 48219709..00000000 --- a/lib/domain/app_communication/i_app_communication_repository.dart +++ /dev/null @@ -1,20 +0,0 @@ -import 'package:cbj_integrations_controller/infrastructure/gen/cbj_hub_server/protoc_as_dart/cbj_hub_server.pbgrpc.dart'; -import 'package:mqtt_client/mqtt_client.dart'; - -abstract class IAppCommunicationRepository { - static late IAppCommunicationRepository instance; - - Future getFromApp({ - required Stream request, - required String requestUrl, - required bool isRemotePipes, - }); - - void sendToApp(Stream dataToSend); - - Future startRemotePipesConnection(String remotePipesDomain); - - Future startRemotePipesWhenThereIsConnectionToWww( - String remotePipesDomain, - ); -} diff --git a/lib/domain/remote_pipes/i_remote_pipes_repository.dart b/lib/domain/remote_pipes/i_remote_pipes_repository.dart deleted file mode 100644 index 7da41561..00000000 --- a/lib/domain/remote_pipes/i_remote_pipes_repository.dart +++ /dev/null @@ -1,9 +0,0 @@ -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_entity.dart'; -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_failures.dart'; -import 'package:dartz/dartz.dart'; - -abstract class IRemotePipesRepository { - Future> setRemotePipesDomainName( - RemotePipesEntity remotePipesEntity, - ); -} diff --git a/lib/domain/remote_pipes/remote_pipes_entity.dart b/lib/domain/remote_pipes/remote_pipes_entity.dart deleted file mode 100644 index a7a167cf..00000000 --- a/lib/domain/remote_pipes/remote_pipes_entity.dart +++ /dev/null @@ -1,30 +0,0 @@ -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_failures.dart'; -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_value_objects.dart'; -import 'package:cbj_hub/infrastructure/remote_pipes/remote_pipes_dtos.dart'; -import 'package:dartz/dartz.dart'; -import 'package:freezed_annotation/freezed_annotation.dart'; - -part 'remote_pipes_entity.freezed.dart'; - -@freezed -abstract class RemotePipesEntity implements _$RemotePipesEntity { - const factory RemotePipesEntity({ - required RemotePipesDomain? domainName, - }) = _RemotePipesEntity; - - const RemotePipesEntity._(); - - factory RemotePipesEntity.empty() => RemotePipesEntity( - domainName: RemotePipesDomain(''), - ); - - Option> get failureOption { - return domainName!.value.fold((f) => some(f), (_) => none()); - } - - RemotePipesDtos toInfrastructure() { - return RemotePipesDtos( - domainName: domainName!.getOrCrash(), - ); - } -} diff --git a/lib/domain/remote_pipes/remote_pipes_errors.dart b/lib/domain/remote_pipes/remote_pipes_errors.dart deleted file mode 100644 index f19bd433..00000000 --- a/lib/domain/remote_pipes/remote_pipes_errors.dart +++ /dev/null @@ -1,16 +0,0 @@ -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_failures.dart'; - -class RemotePipesUnexpectedValueError extends Error { - RemotePipesUnexpectedValueError(this.homeUserValueFailure); - - final RemotePipesFailures homeUserValueFailure; - - @override - String toString() { - const explanation = - 'Encountered a ValueFailure at an unrecoverable point. Terminating.'; - return Error.safeToString( - '$explanation Failure was: $homeUserValueFailure', - ); - } -} diff --git a/lib/domain/remote_pipes/remote_pipes_failures.dart b/lib/domain/remote_pipes/remote_pipes_failures.dart deleted file mode 100644 index 3f6e56a5..00000000 --- a/lib/domain/remote_pipes/remote_pipes_failures.dart +++ /dev/null @@ -1,17 +0,0 @@ -import 'package:freezed_annotation/freezed_annotation.dart'; - -part 'remote_pipes_failures.freezed.dart'; - -@freezed -class RemotePipesFailures { - const factory RemotePipesFailures.empty({ - required T failedValue, - }) = _Empty; - - const factory RemotePipesFailures.unexpected() = _Unexpected; - - const factory RemotePipesFailures.insufficientPermission() = - _InsufficientPermission; - - const factory RemotePipesFailures.unableToUpdate() = _UnableToUpdate; -} diff --git a/lib/domain/remote_pipes/remote_pipes_validators.dart b/lib/domain/remote_pipes/remote_pipes_validators.dart deleted file mode 100644 index bae73a45..00000000 --- a/lib/domain/remote_pipes/remote_pipes_validators.dart +++ /dev/null @@ -1,16 +0,0 @@ -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_failures.dart'; -import 'package:dartz/dartz.dart'; - -Either, String> validateRemotePipesEmpty( - String input, -) { - if (input.isNotEmpty) { - return right(input); - } else { - return left( - RemotePipesFailures.empty( - failedValue: input, - ), - ); - } -} diff --git a/lib/domain/remote_pipes/remote_pipes_value_objects.dart b/lib/domain/remote_pipes/remote_pipes_value_objects.dart deleted file mode 100644 index fb236e63..00000000 --- a/lib/domain/remote_pipes/remote_pipes_value_objects.dart +++ /dev/null @@ -1,50 +0,0 @@ -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_errors.dart'; -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_failures.dart'; -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_validators.dart'; -import 'package:dartz/dartz.dart'; -import 'package:freezed_annotation/freezed_annotation.dart'; - -@immutable -abstract class RemotePipesValueObjectAbstract { - const RemotePipesValueObjectAbstract(); - - Either, T> get value; - - /// Throws [UnexpectedValueError] containing the [ManageWiFiFailures] - T getOrCrash() { - // id = identity - same as writing (right) => right - return value.fold((f) => throw RemotePipesUnexpectedValueError(f), id); - } - - Either, Unit> get failureOrUnit { - return value.fold((l) => left(l), (r) => right(unit)); - } - - bool isValid() => value.isRight(); - - @override - String toString() => 'Value($value)'; - - @override - @nonVirtual - bool operator ==(Object o) { - if (identical(this, o)) return true; - return o is RemotePipesValueObjectAbstract && o.value == value; - } - - @override - int get hashCode => value.hashCode; -} - -class RemotePipesDomain extends RemotePipesValueObjectAbstract { - factory RemotePipesDomain(String input) { - return RemotePipesDomain._( - validateRemotePipesEmpty(input), - ); - } - - const RemotePipesDomain._(this.value); - - @override - final Either, String> value; -} diff --git a/lib/infrastructure/app_communication/app_communication_repository.dart b/lib/infrastructure/app_communication/app_communication_repository.dart index e618a5ba..443e7040 100644 --- a/lib/infrastructure/app_communication/app_communication_repository.dart +++ b/lib/infrastructure/app_communication/app_communication_repository.dart @@ -1,37 +1,27 @@ import 'dart:async'; import 'dart:convert'; -import 'package:cbj_hub/domain/app_communication/i_app_communication_repository.dart'; import 'package:cbj_hub/infrastructure/app_communication/hub_app_server.dart'; import 'package:cbj_hub/infrastructure/remote_pipes/remote_pipes_client.dart'; -import 'package:cbj_hub/infrastructure/remote_pipes/remote_pipes_dtos.dart'; import 'package:cbj_hub/utils.dart'; +import 'package:cbj_integrations_controller/domain/app_communication/i_app_communication_repository.dart'; import 'package:cbj_integrations_controller/domain/core/value_objects.dart'; -import 'package:cbj_integrations_controller/domain/mqtt_server/i_mqtt_server_repository.dart'; import 'package:cbj_integrations_controller/domain/room/room_entity.dart'; import 'package:cbj_integrations_controller/domain/rooms/i_saved_rooms_repo.dart'; -import 'package:cbj_integrations_controller/domain/routine/i_routine_cbj_repository.dart'; -import 'package:cbj_integrations_controller/domain/routine/routine_cbj_entity.dart'; import 'package:cbj_integrations_controller/domain/saved_devices/i_saved_devices_repo.dart'; import 'package:cbj_integrations_controller/domain/scene/i_scene_cbj_repository.dart'; import 'package:cbj_integrations_controller/domain/scene/scene_cbj_entity.dart'; import 'package:cbj_integrations_controller/domain/scene/value_objects_scene_cbj.dart'; -import 'package:cbj_integrations_controller/domain/vendors/login_abstract/login_entity_abstract.dart'; import 'package:cbj_integrations_controller/infrastructure/devices/device_helper/device_helper.dart'; import 'package:cbj_integrations_controller/infrastructure/gen/cbj_hub_server/protoc_as_dart/cbj_hub_server.pbgrpc.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/device_entity_abstract.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/device_entity_dto_abstract.dart'; -import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/value_objects_core.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/generic_empty_device/generic_empty_entity.dart'; -import 'package:cbj_integrations_controller/infrastructure/generic_vendors_login/vendor_helper.dart'; -import 'package:cbj_integrations_controller/infrastructure/room/room_entity_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/routines/routine_cbj_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/scenes/scene_cbj_dtos.dart'; +import 'package:cbj_integrations_controller/infrastructure/hub_client/hub_client.dart'; import 'package:cbj_integrations_controller/injection.dart'; import 'package:grpc/grpc.dart'; import 'package:internet_connection_checker/internet_connection_checker.dart'; import 'package:mqtt_client/mqtt_client.dart'; -import 'package:rxdart/rxdart.dart'; class AppCommunicationRepository extends IAppCommunicationRepository { AppCommunicationRepository() { @@ -97,7 +87,8 @@ class AppCommunicationRepository extends IAppCommunicationRepository { dataToSend.listen((MqttPublishMessage event) async { logger.i('Got hub requests to app'); - (await ISavedDevicesRepo.instance.getAllDevices()) + ISavedDevicesRepo.instance + .getAllDevices() .forEach((String id, deviceEntityToSend) { final DeviceEntityDtoAbstract deviceDtoAbstract = DeviceHelper.convertDomainToDto(deviceEntityToSend); @@ -117,102 +108,7 @@ class AppCommunicationRepository extends IAppCommunicationRepository { required String requestUrl, required bool isRemotePipes, }) async { - request.listen((event) async { - logger.i('Got From App'); - - if (event.sendingType == SendingType.entityType) { - final DeviceEntityAbstract deviceEntityFromApp = - DeviceHelper.convertJsonStringToDomain(event.allRemoteCommands); - - deviceEntityFromApp.entityStateGRPC = - EntityState(EntityStateGRPC.waitingInComp.toString()); - - IMqttServerRepository.instance.postToHubMqtt( - entityFromTheApp: deviceEntityFromApp, - gotFromApp: true, - ); - } else if (event.sendingType == SendingType.roomType) { - final RoomEntity roomEntityFromApp = RoomEntityDtos.fromJson( - jsonDecode(event.allRemoteCommands) as Map, - ).toDomain(); - - ISavedRoomsRepo.instance.saveAndActiveRoomToDb( - roomEntity: roomEntityFromApp, - ); - - IMqttServerRepository.instance.postToHubMqtt( - entityFromTheApp: roomEntityFromApp, - gotFromApp: true, - ); - } else if (event.sendingType == SendingType.vendorLoginType) { - final LoginEntityAbstract loginEntityFromApp = - VendorHelper.convertJsonStringToDomain(event.allRemoteCommands); - - ISavedDevicesRepo.instance - .saveAndActivateVendorLoginCredentialsDomainToDb( - loginEntity: loginEntityFromApp, - ); - } else if (event.sendingType == SendingType.firstConnection) { - AppCommunicationRepository.sendAllRoomsFromHubRequestsStream(); - AppCommunicationRepository.sendAllDevicesFromHubRequestsStream(); - AppCommunicationRepository.sendAllScenesFromHubRequestsStream(); - } else if (event.sendingType == SendingType.remotePipesInformation) { - final Map jsonDecoded = - jsonDecode(event.allRemoteCommands) as Map; - // TODO: Fix after new cbj_integrations_controller - // final RemotePipesEntity remotePipes = - RemotePipesDtos.fromJson(jsonDecoded).toDomain(); - // ISavedDevicesRepo.instance - // .saveAndActivateRemotePipesDomainToDb(remotePipes: remotePipes); - } else if (event.sendingType == SendingType.sceneType) { - final Map jsonSceneFromJsonString = - jsonDecode(event.allRemoteCommands) as Map; - - final SceneCbjEntity sceneCbj = - SceneCbjDtos.fromJson(jsonSceneFromJsonString).toDomain(); - - final String sceneStateGrpcTemp = - sceneCbj.entityStateGRPC.getOrCrash()!; - - // sceneCbj.copyWith( - // entityStateGRPC: SceneCbjDeviceStateGRPC( - // EntityStateGRPC.waitingInComp.toString(), - // ), - // ); - - if (sceneStateGrpcTemp == EntityStateGRPC.addingNewScene.toString()) { - ISceneCbjRepository.instance.addNewSceneAndSaveInDb(sceneCbj); - } else { - ISceneCbjRepository.instance.activateScene(sceneCbj); - } - } else if (event.sendingType == SendingType.routineType) { - final Map jsonRoutineFromJsonString = - jsonDecode(event.allRemoteCommands) as Map; - - final RoutineCbjEntity routineCbj = - RoutineCbjDtos.fromJson(jsonRoutineFromJsonString).toDomain(); - - final String routineStateGrpcTemp = - routineCbj.entityStateGRPC.getOrCrash()!; - - // routineCbj.copyWith( - // entityStateGRPC: RoutineCbjDeviceStateGRPC( - // EntityStateGRPC.waitingInComp.toString(), - // ), - // ); - - if (routineStateGrpcTemp == - EntityStateGRPC.addingNewRoutine.toString()) { - IRoutineCbjRepository.instance - .addNewRoutineAndSaveItToLocalDb(routineCbj); - } else { - // For a way to active it manually - // IRoutineCbjRepository.instance.activateRoutine(routineCbj); - } - } else { - logger.w('Request from app does not support this sending device type'); - } - }).onError((error) { + request.listen((event) async {}).onError((error) { if (error is GrpcError && error.code == 1) { logger.t('Client have disconnected'); } else if (error is GrpcError && error.code == 14) { @@ -240,12 +136,12 @@ class AppCommunicationRepository extends IAppCommunicationRepository { '$error', ); startRemotePipesWhenThereIsConnectionToWww(requestUrl); - } else { - logger.e( - 'Un none errno number\n' - '$error', - ); + return; } + logger.e( + 'Un none errno number\n' + '$error', + ); } else { if (error is GrpcError && isRemotePipes && @@ -253,61 +149,66 @@ class AppCommunicationRepository extends IAppCommunicationRepository { !error.message!.contains('errorCode: 0')) { logger.i('Client stream got terminated to create new one\n$error'); startRemotePipesWhenThereIsConnectionToWww(requestUrl); - } else { - logger.e('Client stream error\n$error'); + return; } + logger.e('Client stream error\n$error'); } }); } /// Trigger to send all rooms from hub to app using the /// HubRequestsToApp stream - static Future sendAllRoomsFromHubRequestsStream() async { + @override + Future sendAllRoomsFromHubRequestsStream() async { final Map allRooms = - await ISavedRoomsRepo.instance.getAllRooms(); + ISavedRoomsRepo.instance.getAllRooms(); - if (allRooms.isNotEmpty) { - allRooms.map((String id, RoomEntity d) { - HubRequestsToApp.streamRequestsToApp.sink.add(d.toInfrastructure()); - return MapEntry(id, jsonEncode(d.toInfrastructure().toJson())); - }); - } else { - logger.w("Can't find rooms in the local DB"); + if (allRooms.isEmpty) { + logger.w("Can't find rooms in the local DB"); + + return; } + allRooms.map((String id, RoomEntity d) { + HubRequestsToApp.streamRequestsToApp.sink.add(d.toInfrastructure()); + return MapEntry(id, jsonEncode(d.toInfrastructure().toJson())); + }); } /// Trigger to send all devices from hub to app using the /// HubRequestsToApp stream - static Future sendAllDevicesFromHubRequestsStream() async { + @override + Future sendAllDevicesFromHubRequestsStream() async { final Map allDevices = - await ISavedDevicesRepo.instance.getAllDevices(); + ISavedDevicesRepo.instance.getAllDevices(); final Map allRooms = - await ISavedRoomsRepo.instance.getAllRooms(); - - if (allRooms.isNotEmpty) { - /// The delay fix this issue in gRPC for some reason - /// https://github.com/grpc/grpc-dart/issues/558 - allRooms.map((String id, RoomEntity d) { - HubRequestsToApp.streamRequestsToApp.sink.add(d.toInfrastructure()); - return MapEntry(id, jsonEncode(d.toInfrastructure().toJson())); - }); + ISavedRoomsRepo.instance.getAllRooms(); - allDevices.map((String id, DeviceEntityAbstract d) { - HubRequestsToApp.streamRequestsToApp.sink.add(d.toInfrastructure()); - return MapEntry(id, DeviceHelper.convertDomainToJsonString(d)); - }); - } else { + if (allRooms.isEmpty) { logger.w("Can't find smart devices in the local DB, sending empty"); final DeviceEntityAbstract emptyEntity = GenericEmptyDE.empty(); HubRequestsToApp.streamRequestsToApp.sink .add(emptyEntity.toInfrastructure()); + return; } + + /// The delay fix this issue in gRPC for some reason + /// https://github.com/grpc/grpc-dart/issues/558 + allRooms.map((String id, RoomEntity d) { + HubRequestsToApp.streamRequestsToApp.sink.add(d.toInfrastructure()); + return MapEntry(id, jsonEncode(d.toInfrastructure().toJson())); + }); + + allDevices.map((String id, DeviceEntityAbstract d) { + HubRequestsToApp.streamRequestsToApp.sink.add(d.toInfrastructure()); + return MapEntry(id, DeviceHelper.convertDomainToJsonString(d)); + }); } /// Trigger to send all scenes from hub to app using the /// HubRequestsToApp stream - static Future sendAllScenesFromHubRequestsStream() async { + @override + Future sendAllScenesFromHubRequestsStream() async { final Map allScenes = await ISceneCbjRepository.instance.getAllScenesAsMap(); @@ -341,10 +242,3 @@ class AppCommunicationRepository extends IAppCommunicationRepository { } } } - -/// Connect all streams from the internet devices into one stream that will be -/// send to mqtt broker to update devices states -class HubRequestsToApp { - static BehaviorSubject streamRequestsToApp = - BehaviorSubject(); -} diff --git a/lib/infrastructure/app_communication/hub_app_server.dart b/lib/infrastructure/app_communication/hub_app_server.dart index bdef519e..57a3529a 100644 --- a/lib/infrastructure/app_communication/hub_app_server.dart +++ b/lib/infrastructure/app_communication/hub_app_server.dart @@ -1,16 +1,11 @@ -import 'dart:convert'; import 'dart:io'; -import 'package:cbj_hub/domain/app_communication/i_app_communication_repository.dart'; -import 'package:cbj_hub/infrastructure/app_communication/app_communication_repository.dart'; import 'package:cbj_hub/utils.dart'; -import 'package:cbj_integrations_controller/infrastructure/devices/device_helper/device_helper.dart'; +import 'package:cbj_integrations_controller/domain/app_communication/i_app_communication_repository.dart'; +import 'package:cbj_integrations_controller/infrastructure/devices/helper_methods/device_helper_methods.dart'; import 'package:cbj_integrations_controller/infrastructure/gen/cbj_hub_server/proto_gen_date.dart'; import 'package:cbj_integrations_controller/infrastructure/gen/cbj_hub_server/protoc_as_dart/cbj_hub_server.pbgrpc.dart'; -import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/device_entity_dto_abstract.dart'; -import 'package:cbj_integrations_controller/infrastructure/room/room_entity_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/routines/routine_cbj_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/scenes/scene_cbj_dtos.dart'; +import 'package:cbj_integrations_controller/infrastructure/hub_client/hub_client.dart'; import 'package:grpc/service_api.dart'; /// Server to get and send information to the app @@ -29,34 +24,9 @@ class HubAppServer extends CbjHubServiceBase { isRemotePipes: false, ); - yield* HubRequestsToApp.streamRequestsToApp.map((dynamic entityDto) { - if (entityDto is DeviceEntityDtoAbstract) { - return RequestsAndStatusFromHub( - sendingType: SendingType.entityType, - allRemoteCommands: DeviceHelper.convertDtoToJsonString(entityDto), - ); - } else if (entityDto is RoomEntityDtos) { - return RequestsAndStatusFromHub( - sendingType: SendingType.roomType, - allRemoteCommands: jsonEncode(entityDto.toJson()), - ); - } else if (entityDto is SceneCbjDtos) { - return RequestsAndStatusFromHub( - sendingType: SendingType.sceneType, - allRemoteCommands: jsonEncode(entityDto.toJson()), - ); - } else if (entityDto is RoutineCbjDtos) { - return RequestsAndStatusFromHub( - sendingType: SendingType.routineType, - allRemoteCommands: jsonEncode(entityDto.toJson()), - ); - } else { - return RequestsAndStatusFromHub( - sendingType: SendingType.undefinedType, - allRemoteCommands: '', - ); - } - }).handleError((error) => logger.e('Stream have error $error')); + yield* HubRequestsToApp.streamRequestsToApp + .map(DeviceHelperMethods().dynamicToRequestsAndStatusFromHub) + .handleError((error) => logger.e('Stream have error $error')); } catch (e) { logger.e('Hub server error $e'); } diff --git a/lib/infrastructure/mqtt_server/mqtt_server_repository.dart b/lib/infrastructure/mqtt_server/mqtt_server_repository.dart index 35e3fd04..7bf7191c 100644 --- a/lib/infrastructure/mqtt_server/mqtt_server_repository.dart +++ b/lib/infrastructure/mqtt_server/mqtt_server_repository.dart @@ -1,7 +1,6 @@ import 'dart:convert'; import 'package:cbj_hub/application/connector/connector.dart'; -import 'package:cbj_hub/infrastructure/app_communication/app_communication_repository.dart'; import 'package:cbj_hub/utils.dart'; import 'package:cbj_integrations_controller/domain/mqtt_server/i_mqtt_server_repository.dart'; import 'package:cbj_integrations_controller/domain/saved_devices/i_saved_devices_repo.dart'; @@ -17,6 +16,7 @@ import 'package:cbj_integrations_controller/infrastructure/generic_devices/gener import 'package:cbj_integrations_controller/infrastructure/generic_devices/generic_smart_plug_device/generic_smart_plug_entity.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/generic_smart_tv/generic_smart_tv_entity.dart'; import 'package:cbj_integrations_controller/infrastructure/generic_devices/generic_switch_device/generic_switch_entity.dart'; +import 'package:cbj_integrations_controller/infrastructure/hub_client/hub_client.dart'; import 'package:mqtt_client/mqtt_client.dart'; import 'package:mqtt_client/mqtt_server_client.dart'; // ignore: implementation_imports @@ -49,7 +49,7 @@ class MqttServerRepository extends IMqttServerRepository { static Future? clientFuture; @override - Future asyncConstractor() async { + Future asyncConstructor() async { clientFuture = connect(); await clientFuture; } @@ -411,9 +411,8 @@ class MqttServerRepository extends IMqttServerRepository { bool? gotFromApp, }) async { if (entityFromTheApp is DeviceEntityAbstract) { - final ISavedDevicesRepo savedDevicesRepo = ISavedDevicesRepo.instance; final Map allDevices = - await savedDevicesRepo.getAllDevices(); + await ISavedDevicesRepo.instance.getAllDevices(); final DeviceEntityAbstract? savedDeviceEntity = allDevices[entityFromTheApp.getDeviceId()]; diff --git a/lib/infrastructure/remote_pipes/remote_pipes_client.dart b/lib/infrastructure/remote_pipes/remote_pipes_client.dart index 51f0b24c..dc27d626 100644 --- a/lib/infrastructure/remote_pipes/remote_pipes_client.dart +++ b/lib/infrastructure/remote_pipes/remote_pipes_client.dart @@ -1,24 +1,16 @@ import 'dart:async'; -import 'dart:convert'; -import 'package:cbj_hub/domain/app_communication/i_app_communication_repository.dart'; -import 'package:cbj_hub/infrastructure/app_communication/app_communication_repository.dart'; -import 'package:cbj_hub/utils.dart'; -import 'package:cbj_integrations_controller/infrastructure/devices/device_helper/device_helper.dart'; +import 'package:cbj_integrations_controller/domain/app_communication/i_app_communication_repository.dart'; +import 'package:cbj_integrations_controller/infrastructure/devices/helper_methods/device_helper_methods.dart'; import 'package:cbj_integrations_controller/infrastructure/gen/cbj_hub_server/protoc_as_dart/cbj_hub_server.pbgrpc.dart'; -import 'package:cbj_integrations_controller/infrastructure/generic_devices/abstract_device/device_entity_dto_abstract.dart'; -import 'package:cbj_integrations_controller/infrastructure/generic_devices/generic_ping_device/generic_ping_device_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/generic_devices/generic_ping_device/generic_ping_entity.dart'; -import 'package:cbj_integrations_controller/infrastructure/room/room_entity_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/routines/routine_cbj_dtos.dart'; -import 'package:cbj_integrations_controller/infrastructure/scenes/scene_cbj_dtos.dart'; +import 'package:cbj_integrations_controller/infrastructure/hub_client/hub_client.dart'; +import 'package:cbj_integrations_controller/utils.dart'; import 'package:grpc/grpc.dart'; -import 'package:rxdart/rxdart.dart'; +// TODO: Replace with HubClient class RemotePipesClient { static ClientChannel? channel; static CbjHubClient? stub; - static Timer? grpcKeepAlive; // createStreamWithRemotePipes /// Turn smart device on @@ -31,43 +23,12 @@ class RemotePipesClient { ResponseStream response; - grpcDartKeepAliveWorkaround(HubRequestsToApp.streamRequestsToApp); - try { response = stub!.hubTransferEntities( /// Transfer all requests from hub to the remote pipes->app - HubRequestsToApp.streamRequestsToApp.map((dynamic entityDtoToSend) { - if (entityDtoToSend is DeviceEntityDtoAbstract) { - if (entityDtoToSend is! GenericPingDeviceDtos) { - grpcDartKeepAliveWorkaround(HubRequestsToApp.streamRequestsToApp); - } - return RequestsAndStatusFromHub( - sendingType: SendingType.entityType, - allRemoteCommands: - DeviceHelper.convertDtoToJsonString(entityDtoToSend), - ); - } else if (entityDtoToSend is RoomEntityDtos) { - return RequestsAndStatusFromHub( - sendingType: SendingType.roomType, - allRemoteCommands: jsonEncode(entityDtoToSend.toJson()), - ); - } else if (entityDtoToSend is SceneCbjDtos) { - return RequestsAndStatusFromHub( - sendingType: SendingType.sceneType, - allRemoteCommands: jsonEncode(entityDtoToSend.toJson()), - ); - } else if (entityDtoToSend is RoutineCbjDtos) { - return RequestsAndStatusFromHub( - sendingType: SendingType.routineType, - allRemoteCommands: jsonEncode(entityDtoToSend.toJson()), - ); - } else { - logger.w('Not sure what type to send'); - return RequestsAndStatusFromHub( - sendingType: SendingType.undefinedType, - ); - } - }).handleError((error) { + HubRequestsToApp.streamRequestsToApp + .map(DeviceHelperMethods().dynamicToRequestsAndStatusFromHub) + .handleError((error) { logger.e('Stream have error $error'); }), ); @@ -84,19 +45,6 @@ class RemotePipesClient { } } - /// Workaround until gRPC dart implement keep alive - /// https://github.com/grpc/grpc-dart/issues/157 - static Future grpcDartKeepAliveWorkaround( - BehaviorSubject sendRequest, - ) async { - grpcKeepAlive?.cancel(); - final GenericPingDE genericEmptyDE = GenericPingDE.empty(); - grpcKeepAlive = Timer.periodic(const Duration(minutes: 9), (Timer t) { - logger.t('Ping device to Remote Pipes'); - sendRequest.add(genericEmptyDE.toInfrastructure()); - }); - } - static Future _createCbjHubClient( String deviceIp, int hubPort, diff --git a/lib/infrastructure/remote_pipes/remote_pipes_dtos.dart b/lib/infrastructure/remote_pipes/remote_pipes_dtos.dart deleted file mode 100644 index 9bd300b9..00000000 --- a/lib/infrastructure/remote_pipes/remote_pipes_dtos.dart +++ /dev/null @@ -1,46 +0,0 @@ -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_entity.dart'; -import 'package:cbj_hub/domain/remote_pipes/remote_pipes_value_objects.dart'; -import 'package:freezed_annotation/freezed_annotation.dart'; - -part 'remote_pipes_dtos.freezed.dart'; -part 'remote_pipes_dtos.g.dart'; - -@freezed -abstract class RemotePipesDtos implements _$RemotePipesDtos { - factory RemotePipesDtos({ - // @JsonKey(ignore: true) - required String domainName, - // required ServerTimestampConverter() FieldValue serverTimeStamp, - }) = _RemotePipesDtos; - - RemotePipesDtos._(); - - factory RemotePipesDtos.fromDomain(RemotePipesEntity remotePipesDE) { - return RemotePipesDtos( - domainName: remotePipesDE.domainName!.getOrCrash(), - ); - } - - factory RemotePipesDtos.fromJson(Map json) => - _$RemotePipesDtosFromJson(json); - - RemotePipesEntity toDomain() { - return RemotePipesEntity( - domainName: RemotePipesDomain(domainName), - ); - } - - final String deviceDtoClassInstance = (RemotePipesDtos).toString(); -} - -// class ServerTimestampConverter implements JsonConverter { -// const ServerTimestampConverter(); -// -// @override -// FieldValue fromJson(Object json) { -// return FieldValue.serverTimestamp(); -// } -// -// @override -// Object toJson(FieldValue fieldValue) => fieldValue; -// } diff --git a/pubspec.yaml b/pubspec.yaml index f88b9887..4dd752bb 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -12,6 +12,7 @@ dependencies: # Contains utility classes in the style of dart:async to work with asynchronous computations. async: ^2.11.0 cbj_integrations_controller: +# path: ../cbj_integrations_controller git: https://github.com/CyBear-Jinni/cbj_integrations_controller.git # Package to create, convert, alter, and compare colors in a variety of colorspaces. color: ^3.0.0 @@ -50,7 +51,7 @@ dependencies: # Service discovery over multicast DNS (mDNS), Bonjour, and Avahi. multicast_dns: ^0.3.2+4 # Helps you discover open ports, devices on subnet and more. - network_tools: ^4.0.1 + network_tools: ^4.0.2 # Provides runtime support for a Dart implementation of protobufs. protobuf: ^3.1.0 # Extends the capabilities of Dart Streams and StreamControllers.