How to parallelize a sum calculation in python numpy?
I'm not sure I understand the problem. Are you just trying to partition a list onto a pool of workers, have them keep a running sum of their computations, and sum the result?
#!/bin/env python import sys import random import time import multiprocessing import numpy as np numpows = 5 numitems = 25 nprocs = 4 def expensiveComputation( i ): time.sleep( random.random() * 10 ) return np.array([i**j for j in range(numpows)]) def listsum( l ): sum = np.zeros_like(l) for item in l: sum = sum + item return sum def partition(lst, n): division = len(lst) / float(n) return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ] def myRunningSum( l ): sum = np.zeros(numpows) for item in l: sum = sum + expensiveComputation(item) return sum if __name__ == '__main__': random.seed(1) data = range(numitems) pool = multiprocessing.Pool(processes=4,) calculations = pool.map(myRunningSum, partition(data,nprocs)) print 'Answer is:', listsum(calculations) print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.])
(the partition function coming from Python: Slicing a list into n nearly-equal-length partitions )
I have a sum that I'm trying to compute, and I'm having difficulty parallelizing the code. The calculation I'm trying to parallelize is kind of complex (it uses both numpy arrays and scipy sparse matrices). It spits out a numpy array, and I want to sum the output arrays from about 1000 calculations. Ideally, I would keep a running sum over all the iterations. However, I haven't been able to figure out how to do this.
So far, I've tried using joblib's Parallel function and the pool.map function with python's multiprocessing package. For both of these, I use an inner function that returns a numpy array. These functions return a list, which I convert to a numpy array and then sum over.
However, after the joblib Parallel function completes all iterations, the main program never continues running (it looks like the original job is in a suspended state, using 0% CPU). When I use pool.map, I get memory errors after all the iterations are complete.
Is there a way to simply parallelize a running sum of arrays?
Edit: The goal is to do something like the following, except in parallel.
def summers(num_iters): sumArr = np.zeros((1,512*512)) #initialize sum for index in range(num_iters): sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array return sumArr
Efficient Double Sum of Products
Rearrange the operation into an O(n) runtime algorithm instead of O(n^2), and take advantage of NumPy for the products and sums:
# arr1_weights[i] is the sum of all terms arr1[i] gets multiplied by in the # original version arr1_weights = arr2[::-1].cumsum()[::-1] - arr2 sum_prods = arr1.dot(arr1_weights)
Timing shows this to be about 200 times faster than the list comprehension for
n == 100.
In : %%timeit ....: np.sum([arr1[i] * arr2[j] for i in range(n) for j in range(i+1, n)]) ....: 100 loops, best of 3: 5.13 ms per loop In : %%timeit ....: arr1_weights = arr2[::-1].cumsum()[::-1] - arr2 ....: sum_prods = arr1.dot(arr1_weights) ....: 10000 loops, best of 3: 22.8 µs per loop
Here's a version that's similar to Daniel's: it divides as evenly as possible, but puts all the larger partitions at the start:
def partition(lst, n): q, r = divmod(len(lst), n) indices = [q*i + min(i, r) for i in xrange(n+1)] return [lst[indices[i]:indices[i+1]] for i in xrange(n)]
It also avoids the use of float arithmetic, since that always makes me uncomfortable. :)
Edit: an example, just to show the contrast with Daniel Stutzbach's solution
>>> print [len(x) for x in partition(range(105), 10)] [11, 11, 11, 11, 11, 10, 10, 10, 10, 10]
f is only executed in ONE of the workers of the pool. So ONE of the processes in the pool will run
pool.map(f, iterable): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. So you take advantage of all the processes in the pool.
You can use the following broadcasting trick:
a = np.sum(np.triu(arr1[:,None]*arr2[None,:],1)) b = np.sum( [arr1[i]*arr2[j] for i in xrange(n) for j in xrange(i+1, n)] ) print a == b # True
Basically, I'm paying the price of calculating the product of all elements pairwise in
arr2 to take advantage of the speed of numpy broadcasting/vectorization being done much faster in low-level code.
%timeit np.sum(np.triu(arr1[:,None]*arr2[None,:],1)) 10000 loops, best of 3: 55.9 µs per loop %timeit np.sum( [arr1[i]*arr2[j] for i in xrange(n) for j in xrange(i+1, n)] ) 1000 loops, best of 3: 1.45 ms per loop