cours - pandas python tutorial




Flux de travail «grandes données» utilisant des pandas (9)

C'est le cas du pymongo. J'ai aussi prototypé en utilisant sql server, sqlite, HDF, ORM (SQLAlchemy) en python. Tout d'abord, pymongo est un DB basé sur un document, donc chaque personne serait un document ( dict of attributes). Beaucoup de gens forment une collection et vous pouvez avoir beaucoup de collections (personnes, bourse, revenu).

pd.dateframe -> pymongo Note: J'utilise chunksize dans read_csv pour le garder entre 5 et 10k (pymongo supprime le socket s'il est plus grand)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

interroger: gt = plus grand que ...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() retourne un itérateur donc j'utilise couramment ichunked pour couper en itérateurs plus petits.

Que diriez-vous d'une jointure puisque je reçois normalement 10 sources de données à coller ensemble:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

puis (dans mon cas, parfois, je dois d'abord agg sur aJoinDF avant de pouvoir "fusionner".)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

Et vous pouvez ensuite écrire les nouvelles informations à votre collection principale via la méthode de mise à jour ci-dessous. (collection logique vs sources de données physiques).

collection.update({primarykey:foo},{key:change})

Sur les recherches plus petites, il suffit de dénormaliser. Par exemple, vous avez du code dans le document et vous ajoutez simplement le texte du code de champ et effectuez une recherche dict lorsque vous créez des documents.

Maintenant, vous avez un bon jeu de données basé sur une personne, vous pouvez libérer votre logique sur chaque cas et faire plus d'attributs. Enfin, vous pouvez lire dans les pandas vos 3 indicateurs de mémoire maximum et faire des pivots / agg / exploration de données. Cela fonctionne pour moi pour 3 millions d'enregistrements avec des nombres / gros texte / catégories / codes / flotteurs / ...

Vous pouvez également utiliser les deux méthodes intégrées à MongoDB (MapReduce et framework global). Voir ici pour plus d'informations sur le framework agrégé , car il semble être plus facile que MapReduce et semble pratique pour un travail agrégé rapide. Notez que je n'ai pas besoin de définir mes champs ou relations, et je peux ajouter des éléments à un document. À l'état actuel de l'ensemble des outils PNMP, pandas, python qui change rapidement, MongoDB m'aide à me mettre au travail :)

J'ai essayé de trouver une réponse à cette question pendant plusieurs mois tout en apprenant des pandas. J'utilise SAS pour mon travail de tous les jours et c'est formidable pour son support hors-noyau. Cependant, SAS est horrible en tant que logiciel pour de nombreuses autres raisons.

Un jour, j'espère remplacer mon utilisation de SAS par python et pandas, mais je n'ai pas de workflow pour les grands ensembles de données. Je ne parle pas de «big data» qui nécessite un réseau distribué, mais plutôt des fichiers trop volumineux pour tenir dans la mémoire mais assez petits pour tenir sur un disque dur.

Ma première idée est d'utiliser HDFStore pour stocker de grands ensembles de données sur disque et extraire uniquement les données dont j'ai besoin dans des données pour les analyser. D'autres ont mentionné MongoDB comme une alternative plus facile à utiliser. Ma question est la suivante:

Quels sont les meilleurs workflows pour accomplir les tâches suivantes:

  1. Chargement de fichiers plats dans une structure de base de données permanente sur disque
  2. Interrogation de cette base de données pour extraire des données à alimenter dans une structure de données pandas
  3. Mise à jour de la base de données après manipulation de pièces sur des pandas

Des exemples du monde réel seraient très appréciés, surtout de la part de quiconque utilise des pandas sur des «données volumineuses».

Edit - un exemple de comment je voudrais que cela fonctionne:

  1. Importez de manière itérative un grand fichier plat et stockez-le dans une structure de base de données permanente sur disque. Ces fichiers sont généralement trop volumineux pour tenir dans la mémoire.
  2. Afin d'utiliser Pandas, je voudrais lire des sous-ensembles de ces données (généralement juste quelques colonnes à la fois) qui peuvent tenir dans la mémoire.
  3. Je créerais de nouvelles colonnes en effectuant diverses opérations sur les colonnes sélectionnées.
  4. Je devrais alors ajouter ces nouvelles colonnes dans la structure de la base de données.

J'essaie de trouver une façon pratique d'effectuer ces étapes. En lisant des liens sur les pandas et les pytables, il semble que l'ajout d'une nouvelle colonne pourrait poser problème.

Éditer - Répondre aux questions de Jeff spécifiquement:

  1. Je construis des modèles de risque de crédit à la consommation. Les types de données comprennent les caractéristiques de téléphone, de SSN et d'adresse; valeurs de propriété; des données désobligeantes comme des casiers judiciaires, des faillites, etc ... Les jeux de données que j'utilise chaque jour ont près de 1.000 à 2.000 champs en moyenne de types de données mixtes: variables continues, nominales et ordinales de données numériques et de caractères. J'ajoute rarement des lignes, mais j'effectue de nombreuses opérations qui créent de nouvelles colonnes.
  2. Les opérations typiques impliquent de combiner plusieurs colonnes en utilisant une logique conditionnelle dans une nouvelle colonne composée. Par exemple, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' . Le résultat de ces opérations est une nouvelle colonne pour chaque enregistrement de mon ensemble de données.
  3. Enfin, je voudrais ajouter ces nouvelles colonnes dans la structure de données sur disque. Je répète l'étape 2, en explorant les données avec des tableaux croisés et des statistiques descriptives en essayant de trouver des relations intéressantes et intuitives au modèle.
  4. Un fichier de projet typique est généralement d'environ 1 Go. Les fichiers sont organisés de telle sorte qu'une rangée est constituée d'un enregistrement de données de consommation. Chaque ligne a le même nombre de colonnes pour chaque enregistrement. Ce sera toujours le cas.
  5. Il est assez rare que je sous-ensembles par des lignes lors de la création d'une nouvelle colonne. Cependant, il m'est assez courant de sous-créer des lignes lors de la création de rapports ou de la génération de statistiques descriptives. Par exemple, je pourrais vouloir créer une fréquence simple pour un secteur d'activité spécifique, disons les cartes de crédit au détail. Pour ce faire, je sélectionne uniquement les enregistrements dont le secteur d'activité = vente au détail, en plus des colonnes sur lesquelles je souhaite créer un rapport. Lors de la création de nouvelles colonnes, cependant, je tirerais toutes les lignes de données et seulement les colonnes dont j'ai besoin pour les opérations.
  6. Le processus de modélisation nécessite que j'analyse chaque colonne, que je recherche des relations intéressantes avec une variable de résultat et que je crée de nouvelles colonnes composées décrivant ces relations. Les colonnes que j'explore sont généralement faites en petits ensembles. Par exemple, je vais me concentrer sur un ensemble de 20 colonnes traitant simplement des valeurs de propriété et observer comment elles se rapportent à la défaillance sur un prêt. Une fois que ceux-ci sont explorés et de nouvelles colonnes sont créées, je passe ensuite à un autre groupe de colonnes, disons l'éducation collégiale, et répète le processus. Ce que je fais est de créer des variables candidates qui expliquent la relation entre mes données et certains résultats. À la toute fin de ce processus, j'applique des techniques d'apprentissage qui créent une équation à partir de ces colonnes composées.

Il est rare que j'ajoute des lignes à l'ensemble de données. Je vais presque toujours créer de nouvelles colonnes (variables ou caractéristiques dans le langage statistique / apprentissage automatique).


Comme d'autres l'ont noté, après quelques années, un équivalent de pandas «out-of-core» a émergé: dask . Bien que Dask ne soit pas un remplacement direct des pandas et de toutes ses fonctionnalités, il se distingue pour plusieurs raisons:

Dask est une bibliothèque informatique parallèle flexible pour le calcul analytique optimisée pour la planification dynamique des tâches pour les charges de travail interactives des collections "Big Data" telles que les tableaux parallèles, les dataframes et les listes qui étendent des interfaces communes telles que NumPy, Pandas ou Python. environnements de mémoire ou distribués et échelles des ordinateurs portables aux clusters.

Dask souligne les vertus suivantes:

  • Familier: Fournit un tableau NumPy parallélisé et des objets Pandas DataFrame
  • Flexible: Fournit une interface de planification des tâches pour plus de charges de travail personnalisées et l'intégration avec d'autres projets.
  • Native: Active l'informatique distribuée en Pure Python avec accès à la pile PyData.
  • Rapide: Fonctionne avec un surcoût faible, une faible latence et une sérialisation minimale nécessaire aux algorithmes numériques rapides
  • Échelle: s'exécute de manière élastique sur les clusters avec des milliers de cœurs. Échelle: Trivial pour configurer et exécuter sur un ordinateur portable en un seul processus.
  • Réactif: Conçu en pensant à l'informatique interactive, il fournit une rétroaction rapide et des diagnostics pour aider les humains

et pour ajouter un échantillon de code simple:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

remplace un code pandas comme celui-ci:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

et, particulièrement remarquable, fournit à travers l'interface concurrente.futures un général pour la soumission de tâches personnalisées:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

Il est intéressant de mentionner ici Ray ,
c'est un framework de calcul distribué, qui a sa propre implémentation pour les pandas de manière distribuée.

Il suffit de remplacer l'importation de pandas, et le code devrait fonctionner tel quel:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

peut lire plus de détails ici:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


Il y a maintenant, deux ans après la question, un équivalent de pandas «out-of-core»: dask . C'est excellent! Bien qu'il ne supporte pas toutes les fonctionnalités de pandas, vous pouvez aller très loin avec lui.


Je l'ai repéré un peu en retard, mais je travaille avec un problème similaire (modèles de prépaiement hypothécaire). Ma solution a été d'ignorer la couche HDFStore de pandas et d'utiliser des pytables droits. Je sauvegarde chaque colonne en tant que tableau HDF5 individuel dans mon fichier final.

Mon flux de travail de base consiste à obtenir d'abord un fichier CSV à partir de la base de données. Je le gomme, donc ce n'est pas aussi énorme. Ensuite, je convertis cela en un fichier HDF5 orienté ligne, en l'itérant sur python, en convertissant chaque ligne en un type de données réel, et en l'écrivant dans un fichier HDF5. Cela prend quelques dizaines de minutes, mais il n'utilise pas de mémoire, car il ne fonctionne que ligne par ligne. Ensuite, je "transpose" le fichier HDF5 orienté lignes dans un fichier HDF5 orienté colonnes.

La table transpose ressemble à:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

La relire ressemble alors à:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Maintenant, je l'exécute généralement sur une machine avec une tonne de mémoire, donc je ne suis peut-être pas assez prudent avec mon utilisation de la mémoire. Par exemple, par défaut, l'opération de chargement lit l'ensemble des données.

Cela fonctionne généralement pour moi, mais c'est un peu maladroit, et je ne peux pas utiliser la magie fantaisie pytables.

Edit: Le vrai avantage de cette approche, par rapport à la valeur par défaut de pytables du tableau d'enregistrements, est que je peux ensuite charger les données dans R en utilisant h5r, qui ne peut pas gérer les tables. Ou, au moins, j'ai été incapable de l'obtenir pour charger des tables hétérogènes.


Je pense que les réponses ci-dessus manquent une approche simple que j'ai trouvée très utile.

Lorsque j'ai un fichier qui est trop volumineux pour être chargé en mémoire, je divise le fichier en plusieurs fichiers plus petits (par ligne ou par col)

Exemple: Dans le cas de 30 jours d'échange de données d'une taille de ~ 30 Go, je le casse dans un fichier par jour de ~ 1 Go de taille. Je traite ensuite chaque fichier séparément et regroupe les résultats à la fin

L'un des plus grands avantages est qu'il permet un traitement parallèle des fichiers (plusieurs threads ou processus)

L'autre avantage est que la manipulation de fichiers (comme l'ajout / suppression de dates dans l'exemple) peut être accomplie par des commandes shell régulières, ce qui n'est pas possible dans des formats de fichiers plus avancés / compliqués

Cette approche ne couvre pas tous les scénarios, mais est très utile dans beaucoup d'entre eux


Je suis récemment tombé sur un problème similaire. J'ai trouvé simplement en lisant les données en morceaux et en l'ajoutant comme je l'écris en morceaux au même csv fonctionne bien. Mon problème était d'ajouter une colonne de date basée sur des informations dans une autre table, en utilisant la valeur de certaines colonnes comme suit. Cela peut aider ceux qui sont désorientés par dask et hdf5 mais plus familier avec les pandas comme moi.

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

Si vos jeux de données sont entre 1 et 20 Go, vous devriez obtenir un poste de travail avec 48 Go de RAM. Ensuite, les Pandas peuvent contenir tout le jeu de données en RAM. Je sais que ce n'est pas la réponse que vous cherchez ici, mais faire du calcul scientifique sur un ordinateur portable avec 4 Go de RAM n'est pas raisonnable.


Une autre variation

De nombreuses opérations effectuées sur des pandas peuvent également être effectuées en tant que requête db (sql, mongo)

L'utilisation d'un SGBDR ou d'un mongodb vous permet d'effectuer certaines agrégations dans la requête DB (qui est optimisée pour les données volumineuses et utilise efficacement le cache et les index).

Plus tard, vous pouvez effectuer un post-traitement en utilisant des pandas.

L'avantage de cette méthode est que vous gagnez les optimisations DB pour travailler avec des données volumineuses, tout en définissant la logique dans une syntaxe déclarative de haut niveau - sans avoir à gérer les détails de décider quoi faire en mémoire et quoi faire du noyau.

Et bien que le langage de requête et les pandas soient différents, il n'est généralement pas compliqué de traduire une partie de la logique de l'un à l'autre.





large-data