- 发布日期:2025-01-04 15:47 点击次数:179
实时计算Flink版支持通过DataStream作业的方式运行支持规则动态更新的Flink CEP作业。本文结合实时营销中的反作弊场景,为您介绍如何基于Flink全托管快速构建一个动态加载最新规则来处理上游Kafka数据的Flink CEP作业。实际演示中,我们会先启动Flink CEP作业,然后插入规则1:连续3条action为0的事件发生后,下一条事件的action仍非1,其业务含义为连续3次访问该产品后最后没有购买。在匹配到相应事件并进行处理后,我们会动态更新规则1内容为连续5条action为0或2的事件发生后,下一条事件的action仍非1,来应对流量整体增加的场景,同时插入一条规则2,它将和规则1的初始规则一样,用于辅助展示多规则支持等功能。当然,您也可以添加一个全新规则。前提条件如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。已创建Flink工作空间,详情请参见开通实时计算Flink版。上下游存储已创建RDS MySQL实例,详情请参见创建RDS MySQL实例。已创建消息队列Kafka实例,详情请参见概述。仅实时计算引擎VVR 6.0.2及以上版本支持动态CEP功能。操作流程步骤一:准备测试数据准备上游Kafka Topic登录云消息队列 Kafka 版控制台。创建一个名称为demo_topic的Topic,存放模拟的用户行为日志。准备RDS数据库在DMS数据管理控制台上,准备RDS MySQL的测试数据。使用高权限账号登录RDS MySQL。创建rds_demo规则表,用来记录Flink CEP作业中需要应用的规则。步骤二:配置IP白名单为了让Flink能访问RDS MySQL实例,您需要将Flink全托管工作空间的网段添加到在RDS MySQL的白名单中。获取Flink全托管工作空间的VPC网段。登录实时计算控制台。在目标工作空间右侧操作列,选择更多 > 工作空间详情。在工作空间详情对话框,查看Flink全托管虚拟交换机的网段信息。在RDS MySQL的IP白名单中,添加Flink全托管网段信息。步骤三:开发并启动Flink CEP作业配置Maven项目中的pom.xml文件所使用的仓库。在作业的Maven POM文件中添加flink-cep作为项目依赖。开发作业代码。构建Kafka Source。 构建CEP.dynamicPatterns()。关于DataStream、TimeBehaviour、TypeInformation等Flink作业常见概念详情,请参见DataStream、TimeBehaviour和TypeInformation。这里重点介绍PatternProcessor接口,一个PatternProcessor包含一个确定的模式(Pattern)用于描述如何去匹配事件,以及一个PatternProcessFunction用于描述如何处理一个匹配(例如发送警报)。除此之外,还包含id与version等用于标识PatternProcessor的信息。因此一个PatternProcessor既包含规则本身,又指明了规则触发时,Flink作业应如何响应。更多背景请参见提案。在实时计算控制台上,上传JAR包并部署JAR作业,具体操作详情请参见部署作业。为了让您可以快速测试使用,您需要下载实时计算Flink版测试JAR包。部署时需要配置的参数填写说明如下表所示。配置项说明部署作业类型选择为JAR。部署模式选择为流模式。部署名称填写对应的JAR作业名称。引擎版本从VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的JAR作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理:Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。Flink 1.11或Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12或vvr-4.0.8-flink-1.13版本后重启作业,否则会在启动作业时超时报错。JAR URL上传打包好的JAR包,或者直接上传我们提供的测试JAR包。Entry Point Class填写为com.alibaba.ververica.cep.demo.CepDemo。Entry Point Main Arguments如果您是自己开发的作业,已经配置了相关上下游存储的信息,则此处可以不填写。但是,如果您是使用的我们提供的测试JAR包,则需要配置该参数。代码信息如下。--kafkaBrokers YOUR_KAFKA_BROKERS
--inputTopic YOUR_KAFKA_TOPIC
--inputTopicGroup YOUR_KAFKA_TOPIC_GROUP
--jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD
--tableName YOUR_TABLE_NAME
--jdbcIntervalMs 3000tableName:目标表名称。jdbcIntervalMs:轮询数据库的时间间隔。在部署详情页签中的其他配置,添加如下作业运行参数。在作业运维页面,单击目标作业操作列下的启动。步骤四:插入规则启动Flink CEP作业,然后插入规则1:连续3条action为0的事件发生后,下一条事件的action仍非1,其业务含义为连续3次访问该产品后最后没有购买。使用普通账号登录RDS MySQL。插入动态更新规则。对应的CEP API描述如下。Pattern<Event, Event> pattern =
Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
.where(new StartCondition("action == 0"))
.timesOrMore(3)
.followedBy("end")
.where(new EndCondition());对应的JSON字符串如下。{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"expression": "action == 0",
"type": "AVIATOR"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "start",
"target": "end",
"type": "SKIP_TILL_NEXT"
}
],
"window": null,
"afterMatchStrategy": {
"type": "SKIP_PAST_LAST_EVENT",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}通过Kafka Client向demo_topic中发送消息。查看JobManager日志中打印的最新规则和TaskManager日志中打印的匹配。步骤五:更新匹配规则,并查看更新的规则是否生效在匹配到相应事件并进行处理后,动态更新规则1内容为连续5条action为0或为2的事件发生后,下一条事件的action仍非1,来应对流量整体增加的场景,同时插入一条规则2,它将和规则1的初始规则一样,用于辅助展示多规则支持等功能。使用在RDS控制台上,更新匹配规则。使用普通账号登录RDS MySQL。将StartCondition中的action == 0修改为action == 0