气流中的execution_date:需要以变量形式访问

我真的是这个论坛的新手。 但是一段时间以来,我一直在为公司服务。 抱歉,这个问题听起来真的很蠢。

我正在使用BashOperators一堆编写管道。基本上,对于每个任务,我只想使用“ curl”调用REST api

这是我的管道的样子(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

如果您注意到我在做current_datetime= datetime_obj.now(tz=tz.tzlocal())相反,我在这里想要的是“ execution_date”

如何直接使用'execution_date'并将其分配给我的python文件中的变量?

我遇到了访问args的一般问题。任何帮助将不胜感激。

谢谢

Roger asked 2020-08-09T00:59:55Z
6个解决方案
35 votes

dsds_nodash参数是模板。 您可以使用execution_date变量将任何模板中的macros作为datetime对象访问。 在模板中,您可以使用任何jinja2方法来对其进行操作。

将以下内容用作ds ds_nodash字符串:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

如果只希望与执行日期等效的字符串,则ds将返回日期戳(YYYY-MM-DD),ds_nodash返回相同的日期而不带破折号(YYYYMMDD),依此类推。有关更多信息,请参见Api Docs中的macros


您的最终运算符将如下所示:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
Erik Schuchmann answered 2020-08-09T01:00:14Z
25 votes

PythonOperator构造函数采用'provide_context'参数(请参阅[https://pythonhosted.org/airflow/code.html]。)如果为True,则它将通过kwargs将许多参数传递给python_callable。 我相信kwargs ['execution_date']是您想要的。

像这样:

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

我不确定如何使用BashOperator进行操作,但是您可能会从以下问题开始:[https://github.com/airbnb/airflow/issues/775]

Ziggy Eunicien answered 2020-08-09T01:00:43Z
14 votes

我认为您不能从任务实例外部的气流上下文中为变量分配值,它们仅在运行时可用。 基本上,在气流中加载和执行dag时,有2个不同的步骤:

  • 首先,您的dag文件将被解释和解析。 它必须能够工作和编译,并且任务定义必须正确(无语法错误或任何错误)。 在此步骤中,如果您进行函数调用以填充某些值,则这些函数将无法访问气流上下文(例如,执行日期,如果要进行一些回填,则更多)。

  • 第二步是执行dag。 只有在第二步中,气流(context)提供的变量才可用,因为它们与dag的执行有关。

因此,您无法使用Airflow上下文初始化全局变量,但是,Airflow提供了多种机制来实现相同的效果:

  1. 在命令中使用jinja模板(它可以在代码中的字符串或文件中,都将被处理)。 您可以在此处获得可用模板的列表:[https://airflow.apache.org/macros.html#default-variables。]请注意,某些功能也可用,尤其是用于计算日期增量和日期格式。

  2. 使用在其中传递上下文的PythonOperator(带有context参数)。 这将允许您使用语法kwargs['<variable_name']访问同一模板。如果需要,可以从PythonOperator返回一个值,该值将存储在XCOM变量中,以后可在任何模板中使用。 使用以下语法访问XCOM变量:[https://airflow.apache.org/concepts.html#xcoms]

  3. 如果您编写自己的运算符,则可以使用dict context访问气流变量。

Babcool answered 2020-08-09T01:01:32Z
9 votes
def execute(self, context):
    execution_date = context.get("execution_date")

这应该在Operator的execute()方法内部

l0n3r4ng3r answered 2020-08-09T01:01:52Z
0 votes

要在PythonOperator的可调用函数中打印执行日期,您可以在Airflow脚本中使用以下内容,还可以如下添加start_timeend_time

def python_func(**kwargs):
    ts = kwargs["execution_date"]
    end_time = str(ts)
    start_time = str(ts.add(minutes=-30))

我已经将datetime值转换为字符串,因为我需要在SQL查询中传递它。 我们也可以使用它。

Aditi Srivastava answered 2020-08-09T01:02:17Z
0 votes

您可以考虑使用SimpleHttpOperator [https://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator。]发出HTTP请求非常简单。 您可以通过模板传递带有端点参数的execution_date。

gigkokman answered 2020-08-09T01:02:37Z
translate from https://stackoverflow.com:/questions/36730714/execution-date-in-airflow-need-to-access-as-a-variable