Skip to content

Commit

Permalink
Try to fix reconnects when downloading.
Browse files Browse the repository at this point in the history
  • Loading branch information
painor committed Aug 24, 2024
1 parent 1094d12 commit b0d317c
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 28 deletions.
2 changes: 1 addition & 1 deletion gramjs/Version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const version = "2.23.8";
export const version = "2.24.8";
1 change: 1 addition & 0 deletions gramjs/client/TelegramClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,7 @@ export class TelegramClient extends TelegramBaseClient {
client: this,
securityChecks: this._securityChecks,
autoReconnectCallback: this._handleReconnect.bind(this),
_exportedSenderPromises: this._exportedSenderPromises,
});
}

Expand Down
13 changes: 11 additions & 2 deletions gramjs/client/telegramBaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export abstract class TelegramBaseClient {
[ReturnType<typeof setTimeout>, Api.TypeUpdate[]]
>();
/** @hidden */
private _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();
public _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();
/** @hidden */
private _exportedSenderReleaseTimeouts = new Map<
number,
Expand Down Expand Up @@ -540,7 +540,15 @@ export abstract class TelegramBaseClient {
dcId,
setTimeout(() => {
this._exportedSenderReleaseTimeouts.delete(dcId);
sender.disconnect();
if (sender._pendingState.values().length) {
console.log(
"sender already has some hanging states. reconnecting"
);
sender._reconnect();
this._borrowExportedSender(dcId, false, sender);
} else {
sender.disconnect();
}
}, EXPORTED_SENDER_RELEASE_TIMEOUT)
);

Expand All @@ -561,6 +569,7 @@ export abstract class TelegramBaseClient {
onConnectionBreak: this._cleanupExportedSender.bind(this),
client: this as unknown as TelegramClient,
securityChecks: this._securityChecks,
_exportedSenderPromises: this._exportedSenderPromises,
});
}

Expand Down
4 changes: 2 additions & 2 deletions gramjs/extensions/MessagePacker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const USE_INVOKE_AFTER_WITH = new Set([

export class MessagePacker {
private _state: MTProtoState;
private _pendingStates: RequestState[];
public _pendingStates: RequestState[];
private _queue: any[];
private _ready: Promise<unknown>;
private setReady: ((value?: any) => void) | undefined;
Expand Down Expand Up @@ -79,7 +79,7 @@ export class MessagePacker {
this._pendingStates.push(state);
state
.promise! // Using finally causes triggering `unhandledrejection` event
.catch(() => {})
.catch((err) => {})
.finally(() => {
this._pendingStates = this._pendingStates.filter(
(s) => s !== state
Expand Down
64 changes: 47 additions & 17 deletions gramjs/network/MTProtoSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ interface DEFAULT_OPTIONS {
client: TelegramClient;
onConnectionBreak?: CallableFunction;
securityChecks: boolean;
_exportedSenderPromises: Map<number, Promise<MTProtoSender>>;
}

export class MTProtoSender {
Expand Down Expand Up @@ -94,7 +95,7 @@ export class MTProtoSender {
readonly authKey: AuthKey;
private readonly _state: MTProtoState;
private _sendQueue: MessagePacker;
private _pendingState: PendingState;
_pendingState: PendingState;
private readonly _pendingAck: Set<any>;
private readonly _lastAcks: any[];
private readonly _handlers: any;
Expand All @@ -108,6 +109,7 @@ export class MTProtoSender {
private _cancelSend: boolean;
cancellableRecvLoopPromise?: CancellablePromise<any>;
private _finishedConnecting: boolean;
private _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();

/**
* @param authKey
Expand Down Expand Up @@ -137,7 +139,7 @@ export class MTProtoSender {
this._securityChecks = args.securityChecks;

this._connectMutex = new Mutex();

this._exportedSenderPromises = args._exportedSenderPromises;
/**
* whether we disconnected ourself or telegram did it.
*/
Expand Down Expand Up @@ -404,6 +406,7 @@ export class MTProtoSender {
"Connection to %s complete!".replace("%s", connection.toString())
);
}

async _disconnect() {
const connection = this._connection;
if (this._updateCallback) {
Expand Down Expand Up @@ -494,17 +497,6 @@ export class MTProtoSender {
);

data = await this._state.encryptMessageData(data);

try {
await this._connection!.send(data);
} catch (e) {
this._log.debug(`Connection closed while sending data ${e}`);
if (this._log.canSend(LogLevel.DEBUG)) {
console.error(e);
}
this._sendLoopHandle = undefined;
return;
}
for (const state of batch) {
if (!Array.isArray(state)) {
if (state.request.classType === "request") {
Expand All @@ -518,6 +510,17 @@ export class MTProtoSender {
}
}
}
try {
await this._connection!.send(data);
} catch (e) {
this._log.debug(`Connection closed while sending data ${e}`);
if (this._log.canSend(LogLevel.DEBUG)) {
console.error(e);
}
this._sendLoopHandle = undefined;
return;
}

this._log.debug("Encrypted messages put in a queue to be sent");
}

Expand Down Expand Up @@ -639,6 +642,7 @@ export class MTProtoSender {
this._onConnectionBreak(this._dcId);
}
}

/**
* Adds the given message to the list of messages that must be
* acknowledged and dispatches control to different ``_handle_*``
Expand Down Expand Up @@ -696,6 +700,7 @@ export class MTProtoSender {

return [];
}

/**
* Handles the result for Remote Procedure Calls:
* rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;
Expand Down Expand Up @@ -744,9 +749,7 @@ export class MTProtoSender {
try {
const reader = new BinaryReader(result.body);
const read = state.request.readResult(reader);
this._log.debug(
`Handling RPC result ${read?.constructor?.name}`
);
this._log.debug(`Handling RPC result ${read?.className}`);
state.resolve(read);
} catch (err) {
state.reject(err);
Expand Down Expand Up @@ -968,9 +971,26 @@ export class MTProtoSender {
* @private
*/
async _handleMsgAll(message: TLMessage) {}

reconnect() {
if (this._userConnected && !this.isReconnecting) {
this.isReconnecting = true;
if (this._isMainSender) {
this._log.debug("Reconnecting all senders");
for (const promise of this._exportedSenderPromises.values()) {
promise
.then((sender) => {
if (sender && !sender._isMainSender) {
sender.reconnect();
}
})
.catch((error) => {
this._log.warn(
"Error getting sender to reconnect to"
);
});
}
}
// we want to wait a second between each reconnect try to not flood the server with reconnects
// in case of internal server issues.
sleep(1000).then(() => {
Expand All @@ -981,7 +1001,6 @@ export class MTProtoSender {
}

async _reconnect() {
this._log.debug("Closing current connection...");
try {
this._log.warn("[Reconnect] Closing current connection...");
await this._disconnect();
Expand All @@ -994,6 +1013,17 @@ export class MTProtoSender {
console.error(err);
}
}
this._log.debug(
`Adding ${this._sendQueue._pendingStates.length} old request to resend`
);
for (let i = 0; i < this._sendQueue._pendingStates.length; i++) {
if (this._sendQueue._pendingStates[i].msgId != undefined) {
this._pendingState.set(
this._sendQueue._pendingStates[i].msgId!,
this._sendQueue._pendingStates[i]
);
}
}

this._sendQueue.clear();
this._state.reset();
Expand Down
2 changes: 1 addition & 1 deletion gramjs/network/connection/TCPFull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class FullPacketCodec extends PacketCodec {
// # It has been observed that the length and seq can be -429,
// # followed by the body of 4 bytes also being -429.
// # See https://github.com/LonamiWebs/Telethon/issues/4042.
const body = await reader.readExactly(4)
const body = await reader.readExactly(4);
throw new InvalidBufferError(body);
}
let body = await reader.readExactly(packetLen - 8);
Expand Down
4 changes: 3 additions & 1 deletion gramjs/sessions/StoreSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ export class StoreSession extends MemorySession {
constructor(sessionName: string, divider = ":") {
super();
if (sessionName === "session") {
throw new Error("Session name can't be 'session'. Please use a different name.");
throw new Error(
"Session name can't be 'session'. Please use a different name."
);
}
if (typeof localStorage === "undefined" || localStorage === null) {
const LocalStorage = require("./localStorage").LocalStorage;
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "telegram",
"version": "2.23.8",
"version": "2.24.9",
"description": "NodeJS/Browser MTProto API Telegram client library,",
"main": "index.js",
"types": "index.d.ts",
Expand Down Expand Up @@ -70,4 +70,4 @@
"node-localstorage": "^2.2.1",
"socks": "^2.6.2"
}
}
}

0 comments on commit b0d317c

Please sign in to comment.