一、漏洞背景
Airflow是一個可編程,調度和監控的工作流平臺,它采用python語言編寫,基于有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用于系統管控,而其web管理界面同樣也可以方便的管控調度任務,并且對任務運行狀態進行實時監控,方便了系統的運維和管理。
2020年7月14日,公布了CVE-2020-11978。按照cve mitre的描述,漏洞產生自Airflow自帶的示例DAG中。
二、漏洞分析
example_trigger_target_dag代碼
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.? See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.? The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.? You may obtain a copy of the License at
#
#?? http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.? See the License for the
# specific language governing permissions and limitations
# under the License.
import pprint
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
pp = pprint.PrettyPrinter(indent=4)
# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#??? (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#?? a. A python callable that decides whether or not to trigger the Target DAG
#?? b. An optional params dict passed to the python callable to help in
#????? evaluating whether or not to trigger the Target DAG
#?? c. The id (name) of the Target DAG
#?? d. The python callable can add contextual info to the DagRun created by
#????? way of adding a Pickleable payload (e.g. dictionary of primitives). This
#????? state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py
dag = DAG(
??? dag_id="example_trigger_target_dag",
??? default_args={"start_date": days_ago(2), "owner": "Airflow"},
??? schedule_interval=None,
??? tags=['example']
)
def run_this_func(ds, **kwargs):
??? print("Remotely received value of {} for key=message".? ? ? ? format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
??? task_id='run_this',
??? provide_context=True,
??? python_callable=run_this_func,
??? dag=dag,
)
# You can also access the DagRun object in templates
bash_task = BashOperator(
??? task_id="bash_task",
??? bash_command='echo "Here is the message: '
???????????????? '{{ dag_run.conf["message"] if dag_run else "" }}" ',
??? dag=dag,
)
??? print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
bash_task = BashOperator(
??? task_id="bash_task",
??? bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
??? dag=dag,
可以明顯看到example_trigger_target_dag的代碼中的
bash_command='echo "Here is the message: '
???????????????? '{{ dag_run.conf["message"] if dag_run else "" }}" '
? ? ?存在命令執行。也就是說,我們只要能夠控制dag_run.conf的數據,就能實現命令注入。
? ? ?而Airflow中,提供了一個Trigger DAG供開發者進行修改配置,也就是修改conf的模塊。此時,我們便可以構造命令注入payload從而進行命令執行。
三、漏洞復現
我們需要構造語句進行命令繞過注入,重寫conf構造形式如下
{"message":"'\";code;#"}
執行效果如下
四、影響版本
漏洞影響 Apache Airflow 小于1.10.11的版本
五、修復方式
1、 對管理界面添加身份認證
2、刪除受影響的example_trigger_target_dag和example_trigger_controller_dag
3、升級Apache Airflow到1.10.11