diff --git a/Directory.Packages.props b/Directory.Packages.props
index 3bdc2e614..8b24406c7 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -42,6 +42,7 @@
+
diff --git a/extensions/Postgres/Postgres/Internals/PostgresDbClient.cs b/extensions/Postgres/Postgres/Internals/PostgresDbClient.cs
index 16ff83dc1..f20f8c0ee 100644
--- a/extensions/Postgres/Postgres/Internals/PostgresDbClient.cs
+++ b/extensions/Postgres/Postgres/Internals/PostgresDbClient.cs
@@ -96,48 +96,51 @@ public async Task DoesTableExistAsync(
tableName = this.WithTableNamePrefix(tableName);
this._log.LogTrace("Checking if table {0} exists", tableName);
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
+ {
#pragma warning disable CA2100 // SQL reviewed
- cmd.CommandText = $@"
- SELECT table_name
- FROM information_schema.tables
- WHERE table_schema = @schema
- AND table_name = @table
- AND table_type = 'BASE TABLE'
- LIMIT 1
- ";
-
- cmd.Parameters.AddWithValue("@schema", this._schema);
- cmd.Parameters.AddWithValue("@table", tableName);
+ cmd.CommandText = $@"
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema = @schema
+ AND table_name = @table
+ AND table_type = 'BASE TABLE'
+ LIMIT 1
+ ";
+
+ cmd.Parameters.AddWithValue("@schema", this._schema);
+ cmd.Parameters.AddWithValue("@table", tableName);
#pragma warning restore CA2100
- this._log.LogTrace("Schema: {0}, Table: {1}, SQL: {2}", this._schema, tableName, cmd.CommandText);
+ this._log.LogTrace("Schema: {0}, Table: {1}, SQL: {2}", this._schema, tableName, cmd.CommandText);
- NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
- await using (dataReader.ConfigureAwait(false))
- {
- if (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataReader.ConfigureAwait(false))
{
- var name = dataReader.GetString(dataReader.GetOrdinal("table_name"));
+ if (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ var name = dataReader.GetString(dataReader.GetOrdinal("table_name"));
- return string.Equals(name, tableName, StringComparison.OrdinalIgnoreCase);
- }
+ return string.Equals(name, tableName, StringComparison.OrdinalIgnoreCase);
+ }
- this._log.LogTrace("Table {0} does not exist", tableName);
- return false;
+ this._log.LogTrace("Table {0} does not exist", tableName);
+ return false;
+ }
}
}
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
+ }
}
}
}
@@ -159,66 +162,70 @@ public async Task CreateTableAsync(
Npgsql.PostgresException? createErr = null;
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
- var lockId = GenLockId(tableName);
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
+ {
+ var lockId = GenLockId(tableName);
#pragma warning disable CA2100 // SQL reviewed
- if (!string.IsNullOrEmpty(this._createTableSql))
- {
- cmd.CommandText = this._createTableSql
- .Replace(PostgresConfig.SqlPlaceholdersTableName, tableName, StringComparison.Ordinal)
- .Replace(PostgresConfig.SqlPlaceholdersVectorSize, $"{vectorSize}", StringComparison.Ordinal)
- .Replace(PostgresConfig.SqlPlaceholdersLockId, $"{lockId}", StringComparison.Ordinal);
+ if (!string.IsNullOrEmpty(this._createTableSql))
+ {
+ cmd.CommandText = this._createTableSql
+ .Replace(PostgresConfig.SqlPlaceholdersTableName, tableName, StringComparison.Ordinal)
+ .Replace(PostgresConfig.SqlPlaceholdersVectorSize, $"{vectorSize}", StringComparison.Ordinal)
+ .Replace(PostgresConfig.SqlPlaceholdersLockId, $"{lockId}", StringComparison.Ordinal);
- this._log.LogTrace("Creating table with custom SQL: {0}", cmd.CommandText);
- }
- else
- {
- cmd.CommandText = $@"
- BEGIN;
- SELECT pg_advisory_xact_lock({lockId});
- CREATE TABLE IF NOT EXISTS {tableName} (
- {this._colId} TEXT NOT NULL PRIMARY KEY,
- {this._colEmbedding} vector({vectorSize}),
- {this._colTags} TEXT[] DEFAULT '{{}}'::TEXT[] NOT NULL,
- {this._colContent} TEXT DEFAULT '' NOT NULL,
- {this._colPayload} JSONB DEFAULT '{{}}'::JSONB NOT NULL
- );
- CREATE INDEX IF NOT EXISTS idx_tags ON {tableName} USING GIN({this._colTags});
- COMMIT;";
+ this._log.LogTrace("Creating table with custom SQL: {0}", cmd.CommandText);
+ }
+ else
+ {
+ cmd.CommandText = $@"
+ BEGIN;
+ SELECT pg_advisory_xact_lock({lockId});
+ CREATE TABLE IF NOT EXISTS {tableName} (
+ {this._colId} TEXT NOT NULL PRIMARY KEY,
+ {this._colEmbedding} vector({vectorSize}),
+ {this._colTags} TEXT[] DEFAULT '{{}}'::TEXT[] NOT NULL,
+ {this._colContent} TEXT DEFAULT '' NOT NULL,
+ {this._colPayload} JSONB DEFAULT '{{}}'::JSONB NOT NULL
+ );
+ CREATE INDEX IF NOT EXISTS idx_tags ON {tableName} USING GIN({this._colTags});
+ COMMIT;
+ ";
#pragma warning restore CA2100
- this._log.LogTrace("Creating table with default SQL: {0}", cmd.CommandText);
- }
+ this._log.LogTrace("Creating table with default SQL: {0}", cmd.CommandText);
+ }
- int result = await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- this._log.LogTrace("Table '{0}' creation result: {1}", tableName, result);
+ int result = await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ this._log.LogTrace("Table '{0}' creation result: {1}", tableName, result);
+ }
+ }
+ catch (Npgsql.PostgresException e) when (IsVectorTypeDoesNotExistException(e))
+ {
+ this._log.LogError(e, "Vector type not installed, check 'SELECT * FROM pg_extension'");
+ throw;
+ }
+ catch (Npgsql.PostgresException e) when (e.SqlState == PgErrUniqueViolation)
+ {
+ createErr = e;
+ }
+ catch (Exception e)
+ {
+ this._log.LogError(e, "Table '{0}' creation error: {1}. Err: {2}. InnerEx: {3}", tableName, e, e.Message, e.InnerException);
+ throw;
+ }
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
}
- }
- catch (Npgsql.PostgresException e) when (IsVectorTypeDoesNotExistException(e))
- {
- this._log.LogError(e, "Vector type not installed, check 'SELECT * FROM pg_extension'");
- throw;
- }
- catch (Npgsql.PostgresException e) when (e.SqlState == PgErrUniqueViolation)
- {
- createErr = e;
- }
- catch (Exception e)
- {
- this._log.LogError(e, "Table '{0}' creation error: {1}. Err: {2}. InnerEx: {3}", tableName, e, e.Message, e.InnerException);
- throw;
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
}
}
@@ -260,37 +267,40 @@ public async Task CreateTableAsync(
public async IAsyncEnumerable GetTablesAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
- cmd.CommandText = @"SELECT table_name FROM information_schema.tables
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
+ {
+ cmd.CommandText = @"SELECT table_name FROM information_schema.tables
WHERE table_schema = @schema AND table_type = 'BASE TABLE';";
- cmd.Parameters.AddWithValue("@schema", this._schema);
+ cmd.Parameters.AddWithValue("@schema", this._schema);
- this._log.LogTrace("Fetching list of tables. SQL: {0}. Schema: {1}", cmd.CommandText, this._schema);
+ this._log.LogTrace("Fetching list of tables. SQL: {0}. Schema: {1}", cmd.CommandText, this._schema);
- NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
- await using (dataReader.ConfigureAwait(false))
- {
- while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataReader.ConfigureAwait(false))
{
- var tableNameWithPrefix = dataReader.GetString(dataReader.GetOrdinal("table_name"));
- if (tableNameWithPrefix.StartsWith(this._tableNamePrefix, StringComparison.OrdinalIgnoreCase))
+ while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
- yield return tableNameWithPrefix.Remove(0, this._tableNamePrefix.Length);
+ var tableNameWithPrefix = dataReader.GetString(dataReader.GetOrdinal("table_name"));
+ if (tableNameWithPrefix.StartsWith(this._tableNamePrefix, StringComparison.OrdinalIgnoreCase))
+ {
+ yield return tableNameWithPrefix.Remove(0, this._tableNamePrefix.Length);
+ }
}
}
}
}
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
+ }
}
}
}
@@ -306,29 +316,32 @@ public async Task DeleteTableAsync(
{
tableName = this.WithSchemaAndTableNamePrefix(tableName);
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
+ {
#pragma warning disable CA2100 // SQL reviewed
- cmd.CommandText = $"DROP TABLE IF EXISTS {tableName}";
+ cmd.CommandText = $"DROP TABLE IF EXISTS {tableName}";
#pragma warning restore CA2100
- this._log.LogTrace("Deleting table. SQL: {0}", cmd.CommandText);
- await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ this._log.LogTrace("Deleting table. SQL: {0}", cmd.CommandText);
+ await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ }
+ }
+ catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
+ {
+ this._log.LogTrace("Table not found: {0}", tableName);
+ }
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
}
- }
- catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
- {
- this._log.LogTrace("Table not found: {0}", tableName);
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
}
}
}
@@ -350,51 +363,54 @@ public async Task UpsertAsync(
const string EmptyContent = "";
string[] emptyTags = [];
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
+ {
#pragma warning disable CA2100 // SQL reviewed
- cmd.CommandText = $@"
- INSERT INTO {tableName}
- ({this._colId}, {this._colEmbedding}, {this._colTags}, {this._colContent}, {this._colPayload})
- VALUES
- (@id, @embedding, @tags, @content, @payload)
- ON CONFLICT ({this._colId})
- DO UPDATE SET
- {this._colEmbedding} = @embedding,
- {this._colTags} = @tags,
- {this._colContent} = @content,
- {this._colPayload} = @payload
- ";
-
- cmd.Parameters.AddWithValue("@id", record.Id);
- cmd.Parameters.AddWithValue("@embedding", record.Embedding);
- cmd.Parameters.AddWithValue("@tags", NpgsqlDbType.Array | NpgsqlDbType.Text, record.Tags.ToArray() ?? emptyTags);
- cmd.Parameters.AddWithValue("@content", NpgsqlDbType.Text, CleanContent(record.Content) ?? EmptyContent);
- cmd.Parameters.AddWithValue("@payload", NpgsqlDbType.Jsonb, record.Payload ?? EmptyPayload);
+ cmd.CommandText = $@"
+ INSERT INTO {tableName}
+ ({this._colId}, {this._colEmbedding}, {this._colTags}, {this._colContent}, {this._colPayload})
+ VALUES
+ (@id, @embedding, @tags, @content, @payload)
+ ON CONFLICT ({this._colId})
+ DO UPDATE SET
+ {this._colEmbedding} = @embedding,
+ {this._colTags} = @tags,
+ {this._colContent} = @content,
+ {this._colPayload} = @payload
+ ";
+
+ cmd.Parameters.AddWithValue("@id", record.Id);
+ cmd.Parameters.AddWithValue("@embedding", record.Embedding);
+ cmd.Parameters.AddWithValue("@tags", NpgsqlDbType.Array | NpgsqlDbType.Text, record.Tags.ToArray() ?? emptyTags);
+ cmd.Parameters.AddWithValue("@content", NpgsqlDbType.Text, CleanContent(record.Content) ?? EmptyContent);
+ cmd.Parameters.AddWithValue("@payload", NpgsqlDbType.Jsonb, record.Payload ?? EmptyPayload);
#pragma warning restore CA2100
- this._log.LogTrace("Upserting record '{0}' in table '{1}'", record.Id, tableName);
+ this._log.LogTrace("Upserting record '{0}' in table '{1}'", record.Id, tableName);
- await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ }
+ }
+ catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
+ {
+ throw new IndexNotFoundException(e.Message, e);
+ }
+ catch (Exception e)
+ {
+ throw new PostgresException(e.Message, e);
+ }
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
}
- }
- catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
- {
- throw new IndexNotFoundException(e.Message, e);
- }
- catch (Exception e)
- {
- throw new PostgresException(e.Message, e);
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
}
}
}
@@ -444,68 +460,71 @@ DO UPDATE SET
this._log.LogTrace("Searching by similarity. Table: {0}. Threshold: {1}. Limit: {2}. Offset: {3}. Using SQL filter: {4}",
tableName, minSimilarity, limit, offset, string.IsNullOrWhiteSpace(filterSql) ? "false" : "true");
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
-#pragma warning disable CA2100 // SQL reviewed
- string colDistance = "__distance";
-
- // When using 1 - (embedding <=> target) the index is not being used, therefore we calculate
- // the similarity (1 - distance) later. Furthermore, colDistance can't be used in the WHERE clause.
- cmd.CommandText = @$"
- SELECT {columns}, {this._colEmbedding} <=> @embedding AS {colDistance}
- FROM {tableName}
- WHERE {filterSql}
- ORDER BY {colDistance} ASC
- LIMIT @limit
- OFFSET @offset
- ";
-
- cmd.Parameters.AddWithValue("@embedding", target);
- cmd.Parameters.AddWithValue("@maxDistance", maxDistance);
- cmd.Parameters.AddWithValue("@limit", limit);
- cmd.Parameters.AddWithValue("@offset", offset);
-
- foreach (KeyValuePair kv in sqlUserValues)
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
{
- cmd.Parameters.AddWithValue(kv.Key, kv.Value);
- }
+#pragma warning disable CA2100 // SQL reviewed
+ string colDistance = "__distance";
+
+ // When using 1 - (embedding <=> target) the index is not being used, therefore we calculate
+ // the similarity (1 - distance) later. Furthermore, colDistance can't be used in the WHERE clause.
+ cmd.CommandText = @$"
+ SELECT {columns}, {this._colEmbedding} <=> @embedding AS {colDistance}
+ FROM {tableName}
+ WHERE {filterSql}
+ ORDER BY {colDistance} ASC
+ LIMIT @limit
+ OFFSET @offset
+ ";
+
+ cmd.Parameters.AddWithValue("@embedding", target);
+ cmd.Parameters.AddWithValue("@maxDistance", maxDistance);
+ cmd.Parameters.AddWithValue("@limit", limit);
+ cmd.Parameters.AddWithValue("@offset", offset);
+
+ foreach (KeyValuePair kv in sqlUserValues)
+ {
+ cmd.Parameters.AddWithValue(kv.Key, kv.Value);
+ }
#pragma warning restore CA2100
- // TODO: rewrite code to stream results (need to combine yield and try-catch)
- var result = new List<(PostgresMemoryRecord record, double similarity)>();
- try
- {
- NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
- await using (dataReader.ConfigureAwait(false))
+ // TODO: rewrite code to stream results (need to combine yield and try-catch)
+ var result = new List<(PostgresMemoryRecord record, double similarity)>();
+ try
{
- while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataReader.ConfigureAwait(false))
{
- double distance = dataReader.GetDouble(dataReader.GetOrdinal(colDistance));
- double similarity = 1 - distance;
- result.Add((this.ReadEntry(dataReader, withEmbeddings), similarity));
+ while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ double distance = dataReader.GetDouble(dataReader.GetOrdinal(colDistance));
+ double similarity = 1 - distance;
+ result.Add((this.ReadEntry(dataReader, withEmbeddings), similarity));
+ }
}
}
- }
- catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
- {
- this._log.LogTrace("Table not found: {0}", tableName);
- }
+ catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
+ {
+ this._log.LogTrace("Table not found: {0}", tableName);
+ }
- // TODO: rewrite code to stream results (need to combine yield and try-catch)
- foreach (var x in result)
- {
- yield return x;
+ // TODO: rewrite code to stream results (need to combine yield and try-catch)
+ foreach (var x in result)
+ {
+ yield return x;
+ }
}
}
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
+ }
}
}
}
@@ -553,63 +572,66 @@ public async IAsyncEnumerable GetListAsync(
this._log.LogTrace("Fetching list of records. Table: {0}. Order by: {1}. Limit: {2}. Offset: {3}. Using SQL filter: {4}",
tableName, orderBySql, limit, offset, string.IsNullOrWhiteSpace(filterSql) ? "false" : "true");
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
-#pragma warning disable CA2100 // SQL reviewed
- cmd.CommandText = @$"
- SELECT {columns} FROM {tableName}
- WHERE {filterSql}
- ORDER BY {orderBySql}
- LIMIT @limit
- OFFSET @offset
- ";
-
- cmd.Parameters.AddWithValue("@limit", limit);
- cmd.Parameters.AddWithValue("@offset", offset);
-
- if (sqlUserValues != null)
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
{
- foreach (KeyValuePair kv in sqlUserValues)
+#pragma warning disable CA2100 // SQL reviewed
+ cmd.CommandText = @$"
+ SELECT {columns} FROM {tableName}
+ WHERE {filterSql}
+ ORDER BY {orderBySql}
+ LIMIT @limit
+ OFFSET @offset
+ ";
+
+ cmd.Parameters.AddWithValue("@limit", limit);
+ cmd.Parameters.AddWithValue("@offset", offset);
+
+ if (sqlUserValues != null)
{
- cmd.Parameters.AddWithValue(kv.Key, kv.Value);
+ foreach (KeyValuePair kv in sqlUserValues)
+ {
+ cmd.Parameters.AddWithValue(kv.Key, kv.Value);
+ }
}
- }
#pragma warning restore CA2100
- // TODO: rewrite code to stream results (need to combine yield and try-catch)
- var result = new List();
- try
- {
- NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
- await using (dataReader.ConfigureAwait(false))
+ // TODO: rewrite code to stream results (need to combine yield and try-catch)
+ var result = new List();
+ try
{
- while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ NpgsqlDataReader dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataReader.ConfigureAwait(false))
{
- result.Add(this.ReadEntry(dataReader, withEmbeddings));
+ while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ result.Add(this.ReadEntry(dataReader, withEmbeddings));
+ }
}
}
- }
- catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
- {
- this._log.LogTrace("Table not found: {0}", tableName);
- }
+ catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
+ {
+ this._log.LogTrace("Table not found: {0}", tableName);
+ }
- // TODO: rewrite code to stream results (need to combine yield and try-catch)
- foreach (var x in result)
- {
- yield return x;
+ // TODO: rewrite code to stream results (need to combine yield and try-catch)
+ foreach (var x in result)
+ {
+ yield return x;
+ }
}
}
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
+ }
}
}
}
@@ -628,32 +650,35 @@ public async Task DeleteAsync(
tableName = this.WithSchemaAndTableNamePrefix(tableName);
this._log.LogTrace("Deleting record '{0}' from table '{1}'", id, tableName);
- NpgsqlConnection connection = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
- await using (connection)
+ var (dataSource, connection) = await this.ConnectAsync(cancellationToken).ConfigureAwait(false);
+ await using (dataSource.ConfigureAwait(false))
{
- try
+ await using (connection)
{
- NpgsqlCommand cmd = connection.CreateCommand();
- await using (cmd.ConfigureAwait(false))
+ try
{
+ NpgsqlCommand cmd = connection.CreateCommand();
+ await using (cmd.ConfigureAwait(false))
+ {
#pragma warning disable CA2100 // SQL reviewed
- cmd.CommandText = $"DELETE FROM {tableName} WHERE {this._colId}=@id";
- cmd.Parameters.AddWithValue("@id", id);
+ cmd.CommandText = $"DELETE FROM {tableName} WHERE {this._colId}=@id";
+ cmd.Parameters.AddWithValue("@id", id);
#pragma warning restore CA2100
- try
- {
- await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
- catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
- {
- this._log.LogTrace("Table not found: {0}", tableName);
+ try
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (Npgsql.PostgresException e) when (IsTableNotFoundException(e))
+ {
+ this._log.LogTrace("Table not found: {0}", tableName);
+ }
}
}
- }
- finally
- {
- await connection.CloseAsync().ConfigureAwait(false);
+ finally
+ {
+ await connection.CloseAsync().ConfigureAwait(false);
+ }
}
}
}
@@ -663,15 +688,12 @@ public async Task DeleteAsync(
///
///
///
- private async Task ConnectAsync(CancellationToken cancellationToken = default)
+ private async Task<(NpgsqlDataSource DataSource, NpgsqlConnection Connection)> ConnectAsync(CancellationToken cancellationToken = default)
{
try
{
var dataSource = this._dataSourceBuilder.Build();
- await using (dataSource.ConfigureAwait(false))
- {
- return await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
- }
+ return (dataSource, await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false));
}
catch (Npgsql.PostgresException e) when (IsDbNotFoundException(e))
{
diff --git a/extensions/Postgres/Postgres/Postgres.csproj b/extensions/Postgres/Postgres/Postgres.csproj
index a76e6fcf4..2c80dec15 100644
--- a/extensions/Postgres/Postgres/Postgres.csproj
+++ b/extensions/Postgres/Postgres/Postgres.csproj
@@ -14,6 +14,7 @@
+