Skip to content

Commit

Permalink
chore(pubsub): enable eslint and remove tslint (#13081)
Browse files Browse the repository at this point in the history
* chore(pubsub): enable eslint and remove tslint

* chore(pubsub): run yarn lint:fix

* chore(pubsub): manual fix of linter reproted error
  • Loading branch information
HuiSF authored Mar 6, 2024
1 parent 18bf983 commit 14c51e1
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 99 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ module.exports = {
'packages/interactions',
'packages/notifications',
'packages/predictions',
'packages/pubsub',
// 'packages/pubsub',
'packages/react-native',
'packages/rtn-push-notification',
'packages/rtn-web-browser',
Expand Down
3 changes: 2 additions & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"clean": "npm run clean:size && rimraf dist lib lib-esm",
"clean:size": "rimraf dual-publish-tmp tmp*",
"format": "echo \"Not implemented\"",
"lint": "tslint 'src/**/*.ts' && npm run ts-coverage",
"lint": "eslint '**/*.{ts,tsx}' && npm run ts-coverage",
"lint:fix": "eslint '**/*.{ts,tsx}' --fix",
"ts-coverage": "typescript-coverage-report -p ./tsconfig.build.json -t 93.0 -i src/vendor/paho-mqtt.js"
},
"typesVersions": {
Expand Down
6 changes: 4 additions & 2 deletions packages/pubsub/src/Providers/AWSIot.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { MqttOverWS, MqttOptions } from './MqttOverWS';
import { Signer } from '@aws-amplify/core/internals/utils';

Check warning on line 3 in packages/pubsub/src/Providers/AWSIot.ts

View workflow job for this annotation

GitHub Actions / e2e / unit-tests / Unit Test - @aws-amplify/pubsub

Deprecated
import { fetchAuthSession } from '@aws-amplify/core';

import { MqttOptions, MqttOverWS } from './MqttOverWS';

const SERVICE_NAME = 'iotdevicegateway';

export interface AWSIoTOptions extends MqttOptions {
Expand All @@ -21,7 +23,7 @@ export class AWSIoT extends MqttOverWS {

protected get endpoint() {
return (async () => {
const endpoint = this.options.endpoint;
const { endpoint } = this.options;

const serviceInfo = {
service: SERVICE_NAME,
Expand Down
89 changes: 51 additions & 38 deletions packages/pubsub/src/Providers/MqttOverWS.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// @ts-ignore
import * as Paho from '../vendor/paho-mqtt.js';
import { Observable, SubscriptionLike as Subscription, Observer } from 'rxjs';
import { Observable, Observer, SubscriptionLike as Subscription } from 'rxjs';
import { ConsoleLogger, Hub, HubPayload } from '@aws-amplify/core';
import { amplifyUuid } from '@aws-amplify/core/internals/utils';

import { AbstractPubSub } from './PubSub';
import {
ConnectionState,
PubSubContentObserver,
PubSubContent,
PubSubContentObserver,
PubSubOptions,
PublishInput,
SubscribeInput,
} from '../types/PubSub';
import { Hub, HubPayload, ConsoleLogger } from '@aws-amplify/core';
import { amplifyUuid } from '@aws-amplify/core/internals/utils';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore this module is expected to not have declaration file
import * as Paho from '../vendor/paho-mqtt.js';
import {
ConnectionStateMonitor,
CONNECTION_CHANGE,
ConnectionStateMonitor,
} from '../utils/ConnectionStateMonitor';
import {
ReconnectEvent,
ReconnectionMonitor,
} from '../utils/ReconnectionMonitor';

import { AbstractPubSub } from './PubSub';
import { AMPLIFY_SYMBOL, CONNECTION_STATE_CHANGE } from './constants';

const logger = new ConsoleLogger('MqttOverWS');

export function mqttTopicMatch(filter: string, topic: string) {
const filterArray = filter.split('/');
const length = filterArray.length;
const { length } = filterArray;
const topicArray = topic.split('/');

for (let i = 0; i < length; ++i) {
Expand All @@ -39,6 +41,7 @@ export function mqttTopicMatch(filter: string, topic: string) {
if (left === '#') return topicArray.length >= length;
if (left !== '+' && left !== right) return false;
}

return length === topicArray.length;
}

Expand All @@ -49,23 +52,23 @@ export interface MqttOptions extends PubSubOptions {
}

interface PahoClient {
onMessageArrived: (params: {
onMessageArrived(params: {
destinationName: string;
payloadString: string;
}) => void;
onConnectionLost: (params: { errorCode: number }) => void;
connect: (params: {
[k: string]: string | number | boolean | (() => void);
}) => void;
disconnect: () => void;
isConnected: () => boolean;
subscribe: (topic: string) => void;
unsubscribe: (topic: string) => void;
}): void;
onConnectionLost(params: { errorCode: number }): void;
connect(
params: Record<string, string | number | boolean | (() => void)>,
): void;
disconnect(): void;
isConnected(): boolean;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
send(topic: string, message: string): void;
}

class ClientsQueue {
private promises: Map<string, Promise<PahoClient | undefined>> = new Map();
private promises = new Map<string, Promise<PahoClient | undefined>>();

async get(
clientId: string,
Expand All @@ -78,6 +81,7 @@ class ClientsQueue {
const newPromise = clientFactory(clientId);
this.promises.set(clientId, newPromise);
newPromise.catch(() => this.promises.delete(clientId));

return newPromise;
}

Expand Down Expand Up @@ -146,9 +150,8 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
}

protected get isSSLEnabled() {
return !this.options[
'aws_appsync_dangerously_connect_to_http_endpoint_for_testing'
];
return !this.options
.aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
}

public onDisconnect({
Expand Down Expand Up @@ -177,7 +180,8 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
logger.debug('Creating new MQTT client', clientId);

this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION);
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore this module is expected to not have declaration file
const client = new Paho.Client(url, clientId) as PahoClient;

client.onMessageArrived = ({
Expand All @@ -199,11 +203,13 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
};

const connected = await new Promise((resolve, reject) => {
const connected = await new Promise((resolve, _reject) => {
client.connect({
useSSL: this.isSSLEnabled,
mqttVersion: 3,
onSuccess: () => resolve(true),
onSuccess: () => {
resolve(true);
},
onFailure: () => {
if (clientId) this._clientsQueue.remove(clientId);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
Expand All @@ -225,8 +231,11 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
clientId: string,
options: MqttOptions = {},
): Promise<PahoClient | undefined> {
return await this.clientsQueue.get(clientId, async clientId => {
const client = await this.newClient({ ...options, clientId });
return this.clientsQueue.get(clientId, async inputClientId => {
const client = await this.newClient({
...options,
clientId: inputClientId,
});

if (client) {
// Once connected, subscribe to all topics registered observers
Expand All @@ -236,6 +245,7 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
},
);
}

return client;
});
}
Expand All @@ -258,7 +268,9 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {

if (client) {
logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
targetTopics.forEach(topic => client.send(topic, msg));
targetTopics.forEach(topic => {
client.send(topic, msg);
});
} else {
logger.debug(
'Publishing to topic(s) failed',
Expand All @@ -268,11 +280,9 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
}
}

protected _topicObservers: Map<string, Set<PubSubContentObserver>> =
new Map();
protected _topicObservers = new Map<string, Set<PubSubContentObserver>>();

protected _clientIdObservers: Map<string, Set<PubSubContentObserver>> =
new Map();
protected _clientIdObservers = new Map<string, Set<PubSubContentObserver>>();

private _onMessage(topic: string, msg: string) {
try {
Expand All @@ -285,12 +295,13 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
const parsedMessage: PubSubContent = JSON.parse(msg);

if (typeof parsedMessage === 'object') {
// @ts-ignore
parsedMessage[topicSymbol] = topic;
}

matchedTopicObservers.forEach(observersForTopic => {
observersForTopic.forEach(observer => observer.next(parsedMessage));
observersForTopic.forEach(observer => {
observer.next(parsedMessage);
});
});
} catch (error) {
logger.warn('Error handling message', error, msg);
Expand Down Expand Up @@ -350,9 +361,11 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
await getClient();

// Add an observable to the reconnection list to manage reconnection for this subscription
reconnectSubscription = new Observable(observer => {
this.reconnectionMonitor.addObserver(observer);
}).subscribe(() => {
reconnectSubscription = new Observable(
reconnectSubscriptionObserver => {
this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver);
},
).subscribe(() => {
getClient();
});
})();
Expand Down
6 changes: 4 additions & 2 deletions packages/pubsub/src/Providers/PubSub.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observable } from 'rxjs';
import { ConsoleLogger } from '@aws-amplify/core';

import {
PubSubBase,
PubSubOptions,
PubSubContent,
PubSubOptions,
PublishInput,
SubscribeInput,
} from '../types/PubSub';
import { ConsoleLogger } from '@aws-amplify/core';

const logger = new ConsoleLogger('AbstractPubSubProvider');

export abstract class AbstractPubSub<T extends PubSubOptions>
Expand Down
13 changes: 7 additions & 6 deletions packages/pubsub/src/types/PubSub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observer, Observable } from 'rxjs';
import { Observable, Observer } from 'rxjs';

export interface SubscriptionObserver<T> {
closed: boolean;
next(value: T): void;
Expand Down Expand Up @@ -52,7 +53,7 @@ export enum ConnectionState {
ConnectedPendingKeepAlive = 'ConnectedPendingKeepAlive',
}

export type PubSubContent = Record<string, unknown>;
export type PubSubContent = Record<string | symbol, unknown>;
export type PubSubContentObserver = Observer<PubSubContent>;

export interface PubSubOptions {
Expand All @@ -69,13 +70,13 @@ export interface PubSubBase {
subscribe(input: SubscribeInput): Observable<PubSubContent>;
}

export type PublishInput = {
export interface PublishInput {
topics: string[] | string;
message: PubSubContent;
options?: PubSubOptions;
};
}

export type SubscribeInput = {
export interface SubscribeInput {
topics: string[] | string;
options?: PubSubOptions;
};
}
10 changes: 7 additions & 3 deletions packages/pubsub/src/utils/ConnectionStateMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Observable, Observer, SubscriptionLike, map, filter } from 'rxjs';
import { Observable, Observer, SubscriptionLike, filter, map } from 'rxjs';

Check warning on line 4 in packages/pubsub/src/utils/ConnectionStateMonitor.ts

View workflow job for this annotation

GitHub Actions / e2e / unit-tests / Unit Test - @aws-amplify/pubsub

Deprecated: Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8

import { ConnectionState } from '../types/PubSub';

import { ReachabilityMonitor } from './ReachabilityMonitor';

// Internal types for tracking different connection states
type LinkedConnectionState = 'connected' | 'disconnected';
type LinkedHealthState = 'healthy' | 'unhealthy';
type LinkedConnectionStates = {
interface LinkedConnectionStates {
networkState: LinkedConnectionState;
connectionState: LinkedConnectionState | 'connecting';
intendedConnectionState: LinkedConnectionState;
keepAliveState: LinkedHealthState;
};
}

export const CONNECTION_CHANGE: {
[key in
Expand Down Expand Up @@ -130,6 +132,7 @@ export class ConnectionStateMonitor {
filter(current => {
const toInclude = current !== previous;
previous = current;

return toInclude;
}),
);
Expand Down Expand Up @@ -194,6 +197,7 @@ export class ConnectionStateMonitor {
// All remaining states directly correspond to the connection state
if (connectionState === 'connecting') return ConnectionState.Connecting;
if (connectionState === 'disconnected') return ConnectionState.Disconnected;

return ConnectionState.Connected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import { Reachability } from '@aws-amplify/core/internals/utils';
import { default as NetInfo } from '@react-native-community/netinfo';
import NetInfo from '@react-native-community/netinfo';

export const ReachabilityMonitor = () =>
new Reachability().networkMonitor(NetInfo);
1 change: 1 addition & 0 deletions packages/pubsub/src/utils/ReconnectionMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observer } from 'rxjs';

import { RECONNECT_DELAY, RECONNECT_INTERVAL } from '../Providers/constants';

export enum ReconnectEvent {
Expand Down
Loading

0 comments on commit 14c51e1

Please sign in to comment.