气流-Python文件不在同一DAG文件夹中 - python

我正在尝试使用Airflow执行一个简单的任务python。

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

例如,如果我尝试:

气流测试python_test打印2015-01-01

有用!

现在我想将def print_context(ds, **kwargs)函数放到其他python文件中。所以我创建了另一个文件:simple_test.py并更改:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)

现在,我尝试再次运行:

气流测试python_test打印2015-01-01

好吧!它仍然有效!

但是如果我创建一个模块,例如,带有文件SimplePython.py的工作程序模块,请导入(from worker import SimplePython)它并尝试:

气流测试python_test打印2015-01-01

它给出了消息:

ImportError:没有名为worker的模块

问题:

  • 是否可以在DAG定义中导入模块?
  • Airflow + Celery将如何在工作节点之间分发所有必要的python源文件?
  • 参考方案

    您可以按照以下步骤打包DAG的依赖项:

    https://airflow.apache.org/concepts.html#packaged-dags

    为此,您可以创建一个zip文件,该文件在zip文件的根目录中包含dag,并在目录中解压缩其他模块。
    例如,您可以创建一个如下所示的zip文件:

    my_dag1.py
    my_dag2.py
    package1/__init__.py
    package1/functions.py
    

    Airflow将扫描该zip文件,并尝试加载my_dag1.py和my_dag2.py。它不会进入子目录,因为它们被认为是潜在的软件包。

    使用CeleryExecutor时,您需要手动同步DAG目录,Airflow不会为您解决这些问题:

    https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

    工作人员需要有权访问其DAGS_FOLDER,并且您需要通过自己的方式同步文件系统

    Python GPU资源利用 - python

    我有一个Python脚本在某些深度学习模型上运行推理。有什么办法可以找出GPU资源的利用率水平?例如,使用着色器,float16乘法器等。我似乎在网上找不到太多有关这些GPU资源的文档。谢谢! 参考方案 您可以尝试在像Renderdoc这样的GPU分析器中运行pyxthon应用程序。它将分析您的跑步情况。您将能够获得有关已使用资源,已用缓冲区,不同渲染状态上…

    Python:图像处理可产生皱纹纸效果 - python

    也许很难描述我的问题。我正在寻找Python中的算法,以在带有某些文本的白色图像上创建皱纹纸效果。我的第一个尝试是在带有文字的图像上添加一些真实的皱纹纸图像(具有透明度)。看起来不错,但副作用是文本没有真正起皱。所以我正在寻找更好的解决方案,有什么想法吗?谢谢 参考方案 除了使用透明性之外,假设您有两张相同尺寸的图像,一张在皱纹纸上明亮,一张在白色背景上有深…

    Python uuid4,如何限制唯一字符的长度 - python

    在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…

    Python:无法识别Pip命令 - python

    这是我拍摄的屏幕截图。当我尝试在命令提示符下使用pip时,出现以下错误消息:pip无法识别为内部或外部命令,可操作程序或批处理文件。我已经检查了这个线程:How do I install pip on Windows?我所能找到的就是我必须将"C:\PythonX\Scripts"添加到我的类路径中,其中X代表python版本。如您在我的…

    Python sqlite3数据库已锁定 - python

    我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…