python - pyspark dataframe foreach




使用连接时,Spark迭代时间呈指数增长 (2)

Rdds是不可变的。 尝试做 rdd = rdd.cache()

我对Spark还是很陌生,我正在尝试实现一些迭代算法,以马尔可夫模型表示的质心进行聚类(期望最大化)。 因此,我需要进行迭代和联接。

我遇到的一个问题是每次迭代的时间呈指数增长。
经过一些试验,我发现进行迭代时需要保留将在下一次迭代中重用的RDD,否则每次迭代火花都会创建执行计划,该计划将从开始重新计算RDD,从而增加了计算时间。

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
#     init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

结果是:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993

因此,添加cache()可以帮助并使迭代时间变得恒定。

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)
0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368

但是当在迭代中加入Join时,问题又回来了。 这是我演示问题的一些简单代码。 即使在每个RDD转换上进行缓存也不能解决问题:

init = sc.parallelize(xrange(10000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))
    init2.cache()

    init3 = init.map(lambda n: (n, n*2))
    init3.cache()

    init4 = init2.join(init3)
    init4.count()
    init4.cache()

    init = init4.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

这是输出。 如您所见,迭代时间呈指数增长:(

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815

我将非常感谢您的帮助:)


问题是(如zero323在其详尽的回答中所指出的),未指定分区数量而调用join可能会(确实)导致分区数量的增加。 分区的数量可以无限增加(显然)。 重复调用join时,有(至少)两种方法可以防止分区数量增加(无限制)。

方法1:

如zero323所指出的,您可以在调用join时手动指定分区数。 例如

rdd1.join(rdd2, numPartitions)

这将确保分区的数量不超过numPartitions,尤其是分区的数量不会持续增长。

方法2:

创建SparkConf时,可以指定默认的并行度。 如果设置了此值,则在不指定numPartitions的情况下调用诸如 join 函数时,将使用默认的并行性,从而有效地限制了分区的数量并防止它们增长。 您可以将此参数设置为

conf=SparkConf.set("spark.default.parallelism", numPartitions)
sc = SparkContex(conf=conf)   




pyspark