1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
using System;
using System.Collections.Generic;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
namespace indigo
{
public class RingoIndexData : BingoIndexData
{
object _sync_object = new Object();
public RingoIndexData (BingoIndexID id, string id_column, string data_column, string bingo_schema) :
base(id, id_column, data_column, bingo_schema)
{
}
public override IndexType getIndexType ()
{
return IndexType.Reaction;
}
public override void CreateTables (SqlConnection conn)
{
base.CreateTables(conn);
StringBuilder cmd = new StringBuilder();
// Create shadow table
cmd.AppendFormat(@"CREATE TABLE {0}
(id int not null, storage_id int not null, crf varbinary(max), hash int not null)", shadowTable);
BingoSqlUtils.ExecNonQuery(conn, cmd.ToString());
}
public override void createIndices (SqlConnection conn)
{
BingoSqlUtils.ExecNonQuery(conn,
"ALTER TABLE {0} ADD PRIMARY KEY (storage_id)", shadowTable);
BingoSqlUtils.ExecNonQuery(conn,
"CREATE UNIQUE INDEX id ON {0}(id)", shadowTable);
BingoSqlUtils.ExecNonQuery(conn,
"CREATE INDEX hash ON {0}(hash)", shadowTable);
}
public override void DropTables (SqlConnection conn)
{
base.DropTables(conn);
BingoSqlUtils.ExecNonQueryNoThrow(conn, "DROP TABLE " + shadowTable);
}
DataTable shadow_datatable = null;
public void addToShadowTable (SqlConnection conn, RingoIndex index, int id, int storage_id)
{
lock (_sync_object)
{
if (shadow_datatable == null)
_createDataTable();
if (shadow_datatable.Rows.Count >= 10000)
_flushShadowTable(conn);
DataRow shadow_row = shadow_datatable.NewRow();
shadow_row["id"] = id;
shadow_row["storage_id"] = storage_id;
shadow_row["crf"] = index.crf;
shadow_row["hash"] = index.hash;
shadow_datatable.Rows.Add(shadow_row);
}
}
private void _createDataTable ()
{
shadow_datatable = new DataTable();
DataColumnCollection sc = shadow_datatable.Columns;
sc.Add(new DataColumn("id", Type.GetType("System.Int32")));
sc.Add(new DataColumn("storage_id", Type.GetType("System.Int32")));
sc.Add(new DataColumn("crf", Type.GetType("System.Array")));
sc.Add(new DataColumn("hash", Type.GetType("System.Int32")));
}
public override bool needFlush()
{
lock (_sync_object)
{
if (base.needFlush())
return true;
return shadow_datatable != null && shadow_datatable.Rows.Count > 0;
}
}
public override void flush(SqlConnection conn)
{
lock (_sync_object)
{
base.flush(conn);
_flushShadowTable(conn);
}
}
private void _flushShadowTable (SqlConnection conn)
{
if (shadow_datatable == null || shadow_datatable.Rows.Count == 0)
return;
if (conn.ConnectionString == "context connection=true")
{
// SqlBulkInsert cannot be used in the context connection
_flushShadowTableInContext(conn);
return;
}
BingoTimer timer = new BingoTimer("shadow_table.flush");
using (SqlTransaction transaction =
conn.BeginTransaction())
{
// Copy shadow table
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn,
SqlBulkCopyOptions.TableLock, transaction))
{
bulkCopy.DestinationTableName = shadowTable;
foreach (DataColumn dc in shadow_datatable.Columns)
bulkCopy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName);
bulkCopy.BatchSize = shadow_datatable.Rows.Count;
bulkCopy.BulkCopyTimeout = 3600;
bulkCopy.WriteToServer(shadow_datatable);
}
shadow_datatable.Rows.Clear();
transaction.Commit();
}
timer.end();
}
private void _flushShadowTableInContext (SqlConnection conn)
{
foreach (DataRow row in shadow_datatable.Rows)
{
using (SqlCommand cmd = new SqlCommand())
{
cmd.CommandTimeout = 3600;
StringBuilder cmd_text = new StringBuilder();
cmd_text.AppendFormat("INSERT INTO {0} VALUES ", shadowTable);
cmd_text.AppendFormat("({0}, {1}, @crf, {2})",
row["id"], row["storage_id"], row["hash"]);
cmd.Parameters.AddWithValue("@crf", new SqlBinary((byte[])row["crf"]));
cmd.Connection = conn;
cmd.CommandText = cmd_text.ToString();
cmd.ExecuteNonQuery();
}
}
shadow_datatable.Rows.Clear();
}
public override void prepareForDeleteRecord (SqlConnection conn)
{
lock (_sync_object)
{
_flushShadowTable(conn);
}
}
}
}
|