python - विरल CSR सरणियों का आउट-ऑफ-कोर प्रसंस्करण



scipy apache-spark-mllib (1)

इसलिए मुझे जॉबलिब या डीस्क के बारे में कुछ भी पता नहीं है, अकेले अपने एप्लिकेशन विशिष्ट डेटा प्रारूप को दें। लेकिन विरल डेटा संरचना को बनाए रखते हुए डिस्क में विरल से विरल मैट्रिस पढ़ना वास्तव में संभव है।

जबकि CSR प्रारूप के लिए विकिपीडिया लेख यह बताता है कि यह कैसे काम करता है, मैं एक बढ़िया काम करता हूँ:

कुछ विरल मैट्रिक्स, उदा:

1 0 2
0 0 3
4 5 6

प्रत्येक गैर-मान को याद करके संग्रहीत किया जाता है और यह उस कॉलम में रहता है:

sparse.data    = 1 2 3 4 5 6  # acutal value
sparse.indices = 0 2 2 0 1 2  # number of column (0-indexed)

अब हम पंक्तियों को याद कर रहे हैं। संपीड़ित प्रारूप बस संग्रहीत करता है कि प्रत्येक पंक्ति में कितने गैर-शून्य मान हैं, बजाय प्रत्येक एकल मान पंक्ति को संग्रहीत करने के।

ध्यान दें कि नॉनजेरो-काउंट भी जमा हुआ है, इसलिए निम्न सरणी में इस पंक्ति तक और न होने तक गैर-शून्य मानों की संख्या शामिल है। आगे भी चीजों को जटिल करने के लिए, सरणी हमेशा 0 से शुरू होती है और इस प्रकार num_rows + 1 प्रविष्टियाँ होती हैं:

sparse.indptr = 0 2 3 6

तब तक ऊपर और दूसरी पंक्ति को शामिल करने के लिए 3 गैर-अक्षीय मान हैं, अर्थात् 1, 2 और 3।

चूंकि हमें इसका हल मिल गया है, इसलिए हम मैट्रिक्स को 'स्लाइसिंग' कर सकते हैं। लक्ष्य कुछ विखंडू के लिए डेटा, सूचकांकों और indptr सरणियों का निर्माण करना है। मान लें कि मूल विशाल मैट्रिक्स तीन बाइनरी फ़ाइलों में संग्रहीत है, जिसे हम आकस्मिक रूप से पढ़ेंगे। हम बार-बार कुछ चंक उपज करने के लिए एक जनरेटर का उपयोग करते हैं।

इसके लिए हमें यह जानना होगा कि प्रत्येक चंक में कितने गैर-शून्य मान हैं, और मानों और कॉलम-सूचकांकों के अनुसार राशि पढ़ें। नॉन-जीरो काउंट को आसानी से इंद्रप्रस्थ ऐरे से पढ़ा जा सकता है। यह वांछित इंडेंट फ़ाइल से संबंधित बड़ी मात्रा में प्रविष्टियों की कुछ मात्राओं को पढ़कर प्राप्त किया जाता है। Indptr फ़ाइल के उस हिस्से की अंतिम प्रविष्टि शून्य से शून्य मानों की संख्या को उस चंक में गैर शून्य की संख्या देती है। इसलिए चंक्स डेटा और इंडेक्स सरणियां सिर्फ बड़े डेटा और सूचकांकों की फाइलों से कटा हुआ हैं। Indptr सरणी को एक शून्य के साथ कृत्रिम रूप से तैयार करने की आवश्यकता है (यही प्रारूप चाहता है, मुझसे मत पूछो: D)।

तब हम सिर्फ एक स्पार्क्स मैट्रिक्स का निर्माण कर सकते हैं, जिसमें एक नया स्पार्स मैट्रिक्स प्राप्त करने के लिए चंक डेटा, सूचकांक और इंडिप्रेटर शामिल हैं।

यह ध्यान दिया जाना चाहिए कि वास्तविक मैट्रिक्स का आकार अकेले तीन सरणियों से सीधे खंगाला नहीं जा सकता है। यह या तो मैट्रिक्स का अधिकतम कॉलम इंडेक्स है, या यदि आप अशुभ हैं और अव्यक्त में कोई डेटा नहीं है। इसलिए हमें कॉलम की गिनती भी पास करनी होगी।

मैंने शायद चीजों को एक जटिल तरीके से समझाया, इसलिए इसे सिर्फ कोड के अपारदर्शी टुकड़े के रूप में पढ़ें, जो इस तरह के जनरेटर को लागू करता है:

import numpy as np
import scipy.sparse


def gen_batches(batch_size, sparse_data_path, sparse_indices_path, 
                sparse_indptr_path, dtype=np.float32, column_size=None):
    data_item_size = dtype().itemsize

    with open(sparse_data_path, 'rb') as data_file, \
            open(sparse_indices_path, 'rb') as indices_file, \
            open(sparse_indptr_path, 'rb') as indptr_file:
        nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32)

        while True:
            indptr_batch = np.frombuffer(nnz_before.tobytes() +
                              indptr_file.read(4*batch_size), dtype=np.int32)

            if len(indptr_batch) == 1:
                break

            batch_indptr = indptr_batch - nnz_before
            nnz_before = indptr_batch[-1]
            batch_nnz = np.asscalar(batch_indptr[-1])

            batch_data = np.frombuffer(data_file.read(
                                       data_item_size * batch_nnz), dtype=dtype)
            batch_indices = np.frombuffer(indices_file.read(
                                          4 * batch_nnz), dtype=np.int32)

            dimensions = (len(indptr_batch)-1, column_size)

            matrix = scipy.sparse.csr_matrix((batch_data, 
                           batch_indices, batch_indptr), shape=dimensions)

            yield matrix


if __name__ == '__main__':
    sparse = scipy.sparse.random(5, 4, density=0.1, format='csr', dtype=np.float32)

    sparse.data.tofile('sparse.data')        # dtype as specified above  ^^^^^^^^^^
    sparse.indices.tofile('sparse.indices')  # dtype=int32
    sparse.indptr.tofile('sparse.indptr')    # dtype=int32

    print(sparse.toarray())
    print('========')

    for batch in gen_batches(2, 'sparse.data', 'sparse.indices', 
                             'sparse.indptr', column_size=4):
        print(batch.toarray())

numpy.ndarray.tofile () केवल बाइनरी सरणियों को संग्रहीत करता है, इसलिए आपको डेटा प्रारूप को याद रखने की आवश्यकता है। scipy.sparse सूचकांकों का प्रतिनिधित्व करता है और int32 के रूप में Indptr, ताकि कुल मैट्रिक्स आकार के लिए एक सीमा है।

इसके अलावा, मैंने कोड को बेंचमार्क किया और पाया कि स्कैपी सीएसआर मैट्रिक्स कंस्ट्रक्टर छोटे मैट्रिसेस के लिए अड़चन है। आपका माइलेज अलग-अलग हो सकता है, यह सिर्फ एक 'सिद्धांत का प्रमाण' है।

अगर और अधिक परिष्कृत कार्यान्वयन की आवश्यकता है, या कुछ बहुत ही अड़चन है, तो बस मुझे मारें :)

पायथन का उपयोग करके डिस्क पर सहेजे गए विरल CSR सरणी के विखंडू पर समानांतर में कोई फ़ंक्शन कैसे लागू कर सकता है? क्रमिक रूप से यह किया जा सकता है उदाहरण के लिए joblib.dump साथ CSR सरणी को joblib.dump इसे joblib.load(.., mmap_mode="r") साथ joblib.load(.., mmap_mode="r") और एक-एक करके पंक्तियों के joblib.load(.., mmap_mode="r") संसाधित करें। क्या यह dask साथ अधिक कुशलता से किया जा सकता है?

विशेष रूप से, यह मानते हुए कि स्पार्स सरणियों पर किसी भी कोर ऑपरेशन से बाहर सभी संभावितों की आवश्यकता नहीं है, लेकिन समानांतर में पंक्ति विखंडू को लोड करने की क्षमता है (प्रत्येक चंक एक सीएसआर सरणी है) और उन्हें कुछ फ़ंक्शन लागू करें (मेरे मामले में) उदाहरण के तौर पर scikit-learn से estimator.predict(X) )।

इसके अलावा, क्या डिस्क पर एक फ़ाइल प्रारूप है जो इस कार्य के लिए उपयुक्त होगा? जॉबलिब काम करता है, लेकिन मैं सीएसआर सरणियों के (समानांतर) प्रदर्शन के बारे में निश्चित नहीं हूं कि मेमोरी मैप्स के रूप में लोड किया गया है; spark.mllib या तो कुछ कस्टम स्पार्स स्टोरेज फॉर्मेट (जो कि प्योर पाइथन पार्सर नहीं लगता) या LIBSVM फॉर्मेट का उपयोग करता प्रतीत होता है (scikit-learn में पार्सर मेरे अनुभव में, joblib.dump . joblib.dump बहुत धीमा है)। ।

नोट: मैंने documentation पढ़ा documentation , इसके बारे में विभिन्न मुद्दों https://github.com/dask/dask/ पर, लेकिन मुझे अभी भी यकीन नहीं है कि इस समस्या का सबसे अच्छा तरीका कैसे हो सकता है।

संपादित करें: अधिक व्यावहारिक उदाहरण देने के लिए, नीचे वह कोड है जो घने सरणियों के लिए काम करता है लेकिन इस त्रुटि के साथ विरल सरणियों का उपयोग करते समय विफल रहता है,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()

Edit2: नीचे चर्चा के बाद, नीचे दी गई मिसाल पिछली त्रुटि को खत्म कर देती है, लेकिन dask/array/core.py:L3413 बारे में लोगों को dask/array/core.py:L3413 में IndexError: tuple index out of range dask/array/core.py:L3413 ,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing

fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data, if the x is a 0d array
        # the sum command would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res

Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()




joblib