From df94933b70c89bd23c04cecaae97c03f1f72ae45 Mon Sep 17 00:00:00 2001 From: Isaque Neves Date: Tue, 9 Apr 2024 17:26:03 -0300 Subject: [PATCH] Fix bugs in lost connection detection to automatically reconnect. Updated postgres to 3.1.2 to be able to use the onOpen callback to configure connection settings like setting search path --- CHANGELOG.md | 4 ++ lib/src/capsule/manager.dart | 13 +++-- lib/src/connection.dart | 29 +++++----- lib/src/connectors/postgres_connector.dart | 45 +++++++++------- lib/src/database_manager.dart | 16 +++--- lib/src/detects_lost_connections.dart | 5 +- lib/src/pdo/postgres/postgres_pdo.dart | 18 ++++++- lib/src/pdo/postgres_v3/postgres_v3_pdo.dart | 57 +++++++++++++++++--- lib/src/utils/dsn_parser.dart | 14 ++++- pubspec.yaml | 4 +- 10 files changed, 141 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 022b2b6..279a54c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,3 +77,7 @@ final manager = Manager(); ## 3.0.1 - fix bug on query builder count() + +## 3.1.2 + +- Fix bugs in lost connection detection to automatically reconnect. Updated postgres to 3.1.2 to be able to use the onOpen callback to configure connection settings like setting search path \ No newline at end of file diff --git a/lib/src/capsule/manager.dart b/lib/src/capsule/manager.dart index d9d2e9b..c197d99 100644 --- a/lib/src/capsule/manager.dart +++ b/lib/src/capsule/manager.dart @@ -109,8 +109,8 @@ class Manager { /// @param string $connection /// @return \Illuminate\Database\Connection /// - Future connection([String? connection]) { - return instance!.getConnection(connection); + Future connection([String? connection]) async { + return await instance!.getConnection(connection); } /// @@ -121,7 +121,7 @@ class Manager { /// @return \Illuminate\Database\Query\Builder /// Future table(String table, [String? connectionP]) async { - var com = await instance!.connection(connectionP); + final com = await instance!.connection(connectionP); return com.table(table); } @@ -132,7 +132,7 @@ class Manager { /// @return \Illuminate\Database\Schema\Builder /// Future schema([String? connectionP]) async { - var com = await instance!.connection(connectionP); + final com = await instance!.connection(connectionP); return com.getSchemaBuilder(); } @@ -142,8 +142,8 @@ class Manager { /// @param string $name /// @return \Illuminate\Database\Connection /// - Future getConnection([String? name]) { - return this.manager.connection(name); + Future getConnection([String? name]) async { + return await this.manager.connection(name); } /// @@ -187,7 +187,6 @@ class Manager { /// Manager setFetchMode(int fetchMode) { this.container['config']['database.fetch'] = fetchMode; - return this; } diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 90fa4e2..7c898d9 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -375,7 +375,7 @@ class Connection with DetectsLostConnections implements ConnectionInterface { return [res]; }, timeoutInSeconds); - return resp ; + return resp; } /// @@ -577,16 +577,16 @@ class Connection with DetectsLostConnections implements ConnectionInterface { // Here we will run this query. If an exception occurs we'll determine if it was // caused by a connection that has been lost. If that is the cause, we'll try // to re-establish connection and re-run the query with a fresh connection. - // try { - // print('Connection@run'); - result = await this - .runQueryCallback(query, bindings, callback, timeoutInSeconds); - //print('Connection@run $result'); - // } catch (e) { - // //print('Connection@run error'); - // result = await this - // .tryAgainIfCausedByLostConnection(e, query, bindings, callback); - // } + try { + // print('Connection@run'); + result = await this + .runQueryCallback(query, bindings, callback, timeoutInSeconds); + //print('Connection@run $result'); + } catch (e) { + //print('Connection@run error $e'); + result = await this + .tryAgainIfCausedByLostConnection(e, query, bindings, callback, timeoutInSeconds); + } // Once we have run the query we will calculate the time that it took to run and // then log the query, bindings, and execution time so we will report them on @@ -652,19 +652,18 @@ class Connection with DetectsLostConnections implements ConnectionInterface { Connection, String, dynamic, int? timeoutInSeconds) callback, int? timeoutInSeconds, - {int delay = 2000}) async { + {int delay = 1000}) async { if (this.causedByLostConnection(e) && tryReconnectCount < tryReconnectLimit) { await Future.delayed(Duration(milliseconds: delay)); - //print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...'); + // print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...'); tryReconnectLimit++; await this.reconnect(); - await Future.delayed(Duration(milliseconds: 1000)); tryReconnectLimit = 0; return await this .runQueryCallback(query, bindings, callback, timeoutInSeconds); } - //print('tryAgainIfCausedByLostConnection'); + // print('tryAgainIfCausedByLostConnection'); throw e; } diff --git a/lib/src/connectors/postgres_connector.dart b/lib/src/connectors/postgres_connector.dart index fd4f386..fde72f3 100644 --- a/lib/src/connectors/postgres_connector.dart +++ b/lib/src/connectors/postgres_connector.dart @@ -37,34 +37,33 @@ class PostgresConnector extends Connector implements ConnectorInterface { // and if it has we will issue a statement to modify the timezone with the // database. Setting this DB timezone is an optional configuration item. - if (config.containsKey('timezone') && config['timezone'] != null) { - var timezone = config['timezone']; - await connection.execute("set time zone '$timezone'"); - } + // if (config.containsKey('timezone') && config['timezone'] != null) { + // var timezone = config['timezone']; + // await connection.execute("set time zone '$timezone'"); + // } // Unlike MySQL, Postgres allows the concept of "schema" and a default schema // may have been specified on the connections. If that is the case we will // set the default schema search paths to the specified database schema. - if (config.containsKey('schema') && config['schema'] != null) { - var schema = formatSchema(config['schema']); - - await connection.execute("set search_path to $schema"); - } + // if (config.containsKey('schema') && config['schema'] != null) { + // var schema = formatSchema(config['schema']); + // await connection.execute("set search_path to $schema"); + // } // Postgres allows an application_name to be set by the user and this name is // used to when monitoring the application with pg_stat_activity. So we'll // determine if the option has been specified and run a statement if so. - if (config.containsKey('application_name') && - config['application_name'] != null) { - var applicationName = config['application_name']; - try { - await connection.execute("set application_name to '$applicationName'"); - } catch (e) { - print( - 'Eloquent: Unable to set the application_name for this PostgreSQL driver.'); - } - } + // if (config.containsKey('application_name') && + // config['application_name'] != null) { + // var applicationName = config['application_name']; + // try { + // await connection.execute("set application_name to '$applicationName'"); + // } catch (e) { + // print( + // 'Eloquent: Unable to set the application_name for this PostgreSQL driver.'); + // } + // } return connection; } @@ -115,6 +114,14 @@ class PostgresConnector extends Connector implements ConnectorInterface { dsn += ";application_name=${config['application_name']}"; } + if (config['schema'] != null) { + dsn += ";schema=${config['schema']}"; + } + + if (config['timezone'] != null) { + dsn += ";timezone=${config['timezone']}"; + } + return dsn; } diff --git a/lib/src/database_manager.dart b/lib/src/database_manager.dart index 498eea1..18befee 100644 --- a/lib/src/database_manager.dart +++ b/lib/src/database_manager.dart @@ -49,6 +49,7 @@ class DatabaseManager implements ConnectionResolverInterface { Future connection([String? nameP]) async { var re = this.parseConnectionName(nameP); + var name = re[0]; var type = re[1]; // If we haven't created this connection, we'll create it based on the config @@ -56,7 +57,7 @@ class DatabaseManager implements ConnectionResolverInterface { // set the "fetch mode" for PDO which determines the query return types. if (!Utils.isset(this.connectionsProp[name])) { - var connection = await this.makeConnection(name); + final connection = await this.makeConnection(name); this.setPdoForType(connection, type); @@ -83,13 +84,12 @@ class DatabaseManager implements ConnectionResolverInterface { /// /// Disconnect from the given database and remove from local cache. /// - /// @param string $name + /// [name] Connection name /// @return void /// - void purge([String? name]) { - this.disconnect(name); - this.connectionsProp.remove(name); - //unset(this.connectionsProp[name]); + Future purge([String? name]) async { + await this.disconnect(name); + this.connectionsProp.remove(name); } /// @@ -100,7 +100,7 @@ class DatabaseManager implements ConnectionResolverInterface { /// Future disconnect([String? name]) async { name = name ?? this.getDefaultConnection(); - + if (Utils.isset(this.connectionsProp[name])) { await this.connectionsProp[name].disconnect(); } @@ -174,8 +174,6 @@ class DatabaseManager implements ConnectionResolverInterface { /// @return \Illuminate\Database\Connection /// Connection prepare(Connection connection) { - - if (this.app.bound('events')) { connection.setEventDispatcher(this.app['events']); } diff --git a/lib/src/detects_lost_connections.dart b/lib/src/detects_lost_connections.dart index 58d3cfd..3a445bf 100644 --- a/lib/src/detects_lost_connections.dart +++ b/lib/src/detects_lost_connections.dart @@ -12,10 +12,11 @@ mixin DetectsLostConnections { //TODO revise isso para outros cenĂ¡rios final isR = Utils.string_contains(message, [ '57P',//for posgresql restart - // 'Can't create a connection', + "Can't create a connection", 'Connection is closed', + 'connection is not open', ]); - print('causedByLostConnection $isR'); + return isR; } } diff --git a/lib/src/pdo/postgres/postgres_pdo.dart b/lib/src/pdo/postgres/postgres_pdo.dart index 7da2fc0..18d15a2 100644 --- a/lib/src/pdo/postgres/postgres_pdo.dart +++ b/lib/src/pdo/postgres/postgres_pdo.dart @@ -61,10 +61,24 @@ class PostgresPDO extends PDOInterface { dsnParser.database, username: user, password: password, - encoding: _getEncoding(dsnParser.charset ?? 'utf8'), + encoding: _getEncoding(dsnParser.charset ?? 'utf8'), ); await connection.open(); - await connection.query('''SET client_encoding = '${dsnParser.charset}';'''); + + if (dsnParser.charset != null) { + await connection.execute("SET client_encoding = '${dsnParser.charset}';"); + } + if (dsnParser.schema != null) { + await connection.execute("SET search_path TO '${dsnParser.schema}';"); + } + if (dsnParser.timezone != null) { + await connection.execute("SET time zone '${dsnParser.timezone};'"); + } + if (dsnParser.applicationName != null) { + await connection + .execute("SET application_name TO '${dsnParser.applicationName};'"); + } + return this; } diff --git a/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart b/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart index ea6fde5..33a6144 100644 --- a/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart +++ b/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart @@ -73,23 +73,60 @@ class PostgresV3PDO extends PDOInterface { connection = Pool.withEndpoints( [endpoint], settings: PoolSettings( + applicationName: dsnParser.applicationName, + timeZone: dsnParser.timezone, + onOpen: (conn) async { + if (dsnParser.charset != null) { + await conn + .execute("SET client_encoding = '${dsnParser.charset}';"); + } + if (dsnParser.schema != null) { + await conn.execute("SET search_path TO '${dsnParser.schema}';"); + } + if (dsnParser.timezone != null) { + await conn.execute("SET time zone '${dsnParser.timezone};'"); + } + if (dsnParser.applicationName != null) { + await conn.execute( + "SET application_name TO '${dsnParser.applicationName};'"); + } + }, maxConnectionCount: dsnParser.poolSize, encoding: _getEncoding(dsnParser.charset ?? 'utf8'), sslMode: sslMode, ), ); - await (connection as Pool) - .execute('''SET client_encoding = '${dsnParser.charset}';'''); - } else { + //final pool = await (connection as Pool); + } else { connection = await Connection.open(endpoint, settings: ConnectionSettings( + applicationName: dsnParser.applicationName, + timeZone: dsnParser.timezone, + onOpen: (conn) async { + if (dsnParser.charset != null) { + await conn + .execute("SET client_encoding = '${dsnParser.charset}';"); + } + if (dsnParser.schema != null) { + await conn.execute("SET search_path TO '${dsnParser.schema}';"); + } + if (dsnParser.timezone != null) { + await conn.execute("SET time zone '${dsnParser.timezone};'"); + } + if (dsnParser.applicationName != null) { + await conn.execute( + "SET application_name TO '${dsnParser.applicationName};'"); + } + }, encoding: _getEncoding(dsnParser.charset ?? 'utf8'), sslMode: sslMode, )); - await (connection as Connection) - .execute('''SET client_encoding = '${dsnParser.charset}';'''); + // final conn = await (connection as Connection); + // if (dsnParser.charset != null) { + // conn.execute('''SET client_encoding = '${dsnParser.charset}';'''); + // } } return this; @@ -151,13 +188,19 @@ class PostgresV3PDO extends PDOInterface { maps.add(map); } } - + final pdoResult = PDOResults(maps, rs.affectedRows); return pdoResult; } @override Future close() async { - await connection.close(); + // print('postgres_v3_pdo@close isOpen ${(connection).isOpen} '); + if (connection is Connection) { + await (connection as Connection).close(); + } else if (connection is Pool) { + await (connection as Pool).close(); + } + //print('postgres_v3_pdo@close isOpen ${(connection).isOpen} '); } } diff --git a/lib/src/utils/dsn_parser.dart b/lib/src/utils/dsn_parser.dart index 992ca44..ee22eba 100644 --- a/lib/src/utils/dsn_parser.dart +++ b/lib/src/utils/dsn_parser.dart @@ -52,8 +52,10 @@ class DSNParser { bool get allowReconnect => dsnParts['allowreconnect'].toString() == 'true'; String? get applicationName => dsnParts['application_name']; String? get sslmode => dsnParts['sslmode']; - Map? get options => dsnParts['options']; + String? get timezone => dsnParts['timezone']; + String? get schema => dsnParts['schema']; + Map? get options => dsnParts['options']; Map get params => dsnParts['params']; DSNParser(this.dsn, [this.dsnType = DsnType.pdoPostgreSql]) { @@ -130,6 +132,16 @@ class DSNParser { dsnParts['options'] = parts.lastWhere((p) => p.contains('options=')).split('=').last; } + + if (parts.join().contains('timezone=')) { + dsnParts['timezone'] = + parts.lastWhere((p) => p.contains('timezone=')).split('=').last; + } + + if (parts.join().contains('schema=')) { + dsnParts['schema'] = + parts.lastWhere((p) => p.contains('schema=')).split('=').last; + } } else if (dsnType == DsnType.heroku) { var patternString = '^' + '(?:' + diff --git a/pubspec.yaml b/pubspec.yaml index 2d618c2..1d1ebd3 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: eloquent -version: 3.0.1 +version: 3.1.2 description: eloquent query builder port from PHP Laravel homepage: https://github.com/insinfo/eloquent_dart #publish_to: none @@ -27,7 +27,7 @@ dependencies: mysql_client: ^0.0.27 # postgres v3 - postgres: ^3.0.5 + postgres: ^3.1.2 #^3.0.5 dev_dependencies: