tutorial - python plot title variable




"Große Daten" arbeiten mit Pandas (9)

Betrachten Sie Ruffus wenn Sie den einfachen Pfad zum Erstellen einer Datenpipeline verwenden, die in mehrere kleinere Dateien unterteilt ist.

Ich habe viele Monate lang versucht, eine Antwort auf diese Frage zu finden, während ich Pandas lernte. Ich benutze SAS für meine tägliche Arbeit und es ist großartig für die Unterstützung außerhalb des Kerns. Wie auch immer, SAS ist aus zahlreichen anderen Gründen eine schreckliche Software.

Eines Tages hoffe ich, meine Verwendung von SAS durch Python und Pandas zu ersetzen, aber mir fehlt derzeit ein Out-of-Core-Workflow für große Datasets. Ich spreche nicht von "Big Data", die ein verteiltes Netzwerk benötigen, sondern Dateien, die zu groß sind, um in den Speicher zu passen, aber klein genug, um auf eine Festplatte zu passen.

Mein erster Gedanke ist, HDFStore zu verwenden, um große Datensätze auf der Festplatte zu speichern und nur die Teile, die ich brauche, in Datenrahmen zur Analyse zu ziehen. Andere haben MongoDB als eine einfacher zu verwendende Alternative erwähnt. Meine Frage ist das:

Was sind Best-Practice-Workflows, um Folgendes zu erreichen:

  1. Laden von flachen Dateien in eine permanente Datenbankstruktur
  2. Abfragen dieser Datenbank, um Daten abzurufen, die in eine Pandas-Datenstruktur eingegeben werden sollen
  3. Aktualisierung der Datenbank nach Manipulation von Teilen in Pandas

Wirkliche Beispiele würden sehr geschätzt, besonders von jedem, der Pandas auf "großen Daten" verwendet.

Edit - ein Beispiel, wie ich das gerne hätte:

  1. Importieren Sie iterativ eine große Flat-Datei und speichern Sie sie in einer permanenten Datenbankstruktur. Diese Dateien sind normalerweise zu groß, um in den Speicher zu passen.
  2. Um Pandas zu verwenden, möchte ich Teilmengen dieser Daten lesen (normalerweise nur ein paar Spalten gleichzeitig), die in den Speicher passen.
  3. Ich würde neue Spalten erstellen, indem ich verschiedene Operationen für die ausgewählten Spalten durchführe.
  4. Ich müsste dann diese neuen Spalten in die Datenbankstruktur einfügen.

Ich versuche, eine Best-Practice-Methode zur Durchführung dieser Schritte zu finden. Beim Lesen von Links über Pandas und Pytables könnte das Anhängen einer neuen Spalte ein Problem sein.

Bearbeiten - Reagieren auf Jeffs Fragen speziell:

  1. Ich baue Modelle für Konsumentenkredit-Risiken auf. Die Arten von Daten umfassen Telefon-, SSN- und Adressmerkmale; Eigenschaftswerte; abfällige Informationen wie Vorstrafen, Konkurse, etc ... Die Datensätze, die ich jeden Tag verwende, haben fast 1.000 bis 2.000 Felder im Durchschnitt gemischter Datentypen: kontinuierliche, nominale und ordinale Variablen von numerischen und Zeichendaten. Ich füge nur selten Zeilen an, aber ich führe viele Operationen durch, die neue Spalten erstellen.
  2. Typische Operationen umfassen das Kombinieren mehrerer Spalten unter Verwendung von bedingter Logik in einer neuen zusammengesetzten Spalte. Zum Beispiel, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' . Das Ergebnis dieser Vorgänge ist eine neue Spalte für jeden Datensatz in meiner Datenmenge.
  3. Schließlich möchte ich diese neuen Spalten in die Datenstruktur auf der Festplatte einfügen. Ich würde Schritt 2 wiederholen und die Daten mit Kreuztabellen und deskriptiven Statistiken untersuchen, um interessante, intuitive Beziehungen zum Modell zu finden.
  4. Eine typische Projektdatei umfasst normalerweise 1 GB. Dateien sind so organisiert, dass eine Zeile aus einem Datensatz von Verbraucherdaten besteht. Jede Zeile hat die gleiche Anzahl von Spalten für jeden Datensatz. Dies wird immer der Fall sein.
  5. Es ist ziemlich selten, dass ich beim Erstellen einer neuen Spalte nach Zeilen unterteilen würde. Es ist jedoch ziemlich üblich, dass ich bei der Erstellung von Berichten oder der Erstellung deskriptiver Statistiken Teilmengen in Zeilen unterteile. Ich möchte zum Beispiel eine einfache Frequenz für eine bestimmte Branche erstellen, z. B. Retail-Kreditkarten. Um dies zu tun, würde ich nur diejenigen Datensätze auswählen, bei denen die Branche = Einzelhandel zusätzlich zu den Spalten, über die ich berichten möchte. Beim Erstellen neuer Spalten würde ich jedoch alle Datenzeilen und nur die Spalten ziehen, die ich für die Operationen benötige.
  6. Der Modellierungsprozess erfordert, dass ich jede Spalte analysiere, nach interessanten Beziehungen mit einer Ergebnisvariable suche und neue zusammengesetzte Spalten erstelle, die diese Beziehungen beschreiben. Die Spalten, die ich erkunde, werden normalerweise in kleinen Mengen erstellt. Zum Beispiel werde ich mich auf eine Reihe von sagen 20 Spalten konzentrieren, die sich nur mit Eigenschaftswerten befassen und beobachten, wie sie sich auf den Ausfall eines Kredits beziehen. Sobald diese erkundet sind und neue Spalten erstellt werden, gehe ich zu einer anderen Gruppe von Spalten über, sagen wir College-Ausbildung, und wiederhole den Prozess. Was ich mache, ist die Erstellung von Kandidatenvariablen, die die Beziehung zwischen meinen Daten und einem bestimmten Ergebnis erklären. Ganz am Ende dieses Prozesses verwende ich einige Lerntechniken, die aus diesen zusammengesetzten Spalten eine Gleichung erstellen.

Es ist selten, dass ich dem Datensatz jemals Zeilen hinzufügen würde. Ich werde fast immer neue Spalten erstellen (Variablen oder Features im Statistik / Machine Learning-Sprachgebrauch).


Dies ist der Fall für Pymongo. Ich habe auch Prototyping mit SQL-Server, SQLite, HDF, ORM (SQLAlchemy) in Python. In erster Linie ist Pymongo eine dokumentbasierte DB, so dass jede Person ein Dokument ( dict of attributes) wäre. Viele Leute bilden eine Sammlung und Sie können viele Sammlungen haben (Leute, Börse, Einkommen).

pd.dateframe -> pymongo Hinweis: Ich verwende chunksize in read_csv , um es auf 5 bis 10k Datensätze zu halten (pymongo lässt den Socket fallen, wenn größer)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

Abfrage: gt = größer als ...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() gibt einen Iterator zurück, also ichunked ich ichunked , um kleinere Iteratoren zu hacken.

Wie wäre es mit einem Join, da ich normalerweise 10 Datenquellen zum Einfügen zusammenbekomme:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

dann (in meinem Fall muss ich manchmal zuerst auf aJoinDF bevor es "zusammenführbar" ist.)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

Und Sie können dann die neuen Informationen über die Update-Methode in Ihre Hauptsammlung schreiben. (logische Sammlung vs. physische Datenquellen).

collection.update({primarykey:foo},{key:change})

Bei kleineren Lookups einfach denormalisieren. Beispielsweise haben Sie Code im Dokument und Sie fügen einfach den dict und führen eine dict Suche durch, während Sie Dokumente erstellen.

Jetzt haben Sie einen schönen Datensatz, der auf einer Person basiert. Sie können Ihre Logik in jedem Fall freisetzen und mehr Attribute erstellen. Schließlich können Sie Ihre 3 bis max. Schlüsselindikatoren in Pandas einlesen und Pivots / Agg / Data Exploration durchführen. Das funktioniert bei mir für 3 Millionen Datensätze mit Zahlen / großem Text / Kategorien / Codes / Floats / ...

Sie können auch die beiden in MongoDB integrierten Methoden (MapReduce und Aggregat-Framework) verwenden. Weitere Informationen zum Aggregat-Framework finden Sie hier , da es einfacher zu sein scheint als MapReduce und für schnelles aggregiertes Arbeiten praktisch ist. Hinweis Ich musste meine Felder oder Beziehungen nicht definieren und kann einem Dokument Elemente hinzufügen. Beim aktuellen Zustand des sich schnell ändernden Toolkits numpy, pandas, python hilft mir MongoDB, einfach zur Arbeit zu kommen :)


Eine weitere Variante

Viele der in Pandas ausgeführten Operationen können auch als Datenbankabfrage ausgeführt werden (sql, mongo)

Mit einem RDBMS oder mongodb können Sie einige der Aggregationen in der DB-Abfrage durchführen (die für große Daten optimiert ist und Cache und Indizes effizient verwendet).

Später können Sie die Nachbearbeitung mit Pandas durchführen.

Der Vorteil dieser Methode besteht darin, dass Sie die DB-Optimierungen für die Arbeit mit großen Daten erhalten und gleichzeitig die Logik in einer deklarativen Syntax auf hoher Ebene definieren - und sich nicht mit den Details der Entscheidung, was im Speicher zu tun ist und was zu tun ist, befassen müssen von Kern.

Und obwohl die Abfragesprache und die Pandas unterschiedlich sind, ist es normalerweise nicht kompliziert, einen Teil der Logik von einem zum anderen zu übersetzen.


Es gibt jetzt, zwei Jahre nach der Frage, ein "out-of-core" Pandas-Äquivalent: dask . Es ist exzellent! Obwohl es nicht alle Pandas-Funktionen unterstützt, kannst du sehr weit kommen.


Ich benutze routinemäßig Dutzende Gigabyte an Daten auf diese Weise, zB habe ich Tabellen auf der Festplatte, die ich über Abfragen lese, Daten erstelle und zurück anfüge.

Es lohnt sich, die Dokumente und später in diesem Thread nach mehreren Vorschlägen zum Speichern Ihrer Daten zu lesen.

Details, die sich darauf auswirken, wie Sie Ihre Daten speichern:
Gib so viel Details wie du kannst; und ich kann dir helfen, eine Struktur zu entwickeln.

  1. Größe der Daten, Anzahl der Zeilen, Spalten, Spaltenarten; Hängen Sie Reihen oder nur Spalten an?
  2. Wie werden typische Operationen aussehen? ZB führen Sie eine Abfrage nach Spalten durch, um eine Reihe von Zeilen und bestimmten Spalten auszuwählen, führen Sie dann eine Operation aus (in-memory), erstellen Sie neue Spalten und speichern Sie diese.
    (Ein Spielzeugbeispiel könnte uns ermöglichen, spezifischere Empfehlungen anzubieten.)
  3. Nach dieser Verarbeitung, was machst du dann? Ist Schritt 2 ad hoc oder wiederholbar?
  4. Flache Dateien eingeben: wie viele, grobe Gesamtgröße in GB. Wie sind diese zB durch Aufzeichnungen organisiert? Enthält jede Datei unterschiedliche Felder oder haben sie einige Datensätze pro Datei mit allen Feldern in jeder Datei?
  5. Wählen Sie je nach Kriterium Teilmengen von Zeilen (Datensätzen) aus (z. B. die Zeilen mit Feld A> 5 auswählen)? und dann tun Sie etwas, oder wählen Sie nur die Felder A, B, C mit allen Datensätzen (und dann etwas tun)?
  6. Arbeiten Sie an all Ihren Spalten (in Gruppen), oder gibt es einen guten Anteil, den Sie nur für Berichte verwenden können (z. B. wenn Sie die Daten behalten möchten, aber diese Spalte nicht explizit eingeben müssen, bis Endergebnisse Zeit)?

Lösung

Stellen Sie sicher, dass Pandas mindestens 0.10.1 installiert sind.

Lesen Sie Iterationsdateien Chunk-by-Chunk und mehrere Tabellenabfragen .

Da pytables für den zeilenweisen Betrieb optimiert ist (was Sie abfragen), erstellen wir für jede Gruppe von Feldern eine Tabelle. Auf diese Weise ist es einfach, eine kleine Gruppe von Feldern auszuwählen (die mit einem großen Tisch funktionieren, aber es ist effizienter, dies auf diese Weise zu tun ... Ich denke, dass ich diese Einschränkung in der Zukunft beheben kann ... das ist es irgendwie intuitiver):
(Das Folgende ist Pseudocode.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

Einlesen der Dateien und Erstellen des Speichers (im Wesentlichen tun, was append_to_multiple tut):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

Jetzt haben Sie alle Tabellen in der Datei (eigentlich könnten Sie sie in separaten Dateien speichern, wenn Sie möchten, müssten Sie möglicherweise den Dateinamen der group_map hinzufügen, aber wahrscheinlich ist dies nicht notwendig).

So erhalten Sie Spalten und erstellen neue:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

Wenn Sie bereit sind für post_processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

Über data_columns müssen Sie eigentlich keine Daten_spalten definieren; Mit ihnen können Sie Zeilen basierend auf der Spalte auswählen. ZB so etwas wie:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

Sie können für Sie in der Endberichtgenerierungsphase am interessantesten sein (im Wesentlichen ist eine Datenspalte von anderen Spalten getrennt, was die Effizienz etwas beeinflussen kann, wenn Sie eine Menge definieren).

Vielleicht möchten Sie auch:

  • Erstellen Sie eine Funktion, die eine Liste von Feldern annimmt, die Gruppen in der groups_map nachschlägt, dann diese auswählt und die Ergebnisse verkettet, so dass Sie den resultierenden Rahmen erhalten (dies ist im Wesentlichen was select_as_multiple). Auf diese Weise wäre die Struktur für Sie ziemlich transparent.
  • Indizes für bestimmte Datenspalten (macht die Zeilenunterteilung viel schneller).
  • Aktivieren Sie die Komprimierung.

Lass es mich wissen, wenn du Fragen hast!


Ich bin kürzlich auf ein ähnliches Problem gestoßen. Ich fand einfach das Lesen der Daten in Chunks und Anhängen, wie ich es in Chunks schreibe, um das gleiche csv funktioniert gut. Mein Problem war das Hinzufügen einer Datumsspalte basierend auf Informationen in einer anderen Tabelle, wobei der Wert bestimmter Spalten wie folgt verwendet wurde. Dies kann denen helfen, die durch dask und hdf5 verwirrt sind, aber eher mit Pandas wie mir vertraut sind.

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

Ich habe das etwas verspätet gesehen, aber ich arbeite mit einem ähnlichen Problem (Hypothekenzahlungsmodelle). Meine Lösung bestand darin, den pandas HDFStore-Layer zu überspringen und gerade Pytables zu verwenden. Ich speichere jede Spalte als einzelnes HDF5-Array in meiner endgültigen Datei.

Mein grundlegender Arbeitsablauf besteht darin, zuerst eine CSV-Datei von der Datenbank zu erhalten. Ich gzip es, also ist es nicht so groß. Dann wandle ich das in eine reihenorientierte HDF5-Datei um, indem ich in Python darüber iteriere, jede Zeile in einen echten Datentyp umwandle und sie in eine HDF5-Datei schreibe. Das dauert einige zehn Minuten, aber es wird kein Speicher verwendet, da es nur Zeile für Zeile läuft. Dann "transponiere" ich die reihenorientierte HDF5-Datei in eine spaltenorientierte HDF5-Datei.

Die Tabelle transponiert sieht wie folgt aus:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

Es zurückzulesen sieht dann so aus:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Nun, ich betreibe das im Allgemeinen auf einer Maschine mit einer Menge Speicher, so dass ich nicht vorsichtig genug mit meinem Speicherverbrauch sein kann. Zum Beispiel liest die Ladeoperation standardmäßig den gesamten Datensatz.

Das funktioniert in der Regel für mich, aber es ist ein wenig klobig, und ich kann die Phantasie pytables Magie nicht verwenden.

Bearbeiten: Der eigentliche Vorteil dieses Ansatzes gegenüber dem pytables-Array-of-records besteht darin, dass ich die Daten dann mithilfe von h5r, das keine Tabellen verarbeiten kann, in R laden kann. Zumindest konnte ich keine heterogenen Tabellen laden.


Ich weiß, dass dies ein alter Thread ist, aber ich denke, die Blaze Bibliothek ist es wert, sie zu überprüfen. Es ist für diese Art von Situationen gebaut.

Aus den Dokumenten:

Blaze erweitert die Benutzerfreundlichkeit von NumPy und Pandas auf verteilte und Out-of-Core-Computing. Blaze bietet eine ähnliche Schnittstelle wie NumPy ND-Array oder Pandas DataFrame, bildet diese bekannten Schnittstellen jedoch auf eine Vielzahl anderer Rechenmaschinen wie Postgres oder Spark ab.

Edit: Übrigens wird es von ContinuumIO und Travis Oliphant, Autor von NumPy, unterstützt.


Wie von anderen bemerkt, hat sich nach einigen Jahren ein "out-of-core" Pandas-Äquivalent herauskristallisiert: dask . Obwohl dask kein Ersatz für Pandas und seine Funktionalität ist, sticht es aus mehreren Gründen heraus:

Dask ist eine flexible Parallel-Computing-Bibliothek für das analytische Computing, die für die dynamische Aufgabenplanung für interaktive Rechenlasten von "Big Data" -Ansammlungen wie parallelen Arrays, Datenrahmen und Listen optimiert ist, die gängige Schnittstellen wie NumPy, Pandas oder Python-Iteratoren zu größeren erweitern. Than-Memory oder verteilte Umgebungen und Skalen von Laptops zu Clustern.

Dask betont die folgenden Tugenden:

  • Vertraut: Bietet parallelisierte NumPy-Arrays und Pandas DataFrame-Objekte
  • Flexibel: Bietet eine Task-Scheduling-Schnittstelle für mehr benutzerdefinierte Workloads und die Integration mit anderen Projekten.
  • Nativ: Ermöglicht verteiltes Computing in Pure Python mit Zugriff auf den PyData-Stack.
  • Schnell: Arbeitet mit geringem Overhead, geringer Latenz und minimaler Serialisierung für schnelle numerische Algorithmen
  • Scales up: Läuft elastisch auf Clustern mit 1000er Kernen Scales down: Trivial zum Einrichten und Ausführen auf einem Laptop in einem einzigen Prozess
  • Responsive: Entwickelt mit Interactive Computing im Hinterkopf bietet es schnelle Rückmeldung und Diagnose, um Menschen zu helfen

und um ein einfaches Codebeispiel hinzuzufügen:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

ersetzt einige Pandas-Codes wie folgt:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

und, besonders bemerkenswert, bietet über die Schnittstelle concurrent.futures einen allgemeinen für die Einreichung von benutzerdefinierten Aufgaben:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()




large-data