From 6d4b11246aecdfdd2db11e0c0ef0666ce3a557cf Mon Sep 17 00:00:00 2001 From: Hui Zhao Date: Mon, 4 Mar 2024 14:53:54 -0800 Subject: [PATCH] chore(pubsub): run yarn lint:fix --- packages/pubsub/src/Providers/AWSIot.ts | 6 +- packages/pubsub/src/Providers/MqttOverWS.ts | 65 ++++++++++--------- packages/pubsub/src/Providers/PubSub.ts | 6 +- packages/pubsub/src/types/PubSub.ts | 11 ++-- .../src/utils/ConnectionStateMonitor.ts | 10 ++- .../pubsub/src/utils/ReconnectionMonitor.ts | 1 + 6 files changed, 58 insertions(+), 41 deletions(-) diff --git a/packages/pubsub/src/Providers/AWSIot.ts b/packages/pubsub/src/Providers/AWSIot.ts index b517c516f06..cf88bfacbb6 100644 --- a/packages/pubsub/src/Providers/AWSIot.ts +++ b/packages/pubsub/src/Providers/AWSIot.ts @@ -1,8 +1,10 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { MqttOverWS, MqttOptions } from './MqttOverWS'; import { Signer } from '@aws-amplify/core/internals/utils'; import { fetchAuthSession } from '@aws-amplify/core'; + +import { MqttOptions, MqttOverWS } from './MqttOverWS'; + const SERVICE_NAME = 'iotdevicegateway'; export interface AWSIoTOptions extends MqttOptions { @@ -21,7 +23,7 @@ export class AWSIoT extends MqttOverWS { protected get endpoint() { return (async () => { - const endpoint = this.options.endpoint; + const { endpoint } = this.options; const serviceInfo = { service: SERVICE_NAME, diff --git a/packages/pubsub/src/Providers/MqttOverWS.ts b/packages/pubsub/src/Providers/MqttOverWS.ts index c3eb89c4c8d..0e5298d91e1 100644 --- a/packages/pubsub/src/Providers/MqttOverWS.ts +++ b/packages/pubsub/src/Providers/MqttOverWS.ts @@ -2,35 +2,36 @@ // SPDX-License-Identifier: Apache-2.0 // @ts-ignore -import * as Paho from '../vendor/paho-mqtt.js'; -import { Observable, SubscriptionLike as Subscription, Observer } from 'rxjs'; +import { Observable, Observer, SubscriptionLike as Subscription } from 'rxjs'; +import { ConsoleLogger, Hub, HubPayload } from '@aws-amplify/core'; +import { amplifyUuid } from '@aws-amplify/core/internals/utils'; -import { AbstractPubSub } from './PubSub'; import { ConnectionState, - PubSubContentObserver, PubSubContent, + PubSubContentObserver, PubSubOptions, PublishInput, SubscribeInput, } from '../types/PubSub'; -import { Hub, HubPayload, ConsoleLogger } from '@aws-amplify/core'; -import { amplifyUuid } from '@aws-amplify/core/internals/utils'; +import * as Paho from '../vendor/paho-mqtt.js'; import { - ConnectionStateMonitor, CONNECTION_CHANGE, + ConnectionStateMonitor, } from '../utils/ConnectionStateMonitor'; import { ReconnectEvent, ReconnectionMonitor, } from '../utils/ReconnectionMonitor'; + +import { AbstractPubSub } from './PubSub'; import { AMPLIFY_SYMBOL, CONNECTION_STATE_CHANGE } from './constants'; const logger = new ConsoleLogger('MqttOverWS'); export function mqttTopicMatch(filter: string, topic: string) { const filterArray = filter.split('/'); - const length = filterArray.length; + const { length } = filterArray; const topicArray = topic.split('/'); for (let i = 0; i < length; ++i) { @@ -39,6 +40,7 @@ export function mqttTopicMatch(filter: string, topic: string) { if (left === '#') return topicArray.length >= length; if (left !== '+' && left !== right) return false; } + return length === topicArray.length; } @@ -49,23 +51,23 @@ export interface MqttOptions extends PubSubOptions { } interface PahoClient { - onMessageArrived: (params: { + onMessageArrived(params: { destinationName: string; payloadString: string; - }) => void; - onConnectionLost: (params: { errorCode: number }) => void; - connect: (params: { - [k: string]: string | number | boolean | (() => void); - }) => void; - disconnect: () => void; - isConnected: () => boolean; - subscribe: (topic: string) => void; - unsubscribe: (topic: string) => void; + }): void; + onConnectionLost(params: { errorCode: number }): void; + connect( + params: Record void)>, + ): void; + disconnect(): void; + isConnected(): boolean; + subscribe(topic: string): void; + unsubscribe(topic: string): void; send(topic: string, message: string): void; } class ClientsQueue { - private promises: Map> = new Map(); + private promises = new Map>(); async get( clientId: string, @@ -78,6 +80,7 @@ class ClientsQueue { const newPromise = clientFactory(clientId); this.promises.set(clientId, newPromise); newPromise.catch(() => this.promises.delete(clientId)); + return newPromise; } @@ -146,9 +149,8 @@ export class MqttOverWS extends AbstractPubSub { } protected get isSSLEnabled() { - return !this.options[ - 'aws_appsync_dangerously_connect_to_http_endpoint_for_testing' - ]; + return !this.options + .aws_appsync_dangerously_connect_to_http_endpoint_for_testing; } public onDisconnect({ @@ -203,7 +205,9 @@ export class MqttOverWS extends AbstractPubSub { client.connect({ useSSL: this.isSSLEnabled, mqttVersion: 3, - onSuccess: () => resolve(true), + onSuccess: () => { + resolve(true); + }, onFailure: () => { if (clientId) this._clientsQueue.remove(clientId); this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); @@ -236,6 +240,7 @@ export class MqttOverWS extends AbstractPubSub { }, ); } + return client; }); } @@ -258,7 +263,9 @@ export class MqttOverWS extends AbstractPubSub { if (client) { logger.debug('Publishing to topic(s)', targetTopics.join(','), message); - targetTopics.forEach(topic => client.send(topic, msg)); + targetTopics.forEach(topic => { + client.send(topic, msg); + }); } else { logger.debug( 'Publishing to topic(s) failed', @@ -268,11 +275,9 @@ export class MqttOverWS extends AbstractPubSub { } } - protected _topicObservers: Map> = - new Map(); + protected _topicObservers = new Map>(); - protected _clientIdObservers: Map> = - new Map(); + protected _clientIdObservers = new Map>(); private _onMessage(topic: string, msg: string) { try { @@ -290,7 +295,9 @@ export class MqttOverWS extends AbstractPubSub { } matchedTopicObservers.forEach(observersForTopic => { - observersForTopic.forEach(observer => observer.next(parsedMessage)); + observersForTopic.forEach(observer => { + observer.next(parsedMessage); + }); }); } catch (error) { logger.warn('Error handling message', error, msg); diff --git a/packages/pubsub/src/Providers/PubSub.ts b/packages/pubsub/src/Providers/PubSub.ts index 5750d0a1940..22c124f041e 100644 --- a/packages/pubsub/src/Providers/PubSub.ts +++ b/packages/pubsub/src/Providers/PubSub.ts @@ -1,14 +1,16 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { Observable } from 'rxjs'; +import { ConsoleLogger } from '@aws-amplify/core'; + import { PubSubBase, - PubSubOptions, PubSubContent, + PubSubOptions, PublishInput, SubscribeInput, } from '../types/PubSub'; -import { ConsoleLogger } from '@aws-amplify/core'; + const logger = new ConsoleLogger('AbstractPubSubProvider'); export abstract class AbstractPubSub diff --git a/packages/pubsub/src/types/PubSub.ts b/packages/pubsub/src/types/PubSub.ts index 40b3f6eca25..5f2368c5742 100644 --- a/packages/pubsub/src/types/PubSub.ts +++ b/packages/pubsub/src/types/PubSub.ts @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Observer, Observable } from 'rxjs'; +import { Observable, Observer } from 'rxjs'; + export interface SubscriptionObserver { closed: boolean; next(value: T): void; @@ -69,13 +70,13 @@ export interface PubSubBase { subscribe(input: SubscribeInput): Observable; } -export type PublishInput = { +export interface PublishInput { topics: string[] | string; message: PubSubContent; options?: PubSubOptions; -}; +} -export type SubscribeInput = { +export interface SubscribeInput { topics: string[] | string; options?: PubSubOptions; -}; +} diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index f0e7c38c3ba..5cf35013616 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -1,19 +1,21 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Observable, Observer, SubscriptionLike, map, filter } from 'rxjs'; +import { Observable, Observer, SubscriptionLike, filter, map } from 'rxjs'; + import { ConnectionState } from '../types/PubSub'; + import { ReachabilityMonitor } from './ReachabilityMonitor'; // Internal types for tracking different connection states type LinkedConnectionState = 'connected' | 'disconnected'; type LinkedHealthState = 'healthy' | 'unhealthy'; -type LinkedConnectionStates = { +interface LinkedConnectionStates { networkState: LinkedConnectionState; connectionState: LinkedConnectionState | 'connecting'; intendedConnectionState: LinkedConnectionState; keepAliveState: LinkedHealthState; -}; +} export const CONNECTION_CHANGE: { [key in @@ -130,6 +132,7 @@ export class ConnectionStateMonitor { filter(current => { const toInclude = current !== previous; previous = current; + return toInclude; }), ); @@ -194,6 +197,7 @@ export class ConnectionStateMonitor { // All remaining states directly correspond to the connection state if (connectionState === 'connecting') return ConnectionState.Connecting; if (connectionState === 'disconnected') return ConnectionState.Disconnected; + return ConnectionState.Connected; } } diff --git a/packages/pubsub/src/utils/ReconnectionMonitor.ts b/packages/pubsub/src/utils/ReconnectionMonitor.ts index 50ada9cf1b5..f521f2a7208 100644 --- a/packages/pubsub/src/utils/ReconnectionMonitor.ts +++ b/packages/pubsub/src/utils/ReconnectionMonitor.ts @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { Observer } from 'rxjs'; + import { RECONNECT_DELAY, RECONNECT_INTERVAL } from '../Providers/constants'; export enum ReconnectEvent {