对PythonOperator的运行时配置 - python

我找不到任何有关如何使用通过“ -c”传递的JSON变量的有效文档,例如补余工作。

我一直在打印我的python任务** kwargs来查找,但是我仍然无法确定。 Provide_context =真

谁能指出我正确的方向?

所以,我想做的是:

airflow backfill mydag -c '{"override":"yes"}' -s 2018-12-01 -e 2018-12-12

我有一个PythonOperator:

PythonOperator(
    task_id = 'task_identifier',
    python_callable = 'run_task',
    op_kwargs = {
        'task_conf': task_config 
    },
    provide_context=True,
    dag = this_dag
)

在run_task中,我想访问重写变量:

def run_task(*args, **kwargs): 

    dag_run = kwargs.get('dag_run')
    logging.info(kwargs['dag_run'].conf.get('override'))

但我找不到访问此覆盖变量的方法

[2018-12-17 10:07:24,649] {models.py:1760} ERROR - 'NoneType' object has no attribute 'get'
Traceback (most recent call last):
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/daimonie/airflow/dags/dag_scheduled_queries.py", line 65, in run_query
    logging.info(kwargs['dag_run'].conf.get('override'))

编辑:我确实找到了一个配置设置和说明,似乎表明这些参数需要在代码中进行设置

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params=True

donot_pickle参数设置为False。

python参考方案

您需要做两件事。

kwargs['run_dag'].conf.get('override')替换为kwargs['dag_run'].conf.get('override')
同时更改签名
    从def run_task(*args, **kwargs):def run_task(**kwargs):

R'relaimpo'软件包的Python端口 - python

我需要计算Lindeman-Merenda-Gold(LMG)分数,以进行回归分析。我发现R语言的relaimpo包下有该文件。不幸的是,我对R没有任何经验。我检查了互联网,但找不到。这个程序包有python端口吗?如果不存在,是否可以通过python使用该包? python参考方案 最近,我遇到了pingouin库。

Python pytz时区函数返回的时区为9分钟 - python

由于某些原因,我无法从以下代码中找出原因:>>> from pytz import timezone >>> timezone('America/Chicago') 我得到:<DstTzInfo 'America/Chicago' LMT-1 day, 18:09:00 STD…

用大写字母拆分字符串,但忽略AAA Python Regex - python

我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…

Python:同时在for循环中添加到列表列表 - python

我想用for循环外的0索引值创建一个新列表,然后使用for循环添加到相同的列表。我的玩具示例是:import random data = ['t1', 't2', 't3'] masterlist = [['col1', 'animal1', 'an…

Python sqlite3数据库已锁定 - python

我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…