diff --git a/CHANGELOG.md b/CHANGELOG.md index 279a54c..293a634 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,4 +80,10 @@ final manager = Manager(); ## 3.1.2 -- Fix bugs in lost connection detection to automatically reconnect. Updated postgres to 3.1.2 to be able to use the onOpen callback to configure connection settings like setting search path \ No newline at end of file +- fix bugs in lost connection detection to automatically reconnect. Updated postgres to 3.1.2 to be able to use the onOpen callback to configure connection settings like setting search path + +## 3.2.0 + +- fix bugs in onOpen callback to configure connection settings +- improvements to README +- implemented connection pool for 'postgres' (v2) driver_implementation \ No newline at end of file diff --git a/README.md b/README.md index d6979d5..5c57c6f 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ for now it only works with PostgreSQL and MySQL 'password': 'pass', 'charset': 'utf8', 'prefix': '', - 'schema': ['public'], + 'schema': 'public', }); manager.setAsGlobal(); final db = await manager.connection(); @@ -76,6 +76,105 @@ for now it only works with PostgreSQL and MySQL await db.table('temp_location') .where('id', '=', 1).delete(); +``` +## using connection pool (works for mysql and postgresql) + +```dart + var manager = new Manager(); + manager.addConnection({ + 'driver': 'pgsql', + 'driver_implementation': 'postgres_v3', + 'host': 'localhost', + 'port': '5432', + 'database': 'database_name', + 'username': 'user_name', + 'password': 'pass', + 'charset': 'utf8', + 'prefix': '', + 'schema': 'public,other', + 'pool': true, + 'poolsize': 8, + }); + + final db = await manager.connection(); + + final query = db.table('temp_location'); + + final res = await query + .select(['temp_location.id', 'city', 'street']) + .where('temp_location.id', '=', 1) + .join('people', 'people.id', '=', 'temp_location.id_people', 'inner') + .limit(1) + .offset(0) + .get(); + + +``` + + +## connect and disconnect in loop + +```dart + var manager = new Manager(); + manager.addConnection({ + 'driver': 'pgsql', + 'host': 'localhost', + 'port': '5432', + 'database': 'database_name', + 'username': 'user_name', + 'password': 'pass', + 'charset': 'utf8', + 'prefix': '', + 'schema': 'public,other', + }); + + while (true){ + //connect + final db = await manager.connection(); + final res = await query + .select(['temp_location.id', 'city', 'street']) + .where('temp_location.id', '=', 1) + .join('people', 'people.id', '=', 'temp_location.id_people', 'inner') + .limit(1) + .offset(0) + .get(); + //disconnect + await manager.getDatabaseManager().purge(); + } + +``` + +## using different drivers implementation for postgresql + +```dart + var manager = new Manager(); + manager.addConnection({ + 'driver': 'pgsql', + 'driver_implementation': 'postgres_v3', // postgres | dargres | postgres_v3 + 'host': 'localhost', + 'port': '5432', + 'database': 'database_name', + 'username': 'user_name', + 'password': 'pass', + 'charset': 'utf8', + 'prefix': '', + 'schema': 'public,other', + }); + + + //connect + final db = await manager.connection(); + final res = await query + .select(['temp_location.id', 'city', 'street']) + .where('temp_location.id', '=', 1) + .join('people', 'people.id', '=', 'temp_location.id_people', 'inner') + .limit(1) + .offset(0) + .get(); + //disconnect + await manager.getDatabaseManager().purge(); + + ``` ## mysql example diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 7c898d9..fe13a57 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -583,9 +583,21 @@ class Connection with DetectsLostConnections implements ConnectionInterface { .runQueryCallback(query, bindings, callback, timeoutInSeconds); //print('Connection@run $result'); } catch (e) { - //print('Connection@run error $e'); - result = await this - .tryAgainIfCausedByLostConnection(e, query, bindings, callback, timeoutInSeconds); + // print('Connection@run error $e $e'); + // result = await this + // .tryAgainIfCausedByLostConnection(e, query, bindings, callback, timeoutInSeconds); + + if (this.causedByLostConnection(e) && + tryReconnectCount < tryReconnectLimit) { + await Future.delayed(Duration(milliseconds: 1000)); + // print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...'); + tryReconnectLimit++; + await this.reconnect(); + tryReconnectLimit = 0; + return await this + .runQueryCallback(query, bindings, callback, timeoutInSeconds); + } + rethrow; } // Once we have run the query we will calculate the time that it took to run and @@ -656,14 +668,14 @@ class Connection with DetectsLostConnections implements ConnectionInterface { if (this.causedByLostConnection(e) && tryReconnectCount < tryReconnectLimit) { await Future.delayed(Duration(milliseconds: delay)); - // print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...'); + // print('Eloquent@tryAgainIfCausedByLostConnection try reconnect...'); tryReconnectLimit++; await this.reconnect(); tryReconnectLimit = 0; return await this .runQueryCallback(query, bindings, callback, timeoutInSeconds); } - // print('tryAgainIfCausedByLostConnection'); + // print('tryAgainIfCausedByLostConnection'); throw e; } diff --git a/lib/src/detects_lost_connections.dart b/lib/src/detects_lost_connections.dart index 3a445bf..0775059 100644 --- a/lib/src/detects_lost_connections.dart +++ b/lib/src/detects_lost_connections.dart @@ -7,7 +7,7 @@ mixin DetectsLostConnections { /// @param \Exception $e /// @return bool /// - bool causedByLostConnection(Exception e) { + bool causedByLostConnection(dynamic e) { final message = '$e'; //TODO revise isso para outros cenários final isR = Utils.string_contains(message, [ diff --git a/lib/src/pdo/postgres/dependencies/executor/executor.dart b/lib/src/pdo/postgres/dependencies/executor/executor.dart new file mode 100644 index 0000000..6981fd1 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/executor/executor.dart @@ -0,0 +1,292 @@ + +library executor; + +import 'dart:async'; +import 'dart:collection'; + +import 'package:stack_trace/stack_trace.dart'; + + +/// An async task that completes with a Future or a value. +typedef AsyncTask = FutureOr? Function(); + +/// An async task that completes after the Stream is closed. +typedef StreamTask = Stream? Function(); + +/// No more than [maximum] tasks can be started over any given [period]. +class Rate { + /// The maximum number of tasks to start in any given [period]. + final int maximum; + + /// The period of the [Rate], in which [maximum] tasks can be started. + final Duration period; + + /// Creates a rate limit. + const Rate(this.maximum, this.period); + + /// Creates a rate limit per second. + factory Rate.perSecond(int maximum) => Rate(maximum, Duration(seconds: 1)); + + /// Creates a rate limit per minute. + factory Rate.perMinute(int maximum) => Rate(maximum, Duration(minutes: 1)); + + /// Creates a rate limit per hour. + factory Rate.perHour(int maximum) => Rate(maximum, Duration(hours: 1)); + + @override + bool operator ==(Object other) => + identical(this, other) || + other is Rate && + runtimeType == other.runtimeType && + maximum == other.maximum && + period == other.period; + + @override + int get hashCode => maximum.hashCode ^ period.hashCode; +} + +/// Executes async tasks with a configurable maximum [concurrency] and [rate]. +abstract class Executor { + /// The maximum number of tasks running concurrently. + int concurrency = 1; + + /// The maximum rate of how frequently tasks can be started. + Rate? rate; + + /// Async task executor. + factory Executor({ + int concurrency = 1, + Rate? rate, + }) => + _Executor(concurrency, rate); + + /// The number of tasks that are currently running. + int get runningCount; + + /// The number of tasks that are currently waiting to be started. + int get waitingCount; + + /// The total number of tasks scheduled ([runningCount] + [waitingCount]). + int get scheduledCount; + + /// Schedules an async task and returns with a future that completes when the + /// task is finished. Task may not get executed immediately. + Future scheduleTask(AsyncTask task); + + /// Schedules an async task and returns its stream. The task is considered + /// running until the stream is closed. + Stream scheduleStream(StreamTask task); + + /// Returns a [Future] that completes when all currently running tasks + /// complete. + /// + /// If [withWaiting] is set, it will include the waiting tasks too. + Future> join({bool withWaiting = false}); + + /// Notifies the listeners about a state change in [Executor], for example: + /// - one or more tasks have started + /// - one or more tasks have completed + /// + /// Clients can use this to monitor [scheduledCount] and queue more tasks to + /// ensure [Executor] is running on full capacity. + Stream get onChange; + + /// Closes the executor and reject tasks. + Future close(); +} + +class _Executor implements Executor { + int _concurrency; + Rate? _rate; + final ListQueue<_Item> _waiting = ListQueue<_Item>(); + final ListQueue<_Item> _running = ListQueue<_Item>(); + final ListQueue _started = ListQueue(); + final StreamController _onChangeController = StreamController.broadcast(); + bool _closing = false; + Timer? _triggerTimer; + + _Executor(this._concurrency, this._rate) { + assert(_concurrency > 0); + } + + @override + int get runningCount => _running.length; + + @override + int get waitingCount => _waiting.length; + + @override + int get scheduledCount => runningCount + waitingCount; + + bool get isClosing => _closing; + + @override + int get concurrency => _concurrency; + + @override + set concurrency(int value) { + if (_concurrency == value) return; + assert(value > 0); + _concurrency = value; + _trigger(); + } + + @override + Rate? get rate => _rate; + + @override + set rate(Rate? value) { + if (_rate == value) return; + _rate = value; + _trigger(); + } + + @override + Future scheduleTask(AsyncTask task) async { + if (isClosing) throw Exception('Executor doesn\'t accept tasks.'); + final item = _Item(); + _waiting.add(item); + _trigger(); + await item.trigger.future; + if (isClosing) { + item.result.completeError( + TimeoutException('Executor is closing'), Trace.current(1)); + } else { + try { + final r = await task(); + item.result.complete(r); + } catch (e, st) { + final chain = Chain([Trace.from(st), Trace.current(1)]); + item.result.completeError(e, chain); + } + } + _running.remove(item); + _trigger(); + item.done.complete(); + return await item.result.future + // Nullable R is used to allow using catchError with null output, so + // we must convert R? into R for the caller + .then((v) => v as R); + } + + @override + Stream scheduleStream(StreamTask task) { + final streamController = StreamController(); + StreamSubscription? streamSubscription; + final resourceCompleter = Completer(); + complete() { + if (streamSubscription != null) { + streamSubscription?.cancel(); + streamSubscription = null; + } + if (!resourceCompleter.isCompleted) { + resourceCompleter.complete(); + } + if (!streamController.isClosed) { + streamController.close(); + } + } + + completeWithError(e, st) { + if (!streamController.isClosed) { + streamController.addError(e as Object, st as StackTrace); + } + complete(); + } + + streamController + ..onCancel = complete + ..onPause = (() => streamSubscription?.pause()) + ..onResume = () => streamSubscription?.resume(); + scheduleTask(() { + if (resourceCompleter.isCompleted) return null; + try { + final stream = task(); + if (stream == null) { + complete(); + return null; + } + streamSubscription = stream.listen(streamController.add, + onError: streamController.addError, + onDone: complete, + cancelOnError: true); + } catch (e, st) { + completeWithError(e, st); + } + return resourceCompleter.future; + }).catchError(completeWithError); + return streamController.stream; + } + + @override + Future> join({bool withWaiting = false}) { + final futures = >[]; + for (final item in _running) { + futures.add(item.result.future.catchError((_) async => null)); + } + if (withWaiting) { + for (final item in _waiting) { + futures.add(item.result.future.catchError((_) async => null)); + } + } + if (futures.isEmpty) return Future.value([]); + return Future.wait(futures); + } + + @override + Stream get onChange => _onChangeController.stream; + + @override + Future close() async { + _closing = true; + _trigger(); + await join(withWaiting: true); + _triggerTimer?.cancel(); + await _onChangeController.close(); + } + + void _trigger() { + _triggerTimer?.cancel(); + _triggerTimer = null; + + while (_running.length < _concurrency && _waiting.isNotEmpty) { + final rate = _rate; + if (rate != null) { + final now = DateTime.now(); + final limitStart = now.subtract(rate.period); + while (_started.isNotEmpty && _started.first.isBefore(limitStart)) { + _started.removeFirst(); + } + if (_started.isNotEmpty) { + final gap = rate.period ~/ rate.maximum; + final last = now.difference(_started.last); + if (gap > last) { + final diff = gap - last; + _triggerTimer ??= Timer(diff, _trigger); + return; + } + } + _started.add(now); + } + + final item = _waiting.removeFirst(); + _running.add(item); + item.done.future.whenComplete(() { + _trigger(); + if (!_closing && + _onChangeController.hasListener && + !_onChangeController.isClosed) { + _onChangeController.add(null); + } + }); + item.trigger.complete(); + } + } +} + +class _Item { + final trigger = Completer(); + // Nullable R is used here so that we can return null during catchError + final result = Completer(); + final done = Completer(); +} \ No newline at end of file diff --git a/lib/src/pdo/postgres/dependencies/postgres_pool/postgres_pool.dart b/lib/src/pdo/postgres/dependencies/postgres_pool/postgres_pool.dart new file mode 100644 index 0000000..c758863 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/postgres_pool/postgres_pool.dart @@ -0,0 +1,903 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:math'; + +import '../executor/executor.dart'; +import 'package:postgres_fork/postgres.dart'; +import '../retry/retry.dart'; + +/// A session is a continuous use of a single connection (inside or outside of a +/// transaction). +/// +/// This callback function will be called once a connection becomes available. +typedef PgSessionFn = Future Function(PostgreSQLExecutionContext c); + +/// The PostgreSQL server endpoint and its configuration to use when opening a +/// new connection. +class PgEndpoint { + final String host; + final int port; + final String database; + final String? username; + final String? password; + final bool requireSsl; + final bool isUnixSocket; + + /// If provided, it will set that name to the Postgres connection for debugging + /// purposes. + /// If a different name is desired for different connections opened by the pool, + /// the name can contain '{{connectionId}}' which would get replaced at run time. + /// + /// Active connections and their names can be obtained in Postgres with + /// `SELECT * FROM pg_stat_activity` + final String? applicationName; + + PgEndpoint({ + required this.host, + this.port = 5432, + required this.database, + this.username, + this.password, + this.requireSsl = false, + this.isUnixSocket = false, + this.applicationName, + }); + + /// Parses the most common connection URL formats: + /// - postgresql://user:password@host:port/dbname + /// - postgresql://host:port/dbname?username=user&password=pwd + /// - postgresql://host:port/dbname?application_name=myapp + /// + /// Set ?sslmode=require to force secure SSL connection. + factory PgEndpoint.parse(url) { + final uri = url is Uri ? url : Uri.parse(url as String); + final userInfoParts = uri.userInfo.split(':'); + final username = userInfoParts.length == 2 ? userInfoParts[0] : null; + final password = userInfoParts.length == 2 ? userInfoParts[1] : null; + final isUnixSocketParam = uri.queryParameters['is-unix-socket']; + final applicationNameParam = uri.queryParameters['application_name']; + + return PgEndpoint( + host: uri.host, + port: uri.port, + database: uri.path.substring(1), + username: username ?? uri.queryParameters['username'], + password: password ?? uri.queryParameters['password'], + requireSsl: uri.queryParameters['sslmode'] == 'require', + isUnixSocket: isUnixSocketParam == '1', + applicationName: applicationNameParam, + ); + } + + /// Creates a new [PgEndpoint] by replacing the current values with non-null + /// parameters. + /// + /// Parameters with `null` values are ignored (keeping current value). + PgEndpoint replace({ + String? host, + int? port, + String? database, + String? username, + String? password, + bool? requireSsl, + bool? isUnixSocket, + String? Function()? applicationName, + }) { + return PgEndpoint( + host: host ?? this.host, + port: port ?? this.port, + database: database ?? this.database, + username: username ?? this.username, + password: password ?? this.password, + requireSsl: requireSsl ?? this.requireSsl, + isUnixSocket: isUnixSocket ?? this.isUnixSocket, + applicationName: + applicationName == null ? this.applicationName : applicationName(), + ); + } + + @override + String toString() => Uri( + scheme: 'postgres', + host: host, + port: port, + path: database, + queryParameters: { + 'username': username, + 'password': password, + 'sslmode': requireSsl ? 'require' : 'allow', + if (isUnixSocket) 'is-unix-socket': '1', + if (applicationName != null) 'application_name': applicationName, + }, + ).toString(); + + @override + bool operator ==(Object other) => + identical(this, other) || + other is PgEndpoint && + runtimeType == other.runtimeType && + host == other.host && + port == other.port && + database == other.database && + username == other.username && + password == other.password && + requireSsl == other.requireSsl && + isUnixSocket == other.isUnixSocket && + applicationName == other.applicationName; + + @override + int get hashCode => + host.hashCode ^ + port.hashCode ^ + database.hashCode ^ + username.hashCode ^ + password.hashCode ^ + requireSsl.hashCode ^ + isUnixSocket.hashCode ^ + applicationName.hashCode; +} + +/// The list of [PgPool] actions. +abstract class PgPoolAction { + static final connecting = 'connecting'; + static final connectingCompleted = 'connecting-completed'; + static final connectingFailed = 'connecting-failed'; + static final closing = 'closing'; + static final closingCompleted = 'closing-completed'; + static final closingFailed = 'closing-failed'; + static final query = 'query'; + static final queryCompleted = 'query-completed'; + static final queryFailed = 'query-failed'; +} + +/// Describes a pool event (with error - if there were any). +class PgPoolEvent { + /// The id of the connection + final int connectionId; + + /// The identifier of the session (e.g. activity type) + final String? sessionId; + + /// The unique identifier of a request (used for correlating log entries). + final String? traceId; + + /// One of [PgPoolAction] values. + final String action; + + /// The SQL query (if there was any). + final String? query; + + /// The SQL query's substitution values (if there was any). + final dynamic substitutionValues; + + /// The elapsed time since the operation started (if applicable). + final Duration? elapsed; + + /// The error object (if there was any). + final dynamic error; + + /// The stack trace when the error happened (if there was any). + final StackTrace? stackTrace; + + PgPoolEvent({ + required this.connectionId, + this.sessionId, + this.traceId, + required this.action, + this.query, + this.substitutionValues, + this.elapsed, + this.error, + this.stackTrace, + }); + + PgPoolEvent.fromPgPoolEvent(PgPoolEvent other) + : connectionId = other.connectionId, + sessionId = other.sessionId, + traceId = other.traceId, + action = other.action, + query = other.query, + substitutionValues = other.substitutionValues, + elapsed = other.elapsed, + error = other.error, + stackTrace = other.stackTrace; +} + +/// A snapshot of the connection's internal state. +class PgConnectionStatus { + /// The numerical id of the connection. + final int connectionId; + + /// If the connection is open and not in use. + final bool isIdle; + + /// The time when the connection was opened. + final DateTime opened; + + PgConnectionStatus({ + required this.connectionId, + required this.isIdle, + required this.opened, + }); +} + +/// A snapshot of the [PgPool]'s internal state. +class PgPoolStatus { + /// The status of the connections. + final List connections; + + /// The number of sessions using an active connection. + final int activeSessionCount; + + /// The number of sessions waiting for a connection to become available. + final int pendingSessionCount; + + PgPoolStatus({ + required this.connections, + required this.activeSessionCount, + required this.pendingSessionCount, + }); + + PgPoolStatus.fromPgPoolStatus(PgPoolStatus other) + : connections = other.connections, + activeSessionCount = other.activeSessionCount, + pendingSessionCount = other.pendingSessionCount; +} + +/// The settings of the [PgPool]. +class PgPoolSettings { + /// The maximum number of concurrent sessions. + int maxConnectionCount = 1; + + /// The timeout after the connection attempt is assumed to be failing. + /// Fractional seconds will be omitted. + /// Value is applied only on new connections. + Duration connectTimeout = Duration(seconds: 15); + + /// The timeout after a query is assumed to be failing. + /// Fractional seconds will be omitted. + /// Value is applied only on new connections. + Duration queryTimeout = Duration(minutes: 5); + + /// If a connection is idle for longer than this threshold, it will be tested + /// with a simple SQL query before allocating it to a session. + Duration idleTestThreshold = Duration(minutes: 1); + + /// The maximum duration a connection is kept open. + /// New sessions won't be scheduled after this limit is reached. + Duration maxConnectionAge = Duration(hours: 12); + + /// The maximum duration a connection is used by sessions. + /// New sessions won't be scheduled after this limit is reached. + Duration maxSessionUse = Duration(hours: 8); + + /// The maximum number of error events to be collected on a connection. + /// New sessions won't be scheduled after this limit is reached. + int maxErrorCount = 128; + + /// The maximum number of queries to be run on a connection. + /// New sessions won't be scheduled after this limit is reached. + int maxQueryCount = 1024 * 1024; + + /// Timezone for the connetion + String timeZone = 'UTC'; + + Encoding encoding = utf8; + + /// This callback function will be called after opening the connection. + Future Function(PostgreSQLExecutionContext connection)? onOpen; + + /// The default retry options for `run` / `runTx` operations. + RetryOptions retryOptions = RetryOptions( + maxAttempts: 1, + delayFactor: Duration(milliseconds: 5), + maxDelay: Duration(seconds: 1), + randomizationFactor: 0.1, + ); + + void applyFrom(PgPoolSettings other) { + maxConnectionCount = other.maxConnectionCount; + connectTimeout = other.connectTimeout; + queryTimeout = other.queryTimeout; + idleTestThreshold = other.idleTestThreshold; + maxConnectionAge = other.maxConnectionAge; + maxSessionUse = other.maxSessionUse; + maxErrorCount = other.maxErrorCount; + maxQueryCount = other.maxQueryCount; + retryOptions = other.retryOptions; + timeZone = other.timeZone; + } +} + +/// Single-server connection pool for PostgresSQL database access. +class PgPool implements PostgreSQLExecutionContext { + final PgEndpoint _url; + + final Executor _executor; + final _random = Random(); + final _connections = <_ConnectionCtx>[]; + final _events = StreamController.broadcast(); + int _nextConnectionId = 1; + + /// Makes sure only one connection is opening at a time. + Completer? _openCompleter; + PgPoolSettings settings; + + PgPool(PgEndpoint url, {PgPoolSettings? settings}) + : settings = settings ?? PgPoolSettings(), + _executor = Executor(), + _url = url; + + /// Get the current debug information of the pool's internal state. + PgPoolStatus status() => PgPoolStatus( + connections: _connections.map((c) => c.status()).toList(), + activeSessionCount: _executor.runningCount, + pendingSessionCount: _executor.waitingCount, + ); + + /// The events that happen while the pool is working. + Stream get events => _events.stream; + + /// Runs [fn] outside of a transaction. + Future run( + PgSessionFn fn, { + RetryOptions? retryOptions, + FutureOr Function()? orElse, + FutureOr Function(Exception)? retryIf, + String? sessionId, + String? traceId, + }) async { + retryOptions ??= settings.retryOptions; + try { + return await retryOptions.retry( + () async { + return await _withConnection( + (c) => fn(_PgExecutionContextWrapper( + c.connectionId, + c.connection, + sessionId, + traceId, + _events, + )), + ); + }, + retryIf: (e) async => + e is! PostgreSQLException && + e is! IOException && + (retryIf == null || await retryIf(e)), + ); + } catch (e) { + if (orElse != null) { + return await orElse(); + } + rethrow; + } + } + + /// Runs [fn] in a transaction. + Future runTx( + PgSessionFn fn, { + RetryOptions? retryOptions, + FutureOr Function()? orElse, + FutureOr Function(Exception)? retryIf, + String? sessionId, + String? traceId, + }) async { + retryOptions ??= settings.retryOptions; + try { + return await retryOptions.retry( + () async { + return await _withConnection((c) async { + return await c.connection.transaction( + (conn) => fn(_PgExecutionContextWrapper( + c.connectionId, + conn, + sessionId, + traceId, + _events, + )), + ) as R; + }); + }, + retryIf: (e) async => + e is! PostgreSQLException && + e is! IOException && + (retryIf == null || await retryIf(e)), + ); + } catch (e) { + if (orElse != null) { + return await orElse(); + } + rethrow; + } + } + + Future close() async { + await _executor.close(); + while (_connections.isNotEmpty) { + await Future.wait(List.of(_connections).map(_close)); + } + await _events.close(); + } + + Future _withConnection(Future Function(_ConnectionCtx c) body) { + _executor.concurrency = settings.maxConnectionCount; + return _executor.scheduleTask(() async { + return await _useOrCreate(body); + }); + } + + _ConnectionCtx? _lockIdle() { + final list = _connections.where((c) => c.isIdle).toList(); + if (list.isEmpty) return null; + final entry = + list.length == 1 ? list.single : list[_random.nextInt(list.length)]; + entry.isIdle = false; + return entry; + } + + Future<_ConnectionCtx?> _tryAcquireAvailable() async { + for (var ctx = _lockIdle(); ctx != null; ctx = _lockIdle()) { + if (await _testConnection(ctx)) { + return ctx; + } else { + await _close(ctx); + } + } + return null; + } + + Future _useOrCreate(Future Function(_ConnectionCtx c) body) async { + final ctx = await _tryAcquireAvailable() ?? await _open(); + final sw = Stopwatch()..start(); + try { + final r = await body(ctx); + ctx.lastReturned = DateTime.now(); + ctx.elapsed += sw.elapsed; + ctx.queryCount++; + ctx.isIdle = true; + return r; + } on PostgreSQLException catch (_) { + await _close(ctx); + rethrow; + } on IOException catch (_) { + await _close(ctx); + rethrow; + } on TimeoutException catch (_) { + await _close(ctx); + rethrow; + } catch (e) { + ctx.lastReturned = DateTime.now(); + ctx.elapsed += sw.elapsed; + ctx.errorCount++; + ctx.queryCount++; + ctx.isIdle = true; + rethrow; + } + } + + Future<_ConnectionCtx> _open() async { + while (_openCompleter != null) { + await _openCompleter!.future; + } + _openCompleter = Completer(); + final connectionId = _nextConnectionId++; + _events.add(PgPoolEvent( + connectionId: connectionId, + action: PgPoolAction.connecting, + )); + try { + for (var i = 3; i > 0; i--) { + final sw = Stopwatch()..start(); + try { + final c = PostgreSQLConnection( + _url.host, + _url.port, + _url.database, + username: _url.username, + password: _url.password, + useSSL: _url.requireSsl, + isUnixSocket: _url.isUnixSocket, + timeoutInSeconds: settings.connectTimeout.inSeconds, + queryTimeoutInSeconds: settings.queryTimeout.inSeconds, + timeZone: settings.timeZone, + encoding: settings.encoding, + ); + await c.open(); + if (settings.onOpen != null) { + await settings.onOpen!(c); + } + final ctx = _ConnectionCtx(connectionId, c); + _connections.add(ctx); + + // Set the application connection name + final applicationName = _url.applicationName; + if (applicationName != null) { + await _setApplicationName( + c, + applicationName: applicationName, + connectionId: connectionId, + ); + } + + _events.add(PgPoolEvent( + connectionId: connectionId, + action: PgPoolAction.connectingCompleted, + elapsed: sw.elapsed, + )); + + return ctx; + } catch (e, st) { + if (i == 1) { + _events.add(PgPoolEvent( + connectionId: connectionId, + action: PgPoolAction.connectingFailed, + elapsed: sw.elapsed, + error: e, + stackTrace: st, + )); + rethrow; + } + } + } + throw StateError('Should not reach this code.'); + } finally { + final c = _openCompleter!; + _openCompleter = null; + c.complete(); + } + } + + Future _testConnection(_ConnectionCtx ctx) async { + final now = DateTime.now(); + final totalAge = now.difference(ctx.opened).abs(); + final shouldClose = (totalAge >= settings.maxConnectionAge) || + (ctx.elapsed >= settings.maxSessionUse) || + (ctx.errorCount >= settings.maxErrorCount) || + (ctx.queryCount >= settings.maxQueryCount); + if (shouldClose) { + return false; + } + final idleAge = now.difference(ctx.lastReturned).abs(); + if (idleAge < settings.idleTestThreshold) { + return true; + } + try { + await ctx.connection.query('SELECT 1;', timeoutInSeconds: 2); + return true; + } catch (_) {} + return false; + } + + Future _close(_ConnectionCtx ctx) async { + ctx.isIdle = false; + if (ctx.closingCompleter != null) { + await ctx.closingCompleter!.future; + return; + } + ctx.closingCompleter = Completer(); + + final sw = Stopwatch()..start(); + try { + if (!ctx.connection.isClosed) { + _events.add(PgPoolEvent( + connectionId: ctx.connectionId, + action: PgPoolAction.closing, + )); + await ctx.connection.close(); + _events.add(PgPoolEvent( + connectionId: ctx.connectionId, + action: PgPoolAction.closingCompleted, + elapsed: sw.elapsed, + )); + } + } catch (e, st) { + _events.add(PgPoolEvent( + connectionId: ctx.connectionId, + action: PgPoolAction.closingFailed, + elapsed: sw.elapsed, + error: e, + stackTrace: st, + )); + } finally { + _connections.remove(ctx); + } + } + + @override + int get queueSize => + _connections.fold(0, (a, b) => a + b.connection.queueSize); + + @override + Future query( + String fmtString, { + dynamic substitutionValues, + bool? allowReuse = true, + int? timeoutInSeconds, + bool? useSimpleQueryProtocol, + String? sessionId, + String? traceId, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return run( + (c) => c.query( + fmtString, + substitutionValues: substitutionValues, + allowReuse: allowReuse, + timeoutInSeconds: timeoutInSeconds, + useSimpleQueryProtocol: useSimpleQueryProtocol, + placeholderIdentifier: placeholderIdentifier, + ), + sessionId: sessionId, + traceId: traceId, + ); + } + + @override + Future execute( + String fmtString, { + dynamic substitutionValues, + int? timeoutInSeconds, + String? sessionId, + String? traceId, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return run( + (c) => c.execute( + fmtString, + substitutionValues: substitutionValues, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: placeholderIdentifier, + ), + sessionId: sessionId, + traceId: traceId, + ); + } + + @override + void cancelTransaction({String? reason}) { + // no-op + } + + @override + Future>>> mappedResultsQuery( + String fmtString, { + dynamic substitutionValues, + bool? allowReuse = true, + int? timeoutInSeconds, + String? sessionId, + String? traceId, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return run( + (c) => c.mappedResultsQuery( + fmtString, + substitutionValues: substitutionValues, + allowReuse: allowReuse, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: placeholderIdentifier, + ), + sessionId: sessionId, + traceId: traceId, + ); + } + + @override + Future>> queryAsMap( + String fmtString, { + dynamic substitutionValues, + bool? allowReuse = true, + int? timeoutInSeconds, + String? sessionId, + String? traceId, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return run( + (c) => c.queryAsMap( + fmtString, + substitutionValues: substitutionValues, + allowReuse: allowReuse, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: placeholderIdentifier, + ), + sessionId: sessionId, + traceId: traceId, + ); + } + + /// Sets the application_name to the provided postgres connection + /// Current implementation is done by executing a postgres command. + /// A future improvement could be to send the application_name + /// through the postgres messaging protocol + Future _setApplicationName( + PostgreSQLConnection pgConn, { + required String applicationName, + required int connectionId, + }) async { + // The connectionId can be injected into the name at runtime + final effectiveName = + applicationName.replaceAll('{{connectionId}}', connectionId.toString()); + + await pgConn.execute( + 'SET application_name = @app_name', + substitutionValues: { + 'app_name': effectiveName, + }, + ); + } +} + +class _ConnectionCtx { + final int connectionId; + final PostgreSQLConnection connection; + final DateTime opened; + late DateTime lastReturned; + int queryCount = 0; + int errorCount = 0; + Duration elapsed = Duration.zero; + bool isIdle = false; + Completer? closingCompleter; + + _ConnectionCtx(this.connectionId, this.connection) : opened = DateTime.now(); + + PgConnectionStatus status() { + return PgConnectionStatus( + connectionId: connectionId, + isIdle: isIdle, + opened: opened, + ); + } +} + +class _PgExecutionContextWrapper implements PostgreSQLExecutionContext { + final int connectionId; + final PostgreSQLExecutionContext _delegate; + final String? sessionId; + final String? traceId; + final Sink _eventSink; + + _PgExecutionContextWrapper( + this.connectionId, + this._delegate, + this.sessionId, + this.traceId, + this._eventSink, + ); + + Future _run( + Future Function() body, + String query, + dynamic substitutionValues, + ) async { + final sw = Stopwatch()..start(); + try { + _eventSink.add(PgPoolEvent( + connectionId: connectionId, + sessionId: sessionId, + traceId: traceId, + action: PgPoolAction.query, + query: query, + substitutionValues: substitutionValues, + )); + final r = await body(); + _eventSink.add(PgPoolEvent( + connectionId: connectionId, + sessionId: sessionId, + traceId: traceId, + action: PgPoolAction.queryCompleted, + query: query, + substitutionValues: substitutionValues, + elapsed: sw.elapsed, + )); + return r; + } catch (e, st) { + _eventSink.add(PgPoolEvent( + connectionId: connectionId, + sessionId: sessionId, + traceId: traceId, + action: PgPoolAction.queryFailed, + query: query, + substitutionValues: substitutionValues, + elapsed: sw.elapsed, + error: e, + stackTrace: st, + )); + rethrow; + } finally { + sw.stop(); + } + } + + @override + void cancelTransaction({String? reason}) { + _delegate.cancelTransaction(reason: reason); + } + + @override + Future execute( + String fmtString, { + dynamic substitutionValues, + int? timeoutInSeconds, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return _run( + () => _delegate.execute( + fmtString, + substitutionValues: substitutionValues, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: placeholderIdentifier, + ), + fmtString, + substitutionValues, + ); + } + + @override + Future query( + String fmtString, { + dynamic substitutionValues, + bool? allowReuse = true, + int? timeoutInSeconds, + bool? useSimpleQueryProtocol, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return _run( + () => _delegate.query( + fmtString, + substitutionValues: substitutionValues, + allowReuse: allowReuse, + timeoutInSeconds: timeoutInSeconds, + useSimpleQueryProtocol: useSimpleQueryProtocol, + placeholderIdentifier: placeholderIdentifier, + ), + fmtString, + substitutionValues, + ); + } + + @override + Future>>> mappedResultsQuery( + String fmtString, { + dynamic substitutionValues, + bool? allowReuse = true, + int? timeoutInSeconds, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return _run( + () => _delegate.mappedResultsQuery( + fmtString, + substitutionValues: substitutionValues, + allowReuse: allowReuse, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: placeholderIdentifier, + ), + fmtString, + substitutionValues, + ); + } + + @override + Future>> queryAsMap( + String fmtString, { + dynamic substitutionValues, + bool? allowReuse = true, + int? timeoutInSeconds, + PlaceholderIdentifier placeholderIdentifier = PlaceholderIdentifier.atSign, + }) { + return _run( + () => _delegate.queryAsMap( + fmtString, + substitutionValues: substitutionValues, + allowReuse: allowReuse, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: placeholderIdentifier, + ), + fmtString, + substitutionValues, + ); + } + + @override + int get queueSize => _delegate.queueSize; +} diff --git a/lib/src/pdo/postgres/dependencies/retry/retry.dart b/lib/src/pdo/postgres/dependencies/retry/retry.dart new file mode 100644 index 0000000..25ecbe5 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/retry/retry.dart @@ -0,0 +1,174 @@ +/// Retry asynchronous functions with exponential backoff. +/// +/// For a simple solution see [retry], to modify and persist retry options see +/// [RetryOptions]. Note, in many cases the added configurability is +/// unnecessary and using [retry] is perfectly fine. +library retry; + +import 'dart:async'; +import 'dart:math' as math; + +final _rand = math.Random(); + +/// Object holding options for retrying a function. +/// +/// With the default configuration functions will be retried up-to 7 times +/// (8 attempts in total), sleeping 1st, 2nd, 3rd, ..., 7th attempt: +/// 1. 400 ms +/- 25% +/// 2. 800 ms +/- 25% +/// 3. 1600 ms +/- 25% +/// 4. 3200 ms +/- 25% +/// 5. 6400 ms +/- 25% +/// 6. 12800 ms +/- 25% +/// 7. 25600 ms +/- 25% +/// +/// **Example** +/// ```dart +/// final r = RetryOptions(); +/// final response = await r.retry( +/// // Make a GET request +/// () => http.get('https://google.com').timeout(Duration(seconds: 5)), +/// // Retry on SocketException or TimeoutException +/// retryIf: (e) => e is SocketException || e is TimeoutException, +/// ); +/// print(response.body); +/// ``` +final class RetryOptions { + /// Delay factor to double after every attempt. + /// + /// Defaults to 200 ms, which results in the following delays: + /// + /// 1. 400 ms + /// 2. 800 ms + /// 3. 1600 ms + /// 4. 3200 ms + /// 5. 6400 ms + /// 6. 12800 ms + /// 7. 25600 ms + /// + /// Before application of [randomizationFactor]. + final Duration delayFactor; + + /// Percentage the delay should be randomized, given as fraction between + /// 0 and 1. + /// + /// If [randomizationFactor] is `0.25` (default) this indicates 25 % of the + /// delay should be increased or decreased by 25 %. + final double randomizationFactor; + + /// Maximum delay between retries, defaults to 30 seconds. + final Duration maxDelay; + + /// Maximum number of attempts before giving up, defaults to 8. + final int maxAttempts; + + /// Create a set of [RetryOptions]. + /// + /// Defaults to 8 attempts, sleeping as following after 1st, 2nd, 3rd, ..., + /// 7th attempt: + /// 1. 400 ms +/- 25% + /// 2. 800 ms +/- 25% + /// 3. 1600 ms +/- 25% + /// 4. 3200 ms +/- 25% + /// 5. 6400 ms +/- 25% + /// 6. 12800 ms +/- 25% + /// 7. 25600 ms +/- 25% + const RetryOptions({ + this.delayFactor = const Duration(milliseconds: 200), + this.randomizationFactor = 0.25, + this.maxDelay = const Duration(seconds: 30), + this.maxAttempts = 8, + }); + + /// Delay after [attempt] number of attempts. + /// + /// This is computed as `pow(2, attempt) * delayFactor`, then is multiplied by + /// between `-randomizationFactor` and `randomizationFactor` at random. + Duration delay(int attempt) { + assert(attempt >= 0, 'attempt cannot be negative'); + if (attempt <= 0) { + return Duration.zero; + } + final rf = (randomizationFactor * (_rand.nextDouble() * 2 - 1) + 1); + final exp = math.min(attempt, 31); // prevent overflows. + final delay = (delayFactor * math.pow(2.0, exp) * rf); + return delay < maxDelay ? delay : maxDelay; + } + + /// Call [fn] retrying so long as [retryIf] return `true` for the exception + /// thrown. + /// + /// At every retry the [onRetry] function will be called (if given). The + /// function [fn] will be invoked at-most [this.attempts] times. + /// + /// If no [retryIf] function is given this will retry any for any [Exception] + /// thrown. To retry on an [Error], the error must be caught and _rethrown_ + /// as an [Exception]. + Future retry( + FutureOr Function() fn, { + FutureOr Function(Exception)? retryIf, + FutureOr Function(Exception)? onRetry, + }) async { + var attempt = 0; + // ignore: literal_only_boolean_expressions + while (true) { + attempt++; // first invocation is the first attempt + try { + return await fn(); + } on Exception catch (e) { + if (attempt >= maxAttempts || + (retryIf != null && !(await retryIf(e)))) { + rethrow; + } + if (onRetry != null) { + await onRetry(e); + } + } + + // Sleep for a delay + await Future.delayed(delay(attempt)); + } + } +} + +/// Call [fn] retrying so long as [retryIf] return `true` for the exception +/// thrown, up-to [maxAttempts] times. +/// +/// Defaults to 8 attempts, sleeping as following after 1st, 2nd, 3rd, ..., +/// 7th attempt: +/// 1. 400 ms +/- 25% +/// 2. 800 ms +/- 25% +/// 3. 1600 ms +/- 25% +/// 4. 3200 ms +/- 25% +/// 5. 6400 ms +/- 25% +/// 6. 12800 ms +/- 25% +/// 7. 25600 ms +/- 25% +/// +/// ```dart +/// final response = await retry( +/// // Make a GET request +/// () => http.get('https://google.com').timeout(Duration(seconds: 5)), +/// // Retry on SocketException or TimeoutException +/// retryIf: (e) => e is SocketException || e is TimeoutException, +/// ); +/// print(response.body); +/// ``` +/// +/// If no [retryIf] function is given this will retry any for any [Exception] +/// thrown. To retry on an [Error], the error must be caught and _rethrown_ +/// as an [Exception]. +Future retry( + FutureOr Function() fn, { + Duration delayFactor = const Duration(milliseconds: 200), + double randomizationFactor = 0.25, + Duration maxDelay = const Duration(seconds: 30), + int maxAttempts = 8, + FutureOr Function(Exception)? retryIf, + FutureOr Function(Exception)? onRetry, +}) => + RetryOptions( + delayFactor: delayFactor, + randomizationFactor: randomizationFactor, + maxDelay: maxDelay, + maxAttempts: maxAttempts, + ).retry(fn, retryIf: retryIf, onRetry: onRetry); \ No newline at end of file diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/chain.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/chain.dart new file mode 100644 index 0000000..15d361c --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/chain.dart @@ -0,0 +1,262 @@ + + +import 'dart:async'; +import 'dart:math' as math; + +import 'frame.dart'; +import 'lazy_chain.dart'; +import 'stack_zone_specification.dart'; +import 'trace.dart'; +import 'utils.dart'; + +/// A function that handles errors in the zone wrapped by [Chain.capture]. +@Deprecated('Will be removed in stack_trace 2.0.0.') +typedef ChainHandler = void Function(dynamic error, Chain chain); + +/// An opaque key used to track the current [StackZoneSpecification]. +final _specKey = Object(); + +/// A chain of stack traces. +/// +/// A stack chain is a collection of one or more stack traces that collectively +/// represent the path from `main` through nested function calls to a particular +/// code location, usually where an error was thrown. Multiple stack traces are +/// necessary when using asynchronous functions, since the program's stack is +/// reset before each asynchronous callback is run. +/// +/// Stack chains can be automatically tracked using [Chain.capture]. This sets +/// up a new [Zone] in which the current stack chain is tracked and can be +/// accessed using [Chain.current]. Any errors that would be top-leveled in +/// the zone can be handled, along with their associated chains, with the +/// `onError` callback. For example: +/// +/// Chain.capture(() { +/// // ... +/// }, onError: (error, stackChain) { +/// print("Caught error $error\n" +/// "$stackChain"); +/// }); +class Chain implements StackTrace { + /// The stack traces that make up this chain. + /// + /// Like the frames in a stack trace, the traces are ordered from most local + /// to least local. The first one is the trace where the actual exception was + /// raised, the second one is where that callback was scheduled, and so on. + final List traces; + + /// The [StackZoneSpecification] for the current zone. + static StackZoneSpecification? get _currentSpec => + Zone.current[_specKey] as StackZoneSpecification?; + + /// If [when] is `true`, runs [callback] in a [Zone] in which the current + /// stack chain is tracked and automatically associated with (most) errors. + /// + /// If [when] is `false`, this does not track stack chains. Instead, it's + /// identical to [runZoned], except that it wraps any errors in + /// [Chain.forTrace]—which will only wrap the trace unless there's a different + /// [Chain.capture] active. This makes it easy for the caller to only capture + /// stack chains in debug mode or during development. + /// + /// If [onError] is passed, any error in the zone that would otherwise go + /// unhandled is passed to it, along with the [Chain] associated with that + /// error. Note that if [callback] produces multiple unhandled errors, + /// [onError] may be called more than once. If [onError] isn't passed, the + /// parent Zone's `unhandledErrorHandler` will be called with the error and + /// its chain. + /// + /// The zone this creates will be an error zone if either [onError] is + /// not `null` and [when] is false, + /// or if both [when] and [errorZone] are `true`. + /// If [errorZone] is `false`, [onError] must be `null`. + /// + /// If [callback] returns a value, it will be returned by [capture] as well. + /// + /// [zoneValues] is added to the [runZoned] calls. + static T capture(T Function() callback, + {void Function(Object error, Chain)? onError, + bool when = true, + bool errorZone = true, + Map? zoneValues}) { + if (!errorZone && onError != null) { + throw ArgumentError.value( + onError, 'onError', 'must be null if errorZone is false'); + } + + if (!when) { + if (onError == null) return runZoned(callback, zoneValues: zoneValues); + return runZonedGuarded(callback, (error, stackTrace) { + onError(error, Chain.forTrace(stackTrace)); + }, zoneValues: zoneValues) as T; + } + + var spec = StackZoneSpecification(onError, errorZone: errorZone); + return runZoned(() { + try { + return callback(); + } on Object catch (error, stackTrace) { + // Forward synchronous errors through the async error path to match the + // behavior of `runZonedGuarded`. + Zone.current.handleUncaughtError(error, stackTrace); + + // If the expected return type of capture() is not nullable, this will + // throw a cast exception. But the only other alternative is to throw + // some other exception. Casting null to T at least lets existing uses + // where T is a nullable type continue to work. + return null as T; + } + }, zoneSpecification: spec.toSpec(), zoneValues: { + ...?zoneValues, + _specKey: spec, + StackZoneSpecification.disableKey: false + }); + } + + /// If [when] is `true` and this is called within a [Chain.capture] zone, runs + /// [callback] in a [Zone] in which chain capturing is disabled. + /// + /// If [callback] returns a value, it will be returned by [disable] as well. + static T disable(T Function() callback, {bool when = true}) { + var zoneValues = + when ? {_specKey: null, StackZoneSpecification.disableKey: true} : null; + + return runZoned(callback, zoneValues: zoneValues); + } + + /// Returns [futureOrStream] unmodified. + /// + /// Prior to Dart 1.7, this was necessary to ensure that stack traces for + /// exceptions reported with [Completer.completeError] and + /// [StreamController.addError] were tracked correctly. + @Deprecated('Chain.track is not necessary in Dart 1.7+.') + static dynamic track(Object? futureOrStream) => futureOrStream; + + /// Returns the current stack chain. + /// + /// By default, the first frame of the first trace will be the line where + /// [Chain.current] is called. If [level] is passed, the first trace will + /// start that many frames up instead. + /// + /// If this is called outside of a [capture] zone, it just returns a + /// single-trace chain. + factory Chain.current([int level = 0]) { + if (_currentSpec != null) return _currentSpec!.currentChain(level + 1); + + var chain = Chain.forTrace(StackTrace.current); + return LazyChain(() { + // JS includes a frame for the call to StackTrace.current, but the VM + // doesn't, so we skip an extra frame in a JS context. + var first = Trace(chain.traces.first.frames.skip(level + (inJS ? 2 : 1)), + original: chain.traces.first.original.toString()); + return Chain([first, ...chain.traces.skip(1)]); + }); + } + + /// Returns the stack chain associated with [trace]. + /// + /// The first stack trace in the returned chain will always be [trace] + /// (converted to a [Trace] if necessary). If there is no chain associated + /// with [trace] or if this is called outside of a [capture] zone, this just + /// returns a single-trace chain containing [trace]. + /// + /// If [trace] is already a [Chain], it will be returned as-is. + factory Chain.forTrace(StackTrace trace) { + if (trace is Chain) return trace; + if (_currentSpec != null) return _currentSpec!.chainFor(trace); + if (trace is Trace) return Chain([trace]); + return LazyChain(() => Chain.parse(trace.toString())); + } + + /// Parses a string representation of a stack chain. + /// + /// If [chain] is the output of a call to [Chain.toString], it will be parsed + /// as a full stack chain. Otherwise, it will be parsed as in [Trace.parse] + /// and returned as a single-trace chain. + factory Chain.parse(String chain) { + if (chain.isEmpty) return Chain([]); + if (chain.contains(vmChainGap)) { + return Chain(chain + .split(vmChainGap) + .where((line) => line.isNotEmpty) + .map(Trace.parseVM)); + } + if (!chain.contains(chainGap)) return Chain([Trace.parse(chain)]); + + return Chain(chain.split(chainGap).map(Trace.parseFriendly)); + } + + /// Returns a new [Chain] comprised of [traces]. + Chain(Iterable traces) : traces = List.unmodifiable(traces); + + /// Returns a terser version of this chain. + /// + /// This calls [Trace.terse] on every trace in [traces], and discards any + /// trace that contain only internal frames. + /// + /// This won't do anything with a raw JavaScript trace, since there's no way + /// to determine which frames come from which Dart libraries. However, the + /// [`source_map_stack_trace`](https://pub.dev/packages/source_map_stack_trace) + /// package can be used to convert JavaScript traces into Dart-style traces. + Chain get terse => foldFrames((_) => false, terse: true); + + /// Returns a new [Chain] based on this chain where multiple stack frames + /// matching [predicate] are folded together. + /// + /// This means that whenever there are multiple frames in a row that match + /// [predicate], only the last one is kept. In addition, traces that are + /// composed entirely of frames matching [predicate] are omitted. + /// + /// This is useful for limiting the amount of library code that appears in a + /// stack trace by only showing user code and code that's called by user code. + /// + /// If [terse] is true, this will also fold together frames from the core + /// library or from this package, and simplify core library frames as in + /// [Trace.terse]. + Chain foldFrames(bool Function(Frame) predicate, {bool terse = false}) { + var foldedTraces = + traces.map((trace) => trace.foldFrames(predicate, terse: terse)); + var nonEmptyTraces = foldedTraces.where((trace) { + // Ignore traces that contain only folded frames. + if (trace.frames.length > 1) return true; + if (trace.frames.isEmpty) return false; + + // In terse mode, the trace may have removed an outer folded frame, + // leaving a single non-folded frame. We can detect a folded frame because + // it has no line information. + if (!terse) return false; + return trace.frames.single.line != null; + }); + + // If all the traces contain only internal processing, preserve the last + // (top-most) one so that the chain isn't empty. + if (nonEmptyTraces.isEmpty && foldedTraces.isNotEmpty) { + return Chain([foldedTraces.last]); + } + + return Chain(nonEmptyTraces); + } + + /// Converts this chain to a [Trace]. + /// + /// The trace version of a chain is just the concatenation of all the traces + /// in the chain. + Trace toTrace() => Trace(traces.expand((trace) => trace.frames)); + + @override + String toString() { + // Figure out the longest path so we know how much to pad. + var longest = traces + .map((trace) => trace.frames + .map((frame) => frame.location.length) + .fold(0, math.max)) + .fold(0, math.max); + + // Don't call out to [Trace.toString] here because that doesn't ensure that + // padding is consistent across all traces. + return traces + .map((trace) => trace.frames + .map((frame) => + '${frame.location.padRight(longest)} ${frame.member}\n') + .join()) + .join(chainGap); + } +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/frame.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/frame.dart new file mode 100644 index 0000000..2d991e8 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/frame.dart @@ -0,0 +1,341 @@ + + +import 'package:path/path.dart' as path; + +import 'trace.dart'; +import 'unparsed_frame.dart'; + +// #1 Foo._bar (file:///home/nweiz/code/stuff.dart:42:21) +// #1 Foo._bar (file:///home/nweiz/code/stuff.dart:42) +// #1 Foo._bar (file:///home/nweiz/code/stuff.dart) +final _vmFrame = RegExp(r'^#\d+\s+(\S.*) \((.+?)((?::\d+){0,2})\)$'); + +// at Object.stringify (native) +// at VW.call$0 (https://example.com/stuff.dart.js:560:28) +// at VW.call$0 (eval as fn +// (https://example.com/stuff.dart.js:560:28), efn:3:28) +// at https://example.com/stuff.dart.js:560:28 +final _v8Frame = + RegExp(r'^\s*at (?:(\S.*?)(?: \[as [^\]]+\])? \((.*)\)|(.*))$'); + +// https://example.com/stuff.dart.js:560:28 +// https://example.com/stuff.dart.js:560 +final _v8UrlLocation = RegExp(r'^(.*?):(\d+)(?::(\d+))?$|native$'); + +// eval as function (https://example.com/stuff.dart.js:560:28), efn:3:28 +// eval as function (https://example.com/stuff.dart.js:560:28) +// eval as function (eval as otherFunction +// (https://example.com/stuff.dart.js:560:28)) +final _v8EvalLocation = + RegExp(r'^eval at (?:\S.*?) \((.*)\)(?:, .*?:\d+:\d+)?$'); + +// anonymous/<@https://example.com/stuff.js line 693 > Function:3:40 +// anonymous/<@https://example.com/stuff.js line 693 > eval:3:40 +final _firefoxEvalLocation = + RegExp(r'(\S+)@(\S+) line (\d+) >.* (Function|eval):\d+:\d+'); + +// .VW.call$0@https://example.com/stuff.dart.js:560 +// .VW.call$0("arg")@https://example.com/stuff.dart.js:560 +// .VW.call$0/name<@https://example.com/stuff.dart.js:560 +// .VW.call$0@https://example.com/stuff.dart.js:560:36 +// https://example.com/stuff.dart.js:560 +final _firefoxSafariFrame = RegExp(r'^' + r'(?:' // Member description. Not present in some Safari frames. + r'([^@(/]*)' // The actual name of the member. + r'(?:\(.*\))?' // Arguments to the member, sometimes captured by Firefox. + r'((?:/[^/]*)*)' // Extra characters indicating a nested closure. + r'(?:\(.*\))?' // Arguments to the closure. + r'@' + r')?' + r'(.*?)' // The frame's URL. + r':' + r'(\d*)' // The line number. Empty in Safari if it's unknown. + r'(?::(\d*))?' // The column number. Not present in older browsers and + // empty in Safari if it's unknown. + r'$'); + +// foo/bar.dart 10:11 Foo._bar +// foo/bar.dart 10:11 (anonymous function).dart.fn +// https://dart.dev/foo/bar.dart Foo._bar +// data:... 10:11 Foo._bar +final _friendlyFrame = RegExp(r'^(\S+)(?: (\d+)(?::(\d+))?)?\s+([^\d].*)$'); + +/// A regular expression that matches asynchronous member names generated by the +/// VM. +final _asyncBody = RegExp(r'<(|[^>]+)_async_body>'); + +final _initialDot = RegExp(r'^\.'); + +/// A single stack frame. Each frame points to a precise location in Dart code. +class Frame { + /// The URI of the file in which the code is located. + /// + /// This URI will usually have the scheme `dart`, `file`, `http`, or `https`. + final Uri uri; + + /// The line number on which the code location is located. + /// + /// This can be null, indicating that the line number is unknown or + /// unimportant. + final int? line; + + /// The column number of the code location. + /// + /// This can be null, indicating that the column number is unknown or + /// unimportant. + final int? column; + + /// The name of the member in which the code location occurs. + /// + /// Anonymous closures are represented as `` in this member string. + final String? member; + + /// Whether this stack frame comes from the Dart core libraries. + bool get isCore => uri.scheme == 'dart'; + + /// Returns a human-friendly description of the library that this stack frame + /// comes from. + /// + /// This will usually be the string form of [uri], but a relative URI will be + /// used if possible. Data URIs will be truncated. + String get library { + if (uri.scheme == 'data') return 'data:...'; + return path.prettyUri(uri); + } + + /// Returns the name of the package this stack frame comes from, or `null` if + /// this stack frame doesn't come from a `package:` URL. + String? get package { + if (uri.scheme != 'package') return null; + return uri.path.split('/').first; + } + + /// A human-friendly description of the code location. + String get location { + if (line == null) return library; + if (column == null) return '$library $line'; + return '$library $line:$column'; + } + + /// Returns a single frame of the current stack. + /// + /// By default, this will return the frame above the current method. If + /// [level] is `0`, it will return the current method's frame; if [level] is + /// higher than `1`, it will return higher frames. + factory Frame.caller([int level = 1]) { + if (level < 0) { + throw ArgumentError('Argument [level] must be greater than or equal ' + 'to 0.'); + } + + return Trace.current(level + 1).frames.first; + } + + /// Parses a string representation of a Dart VM stack frame. + factory Frame.parseVM(String frame) => _catchFormatException(frame, () { + // The VM sometimes folds multiple stack frames together and replaces + // them with "...". + if (frame == '...') { + return Frame(Uri(), null, null, '...'); + } + + var match = _vmFrame.firstMatch(frame); + if (match == null) return UnparsedFrame(frame); + + // Get the pieces out of the regexp match. Function, URI and line should + // always be found. The column is optional. + var member = match[1]! + .replaceAll(_asyncBody, '') + .replaceAll('', ''); + var uri = match[2]!.startsWith(' 1 ? int.parse(lineAndColumn[1]) : null; + var column = + lineAndColumn.length > 2 ? int.parse(lineAndColumn[2]) : null; + return Frame(uri, line, column, member); + }); + + /// Parses a string representation of a Chrome/V8 stack frame. + factory Frame.parseV8(String frame) => _catchFormatException(frame, () { + var match = _v8Frame.firstMatch(frame); + if (match == null) return UnparsedFrame(frame); + + // v8 location strings can be arbitrarily-nested, since it adds a layer + // of nesting for each eval performed on that line. + Frame parseLocation(String location, String member) { + var evalMatch = _v8EvalLocation.firstMatch(location); + while (evalMatch != null) { + location = evalMatch[1]!; + evalMatch = _v8EvalLocation.firstMatch(location); + } + + if (location == 'native') { + return Frame(Uri.parse('native'), null, null, member); + } + + var urlMatch = _v8UrlLocation.firstMatch(location); + if (urlMatch == null) return UnparsedFrame(frame); + + final uri = _uriOrPathToUri(urlMatch[1]!); + final line = int.parse(urlMatch[2]!); + final columnMatch = urlMatch[3]; + final column = columnMatch != null ? int.parse(columnMatch) : null; + return Frame(uri, line, column, member); + } + + // V8 stack frames can be in two forms. + if (match[2] != null) { + // The first form looks like " at FUNCTION (LOCATION)". V8 proper + // lists anonymous functions within eval as "", while IE10 + // lists them as "Anonymous function". + return parseLocation( + match[2]!, + match[1]! + .replaceAll('', '') + .replaceAll('Anonymous function', '') + .replaceAll('(anonymous function)', '')); + } else { + // The second form looks like " at LOCATION", and is used for + // anonymous functions. + return parseLocation(match[3]!, ''); + } + }); + + /// Parses a string representation of a JavaScriptCore stack trace. + factory Frame.parseJSCore(String frame) => Frame.parseV8(frame); + + /// Parses a string representation of an IE stack frame. + /// + /// IE10+ frames look just like V8 frames. Prior to IE10, stack traces can't + /// be retrieved. + factory Frame.parseIE(String frame) => Frame.parseV8(frame); + + /// Parses a Firefox 'eval' or 'function' stack frame. + /// + /// for example: + /// anonymous/<@https://example.com/stuff.js line 693 > Function:3:40 + /// anonymous/<@https://example.com/stuff.js line 693 > eval:3:40 + factory Frame._parseFirefoxEval(String frame) => + _catchFormatException(frame, () { + final match = _firefoxEvalLocation.firstMatch(frame); + if (match == null) return UnparsedFrame(frame); + var member = match[1]!.replaceAll('/<', ''); + final uri = _uriOrPathToUri(match[2]!); + final line = int.parse(match[3]!); + if (member.isEmpty || member == 'anonymous') { + member = ''; + } + return Frame(uri, line, null, member); + }); + + /// Parses a string representation of a Firefox stack frame. + factory Frame.parseFirefox(String frame) => _catchFormatException(frame, () { + var match = _firefoxSafariFrame.firstMatch(frame); + if (match == null) return UnparsedFrame(frame); + + if (match[3]!.contains(' line ')) { + return Frame._parseFirefoxEval(frame); + } + + // Normally this is a URI, but in a jsshell trace it can be a path. + var uri = _uriOrPathToUri(match[3]!); + + var member = match[1]; + if (member != null) { + member += + List.filled('/'.allMatches(match[2]!).length, '.').join(); + if (member == '') member = ''; + + // Some Firefox members have initial dots. We remove them for + // consistency with other platforms. + member = member.replaceFirst(_initialDot, ''); + } else { + member = ''; + } + + var line = match[4] == '' ? null : int.parse(match[4]!); + var column = + match[5] == null || match[5] == '' ? null : int.parse(match[5]!); + return Frame(uri, line, column, member); + }); + + /// Parses a string representation of a Safari 6.0 stack frame. + @Deprecated('Use Frame.parseSafari instead.') + factory Frame.parseSafari6_0(String frame) => Frame.parseFirefox(frame); + + /// Parses a string representation of a Safari 6.1+ stack frame. + @Deprecated('Use Frame.parseSafari instead.') + factory Frame.parseSafari6_1(String frame) => Frame.parseFirefox(frame); + + /// Parses a string representation of a Safari stack frame. + factory Frame.parseSafari(String frame) => Frame.parseFirefox(frame); + + /// Parses this package's string representation of a stack frame. + factory Frame.parseFriendly(String frame) => _catchFormatException(frame, () { + var match = _friendlyFrame.firstMatch(frame); + if (match == null) { + throw FormatException( + "Couldn't parse package:stack_trace stack trace line '$frame'."); + } + // Fake truncated data urls generated by the friendly stack trace format + // cause Uri.parse to throw an exception so we have to special case + // them. + var uri = match[1] == 'data:...' + ? Uri.dataFromString('') + : Uri.parse(match[1]!); + // If there's no scheme, this is a relative URI. We should interpret it + // as relative to the current working directory. + if (uri.scheme == '') { + uri = path.toUri(path.absolute(path.fromUri(uri))); + } + + var line = match[2] == null ? null : int.parse(match[2]!); + var column = match[3] == null ? null : int.parse(match[3]!); + return Frame(uri, line, column, match[4]); + }); + + /// A regular expression matching an absolute URI. + static final _uriRegExp = RegExp(r'^[a-zA-Z][-+.a-zA-Z\d]*://'); + + /// A regular expression matching a Windows path. + static final _windowsRegExp = RegExp(r'^([a-zA-Z]:[\\/]|\\\\)'); + + /// Converts [uriOrPath], which can be a URI, a Windows path, or a Posix path, + /// to a URI (absolute if possible). + static Uri _uriOrPathToUri(String uriOrPath) { + if (uriOrPath.contains(_uriRegExp)) { + return Uri.parse(uriOrPath); + } else if (uriOrPath.contains(_windowsRegExp)) { + return Uri.file(uriOrPath, windows: true); + } else if (uriOrPath.startsWith('/')) { + return Uri.file(uriOrPath, windows: false); + } + + // As far as I've seen, Firefox and V8 both always report absolute paths in + // their stack frames. However, if we do get a relative path, we should + // handle it gracefully. + if (uriOrPath.contains('\\')) return path.windows.toUri(uriOrPath); + return Uri.parse(uriOrPath); + } + + /// Runs [body] and returns its result. + /// + /// If [body] throws a [FormatException], returns an [UnparsedFrame] with + /// [text] instead. + static Frame _catchFormatException(String text, Frame Function() body) { + try { + return body(); + } on FormatException catch (_) { + return UnparsedFrame(text); + } + } + + Frame(this.uri, this.line, this.column, this.member); + + @override + String toString() => '$location in $member'; +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/lazy_chain.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/lazy_chain.dart new file mode 100644 index 0000000..5edfcdf --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/lazy_chain.dart @@ -0,0 +1,31 @@ + + +import 'chain.dart'; +import 'frame.dart'; +import 'lazy_trace.dart'; +import 'trace.dart'; + +/// A thunk for lazily constructing a [Chain]. +typedef ChainThunk = Chain Function(); + +/// A wrapper around a [ChainThunk]. This works around issue 9579 by avoiding +/// the conversion of native [StackTrace]s to strings until it's absolutely +/// necessary. +class LazyChain implements Chain { + final ChainThunk _thunk; + late final Chain _chain = _thunk(); + + LazyChain(this._thunk); + + @override + List get traces => _chain.traces; + @override + Chain get terse => _chain.terse; + @override + Chain foldFrames(bool Function(Frame) predicate, {bool terse = false}) => + LazyChain(() => _chain.foldFrames(predicate, terse: terse)); + @override + Trace toTrace() => LazyTrace(_chain.toTrace); + @override + String toString() => _chain.toString(); +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/lazy_trace.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/lazy_trace.dart new file mode 100644 index 0000000..d5f0fdd --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/lazy_trace.dart @@ -0,0 +1,31 @@ + + +import 'frame.dart'; +import 'trace.dart'; + +/// A thunk for lazily constructing a [Trace]. +typedef TraceThunk = Trace Function(); + +/// A wrapper around a [TraceThunk]. This works around issue 9579 by avoiding +/// the conversion of native [StackTrace]s to strings until it's absolutely +/// necessary. +class LazyTrace implements Trace { + final TraceThunk _thunk; + late final Trace _trace = _thunk(); + + LazyTrace(this._thunk); + + @override + List get frames => _trace.frames; + @override + StackTrace get original => _trace.original; + @override + StackTrace get vmTrace => _trace.vmTrace; + @override + Trace get terse => LazyTrace(() => _trace.terse); + @override + Trace foldFrames(bool Function(Frame) predicate, {bool terse = false}) => + LazyTrace(() => _trace.foldFrames(predicate, terse: terse)); + @override + String toString() => _trace.toString(); +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/stack_zone_specification.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/stack_zone_specification.dart new file mode 100644 index 0000000..2b8f737 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/stack_zone_specification.dart @@ -0,0 +1,260 @@ + + +import 'dart:async'; + +import 'chain.dart'; +import 'lazy_chain.dart'; +import 'lazy_trace.dart'; +import 'trace.dart'; +import 'utils.dart'; + +/// A class encapsulating the zone specification for a [Chain.capture] zone. +/// +/// Until they're materialized and exposed to the user, stack chains are tracked +/// as linked lists of [Trace]s using the [_Node] class. These nodes are stored +/// in three distinct ways: +/// +/// * When a callback is registered, a node is created and stored as a captured +/// local variable until the callback is run. +/// +/// * When a callback is run, its captured node is set as the [_currentNode] so +/// it can be available to [Chain.current] and to be linked into additional +/// chains when more callbacks are scheduled. +/// +/// * When a callback throws an error or a Future or Stream emits an error, the +/// current node is associated with that error's stack trace using the +/// [_chains] expando. +/// +/// Since [ZoneSpecification] can't be extended or even implemented, in order to +/// get a real [ZoneSpecification] instance it's necessary to call [toSpec]. +class StackZoneSpecification { + /// An opaque object used as a zone value to disable chain tracking in a given + /// zone. + /// + /// If `Zone.current[disableKey]` is `true`, no stack chains will be tracked. + static final disableKey = Object(); + + /// Whether chain-tracking is disabled in the current zone. + bool get _disabled => Zone.current[disableKey] == true; + + /// The expando that associates stack chains with [StackTrace]s. + /// + /// The chains are associated with stack traces rather than errors themselves + /// because it's a common practice to throw strings as errors, which can't be + /// used with expandos. + /// + /// The chain associated with a given stack trace doesn't contain a node for + /// that stack trace. + final _chains = Expando<_Node>('stack chains'); + + /// The error handler for the zone. + /// + /// If this is null, that indicates that any unhandled errors should be passed + /// to the parent zone. + final void Function(Object error, Chain)? _onError; + + /// The most recent node of the current stack chain. + _Node? _currentNode; + + /// Whether this is an error zone. + final bool _errorZone; + + StackZoneSpecification(this._onError, {bool errorZone = true}) + : _errorZone = errorZone; + + /// Converts this specification to a real [ZoneSpecification]. + ZoneSpecification toSpec() => ZoneSpecification( + handleUncaughtError: _errorZone ? _handleUncaughtError : null, + registerCallback: _registerCallback, + registerUnaryCallback: _registerUnaryCallback, + registerBinaryCallback: _registerBinaryCallback, + errorCallback: _errorCallback); + + /// Returns the current stack chain. + /// + /// By default, the first frame of the first trace will be the line where + /// [currentChain] is called. If [level] is passed, the first trace will start + /// that many frames up instead. + Chain currentChain([int level = 0]) => _createNode(level + 1).toChain(); + + /// Returns the stack chain associated with [trace], if one exists. + /// + /// The first stack trace in the returned chain will always be [trace] + /// (converted to a [Trace] if necessary). If there is no chain associated + /// with [trace], this just returns a single-trace chain containing [trace]. + Chain chainFor(StackTrace? trace) { + if (trace is Chain) return trace; + trace ??= StackTrace.current; + + var previous = _chains[trace] ?? _currentNode; + if (previous == null) { + // If there's no [_currentNode], we're running synchronously beneath + // [Chain.capture] and we should fall back to the VM's stack chaining. We + // can't use [Chain.from] here because it'll just call [chainFor] again. + if (trace is Trace) return Chain([trace]); + return LazyChain(() => Chain.parse(trace!.toString())); + } else { + if (trace is! Trace) { + var original = trace; + trace = LazyTrace(() => Trace.parse(_trimVMChain(original))); + } + + return _Node(trace, previous).toChain(); + } + } + + /// Tracks the current stack chain so it can be set to [_currentNode] when + /// [f] is run. + ZoneCallback _registerCallback( + Zone self, ZoneDelegate parent, Zone zone, R Function() f) { + if (_disabled) return parent.registerCallback(zone, f); + var node = _createNode(1); + return parent.registerCallback(zone, () => _run(f, node)); + } + + /// Tracks the current stack chain so it can be set to [_currentNode] when + /// [f] is run. + ZoneUnaryCallback _registerUnaryCallback( + Zone self, + ZoneDelegate parent, + Zone zone, + @pragma('vm:awaiter-link') R Function(T) f) { + if (_disabled) return parent.registerUnaryCallback(zone, f); + var node = _createNode(1); + return parent.registerUnaryCallback( + zone, (arg) => _run(() => f(arg), node)); + } + + /// Tracks the current stack chain so it can be set to [_currentNode] when + /// [f] is run. + ZoneBinaryCallback _registerBinaryCallback( + Zone self, ZoneDelegate parent, Zone zone, R Function(T1, T2) f) { + if (_disabled) return parent.registerBinaryCallback(zone, f); + + var node = _createNode(1); + return parent.registerBinaryCallback( + zone, (arg1, arg2) => _run(() => f(arg1, arg2), node)); + } + + /// Looks up the chain associated with [stackTrace] and passes it either to + /// [_onError] or [parent]'s error handler. + void _handleUncaughtError(Zone self, ZoneDelegate parent, Zone zone, + Object error, StackTrace stackTrace) { + if (_disabled) { + parent.handleUncaughtError(zone, error, stackTrace); + return; + } + + var stackChain = chainFor(stackTrace); + if (_onError == null) { + parent.handleUncaughtError(zone, error, stackChain); + return; + } + + // TODO(nweiz): Currently this copies a lot of logic from [runZoned]. Just + // allow [runBinary] to throw instead once issue 18134 is fixed. + try { + // TODO(rnystrom): Is the null-assertion correct here? It is nullable in + // Zone. Should we check for that here? + self.parent!.runBinary(_onError!, error, stackChain); + } on Object catch (newError, newStackTrace) { + if (identical(newError, error)) { + parent.handleUncaughtError(zone, error, stackChain); + } else { + parent.handleUncaughtError(zone, newError, newStackTrace); + } + } + } + + /// Attaches the current stack chain to [stackTrace], replacing it if + /// necessary. + AsyncError? _errorCallback(Zone self, ZoneDelegate parent, Zone zone, + Object error, StackTrace? stackTrace) { + if (_disabled) return parent.errorCallback(zone, error, stackTrace); + + // Go up two levels to get through [_CustomZone.errorCallback]. + if (stackTrace == null) { + stackTrace = _createNode(2).toChain(); + } else { + if (_chains[stackTrace] == null) _chains[stackTrace] = _createNode(2); + } + + var asyncError = parent.errorCallback(zone, error, stackTrace); + return asyncError ?? AsyncError(error, stackTrace); + } + + /// Creates a [_Node] with the current stack trace and linked to + /// [_currentNode]. + /// + /// By default, the first frame of the first trace will be the line where + /// [_createNode] is called. If [level] is passed, the first trace will start + /// that many frames up instead. + _Node _createNode([int level = 0]) => + _Node(_currentTrace(level + 1), _currentNode); + + // TODO(nweiz): use a more robust way of detecting and tracking errors when + // issue 15105 is fixed. + /// Runs [f] with [_currentNode] set to [node]. + /// + /// If [f] throws an error, this associates [node] with that error's stack + /// trace. + T _run(T Function() f, _Node node) { + var previousNode = _currentNode; + _currentNode = node; + try { + return f(); + } catch (e, stackTrace) { + // We can see the same stack trace multiple times if it's rethrown through + // guarded callbacks. The innermost chain will have the most + // information so it should take precedence. + _chains[stackTrace] ??= node; + rethrow; + } finally { + _currentNode = previousNode; + } + } + + /// Like [Trace.current], but if the current stack trace has VM chaining + /// enabled, this only returns the innermost sub-trace. + Trace _currentTrace([int? level]) { + var stackTrace = StackTrace.current; + return LazyTrace(() { + var text = _trimVMChain(stackTrace); + var trace = Trace.parse(text); + // JS includes a frame for the call to StackTrace.current, but the VM + // doesn't, so we skip an extra frame in a JS context. + return Trace(trace.frames.skip((level ?? 0) + (inJS ? 2 : 1)), + original: text); + }); + } + + /// Removes the VM's stack chains from the native [trace], since we're + /// generating our own and we don't want duplicate frames. + String _trimVMChain(StackTrace trace) { + var text = trace.toString(); + var index = text.indexOf(vmChainGap); + return index == -1 ? text : text.substring(0, index); + } +} + +/// A linked list node representing a single entry in a stack chain. +class _Node { + /// The stack trace for this link of the chain. + final Trace trace; + + /// The previous node in the chain. + final _Node? previous; + + _Node(StackTrace trace, [this.previous]) : trace = Trace.from(trace); + + /// Converts this to a [Chain]. + Chain toChain() { + var nodes = []; + _Node? node = this; + while (node != null) { + nodes.add(node.trace); + node = node.previous; + } + return Chain(nodes); + } +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/trace.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/trace.dart new file mode 100644 index 0000000..5ce04f2 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/trace.dart @@ -0,0 +1,336 @@ + + +import 'dart:math' as math; + +import 'chain.dart'; +import 'frame.dart'; +import 'lazy_trace.dart'; +import 'unparsed_frame.dart'; +import 'utils.dart'; +import 'vm_trace.dart'; + +final _terseRegExp = RegExp(r'(-patch)?([/\\].*)?$'); + +/// A RegExp to match V8's stack traces. +/// +/// V8's traces start with a line that's either just "Error" or else is a +/// description of the exception that occurred. That description can be multiple +/// lines, so we just look for any line other than the first that begins with +/// three or four spaces and "at". +final _v8Trace = RegExp(r'\n ?at '); + +/// A RegExp to match indidual lines of V8's stack traces. +/// +/// This is intended to filter out the leading exception details of the trace +/// though it is possible for the message to match this as well. +final _v8TraceLine = RegExp(r' ?at '); + +/// A RegExp to match Firefox's eval and Function stack traces. +/// +/// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error/stack +/// +/// These stack traces looks like: +/// anonymous/<@https://example.com/stuff.js line 693 > Function:3:40 +/// anonymous/<@https://example.com/stuff.js line 693 > eval:3:40 +final _firefoxEvalTrace = RegExp(r'@\S+ line \d+ >.* (Function|eval):\d+:\d+'); + +/// A RegExp to match Firefox and Safari's stack traces. +/// +/// Firefox and Safari have very similar stack trace formats, so we use the same +/// logic for parsing them. +/// +/// Firefox's trace frames start with the name of the function in which the +/// error occurred, possibly including its parameters inside `()`. For example, +/// `.VW.call$0("arg")@https://example.com/stuff.dart.js:560`. +/// +/// Safari traces occasionally don't include the initial method name followed by +/// "@", and they always have both the line and column number (or just a +/// trailing colon if no column number is available). They can also contain +/// empty lines or lines consisting only of `[native code]`. +final _firefoxSafariTrace = RegExp( + r'^' + r'(' // Member description. Not present in some Safari frames. + r'([.0-9A-Za-z_$/<]|\(.*\))*' // Member name and arguments. + r'@' + r')?' + r'[^\s]*' // Frame URL. + r':\d*' // Line or column number. Some older frames only have a line number. + r'$', + multiLine: true); + +/// A RegExp to match this package's stack traces. +final _friendlyTrace = + RegExp(r'^[^\s<][^\s]*( \d+(:\d+)?)?[ \t]+[^\s]+$', multiLine: true); + +/// A stack trace, comprised of a list of stack frames. +class Trace implements StackTrace { + /// The stack frames that comprise this stack trace. + final List frames; + + /// The original stack trace from which this trace was parsed. + final StackTrace original; + + /// Returns a human-readable representation of [stackTrace]. If [terse] is + /// set, this folds together multiple stack frames from the Dart core + /// libraries, so that only the core library method directly called from user + /// code is visible (see [Trace.terse]). + static String format(StackTrace stackTrace, {bool terse = true}) { + var trace = Trace.from(stackTrace); + if (terse) trace = trace.terse; + return trace.toString(); + } + + /// Returns the current stack trace. + /// + /// By default, the first frame of this trace will be the line where + /// [Trace.current] is called. If [level] is passed, the trace will start that + /// many frames up instead. + factory Trace.current([int level = 0]) { + if (level < 0) { + throw ArgumentError('Argument [level] must be greater than or equal ' + 'to 0.'); + } + + var trace = Trace.from(StackTrace.current); + return LazyTrace( + () => + // JS includes a frame for the call to StackTrace.current, but the VM + // doesn't, so we skip an extra frame in a JS context. + Trace(trace.frames.skip(level + (inJS ? 2 : 1)), + original: trace.original.toString()), + ); + } + + /// Returns a new stack trace containing the same data as [trace]. + /// + /// If [trace] is a native [StackTrace], its data will be parsed out; if it's + /// a [Trace], it will be returned as-is. + factory Trace.from(StackTrace trace) { + if (trace is Trace) return trace; + if (trace is Chain) return trace.toTrace(); + return LazyTrace(() => Trace.parse(trace.toString())); + } + + /// Parses a string representation of a stack trace. + /// + /// [trace] should be formatted in the same way as a Dart VM or browser stack + /// trace. If it's formatted as a stack chain, this will return the equivalent + /// of [Chain.toTrace]. + factory Trace.parse(String trace) { + try { + if (trace.isEmpty) return Trace([]); + if (trace.contains(_v8Trace)) return Trace.parseV8(trace); + if (trace.contains('\tat ')) return Trace.parseJSCore(trace); + if (trace.contains(_firefoxSafariTrace) || + trace.contains(_firefoxEvalTrace)) { + return Trace.parseFirefox(trace); + } + if (trace.contains(chainGap)) return Chain.parse(trace).toTrace(); + if (trace.contains(_friendlyTrace)) { + return Trace.parseFriendly(trace); + } + + // Default to parsing the stack trace as a VM trace. This is also hit on + // IE and Safari, where the stack trace is just an empty string (issue + // 11257). + return Trace.parseVM(trace); + } on FormatException catch (error) { + throw FormatException('${error.message}\nStack trace:\n$trace'); + } + } + + /// Parses a string representation of a Dart VM stack trace. + Trace.parseVM(String trace) : this(_parseVM(trace), original: trace); + + static List _parseVM(String trace) { + // Ignore [vmChainGap]. This matches the behavior of + // `Chain.parse().toTrace()`. + var lines = trace + .trim() + .replaceAll(vmChainGap, '') + .split('\n') + .where((line) => line.isNotEmpty); + + if (lines.isEmpty) { + return []; + } + + var frames = lines.take(lines.length - 1).map(Frame.parseVM).toList(); + + // TODO(nweiz): Remove this when issue 23614 is fixed. + if (!lines.last.endsWith('.da')) { + frames.add(Frame.parseVM(lines.last)); + } + + return frames; + } + + /// Parses a string representation of a Chrome/V8 stack trace. + Trace.parseV8(String trace) + : this( + trace + .split('\n') + .skip(1) + // It's possible that an Exception's description contains a line + // that looks like a V8 trace line, which will screw this up. + // Unfortunately, that's impossible to detect. + .skipWhile((line) => !line.startsWith(_v8TraceLine)) + .map(Frame.parseV8), + original: trace); + + /// Parses a string representation of a JavaScriptCore stack trace. + Trace.parseJSCore(String trace) + : this( + trace + .split('\n') + .where((line) => line != '\tat ') + .map(Frame.parseV8), + original: trace); + + /// Parses a string representation of an Internet Explorer stack trace. + /// + /// IE10+ traces look just like V8 traces. Prior to IE10, stack traces can't + /// be retrieved. + Trace.parseIE(String trace) : this.parseV8(trace); + + /// Parses a string representation of a Firefox stack trace. + Trace.parseFirefox(String trace) + : this( + trace + .trim() + .split('\n') + .where((line) => line.isNotEmpty && line != '[native code]') + .map(Frame.parseFirefox), + original: trace); + + /// Parses a string representation of a Safari stack trace. + Trace.parseSafari(String trace) : this.parseFirefox(trace); + + /// Parses a string representation of a Safari 6.1+ stack trace. + @Deprecated('Use Trace.parseSafari instead.') + Trace.parseSafari6_1(String trace) : this.parseSafari(trace); + + /// Parses a string representation of a Safari 6.0 stack trace. + @Deprecated('Use Trace.parseSafari instead.') + Trace.parseSafari6_0(String trace) + : this( + trace + .trim() + .split('\n') + .where((line) => line != '[native code]') + .map(Frame.parseFirefox), + original: trace); + + /// Parses this package's string representation of a stack trace. + /// + /// This also parses string representations of [Chain]s. They parse to the + /// same trace that [Chain.toTrace] would return. + Trace.parseFriendly(String trace) + : this( + trace.isEmpty + ? [] + : trace + .trim() + .split('\n') + // Filter out asynchronous gaps from [Chain]s. + .where((line) => !line.startsWith('=====')) + .map(Frame.parseFriendly), + original: trace); + + /// Returns a new [Trace] comprised of [frames]. + Trace(Iterable frames, {String? original}) + : frames = List.unmodifiable(frames), + original = StackTrace.fromString(original ?? ''); + + /// Returns a VM-style [StackTrace] object. + /// + /// The return value's [toString] method will always return a string + /// representation in the Dart VM's stack trace format, regardless of what + /// platform is being used. + StackTrace get vmTrace => VMTrace(frames); + + /// Returns a terser version of this trace. + /// + /// This is accomplished by folding together multiple stack frames from the + /// core library or from this package, as in [foldFrames]. Remaining core + /// library frames have their libraries, "-patch" suffixes, and line numbers + /// removed. If the outermost frame of the stack trace is a core library + /// frame, it's removed entirely. + /// + /// This won't do anything with a raw JavaScript trace, since there's no way + /// to determine which frames come from which Dart libraries. However, the + /// [`source_map_stack_trace`][https://pub.dev/packages/source_map_stack_trace] + /// package can be used to convert JavaScript traces into Dart-style traces. + /// + /// For custom folding, see [foldFrames]. + Trace get terse => foldFrames((_) => false, terse: true); + + /// Returns a new [Trace] based on `this` where multiple stack frames matching + /// [predicate] are folded together. + /// + /// This means that whenever there are multiple frames in a row that match + /// [predicate], only the last one is kept. This is useful for limiting the + /// amount of library code that appears in a stack trace by only showing user + /// code and code that's called by user code. + /// + /// If [terse] is true, this will also fold together frames from the core + /// library or from this package, simplify core library frames, and + /// potentially remove the outermost frame as in [Trace.terse]. + Trace foldFrames(bool Function(Frame) predicate, {bool terse = false}) { + if (terse) { + var oldPredicate = predicate; + predicate = (frame) { + if (oldPredicate(frame)) return true; + + if (frame.isCore) return true; + if (frame.package == 'stack_trace') return true; + + // Ignore async stack frames without any line or column information. + // These come from the VM's async/await implementation and represent + // internal frames. They only ever show up in stack chains and are + // always surrounded by other traces that are actually useful, so we can + // just get rid of them. + // TODO(nweiz): Get rid of this logic some time after issue 22009 is + // fixed. + if (!frame.member!.contains('')) return false; + return frame.line == null; + }; + } + + var newFrames = []; + for (var frame in frames.reversed) { + if (frame is UnparsedFrame || !predicate(frame)) { + newFrames.add(frame); + } else if (newFrames.isEmpty || !predicate(newFrames.last)) { + newFrames.add(Frame(frame.uri, frame.line, frame.column, frame.member)); + } + } + + if (terse) { + newFrames = newFrames.map((frame) { + if (frame is UnparsedFrame || !predicate(frame)) return frame; + var library = frame.library.replaceAll(_terseRegExp, ''); + return Frame(Uri.parse(library), null, null, frame.member); + }).toList(); + + if (newFrames.length > 1 && predicate(newFrames.first)) { + newFrames.removeAt(0); + } + } + + return Trace(newFrames.reversed, original: original.toString()); + } + + @override + String toString() { + // Figure out the longest path so we know how much to pad. + var longest = + frames.map((frame) => frame.location.length).fold(0, math.max); + + // Print out the stack trace nicely formatted. + return frames.map((frame) { + if (frame is UnparsedFrame) return '$frame\n'; + return '${frame.location.padRight(longest)} ${frame.member}\n'; + }).join(); + } +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/unparsed_frame.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/unparsed_frame.dart new file mode 100644 index 0000000..2be6090 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/unparsed_frame.dart @@ -0,0 +1,30 @@ + +import 'frame.dart'; + +/// A frame that failed to parse. +/// +/// The [member] property contains the original frame's contents. +class UnparsedFrame implements Frame { + @override + final Uri uri = Uri(path: 'unparsed'); + @override + final int? line = null; + @override + final int? column = null; + @override + final bool isCore = false; + @override + final String library = 'unparsed'; + @override + final String? package = null; + @override + final String location = 'unparsed'; + + @override + final String member; + + UnparsedFrame(this.member); + + @override + String toString() => member; +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/utils.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/utils.dart new file mode 100644 index 0000000..4ddb0b7 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/utils.dart @@ -0,0 +1,11 @@ +/// The line used in the string representation of stack chains to represent +/// the gap between traces. +const chainGap = '===== asynchronous gap ===========================\n'; + +/// The line used in the string representation of VM stack chains to represent +/// the gap between traces. +final vmChainGap = RegExp(r'^\n?$', multiLine: true); + +// TODO(nweiz): When cross-platform imports work, use them to set this. +/// Whether we're running in a JS context. +const bool inJS = 0.0 is int; diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/src/vm_trace.dart b/lib/src/pdo/postgres/dependencies/stack_trace/src/vm_trace.dart new file mode 100644 index 0000000..afffb06 --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/src/vm_trace.dart @@ -0,0 +1,28 @@ +import 'frame.dart'; + +/// An implementation of [StackTrace] that emulates the behavior of the VM's +/// implementation. +/// +/// In particular, when [toString] is called, this returns a string in the VM's +/// stack trace format. +class VMTrace implements StackTrace { + /// The stack frames that comprise this stack trace. + final List frames; + + VMTrace(this.frames); + + @override + String toString() { + var i = 1; + return frames.map((frame) { + var number = '#${i++}'.padRight(8); + var member = frame.member! + .replaceAllMapped(RegExp(r'[^.]+\.'), + (match) => '${match[1]}.<${match[1]}_async_body>') + .replaceAll('', ''); + var line = frame.line ?? 0; + var column = frame.column ?? 0; + return '$number$member (${frame.uri}:$line:$column)\n'; + }).join(); + } +} diff --git a/lib/src/pdo/postgres/dependencies/stack_trace/stack_trace.dart b/lib/src/pdo/postgres/dependencies/stack_trace/stack_trace.dart new file mode 100644 index 0000000..0e4b85b --- /dev/null +++ b/lib/src/pdo/postgres/dependencies/stack_trace/stack_trace.dart @@ -0,0 +1,4 @@ +export 'src/chain.dart'; +export 'src/frame.dart'; +export 'src/trace.dart'; +export 'src/unparsed_frame.dart'; diff --git a/lib/src/pdo/postgres/postgres_pdo.dart b/lib/src/pdo/postgres/postgres_pdo.dart index 18d15a2..3c9084f 100644 --- a/lib/src/pdo/postgres/postgres_pdo.dart +++ b/lib/src/pdo/postgres/postgres_pdo.dart @@ -5,6 +5,8 @@ import 'package:enough_convert/windows.dart'; import 'postgres_pdo_transaction.dart'; import 'package:postgres_fork/postgres.dart'; +import 'dependencies/postgres_pool/postgres_pool.dart'; + class PostgresPDO extends PDOInterface { /// default query Timeout = 30 seconds static const defaultTimeoutInSeconds = 30; @@ -31,7 +33,7 @@ class PostgresPDO extends PDOInterface { } /// CoreConnection - late PostgreSQLConnection connection; + late dynamic connection; Encoding _getEncoding(String encoding) { switch (encoding.toLowerCase()) { @@ -55,31 +57,55 @@ class PostgresPDO extends PDOInterface { Future connect() async { final dsnParser = DSNParser(dsn, DsnType.pdoPostgreSql); - connection = PostgreSQLConnection( - dsnParser.host, - dsnParser.port, - dsnParser.database, - username: user, - password: password, - encoding: _getEncoding(dsnParser.charset ?? 'utf8'), - ); - await connection.open(); + if (dsnParser.pool == true) { + final endpoint = PgEndpoint( + host: dsnParser.host, + port: dsnParser.port, + database: dsnParser.database, + username: user, + password: password, + ); + final settings = PgPoolSettings(); + settings.encoding = _getEncoding(dsnParser.charset ?? 'utf8'); + settings.maxConnectionCount = dsnParser.poolSize; + settings.onOpen = (conn) async { + await _onOpen(conn, dsnParser); + }; + connection = PgPool(endpoint, settings: settings); + //print('dsnParser.pool ${dsnParser.pool} $connection'); + } else { + connection = PostgreSQLConnection( + dsnParser.host, + dsnParser.port, + dsnParser.database, + username: user, + password: password, + encoding: _getEncoding(dsnParser.charset ?? 'utf8'), + ); + final conn = connection as PostgreSQLConnection; + await conn.open(); + await _onOpen(conn, dsnParser); + } + + return this; + } + /// inicializa configurações ao conectar com o banco de dados + Future _onOpen( + PostgreSQLExecutionContext conn, DSNParser dsnParser) async { if (dsnParser.charset != null) { - await connection.execute("SET client_encoding = '${dsnParser.charset}';"); + await conn.execute("SET client_encoding = '${dsnParser.charset}'"); } if (dsnParser.schema != null) { - await connection.execute("SET search_path TO '${dsnParser.schema}';"); + await conn.execute("SET search_path TO ${dsnParser.schema}"); } if (dsnParser.timezone != null) { - await connection.execute("SET time zone '${dsnParser.timezone};'"); + await conn.execute("SET time zone '${dsnParser.timezone}'"); } if (dsnParser.applicationName != null) { - await connection - .execute("SET application_name TO '${dsnParser.applicationName};'"); + await conn + .execute("SET application_name TO '${dsnParser.applicationName}'"); } - - return this; } Future runInTransaction(Future operation(PostgresPDOTransaction ctx), @@ -87,12 +113,22 @@ class PostgresPDO extends PDOInterface { if (timeoutInSeconds == null) { timeoutInSeconds = defaultTimeoutInSeconds; } - final res = await connection.transaction((transaCtx) async { - final pdoCtx = PostgresPDOTransaction(transaCtx, this); - return operation(pdoCtx); - }, commitTimeoutInSeconds: timeoutInSeconds); - - return res as T; + if (connection is PostgreSQLConnection) { + final res = await (connection as PostgreSQLConnection).transaction( + (transaCtx) async { + final pdoCtx = PostgresPDOTransaction(transaCtx, this); + return operation(pdoCtx); + }, commitTimeoutInSeconds: timeoutInSeconds); + return res as T; + } else { + final res = await (connection as PgPool).runTx( + (transaCtx) async { + final pdoCtx = PostgresPDOTransaction(transaCtx, this); + return operation(pdoCtx); + }, + ).timeout(Duration(seconds: timeoutInSeconds)); + return res; + } } /// Executa uma instrução SQL e retornar o número de linhas afetadas @@ -100,11 +136,18 @@ class PostgresPDO extends PDOInterface { if (timeoutInSeconds == null) { timeoutInSeconds = defaultTimeoutInSeconds; } - final result = await connection.execute(statement, - timeoutInSeconds: timeoutInSeconds, - placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark); - - return result; + if (connection is PostgreSQLConnection) { + final result = await (connection as PostgreSQLConnection).execute( + statement, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark); + return result; + } else { + final result = await (connection as PgPool).execute(statement, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark); + return result; + } } /// Prepares and executes an SQL statement @@ -113,37 +156,37 @@ class PostgresPDO extends PDOInterface { if (timeoutInSeconds == null) { timeoutInSeconds = defaultTimeoutInSeconds; } - // final rows = await connection.mappedResultsQuery( - // query, - // substitutionValues: params, - // timeoutInSeconds: timeoutInSeconds, - // placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark, - // ); - - final rs = await connection.query( - query, - substitutionValues: params, - // allowReuse: allowReuse ?? false, - timeoutInSeconds: timeoutInSeconds, - placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark, - ); - - final rows = rs.map((row) => row.toColumnMap()).toList(); - - // final maps = >[]; - // if (rows.isNotEmpty) { - // for (var item in rows) { - // //Combine/merge multiple maps into 1 map - // maps.add(item.values.reduce((map1, map2) => map1..addAll(map2))); - // } - // } - - final pdoResult = PDOResults(rows, rs.affectedRowCount); - return pdoResult; + if (connection is PostgreSQLConnection) { + final rs = await (connection as PostgreSQLConnection).query( + query, + substitutionValues: params, + // allowReuse: allowReuse ?? false, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark, + ); + final rows = rs.map((row) => row.toColumnMap()).toList(); + final pdoResult = PDOResults(rows, rs.affectedRowCount); + return pdoResult; + } else { + final rs = await (connection as PgPool).query( + query, + substitutionValues: params, + // allowReuse: allowReuse ?? false, + timeoutInSeconds: timeoutInSeconds, + placeholderIdentifier: PlaceholderIdentifier.onlyQuestionMark, + ); + final rows = rs.map((row) => row.toColumnMap()).toList(); + final pdoResult = PDOResults(rows, rs.affectedRowCount); + return pdoResult; + } } @override Future close() async { - await connection.close(); + if (connection is PostgreSQLConnection) { + await (connection as PostgreSQLConnection).close(); + } else { + await (connection as PgPool).close(); + } } } diff --git a/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart b/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart index 33a6144..bfa45eb 100644 --- a/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart +++ b/lib/src/pdo/postgres_v3/postgres_v3_pdo.dart @@ -56,7 +56,6 @@ class PostgresV3PDO extends PDOInterface { Future connect() async { final dsnParser = DSNParser(dsn, DsnType.pdoPostgreSql); - // dsnParser.sslmode?.toString() == 'require' final endpoint = Endpoint( host: dsnParser.host, port: dsnParser.port, @@ -76,20 +75,7 @@ class PostgresV3PDO extends PDOInterface { applicationName: dsnParser.applicationName, timeZone: dsnParser.timezone, onOpen: (conn) async { - if (dsnParser.charset != null) { - await conn - .execute("SET client_encoding = '${dsnParser.charset}';"); - } - if (dsnParser.schema != null) { - await conn.execute("SET search_path TO '${dsnParser.schema}';"); - } - if (dsnParser.timezone != null) { - await conn.execute("SET time zone '${dsnParser.timezone};'"); - } - if (dsnParser.applicationName != null) { - await conn.execute( - "SET application_name TO '${dsnParser.applicationName};'"); - } + await _onOpen(conn, dsnParser); }, maxConnectionCount: dsnParser.poolSize, encoding: _getEncoding(dsnParser.charset ?? 'utf8'), @@ -97,41 +83,40 @@ class PostgresV3PDO extends PDOInterface { ), ); - //final pool = await (connection as Pool); + } else { connection = await Connection.open(endpoint, settings: ConnectionSettings( applicationName: dsnParser.applicationName, timeZone: dsnParser.timezone, onOpen: (conn) async { - if (dsnParser.charset != null) { - await conn - .execute("SET client_encoding = '${dsnParser.charset}';"); - } - if (dsnParser.schema != null) { - await conn.execute("SET search_path TO '${dsnParser.schema}';"); - } - if (dsnParser.timezone != null) { - await conn.execute("SET time zone '${dsnParser.timezone};'"); - } - if (dsnParser.applicationName != null) { - await conn.execute( - "SET application_name TO '${dsnParser.applicationName};'"); - } + await _onOpen(conn, dsnParser); }, encoding: _getEncoding(dsnParser.charset ?? 'utf8'), sslMode: sslMode, )); - - // final conn = await (connection as Connection); - // if (dsnParser.charset != null) { - // conn.execute('''SET client_encoding = '${dsnParser.charset}';'''); - // } } return this; } + /// inicializa configurações ao conectar com o banco de dados + Future _onOpen(Connection conn, DSNParser dsnParser) async { + if (dsnParser.charset != null) { + await conn.execute("SET client_encoding = '${dsnParser.charset}'"); + } + if (dsnParser.schema != null) { + await conn.execute("SET search_path TO ${dsnParser.schema}"); + } + if (dsnParser.timezone != null) { + await conn.execute("SET time zone '${dsnParser.timezone}'"); + } + if (dsnParser.applicationName != null) { + await conn + .execute("SET application_name TO '${dsnParser.applicationName}'"); + } + } + Future runInTransaction( Future operation(PostgresV3PDOTransaction ctx), [int? timeoutInSeconds]) async { @@ -195,7 +180,7 @@ class PostgresV3PDO extends PDOInterface { @override Future close() async { - // print('postgres_v3_pdo@close isOpen ${(connection).isOpen} '); + // print('postgres_v3_pdo@close isOpen ${(connection).isOpen} '); if (connection is Connection) { await (connection as Connection).close(); } else if (connection is Pool) { diff --git a/pubspec.yaml b/pubspec.yaml index 1d1ebd3..7f77816 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: eloquent -version: 3.1.2 +version: 3.2.0 description: eloquent query builder port from PHP Laravel homepage: https://github.com/insinfo/eloquent_dart #publish_to: none