Skip to content

Commit

Permalink
Dataflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
drunkcod committed Jan 2, 2018
1 parent 52e911b commit 52843b0
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 16 deletions.
19 changes: 9 additions & 10 deletions DataBoss.Data/DataBossBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct IdRow { public int Id; }

public class DataBossBulkCopySettings
{
public int BatchSize;
public int? BatchSize;
public int? CommandTimeout;
}

Expand All @@ -25,9 +25,6 @@ public class DataBossBulkCopy
public readonly SqlConnection Connection;
public readonly SqlTransaction Transaction;

int? CommandTimeout => settings.CommandTimeout;
int BatchSize => settings.BatchSize;

public DataBossBulkCopy(SqlConnection connection) : this(connection, null, new DataBossBulkCopySettings()) { }
public DataBossBulkCopy(SqlConnection connection, DataBossBulkCopySettings settings) : this(connection, null, settings) { }
public DataBossBulkCopy(SqlConnection connection, SqlTransaction transaction) : this(connection, transaction, new DataBossBulkCopySettings()) { }
Expand All @@ -46,7 +43,8 @@ public void Insert(string destinationTable, IDataReader toInsert) {
}
}

public void Insert<T>(string destinationTable, IEnumerable<T> rows) => Insert(destinationTable, SequenceDataReader.Create(rows, x => x.MapAll()));
public void Insert<T>(string destinationTable, IEnumerable<T> rows) =>
Insert(destinationTable, SequenceDataReader.Create(rows, x => x.MapAll()));

public ICollection<int> InsertAndGetIdentities<T>(string destinationTable, IEnumerable<T> rows) {
var n = 0;
Expand Down Expand Up @@ -75,8 +73,8 @@ order by [$]
drop table {TempTableName}
"))
{
if(CommandTimeout.HasValue)
cmd.CommandTimeout = CommandTimeout.Value;
if(settings.CommandTimeout.HasValue)
cmd.CommandTimeout = settings.CommandTimeout.Value;
cmd.Transaction = Transaction;
using (var reader = ObjectReader.For(cmd.ExecuteReader(CommandBehavior.SingleResult | CommandBehavior.SequentialAccess))) {
var ids = reader.Read<IdRow>().Select(x => x.Id).ToList();
Expand All @@ -90,10 +88,11 @@ SqlBulkCopy NewBulkCopy(string destinationTable) {
var bulkCopy = new SqlBulkCopy(Connection, SqlBulkCopyOptions.TableLock, Transaction) {
DestinationTableName = destinationTable,
EnableStreaming = true,
BatchSize = BatchSize,
};
if(CommandTimeout.HasValue)
bulkCopy.BulkCopyTimeout = CommandTimeout.Value;
if(settings.BatchSize.HasValue)
bulkCopy.BatchSize = settings.BatchSize.Value;
if(settings.CommandTimeout.HasValue)
bulkCopy.BulkCopyTimeout = settings.CommandTimeout.Value;
return bulkCopy;
}
}
Expand Down
9 changes: 8 additions & 1 deletion DataBoss.Data/DataBossConnectionProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
using System.Data.SqlClient;
using System.Linq;
using System.Threading;
using DataBoss.Linq;

namespace DataBoss.Data
{
public class DataBossConnectionProvider
public class DataBossConnectionProvider : IDisposable
{
public struct ProviderStatistics : IEnumerable<KeyValuePair<string, long>>
{
Expand All @@ -19,6 +20,7 @@ public ProviderStatistics(Dictionary<string, long> stats) {
}

public ByteSize BytesReceived => new ByteSize(GetOrDefault(nameof(BytesReceived)));
public ByteSize BytesSent => new ByteSize(GetOrDefault(nameof(BytesSent)));
public long ConnectionsCreated => GetOrDefault(nameof(ConnectionsCreated));
public long LiveConnections => GetOrDefault(nameof(LiveConnections));
public long SelectCount => GetOrDefault(nameof(SelectCount));
Expand Down Expand Up @@ -96,5 +98,10 @@ public void ResetStatistics() {
foreach(var item in connections.Values)
item.ResetStatistics();
}

void IDisposable.Dispose() => Cleanup();

public void Cleanup() =>
connections.Values.ForEach(x => x.Dispose());
}
}
73 changes: 73 additions & 0 deletions DataBoss.Data/Dataflow/ProcessingBlock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace DataBoss.Data.Dataflow
{
public interface IActionBlock
{
Task Completion { get; }
}

public interface IMessageSink<T>
{
void Post(T item);
}

public interface IProcessingBlock<T> : IActionBlock, IMessageSink<T>
{
void Complete();
}

public static class Block
{
class TaskBlock : IActionBlock
{
readonly Task worker;

internal TaskBlock(Task worker) {
this.worker = worker;
}

public Task Completion => worker;
}

class SequenceProcessingBlock<T> : IProcessingBlock<T>
{
readonly BlockingCollection<T> inputs;
readonly Action<IEnumerable<T>> process;
readonly Task worker;

internal SequenceProcessingBlock(TaskFactory tasks, Action<IEnumerable<T>> process, int? maxQueue) {
this.inputs = maxQueue.HasValue ? new BlockingCollection<T>(maxQueue.Value) : new BlockingCollection<T>();
this.process = process;
this.worker = tasks.StartNew(() => process(inputs.GetConsumingEnumerable()), TaskCreationOptions.LongRunning);
}

public Task Completion => worker;
public void Complete() => inputs.CompleteAdding();

public void Post(T item) => inputs.Add(item);
}

public static IActionBlock Action(Action action) =>
new TaskBlock(Task.Factory.StartNew(action, TaskCreationOptions.LongRunning));

public static IActionBlock Generator<T>(Func<IEnumerable<T>> generator, Func<IMessageSink<T>> getSink) {
var sink = getSink();
return new TaskBlock(Task.Factory.StartNew(() => {

foreach (var item in generator())
sink.Post(item);
}, TaskCreationOptions.LongRunning));
}

public static IProcessingBlock<T> Sequence<T>(Action<IEnumerable<T>> process, int? maxQueue = null) =>
new SequenceProcessingBlock<T>(Task.Factory, process, maxQueue);

public static IProcessingBlock<IEnumerable<T>> Batches<T>(Action<IEnumerable<T>> process, int? maxQueue = null) =>
Sequence<IEnumerable<T>>(xs => process(xs.SelectMany(x => x)), maxQueue);
}
}
5 changes: 5 additions & 0 deletions DataBoss.Data/Linq/MissinqLinq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,9 @@ public static TOutput[] ConvertAll<T, TOutput>(this T[] self, Converter<T, TOutp

public static T Single<T>(T[] ts) => ts.Length == 1 ? ts[0] : throw new InvalidOperationException("Array contains more than one element.");
}

public static class KeyValuePair
{
public static KeyValuePair<TKey, TValue> Create<TKey, TValue>(TKey key, TValue value) => new KeyValuePair<TKey, TValue>(key, value);
}
}
3 changes: 3 additions & 0 deletions DataBoss.Data/SequenceDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static class DataReaderSchemaColumns
public const string ColumnName = "ColumnName";
public const string ColumnOrdinal = "ColumnOrdinal";
public const string ColumnSize = "ColumnSize";
public const string DataType = "DataType";
}

public static class SequenceDataReader
Expand Down Expand Up @@ -90,13 +91,15 @@ DataTable IDataReader.GetSchemaTable() {
var columnOrdinal = schema.Columns.Add(DataReaderSchemaColumns.ColumnOrdinal, typeof(int));
var columnSize = schema.Columns.Add(DataReaderSchemaColumns.ColumnSize, typeof(int));
var isNullable = schema.Columns.Add(DataReaderSchemaColumns.AllowDBNull, typeof(bool));
var dataType = schema.Columns.Add(DataReaderSchemaColumns.DataType, typeof(Type));
for(var i = 0; i != FieldCount; ++i) {
var r = schema.NewRow();
var dbType = dbTypes[i];
r[columnName] = fieldNames[i];
r[columnOrdinal] = i;
r[columnSize] = dbType.ColumnSize.HasValue ? (object)dbType.ColumnSize.Value : DBNull.Value;
r[isNullable] = dbType.IsNullable;
r[dataType] = GetFieldType(i);
schema.Rows.Add(r);
}
return schema;
Expand Down
8 changes: 7 additions & 1 deletion DataBoss.Data/SqlConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void Into(this SqlConnection connection, string destinationTable,
public static void Into(this SqlConnection connection, string destinationTable, IDataReader toInsert, DataBossBulkCopySettings settings) {
var scripter = new DataBossScripter();
connection.ExecuteNonQuery(scripter.ScriptTable(destinationTable, toInsert));
connection.Insert(null, destinationTable, toInsert, settings);
connection.Insert(destinationTable, toInsert, settings);
}

public static void Insert<T>(this SqlConnection connection, string destinationTable, IEnumerable<T> rows) =>
Expand All @@ -65,6 +65,9 @@ public static void Insert<T>(this SqlConnection connection, string destinationTa
public static void Insert<T>(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IEnumerable<T> rows) =>
Insert(connection, transaction, destinationTable, rows, new DataBossBulkCopySettings());

public static void Insert<T>(this SqlConnection connection, string destinationTable, IEnumerable<T> rows, DataBossBulkCopySettings settings) =>
connection.Insert(null, destinationTable, rows, settings);

public static void Insert<T>(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IEnumerable<T> rows, DataBossBulkCopySettings settings) =>
new DataBossBulkCopy(connection, transaction, settings).Insert(destinationTable, rows);

Expand All @@ -74,6 +77,9 @@ public static void Insert(this SqlConnection connection, string destinationTable
public static void Insert(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IDataReader toInsert) =>
Insert(connection, transaction, destinationTable, toInsert, new DataBossBulkCopySettings());

public static void Insert(this SqlConnection connection, string destinationTable, IDataReader toInsert, DataBossBulkCopySettings settings) =>
Insert(connection, null, destinationTable, toInsert, settings);

public static void Insert(this SqlConnection connection, SqlTransaction transaction, string destinationTable, IDataReader toInsert, DataBossBulkCopySettings settings) =>
new DataBossBulkCopy(connection, transaction, settings).Insert(destinationTable, toInsert);

Expand Down
10 changes: 10 additions & 0 deletions DataBoss.Specs/DataBossConnectionProviderSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,15 @@ public void reset_statistics() {
.That(stats => stats["SelectCount"] == 0);
}
}

public void cleanup() {
var disposed = false;
var db = connections.NewConnection();
db.Disposed += (_, __) => disposed = true;
connections.Cleanup();
Check.That(
() => disposed,
() => connections.LiveConnections == 0);
}
}
}
40 changes: 36 additions & 4 deletions DataBoss/Diagnostics/SqlServerMaintenancePlanWizard.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.ExceptionServices;
using DataBoss.Data;
using DataBoss.Linq;

namespace DataBoss.Diagnostics
{
Expand All @@ -20,6 +23,17 @@ public SqlServerMaintenancePlan(string server, string database, string[] command
}
}

public class SqlServerMaintenancePlanWizardErrorEventArgs : EventArgs
{
public readonly SqlConnection Connection;
public readonly Exception Error;

public SqlServerMaintenancePlanWizardErrorEventArgs(SqlConnection connection, Exception error) {
this.Connection = connection;
this.Error = error;
}
}

public class SqlServerMaintenancePlanWizard
{
public bool UpdateStatistics = true;
Expand All @@ -37,22 +51,40 @@ class IndexStatsRow
}
#pragma warning restore CS0649

public event EventHandler<SqlServerMaintenancePlanWizardErrorEventArgs> OnError;

public IEnumerable<SqlServerMaintenancePlan> MakePlans(Func<SqlConnection> getServer, string[] dbs) {
using(var server = getServer()) {
server.Open();
using(var server = getServer())
foreach(var item in MakePlans(server, dbs))
yield return item;
}
}

public IEnumerable<SqlServerMaintenancePlan> MakePlans(SqlConnection server, string[] dbs) {
var serverName = new SqlConnectionStringBuilder(server.ConnectionString).DataSource;
var reader = new DbObjectReader(server) { CommandTimeout = null };

foreach (var database in dbs) {
if(server.State != ConnectionState.Open)
server.Open();
server.ChangeDatabase(database);
yield return new SqlServerMaintenancePlan(serverName, database, GetMaintenanceCommands(reader).ToArray());
if(TryCreateMaintenancePlan(serverName, database, reader, out var result))
yield return result.Key;
else if(OnError != null)
OnError(this, new SqlServerMaintenancePlanWizardErrorEventArgs(server, result.Value));
else ExceptionDispatchInfo.Capture(result.Value).Throw();
}
}

bool TryCreateMaintenancePlan(string serverName, string database, DbObjectReader reader, out KeyValuePair<SqlServerMaintenancePlan, Exception> result) {
try {
result = KeyValuePair.Create(new SqlServerMaintenancePlan(serverName, database, GetMaintenanceCommands(reader).ToArray()), (Exception)null);
return true;
}
catch (Exception ex) {
result = KeyValuePair.Create((SqlServerMaintenancePlan)null, ex);
return false;
}

}

IEnumerable<string> GetMaintenanceCommands(DbObjectReader reader) {
Expand Down

0 comments on commit 52843b0

Please sign in to comment.