加入收藏 | 设为首页 | 会员中心 | 我要投稿 源码门户网 (https://www.92codes.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 站长学院 > MsSql教程 > 正文

mssql 运算符 Airflow 101:?隐藏小技巧帮你快速上手!

发布时间:2022-10-12 19:00:33 所属栏目:MsSql教程 来源:互联网
导读: 今日份知识你摄入了么?
目录
1.目标受众
本文的目标受众是那些对 Airflow 有少量经验、或没有经验,并希望快速上手的人。
2. 背景
Airflow
Crom不再用于 ETL 调度

关键特性:任务相关性

今日份知识你摄入了么?

目录

1.目标受众

本文的目标受众是那些对 Airflow 有少量经验、或没有经验,并希望快速上手的人。

2. 背景

Airflow

Crom不再用于 ETL 调度

关键特性:任务相关性

数据操作友好型:监控并提醒你的数据管道

图源:

如果数据不是最新的,则触发 data_freshness_failure_alert,会让一条消息发布到slack平台:

使用 PostToSlack 操作符可以通过 Airflow 将消息发布至 Slack

我的动力/动机

在我过去参与的五六个项目中,同样的问题一次又一次地浮现在我的脑海中——“如果数据管道出现故障,我们是否可以设置某种监控和提醒来通知我们?比如,通过 slack 或电子邮件?”

Airflow 通常被人们推荐为候选解决方案,但讨论也到此为止了。但是,想法是永远不会自己实现的。

所以我决定自己动手……

3. 入门

3.1. 托管程序 (Managed Provider)VS.本地安装 (Local Install):本地安装可快速帮助你进行实际操作

要避免白忙一场

3.2. 本地安装步骤

安装 Airflow l 的步骤

接下来:我将介绍一些核心的Airflow概念/理念。

4. Airflow概念

Airflow 中的主要概念要么会被描述为核心理念,要么被描述为附加功能。

4.1. 核心理念

DAG (有向无环图)

DAG 是 Airflow 的核心概念,它将任务组合在一起,通过相关性和关系组织起来,用于说明如何运行各项任务。这是从 Airflow 的文档中提取的一个基本示例图:

这个 DAG示例图定义了四项任务 (A、B、C 和 D),并规定了运行的顺序,并描述了任务相关性。同时,还需要定义DAG的运行频率,例如“从明天开始每 5 分钟一次”。

运算符

运算符定义了针对某项任务而执行的操作。Airflow 中两个捆绑运算符包括:

如果你需要的操作符没有默认安装在 Airflow里面,你可以在庞大的社区 Provider Package中找到。有一些运算符非常受人们欢迎,包括:

4.2. 附加功能

除了Airflow 的核心对象之外,还有许多相应的功能用于支持某些操作。

连接 (Connections)和挂钩 (Hooks)

连接 (Connections)

挂钩(Hooks)

XComs

变量 (Varibles)

接下来,我将展示一些DAG示例 ,例如让 Airflow 和Slack 、MSSQL 交互。

5. DAG 示例

在你设置了 Airflow 环境 (本地安装或托管提供程序都可)之后,你就可以继续创建你的Airflow 工作流程了。

在开始之前,我们可以查看一些 Airflow DAG示例,了解每个 DAG 所涉及的内容。在这篇文章中,我将简单介绍一下一下我整理的DAGs:

现在,让我们从样板 DAG 开始。

5.1. 样板 DAG

下面显示的是我开发 Airflow DAG 的初步操作:

这里需要为新手指出一些关键的部分:

1. Python Airflow模块

你通常至少会使用到以下 Python Airflow 模块:

请注意,我把导入 PythonOperator 和 Airflow 变量模块的调用comment掉了,因为虽然这两者非常常见,但并非总是这么用(具体取决于你的用法)。

2.default_args (默认参数)

3.Airflow任务 (例如,example_task)

5.2. 样板 MSSQL 查询 DAG

下面显示的是 Airflow DAG 执行 MSSQL DB 查询所需的骨架结构:

其中需要注意的重要事项为:

1. 任务:mssql_select_all_query

上述 DAG 的关键是在任务 mssql_select_all_query 中执行 (显示在脚本底部,如下所示),其中,脚本使用了 Python 模块 pymssql 执行 MSSQL 查询。

该任务简单使用了 PythonOperator 去调用 Python 函数 mssql_query (代码如下所示):

2. DAG 脚本与用于 Python mssql_query 函数的代码非常相似

3. Python Airflow模块

请注意脚本顶部使用的 2 个 Airflow 模块:

正如在第一个样板 dag 的注释中所提到的,这两个模块很常用,但在该脚本中的注释为占位符。而在这里,确实使用了这两个模块!

特别值得指出的是,变量模块已用于检索数据库凭据,这些凭据可用于查询。

5.2. “发布到 Slack” DAG

这是一个非常简单/精简的脚本,用于将所需内容发布到 slack。例如:

1. SlackWebhookOperator

要在Slack上发布消息,你需要使用 Slack Webhook。此外,你还需要使用 Python Airflow 运算符—SlackWebhookOperator,在脚本顶部导入:

2. slack_token

3. 用于Airflow连接的密码字符串

5.3. 任务失败时的“发布到 Slack”DAG

之前的 DAG 只是演示了如何从将Airflow 的消息发送至 Slack,而下面的 DAG 则包含Airflow 任务失败发送消息至 Slack 的功能。

下面的 DAG 是一个简单的例子,如果函数 read_op 中变量 i 的值为 1,则调用任务失败。

当任务失败时,会向 Slack 发布一条消息:

上面有一些关键事项:

1. on_failure_callback

这个脚本中要指出的关键是一个任务选项,on_failure_callback

该默认选项表示,“如果 Airflow 任务失败,则调用 Python 函数 slack_failure_msg” (如下所示)

2. 返回 failure_alert.execute(context=context)

这与最初的“post to DAG”脚本之间的另一个区别在于返回的对象。可以看到,在调用 Python 函数时,就会执行 Airflow failure_alert。最终发布类似于以下内容的帖子:

6. Airflow CLI

Airflow 的 CLI 可以让操作在UI上通过终端执行。这对于编写脚本/自动化活动非常有用。下表描述了一些我觉得很有用的 CLI 命令示例:

7. 注意事项、建议和一些提示

1. Airrflow 2.0 的变化——Airflow 核心和提供程序

在 Airflow 版本 1 中,有一个“统一”包mssql 运算符,用于构建外部服务的导入模块,例如AWS、GCP 等。

但是,在版本 2 更改中,Airflow 的程序包结构 (package structure)重新成为设计的核心,目前为止,有61个不同的程序包。每个程序包能用于某个外部服务 (Google、Amazon、Microsoft、Snowflake)、数据库 (Postgres、MySQL)或协议 (HTTP/FTP)

因此,我们在利用外部服务库提出的在线解决方案需要特别注意,并注意它们所针对的 Airflow 版本。

2. 在线提出解决方案时,注意所用的Airflow版本

从 S3 → AWS 连接类型的转变

尽管现在普遍改用 AWS 连接,但很多 Airflow Hooks 还在用 S3 连接。有大量挂钩尚未更新,提供断回兼容性 (break-back compatibility),例如 MSSQL 插件操作符 mssql_to_s3_operator

同样,这会导致 redshift_to_s3_operator 出现问题

所以我的建议是,注意在线解决方案的日期/最新更新日期。

2. XComs

最初研究 Airflow 时,我们并没有真正关注 XComs 的功能。

但结果是,我经常将XComs用于任务之间的传递输出,并且经常这样做。

下面显示的是上面列出的 DAG 的片段,“在任务失败时发布到 Slack DAG”,这个过程就用到了 XComs:

作为初步操作和示例,Python 函数返回的输出 (即上面代码中的 return eg_op)将用作 XCom 值示例,在这个过程中,Airflow 任务中将调用该函数。

在后续的 Python 函数中,需要执行以下操作,读取 Airflow 任务的输出:

3. 常用Airflow配置选项

你可以用 airflow.cfg 文件配置你的 Airflow 环境,该文件位于 ${AIRFLOW_HOME}/airflow.cfg 中。下面这些参数很有用:

通用Airflow配置选项

4. 时区感知 DAG

你可以在Airflow配置中设置默认时区。但是,如需在 DAG 代码中使用本地时区,你需要了解 DAG 时区。

Airflow 是一个调度程序,由于“下一个执行日期”和“开始日期”这些选项很重要,DAG 中的代码会使用 UTC 时区运行。要使你的DAG感知时区,你需要使用 Python 钟摆模块 (pendulum module),它能提供start_date的时区感知。请参阅以下示例:

5. 一些提示

1. Airflow:非常冷门的提示、技巧和最佳实践

* 使用默认参数避免重复

* 使用 PythonOperator 时的,在连接中存储敏感数据以及 “context”字典

2. Airflow注意事项

* “不要更改 start_date + interval”

* “开发过程中刷新 DAG”,“不要忘记启动调度程序”

不更改 start_date 和 interval ,对于新手来说尤其重要。

3. Airflow提示、技巧和注意事项

我建议你在构建第一个 DAG 并掌握主要概念以后阅读本文。本文提供了一些有用的概念,如任务状态回调 (Tasks States Callbacks),并提供了关于 UTC 日期问题的描述,都非常有用。

感谢阅读

(编辑:源码门户网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!