# flink-jobs-launcher **Repository Path**: wyspgbj/flink-jobs-launcher ## Basic Information - **Project Name**: flink-jobs-launcher - **Description**: flink-jobs应用程序启动器类库,玩转flink - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 11 - **Created**: 2021-10-12 - **Last Updated**: 2021-10-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink-jobs-launcher ## 介绍 flink-jobs-launcher是[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序启动器类库,可用于启动flink-jobs或普通flink作业,通过flink-jobs-launcher可将flink快速集成到现有基于Java实现的系统中,还可以通过XML格式的配置文件玩转Flink SQL。 ## 起步 添加依赖(以Maven项目为例): ``` cn.tenmg flink-jobs-launcher ${flink-jobs-launcher.version} ``` 调用XMLConfigLoader的load方法加载XML配置文件并提交给启动器执行: ``` FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(AppTest.class.getClassLoader().getResourceAsStream("flink-jobs.xml")); CommandLineFlinkJobsLauncher flinkJobsLauncher = new CommandLineFlinkJobsLauncher(); flinkJobsLauncher.setFlinkHome("D:\\Programs\\flink-1.8.3"); flinkJobsLauncher.setAction(Action.RUN); FlinkJobsApplicationInfo appInfo = flinkJobsLauncher.launch(flinkJobs); ``` 或 ``` FlinkJobs flinkJobs = XMLConfigLoader.getInstance() .load("\r\n" + "\r\n" + ""); CommandLineFlinkJobsLauncher flinkJobsLauncher = new CommandLineFlinkJobsLauncher(); flinkJobsLauncher.setFlinkHome("/opt/flink-1.13.1"); flinkJobsLauncher.setAction(Action.RUN); FlinkJobsApplicationInfo appInfo = flinkJobsLauncher.launch(flinkJobs); ``` ## 快速入门 详见https://gitee.com/tenmg/flink-jobs-launcher-quickstart ## 任务配置 ### `` flink-jobs是flink-jobs任务XML配置文件的根节点,需注意必须配置正确的命名空间,通常结构如下: ``` ``` 相关属性及说明: 属性 | 类型 | 必需 | 说明 ------------|----------------------|----|-------- jar | `String` | 否 | 运行的JAR包。可通过配置文件的`flink.jobs.default.jar`配置指定默认运行的JAR包。 class | `String` | 否 | 运行的主类。可通过配置文件的`flink.jobs.default.class`配置指定默认运行的主类。 serviceName | `String` | 否 | 运行的服务名称。该名称由用户定义并实现根据服务名称获取服务的方法,[flink-jobs](https://gitee.com/tenmg/flink-jobs)则在运行时调用并确定运行的实际服务。在运行SQL任务时,通常通过flink-jobs内的其他标签(如``)指定操作,而无需指定serviceName。 runtimeMode | `String` | 否 | 运行模式。可选值:"BATCH"/"STREAMING"/"AUTOMATIC",相关含义详见[Flink](https://flink.apache.org)官方文档。 #### `` 运行选项配置,用于指定flink程序的运行选项。 属性 | 类型 | 必需 | 说明 ----------|----------|----|-------- keyPrefix | `String` | 否 | 运行选项的默认前缀,默认为“--”。 ##### ``或``。 #### `` 参数查找表配置。通常可用于SQL中,也可以在[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序自定义的服务中通过arguments参数获取。 ##### `` 特定参数配置。 属性 | 类型 | 必需 | 说明 ------|----------|----|-------- name | `String` | 是 | 参数名。 value | `String` | 否 | 参数值。 #### `` 运行基于Beanshell的java代码的配置。 属性 | 类型 | 必需 | 说明 -------|-----------|----|-------- saveAs | `String` | 否 | 操作结果另存为一个新的变量的名称。变量的值是基于Beanshell的java代码的返回值(通过`return xxx;`表示)。 ##### `` 基于Beanshell的java代码使用的变量声明配置。 属性 | 类型 | 必需 | 说明 ------|--------|----|-------- name | `String` | 是 | Beanshell中使用的变量名称 value | `String` | 否 | 变量对应的值的名称。默认与name相同。[flink-jobs](https://gitee.com/tenmg/flink-jobs)会从参数查找表中查找名称为value值的参数值,如果指定参数存在且不是null,则该值作为该参数的值;否则,使用value值作为该变量的值。 ##### `` java代码。采用文本表示,如:`java code`或``。注意:使用泛型时,不能使用尖括号声明泛型。例如,使用Map不能使用“Map map = new HashMap();”,但可以使用“Map map = new HashMap();”。 #### `` 运行基于[DSL](https://gitee.com/tenmg/dsl)的SQL代码配置。 属性 | 类型 | 必需 | 说明 -----------|--------|----|-------- saveAs | `String` | 否 | 操作结果另存为一个新的变量的名称。变量的值是flink的`tableEnv.executeSql(statement);`的返回值。 dataSource | `String` | 否 | 使用的数据源名称。这里的数据源是在[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序的配置文件中配置,并非在flink-jobs-launcher应用程序的配置文件中配置。详见[flink-jobs数据源配置](https://gitee.com/tenmg/flink-jobs#%E6%95%B0%E6%8D%AE%E6%BA%90%E9%85%8D%E7%BD%AE)。 catalog | `String` | 否 | 执行SQL使用的Flink SQL的catalog名称。 script | `String` | 否 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。采用文本表示,如:`SQL code`或``。由于Flink SQL不支持DELETE、UPDATE语句,因此如果配置的SQL脚本是DELETE或者UPDATE语句,该语句将在程序main函数中采用JDBC执行。 #### `` 运行基于[DSL](https://gitee.com/tenmg/dsl)的SQL查询代码配置。 属性 | 类型 | 必需 | 说明 -----------|--------|----|-------- saveAs | `String` | 否 | 查询结果另存为临时表的表名及操作结果另存为一个新的变量的名称。变量的值是flink的`tableEnv.executeSql(statement);`的返回值。 catalog | `String` | 否 | 执行SQL使用的Flink SQL的catalog名称。 script | `String` | 否 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。采用文本表示,如:`SQL code`或``。 #### `` 运行基于[DSL](https://gitee.com/tenmg/dsl)的JDBC SQL代码配置。目标JDBC SQL代码是在[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序的main函数中运行的。 属性 | 类型 | 必需 | 说明 -----------|----------|----|-------- saveAs | `String` | 否 | 执行结果另存为一个新的变量的名称。变量的值是执行JDBC指定方法的返回值。 dataSource | `String` | 是 | 使用的数据源名称。这里的数据源是在flink-jobs应用程序的配置文件中配置,并非在flink-jobs-launcher应用程序的配置文件中配置。详见[flink-jobs数据源配置](https://gitee.com/tenmg/flink-jobs#%E6%95%B0%E6%8D%AE%E6%BA%90%E9%85%8D%E7%BD%AE)。 method | `String` | 否 | 调用的JDBC方法。默认是"executeLargeUpdate"。 script | `String` | 是 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。 ### XML配置示例 为了更好的理解flink-jobs的XML配置文件,以下提供几种常见场景的XML配置文件示例: #### 运行普通flink程序 ``` ``` #### 运行自定义服务 以下为一个自定义服务任务XML配置文件: ``` ``` #### 运行批处理SQL 以下为一个简单订单量统计SQL批处理任务XML配置文件: ``` 2021-01-01 2021-07-01 = :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date) ]]> = :beginDate and stats_date < :endDate ]]> ``` #### 运行流处理SQL 以下为通过Debezium实现异构数据库同步任务XML配置文件: ``` ``` ## 配置文件 默认的配置文件为`flink-jobs-launcher.properties`(注意:需在`classpath`下),可通过`flink-jobs-launcher-context-loader.properties`配置文件的`config.location`修改配置文件路径和名称。 属性 | 类型 | 必需 | 说明 -------------------------|----------|----|-------- flink.jobs.default.jar | `String` | 否 | 启动时默认向Flink提交的JAR包。即当任务配置中没有指定`jar`时,会采用此配置。 flink.jobs.default.class | `String` | 否 | 启动时默认向Flink提交运行的主类。即当任务配置中没有指定`class`时,会采用此配置。 ## 发布计划 计划将在1.1.2中发布以下功能: 标签 | 功能 | 说明 --------------|---------|-------- `` | 数据同步 | 实现基于Debezuim的数据同步,以便简化通过``实现的数据同步功能。 ## 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request ## 相关链接 flink-jobs开源地址:https://gitee.com/tenmg/flink-jobs DSL开源地址:https://gitee.com/tenmg/dsl Flink官网:https://flink.apache.org Debezuim官网:https://debezium.io