高速 - python 大量 データ




Pandas を使った "大規模なデータ"ワークフロー (9)

私はパンダを学んでいる間、この質問への答えを何ヶ月も困惑させようとしました。 私はSASを日常業務に使用しており、アウトオブコアのサポートには最適です。 しかし、SASは他の多くの理由から、ソフトウェアとしては恐ろしいものです。

ある日、私はSASの使用をPythonとpandasに置き換えたいと考えていますが、現在は大規模なデータセットに対してコア外のワークフローがありません。 私は、分散ネットワークを必要とする "ビッグデータ"については言及していませんが、メモリには収まらないほど大きすぎるファイルですが、ハードドライブに収まるほど小さいファイルについては言いません。

私の最初の考えは、 HDFStoreを使用して大規模なデータセットをディスクに保持し、分析のために必要な部分のみをデータHDFStoreにプルすることです。 他の人は、MongoDBを使いやすいものとして挙げています。 私の質問はこれです:

以下を達成するためのベストプラクティスのワークフローは何ですか?

  1. フラットファイルをディスク上の永続的なデータベース構造にロードする
  2. そのデータベースに問い合わせて、パンダのデータ構造にデータを取り込む
  3. パンダで作品を操作した後のデータベースの更新

実際の例は、特に "大規模なデータ"でパンダを使用する人から非常に高く評価されます。

編集 - これをどのように機能させたいかの例:

  1. 大きなフラットファイルを繰り返しインポートし、ディスク上の永続的なデータベース構造に格納します。 これらのファイルは通常、大きすぎてメモリに収まらない。
  2. Pandasを使用するために、私はこのデータのサブセット(通常は一度に数列)を読み込み、メモリに収めることができます。
  3. 選択した列に対してさまざまな操作を実行して新しい列を作成します。
  4. これらの新しい列をデータベース構造に追加する必要があります。

私はこれらのステップを実行するベストプラクティスの方法を見つけることを試みています。 パンダとpytablesに関するリンクを読むと、新しい列を追加することが問題になるようです。

編集 - ジェフの質問に具体的に答える:

  1. 私は消費者信用リスクモデルを構築しています。 データの種類には、電話、SSN、アドレス特性などがあります。 プロパティ値。 私が毎日使っているデータセットには、数字データと文字データの連続、公称、序数変数の混合データタイプの平均で約1,000〜2,000フィールドがあります。 私はほとんど行を追加しませんが、私は新しい列を作成する多くの操作を実行します。
  2. 一般的な操作では、条件付きロジックを使用して複数の列を新しい複合列に結合します。 たとえば、 if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'です。 これらの操作の結果は、データセットのすべてのレコードの新しい列になります。
  3. 最後に、これらの新しい列をディスク上のデータ構造に追加したいと思います。 私はステップ2を繰り返し、クロス集計を使ってデータを探索し、興味深い直感的なモデル関係を見つけるための説明的な統計をとっていきます。
  4. 典型的なプロジェクトファイルは通常約1GBです。 ファイルは、行がコンシューマデータのレコードで構成されているような方法で編成されます。 各行は、すべてのレコードに対して同じ数の列を持ちます。 これは常にそうです。
  5. 新しい列を作成するときに行単位でサブセット化するのは非常にまれです。 しかし、レポートを作成したり、わかりやすい統計を生成したりするときに、行を部分集合化するのはかなり一般的です。 たとえば、特定のビジネスラインの簡単な頻度を作成することができます(Retailクレジットカードなど)。 これを行うには、報告対象の列に加えて、ビジネスライン=小売りのレコードだけを選択します。 しかし、新しい列を作成するときは、すべての行のデータとその操作に必要な列のみを取り出します。
  6. モデリングプロセスでは、すべての列を分析し、結果変数で興味深い関係を探し、それらの関係を記述する新しい複合列を作成する必要があります。 私が探索する列は、通常、小さなセットで行われます。 たとえば、プロパティ値を処理するだけで20のコラムを設定し、それらがどのようにローンのデフォルト設定に関連するかを観察します。 それらを探索して新しい列を作成したら、別の列の列、たとえば大学教育に移り、そのプロセスを繰り返します。 私がやっていることは、自分のデータとある結果の関係を説明する候補変数を作成することです。 このプロセスの最後に、これらの複合列から数式を作成するいくつかの学習テクニックを適用します。

データセットに行を追加することは稀です。 私はいつも新しい列(統計/機械学習の用語で変数や機能)を作成しています。


Rayもここで言及する価値がある、
それは分散計算フレームワークです。分散型の方法でパンダを独自に実装しています。

パンダのインポートを置き換えるだけで、コードはそのまま動作するはずです:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

ここで詳細を読むことができます:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


大規模なデータ使用の場合に役立つトリックの1つは、浮動小数点精度を32ビットに減らすことによってデータ量を減らすことです。 すべての場合に適用可能なわけではありませんが、多くのアプリケーションで64ビットの精度が高すぎるため、2倍のメモリを節約する価値があります。 明確な点をさらに明確にするには:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

もう1つのバリエーション

パンダで行われた操作の多くは、dbクエリ(sql、mongo)として実行することもできます。

RDBMSまたはmongodbを使用すると、DBクエリ(大規模なデータに最適化され、キャッシュとインデックスを効率的に使用する)でいくつかの集計を実行できます。

後で、パンダを使用して後処理を実行できます。

この方法の利点は、大量のデータを処理するためのDB最適化を得る一方で、高レベルの宣言構文でロジックを定義しながら、メモリ内の処理内容と処理対象を決定する詳細を処理する必要がないことですコアの

また、クエリ言語とパンダは異なっていますが、ロジックの一部を別のものに変換するのは通常は複雑ではありません。


データセットが1〜20GBの場合、48GBのRAMを備えたワークステーションが必要です。 その後、PandasはRAM内のデータセット全体を保持できます。 あなたがここで探している答えではなく、4GBのRAMを搭載したノートパソコンで科学的な計算をするのは妥当ではないと私は思っています。


他の人が指摘しているように、数年後には、「アウトオブコア」のパンダが登場しdask 。 daskはパンダとそのすべての機能のドロップイン置換ではありませんが、それはいくつかの理由から際立っています。

Daskは、並列配列、データフレーム、およびNumPy、Pandas、Pythonイテレータのような一般的なインタフェースを大規模に拡張するリストなどの「ビッグデータ」コレクションのインタラクティブな計算作業負荷の動的タスクスケジューリングに最適化された、メモリよりも分散した環境でも、ラップトップからクラスタにまで拡張できます。

Daskは以下の美徳を強調しています:

  • 使い慣れた:並列化されたNumPy配列とPandas DataFrameオブジェクトを提供する
  • 柔軟性:より多くのカスタムワークロードと他のプロジェクトとの統合のためのタスクスケジューリングインターフェイスを提供します。
  • ネイティブ:Pure Pythonの分散コンピューティングを有効にし、PyDataスタックにアクセスします。
  • 高速:高速な数値アルゴリズムに必要な低オーバーヘッド、低遅延、および最小限のシリアライゼーションで動作します。
  • スケールアップ:1000sのコアを持つクラスタ上で弾力的に実行します。スケールダウン:単一のプロセスでラップトップをセットアップして実行することは簡単です
  • 応答性:インタラクティブコンピューティングを考慮して設計されているため、人間を支援するための迅速なフィードバックと診断が可能です

簡単なコードサンプルを追加するには:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

次のようなパンダのコードを置き換えます:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

特に注目すべきことに、カスタムタスクを提出するためのコンカレント。フュージョンインタフェースを提供します。

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

問題の2年後には、今や「アウトオブコア」のパンダに相当するものがdaskます。 それは素晴らしいです! パンダのすべての機能をサポートしているわけではありませんが、実際にはこれで実現できます。


私は少し遅れてこれを見つけましたが、私は同様の問題(モーゲージ・プリペイド・モデル)で働いています。 私の解決策は、パンダのHDFStoreレイヤーをスキップし、まっすぐなpytablesを使用することでした。 私は最終的なファイルに個々のHDF5配列として各列を保存します。

私の基本的なワークフローは、まずデータベースからCSVファイルを取得することです。 私はそれをgzipので、それは巨大ではない。 その後、行指向のHDF5ファイルに変換し、Pythonで反復処理し、各行を実際のデータ型に変換し、HDF5ファイルに書き出します。 これには数十分かかりますが、行単位でしか動作しないため、メモリは使用しません。 次に、行指向のHDF5ファイルを列指向のHDF5ファイルに「転記」します。

テーブルの転置は次のようになります。

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

それを読み返してみると、次のようになります:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

さて、私は一般的に、これは大量のメモリを持つマシン上で実行するので、私は自分のメモリ使用量に十分注意しないかもしれません。 たとえば、デフォルトでは、ロード操作はデータセット全体を読み取ります。

これは一般的に私のために働いていますが、ちょっとぎこちなくて、幻想的なpytablesの魔法を使うことはできません。

編集:このアプローチの本当の利点は、レコードの配列pytablesのデフォルトよりも、テーブルを扱うことができないh5rを使ってRにデータをロードできることです。 少なくとも、異種のテーブルをロードすることができませんでした。


私は日常的にこのような方法で数十ギガバイトのデータを使用しています。たとえば、ディスク上にクエリを読み込んでデータを作成して後ろに追加するテーブルがあります。

ドキュメントを読む価値あります、このスレッドの後半では 、データを保存する方法についてのいくつかの提案があります。

データの保存方法に影響する詳細は次のとおりです。
できるだけ詳しく説明してください。 私はあなたが構造を開発するのを手助けすることができます。

  1. データのサイズ、行数、列数、列の種類。 行を追加しているのですか?
  2. 典型的な操作はどのように見えますか? たとえば列のクエリを実行して一連の行と特定の列を選択し、操作(メモリ内)を行い、新しい列を作成して保存します。
    (おもちゃの例を与えることで、より具体的な推奨事項を提供することができます)。
  3. その処理の後、あなたは何をしていますか? ステップ2はアドホックか反復可能ですか?
  4. フラットファイルを入力してください:Gbで表示される数、おおよその合計サイズ これらはどのように編成されていますか? それぞれに異なるフィールドが含まれているか、各ファイルのすべてのフィールドを含むファイルごとにいくつかのレコードがありますか?
  5. 条件に基づいて行(レコード)のサブセットを選択しますか(たとえば、フィールドA> 5の行を選択しますか?) 何かをするか、すべてのレコードでフィールドA、B、Cを選択するだけですか?
  6. すべての列を(グループ単位で)作業しているのか、レポートにのみ使用できる割合が高いのですか(たとえば、データを保持したいが、その列を明示的に取り込む必要はありません。最終的な結果の時間)?

溶液

少なくとも0.10.1パンダをインストールしていることを確認してください。

chunk-by-chunkファイル複数のテーブルクエリ繰り返し読み込みます

pytablesは行単位で操作するように最適化されているので(これはあなたが照会するものです)、フィールドのグループごとにテーブルを作成します。 この方法では、小さなグループのフィールドを選択するのは簡単です(大きなテーブルで動作しますが、この方法で効率的です...将来この制限を修正できる可能性があります...)とにかく直感的です):
(以下は擬似コードです。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

ファイルをappend_to_multipleストレージを作成する(基本的にappend_to_multipleが行うこと)

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

これで、ファイル内にすべてのテーブルが作成されました(実際には別々のファイルに格納することができますが、ファイル名をgroup_mapに追加する必要がありますが、これは必要ではないかもしれません)。

これは、列を取得して新しい列を作成する方法です。

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

post_processingの準備ができたら:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

data_columnsについては、実際には任意のdata_columnsを定義する必要はありません。 列に基づいて行をサブ選択することができます。 例:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

最終的なレポート作成段階で最も興味深い場合があります(基本的にデータ列は他の列から分離されています。これは、多くを定義すると効率に多少影響する可能性があります)。

また、

  • フィールドのリストを取得し、groups_mapのグループを検索し、結果を連結して結果のフレームを取得する関数を作成します(これは基本的にselect_as_multipleの機能です)。 このようにして、構造はあなたにかなり透明になります。
  • 特定のデータ列にインデックスを付ける(行サブセットをはるかに高速にする)。
  • 圧縮を有効にする。

あなたは質問があるとき私に知らせてください!


複数の小さなファイルに分割されたデータパイプラインを作成する単純な道を行くなら、 Ruffus考えてみましょう。







large-data