python - पायथन का उपयोग कर ES में कीवर्ड स्टोर करने के लिए थोक API का उपयोग कैसे करें




elasticsearch elasticsearch-bulk-api (2)

मुझे अपने पाइथन प्रोग्राम के साथ एलास्टिकशर्च एकीकृत करने में कुछ संदेश स्टोर करना होगा। अब मैं संदेश को स्टोर करने का प्रयास करता हूं:

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

इसका मतलब है कि अगर मेरे पास 10 संदेश हैं तो मुझे 10 बार अपना कोड दोहराना होगा। तो मैं क्या करना चाहता हूं स्क्रिप्ट फ़ाइल या बैच फ़ाइल बनाने का प्रयास करें। मैं लोचदार खोज गाइड की जांच करता हूं, बल्क एपीआई का उपयोग करना संभव है। http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html प्रारूप नीचे जैसा होना चाहिए:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

मैंने क्या किया है:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

मैं डॉक्टर को स्टोर करने के लिए कर्ल टूल का भी उपयोग करता हूं।

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

अब मैं फाइल को लोचदार खोज में संग्रहीत करने के लिए अपने पायथन कोड का उपयोग करना चाहता हूं।


(इस धागे में उल्लिखित अन्य दृष्टिकोण ईएस अपडेट के लिए पायथन सूची का उपयोग करते हैं, जो आज एक अच्छा समाधान नहीं है, खासकर जब आपको ईएस में लाखों डेटा जोड़ने की आवश्यकता होती है)

बेहतर दृष्टिकोण पाइथन जनरेटर का उपयोग कर रहा है - स्मृति से बाहर निकलने या गति पर समझौता किए बिना डेटा की प्रक्रियाओं को संसाधित करें

नीचे एक व्यावहारिक उपयोग मामले से एक उदाहरण स्निपेट है - विश्लेषण के लिए nginx लॉग फ़ाइल से डेटा को ES में जोड़ना।

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

यह कंकाल जनरेटर के उपयोग को दर्शाता है। यदि आपको आवश्यकता हो तो आप इसे एक नंगे मशीन पर भी इस्तेमाल कर सकते हैं। और आप अपनी जरूरतों को पूरा करने के लिए इस पर विस्तार करने जा सकते हैं।

पाइथन Elasticsearch here संदर्भ।


हालांकि @justinachen के कोड ने मुझे पाई-लोचदार खोज से शुरू करने में मदद की, स्रोत कोड में देखने के बाद मुझे एक सरल सुधार करने दो:

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)

helpers.bulk() पहले से ही आपके लिए विभाजन करता है। और विभाजन से मेरा मतलब है कि चक्स हर बार सर्वर पर भेजे जाते हैं। यदि आप भेजे गए दस्तावेज़ों के हिस्से को कम करना चाहते हैं तो: helpers.bulk(es, actions, chunk_size=100)

शुरू करने के लिए कुछ आसान जानकारी:

helpers.bulk() केवल helpers.streaming_bulk का एक रैपर है लेकिन पहले एक सूची स्वीकार करता है जो इसे आसान बनाता है।

helpers.streaming_bulk Elasticsearch.bulk() पर आधारित है, इसलिए आपको चिंता करने की आवश्यकता नहीं है कि क्या चुनना है।

तो ज्यादातर मामलों में helpers.bulk() आपको बस चाहिए।





elasticsearch-bulk-api