From 52843b0e6cddc80a3184f4aa787418d283ff6167 Mon Sep 17 00:00:00 2001 From: Tobbe Gyllebring Date: Tue, 2 Jan 2018 12:41:08 +0100 Subject: [PATCH] Dataflow. --- DataBoss.Data/DataBossBulkCopy.cs | 19 +++-- DataBoss.Data/DataBossConnectionProvider.cs | 9 ++- DataBoss.Data/Dataflow/ProcessingBlock.cs | 73 +++++++++++++++++++ DataBoss.Data/Linq/MissinqLinq.cs | 5 ++ DataBoss.Data/SequenceDataReader.cs | 3 + DataBoss.Data/SqlConnectionExtensions.cs | 8 +- .../DataBossConnectionProviderSpec.cs | 10 +++ .../SqlServerMaintenancePlanWizard.cs | 40 +++++++++- 8 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 DataBoss.Data/Dataflow/ProcessingBlock.cs diff --git a/DataBoss.Data/DataBossBulkCopy.cs b/DataBoss.Data/DataBossBulkCopy.cs index 7f750c0..c81ff79 100644 --- a/DataBoss.Data/DataBossBulkCopy.cs +++ b/DataBoss.Data/DataBossBulkCopy.cs @@ -12,7 +12,7 @@ struct IdRow { public int Id; } public class DataBossBulkCopySettings { - public int BatchSize; + public int? BatchSize; public int? CommandTimeout; } @@ -25,9 +25,6 @@ public class DataBossBulkCopy public readonly SqlConnection Connection; public readonly SqlTransaction Transaction; - int? CommandTimeout => settings.CommandTimeout; - int BatchSize => settings.BatchSize; - public DataBossBulkCopy(SqlConnection connection) : this(connection, null, new DataBossBulkCopySettings()) { } public DataBossBulkCopy(SqlConnection connection, DataBossBulkCopySettings settings) : this(connection, null, settings) { } public DataBossBulkCopy(SqlConnection connection, SqlTransaction transaction) : this(connection, transaction, new DataBossBulkCopySettings()) { } @@ -46,7 +43,8 @@ public void Insert(string destinationTable, IDataReader toInsert) { } } - public void Insert(string destinationTable, IEnumerable rows) => Insert(destinationTable, SequenceDataReader.Create(rows, x => x.MapAll())); + public void Insert(string destinationTable, IEnumerable rows) => + Insert(destinationTable, SequenceDataReader.Create(rows, x => x.MapAll())); public ICollection InsertAndGetIdentities(string destinationTable, IEnumerable rows) { var n = 0; @@ -75,8 +73,8 @@ order by [$] drop table {TempTableName} ")) { - if(CommandTimeout.HasValue) - cmd.CommandTimeout = CommandTimeout.Value; + if(settings.CommandTimeout.HasValue) + cmd.CommandTimeout = settings.CommandTimeout.Value; cmd.Transaction = Transaction; using (var reader = ObjectReader.For(cmd.ExecuteReader(CommandBehavior.SingleResult | CommandBehavior.SequentialAccess))) { var ids = reader.Read().Select(x => x.Id).ToList(); @@ -90,10 +88,11 @@ SqlBulkCopy NewBulkCopy(string destinationTable) { var bulkCopy = new SqlBulkCopy(Connection, SqlBulkCopyOptions.TableLock, Transaction) { DestinationTableName = destinationTable, EnableStreaming = true, - BatchSize = BatchSize, }; - if(CommandTimeout.HasValue) - bulkCopy.BulkCopyTimeout = CommandTimeout.Value; + if(settings.BatchSize.HasValue) + bulkCopy.BatchSize = settings.BatchSize.Value; + if(settings.CommandTimeout.HasValue) + bulkCopy.BulkCopyTimeout = settings.CommandTimeout.Value; return bulkCopy; } } diff --git a/DataBoss.Data/DataBossConnectionProvider.cs b/DataBoss.Data/DataBossConnectionProvider.cs index a87c594..bae4435 100644 --- a/DataBoss.Data/DataBossConnectionProvider.cs +++ b/DataBoss.Data/DataBossConnectionProvider.cs @@ -5,10 +5,11 @@ using System.Data.SqlClient; using System.Linq; using System.Threading; +using DataBoss.Linq; namespace DataBoss.Data { - public class DataBossConnectionProvider + public class DataBossConnectionProvider : IDisposable { public struct ProviderStatistics : IEnumerable> { @@ -19,6 +20,7 @@ public ProviderStatistics(Dictionary stats) { } public ByteSize BytesReceived => new ByteSize(GetOrDefault(nameof(BytesReceived))); + public ByteSize BytesSent => new ByteSize(GetOrDefault(nameof(BytesSent))); public long ConnectionsCreated => GetOrDefault(nameof(ConnectionsCreated)); public long LiveConnections => GetOrDefault(nameof(LiveConnections)); public long SelectCount => GetOrDefault(nameof(SelectCount)); @@ -96,5 +98,10 @@ public void ResetStatistics() { foreach(var item in connections.Values) item.ResetStatistics(); } + + void IDisposable.Dispose() => Cleanup(); + + public void Cleanup() => + connections.Values.ForEach(x => x.Dispose()); } } diff --git a/DataBoss.Data/Dataflow/ProcessingBlock.cs b/DataBoss.Data/Dataflow/ProcessingBlock.cs new file mode 100644 index 0000000..de301c7 --- /dev/null +++ b/DataBoss.Data/Dataflow/ProcessingBlock.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace DataBoss.Data.Dataflow +{ + public interface IActionBlock + { + Task Completion { get; } + } + + public interface IMessageSink + { + void Post(T item); + } + + public interface IProcessingBlock : IActionBlock, IMessageSink + { + void Complete(); + } + + public static class Block + { + class TaskBlock : IActionBlock + { + readonly Task worker; + + internal TaskBlock(Task worker) { + this.worker = worker; + } + + public Task Completion => worker; + } + + class SequenceProcessingBlock : IProcessingBlock + { + readonly BlockingCollection inputs; + readonly Action> process; + readonly Task worker; + + internal SequenceProcessingBlock(TaskFactory tasks, Action> process, int? maxQueue) { + this.inputs = maxQueue.HasValue ? new BlockingCollection(maxQueue.Value) : new BlockingCollection(); + this.process = process; + this.worker = tasks.StartNew(() => process(inputs.GetConsumingEnumerable()), TaskCreationOptions.LongRunning); + } + + public Task Completion => worker; + public void Complete() => inputs.CompleteAdding(); + + public void Post(T item) => inputs.Add(item); + } + + public static IActionBlock Action(Action action) => + new TaskBlock(Task.Factory.StartNew(action, TaskCreationOptions.LongRunning)); + + public static IActionBlock Generator(Func> generator, Func> getSink) { + var sink = getSink(); + return new TaskBlock(Task.Factory.StartNew(() => { + + foreach (var item in generator()) + sink.Post(item); + }, TaskCreationOptions.LongRunning)); + } + + public static IProcessingBlock Sequence(Action> process, int? maxQueue = null) => + new SequenceProcessingBlock(Task.Factory, process, maxQueue); + + public static IProcessingBlock> Batches(Action> process, int? maxQueue = null) => + Sequence>(xs => process(xs.SelectMany(x => x)), maxQueue); + } +} diff --git a/DataBoss.Data/Linq/MissinqLinq.cs b/DataBoss.Data/Linq/MissinqLinq.cs index 12bd7ed..f452219 100644 --- a/DataBoss.Data/Linq/MissinqLinq.cs +++ b/DataBoss.Data/Linq/MissinqLinq.cs @@ -153,4 +153,9 @@ public static TOutput[] ConvertAll(this T[] self, Converter(T[] ts) => ts.Length == 1 ? ts[0] : throw new InvalidOperationException("Array contains more than one element."); } + + public static class KeyValuePair + { + public static KeyValuePair Create(TKey key, TValue value) => new KeyValuePair(key, value); + } } \ No newline at end of file diff --git a/DataBoss.Data/SequenceDataReader.cs b/DataBoss.Data/SequenceDataReader.cs index 8dbddaf..9300fb0 100644 --- a/DataBoss.Data/SequenceDataReader.cs +++ b/DataBoss.Data/SequenceDataReader.cs @@ -11,6 +11,7 @@ public static class DataReaderSchemaColumns public const string ColumnName = "ColumnName"; public const string ColumnOrdinal = "ColumnOrdinal"; public const string ColumnSize = "ColumnSize"; + public const string DataType = "DataType"; } public static class SequenceDataReader @@ -90,6 +91,7 @@ DataTable IDataReader.GetSchemaTable() { var columnOrdinal = schema.Columns.Add(DataReaderSchemaColumns.ColumnOrdinal, typeof(int)); var columnSize = schema.Columns.Add(DataReaderSchemaColumns.ColumnSize, typeof(int)); var isNullable = schema.Columns.Add(DataReaderSchemaColumns.AllowDBNull, typeof(bool)); + var dataType = schema.Columns.Add(DataReaderSchemaColumns.DataType, typeof(Type)); for(var i = 0; i != FieldCount; ++i) { var r = schema.NewRow(); var dbType = dbTypes[i]; @@ -97,6 +99,7 @@ DataTable IDataReader.GetSchemaTable() { r[columnOrdinal] = i; r[columnSize] = dbType.ColumnSize.HasValue ? (object)dbType.ColumnSize.Value : DBNull.Value; r[isNullable] = dbType.IsNullable; + r[dataType] = GetFieldType(i); schema.Rows.Add(r); } return schema; diff --git a/DataBoss.Data/SqlConnectionExtensions.cs b/DataBoss.Data/SqlConnectionExtensions.cs index 717b413..948cb4f 100644 --- a/DataBoss.Data/SqlConnectionExtensions.cs +++ b/DataBoss.Data/SqlConnectionExtensions.cs @@ -56,7 +56,7 @@ public static void Into(this SqlConnection connection, string destinationTable, public static void Into(this SqlConnection connection, string destinationTable, IDataReader toInsert, DataBossBulkCopySettings settings) { var scripter = new DataBossScripter(); connection.ExecuteNonQuery(scripter.ScriptTable(destinationTable, toInsert)); - connection.Insert(null, destinationTable, toInsert, settings); + connection.Insert(destinationTable, toInsert, settings); } public static void Insert(this SqlConnection connection, string destinationTable, IEnumerable rows) => @@ -65,6 +65,9 @@ public static void Insert(this SqlConnection connection, string destinationTa public static void Insert(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IEnumerable rows) => Insert(connection, transaction, destinationTable, rows, new DataBossBulkCopySettings()); + public static void Insert(this SqlConnection connection, string destinationTable, IEnumerable rows, DataBossBulkCopySettings settings) => + connection.Insert(null, destinationTable, rows, settings); + public static void Insert(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IEnumerable rows, DataBossBulkCopySettings settings) => new DataBossBulkCopy(connection, transaction, settings).Insert(destinationTable, rows); @@ -74,6 +77,9 @@ public static void Insert(this SqlConnection connection, string destinationTable public static void Insert(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IDataReader toInsert) => Insert(connection, transaction, destinationTable, toInsert, new DataBossBulkCopySettings()); + public static void Insert(this SqlConnection connection, string destinationTable, IDataReader toInsert, DataBossBulkCopySettings settings) => + Insert(connection, null, destinationTable, toInsert, settings); + public static void Insert(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IDataReader toInsert, DataBossBulkCopySettings settings) => new DataBossBulkCopy(connection, transaction, settings).Insert(destinationTable, toInsert); diff --git a/DataBoss.Specs/DataBossConnectionProviderSpec.cs b/DataBoss.Specs/DataBossConnectionProviderSpec.cs index 6729336..2f0bdd2 100644 --- a/DataBoss.Specs/DataBossConnectionProviderSpec.cs +++ b/DataBoss.Specs/DataBossConnectionProviderSpec.cs @@ -61,5 +61,15 @@ public void reset_statistics() { .That(stats => stats["SelectCount"] == 0); } } + + public void cleanup() { + var disposed = false; + var db = connections.NewConnection(); + db.Disposed += (_, __) => disposed = true; + connections.Cleanup(); + Check.That( + () => disposed, + () => connections.LiveConnections == 0); + } } } diff --git a/DataBoss/Diagnostics/SqlServerMaintenancePlanWizard.cs b/DataBoss/Diagnostics/SqlServerMaintenancePlanWizard.cs index 7c45a82..39b6ce3 100644 --- a/DataBoss/Diagnostics/SqlServerMaintenancePlanWizard.cs +++ b/DataBoss/Diagnostics/SqlServerMaintenancePlanWizard.cs @@ -1,8 +1,11 @@ using System; using System.Collections.Generic; +using System.Data; using System.Data.SqlClient; using System.Linq; +using System.Runtime.ExceptionServices; using DataBoss.Data; +using DataBoss.Linq; namespace DataBoss.Diagnostics { @@ -20,6 +23,17 @@ public SqlServerMaintenancePlan(string server, string database, string[] command } } + public class SqlServerMaintenancePlanWizardErrorEventArgs : EventArgs + { + public readonly SqlConnection Connection; + public readonly Exception Error; + + public SqlServerMaintenancePlanWizardErrorEventArgs(SqlConnection connection, Exception error) { + this.Connection = connection; + this.Error = error; + } + } + public class SqlServerMaintenancePlanWizard { public bool UpdateStatistics = true; @@ -37,12 +51,12 @@ class IndexStatsRow } #pragma warning restore CS0649 + public event EventHandler OnError; + public IEnumerable MakePlans(Func getServer, string[] dbs) { - using(var server = getServer()) { - server.Open(); + using(var server = getServer()) foreach(var item in MakePlans(server, dbs)) yield return item; - } } public IEnumerable MakePlans(SqlConnection server, string[] dbs) { @@ -50,9 +64,27 @@ public IEnumerable MakePlans(SqlConnection server, str var reader = new DbObjectReader(server) { CommandTimeout = null }; foreach (var database in dbs) { + if(server.State != ConnectionState.Open) + server.Open(); server.ChangeDatabase(database); - yield return new SqlServerMaintenancePlan(serverName, database, GetMaintenanceCommands(reader).ToArray()); + if(TryCreateMaintenancePlan(serverName, database, reader, out var result)) + yield return result.Key; + else if(OnError != null) + OnError(this, new SqlServerMaintenancePlanWizardErrorEventArgs(server, result.Value)); + else ExceptionDispatchInfo.Capture(result.Value).Throw(); + } + } + + bool TryCreateMaintenancePlan(string serverName, string database, DbObjectReader reader, out KeyValuePair result) { + try { + result = KeyValuePair.Create(new SqlServerMaintenancePlan(serverName, database, GetMaintenanceCommands(reader).ToArray()), (Exception)null); + return true; + } + catch (Exception ex) { + result = KeyValuePair.Create((SqlServerMaintenancePlan)null, ex); + return false; } + } IEnumerable GetMaintenanceCommands(DbObjectReader reader) {