python - airflow में लॉग के लिए s3 की स्थापना




amazon-s3 (4)

आपको airflow UI के माध्यम से s3 कनेक्शन स्थापित करने की आवश्यकता है। इसके लिए, आपको व्यवस्थापन पर जाना होगा -> एयरफ़्लो UI पर कनेक्शन टैब और अपने S3 कनेक्शन के लिए एक नई पंक्ति बनाएँ।

एक उदाहरण विन्यास होगा:

कोन आईडी: my_conn_S3

कोन प्रकार: S3

अतिरिक्त: {"aws_access_key_id": "your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

मैं स्केलेबल एयरफ्लो क्लस्टर स्थापित करने के लिए डॉक-कंपोज का उपयोग कर रहा हूं। मैंने इस डॉकरीफ़ाइल https://hub.docker.com/r/puckel/docker-airflow/ पर अपना दृष्टिकोण आधारित किया

मेरी समस्या s3 से लिखने / पढ़ने के लिए स्थापित लॉग हो रही है। जब एक डग पूरा हो जाता है तो मुझे इस तरह की त्रुटि मिलती है

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

मैं इस तरह airflow.cfg फ़ाइल में एक नया खंड की airflow.cfg

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

और फिर airflow.cfg में रिमोट लॉग सेक्शन में s3 पथ निर्दिष्ट किया

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

क्या मैंने इसे ठीक से स्थापित किया है और एक बग है? क्या यहां सफलता का कोई नुस्खा है जो मुझे याद आ रहा है?

-- अद्यतन करें

मैंने URI और JSON फॉर्मेट में एक्सपोर्ट करने की कोशिश की और न ही काम किया। मैंने फिर aws_access_key_id और aws_secret_access_key को निर्यात किया और फिर एयरफ़्लो ने इसे चुनना शुरू कर दिया। अब मुझे कार्यकर्ता लॉग में उसकी त्रुटि मिलती है

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- अद्यतन करें

मुझे यह लिंक https://www.mail-archive.com/[email protected]/msg00462.html

मैंने तब अपनी एक कार्यकर्ता मशीन (वेबसर्वर और शेड्यूलर से अलग) में गोलाबारी की और अजगर में इस बिट कोड को चलाया

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

मुझे यह त्रुटि प्राप्त हुई।

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

मैंने कई अलग-अलग प्रकार के AIRFLOW_CONN_ envs को निर्यात करने की कोशिश की जैसा कि यहां बताया गया है कनेक्शन अनुभाग https://airflow.incubator.apache.org/concepts.html और इस प्रश्न के अन्य उत्तरों द्वारा।

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

मैंने बिना किसी सफलता के AWS_ACCESS_KEY_ID और AWS_SECRET_ACCESS_KEY को भी निर्यात किया है।

ये क्रेडेंशियल एक डेटाबेस में संग्रहीत किए जा रहे हैं, इसलिए एक बार जब मैं उन्हें यूआई में जोड़ देता हूं तो उन्हें श्रमिकों द्वारा उठाया जाना चाहिए, लेकिन वे किसी कारण से लॉग लिखने / पढ़ने में सक्षम नहीं हैं।


क्या यह एयरफ्लो 10 के साथ क्यूब में काम कर रहा है। मेरे पास निम्नलिखित env var सेट हैं:

AIRFLOW_CONN_LOGS_S3=s3://id:[email protected]
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3

(Airflow 1.10.2 के रूप में अद्यतन)

यदि आप व्यवस्थापक UI का उपयोग नहीं करते हैं तो यहां एक समाधान है

मेरा एयरफ़्लो एक निरंतर सर्वर पर नहीं चलता है ... (यह हर दिन एक डॉकटर कंटेनर में नए सिरे से लॉन्च किया जाता है, हर्को पर।) मुझे पता है कि मैं बहुत सारी शानदार विशेषताओं को याद कर रहा हूं, लेकिन अपने न्यूनतम सेटअप में, मैं कभी भी व्यवस्थापक UI या cfg फ़ाइल को न छुएं। इसके बजाय, मुझे एक बैश स्क्रिप्ट में एयरफ्लो-विशिष्ट पर्यावरण चर सेट करना होगा, जो .cfg फ़ाइल को ओवरराइड करता है।

अपाचे-हवा का प्रवाह [S3]

सबसे पहले, आपको अपने एयरफ़्लो लॉग को S3 में लिखने के लिए स्थापित s3 सबपेकेज की आवश्यकता है। ( boto3 आपके DAG के भीतर पायथन की नौकरियों के लिए ठीक काम करता है, लेकिन S3Hook पर निर्भर करता है।)

एक और पक्ष नोट: कोंडा इंस्टॉल अभी तक नहीं हुआ है , इसलिए मुझे pip install apache-airflow[s3] करना होगा।

पर्यावरण चर

बैश स्क्रिप्ट में, मैंने ये core चर सेट किए। इन निर्देशों से शुरू, लेकिन नामकरण सम्मेलन का उपयोग करते हुए AIRFLOW__{SECTION}__{KEY} पर्यावरण चर के लिए, मैं करता हूं:

export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

S3 कनेक्शन आईडी

ऊपर s3_uri एक कनेक्शन आईडी है जिसे मैंने बनाया है। एयरफ्लो में, यह दूसरे पर्यावरण चर, AIRFLOW_CONN_S3_URI से मेल खाती है। इसका मूल्य आपका S3 पथ है, जिसे URI रूप में होना है। ऐसा इसलिए है

s3://access_key:[email protected]/key

हालाँकि आप इसे अन्य संवेदनशील वातावरण चर को संभालते हैं।

इस कॉन्फ़िगरेशन के साथ, एयरफ़्लो आपके लॉग को S3 में लिखने में सक्षम होगा। वे s3://bucket/key/dag/task_id/timestamp/1.log के पथ का अनुसरण करेंगे।

एयरफ्लो 1.8 से एयरफ्लो 1.10 में अपग्रेड करने पर परिशिष्ट

मैंने हाल ही में अपने उत्पादन पाइपलाइन को एयरफ्लो 1.8 से 1.9 तक, और फिर 1.10 में अपग्रेड किया। अच्छी खबर यह है कि परिवर्तन बहुत छोटे हैं; बाकी काम सिर्फ पैकेज इंस्टॉलेशन के साथ बारीकियों का पता लगा रहा था (S3 लॉग्स के मूल प्रश्न से असंबंधित)।

(1) सबसे पहले, मुझे एयरफ्लो 1.9 के साथ पायथन 3.6 में अपग्रेड करने की आवश्यकता थी।

(2) पैकेज का नाम airflow से apache-airflow में 1.9 के साथ बदल गया। आप अपने pip install में भी इसे चला सकते हैं।

(3) पैकेज psutil Airflow के लिए एक विशिष्ट संस्करण रेंज में होना चाहिए। आप इसे pip install apache-airflow कर सकते हैं जब आप pip install apache-airflow कर रहे हैं pip install apache-airflow

(4) एअरफ्लो 1.9+ के साथ पायथन 3-देव हेडर की आवश्यकता होती है।

(५) यहां मूल परिवर्तन हैं: export AIRFLOW__CORE__REMOTE_LOGGING=True अब आवश्यक है। तथा

(6) लॉग का S3 में थोड़ा अलग रास्ता है, जिसे मैंने उत्तर में अपडेट किया है: s3://bucket/key/dag/task_id/timestamp/1.log

लेकिन इतना ही! लॉग 1.9 में काम नहीं करते थे, इसलिए मैं सिर्फ 1.10 पर सीधे जाने की सलाह देता हूं, अब यह उपलब्ध है।


UPDATE Airflow 1.10 लॉगिंग को बहुत आसान बनाता है।

S3 लॉगिंग के लिए, उपरोक्त उत्तर के अनुसार कनेक्शन हुक सेट करें

और फिर बस airflow.cfg के लिए निम्नलिखित जोड़ें

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False

Gcs लॉगिंग के लिए,

  1. पहले gcp_api पैकेज स्थापित करें, जैसे: पाइप अपाचे-एयरफ्लो स्थापित करें [gcp_api]।

  2. उपरोक्त उत्तर के अनुसार कनेक्शन हुक सेट करें

  3. निम्नलिखित को airflow.cfg में जोड़ें

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn

नोट: एयरफ्लो के रूप में 1.9 रिमोट लॉगिंग में काफी बदलाव किया गया है यदि आप 1.9 का उपयोग कर रहे हैं, तो पढ़ें।

here संदर्भ

पूर्ण निर्देश:

  1. कॉन्फ़िगरेशन को संग्रहीत करने के लिए एक निर्देशिका बनाएं और इसे रखें ताकि यह PYTHONPATH में पाया जा सके। एक उदाहरण $ AIRFLOW_HOME / config है

  2. $ AIRFLOW_HOME / config / log_config.py और $ AIRFLOW_HOME / config / __ init__.py नामक खाली फाइलें बनाएँ

  3. airflow/config_templates/airflow_local_settings.py की सामग्री को उस लॉग_config.py फ़ाइल में कॉपी करें जो ऊपर दिए गए चरण में बनाया गया था।

  4. टेम्पलेट के निम्नलिखित भागों को अनुकूलित करें:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
  5. सुनिश्चित करें कि उपरोक्त उत्तर के अनुसार, एयरफ्लो में एक s3 कनेक्शन हुक परिभाषित किया गया है। हुक को S3_LOG_FOLDER में ऊपर परिभाषित s3 बाल्टी तक पहुंच पढ़ना और लिखना चाहिए।

  6. अपडेट करने के लिए $ AIRFLOW_HOME / airflow.cfg अपडेट करें:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
  7. Airflow वेबसर्वर और शेड्यूलर को पुनरारंभ करें, और एक नया कार्य निष्पादन को ट्रिगर (या प्रतीक्षा करें) करें।

  8. सत्यापित करें कि लॉग आपके द्वारा परिभाषित बाल्टी में नए निष्पादित कार्यों के लिए दिखाई दे रहे हैं।

  9. सत्यापित करें कि UI में s3 संग्रहण दर्शक काम कर रहा है। एक नए निष्पादित कार्य को खींचें, और सत्यापित करें कि आप कुछ इस तरह देखते हैं:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py




airflow