Flink官方文档笔记09 Flink功能测试集合
文章目录
Flink Operations Playground
There are many ways to deploy and operate Apache Flink in various environments.
在各种环境中有许多部署和操作Apache Flink的方法。
Regardless of this variety, the fundamental building blocks of a Flink Cluster remain the same, and similar operational principles apply.
不管这种类型是什么,Flink集群的基本构建块都是相同的,并应用类似的操作原则。
In this playground, you will learn how to manage and run Flink Jobs. You will see how to deploy and monitor an application, experience how Flink recovers from Job failure, and perform everyday operational tasks like upgrades and rescaling.
在这里,您将学习如何管理和运行Flink作业。您将看到如何部署和监视应用程序,体验Flink如何从作业失败中恢复,以及如何执行升级和重新缩放等日常操作任务。
👍JobManager & TaskManager & Kafka & Zookeeper
This playground consists of a long living Flink Session Cluster and a Kafka Cluster.
这个游乐场由一个长时间存在的Flink会话集群和一个Kafka集群组成。
A Flink Cluster always consists of a JobManager and one or more Flink TaskManagers.
一个Flink集群总是包含:
- 一个
JobManager
- 一个或多个
TaskManager
The JobManager is responsible for handling Job submissions, the supervision of Jobs as well as resource management.
JobManager
负责管理作业提交
,作业运行监控
,作业资源管理
The Flink TaskManagers are the worker processes and are responsible for the execution of the actual Tasks which make up a Flink Job.
TaskManager
是工作者进程,并且负责组成Job的实际Task的执行
。
In this playground you will start with a single TaskManager, but scale out(扩展) to more TaskManagers later.
你将以一个TaskManager开始,扩展到之后的多个TaskManager
Additionally, this playground comes with a dedicated client container, which we use to submit the Flink Job initially and to perform various operational tasks later on.
此外,这个平台还带有一个专用的客户机容器,我们最初使用它提交Flink作业,随后使用它执行各种操作任务。
The client container is not needed by the Flink Cluster itself but only included for ease of use.
Flink集群本身并不需要客户机容器,只是为了便于使用而包含了客户机容器。
The Kafka Cluster consists of a Zookeeper server and a Kafka Broker.
Kafka集群由Zookeeper服务和Kafka Broker组成。
When the playground is started a Flink Job called Flink Event Count will be submitted to the JobManager. Additionally, two Kafka Topics input and output are created.
当游乐场启动时,一个名为Flink Event Count的Flink作业将提交给JobManager。另外,创建了两个Kafka主题输入和输出。
The Job consumes ClickEvents from the input topic, each with a timestamp and a page. The events are then keyed by page and counted in 15 second windows. The results are written to the output topic.
作业使用来自输入主题的ClickEvents,每个ClickEvents都有一个时间戳和一个页面。事件然后按页面键控并在15秒的窗口中计数。结果被写入输出主题。
There are six different pages and we generate 1000 click events per page and 15 seconds. Hence, the output of the Flink job should show 1000 views per page and window.
有6个不同的页面,每个页面在15秒内生成1000次点击事件。因此,Flink作业的输出应该显示每个页面和窗口1000个视图。
Starting the Playground
The playground environment is set up in just a few steps. We will walk you through the necessary commands and show how to validate that everything is running correctly.
操场的环境只需几步就能搭建起来。我们将向您介绍必要的命令,并展示如何验证一切都在正确运行。
We assume that you have Docker (1.12+) and docker-compose (2.1+) installed on your machine.
我们假设您的机器上安装了Docker(1.12+)和Docker -compose(2.1+)。
The required configuration files are available in the flink-playgrounds repository. Check it out and spin up the environment:
所需的配置文件可以在flink-playgrounds存储库中获得。看看吧,让环境变得更美好:
git clone --branch release-1.11 https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d
Afterwards, you can inspect the running Docker containers with the following command:
然后,您可以使用以下命令检查正在运行的Docker容器:
docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------
operations-playground_clickevent-generator_1 /docker-entrypoint.sh java ... Up 6123/tcp, 8081/tcp
operations-playground_client_1 /docker-entrypoint.sh flin ... Exit 0
operations-playground_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp
operations-playground_kafka_1 start-kafka.sh Up 0.0.0.0:9094->9094/tcp
operations-playground_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
operations-playground_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
This indicates that the client container has successfully submitted the Flink Job (Exit 0) and all cluster components as well as the data generator are running (Up).
这表明客户机容器已经成功提交了Flink作业(退出0),并且所有集群组件以及数据生成器都在运行(启动)。
You can stop the playground environment by calling:
你可以通过以下方式停止playground environment:
docker-compose down -v
Entering the Playground
There are many things you can try and check out in this playground.
在这个操场上有很多你可以尝试和检查的东西。
In the following two sections we will show you how to interact with the Flink Cluster and demonstrate some of Flink’s key features.
在接下来的两部分中,我们将向您展示如何与Flink集群交互,并演示Flink的一些关键特性。
👍Flink WebUI http://localhost:8081
The most natural starting point to observe your Flink Cluster is the WebUI exposed under http://localhost:8081.
观察Flink集群最自然的起点是在http://localhost:8081下公开的WebUI。
If everything went well, you’ll see that the cluster initially consists of one TaskManager and executes a Job called Click Event Count.
如果一切顺利,您将看到集群最初由一个TaskManager组成,并执行名为Click Event Count的作业。
The Flink WebUI contains a lot of useful and interesting information about your Flink Cluster and its Jobs (JobGraph, Metrics, Checkpointing Statistics, TaskManager Status,…).
Flink WebUI包含许多关于Flink集群及其作业的有用和有趣的信息(作业图、指标、检查点统计、TaskManager状态……)。
Logs
- JobManager
The JobManager logs can be tailed via docker-compose.
docker-compose logs -f jobmanager
After the initial startup you should mainly see log messages for every checkpoint completion.
在初始启动之后,您应该主要看到每个检查点完成的日志消息。
- TaskManager
The TaskManager log can be tailed in the same way.
docker-compose logs -f taskmanager
After the initial startup you should mainly see log messages for every checkpoint completion.
在初始启动之后,您应该主要看到每个检查点完成的日志消息。
Flink CLI
The Flink CLI can be used from within the client container.
Flink CLI可以在客户端容器内使用。
For example, to print the help message of the Flink CLI you can run
例如,要打印您可以运行的Flink CLI的帮助消息
docker-compose run --no-deps client flink --help
Flink REST API
The Flink REST API is exposed via localhost:8081 on the host or via jobmanager:8081 from the client container, e.g. to list all currently running jobs, you can run:
Flink REST API通过主机上的localhost:8081或客户端容器上的jobmanager:8081公开,例如,要列出当前运行的所有作业,您可以运行:
curl localhost:8081/jobs
Kafka Topics
You can look at the records that are written to the Kafka Topics by running
//input topic (1000 records/s)
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic input
//output topic (24 records/min)
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
Time to Play!
Now that you learned how to interact with Flink and the Docker containers, let’s have a look at some common operational tasks that you can try out on our playground.
现在您已经学习了如何与Flink和Docker容器交互,接下来让我们看看一些常见的操作任务,您可以在我们的活动场所试用它们。
All of these tasks are independent of each other, i.e. you can perform them in any order. Most tasks can be executed via the CLI and the REST API.
所有这些任务都是相互独立的,也就是说,您可以以任何顺序执行它们。大多数任务都可以通过CLI和REST API执行。
Listing Running Jobs
- CLI
Command
docker-compose run --no-deps client flink list
Expected Output
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
The JobID is assigned to a Job upon submission and is needed to perform actions on the Job via the CLI or REST API.
JobID在提交时分配给作业,需要通过CLI或REST API对作业执行操作。
👍观察测试Job运行失败和恢复
Flink provides exactly-once processing guarantees under (partial) failure. In this playground you can observe and - to some extent - verify this behavior.
Flink在(部分)故障情况下提供了一次处理保证。在操场上,你可以观察并在某种程度上验证这种行为。
步骤1:观察输出
As described above, the events in this playground are generate such that each window contains exactly one thousand records.
如上所述,生成操场上的事件时,每个窗口恰好包含1000条记录。
So, in order to verify that Flink successfully recovers from a TaskManager failure without data loss or duplication you can tail the output topic and check that - after recovery - all windows are present and the count is correct.
因此,为了验证Flink是否成功地从TaskManager故障中恢复,而没有数据丢失或重复,您可以跟踪输出主题,并检查恢复后所有窗口是否都存在,计数是否正确。
For this, start reading from the output topic and leave this command running until after recovery (Step 3).
为此,请从output主题开始读取并运行此命令,直到恢复完成(步骤3)。
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
步骤2:引入错误
In order to simulate a partial failure you can kill a TaskManager.
为了模拟部分失败,可以杀死TaskManager。
In a production setup, this could correspond to a loss of the TaskManager process, the TaskManager machine or simply a transient exception being thrown from the framework or user code (e.g. due to the temporary unavailability of an external resource).
在生产环境中,这可能意味着TaskManager进程、TaskManager机器的丢失,或者只是框架或用户代码抛出的一个暂时异常(例如,由于外部资源的临时不可用)。
docker-compose kill taskmanager
After a few seconds, the JobManager will notice the loss of the TaskManager, cancel the affected Job, and immediately resubmit it for recovery.
几秒钟后,JobManager将注意到TaskManager的丢失,取消受影响的作业,并立即重新提交它进行恢复。
When the Job gets restarted, its tasks remain in the SCHEDULED state, which is indicated by the purple colored squares (see screenshot below).
当作业重新启动时,它的任务仍然处于计划状态,这由紫色的方块表示(参见下面的屏幕截图)。
Note: Even though the tasks of the job are in SCHEDULED state and not RUNNING yet, the overall status of a Job is shown as RUNNING.
注意:即使作业的任务处于计划状态且尚未运行,作业的总体状态仍显示为运行。
At this point, the tasks of the Job cannot move from the SCHEDULED state to RUNNING because there are no resources (TaskSlots provided by TaskManagers) to the run the tasks. Until a new TaskManager becomes available, the Job will go through a cycle of cancellations and resubmissions.
此时,作业的任务不能从计划状态移动到运行状态,因为没有用于运行任务的资源(taskmanager提供的task lots)。在新的TaskManager可用之前,作业将经历一个取消和重新提交的循环。
In the meantime, the data generator keeps pushing ClickEvents into the input topic. This is similar to a real production setup where data is produced while the Job to process it is down.
同时,数据生成器不断将ClickEvents推入输入主题。这类似于实际的生产设置,即在处理数据的作业关闭时生成数据。
步骤3:恢复
Once you restart the TaskManager, it reconnects to the JobManager.
重新启动TaskManager后,它将重新连接到JobManager。
docker-compose up -d taskmanager
When the JobManager is notified about the new TaskManager, it schedules the tasks of the recovering Job to the newly available TaskSlots. Upon restart, the tasks recover their state from the last successful checkpoint that was taken before the failure and switch to the RUNNING state.
当JobManager收到关于新TaskManager的通知时,它会将恢复作业的任务调度到新可用的TaskSlots。重新启动时,任务将从失败前取得的最后一个成功检查点恢复其状态,并切换到运行状态。
The Job will quickly process the full backlog of input events (accumulated during the outage) from Kafka and produce output at a much higher rate (> 24 records/minute) until it reaches the head of the stream.
该作业将快速处理来自Kafka的输入事件的全部积压(在停机期间累积),并以更高的速率(> 24条记录/分钟)生成输出,直到它到达流的前端。
In the output you will see that all keys (pages) are present for all time windows and that every count is exactly one thousand.
在输出中,您将看到所有的键(页)都在所有时间窗口中出现,而且每次计数都正好是1000。
Since we are using the FlinkKafkaProducer in its “at-least-once” mode, there is a chance that you will see some duplicate output records.
因为我们使用的是“至少一次”模式的FlinkKafkaProducer,所以您可能会看到一些重复的输出记录。
Note: Most production setups rely on a resource manager (Kubernetes, Yarn, Mesos) to automatically restart failed processes.
注意:大多数生产设置依赖于资源管理器(Kubernetes、Yarn、Mesos)来自动重启失败的进程。
👍升级和重新标定工作
Upgrading a Flink Job always involves two steps:
升级一个Flink作业通常包括两个步骤:
First, the Flink Job is gracefully stopped with a Savepoint. A Savepoint is a consistent snapshot of the complete application state at a well-defined, globally consistent point in time (similar to a checkpoint).
首先,使用保存点优雅地停止Flink作业。保存点是定义良好、全局一致的时间点上的完整应用程序状态的一致快照(类似于检查点)。
Second, the upgraded Flink Job is started from the Savepoint. In this context “upgrade” can mean different things including the following:
其次,从保存点启动已升级的Flink作业。在这种情况下,“升级”可以有不同的含义,包括:
- An upgrade to the configuration (incl. the parallelism of the Job)对配置的升级(包括作业的并行性)
- An upgrade to the topology of the Job (added/removed Operators)作业拓扑的升级(添加/删除操作符)
- An upgrade to the user-defined functions of the Job作业的UDF函数升级
Before starting with the upgrade you might want to start tailing the output topic, in order to observe that no data is lost or corrupted in the course the upgrade.
在开始升级之前,您可能希望跟踪输出主题,以便观察在升级过程中没有数据丢失或损坏。
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
步骤1:停止作业
To gracefully stop the Job, you need to use the “stop” command of either the CLI or the REST API. For this you will need the JobID of the Job, which you can obtain by listing all running Jobs or from the WebUI. With the JobID you can proceed to stopping the Job:
要优雅地停止作业,您需要使用CLI或REST API的“stop”命令。为此,您将需要作业的JobID,您可以通过列出所有正在运行的作业或从WebUI获取该JobID。与JobID你可以继续停止工作:
命令
docker-compose run --no-deps client flink stop <job-id>
预期的输出值
Suspending job "<job-id>" with a savepoint.
Suspended job "<job-id>" with a savepoint.
The Savepoint has been stored to the state.savepoint.dir
configured in the flink-conf.yaml
, which is mounted under /tmp/flink-savepoints-directory/
on your local machine. You will need the path to this Savepoint in the next step.
保存点已存储到 state.savepoint.dir
。这是你在flink-conf中配置的路径,它挂载在本地机器的/tmp/flink-savepoints-directory/
目录下。在下一步中需要这个保存点的路径。
In case of the REST API this path was already part of the response, you will need to have a look at the filesystem directly.
对于REST API,此路径已经是响应的一部分,您需要直接查看文件系统。
命令:
ls -lia /tmp/flink-savepoints-directory
期望的输出值:
total 0
17 drwxr-xr-x 3 root root 60 17 jul 17:05 .
2 drwxrwxrwt 135 root root 3420 17 jul 17:09 ..
1002 drwxr-xr-x 2 root root 140 17 jul 17:05 savepoint-<short-job-id>-<uuid>
步骤2a:重新启动作业,不做任何更改
You can now restart the upgraded Job from this Savepoint. For simplicity, you can start by restarting it without any changes.
现在可以从这个保存点重新启动已升级的作业。为了简单起见,您可以在不做任何更改的情况下重新启动它。
命令
docker-compose run --no-deps client flink run -s <savepoint-path> \
-d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 --checkpointing --event-time
期望的输出值
Starting execution of program
Job has been submitted with JobID <job-id>
Once the Job is RUNNING again, you will see in the output Topic that records are produced at a higher rate while the Job is processing the backlog accumulated during the outage. Additionally, you will see that no data was lost during the upgrade: all windows are present with a count of exactly one thousand.
步骤2b:用不同的并行性重新启动作业(重新缩放)
Alternatively, you could also rescale the Job from this Savepoint by passing a different parallelism during resubmission.
或者,您还可以通过在重新提交期间传递不同的并行性,从这个保存点重新执行作业。
命令
docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
-d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 --checkpointing --event-time
期望的输出值
Starting execution of program
Job has been submitted with JobID <job-id>
Now, the Job has been resubmitted, but it will not start as there are not enough TaskSlots to execute it with the increased parallelism (2 available, 3 needed). With
现在,作业已经重新提交,但是由于并行度增加,没有足够的task lots来执行它(可用2个,需要3个),所以它不会启动。
docker-compose scale taskmanager=2
you can add a second TaskManager with two TaskSlots to the Flink Cluster, which will automatically register with the JobManager. Shortly after adding the TaskManager the Job should start running again.
您可以向Flink集群添加带有两个TaskSlots的第二个TaskManager,它将自动注册到JobManager。添加TaskManager后不久,作业应该再次开始运行。
Once the Job is “RUNNING” again, you will see in the output Topic that no data was lost during rescaling: all windows are present with a count of exactly one thousand.
一旦作业再次“运行”,您将在输出主题中看到,在重新缩放期间没有数据丢失:所有窗口的计数都正好是1000。
查询作业的指标
The JobManager exposes system and user metrics via its REST API.
JobManager通过其REST API公开系统和用户指标。
The endpoint depends on the scope of these metrics. Metrics scoped to a Job can be listed via jobs/<job-id>/metrics
. The actual value of a metric can be queried via the get query parameter.
端点取决于这些度量的范围。可以通过’ jobs/< Job id>/ Metrics '列出作用域为作业的指标。指标的实际值可以通过get查询参数查询。
请求:
curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"
期望的回复 (pretty-printed; no placeholders)
[
{
"id": "lastCheckpointSize",
"value": "9378"
}
]
The REST API can not only be used to query metrics, but you can also retrieve detailed information about the status of a running Job.
REST API不仅可以用于查询指标,还可以检索关于运行作业状态的详细信息。
请求:
# find the vertex-id of the vertex of interest
curl localhost:8081/jobs/<jod-id>
期望的回复 (pretty-printed)
{
"jid": "<job-id>",
"name": "Click Event Count",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1564467066026,
"end-time": -1,
"duration": 374793,
"now": 1564467440819,
"timestamps": {
"CREATED": 1564467066026,
"FINISHED": 0,
"SUSPENDED": 0,
"FAILING": 0,
"CANCELLING": 0,
"CANCELED": 0,
"RECONCILING": 0,
"RUNNING": 1564467066126,
"FAILED": 0,
"RESTARTING": 0
},
"vertices": [
{
"id": "<vertex-id>",
"name": "ClickEvent Source",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066423,
"end-time": -1,
"duration": 374396,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 5033461,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 166351,
"write-records-complete": true
}
},
{
"id": "<vertex-id>",
"name": "Timestamps/Watermarks",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066441,
"end-time": -1,
"duration": 374378,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 5066280,
"read-bytes-complete": true,
"write-bytes": 5033496,
"write-bytes-complete": true,
"read-records": 166349,
"read-records-complete": true,
"write-records": 166349,
"write-records-complete": true
}
},
{
"id": "<vertex-id>",
"name": "ClickEvent Counter",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066469,
"end-time": -1,
"duration": 374350,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 5085332,
"read-bytes-complete": true,
"write-bytes": 316,
"write-bytes-complete": true,
"read-records": 166305,
"read-records-complete": true,
"write-records": 6,
"write-records-complete": true
}
},
{
"id": "<vertex-id>",
"name": "ClickEventStatistics Sink",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066476,
"end-time": -1,
"duration": 374343,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 20668,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 6,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}
],
"status-counts": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 4,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"plan": {
"jid": "<job-id>",
"name": "Click Event Count",
"nodes": [
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "ClickEventStatistics Sink",
"inputs": [
{
"num": 0,
"id": "<vertex-id>",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "ClickEvent Counter",
"inputs": [
{
"num": 0,
"id": "<vertex-id>",
"ship_strategy": "HASH",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "Timestamps/Watermarks",
"inputs": [
{
"num": 0,
"id": "<vertex-id>",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "ClickEvent Source",
"optimizer_properties": {}
}
]
}
}
Please consult the REST API reference for a complete list of possible queries including how to query metrics of different scopes (e.g. TaskManager metrics);
请参考REST API参考以获得可能查询的完整列表,包括如何查询不同范围的指标(例如TaskManager指标);
Variants(变体)
You might have noticed that the Click Event Count application was always started with --checkpointing
and --event-time
program arguments.
您可能已经注意到,Click事件计数应用程序总是以 --checkpointing
和 --event-time
程序参数开始。
By omitting(省略) these in the command of the client container in the docker-compose.yaml
, you can change the behavior of the Job.
通过在客户端容器的docker-compose.yaml
配置文件中省略这些。你可以改变工作的行为。
--checkpointing
enables checkpoint, which is Flink’s fault-tolerance mechanism. If you run without it and go through failure and recovery, you should will see that data is actually lost.
--checkpointing
支持检查点,这是Flink的容错机制。如果不使用它运行,并执行失败和恢复,您应该会看到数据实际上丢失了。
--event-time
enables event time semantics for your Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of the timestamp of the ClickEvent.
--event-time
为您的作业启用事件时间语义。禁用后,作业将根据挂钟时间(而不是ClickEvent的时间戳)将事件分配给windows。
Consequently, the number of events per window will not be exactly one thousand anymore.
因此,每个窗口的事件数将不再是1000个。
The Click Event Count application also has another option, turned off by default, that you can enable to explore the behavior of this job under backpressure.
单击事件计数应用程序还有另一个选项,默认情况下是关闭的,您可以启用该选项来探究该作业在背压下的行为。
You can add this option in the command of the client container in docker-compose.yaml.
您可以在客户机容器的docker-compose.yaml
中添加此选项。
--backpressure
adds an additional operator into the middle of the job that causes severe backpressure during even-numbered minutes (e.g., during 10:12, but not during 10:13).
This can be observed by inspecting various network metrics such as outputQueueLength and outPoolUsage, and/or by using the backpressure monitoring available in the WebUI.