Skip to content

Commit

Permalink
Fix bugs in lost connection detection to automatically reconnect. Upd…
Browse files Browse the repository at this point in the history
…ated postgres to 3.1.2 to be able to use the onOpen callback to configure connection settings like setting search path
  • Loading branch information
Isaque Neves committed Apr 9, 2024
1 parent 0f0596b commit df94933
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 64 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ final manager = Manager();
## 3.0.1

- fix bug on query builder count()

## 3.1.2

- Fix bugs in lost connection detection to automatically reconnect. Updated postgres to 3.1.2 to be able to use the onOpen callback to configure connection settings like setting search path
13 changes: 6 additions & 7 deletions lib/src/capsule/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ class Manager {
/// @param string $connection
/// @return \Illuminate\Database\Connection
///
Future<Connection> connection([String? connection]) {
return instance!.getConnection(connection);
Future<Connection> connection([String? connection]) async {
return await instance!.getConnection(connection);
}

///
Expand All @@ -121,7 +121,7 @@ class Manager {
/// @return \Illuminate\Database\Query\Builder
///
Future<QueryBuilder> table(String table, [String? connectionP]) async {
var com = await instance!.connection(connectionP);
final com = await instance!.connection(connectionP);
return com.table(table);
}

Expand All @@ -132,7 +132,7 @@ class Manager {
/// @return \Illuminate\Database\Schema\Builder
///
Future<SchemaBuilder> schema([String? connectionP]) async {
var com = await instance!.connection(connectionP);
final com = await instance!.connection(connectionP);
return com.getSchemaBuilder();
}

Expand All @@ -142,8 +142,8 @@ class Manager {
/// @param string $name
/// @return \Illuminate\Database\Connection
///
Future<Connection> getConnection([String? name]) {
return this.manager.connection(name);
Future<Connection> getConnection([String? name]) async {
return await this.manager.connection(name);
}

///
Expand Down Expand Up @@ -187,7 +187,6 @@ class Manager {
///
Manager setFetchMode(int fetchMode) {
this.container['config']['database.fetch'] = fetchMode;

return this;
}

Expand Down
29 changes: 14 additions & 15 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class Connection with DetectsLostConnections implements ConnectionInterface {
return [res];
}, timeoutInSeconds);

return resp ;
return resp;
}

///
Expand Down Expand Up @@ -577,16 +577,16 @@ class Connection with DetectsLostConnections implements ConnectionInterface {
// Here we will run this query. If an exception occurs we'll determine if it was
// caused by a connection that has been lost. If that is the cause, we'll try
// to re-establish connection and re-run the query with a fresh connection.
// try {
// print('Connection@run');
result = await this
.runQueryCallback(query, bindings, callback, timeoutInSeconds);
//print('Connection@run $result');
// } catch (e) {
// //print('Connection@run error');
// result = await this
// .tryAgainIfCausedByLostConnection(e, query, bindings, callback);
// }
try {
// print('Connection@run');
result = await this
.runQueryCallback(query, bindings, callback, timeoutInSeconds);
//print('Connection@run $result');
} catch (e) {
//print('Connection@run error $e');
result = await this
.tryAgainIfCausedByLostConnection(e, query, bindings, callback, timeoutInSeconds);
}

// Once we have run the query we will calculate the time that it took to run and
// then log the query, bindings, and execution time so we will report them on
Expand Down Expand Up @@ -652,19 +652,18 @@ class Connection with DetectsLostConnections implements ConnectionInterface {
Connection, String, dynamic, int? timeoutInSeconds)
callback,
int? timeoutInSeconds,
{int delay = 2000}) async {
{int delay = 1000}) async {
if (this.causedByLostConnection(e) &&
tryReconnectCount < tryReconnectLimit) {
await Future.delayed(Duration(milliseconds: delay));
//print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...');
// print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...');
tryReconnectLimit++;
await this.reconnect();
await Future.delayed(Duration(milliseconds: 1000));
tryReconnectLimit = 0;
return await this
.runQueryCallback(query, bindings, callback, timeoutInSeconds);
}
//print('tryAgainIfCausedByLostConnection');
// print('tryAgainIfCausedByLostConnection');
throw e;
}

Expand Down
45 changes: 26 additions & 19 deletions lib/src/connectors/postgres_connector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,33 @@ class PostgresConnector extends Connector implements ConnectorInterface {
// and if it has we will issue a statement to modify the timezone with the
// database. Setting this DB timezone is an optional configuration item.

if (config.containsKey('timezone') && config['timezone'] != null) {
var timezone = config['timezone'];
await connection.execute("set time zone '$timezone'");
}
// if (config.containsKey('timezone') && config['timezone'] != null) {
// var timezone = config['timezone'];
// await connection.execute("set time zone '$timezone'");
// }

// Unlike MySQL, Postgres allows the concept of "schema" and a default schema
// may have been specified on the connections. If that is the case we will
// set the default schema search paths to the specified database schema.
if (config.containsKey('schema') && config['schema'] != null) {
var schema = formatSchema(config['schema']);

await connection.execute("set search_path to $schema");
}
// if (config.containsKey('schema') && config['schema'] != null) {
// var schema = formatSchema(config['schema']);
// await connection.execute("set search_path to $schema");
// }

// Postgres allows an application_name to be set by the user and this name is
// used to when monitoring the application with pg_stat_activity. So we'll
// determine if the option has been specified and run a statement if so.

if (config.containsKey('application_name') &&
config['application_name'] != null) {
var applicationName = config['application_name'];
try {
await connection.execute("set application_name to '$applicationName'");
} catch (e) {
print(
'Eloquent: Unable to set the application_name for this PostgreSQL driver.');
}
}
// if (config.containsKey('application_name') &&
// config['application_name'] != null) {
// var applicationName = config['application_name'];
// try {
// await connection.execute("set application_name to '$applicationName'");
// } catch (e) {
// print(
// 'Eloquent: Unable to set the application_name for this PostgreSQL driver.');
// }
// }

return connection;
}
Expand Down Expand Up @@ -115,6 +114,14 @@ class PostgresConnector extends Connector implements ConnectorInterface {
dsn += ";application_name=${config['application_name']}";
}

if (config['schema'] != null) {
dsn += ";schema=${config['schema']}";
}

if (config['timezone'] != null) {
dsn += ";timezone=${config['timezone']}";
}

return dsn;
}

Expand Down
16 changes: 7 additions & 9 deletions lib/src/database_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ class DatabaseManager implements ConnectionResolverInterface {
Future<Connection> connection([String? nameP]) async {
var re = this.parseConnectionName(nameP);


var name = re[0];
var type = re[1];
// If we haven't created this connection, we'll create it based on the config
// provided in the application. Once we've created the connections we will
// set the "fetch mode" for PDO which determines the query return types.

if (!Utils.isset(this.connectionsProp[name])) {
var connection = await this.makeConnection(name);
final connection = await this.makeConnection(name);

this.setPdoForType(connection, type);

Expand All @@ -83,13 +84,12 @@ class DatabaseManager implements ConnectionResolverInterface {
///
/// Disconnect from the given database and remove from local cache.
///
/// @param string $name
/// [name] Connection name
/// @return void
///
void purge([String? name]) {
this.disconnect(name);
this.connectionsProp.remove(name);
//unset(this.connectionsProp[name]);
Future<void> purge([String? name]) async {
await this.disconnect(name);
this.connectionsProp.remove(name);
}

///
Expand All @@ -100,7 +100,7 @@ class DatabaseManager implements ConnectionResolverInterface {
///
Future<void> disconnect([String? name]) async {
name = name ?? this.getDefaultConnection();

if (Utils.isset(this.connectionsProp[name])) {
await this.connectionsProp[name].disconnect();
}
Expand Down Expand Up @@ -174,8 +174,6 @@ class DatabaseManager implements ConnectionResolverInterface {
/// @return \Illuminate\Database\Connection
///
Connection prepare(Connection connection) {


if (this.app.bound('events')) {
connection.setEventDispatcher(this.app['events']);
}
Expand Down
5 changes: 3 additions & 2 deletions lib/src/detects_lost_connections.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ mixin DetectsLostConnections {
//TODO revise isso para outros cenários
final isR = Utils.string_contains(message, [
'57P',//for posgresql restart
// 'Can't create a connection',
"Can't create a connection",
'Connection is closed',
'connection is not open',
]);
print('causedByLostConnection $isR');

return isR;
}
}
18 changes: 16 additions & 2 deletions lib/src/pdo/postgres/postgres_pdo.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,24 @@ class PostgresPDO extends PDOInterface {
dsnParser.database,
username: user,
password: password,
encoding: _getEncoding(dsnParser.charset ?? 'utf8'),
encoding: _getEncoding(dsnParser.charset ?? 'utf8'),
);
await connection.open();
await connection.query('''SET client_encoding = '${dsnParser.charset}';''');

if (dsnParser.charset != null) {
await connection.execute("SET client_encoding = '${dsnParser.charset}';");
}
if (dsnParser.schema != null) {
await connection.execute("SET search_path TO '${dsnParser.schema}';");
}
if (dsnParser.timezone != null) {
await connection.execute("SET time zone '${dsnParser.timezone};'");
}
if (dsnParser.applicationName != null) {
await connection
.execute("SET application_name TO '${dsnParser.applicationName};'");
}

return this;
}

Expand Down
57 changes: 50 additions & 7 deletions lib/src/pdo/postgres_v3/postgres_v3_pdo.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,60 @@ class PostgresV3PDO extends PDOInterface {
connection = Pool.withEndpoints(
[endpoint],
settings: PoolSettings(
applicationName: dsnParser.applicationName,
timeZone: dsnParser.timezone,
onOpen: (conn) async {
if (dsnParser.charset != null) {
await conn
.execute("SET client_encoding = '${dsnParser.charset}';");
}
if (dsnParser.schema != null) {
await conn.execute("SET search_path TO '${dsnParser.schema}';");
}
if (dsnParser.timezone != null) {
await conn.execute("SET time zone '${dsnParser.timezone};'");
}
if (dsnParser.applicationName != null) {
await conn.execute(
"SET application_name TO '${dsnParser.applicationName};'");
}
},
maxConnectionCount: dsnParser.poolSize,
encoding: _getEncoding(dsnParser.charset ?? 'utf8'),
sslMode: sslMode,
),
);

await (connection as Pool)
.execute('''SET client_encoding = '${dsnParser.charset}';''');
} else {
//final pool = await (connection as Pool);
} else {
connection = await Connection.open(endpoint,
settings: ConnectionSettings(
applicationName: dsnParser.applicationName,
timeZone: dsnParser.timezone,
onOpen: (conn) async {
if (dsnParser.charset != null) {
await conn
.execute("SET client_encoding = '${dsnParser.charset}';");
}
if (dsnParser.schema != null) {
await conn.execute("SET search_path TO '${dsnParser.schema}';");
}
if (dsnParser.timezone != null) {
await conn.execute("SET time zone '${dsnParser.timezone};'");
}
if (dsnParser.applicationName != null) {
await conn.execute(
"SET application_name TO '${dsnParser.applicationName};'");
}
},
encoding: _getEncoding(dsnParser.charset ?? 'utf8'),
sslMode: sslMode,
));

await (connection as Connection)
.execute('''SET client_encoding = '${dsnParser.charset}';''');
// final conn = await (connection as Connection);
// if (dsnParser.charset != null) {
// conn.execute('''SET client_encoding = '${dsnParser.charset}';''');
// }
}

return this;
Expand Down Expand Up @@ -151,13 +188,19 @@ class PostgresV3PDO extends PDOInterface {
maps.add(map);
}
}

final pdoResult = PDOResults(maps, rs.affectedRows);
return pdoResult;
}

@override
Future close() async {
await connection.close();
// print('postgres_v3_pdo@close isOpen ${(connection).isOpen} ');
if (connection is Connection) {
await (connection as Connection).close();
} else if (connection is Pool) {
await (connection as Pool).close();
}
//print('postgres_v3_pdo@close isOpen ${(connection).isOpen} ');
}
}
14 changes: 13 additions & 1 deletion lib/src/utils/dsn_parser.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ class DSNParser {
bool get allowReconnect => dsnParts['allowreconnect'].toString() == 'true';
String? get applicationName => dsnParts['application_name'];
String? get sslmode => dsnParts['sslmode'];
Map<String, dynamic>? get options => dsnParts['options'];
String? get timezone => dsnParts['timezone'];
String? get schema => dsnParts['schema'];

Map<String, dynamic>? get options => dsnParts['options'];
Map<String, dynamic> get params => dsnParts['params'];

DSNParser(this.dsn, [this.dsnType = DsnType.pdoPostgreSql]) {
Expand Down Expand Up @@ -130,6 +132,16 @@ class DSNParser {
dsnParts['options'] =
parts.lastWhere((p) => p.contains('options=')).split('=').last;
}

if (parts.join().contains('timezone=')) {
dsnParts['timezone'] =
parts.lastWhere((p) => p.contains('timezone=')).split('=').last;
}

if (parts.join().contains('schema=')) {
dsnParts['schema'] =
parts.lastWhere((p) => p.contains('schema=')).split('=').last;
}
} else if (dsnType == DsnType.heroku) {
var patternString = '^' +
'(?:' +
Expand Down
Loading

0 comments on commit df94933

Please sign in to comment.