diff --git a/gramjs/Version.ts b/gramjs/Version.ts index ee30760e..595b1faa 100644 --- a/gramjs/Version.ts +++ b/gramjs/Version.ts @@ -1 +1 @@ -export const version = "2.23.8"; +export const version = "2.24.8"; diff --git a/gramjs/client/TelegramClient.ts b/gramjs/client/TelegramClient.ts index ddd8eb8d..ca8cff73 100644 --- a/gramjs/client/TelegramClient.ts +++ b/gramjs/client/TelegramClient.ts @@ -1416,6 +1416,7 @@ export class TelegramClient extends TelegramBaseClient { client: this, securityChecks: this._securityChecks, autoReconnectCallback: this._handleReconnect.bind(this), + _exportedSenderPromises: this._exportedSenderPromises, }); } diff --git a/gramjs/client/telegramBaseClient.ts b/gramjs/client/telegramBaseClient.ts index c476584f..bd611119 100644 --- a/gramjs/client/telegramBaseClient.ts +++ b/gramjs/client/telegramBaseClient.ts @@ -212,7 +212,7 @@ export abstract class TelegramBaseClient { [ReturnType, Api.TypeUpdate[]] >(); /** @hidden */ - private _exportedSenderPromises = new Map>(); + public _exportedSenderPromises = new Map>(); /** @hidden */ private _exportedSenderReleaseTimeouts = new Map< number, @@ -540,7 +540,15 @@ export abstract class TelegramBaseClient { dcId, setTimeout(() => { this._exportedSenderReleaseTimeouts.delete(dcId); - sender.disconnect(); + if (sender._pendingState.values().length) { + console.log( + "sender already has some hanging states. reconnecting" + ); + sender._reconnect(); + this._borrowExportedSender(dcId, false, sender); + } else { + sender.disconnect(); + } }, EXPORTED_SENDER_RELEASE_TIMEOUT) ); @@ -561,6 +569,7 @@ export abstract class TelegramBaseClient { onConnectionBreak: this._cleanupExportedSender.bind(this), client: this as unknown as TelegramClient, securityChecks: this._securityChecks, + _exportedSenderPromises: this._exportedSenderPromises, }); } diff --git a/gramjs/extensions/MessagePacker.ts b/gramjs/extensions/MessagePacker.ts index df14f9ce..6403e83a 100644 --- a/gramjs/extensions/MessagePacker.ts +++ b/gramjs/extensions/MessagePacker.ts @@ -13,7 +13,7 @@ const USE_INVOKE_AFTER_WITH = new Set([ export class MessagePacker { private _state: MTProtoState; - private _pendingStates: RequestState[]; + public _pendingStates: RequestState[]; private _queue: any[]; private _ready: Promise; private setReady: ((value?: any) => void) | undefined; @@ -79,7 +79,7 @@ export class MessagePacker { this._pendingStates.push(state); state .promise! // Using finally causes triggering `unhandledrejection` event - .catch(() => {}) + .catch((err) => {}) .finally(() => { this._pendingStates = this._pendingStates.filter( (s) => s !== state diff --git a/gramjs/network/MTProtoSender.ts b/gramjs/network/MTProtoSender.ts index afe5e968..09a796ab 100644 --- a/gramjs/network/MTProtoSender.ts +++ b/gramjs/network/MTProtoSender.ts @@ -53,6 +53,7 @@ interface DEFAULT_OPTIONS { client: TelegramClient; onConnectionBreak?: CallableFunction; securityChecks: boolean; + _exportedSenderPromises: Map>; } export class MTProtoSender { @@ -94,7 +95,7 @@ export class MTProtoSender { readonly authKey: AuthKey; private readonly _state: MTProtoState; private _sendQueue: MessagePacker; - private _pendingState: PendingState; + _pendingState: PendingState; private readonly _pendingAck: Set; private readonly _lastAcks: any[]; private readonly _handlers: any; @@ -108,6 +109,7 @@ export class MTProtoSender { private _cancelSend: boolean; cancellableRecvLoopPromise?: CancellablePromise; private _finishedConnecting: boolean; + private _exportedSenderPromises = new Map>(); /** * @param authKey @@ -137,7 +139,7 @@ export class MTProtoSender { this._securityChecks = args.securityChecks; this._connectMutex = new Mutex(); - + this._exportedSenderPromises = args._exportedSenderPromises; /** * whether we disconnected ourself or telegram did it. */ @@ -404,6 +406,7 @@ export class MTProtoSender { "Connection to %s complete!".replace("%s", connection.toString()) ); } + async _disconnect() { const connection = this._connection; if (this._updateCallback) { @@ -494,17 +497,6 @@ export class MTProtoSender { ); data = await this._state.encryptMessageData(data); - - try { - await this._connection!.send(data); - } catch (e) { - this._log.debug(`Connection closed while sending data ${e}`); - if (this._log.canSend(LogLevel.DEBUG)) { - console.error(e); - } - this._sendLoopHandle = undefined; - return; - } for (const state of batch) { if (!Array.isArray(state)) { if (state.request.classType === "request") { @@ -518,6 +510,17 @@ export class MTProtoSender { } } } + try { + await this._connection!.send(data); + } catch (e) { + this._log.debug(`Connection closed while sending data ${e}`); + if (this._log.canSend(LogLevel.DEBUG)) { + console.error(e); + } + this._sendLoopHandle = undefined; + return; + } + this._log.debug("Encrypted messages put in a queue to be sent"); } @@ -639,6 +642,7 @@ export class MTProtoSender { this._onConnectionBreak(this._dcId); } } + /** * Adds the given message to the list of messages that must be * acknowledged and dispatches control to different ``_handle_*`` @@ -696,6 +700,7 @@ export class MTProtoSender { return []; } + /** * Handles the result for Remote Procedure Calls: * rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult; @@ -744,9 +749,7 @@ export class MTProtoSender { try { const reader = new BinaryReader(result.body); const read = state.request.readResult(reader); - this._log.debug( - `Handling RPC result ${read?.constructor?.name}` - ); + this._log.debug(`Handling RPC result ${read?.className}`); state.resolve(read); } catch (err) { state.reject(err); @@ -968,9 +971,26 @@ export class MTProtoSender { * @private */ async _handleMsgAll(message: TLMessage) {} + reconnect() { if (this._userConnected && !this.isReconnecting) { this.isReconnecting = true; + if (this._isMainSender) { + this._log.debug("Reconnecting all senders"); + for (const promise of this._exportedSenderPromises.values()) { + promise + .then((sender) => { + if (sender && !sender._isMainSender) { + sender.reconnect(); + } + }) + .catch((error) => { + this._log.warn( + "Error getting sender to reconnect to" + ); + }); + } + } // we want to wait a second between each reconnect try to not flood the server with reconnects // in case of internal server issues. sleep(1000).then(() => { @@ -981,7 +1001,6 @@ export class MTProtoSender { } async _reconnect() { - this._log.debug("Closing current connection..."); try { this._log.warn("[Reconnect] Closing current connection..."); await this._disconnect(); @@ -994,6 +1013,17 @@ export class MTProtoSender { console.error(err); } } + this._log.debug( + `Adding ${this._sendQueue._pendingStates.length} old request to resend` + ); + for (let i = 0; i < this._sendQueue._pendingStates.length; i++) { + if (this._sendQueue._pendingStates[i].msgId != undefined) { + this._pendingState.set( + this._sendQueue._pendingStates[i].msgId!, + this._sendQueue._pendingStates[i] + ); + } + } this._sendQueue.clear(); this._state.reset(); diff --git a/gramjs/network/connection/TCPFull.ts b/gramjs/network/connection/TCPFull.ts index 93d12395..8599d877 100644 --- a/gramjs/network/connection/TCPFull.ts +++ b/gramjs/network/connection/TCPFull.ts @@ -44,7 +44,7 @@ export class FullPacketCodec extends PacketCodec { // # It has been observed that the length and seq can be -429, // # followed by the body of 4 bytes also being -429. // # See https://github.com/LonamiWebs/Telethon/issues/4042. - const body = await reader.readExactly(4) + const body = await reader.readExactly(4); throw new InvalidBufferError(body); } let body = await reader.readExactly(packetLen - 8); diff --git a/gramjs/sessions/StoreSession.ts b/gramjs/sessions/StoreSession.ts index 4303ad2b..6de6dcad 100644 --- a/gramjs/sessions/StoreSession.ts +++ b/gramjs/sessions/StoreSession.ts @@ -10,7 +10,9 @@ export class StoreSession extends MemorySession { constructor(sessionName: string, divider = ":") { super(); if (sessionName === "session") { - throw new Error("Session name can't be 'session'. Please use a different name."); + throw new Error( + "Session name can't be 'session'. Please use a different name." + ); } if (typeof localStorage === "undefined" || localStorage === null) { const LocalStorage = require("./localStorage").LocalStorage; diff --git a/package-lock.json b/package-lock.json index 18b55278..ab6b1fca 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "telegram", - "version": "2.23.8", + "version": "2.24.9", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "telegram", - "version": "2.23.8", + "version": "2.24.9", "license": "MIT", "dependencies": { "@cryptography/aes": "^0.1.1", diff --git a/package.json b/package.json index b8c0419e..d3c8897d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "telegram", - "version": "2.23.8", + "version": "2.24.9", "description": "NodeJS/Browser MTProto API Telegram client library,", "main": "index.js", "types": "index.d.ts", @@ -70,4 +70,4 @@ "node-localstorage": "^2.2.1", "socks": "^2.6.2" } -} \ No newline at end of file +}