python - 集計 - rank method




ベクトル化によるλxの置換によるランキング関数の性能向上 (2)

私はnumpyを使って関数を作りたい
私はpandasグループ内で定義された各グループ内でこれを使うgroupby

def rnk(df):
    a = df.values.argsort(0)
    n, m = a.shape
    r = np.arange(a.shape[1])
    b = np.empty_like(a)
    b[a, np.arange(m)[None, :]] = np.arange(n)[:, None]
    return pd.DataFrame(b / n, df.index, df.columns)

gcols = ['date_id', 'category']
rcols = ['var_1', 'var_2', 'var_3']
df.groupby(gcols)[rcols].apply(rnk).add_suffix('_ranked')

   var_1_ranked  var_2_ranked  var_3_ranked
0      0.333333      0.809524      0.428571
1      0.160000      0.360000      0.240000
2      0.153846      0.384615      0.461538
3      0.000000      0.315789      0.105263
4      0.560000      0.200000      0.160000
...

使い方

  • 私はランキングがソートに関連していることを知っているので、私はこれをより速くするためにいくつかの賢いソートを使いたいです。
  • numpyargsortは配列をソートされた配列にスライスするために使用できる順列を生成します。

    a = np.array([25, 300, 7])
    b = a.argsort()
    print(b)
    
    [2 0 1]
    
    print(a[b])
    
    [  7  25 300]
  • そのため、代わりに、 argsortを使用して、1番目、2番目、および3番目にランク付けされた要素がどこにあるのかをargsortします。

    # create an empty array that is the same size as b or a
    # but these will be ranks, so I want them to be integers
    # so I use empty_like(b) because b is the result of 
    # argsort and is already integers.
    u = np.empty_like(b)
    
    # now just like when I sliced a above with a[b]
    # I slice u the same way but instead I assign to
    # those positions, the ranks I want.
    # In this case, I defined the ranks as np.arange(b.size) + 1
    u[b] = np.arange(b.size) + 1
    
    print(u)
    
    [2 3 1]
  • そしてそれはまさに正しかったです。 7は最後のポジションでしたが、私たちの最初のランクでした。 300位は2位で、3位でした。 25位が1位、2位だった。

  • 最後に、パーセンタイルを得るためにランクの中の数で割ります。 この例のように1ベースのnp.arange(1, n+1)またはnp.arange(n) + 1とは対照的に、ゼロベースのランク付けnp.arange(n)を使用したので、百分位数を取得するための単純な除算。
  • やるべきことは、このロジックを各グループに適用することです。 groupbygroupbypandasでこれができる
  • 欠けている詳細のいくつかは、私がどうやってargsort(0)を使って列ごとに独立したソートを行うのか、そしてそれぞれの列を独立して並べ替えるために細かいスライスをすることを含みます。

私たちはgroupbyを避けて、 groupbyことができますか?
njitいくつかのことをスピードアップするためにコンパイルするだけで、 numbaのジャストインタイムコンパイルも利用しnjit

from numba import njit

@njit
def count_factor(f):
    c = np.arange(f.max() + 2) * 0
    for i in f:
        c[i + 1] += 1
    return c

@njit
def factor_fun(f):
    c = count_factor(f)
    cc = c[:-1].cumsum()
    return c[1:][f], cc[f]

def lexsort(a, f):
    n, m = a.shape
    f = f * (a.max() - a.min() + 1)
    return (f.reshape(-1, 1) + a).argsort(0)


def rnk_numba(df, gcols, rcols):
    tups = list(zip(*[df[c].values.tolist() for c in gcols]))
    f = pd.Series(tups).factorize()[0]
    a = lexsort(np.column_stack([df[c].values for c in rcols]), f)
    c, cc = factor_fun(f)
    c = c[:, None]
    cc = cc[:, None]
    n, m = a.shape
    r = np.arange(a.shape[1])
    b = np.empty_like(a)
    b[a, np.arange(m)[None, :]] = np.arange(n)[:, None]
    return pd.DataFrame((b - cc) / c, df.index, rcols).add_suffix('_ranked')

使い方

  • 正直なところ、これは精神的に処理するのは困難です。 私は上で説明したことを拡張することに固執します。
  • もう一度argsortを使ってランキングを正しい位置に落としたいです。 しかし、私はグループ化コラムと対立しなければなりません。 だから私はtupleのリストをコンパイルし、 こここの質問で扱ったようにそれらをfactorize
  • 分解されたtupleのセットができたので、分解されたtupleグループ内でソートする修正されたlexsortを実行できます。 この質問はlexsortます。
  • 私がすべてのグループのために新鮮なランクを得るように私がそれぞれのグループのサイズによって新しい見つけられたランクを相殺しなければならないところで注意が必要なビットが残っている。 これは、以下のコードの小さなスニペットb - ccで処理されています。 しかし、 cc計算は必要な要素です。

だから、それはハイレベルの哲学の一部です。 @njitどうですか?

  • 因数分解すると、整数0からn - 1にマッピングします0ここで、 nは一意のグループ化tupleの数です。 カウントを追跡する便利な方法として、長さn配列を使用できます。
  • groupby offsetを達成するために、私はそれらがtuplesのリストまたはそれらのtupleの因数分解されたバージョンで表されるようにそれらのグループの位置でカウントと累積カウントを追跡する必要がありました。 因数分解された配列fを線形スキャンし、観測値をnumbaループで数えることにしました。 この情報を入手している間に、必要な累積オフセットを作成するために必要な情報も作成します。
  • numbaは、非常に効率的なコンパイル済み関数を作成するためのインターフェースを提供します。 それは微妙であり、あなたは何が可能で何が不可能であるかを知るために何らかの経験を習得しなければなりません。 私は、 numbaデコレータ@njit前に付けた2つの関数をnumbaすることにし@njit 。 このコードはこれらのデコレータなしでも同様に動作しますが、スピードアップされています。

タイミング

%%timeit 
ranked_cols = [col + '_ranked' for col in to_rank]

ranked = df[['date_id', 'category'] + to_rank].groupby(['date_id', 'category'], as_index = False).apply(lambda x: rank_fun(x, to_rank)) 
ranked.columns = ranked_cols        
ranked.reset_index(inplace = True)
ranked.set_index('level_1', inplace = True)  
1 loop, best of 3: 481 ms per loop

gcols = ['date_id', 'category']
rcols = ['var_1', 'var_2', 'var_3']

%timeit df.groupby(gcols)[rcols].apply(rnk_numpy).add_suffix('_ranked')
100 loops, best of 3: 16.4 ms per loop

%timeit rnk_numba(df, gcols, rcols).head()
1000 loops, best of 3: 1.03 ms per loop

実行に数分かかる、数百万行という多数の列に適用するランキング機能があります。 .rank(メソッドを適用するためのデータを準備しているロジックをすべて削除することによって、つまり、次のようにします。

ranked = df[['period_id', 'sector_name'] + to_rank].groupby(['period_id', 'sector_name']).transform(lambda x: (x.rank(ascending = True) - 1)*100/len(x))        

私はこれを何秒にもすることができた。 しかし、私は自分の論理を維持し、コードを再構築するのに苦労しています。最終的に、最大のボトルネックは私のlambda x:の二重使用ですが、明らかに他の側面が遅くなっています(下記参照)。 私は下記の私のランク付け機能と一緒にサンプルデータフレーム、すなわちMCVEを提供しました。 おおまかに言って、私の質問は次のようにまとめられると思います。

(i)コード内の.apply(lambda x使用法を高速のベクトル化された同等のものと置き換えるにはどうすればよいですか?)(ii)マルチインデックス、グループ化、データフレームをループして関数を適用する方法date_id列とcategory列のそれぞれ固有の組み合わせ。
(iii)ランキングの論理をスピードアップするために他に何ができるでしょうか? 主なオーバーヘッドは.value_counts()です。 これは上記の(i)と重なります。 おそらく、ランク付けのために送信する前に、一時的な列を作成することによって、このロジックのほとんどをdfで実行できます。 同様に、1回の呼び出しでサブデータフレームをランク付けできますか。
(iv) pd.qcut()ではなくpd.qcut()使用する理由 後者はcython化されており、より柔軟な関係の扱い方をしているようですが、両者の比較はできません、そしてpd.qcut()が最も広く使われているようです。

サンプル入力データは次のとおりです。

import pandas as pd
import numpy as np
import random

to_rank = ['var_1', 'var_2', 'var_3']
df = pd.DataFrame({'var_1' : np.random.randn(1000), 'var_2' : np.random.randn(1000), 'var_3' : np.random.randn(1000)})
df['date_id'] = np.random.choice(range(2001, 2012), df.shape[0])
df['category'] = ','.join(chr(random.randrange(97, 97 + 4 + 1)).upper() for x in range(1,df.shape[0]+1)).split(',')

2つのランク付け関数は以下のとおりです。

def rank_fun(df, to_rank): # calls ranking function f(x) to rank each category at each date
    #extra data tidying logic here beyond scope of question - can remove
    ranked = df[to_rank].apply(lambda x: f(x))
    return ranked


def f(x):
    nans = x[np.isnan(x)] # Remove nans as these will be ranked with 50
    sub_df = x.dropna() # 
    nans_ranked = nans.replace(np.nan, 50) # give nans rank of 50

    if len(sub_df.index) == 0: #check not all nan.  If no non-nan data, then return with rank 50
        return nans_ranked

    if len(sub_df.unique()) == 1: # if all data has same value, return rank 50
        sub_df[:] = 50
        return sub_df

    #Check that we don't have too many clustered values, such that we can't bin due to overlap of ties, and reduce bin size provided we can at least quintile rank.
    max_cluster = sub_df.value_counts().iloc[0] #value_counts sorts by counts, so first element will contain the max
    max_bins = len(sub_df) / max_cluster 

    if max_bins > 100: #if largest cluster <1% of available data, then we can percentile_rank
        max_bins = 100

    if max_bins < 5: #if we don't have the resolution to quintile rank then assume no data.
        sub_df[:] = 50
        return sub_df

    bins = int(max_bins) # bin using highest resolution that the data supports, subject to constraints above (max 100 bins, min 5 bins)

    sub_df_ranked = pd.qcut(sub_df, bins, labels=False) #currently using pd.qcut.  pd.rank( seems to have extra functionality, but overheads similar in practice
    sub_df_ranked *= (100 / bins) #Since we bin using the resolution specified in bins, to convert back to decile rank, we have to multiply by 100/bins.  E.g. with quintiles, we'll have scores 1 - 5, so have to multiply by 100 / 5 = 20 to convert to percentile ranking
    ranked_df = pd.concat([sub_df_ranked, nans_ranked])
    return ranked_df

そして、私のランキング関数を呼び出してdfと再結合するコードは、

# ensure don't get duplicate columns if ranking already executed
ranked_cols = [col + '_ranked' for col in to_rank]

ranked = df[['date_id', 'category'] + to_rank].groupby(['date_id', 'category'], as_index = False).apply(lambda x: rank_fun(x, to_rank)) 
ranked.columns = ranked_cols        
ranked.reset_index(inplace = True)
ranked.set_index('level_1', inplace = True)    
df = df.join(ranked[ranked_cols])

両方のlambda x呼び出しを削除することで、このランキングロジックをできるだけ早く取得しようとしています。 f(x)のロジックのみが適用されるようにrank_funのロジックを削除できますが、ベクトル化された方法でマルチインデックスデータフレームを処理する方法もわかりません。 追加の質問は、 pd.qcut(pd.qcut(との間の関係については異なるやり方があるようですが、.rank(は、このことは誤解を招く可能性があります。主な諸経費が私のlambda xの使用によるものだとすれば、

f(x)%lprunを実行したところ、次の結果が得られました。ただし、主なオーバーヘッドは、ベクトル化されたアプローチではなく.apply(lambda xの使用です。

行#ヒットあたりのヒット時間%タイムラインの内容

 2                                           def tst_fun(df, field):
 3         1          685    685.0      0.2      x = df[field]
 4         1        20726  20726.0      5.8      nans = x[np.isnan(x)]
 5         1        28448  28448.0      8.0      sub_df = x.dropna()
 6         1          387    387.0      0.1      nans_ranked = nans.replace(np.nan, 50)
 7         1            5      5.0      0.0      if len(sub_df.index) == 0: 
 8                                                   pass #check not empty.  May be empty due to nans for first 5 years e.g. no revenue/operating margin data pre 1990
 9                                                   return nans_ranked
10                                           
11         1        65559  65559.0     18.4      if len(sub_df.unique()) == 1: 
12                                                   sub_df[:] = 50 #e.g. for subranks where all factors had nan so ranked as 50 e.g. in 1990
13                                                   return sub_df
14                                           
15                                               #Finally, check that we don't have too many clustered values, such that we can't bin, and reduce bin size provided we can at least quintile rank.
16         1        74610  74610.0     20.9      max_cluster = sub_df.value_counts().iloc[0] #value_counts sorts by counts, so first element will contain the max
17                                               # print(counts)
18         1            9      9.0      0.0      max_bins = len(sub_df) / max_cluster #
19                                           
20         1            3      3.0      0.0      if max_bins > 100: 
21         1            0      0.0      0.0          max_bins = 100 #if largest cluster <1% of available data, then we can percentile_rank
22                                           
23                                           
24         1            0      0.0      0.0      if max_bins < 5: 
25                                                   sub_df[:] = 50 #if we don't have the resolution to quintile rank then assume no data.
26                                           
27                                               #     return sub_df
28                                           
29         1            1      1.0      0.0      bins = int(max_bins) # bin using highest resolution that the data supports, subject to constraints above (max 100 bins, min 5 bins)
30                                           
31                                               #should track bin resolution for all data.  To add.
32                                           
33                                               #if get here, then neither nans_ranked, nor sub_df are empty
34                                               # sub_df_ranked = pd.qcut(sub_df, bins, labels=False)
35         1       160530 160530.0     45.0      sub_df_ranked = (sub_df.rank(ascending = True) - 1)*100/len(x)
36                                           
37         1         5777   5777.0      1.6      ranked_df = pd.concat([sub_df_ranked, nans_ranked])
38                                               
39         1            1      1.0      0.0      return ranked_df

このコードを試してみることをお勧めします。 それはあなたの3倍速く、そしてより明確です。

ランク関数:

def rank(x):
    counts = x.value_counts()
    bins = int(0 if len(counts) == 0 else x.count() / counts.iloc[0])
    bins = 100 if bins > 100 else bins
    if bins < 5:
        return x.apply(lambda x: 50)
    else:
        return (pd.qcut(x, bins, labels=False) * (100 / bins)).fillna(50).astype(int)

シングルスレッドが適用されます。

for col in to_rank:
    df[col + '_ranked'] = df.groupby(['date_id', 'category'])[col].apply(rank)

複数のスレッドが適用されます。

import sys
from multiprocessing import Pool

def tfunc(col):
    return df.groupby(['date_id', 'category'])[col].apply(rank)

pool = Pool(len(to_rank))
result = pool.map_async(tfunc, to_rank).get(sys.maxint)
for (col, val) in zip(to_rank, result):
    df[col + '_ranked'] = val






ranking