Skip to content

Commit

Permalink
chore(pubsub): run yarn lint:fix
Browse files Browse the repository at this point in the history
  • Loading branch information
HuiSF committed Mar 5, 2024
1 parent a91c70b commit 6d4b112
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 41 deletions.
6 changes: 4 additions & 2 deletions packages/pubsub/src/Providers/AWSIot.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { MqttOverWS, MqttOptions } from './MqttOverWS';
import { Signer } from '@aws-amplify/core/internals/utils';
import { fetchAuthSession } from '@aws-amplify/core';

import { MqttOptions, MqttOverWS } from './MqttOverWS';

const SERVICE_NAME = 'iotdevicegateway';

export interface AWSIoTOptions extends MqttOptions {
Expand All @@ -21,7 +23,7 @@ export class AWSIoT extends MqttOverWS {

protected get endpoint() {
return (async () => {
const endpoint = this.options.endpoint;
const { endpoint } = this.options;

const serviceInfo = {
service: SERVICE_NAME,
Expand Down
65 changes: 36 additions & 29 deletions packages/pubsub/src/Providers/MqttOverWS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,36 @@
// SPDX-License-Identifier: Apache-2.0

// @ts-ignore
import * as Paho from '../vendor/paho-mqtt.js';
import { Observable, SubscriptionLike as Subscription, Observer } from 'rxjs';
import { Observable, Observer, SubscriptionLike as Subscription } from 'rxjs';
import { ConsoleLogger, Hub, HubPayload } from '@aws-amplify/core';
import { amplifyUuid } from '@aws-amplify/core/internals/utils';

import { AbstractPubSub } from './PubSub';
import {
ConnectionState,
PubSubContentObserver,
PubSubContent,
PubSubContentObserver,
PubSubOptions,
PublishInput,
SubscribeInput,
} from '../types/PubSub';
import { Hub, HubPayload, ConsoleLogger } from '@aws-amplify/core';
import { amplifyUuid } from '@aws-amplify/core/internals/utils';
import * as Paho from '../vendor/paho-mqtt.js';
import {
ConnectionStateMonitor,
CONNECTION_CHANGE,
ConnectionStateMonitor,
} from '../utils/ConnectionStateMonitor';
import {
ReconnectEvent,
ReconnectionMonitor,
} from '../utils/ReconnectionMonitor';

import { AbstractPubSub } from './PubSub';
import { AMPLIFY_SYMBOL, CONNECTION_STATE_CHANGE } from './constants';

const logger = new ConsoleLogger('MqttOverWS');

export function mqttTopicMatch(filter: string, topic: string) {
const filterArray = filter.split('/');
const length = filterArray.length;
const { length } = filterArray;
const topicArray = topic.split('/');

for (let i = 0; i < length; ++i) {
Expand All @@ -39,6 +40,7 @@ export function mqttTopicMatch(filter: string, topic: string) {
if (left === '#') return topicArray.length >= length;
if (left !== '+' && left !== right) return false;
}

return length === topicArray.length;
}

Expand All @@ -49,23 +51,23 @@ export interface MqttOptions extends PubSubOptions {
}

interface PahoClient {
onMessageArrived: (params: {
onMessageArrived(params: {
destinationName: string;
payloadString: string;
}) => void;
onConnectionLost: (params: { errorCode: number }) => void;
connect: (params: {
[k: string]: string | number | boolean | (() => void);
}) => void;
disconnect: () => void;
isConnected: () => boolean;
subscribe: (topic: string) => void;
unsubscribe: (topic: string) => void;
}): void;
onConnectionLost(params: { errorCode: number }): void;
connect(
params: Record<string, string | number | boolean | (() => void)>,
): void;
disconnect(): void;
isConnected(): boolean;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
send(topic: string, message: string): void;
}

class ClientsQueue {
private promises: Map<string, Promise<PahoClient | undefined>> = new Map();
private promises = new Map<string, Promise<PahoClient | undefined>>();

async get(
clientId: string,
Expand All @@ -78,6 +80,7 @@ class ClientsQueue {
const newPromise = clientFactory(clientId);
this.promises.set(clientId, newPromise);
newPromise.catch(() => this.promises.delete(clientId));

return newPromise;
}

Expand Down Expand Up @@ -146,9 +149,8 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
}

protected get isSSLEnabled() {
return !this.options[
'aws_appsync_dangerously_connect_to_http_endpoint_for_testing'
];
return !this.options
.aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
}

public onDisconnect({
Expand Down Expand Up @@ -203,7 +205,9 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
client.connect({
useSSL: this.isSSLEnabled,
mqttVersion: 3,
onSuccess: () => resolve(true),
onSuccess: () => {
resolve(true);
},
onFailure: () => {
if (clientId) this._clientsQueue.remove(clientId);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
Expand Down Expand Up @@ -236,6 +240,7 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
},
);
}

return client;
});
}
Expand All @@ -258,7 +263,9 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {

if (client) {
logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
targetTopics.forEach(topic => client.send(topic, msg));
targetTopics.forEach(topic => {
client.send(topic, msg);
});
} else {
logger.debug(
'Publishing to topic(s) failed',
Expand All @@ -268,11 +275,9 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
}
}

protected _topicObservers: Map<string, Set<PubSubContentObserver>> =
new Map();
protected _topicObservers = new Map<string, Set<PubSubContentObserver>>();

protected _clientIdObservers: Map<string, Set<PubSubContentObserver>> =
new Map();
protected _clientIdObservers = new Map<string, Set<PubSubContentObserver>>();

private _onMessage(topic: string, msg: string) {
try {
Expand All @@ -290,7 +295,9 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
}

matchedTopicObservers.forEach(observersForTopic => {
observersForTopic.forEach(observer => observer.next(parsedMessage));
observersForTopic.forEach(observer => {
observer.next(parsedMessage);
});
});
} catch (error) {
logger.warn('Error handling message', error, msg);
Expand Down
6 changes: 4 additions & 2 deletions packages/pubsub/src/Providers/PubSub.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observable } from 'rxjs';
import { ConsoleLogger } from '@aws-amplify/core';

import {
PubSubBase,
PubSubOptions,
PubSubContent,
PubSubOptions,
PublishInput,
SubscribeInput,
} from '../types/PubSub';
import { ConsoleLogger } from '@aws-amplify/core';

const logger = new ConsoleLogger('AbstractPubSubProvider');

export abstract class AbstractPubSub<T extends PubSubOptions>
Expand Down
11 changes: 6 additions & 5 deletions packages/pubsub/src/types/PubSub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observer, Observable } from 'rxjs';
import { Observable, Observer } from 'rxjs';

export interface SubscriptionObserver<T> {
closed: boolean;
next(value: T): void;
Expand Down Expand Up @@ -69,13 +70,13 @@ export interface PubSubBase {
subscribe(input: SubscribeInput): Observable<PubSubContent>;
}

export type PublishInput = {
export interface PublishInput {
topics: string[] | string;
message: PubSubContent;
options?: PubSubOptions;
};
}

export type SubscribeInput = {
export interface SubscribeInput {
topics: string[] | string;
options?: PubSubOptions;
};
}
10 changes: 7 additions & 3 deletions packages/pubsub/src/utils/ConnectionStateMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Observable, Observer, SubscriptionLike, map, filter } from 'rxjs';
import { Observable, Observer, SubscriptionLike, filter, map } from 'rxjs';

import { ConnectionState } from '../types/PubSub';

import { ReachabilityMonitor } from './ReachabilityMonitor';

// Internal types for tracking different connection states
type LinkedConnectionState = 'connected' | 'disconnected';
type LinkedHealthState = 'healthy' | 'unhealthy';
type LinkedConnectionStates = {
interface LinkedConnectionStates {
networkState: LinkedConnectionState;
connectionState: LinkedConnectionState | 'connecting';
intendedConnectionState: LinkedConnectionState;
keepAliveState: LinkedHealthState;
};
}

export const CONNECTION_CHANGE: {
[key in
Expand Down Expand Up @@ -130,6 +132,7 @@ export class ConnectionStateMonitor {
filter(current => {
const toInclude = current !== previous;
previous = current;

return toInclude;
}),
);
Expand Down Expand Up @@ -194,6 +197,7 @@ export class ConnectionStateMonitor {
// All remaining states directly correspond to the connection state
if (connectionState === 'connecting') return ConnectionState.Connecting;
if (connectionState === 'disconnected') return ConnectionState.Disconnected;

return ConnectionState.Connected;
}
}
1 change: 1 addition & 0 deletions packages/pubsub/src/utils/ReconnectionMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observer } from 'rxjs';

import { RECONNECT_DELAY, RECONNECT_INTERVAL } from '../Providers/constants';

export enum ReconnectEvent {
Expand Down

0 comments on commit 6d4b112

Please sign in to comment.