How to parallelize a sum calculation in Python NumPy?







Python: Slicing a list into n nearly-equal-length partitions

def partition(lst, n):
    division = len(lst) / float(n)
    return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ]

>>> partition([1,2,3,4,5],5)
[[1], [2], [3], [4], [5]]
>>> partition([1,2,3,4,5],2)
[[1, 2, 3], [4, 5]]
>>> partition([1,2,3,4,5],3)
[[1, 2], [3, 4], [5]]
>>> partition(range(105), 10)
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12, 13, 14, 15, 16, 17, 18, 19, 20], [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31], [32, 33, 34, 35, 36, 37, 38, 39, 40, 41], [42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52], [53, 54, 55, 56, 57, 58, 59, 60, 61, 62], [63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73], [74, 75, 76, 77, 78, 79, 80, 81, 82, 83], [84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94], [95, 96, 97, 98, 99, 100, 101, 102, 103, 104]]

Python 3 version:

def partition(lst, n):
    division = len(lst) / n
    return [lst[round(division * i):round(division * (i + 1))] for i in range(n)]
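One thing worth knowing about the Python 3 version: Python 3's round() rounds halves to the nearest even integer, whereas Python 2 rounded them away from zero. The chunk boundaries can therefore differ from the Python 2 output above, even though every chunk still has 10 or 11 items. A small sketch, using the same function plus a length check as a usage example:

```python
def partition(lst, n):
    """Split lst into n contiguous slices whose lengths differ by at most one."""
    division = len(lst) / n  # true division in Python 3
    # round() rounds .5 to the nearest even integer in Python 3,
    # so some boundaries land one item earlier than in Python 2
    return [lst[round(division * i):round(division * (i + 1))] for i in range(n)]


parts = partition(list(range(105)), 10)
print([len(p) for p in parts])  # [10, 11, 11, 10, 10, 11, 11, 10, 10, 11]
```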

I have a sum that I'm trying to compute, and I'm having difficulty parallelizing the code. The calculation I'm trying to parallelize is kind of complex (it uses both numpy arrays and scipy sparse matrices). It spits out a numpy array, and I want to sum the output arrays from about 1000 calculations. Ideally, I would keep a running sum over all the iterations. However, I haven't been able to figure out how to do this.

So far, I've tried using joblib's Parallel function and the pool.map function with python's multiprocessing package. For both of these, I use an inner function that returns a numpy array. These functions return a list, which I convert to a numpy array and then sum over.

However, after the joblib Parallel function completes all iterations, the main program never continues running (it looks like the original job is in a suspended state, using 0% CPU). When I use pool.map, I get memory errors after all the iterations are complete.

Is there a way to simply parallelize a running sum of arrays?

Edit: The goal is to do something like the following, except in parallel.

import numpy as np

def summers(num_iters):
    sumArr = np.zeros((1,512*512)) #initialize sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr



Just a different take; it only works if [[1,3,5],[2,4]] is an acceptable partition in your example.

def partition(lst, n):
    return [lst[i::n] for i in xrange(n)]

This also satisfies the example from @Daniel Stutzbach's answer:

partition(range(105),10)
# [[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
# [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101],
# [2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102],
# [3, 13, 23, 33, 43, 53, 63, 73, 83, 93, 103],
# [4, 14, 24, 34, 44, 54, 64, 74, 84, 94, 104],
# [5, 15, 25, 35, 45, 55, 65, 75, 85, 95],
# [6, 16, 26, 36, 46, 56, 66, 76, 86, 96],
# [7, 17, 27, 37, 47, 57, 67, 77, 87, 97],
# [8, 18, 28, 38, 48, 58, 68, 78, 88, 98],
# [9, 19, 29, 39, 49, 59, 69, 79, 89, 99]]
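Since xrange no longer exists in Python 3, here is a sketch of the same round-robin idea using range, checked against the example above. Note that the items in each chunk are not contiguous in the original list, which only matters if the order of processing does.

```python
def partition(lst, n):
    """Round-robin split: chunk i takes every n-th element starting at index i."""
    return [lst[i::n] for i in range(n)]


parts = partition(list(range(105)), 10)
print([len(p) for p in parts])        # [11, 11, 11, 11, 11, 10, 10, 10, 10, 10]
print(partition([1, 2, 3, 4, 5], 2))  # [[1, 3, 5], [2, 4]]
```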



Here's a version that's similar to Daniel's: it divides as evenly as possible, but puts all the larger partitions at the start:

def partition(lst, n):
    q, r = divmod(len(lst), n)
    indices = [q*i + min(i, r) for i in xrange(n+1)]
    return [lst[indices[i]:indices[i+1]] for i in xrange(n)]

It also avoids the use of float arithmetic, since that always makes me uncomfortable. :)

Edit: an example, just to show the contrast with Daniel Stutzbach's solution:

>>> print [len(x) for x in partition(range(105), 10)]
[11, 11, 11, 11, 11, 10, 10, 10, 10, 10]
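A Python 3 sketch of the divmod approach (range instead of xrange). It also shows how the function behaves when n is larger than the list: the surplus chunks are simply empty.

```python
def partition(lst, n):
    """Split lst into n contiguous slices; the first len(lst) % n slices get one extra item."""
    q, r = divmod(len(lst), n)
    # boundary i = i blocks of size q, plus one extra item for each of the
    # first min(i, r) slices, so the larger slices come first
    indices = [q * i + min(i, r) for i in range(n + 1)]
    return [lst[indices[i]:indices[i + 1]] for i in range(n)]


print([len(x) for x in partition(list(range(105)), 10)])
# [11, 11, 11, 11, 11, 10, 10, 10, 10, 10]
print(partition([1, 2], 4))  # [[1], [2], [], []]
```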



I'm not sure I understand the problem. Are you just trying to partition a list onto a pool of workers, have them keep a running sum of their computations, and sum the result?

#!/usr/bin/env python3
import random
import time
import multiprocessing
import numpy as np

numpows = 5
numitems = 25
nprocs = 4

def expensiveComputation(i):
  time.sleep(random.random() * 10)  # simulate a slow task of variable length
  return np.array([i**j for j in range(numpows)])

def listsum(l):
  # add up the partial sums returned by the workers
  total = np.zeros_like(l[0])
  for item in l:
    total = total + item
  return total

def partition(lst, n):
  division = len(lst) / float(n)
  return [lst[int(round(division * i)): int(round(division * (i + 1)))] for i in range(n)]

def myRunningSum(l):
  # each worker keeps a running sum over its own chunk
  total = np.zeros(numpows)
  for item in l:
    total = total + expensiveComputation(item)
  return total

if __name__ == '__main__':

  random.seed(1)
  data = list(range(numitems))

  # one chunk per worker; the with-block cleans up the pool afterwards
  with multiprocessing.Pool(processes=nprocs) as pool:
    calculations = pool.map(myRunningSum, partition(data, nprocs))

  print('Answer is:', listsum(calculations))
  print('Expected answer:', np.array([25., 300., 4900., 90000., 1763020.]))

(The partition function comes from "Python: Slicing a list into n nearly-equal-length partitions".)
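If the data is a NumPy array (or can be turned into one), np.array_split does the same job as the partition helper: it splits into n nearly equal pieces and puts the larger ones first, the same layout as the divmod-based partition. A minimal sketch; note that it returns NumPy arrays rather than lists.

```python
import numpy as np

data = np.arange(105)
# the first len(data) % 10 pieces get one extra element
chunks = np.array_split(data, 10)
print([len(c) for c in chunks])  # [11, 11, 11, 11, 11, 10, 10, 10, 10, 10]
```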




A vectorized way: np.sum(np.triu(np.multiply.outer(arr1,arr2),1)).

This gives roughly a 30x improvement:

In [9]: %timeit np.sum(np.triu(np.multiply.outer(arr1,arr2),1))
1000 loops, best of 3: 272 µs per loop

In [10]: %timeit np.sum([arr1[i]*arr2[j] for i in range(n)
                         for j in range(i+1, n)])
100 loops, best of 3: 7.9 ms per loop

In [11]: np.allclose(np.sum(np.triu(np.multiply.outer(arr1,arr2),1)),
    ...:             np.sum([arr1[i]*arr2[j] for i in range(n) for j in range(i+1, n)]))
Out[11]: True
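A self-contained check of the triu trick, with arr1, arr2 and n filled in with arbitrary random data (the original snippet assumes they already exist). np.multiply.outer builds the full n x n matrix M[i, j] = arr1[i] * arr2[j], and np.triu(..., 1) keeps only the entries strictly above the diagonal, i.e. the pairs with j > i.

```python
import numpy as np

rng = np.random.default_rng(0)  # arbitrary seed; illustrative data only
n = 100
arr1 = rng.random(n)
arr2 = rng.random(n)

# reference: explicit double loop over all pairs i < j
loop_total = sum(arr1[i] * arr2[j] for i in range(n) for j in range(i + 1, n))

# vectorized: zero the diagonal and lower triangle, then sum what is left
vec_total = np.triu(np.multiply.outer(arr1, arr2), 1).sum()

print(np.isclose(loop_total, vec_total))  # True
```

The price is O(n^2) memory for the intermediate matrix, which the O(n) rearrangement avoids.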

Another fast approach is to use numba:

from numba import jit
@jit
def t(arr1,arr2):
    n = len(arr1)  # take the length from the input instead of a global
    s = 0.0
    for i in range(n):
        for j in range(i+1,n):
            s += arr1[i]*arr2[j]
    return s

This gives roughly another 10x speedup:

In [12]: %timeit t(arr1,arr2)
10000 loops, best of 3: 21.1 µs per loop

And using the approach from @user2357112's answer:

@jit
def t2357112(arr1,arr2):
    n = len(arr1)
    s = 0.0
    c = 0.0  # running sum of arr2[i+1:]
    for i in range(n-2,-1,-1):
        c += arr2[i+1]
        s += arr1[i]*c
    return s

which gives:

In [13]: %timeit t2357112(arr1,arr2)
100000 loops, best of 3: 2.33 µs per loop

This version does only the operations that are strictly necessary.
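The backward-accumulation loop does not need numba to be correct; numba only makes it fast. A plain-Python sketch of the same O(n) idea (the name pair_product_sum is made up for illustration), handy for checking results when numba is not installed:

```python
def pair_product_sum(arr1, arr2):
    """Sum of arr1[i] * arr2[j] over all i < j, in one backward pass."""
    n = len(arr1)
    total = 0.0
    suffix = 0.0  # running sum of arr2[i+1:], built from the right
    for i in range(n - 2, -1, -1):
        suffix += arr2[i + 1]
        total += arr1[i] * suffix
    return total


# pairs: 1*5 + 1*6 + 2*6 = 23
print(pair_product_sum([1, 2, 3], [4, 5, 6]))  # 23.0
```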




I figured out how to parallelize a sum of arrays with multiprocessing, apply_async, and callbacks, so I'm posting this here for other people. I took the Sum callback class from the example page for Parallel Python, although I did not actually use that package for the implementation; it gave me the idea of using callbacks, though. Here's the simplified code for what I ended up using, and it does what I wanted it to do.

import multiprocessing
import threading
import numpy as np

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = threading.Lock()
        self.count = 0

    def add(self, value):
        # Pool callbacks run in the parent's result-handler thread; the lock
        # keeps the update correct even if add() is ever called concurrently
        with self.lock:
            self.count += 1
            self.value += value #the actual summation

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of callback class and zero the sum
    for index in range(num_iters):
        pool.apply_async(computation, (index,), callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value

if __name__ == '__main__': #needed where workers are started with spawn (Windows, macOS)
    result = summers(1000)

I was also able to get this working using a parallelized map, which was suggested in another answer. I had tried this earlier, but I wasn't implementing it correctly. Both ways work, and I think this answer explains the issue of which method to use (map or apply_async) pretty well. For the map version, you don't need to define the class Sum, and the summers function becomes:

def summers(num_iters):
    # the with-block terminates the workers on exit; map has already
    # returned every result by then, so no separate close() is needed
    with multiprocessing.Pool(processes=8) as pool:
        # map blocks until all iterations finish and keeps every output
        # array in memory at once, so memory use grows with num_iters
        outputArr = np.array(pool.map(computation, range(num_iters)))

    sumArr = outputArr.sum(0)  # sum over the iteration axis

    return sumArr
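The map version collects every per-iteration array before summing. At 1 x 512^2 float64 values that is about 2 MB per array, or roughly 2 GB for 1000 iterations, which may explain the memory errors mentioned in the question. Below is a sketch of a true running sum using imap_unordered, a swapped-in technique rather than what the map or callback versions use. It runs on a ThreadPool so it works anywhere; for CPU-bound work you would swap in multiprocessing.Pool, keep computation at module level, and call summers under if __name__ == '__main__':. Threads only help when the computation releases the GIL. computation here is a hypothetical stand-in.

```python
from multiprocessing.pool import ThreadPool  # same interface as multiprocessing.Pool

import numpy as np

SHAPE = (1, 512 * 512)


def computation(index):
    # hypothetical stand-in for the real array-returning computation
    return np.ones(SHAPE) * index


def summers(num_iters, processes=8):
    total = np.zeros(SHAPE)  # the running sum
    with ThreadPool(processes=processes) as pool:
        # imap_unordered yields each result as soon as any worker finishes it,
        # so results are added to the total one at a time instead of being
        # collected into one large list first
        for result in pool.imap_unordered(computation, range(num_iters)):
            total += result
    return total


print(summers(10)[0, :3])  # [45. 45. 45.]
```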



Regarding apply vs map:

pool.apply(f, args): f is only executed in ONE of the workers of the pool, so ONE of the processes in the pool will run f(args), and the call blocks until the result is ready.

pool.map(f, iterable): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. So you take advantage of all the processes in the pool.
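A small sketch of the calls side by side, including apply_async for contrast. It uses multiprocessing.pool.ThreadPool, which exposes the same apply/apply_async/map interface as multiprocessing.Pool, so it runs without pickling or __main__-guard concerns; square is a hypothetical stand-in function.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


with ThreadPool(processes=4) as pool:
    # apply: one worker runs square(3); the call blocks until it is done
    a = pool.apply(square, (3,))
    # apply_async: returns an AsyncResult immediately; .get() waits for the value
    pending = pool.apply_async(square, (3,))
    b = pending.get()
    # map: chunks the iterable across all workers, blocks, and returns
    # the results in input order
    c = pool.map(square, range(5))

print(a, b, c)  # 9 9 [0, 1, 4, 9, 16]
```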




Efficient Double Sum of Products

Rearrange the operation into an O(n) runtime algorithm instead of O(n^2), and take advantage of NumPy for the products and sums:

# arr1_weights[i] is the sum of all terms arr1[i] gets multiplied by in the
# original version
arr1_weights = arr2[::-1].cumsum()[::-1] - arr2

sum_prods = arr1.dot(arr1_weights)

Timing shows this to be about 200 times faster than the list comprehension for n == 100.

In [21]: %%timeit
   ....: np.sum([arr1[i] * arr2[j] for i in range(n) for j in range(i+1, n)])
   ....: 
100 loops, best of 3: 5.13 ms per loop

In [22]: %%timeit
   ....: arr1_weights = arr2[::-1].cumsum()[::-1] - arr2
   ....: sum_prods = arr1.dot(arr1_weights)
   ....: 
10000 loops, best of 3: 22.8 µs per loop
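A self-contained version of the rearrangement on a tiny input, so the weights can be checked by hand (pair_product_sum_cumsum is an illustrative name). For arr2 = [4, 5, 6], the reversed cumulative sum gives [15, 11, 6], and subtracting arr2 leaves [11, 6, 0], i.e. the sum of everything to the right of each position.

```python
import numpy as np


def pair_product_sum_cumsum(arr1, arr2):
    # weights[i] = arr2[i+1] + ... + arr2[n-1]: reverse, cumulative-sum,
    # reverse back, then subtract arr2 so each position excludes its own term
    weights = arr2[::-1].cumsum()[::-1] - arr2
    return arr1.dot(weights)


arr1 = np.array([1.0, 2.0, 3.0])
arr2 = np.array([4.0, 5.0, 6.0])
print(pair_product_sum_cumsum(arr1, arr2))  # 23.0
```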



In Python there are two simple ways you can achieve this:

The Pythonic way: using Python's in keyword

in takes two "arguments", one on the left (the substring) and one on the right, and returns True if the left argument is contained in the right-hand argument; otherwise, it returns False.

example_string = "This is an example string"
substring = "example"
print(substring in example_string)

Output:

True

The non-Pythonic way: Using Python's str.find:

The find method returns the lowest index at which the substring occurs, or -1 if it is not found, so simply check whether the result is not -1.

if example_string.find(substring) != -1:
    print('Substring found!')
else:
    print('Substring not found!')

Output:

Substring found!
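Both checks agree on whether the substring occurs; find additionally reports where. A short sketch of the edge cases, including str.index, the variant that raises ValueError instead of returning -1:

```python
s = "This is an example string"

print("example" in s)       # True
print(s.find("example"))    # 11
print(s.find("missing"))    # -1
print("" in s, s.find(""))  # True 0  (the empty string is found at index 0)

try:
    s.index("missing")
except ValueError:
    print("index() raises ValueError when the substring is absent")
```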