python - 使用大熊猫的“大数据”工作流程




mongodb pandas (9)

pymongo就是这种情况。 我也在python中使用sql server,sqlite,HDF,ORM(SQLAlchemy)进行了原型开发。 首先,pymongo是一个基于文档的数据库,因此每个人都将是一个文档(属性dict )。 许多人形成一个集合,你可以有很多集合(人,股票市场,收入)。

pd.dateframe - > pymongo注意:我使用read_csvchunksize将其保留为5到10k条记录(如果pymongo更大,则删除套接字)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

查询:gt =大于...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find()返回一个迭代器,所以我通常使用ichunked来切入较小的迭代器。

因为我通常可以将10个数据源粘贴在一起,所以如何加入:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

那么(在我的情况下,有时候我必须在它的“可合并”之前首先aJoinDF一个aJoinDF )。

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

然后,您可以通过下面的更新方法将新信息写入您的主要收藏。 (逻辑收集与物理数据源)。

collection.update({primarykey:foo},{key:change})

在较小的查找中,只是非规范化。 例如,您在文档中有代码,只需添加字段代码文本并在创建文档时进行dict查找。

现在你拥有一个基于人的好数据集,你可以在每个案例中释放你的逻辑并创造更多属性。 最后,您可以读入熊猫您的3个记忆最大关键指标,并进行枢轴/聚合/数据探索。 这对我来说适用于300万记录与数字/大文本/类别/代码/浮动/ ...

您还可以使用MongoDB中内置的两种方法(MapReduce和聚合框架)。 请参阅此处了解有关聚合框架的更多信息 ,因为它似乎比MapReduce更容易,并且对于快速聚合工作看起来很方便。 注意我不需要定义我的字段或关系,并且可以将项目添加到文档。 在快速变化的numpy,pandas,python工具集的当前状态下,MongoDB帮助我开始工作:)

在学习熊猫的过程中,我试图解答这个问题好几个月。 我将SAS用于日常工作,对于它的核心支持非常重要。 但是,由于许多其他原因,SAS作为一款软件很糟糕。

有一天,我希望用python和pandas取代我的SAS,但是我目前缺乏大型数据集的核心外工作流程。 我不是在谈论需要分布式网络的“大数据”,而是文件太大而不适合内存,但又足够小以适应硬盘驱动器。

我的第一个想法是使用HDFStore在磁盘上保存大型数据集,并将我需要的部分HDFStore到数据HDFStore进行分析。 其他人提到MongoDB是一种更易于使用的替代方案。 我的问题是这样的:

什么是一些最佳实践工作流程来完成以下工作:

  1. 将平面文件加载到磁盘上的永久数据库结构中
  2. 查询该数据库以检索要送入熊猫数据结构的数据
  3. 在熊猫中操作片段后更新数据库

真实世界的例子会受到大家的赞赏,尤其是那些使用“大数据”熊猫的人。

编辑 - 我希望如何工作的例子:

  1. 迭代地导入大型平面文件并将其存储在永久的磁盘数据库结构中。 这些文件通常太大而不适合内存。
  2. 为了使用熊猫,我想阅读这些数据的子集(通常只是几列),这些数据可以放在内存中。
  3. 我会通过对所选列执行各种操作来创建新列。
  4. 然后我必须将这些新列添加到数据库结构中。

我正试图找到执行这些步骤的最佳实践方式。 阅读关于熊猫和pytables的链接似乎是添加一个新列可能是一个问题。

编辑 - 特别回应Jeff的问题:

  1. 我正在建立消费者信用风险模型。 数据种类包括电话,SSN和地址特征; 财产价值; 像犯罪记录,破产等贬义性信息......我每天使用的数据集平均有近1,000至2,000个混合数据类型的字段:数字和字符数据的连续变量,名义变量和有序变量。 我很少追加行,但是我执行许多操作来创建新列。
  2. 典型的操作涉及将使用条件逻辑的多个列组合成新的复合列。 例如, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' 。 这些操作的结果是我的数据集中每个记录的新列。
  3. 最后,我想将这些新列添加到磁盘数据结构中。 我会重复第2步,使用交叉表和描述性统计数据来探索数据,试图找到有趣,直观的模型关系。
  4. 一个典型的项目文件通常大约1GB。 文件被组织成一行,其中包含消费者数据的记录。 每行记录的每列都有相同的列数。 情况总是如此。
  5. 创建新列时,按行排序是非常罕见的。 但是,在创建报告或生成描述性统计信息时,对行进行子集的处理是非常常见的。 例如,我可能希望为特定的业务线创建一个简单的频率,比如零售信用卡。 要做到这一点,除了我想报告的任何列之外,我只会选择那些业务线=零售的记录。 但是,在创建新列时,我会提取所有数据行,并只提取操作所需的列。
  6. 建模过程要求我分析每一列,查找与某个结果变量有趣的关系,并创建描述这些关系的新复合列。 我探索的专栏通常以小集合完成。 例如,我将着重讨论一系列只是处理房产价值的20列,并观察它们与贷款违约的关系。 一旦探索了这些内容并创建了新的专栏,我就会转到另一组专栏,说大学教育,然后重复这个过程。 我正在做的是创建候选变量来解释我的数据和一些结果之间的关系。 在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个等式。

我很少会将行添加到数据集中。 我几乎总是会创建新的列(统计/机器学习术语中的变量或功能)。


在问题出现两年后的今天,还有一种“核心外”的熊猫daskdask 。 这是非常好的! 虽然它不支持所有的熊猫功能,但您可以使用它。


我发现对“大数据”用例有帮助的一个技巧是通过将浮点精度降低到32位来减少数据量。 它并不适用于所有情况,但在许多应用中,64位精度是矫枉过正的,节省2倍的内存是值得的。 更明显的一点是:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

我发现这有点迟,但我也遇到类似的问题(按揭预付款模式)。 我的解决方案是跳过熊猫HDFStore图层并使用直接的pytables。 我将每列保存为最终文件中的单个HDF5阵列。

我的基本工作流程是首先从数据库中获取CSV文件。 我gzip它,所以它不是很大。 然后,我将它转换为面向行的HDF5文件,通过在python中迭代它,将每一行转换为实际的数据类型,并将其写入HDF5文件。 这需要几十分钟,但它不使用任何内存,因为它只能逐行操作。 然后,我将面向行的HDF5文件转换为一个面向列的HDF5文件。

表转置看起来像:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

再读回来看起来像:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

现在,我通常在一台拥有大量内存的机器上运行它,所以我可能没有足够小心处理内存使用情况。 例如,默认情况下,加载操作读取整个数据集。

这通常适用于我,但它有点笨重,我不能使用花式的pytables魔法。

编辑:这种方法的真正优点,在数组记录pytables的默认情况下,我可以使用h5r将数据加载到R中,而无法处理表格。 或者,至少,我一直无法让它加载异构表。


我知道这是一个古老的线程,但我认为Blaze库值得一试。 它适用于这些类型的情况。

从文档:

Blaze将NumPy和Pandas的可用性扩展到分布式和非核心计算。 Blaze提供了一个类似于NumPy ND-Array或Pandas DataFrame的界面,但将这些熟悉的界面映射到各种其他计算引擎,如Postgres或Spark。

编辑:顺便说一句,它由NumPy的作者ContinuumIO和Travis Oliphant支持。


我经常以这种方式使用几十千兆字节的数据,例如我通过查询读取磁盘上的表格,创建数据并追加回去。

这是值得阅读的文档,在这个线程的最后几个建议如何存储您的数据。

会影响您存储数据的详细信息,如:
尽可能多地提供细节; 我可以帮你开发一个结构。

  1. 数据大小,行数,列数,列类型; 你是追加行还是只追加列?
  2. 典型的操作将会是什么样子。 例如,对列进行查询以选择一堆行和特定列,然后执行操作(内存中),创建新列,保存这些列。
    (给出一个玩具的例子可以使我们提供更具体的建议。)
  3. 处理完之后,你会做什么? 第2步是临时的还是可重复的?
  4. 输入平面文件:在Gb中有多少个粗略的总大小。 这些如何组织如记录? 每个文件是否包含不同的字段,或者每个文件是否包含每个文件中的所有字段的记录?
  5. 你是否曾经根据标准选择行(记录)的子集(例如,选择字段A> 5的行)? 然后做一些事情,或者你只是选择字段A,B,C与所有的记录(然后做一些事情)?
  6. 你是否在“所有专栏(分组)中工作”,或者是否有很好的比例,你只能用于报告(例如,你想保留数据,但不需要在列中显示直到最终结果时间)?

确保你的熊猫至少安装了0.10.1

块逐块多表查询 迭代文件

由于pytables经过了优化,可以按行进行操作(这是您查询的内容),我们将为每组字段创建一个表。 通过这种方式,很容易选择一小组字段(这将与一张大桌子一起工作,但以这种方式更有效率......我想我可能能够在未来解决这个限制......这是无论如何更直观):
(以下是伪代码。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读入文件并创建存储(主要是做append_to_multiple操作):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在你拥有了文件中的所有表格(实际上,如果你愿意,你可以将它们存储在单独的文件中,你可能需要将文件名添加到group_map中,但这可能不是必需的)。

这是你如何获得专栏并创建新专栏:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当你准备好post_processing时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于data_columns,您实际上并不需要定义任何 data_columns; 它们允许您基于列来选择行。 例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终报告生成阶段,它们可能对你最有意思(本质上,数据列与其他列分隔,如果定义了很多,这可能会影响效率)。

你也可能想要:

  • 创建一个接收字段列表的函数,在groups_map中查找组,然后选择这些并连接结果,以便获得结果帧(这实际上是select_as_multiple所做的)。 这样的结构对你来说是非常透明的。
  • 某些数据列上的索引(使行子集更快)。
  • 启用压缩。

当你有问题时让我知道!


正如其他人所指出的,几年后,出现了一种“核心外”的熊猫daskdask 。 尽管dask不是熊猫及其所有功能的直接替代品,但出于以下原因:

Dask是一款灵活的分析计算并行计算库,针对动态任务调度进行了优化,适用于“大数据”集合的交互式计算工作负载,如并行数组,数据框和列表,这些工具将NumPy,Pandas或Python迭代器等常见接口扩展到大型数据库超出内存或分布式环境,并从笔记本电脑扩展到集群。

达斯克强调以下美德:

  • 熟悉:提供并行化的NumPy数组和Pandas DataFrame对象
  • 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度接口。
  • 本机:通过访问PyData堆栈在Pure Python中启用分布式计算。
  • 快速:以低开销,低延迟和快速数值算法所需的最小序列化操作
  • 按比例缩放:在具有1000个核心的群集上弹性运行缩小比例:在单个进程中设置并在笔记本电脑上运行并不重要
  • 响应式:设计时考虑到交互式计算,它提供快速反馈和诊断以帮助人类

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

替换一些像这样的熊猫代码:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

特别值得注意的是,通过concurrent.futures界面提供了一个提交自定义任务的概要:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

考虑一下Ruffus是否可以创建一个数据管道的简单路径,该管道可以分解为多个较小的文件。


这里Ray值得一提的是,
它是一个分布式计算框架,它拥有分布式自己的熊猫实现。

只需更换熊猫导入,代码应该按原样工作:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

可以在这里阅读更多细节:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/







large-data