python - Доступ к соседним ячейкам для массива NumPy




(6)

Анализируя проблему на более мелкие, мы видим, что решение actully @jakevdp делает свою работу, но забывает о проверке термина mask<0.25 после свертки с маской, чтобы некоторые значения могли позже отстать от 0.25 (возможно, 8 тестов для каждого пиксель), поэтому должен быть цикл for, если только нет встроенной функции, о которой я не слышал ..

Вот мое предложение:

# x or y first depends if u want rows or cols , .. different results
for x in range(arr.shape[1]-3):
    for y in range(arr.shape[0]-3):
        k = arr[y:y+3,x:x+3]
        arr[y:y+3,x:x+3] = k/10**(k>0.25)

Как я могу эффективно получить доступ и изменить 8 окружающих ячеек для двумерного массива пустышек?

У меня есть двумерный массив, как это:

arr = np.random.rand(720, 1440)

Для каждой ячейки сетки я хочу уменьшить на 10% центральную ячейку, окружающие 8 ячеек (меньше для угловых ячеек), но только если значение окружающей ячейки превышает 0,25. Я подозреваю, что единственный способ сделать это - использовать цикл for, но хотел бы посмотреть, есть ли лучшие / более быстрые решения.

- РЕДАКТИРОВАТЬ: для цикла на основе Soln

arr = np.random.rand(720, 1440)

for (x, y), value in np.ndenumerate(arr):
    # Find 10% of current cell
    reduce_by = value * 0.1

    # Reduce the nearby 8 cells by 'reduce_by' but only if the cell value exceeds 0.25
    # [0] [1] [2]
    # [3] [*] [5]
    # [6] [7] [8]
    # * refers to current cell

    # cell [0]
    arr[x-1][y+1] = arr[x-1][y+1] * reduce_by if arr[x-1][y+1] > 0.25 else arr[x-1][y+1]

    # cell [1]
    arr[x][y+1] = arr[x][y+1] * reduce_by if arr[x][y+1] > 0.25 else arr[x][y+1]

    # cell [2]
    arr[x+1][y+1] = arr[x+1][y+1] * reduce_by if arr[x+1][y+1] > 0.25 else arr[x+1][y+1]

    # cell [3]
    arr[x-1][y] = arr[x-1][y] * reduce_by if arr[x-1][y] > 0.25 else arr[x-1][y]

    # cell [4] or current cell
    # do nothing

    # cell [5]
    arr[x+1][y] = arr[x+1][y] * reduce_by if arr[x+1][y] > 0.25 else arr[x+1][y]

    # cell [6]
    arr[x-1][y-1] = arr[x-1][y-1] * reduce_by if arr[x-1][y-1] > 0.25 else arr[x-1][y-1]

    # cell [7]
    arr[x][y-1] = arr[x][y-1] * reduce_by if arr[x][y-1] > 0.25 else arr[x][y-1]

    # cell [8]
    arr[x+1][y-1] = arr[x+1][y-1] * reduce_by if arr[x+1][y-1] > 0.25 else arr[x+1][y-1]

Ваш размер массива является типичным размером экрана, поэтому я предполагаю, что ячейки представляют собой значения пикселей в диапазоне [0, 1). Теперь значения пикселей никогда не умножаются друг на друга. Если бы они были, операции будут зависеть от диапазона (например, [0, 1) или [0, 255]), но они никогда не делают. Поэтому я бы предположил, что когда вы говорите «уменьшить на 10% ячейки», вы имеете в виду «вычесть 10% ячейки». Но даже в этом случае операция остается зависимой от порядка, в котором она применяется к ячейкам, поскольку обычный способ сначала вычислить общее изменение ячейки, а затем применить его (как в свертке) приведет к тому, что некоторые значения ячейки станут отрицательными ( например, 0,251 - 8 * 0,1 * 0,999), что не имеет смысла, если они являются пикселями.

Позвольте мне сейчас предположить, что вы действительно хотите умножить ячейки друг на друга и на коэффициент, и что вы хотите сделать это, сначала применив к каждой ячейке свой соседний номер 0 (ваша нумерация), затем соседний номер 1, и так далее для соседей с номерами 2, 3, 5, 7 и 8. Как правило, такой вид операций легче определить с «точки зрения» целевых ячеек, чем с точки зрения исходных ячеек. Поскольку numpy быстро работает с полными массивами (или их представлениями), способ сделать это - сместить всех соседей в положение ячейки, которая должна быть изменена. У Numpy нет функции shift() , но есть функция roll() которая для наших целей так же хороша, потому что нас не волнуют граничные ячейки, которые, согласно вашему комментарию, могут быть восстановлены до исходного значения как последний шаг. Вот код:

import numpy as np

arr = np.random.rand(720, 1440)
threshold = 0.25
factor    = 0.1
#                                                0 1 2
#                                    neighbors:  3   5
#                                                6 7 8
#                                                       ∆y  ∆x    axes
arr0 = np.where(arr  > threshold, arr  * np.roll(arr,   (1,  1), (0, 1)) * factor, arr)
arr1 = np.where(arr0 > threshold, arr0 * np.roll(arr0,   1,       0    ) * factor, arr0)
arr2 = np.where(arr1 > threshold, arr1 * np.roll(arr1,  (1, -1), (0, 1)) * factor, arr1)
arr3 = np.where(arr2 > threshold, arr2 * np.roll(arr2,       1,      1 ) * factor, arr2)
arr5 = np.where(arr3 > threshold, arr3 * np.roll(arr3,      -1,      1 ) * factor, arr3)
arr6 = np.where(arr5 > threshold, arr5 * np.roll(arr5, (-1,  1), (0, 1)) * factor, arr5)
arr7 = np.where(arr6 > threshold, arr6 * np.roll(arr6,  -1,       0    ) * factor, arr6)
res  = np.where(arr7 > threshold, arr7 * np.roll(arr7, (-1, -1), (0, 1)) * factor, arr7)
# fix the boundary:
res[:,  0] = arr[:,  0]
res[:, -1] = arr[:, -1]
res[ 0, :] = arr[ 0, :]
res[-1, :] = arr[-1, :]

Обратите внимание, что несмотря на это, основные шаги отличаются от того, что вы делаете в своем решении. Но они обязательно есть, потому что переписывание вашего решения в numpy приведет к тому, что массивы будут считываться и записываться в одной и той же операции, и это не то, что numpy может сделать предсказуемым образом.

Если вам нужно передумать и решить вычитать вместо умножения, вам нужно всего лишь изменить столбец * s перед np.roll на столбец - s. Но это будет только первый шаг в направлении правильной свертки (обычная и важная операция с 2D-изображениями), для которой вам, однако, потребуется полностью переформулировать свой вопрос.

Два примечания: в вашем примере кода вы индексировали массив как arr[x][y] , но по умолчанию в numy массивах крайний левый индекс является наиболее медленно меняющимся, то есть в 2D вертикальный индекс, так что правильная индексация: arr[y][x] . Это подтверждается порядком размеров вашего массива. Во-вторых, на изображениях, в матрицах и в куске вертикальный размер обычно представлен в виде увеличения вниз. Это заставляет вашу нумерацию соседей отличаться от моей. Просто умножьте вертикальные сдвиги на -1, если это необходимо.

РЕДАКТИРОВАТЬ

Вот альтернативная реализация, которая дает точно такие же результаты. Это немного быстрее, но модифицирует массив на месте:

arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[ :-2,  :-2] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[ :-2, 1:-1] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[ :-2, 2:  ] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[1:-1,  :-2] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[1:-1, 2:  ] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[2:  ,  :-2] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[2:  , 1:-1] * factor, arr[1:-1, 1:-1])
arr[1:-1, 1:-1] = np.where(arr[1:-1, 1:-1] > threshold, arr[1:-1, 1:-1] * arr[2:  , 2:  ] * factor, arr[1:-1, 1:-1])

Невозможно избежать цикла, потому что сокращение выполняется последовательно, а не параллельно.

Вот моя реализация. Для каждого (i,j) создайте вид блока 3x3 a центром в точке a[i,j] (значение, которое я временно установил на 0, чтобы оно было ниже порогового значения, поскольку мы не хотим его уменьшать) , Для (i,j) на границе блок равен 2x2 в углах и 2x3 или 3x2 в других местах. Затем блок маскируется порогом, а немаскированные элементы умножаются на a_ij*0.1 .

def reduce(a, threshold=0.25, r=0.1):
    for (i, j), a_ij in np.ndenumerate(a):
        a[i,j] = 0       
        block = a[0 if i == 0 else (i-1):i+2, 0 if j == 0 else (j-1):j+2]   
        np.putmask(block, block>threshold, block*a_ij*r)  
        a[i,j] = a_ij   
    return a

Обратите внимание, что сокращение также выполняется с граничных ячеек на окружающие их ячейки, т.е. цикл начинается с первого угла массива, a[0, 0] который имеет 3 соседей: a[0,1] , a[1,0] и a[1,1] , которые уменьшаются на a[0,0]*0.1 если они> 0,25. Затем он переходит к ячейке a[0,1] которая имеет 5 соседей и т. Д. Если вы хотите работать строго с ячейками, которые имеют 8 соседей, т.е. с окном размером 3x3, цикл должен перейти от a[1,1] к a[-2, -2] , и функцию следует изменить следующим образом:

def reduce_(a, threshold=0.25, r=0.1):
    ''' without borders -- as in OP's solution'''
    for (i, j), a_ij in np.ndenumerate(a[1:-1,1:-1]):
        block = a[i:i+3, j:j+3]
        mask = ~np.diag([False, True, False])*(block > threshold)
        np.putmask(block, mask, block*a_ij*r)   
    return a

Пример:

>>> a = np.random.rand(4, 4)
array([[0.55197876, 0.95840616, 0.88332771, 0.97894739],
       [0.06717366, 0.39165116, 0.10248439, 0.42335457],
       [0.73611318, 0.09655115, 0.79041814, 0.40971255],
       [0.34336608, 0.39239233, 0.14236677, 0.92172401]])

>>> reduce(a.copy())    
array([[0.00292008, 0.05290198, 0.00467298, 0.00045746],
       [0.06717366, 0.02161831, 0.10248439, 0.00019783],
       [0.00494474, 0.09655115, 0.00170875, 0.00419891],
       [0.00016979, 0.00019403, 0.14236677, 0.0001575 ]])

>>> reduce_(a.copy())
array([[0.02161831, 0.03753609, 0.03459563, 0.01003268],
       [0.06717366, 0.00401381, 0.10248439, 0.00433872],
       [0.02882996, 0.09655115, 0.03095682, 0.00419891],
       [0.00331524, 0.00378859, 0.14236677, 0.00285336]])

Еще один пример для массива 3х2:

>>> a = np.random.rand(3, 2)
array([[0.17246979, 0.42743388],
       [0.1911065 , 0.41250723],
       [0.73389051, 0.22333497]])

>>> reduce(a.copy())
array([[0.17246979, 0.00737194],
       [0.1911065 , 0.0071145 ],
       [0.01402513, 0.22333497]])

>>> reduce_(a.copy())  # same as a because there are no cells with 8 neighbors
array([[0.17246979, 0.42743388],
       [0.1911065 , 0.41250723],
       [0.73389051, 0.22333497]])

Нет необходимости в циклах, избегайте обычных петель Python, они очень медленные. Для большей эффективности полагайтесь на встроенную матричную работу numpy, «универсальные» функции, фильтры, маски и условия, когда это возможно. https://realpython.com/numpy-array-programmin Для сложных вычислений векторизация не так уж плоха, посмотрите некоторые диаграммы и тесты. Самый эффективный способ отобразить функцию на массив numpy (просто не используйте ее для более простых операций с матрицами, таких как квадратирование ячеек Встроенные функции будут работать быстрее)

Легко видеть, что каждая внутренняя ячейка будет умножена на 0,9 до 8 раз из-за 8 соседей (что уменьшается на 0,1), а также из-за центральной ячейки, но ее нельзя уменьшить ниже .25 / .9 = 5/18. Для границы и угла ячейки количество уменьшается в 6 и 3 раза.

Следовательно

x1 = 700  # for debugging use lesser arrays
x2 = 1400

neighbors = 8 # each internal cell has 8 neighbors


for i in range(neighbors):
     view1 = arr[1:-1, 1:-1] # internal cells only
     arr [1:x1, 1:-1] = np.multiply(view1,.9, where = view1 > .25)

arr [1:-1, 1:-1] *= .9 

Границы и углы обрабатываются одинаково с соседями = 5 и 3 соответственно и разными видами. Я предполагаю, что все три случая могут быть объединены в одну формулу со сложным случаем, но ускорение будет умеренным, поскольку границы и углы занимают небольшую долю всех ячеек.

Здесь я использовал небольшой цикл, но это всего лишь 8 повторений. Следует также избавиться от цикла, используя функции power, log, integer part и max, в результате чего получается несколько неуклюжий, но несколько более быстрый однострочник, что-то вокруг

      numpy.multiply( view1, x ** numpy.max( numpy.ceil( (numpy.log (* view1/x... / log(.9)

Мы также можем попробовать другую полезную технику, векторизацию. Векторизация создает функцию, которая затем может быть применена ко всем элементам массива.

Для изменения, давайте предустановим поля / пороги, чтобы узнать точный коэффициент для умножения. Вот как выглядит код

n = 8
decrease_by = numpy.logspace(1,N,num=n, base=x, endpoint=False)

margins = decrease_by * .25

# to do : save border rows for further analysis, skip this for simplicity now    
view1 = a [1: -1, 1: -1]

def decrease(x):


    k = numpy.searchsorted(margin, a)
    return x * decrease_by[k]

f = numpy.vectorize(decrease)
f(view1)

Замечание 1 Можно попытаться использовать разные комбинации подходов, например, использовать предварительно вычисленные поля с матричной арифметикой, а не векторизацию. Возможно, есть еще несколько хитростей, чтобы немного ускорить каждое из вышеуказанных решений или комбинаций выше.

Замечание 2 PyTorch во многом похож на функциональность Numpy, но может значительно выиграть от использования графического процессора. Если у вас приличный GPU, рассмотрите PyTorch. Были попытки покурить numpy на основе gpu (глюон, заброшенный gnumpy, minpy) Подробнее о https://stsievert.com/blog/2016/07/01/numpy-gpu/ на gpu


РЕДАКТИРОВАТЬ: ах, я вижу, что когда вы говорите «уменьшить», вы имеете в виду умножение, а не вычитание. Я также не смог распознать, что вы хотите уменьшить сложность, чего не делает это решение. Так что это неправильно, но я оставлю это на всякий случай, если это будет полезно.

Вы можете сделать это векторизованным способом, используя scipy.signal.convolve2d :

import numpy as np
from scipy.signal import convolve2d

arr = np.random.rand(720, 1440)

mask = np.zeros((arr.shape[0] + 2, arr.shape[1] + 2))
mask[1:-1, 1:-1] = arr
mask[mask < 0.25] = 0
conv = np.ones((3, 3))
conv[1, 1] = 0

arr -= 0.1 * convolve2d(mask, conv, mode='valid')

Это происходит из-за того, что вы думаете о своей проблеме с другой стороны: из каждого квадрата должно быть вычтено в 0,1 раза все окружающие значения. Массив conv кодирует это, и мы перемещаем его по массиву mask используя scipy.signal.convolve2d для накопления значений, которые должны быть вычтены.


Этот ответ предполагает, что вы действительно хотите сделать именно то, что написали в своем вопросе. Ну, почти точно, так как ваш код падает, потому что индексы выходят за пределы. Самый простой способ исправить это - добавить условия, такие как, например,

if x > 0 and y < y_max:
    arr[x-1][y+1] = ...

Причина, по которой основная операция не может быть векторизована с использованием numpy или scipy, заключается в том, что все ячейки «сокращены» некоторыми соседними ячейками, которые уже «сокращены». Numpy или scipy будут использовать незатронутые значения соседей в каждой операции. В моем другом ответе я покажу, как сделать это с помощью numpy, если вам разрешено группировать операции в 8 шагов, каждый в направлении одного конкретного соседа, но каждый из которых использует значение в этом шаге для этого соседа. Как я уже сказал, здесь я предполагаю, что вы должны действовать последовательно.

Прежде чем я продолжу, позвольте мне поменять местами x и y в вашем коде. Ваш массив имеет типичный размер экрана, где 720 - это высота, а 1440 - ширина. Изображения обычно хранятся в виде строк, и самый правый индекс в ndarray по умолчанию - тот, который изменяется быстрее, поэтому все имеет смысл. Это по общему признанию нелогично, но правильная индексация - arr[y, x] .

Основная оптимизация, которая может быть применена к вашему коду (которая сокращает время выполнения с ~ 9 с до ~ 3,9 с на моем Mac), заключается в том, чтобы не присваивать ячейку себе, когда в этом нет необходимости, в сочетании с умножением на месте и с [y, x] вместо [y][x] индексации. Как это:

y_size, x_size = arr.shape
y_max, x_max = y_size - 1, x_size - 1
for (y, x), value in np.ndenumerate(arr):
    reduce_by = value * 0.1
    if y > 0 and x < x_max:
        if arr[y - 1, x + 1] > 0.25: arr[y - 1, x + 1] *= reduce_by
    if x < x_max:
        if arr[y    , x + 1] > 0.25: arr[y    , x + 1] *= reduce_by
    if y < y_max and x < x_max:
        if arr[y + 1, x + 1] > 0.25: arr[y + 1, x + 1] *= reduce_by
    if y > 0:
        if arr[y - 1, x    ] > 0.25: arr[y - 1, x    ] *= reduce_by
    if y < y_max:
        if arr[y + 1, x    ] > 0.25: arr[y + 1, x    ] *= reduce_by
    if y > 0 and x > 0:
        if arr[y - 1, x - 1] > 0.25: arr[y - 1, x - 1] *= reduce_by
    if x > 0:
        if arr[y    , x - 1] > 0.25: arr[y    , x - 1] *= reduce_by
    if y < y_max and x > 0:
        if arr[y + 1, x - 1] > 0.25: arr[y + 1, x - 1] *= reduce_by

Другая оптимизация (которая сокращает время выполнения до ~ 3,0 с на моем Mac) состоит в том, чтобы избежать проверки границ, используя массив с дополнительными граничными ячейками. Нам не важно, какое значение содержит граница, потому что она никогда не будет использоваться. Вот код:

y_size, x_size = arr.shape
arr1 = np.empty((y_size + 2, x_size + 2))
arr1[1:-1, 1:-1] = arr
for y in range(1, y_size + 1):
    for x in range(1, x_size + 1):
        reduce_by = arr1[y, x] * 0.1
        if arr1[y - 1, x + 1] > 0.25: arr1[y - 1, x + 1] *= reduce_by
        if arr1[y    , x + 1] > 0.25: arr1[y    , x + 1] *= reduce_by
        if arr1[y + 1, x + 1] > 0.25: arr1[y + 1, x + 1] *= reduce_by
        if arr1[y - 1, x    ] > 0.25: arr1[y - 1, x    ] *= reduce_by
        if arr1[y + 1, x    ] > 0.25: arr1[y + 1, x    ] *= reduce_by
        if arr1[y - 1, x - 1] > 0.25: arr1[y - 1, x - 1] *= reduce_by
        if arr1[y    , x - 1] > 0.25: arr1[y    , x - 1] *= reduce_by
        if arr1[y + 1, x - 1] > 0.25: arr1[y + 1, x - 1] *= reduce_by
arr = arr1[1:-1, 1:-1]

Для записей, если бы операции можно было векторизовать с использованием numpy или scipy, ускорение по сравнению с этим решением было бы как минимум в 35 раз (измерено на моем Mac).

NB: если numpy выполнял операции над кусочками массива последовательно, следующее получило бы факториалы (т. Е. Произведения целых положительных чисел с точностью до числа) - но это не так:

>>> import numpy as np
>>> arr = np.arange(1, 11)
>>> arr
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
>>> arr[1:] *= arr[:-1]
>>> arr
array([ 1,  2,  6, 12, 20, 30, 42, 56, 72, 90])






numpy