python tutorial Flux de travail «grandes données» utilisant des pandas




pandas python tutorial (11)

J'utilise couramment des dizaines de gigaoctets de données de cette façon, par exemple j'ai des tables sur le disque que je lis via des requêtes, créer des données et ajouter.

Il vaut la peine de lire les documents et tard dans ce fil pour plusieurs suggestions sur la façon de stocker vos données.

Détails qui affecteront la façon dont vous stockez vos données, comme:
Donnez autant de détails que possible. et je peux vous aider à développer une structure.

  1. Taille des données, nombre de lignes, colonnes, types de colonnes; êtes-vous en train d'ajouter des lignes ou simplement des colonnes?
  2. À quoi ressembleront les opérations typiques? Par exemple, faites une requête sur les colonnes pour sélectionner un groupe de lignes et des colonnes spécifiques, puis effectuez une opération (en mémoire), créez de nouvelles colonnes, enregistrez-les.
    (Donner un exemple de jouet pourrait nous permettre d'offrir des recommandations plus spécifiques.)
  3. Après ce traitement, alors que faites-vous? L'étape 2 est-elle ad hoc ou répétable?
  4. Entrer les fichiers plats: combien, taille totale approximative en Gb. Comment sont-ils organisés par exemple par des disques? Est-ce que chacun contient des champs différents, ou ont-ils des enregistrements par fichier avec tous les champs de chaque fichier?
  5. Avez-vous déjà sélectionné des sous-ensembles de lignes (enregistrements) en fonction de critères (par exemple, sélectionnez les lignes avec le champ A> 5)? puis faites quelque chose, ou sélectionnez-vous simplement les champs A, B, C avec tous les enregistrements (et ensuite faites quelque chose)?
  6. Travaillez-vous sur toutes vos colonnes (en groupes), ou y a-t-il une bonne proportion que vous ne pouvez utiliser que pour les rapports (par exemple, vous voulez conserver les données, mais vous n'avez pas besoin d'extraire temps de résultat final)?

Solution

Assurez-vous d'avoir des pandas au moins 0.10.1 installés.

Lisez les fichiers d'itération morceaux par morceaux et les requêtes de tables multiples .

Puisque pytables est optimisé pour fonctionner en ligne (ce que vous recherchez), nous allons créer une table pour chaque groupe de champs. De cette façon, il est facile de sélectionner un petit groupe de champs (qui fonctionnera avec une grande table, mais il est plus efficace de le faire de cette façon ... Je pense que je pourrai peut-être corriger cette limite dans le futur ... c'est plus intuitif de toute façon):
(Ce qui suit est un pseudocode.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

Lire dans les fichiers et créer le stockage (essentiellement faire ce append_to_multiple fait append_to_multiple ):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

Maintenant vous avez toutes les tables dans le fichier (en fait, vous pouvez les stocker dans des fichiers séparés si vous le souhaitez, vous devrez probablement ajouter le nom de fichier dans le fichier group_map, mais ce n'est probablement pas nécessaire).

Voici comment obtenir des colonnes et en créer de nouvelles:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

Lorsque vous êtes prêt pour post_processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

A propos de data_columns, vous n'avez pas besoin de définir ANY data_columns; Ils vous permettent de sélectionner des lignes en fonction de la colonne. Par exemple quelque chose comme:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

Ils peuvent vous intéresser au stade de la génération du rapport final (essentiellement, une colonne de données est séparée des autres colonnes, ce qui peut avoir un impact sur l'efficacité si vous définissez beaucoup).

Vous pourriez aussi vouloir:

  • créez une fonction qui prend une liste de champs, recherche les groupes dans groups_map, puis les sélectionne et concatène les résultats pour obtenir l'image résultante (c'est essentiellement ce que fait select_as_multiple). De cette façon, la structure serait assez transparente pour vous.
  • indexes sur certaines colonnes de données (rend la segmentation des lignes beaucoup plus rapide).
  • activer la compression.

Faites-moi savoir quand vous avez des questions!

J'ai essayé de trouver une réponse à cette question pendant plusieurs mois tout en apprenant des pandas. J'utilise SAS pour mon travail de tous les jours et c'est formidable pour son support hors-noyau. Cependant, SAS est horrible en tant que logiciel pour de nombreuses autres raisons.

Un jour, j'espère remplacer mon utilisation de SAS par python et pandas, mais je n'ai pas de workflow pour les grands ensembles de données. Je ne parle pas de «big data» qui nécessite un réseau distribué, mais plutôt des fichiers trop volumineux pour tenir dans la mémoire mais assez petits pour tenir sur un disque dur.

Ma première idée est d'utiliser HDFStore pour stocker de grands ensembles de données sur disque et extraire uniquement les données dont j'ai besoin dans des données pour les analyser. D'autres ont mentionné MongoDB comme une alternative plus facile à utiliser. Ma question est la suivante:

Quels sont les meilleurs workflows pour accomplir les tâches suivantes:

  1. Chargement de fichiers plats dans une structure de base de données permanente sur disque
  2. Interrogation de cette base de données pour extraire des données à alimenter dans une structure de données pandas
  3. Mise à jour de la base de données après manipulation de pièces sur des pandas

Des exemples du monde réel seraient très appréciés, surtout de la part de quiconque utilise des pandas sur des «données volumineuses».

Edit - un exemple de comment je voudrais que cela fonctionne:

  1. Importez de manière itérative un grand fichier plat et stockez-le dans une structure de base de données permanente sur disque. Ces fichiers sont généralement trop volumineux pour tenir dans la mémoire.
  2. Afin d'utiliser Pandas, je voudrais lire des sous-ensembles de ces données (généralement juste quelques colonnes à la fois) qui peuvent tenir dans la mémoire.
  3. Je créerais de nouvelles colonnes en effectuant diverses opérations sur les colonnes sélectionnées.
  4. Je devrais alors ajouter ces nouvelles colonnes dans la structure de la base de données.

J'essaie de trouver une façon pratique d'effectuer ces étapes. En lisant des liens sur les pandas et les pytables, il semble que l'ajout d'une nouvelle colonne pourrait poser problème.

Éditer - Répondre aux questions de Jeff spécifiquement:

  1. Je construis des modèles de risque de crédit à la consommation. Les types de données comprennent les caractéristiques de téléphone, de SSN et d'adresse; valeurs de propriété; des données désobligeantes comme des casiers judiciaires, des faillites, etc ... Les jeux de données que j'utilise chaque jour ont près de 1.000 à 2.000 champs en moyenne de types de données mixtes: variables continues, nominales et ordinales de données numériques et de caractères. J'ajoute rarement des lignes, mais j'effectue de nombreuses opérations qui créent de nouvelles colonnes.
  2. Les opérations typiques impliquent de combiner plusieurs colonnes en utilisant une logique conditionnelle dans une nouvelle colonne composée. Par exemple, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' . Le résultat de ces opérations est une nouvelle colonne pour chaque enregistrement de mon ensemble de données.
  3. Enfin, je voudrais ajouter ces nouvelles colonnes dans la structure de données sur disque. Je répète l'étape 2, en explorant les données avec des tableaux croisés et des statistiques descriptives en essayant de trouver des relations intéressantes et intuitives au modèle.
  4. Un fichier de projet typique est généralement d'environ 1 Go. Les fichiers sont organisés de telle sorte qu'une rangée est constituée d'un enregistrement de données de consommation. Chaque ligne a le même nombre de colonnes pour chaque enregistrement. Ce sera toujours le cas.
  5. Il est assez rare que je sous-ensembles par des lignes lors de la création d'une nouvelle colonne. Cependant, il m'est assez courant de sous-créer des lignes lors de la création de rapports ou de la génération de statistiques descriptives. Par exemple, je pourrais vouloir créer une fréquence simple pour un secteur d'activité spécifique, disons les cartes de crédit au détail. Pour ce faire, je sélectionne uniquement les enregistrements dont le secteur d'activité = vente au détail, en plus des colonnes sur lesquelles je souhaite créer un rapport. Lors de la création de nouvelles colonnes, cependant, je tirerais toutes les lignes de données et seulement les colonnes dont j'ai besoin pour les opérations.
  6. Le processus de modélisation nécessite que j'analyse chaque colonne, que je recherche des relations intéressantes avec une variable de résultat et que je crée de nouvelles colonnes composées décrivant ces relations. Les colonnes que j'explore sont généralement faites en petits ensembles. Par exemple, je vais me concentrer sur un ensemble de 20 colonnes traitant simplement des valeurs de propriété et observer comment elles se rapportent à la défaillance sur un prêt. Une fois que ceux-ci sont explorés et de nouvelles colonnes sont créées, je passe ensuite à un autre groupe de colonnes, disons l'éducation collégiale, et répète le processus. Ce que je fais est de créer des variables candidates qui expliquent la relation entre mes données et certains résultats. À la toute fin de ce processus, j'applique des techniques d'apprentissage qui créent une équation à partir de ces colonnes composées.

Il est rare que j'ajoute des lignes à l'ensemble de données. Je vais presque toujours créer de nouvelles colonnes (variables ou caractéristiques dans le langage statistique / apprentissage automatique).


Comme d'autres l'ont noté, après quelques années, un équivalent de pandas «out-of-core» a émergé: dask . Bien que Dask ne soit pas un remplacement direct des pandas et de toutes ses fonctionnalités, il se distingue pour plusieurs raisons:

Dask est une bibliothèque informatique parallèle flexible pour le calcul analytique optimisée pour la planification dynamique des tâches pour les charges de travail interactives des collections "Big Data" telles que les tableaux parallèles, les dataframes et les listes qui étendent des interfaces communes telles que NumPy, Pandas ou Python. environnements de mémoire ou distribués et échelles des ordinateurs portables aux clusters.

Dask souligne les vertus suivantes:

  • Familier: Fournit un tableau NumPy parallélisé et des objets Pandas DataFrame
  • Flexible: Fournit une interface de planification des tâches pour plus de charges de travail personnalisées et l'intégration avec d'autres projets.
  • Native: Active l'informatique distribuée en Pure Python avec accès à la pile PyData.
  • Rapide: Fonctionne avec un surcoût faible, une faible latence et une sérialisation minimale nécessaire aux algorithmes numériques rapides
  • Échelle: s'exécute de manière élastique sur les clusters avec des milliers de cœurs. Échelle: Trivial pour configurer et exécuter sur un ordinateur portable en un seul processus.
  • Réactif: Conçu en pensant à l'informatique interactive, il fournit une rétroaction rapide et des diagnostics pour aider les humains

et pour ajouter un échantillon de code simple:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

remplace un code pandas comme celui-ci:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

et, particulièrement remarquable, fournit à travers l'interface concurrente.futures un général pour la soumission de tâches personnalisées:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

Une autre variation

De nombreuses opérations effectuées sur des pandas peuvent également être effectuées en tant que requête db (sql, mongo)

L'utilisation d'un SGBDR ou d'un mongodb vous permet d'effectuer certaines agrégations dans la requête DB (qui est optimisée pour les données volumineuses et utilise efficacement le cache et les index).

Plus tard, vous pouvez effectuer un post-traitement en utilisant des pandas.

L'avantage de cette méthode est que vous gagnez les optimisations DB pour travailler avec des données volumineuses, tout en définissant la logique dans une syntaxe déclarative de haut niveau - sans avoir à gérer les détails de décider quoi faire en mémoire et quoi faire du noyau.

Et bien que le langage de requête et les pandas soient différents, il n'est généralement pas compliqué de traduire une partie de la logique de l'un à l'autre.


Si vos jeux de données sont entre 1 et 20 Go, vous devriez obtenir un poste de travail avec 48 Go de RAM. Ensuite, les Pandas peuvent contenir tout le jeu de données en RAM. Je sais que ce n'est pas la réponse que vous cherchez ici, mais faire du calcul scientifique sur un ordinateur portable avec 4 Go de RAM n'est pas raisonnable.


Je sais que c'est un vieux fil mais je pense que la bibliothèque Blaze vaut la peine d'être vérifiée. Il est construit pour ces types de situations.

De la docs:

Blaze étend l'utilisation de NumPy et Pandas à l'informatique distribuée et hors-noyau. Blaze fournit une interface similaire à celle de NumPy ND-Array ou Pandas DataFrame mais fait correspondre ces interfaces familières à une variété d'autres moteurs de calcul tels que Postgres ou Spark.

Edit: En passant, il est soutenu par ContinuumIO et Travis Oliphant, auteur de NumPy.


Je l'ai repéré un peu en retard, mais je travaille avec un problème similaire (modèles de prépaiement hypothécaire). Ma solution a été d'ignorer la couche HDFStore de pandas et d'utiliser des pytables droits. Je sauvegarde chaque colonne en tant que tableau HDF5 individuel dans mon fichier final.

Mon flux de travail de base consiste à obtenir d'abord un fichier CSV à partir de la base de données. Je le gomme, donc ce n'est pas aussi énorme. Ensuite, je convertis cela en un fichier HDF5 orienté ligne, en l'itérant sur python, en convertissant chaque ligne en un type de données réel, et en l'écrivant dans un fichier HDF5. Cela prend quelques dizaines de minutes, mais il n'utilise pas de mémoire, car il ne fonctionne que ligne par ligne. Ensuite, je "transpose" le fichier HDF5 orienté lignes dans un fichier HDF5 orienté colonnes.

La table transpose ressemble à:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

La relire ressemble alors à:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Maintenant, je l'exécute généralement sur une machine avec une tonne de mémoire, donc je ne suis peut-être pas assez prudent avec mon utilisation de la mémoire. Par exemple, par défaut, l'opération de chargement lit l'ensemble des données.

Cela fonctionne généralement pour moi, mais c'est un peu maladroit, et je ne peux pas utiliser la magie fantaisie pytables.

Edit: Le vrai avantage de cette approche, par rapport à la valeur par défaut de pytables du tableau d'enregistrements, est que je peux ensuite charger les données dans R en utilisant h5r, qui ne peut pas gérer les tables. Ou, au moins, j'ai été incapable de l'obtenir pour charger des tables hétérogènes.


Je pense que les réponses ci-dessus manquent une approche simple que j'ai trouvée très utile.

Lorsque j'ai un fichier qui est trop volumineux pour être chargé en mémoire, je divise le fichier en plusieurs fichiers plus petits (par ligne ou par col)

Exemple: Dans le cas de 30 jours d'échange de données d'une taille de ~ 30 Go, je le casse dans un fichier par jour de ~ 1 Go de taille. Je traite ensuite chaque fichier séparément et regroupe les résultats à la fin

L'un des plus grands avantages est qu'il permet un traitement parallèle des fichiers (plusieurs threads ou processus)

L'autre avantage est que la manipulation de fichiers (comme l'ajout / suppression de dates dans l'exemple) peut être accomplie par des commandes shell régulières, ce qui n'est pas possible dans des formats de fichiers plus avancés / compliqués

Cette approche ne couvre pas tous les scénarios, mais est très utile dans beaucoup d'entre eux


Envisager Ruffus si vous allez le chemin simple de la création d'un pipeline de données qui est divisé en plusieurs petits fichiers.


Une astuce que j'ai trouvée utile pour les cas d'utilisation de «grandes données» est de réduire le volume des données en réduisant la précision du flottant à 32 bits. Il n'est pas applicable dans tous les cas, mais dans de nombreuses applications, la précision 64 bits est excessive et les économies de mémoire 2x en valent la peine. Pour rendre un point évident encore plus évident:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

Il y a maintenant, deux ans après la question, un équivalent de pandas «out-of-core»: dask . C'est excellent! Bien qu'il ne supporte pas toutes les fonctionnalités de pandas, vous pouvez aller très loin avec lui.


Je suis récemment tombé sur un problème similaire. J'ai trouvé simplement en lisant les données en morceaux et en l'ajoutant comme je l'écris en morceaux au même csv fonctionne bien. Mon problème était d'ajouter une colonne de date basée sur des informations dans une autre table, en utilisant la valeur de certaines colonnes comme suit. Cela peut aider ceux qui sont désorientés par dask et hdf5 mais plus familier avec les pandas comme moi.

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)






large-data