python - millions - pandas read large csv




«Большие данные» работают с использованием панд (9)

Я пытался много раз продумать ответ на этот вопрос, изучая панды. Я использую SAS для моей повседневной работы, и это отлично подходит для его внеочередной поддержки. Однако SAS является ужасным как часть программного обеспечения по многим другим причинам.

Однажды я надеюсь заменить мое использование SAS на python и pandas, но в настоящее время мне не хватает встроенного рабочего процесса для больших наборов данных. Я не говорю о «больших данных», для которых требуется распределенная сеть, а файлы слишком большие, чтобы вставлять их в память, но достаточно малые, чтобы они помещались на жесткий диск.

Моя первая мысль - использовать HDFStore для хранения больших наборов данных на диске и вытащить только те части, которые мне нужны, в dataframes для анализа. Другие упомянули MongoDB как более легкую в использовании альтернативу. Мой вопрос таков:

Каковы некоторые эффективные рабочие процессы для выполнения следующих задач:

  1. Загрузка плоских файлов в постоянную структуру базы данных на диске
  2. Запрос этой базы данных для извлечения данных для подачи в структуру данных pandas
  3. Обновление базы данных после манипуляции кусками в пандах

Реальные примеры будут высоко оценены, особенно от тех, кто использует панды на «больших данных».

Изменить - пример того, как я хотел бы, чтобы это работало:

  1. Итеративно импортируйте большой плоский файл и сохраните его в постоянной структуре базы данных на диске. Эти файлы обычно слишком велики для размещения в памяти.
  2. Чтобы использовать Pandas, я хотел бы прочитать подмножества этих данных (как правило, всего несколько столбцов за раз), которые могут вписываться в память.
  3. Я бы создал новые столбцы, выполнив различные операции над выбранными столбцами.
  4. Затем я должен добавить эти новые столбцы в структуру базы данных.

Я пытаюсь найти оптимальный способ выполнения этих шагов. Чтение ссылок на панды и pytables кажется, что добавление нового столбца может быть проблемой.

Изменить - Отвечая на вопросы Джеффа конкретно:

  1. Я строю модели потребительского кредитного риска. Типы данных включают телефон, SSN и характеристики адреса; значения свойств; уничижительная информация, такая как судимость, банкротства и т. д. ... Наборы данных, которые я использую каждый день, имеют от 1000 до 2000 полей в среднем по смешанным типам данных: непрерывные, номинальные и порядковые переменные как числовых, так и символьных данных. Я редко добавляю строки, но я выполняю много операций, которые создают новые столбцы.
  2. Типичные операции включают объединение нескольких столбцов с использованием условной логики в новый составной столбец. Например, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' . Результатом этих операций является новый столбец для каждой записи в моем наборе данных.
  3. Наконец, я хотел бы добавить эти новые столбцы в структуру данных на диске. Я бы повторил шаг 2, исследуя данные с кросс-таблицами и описательной статистикой, пытаясь найти интересные, интуитивные отношения к модели.
  4. Типичный файл проекта обычно составляет около 1 ГБ. Файлы организованы таким образом, что строка состоит из записи потребительских данных. Каждая строка имеет одинаковое количество столбцов для каждой записи. Это всегда будет так.
  5. Довольно редко, когда я создавал новый столбец, я бы подмножал строки. Тем не менее, для меня довольно распространено подмножество строк при создании отчетов или генерации описательной статистики. Например, я могу создать простую частоту для определенной линии бизнеса, например, розничные кредитные карты. Для этого я бы выделил только те записи, в которых линия бизнеса = розничная торговля в дополнение к тому, какие столбцы я хочу сообщить. Однако при создании новых столбцов я бы вытащил все строки данных и только те столбцы, которые мне нужны для операций.
  6. Процесс моделирования требует, чтобы я анализировал каждый столбец, искал интересные отношения с некоторой переменной результата и создавал новые составные столбцы, описывающие эти отношения. Колонки, которые я исследую, обычно выполняются небольшими наборами. Например, я сосредоточусь на наборе из примерно 20 столбцов, посвященных только значениям свойств, и наблюдайте, как они связаны с дефолтом по кредиту. Когда они будут исследованы и будут созданы новые столбцы, я перейду к другой группе столбцов, скажем, об образовании в колледже, и повторю процесс. То, что я делаю, - это создание переменных-кандидатов, которые объясняют взаимосвязь между моими данными и некоторым результатом. В самом конце этого процесса я применяю некоторые методы обучения, которые создают уравнение из этих составных столбцов.

Редко я когда-либо добавлял строки в набор данных. Я почти всегда буду создавать новые столбцы (переменные или функции в языке статистики / машинного обучения).


Если ваши наборы данных находятся между 1 и 20 ГБ, вы должны получить рабочую станцию ​​с 48 ГБ оперативной памяти. Затем Pandas может хранить весь набор данных в ОЗУ. Я знаю, что это не тот ответ, который вы ищете здесь, но делать научные вычисления на ноутбуке с 4 ГБ ОЗУ не является разумным.


Еще одна вариация

Многие операции, выполняемые в пандах, также могут выполняться как запрос db (sql, mongo)

Использование RDBMS или mongodb позволяет выполнять некоторые из агрегатов в DB Query (который оптимизирован для больших данных и эффективно использует кеш и индексы)

Позже вы можете выполнять пост-обработку с помощью панд.

Преимущество этого метода заключается в том, что вы получаете оптимизацию БД для работы с большими данными, но при этом все еще определяете логику в декларативном синтаксисе высокого уровня - и не должны иметь дело с деталями решения, что делать в памяти и что делать ядро.

И хотя язык запросов и панды разные, обычно не сложно перевести часть логики из одной в другую.


Как уже отмечали другие, через несколько лет появился «неосновный» панд-эквивалент: dask . Хотя dask не является заменой панд и всей его функциональностью, он выделяется по нескольким причинам:

Dask - это гибкая библиотека параллельных вычислений для аналитических вычислений, оптимизированная для динамического планирования задач для интерактивных вычислительных нагрузок коллекций Big Data, таких как параллельные массивы, фреймы данных и списки, которые расширяют общие интерфейсы, такие как итераторы NumPy, Pandas или Python, чем память или распределенные среды и шкалы от ноутбуков до кластеров.

Даск подчеркивает следующие достоинства:

  • Знакомый: Предоставляет параллельный массив NumPy и объекты Pandas DataFrame
  • Гибкость: обеспечивает интерфейс планирования задач для получения дополнительных пользовательских рабочих нагрузок и интеграции с другими проектами.
  • Родной: разрешает распределенные вычисления на Pure Python с доступом к стеку PyData.
  • Быстро: работает с низкими накладными расходами, низкой задержкой и минимальной сериализацией, необходимой для быстрого численного алгоритма
  • Масштабирование: работает устойчиво на кластерах с 1000 ядрами. Масштабирование: тривиально настраивать и запускать на ноутбуке в одном процессе
  • Отзывчивый: разработанный с учетом интерактивных вычислений, он обеспечивает быструю обратную связь и диагностику, чтобы помочь людям

и добавить простой пример кода:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

заменяет некоторый код pandas следующим образом:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

и, что особенно примечательно, обеспечивает через интерфейс concurrent.futures общий для представления пользовательских задач:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

Один трюк, который я нашел полезным для больших случаев использования данных, заключается в уменьшении объема данных за счет снижения точности float до 32-битного. Это не применимо во всех случаях, но во многих приложениях 64-битная точность переполнена, и экономия памяти 2x стоит того. Чтобы сделать очевидный момент еще более очевидным:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

Теперь, через два года после вопроса, существует « dask » панд-эквивалент: dask . Это превосходно! Хотя он не поддерживает всю функциональность pandas, вы можете получить очень далеко от него.


Это относится к пимонго. Я также прототипировал использование sql-сервера, sqlite, HDF, ORM (SQLAlchemy) в python. Прежде всего pymongo - это база данных, основанная на документах, поэтому каждый человек будет документом (атрибутом атрибутов). Многие люди составляют коллекцию, и у вас может быть множество коллекций (людей, фондовый рынок, доход).

pd.dateframe -> pymongo Примечание. Я использую chunksize в read_csv чтобы сохранить его в 5-10k записей (pymongo сбрасывает сокет, если он больше)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

запрос: gt = больше ...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() возвращает итератор, поэтому я обычно использую ichunked для измельчения в меньшие итераторы.

Как насчет объединения, так как я обычно получаю 10 источников данных для вставки вместе:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

то (в моем случае иногда мне приходится сначала на aJoinDF перед его «слиянием».)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

Затем вы можете записать новую информацию в свою основную коллекцию с помощью метода обновления ниже. (логический сбор против физических источников данных).

collection.update({primarykey:foo},{key:change})

При меньших поисках просто денормализовать. Например, у вас есть код в документе, и вы просто добавляете текст кода поля и выполняете поиск по типу при создании документов.

Теперь у вас есть хороший набор данных, основанный на человеке, вы можете развязать свою логику в каждом случае и сделать больше атрибутов. Наконец, вы можете прочитать в pandas свои 3-кратные ключевые индикаторы памяти и выполнить разведку / анализ / анализ данных. Это работает для меня для 3 миллионов записей с цифрами / большим текстом / категориями / кодами / float / ...

Вы также можете использовать два метода, встроенные в MongoDB (MapReduce и агрегатная структура). См. Здесь для получения дополнительной информации об общей структуре , поскольку она кажется более простой, чем MapReduce, и выглядит удобной для быстрой агрегатной работы. Заметьте, мне не нужно было определять свои поля или отношения, и я могу добавлять элементы в документ. При текущем состоянии быстро меняющегося numpy, pandas, набора инструментов python, MongoDB помогает мне просто работать :)


Я заметил это немного поздно, но я работаю с аналогичной проблемой (модели предоплаты ипотеки). Мое решение состояло в том, чтобы пропустить слой панда HDFStore и использовать прямые pytables. Я сохраняю каждый столбец как отдельный массив HDF5 в своем конечном файле.

Мой основной рабочий процесс - сначала получить CSV-файл из базы данных. Я gzip его, поэтому он не такой огромный. Затем я конвертирую это в файл HDF5, ориентированный на ряд, итерируя его в python, преобразовывая каждую строку в реальный тип данных и записывая ее в файл HDF5. Это занимает несколько десятков минут, но не использует память, поскольку работает только последовательно. Затем я «транспонирую» ориентированный на строку HDF5 файл в файл, ориентированный на столбцы HDF5.

Транспортировка таблицы выглядит так:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

Чтение его обратно тогда выглядит так:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Теперь я обычно запускаю это на машине с тонной памятью, поэтому я не могу быть достаточно осторожным с использованием памяти. Например, по умолчанию операция загрузки считывает весь набор данных.

Это, как правило, работает для меня, но это немного неуклюже, и я не могу использовать манеру фантазии pytables.

Изменить: реальным преимуществом такого подхода по сравнению с параметрами pitsables по умолчанию является то, что я могу затем загрузить данные в R, используя h5r, который не может обрабатывать таблицы. Или, по крайней мере, я не смог заставить его загружать гетерогенные таблицы.


Я знаю, что это старый поток, но я думаю, что библиотека Blaze стоит проверить. Он создан для таких ситуаций.

Из документов:

Blaze расширяет возможности использования NumPy и Pandas для распределенных и внеочередных вычислений. Blaze предоставляет интерфейс, аналогичный интерфейсу NumPy ND-Array или Pandas DataFrame, но сопоставляет эти знакомые интерфейсы с множеством других вычислительных движков, таких как Postgres или Spark.

Редактировать: Кстати, это поддерживается ContinuumIO и Трэвисом Олифантом, автором NumPy.


Я обычно использую десятки гигабайт данных именно таким образом, например, у меня есть таблицы на диске, которые я читаю через запросы, создаю данные и добавляю назад.

Стоит прочитать документы и в конце этого потока для нескольких предложений о том, как хранить ваши данные.

Подробности, которые повлияют на то, как вы храните свои данные, например:
Дайте как можно больше деталей; и я могу помочь вам разработать структуру.

  1. Размер данных, количество строк, столбцов, типы столбцов; вы добавляете строки или просто столбцы?
  2. Какими будут типичные операции. Например, выполните запрос по столбцам, чтобы выбрать группу строк и конкретных столбцов, затем выполните операцию (в памяти), создайте новые столбцы, сохраните их.
    (Приведение примера игрушек может позволить нам предложить более конкретные рекомендации.)
  3. После этой обработки, что вы делаете? Является ли шаг 2 ad hoc или повторяемым?
  4. Входные плоские файлы: сколько, приблизительный общий размер в Gb. Как они организованы, например, посредством записей? Каждый из них содержит разные поля или у них есть записи на каждый файл со всеми полями в каждом файле?
  5. Вы когда-нибудь выбирали подмножества строк (записей) на основе критериев (например, выбираете строки с полем A> 5)? а затем что-то сделать, или просто вы выбираете поля A, B, C со всеми записями (а затем что-то делаете)?
  6. Вы работаете над всеми вашими столбцами (в группах) или есть хорошая пропорция, которую вы можете использовать только для отчетов (например, вы хотите хранить данные вокруг, но не нужно втягивать эту колонку, пока конечное время выполнения)?

Решение

Убедитесь, что у вас установлены панды как минимум 0.10.1 .

Прочитайте повторяющиеся файлы chunk-by-chunk и несколько табличных запросов .

Поскольку pytables оптимизирован для работы по строкам (на который вы запрашиваете), мы создадим таблицу для каждой группы полей. Таким образом, легко выбрать небольшую группу полей (которая будет работать с большой таблицей, но это более эффективно сделать это таким образом ... Я думаю, что смогу исправить это ограничение в будущем ... это более интуитивно понятный):
(Ниже приведен псевдокод.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

Чтение в файлах и создание хранилища (по сути дела, что делает append_to_multiple ):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

Теперь у вас есть все таблицы в файле (на самом деле вы могли бы хранить их в отдельных файлах, если хотите, вам нужно было бы добавить имя файла в group_map, но, вероятно, это не обязательно).

Вот как вы получаете столбцы и создаете новые:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

Когда вы будете готовы к post_processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

О data_columns вам фактически не нужно определять ANY data_columns; они позволяют вам подбирать строки на основе столбца. Например, что-то вроде:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

Они могут быть наиболее интересны для вас на этапе подготовки окончательного отчета (по существу колонка данных отделена от других столбцов, что может повлиять на эффективность, если вы определите много).

Вы также можете захотеть:

  • создайте функцию, которая берет список полей, просматривает группы в group_map, затем выбирает их и объединяет результаты, чтобы получить результирующий фрейм (это, по сути, то, что делает select_as_multiple). Таким образом, структура будет довольно прозрачной для вас.
  • индексы на определенных столбцах данных (делает подмножество строк намного быстрее).
  • активировать сжатие.

Дайте мне знать, когда у вас возникнут вопросы!





large-data