c# net Zeilensatz mit CLR und GZIP komprimieren




zip datei entpacken c# (2)

Ich möchte eine große Tabelle komprimieren, die historische Daten enthält, die selten oder überhaupt nicht gelesen werden. Ich habe zuerst versucht, die varchar(max) Komprimierungen zu verwenden ( row , page , column stored , varchar(max) column-stored archive ), aber keiner von ihnen kann varchar(max) , nvarchar(max) ) nvarchar(max) und schließlich beenden versuchen, CLR Lösung zu verwenden.

Die SQL Server Compressed Rowset-Beispiellösung komprimiert den gesamten Zeilensatz, der von einer bestimmten Abfrage zurückgegeben wird, mit einem benutzerdefinierten CLR Typ.

Beispielsweise:

CREATE TABLE Archive
(
     [Date] DATETIME2 DEFAULT(GETUTCDATE())
    ,[Data] [dbo].[CompressedRowset]
)

INSERT INTO Archive([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]')

Es funktioniert, aber ich habe die folgenden Probleme festgestellt:

  • Wenn ich versuche, einen großen Ergebniszeilensatz zu komprimieren, erhalte ich den folgenden Fehler :

    Meldung 0, Ebene 11, Status 0, Zeile 0 Beim aktuellen Befehl ist ein schwerwiegender Fehler aufgetreten. Die Ergebnisse, falls vorhanden, sollten verworfen werden.

    Außerdem funktioniert die folgende Anweisung:

    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')

    aber diese sind nicht:

    INSERT INTO Archive
    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]'
    
    DECLARE @A [dbo].[CompressedRowset]
    SELECT @A = [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
  • Um einen t-sql type zu komprimieren, sollte der t-sql type auf .net type abgebildet werden. Leider gilt dies nicht für alle SQL-Typen - Mapping CLR Parameter Data ; Ich habe bereits die folgende Funktion erweitert, um mehr Typen zu behandeln, aber zum Beispiel mit Typen wie geography umgehen :

    static SqlDbType ToSqlType(Type t){
        if (t == typeof(int)){
            return SqlDbType.Int;
        }
    
        ...
    
        if (t == typeof(Byte[])){
            return SqlDbType.VarBinary;
        } else {
            throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
        }
    }

Hier ist der ganze .net Code:

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;

[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType
    (
        Format.UserDefined
        ,IsByteOrdered = false
        ,IsFixedLength = false
        ,MaxByteSize = -1
    )
]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
    DataTable rowset;

    public DataTable Data
    {
        get { return this.rowset; }
        set { this.rowset = value; }
    }

    public override string ToString()
    {
        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
        {
            WriteXml(xw);
            xw.Flush();
            sw.Flush();
            return sw.ToString();
        }
    }

    public bool IsNull
    {
        get { return (this.rowset == null);}
    }

    public static CompressedRowset Null
    {
        get
        {
            CompressedRowset h = new CompressedRowset();
            return h;
        }
    }

    public static CompressedRowset Parse(SqlString s)
    {
        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
        {
            var c = new CompressedRowset();
            c.ReadXml(xr);
            return c;
        }
    }


    #region "Stream Wrappers"
    abstract class WrapperStream : Stream
    {
        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {

        }

        public override long Length
        {
            get { throw new NotImplementedException(); }
        }

        public override long Position
        {
            get
            {
                throw new NotImplementedException();
            }
            set
            {
                throw new NotImplementedException();
            }
        }


        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }


    }

    class BinaryWriterStream : WrapperStream
    {
        BinaryWriter br;
        public BinaryWriterStream(BinaryWriter br)
        {
            this.br = br;
        }
        public override bool CanRead
        {
            get { return false; }
        }
        public override bool CanWrite
        {
            get { return true; }
        }
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
        public override void Write(byte[] buffer, int offset, int count)
        {
            br.Write(buffer, offset, count);
        }
    }

    class BinaryReaderStream : WrapperStream
    {
        BinaryReader br;
        public BinaryReaderStream(BinaryReader br)
        {
            this.br = br;
        }
        public override bool CanRead
        {
            get { return true; }
        }
        public override bool CanWrite
        {
            get { return false; }
        }
        public override int Read(byte[] buffer, int offset, int count)
        {
            return br.Read(buffer, offset, count);
        }
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
    }
    #endregion

    #region "IBinarySerialize"
    public void Read(System.IO.BinaryReader r)
    {
        using (var rs = new BinaryReaderStream(r))
        using (var cs = new GZipStream(rs, CompressionMode.Decompress))
        {
            var ser = new BinaryFormatter();
            this.rowset = (DataTable)ser.Deserialize(cs);
        }
    }
    public void Write(System.IO.BinaryWriter w)
    {
        if (this.IsNull)
            return;

        rowset.RemotingFormat = SerializationFormat.Binary;
        var ser = new BinaryFormatter();
        using (var binaryWriterStream = new BinaryWriterStream(w))
        using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress))
        {
            ser.Serialize(compressionStream, rowset);
        }

    }

    #endregion

    /// <summary>
    /// This procedure takes an arbitrary query, runs it and compresses the results into a varbinary(max) blob.
    /// If the query has a large result set, then this procedure will use a large amount of memory to buffer the results in 
    /// a DataTable, and more to copy it into a compressed buffer to return. 
    /// </summary>
    /// <param name="query"></param>
    /// <param name="results"></param>
    //[Microsoft.SqlServer.Server.SqlProcedure]
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
    {
        //open a context connection
        using (var con = new SqlConnection("Context Connection=true"))
        {
            con.Open();
            var cmd = new SqlCommand(query, con);
            var dt = new DataTable();
            using (var rdr = cmd.ExecuteReader())
            {
                dt.Load(rdr);
            }
            //configure the DataTable for binary serialization
            dt.RemotingFormat = SerializationFormat.Binary;
            var bf = new BinaryFormatter();

            var cdt = new CompressedRowset();
            cdt.rowset = dt;
            return cdt;


        }
    }

    /// <summary>
    /// partial Type mapping between SQL and .NET
    /// </summary>
    /// <param name="t"></param>
    /// <returns></returns>
    static SqlDbType ToSqlType(Type t)
    {
        if (t == typeof(int))
        {
            return SqlDbType.Int;
        }
        if (t == typeof(string))
        {
            return SqlDbType.NVarChar;
        }
        if (t == typeof(Boolean))
        {
            return SqlDbType.Bit;
        }
        if (t == typeof(decimal))
        {
            return SqlDbType.Decimal;
        }
        if (t == typeof(float))
        {
            return SqlDbType.Real;
        }
        if (t == typeof(double))
        {
            return SqlDbType.Float;
        }
        if (t == typeof(DateTime))
        {
            return SqlDbType.DateTime;
        }
        if (t == typeof(Int64))
        {
            return SqlDbType.BigInt;
        }
        if (t == typeof(Int16))
        {
            return SqlDbType.SmallInt;
        }
        if (t == typeof(byte))
        {
            return SqlDbType.TinyInt;
        }
        if ( t == typeof(Guid))
        {
            return SqlDbType.UniqueIdentifier;
        }
        //!!!!!!!!!!!!!!!!!!!
        if (t == typeof(Byte[]))
        {
            return SqlDbType.VarBinary;
        }   
        else
        {
            throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
        }

    }

    /// <summary>
    /// This stored procedure takes a compressed DataTable and returns it as a resultset to the clinet
    /// or into a table using exec .... into ...
    /// </summary>
    /// <param name="results"></param>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void UnCompressRowset(CompressedRowset results)
    {
        if (results.IsNull)
            return;

        DataTable dt = results.rowset;
        var fields = new SqlMetaData[dt.Columns.Count];
        for (int i = 0; i < dt.Columns.Count; i++)
        {
            var col = dt.Columns[i];
            var sqlType = ToSqlType(col.DataType);
            var colName = col.ColumnName;
            if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
            {
                fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength);
            }
            else
            {
                fields[i] = new SqlMetaData(colName, sqlType);
            }
        }
        var record = new SqlDataRecord(fields);

        SqlContext.Pipe.SendResultsStart(record);
        foreach (DataRow row in dt.Rows)
        {
            record.SetValues(row.ItemArray);
            SqlContext.Pipe.SendResultsRow(record);
        }
        SqlContext.Pipe.SendResultsEnd();

    }

    public System.Xml.Schema.XmlSchema GetSchema()
    {
        return null;
    }

    public void ReadXml(System.Xml.XmlReader reader)
    {
        if (rowset != null)
        {
            throw new InvalidOperationException("rowset already read");
        }
        var ser = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)ser.Deserialize(reader);
    }

    public void WriteXml(System.Xml.XmlWriter writer)
    {
        if (String.IsNullOrEmpty(rowset.TableName))
            rowset.TableName = "Rows";

        var ser = new XmlSerializer(typeof(DataTable));
        ser.Serialize(writer, rowset);
    }
}

und hier ist die Erstellung von SQL-Objekten:

CREATE TYPE [dbo].[CompressedRowset]
     EXTERNAL NAME [CompressedRowset].[CompressedRowset];

GO

CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000))
RETURNS [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];

GO

CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];

GO

Für die ursprüngliche Frage wahrscheinlich zu spät, aber für andere ist es vielleicht eine Überlegung wert: In SQL Server 2016 gibt es Komprimierungs- und Dekomprimierungsfunktionen (siehe hier und hier ), die hier nützlich sein könnten, wenn die Daten, die Sie archivieren möchten, groß sind Werte in [N]VARCHAR VARBINARY und VARBINARY Spalten.

Sie müssten dies in Ihrer Business-Logik-Ebene erstellen oder eine Vereinbarung in SQL Server erstellen, wobei Sie Ihre unkomprimierte Tabelle als Ansicht auf eine Sicherungstabelle replizieren (wo die komprimierten Werte sind) und die unkomprimierten Daten über DECOMPRESS und INSTEAD OF triggers, die die Backing-Tabelle aktualisieren (so verhält sich die View wie die Originaltabelle für select / insert / update / delete neben Performance-Unterschieden). Ein bisschen hacky, aber es würde funktionieren ...

Bei älteren SQL-Versionen könnten Sie wahrscheinlich auch eine CLR-Funktion schreiben, um den Job zu erledigen.

Diese Methode funktioniert natürlich nicht bei Datensätzen, die aus kleinen Feldern bestehen. Dieser Komprimierungsstil wird bei kleinen Werten nichts erreichen (in Wirklichkeit werden sie größer).


Haben Sie stattdessen in Betracht gezogen, eine neue 'Archiv' Datenbank zu erstellen (vielleicht auf ein einfaches Wiederherstellungsmodell gesetzt), wo Sie alle Ihre alten Daten ablegen? Das könnte problemlos in Abfragen zugegriffen werden, also keine Schmerzen da zB

SELECT * FROM archive..olddata

Wenn Sie die Datenbank erstellen, legen Sie sie auf eine andere Festplatte und gehen Sie bei der Sicherungsprozedur anders vor - vielleicht führen Sie das Archivierungsverfahren einmal pro Woche durch, dann muss es erst danach gesichert werden - und nachdem Sie es fast gequetscht haben Null mit 7 Zip / Rar.

Versuchen Sie nicht, die Datenbank mit NTFS-Komprimierung zu komprimieren, SQL Server unterstützt sie nicht - das habe ich selbst herausgefunden, ein sehr sehr später Abend :)





sqlclr