Flink Official Documentation Notes 17: Stateful Stream Processing


Stateful Stream Processing

What is State?

While many operations in a dataflow simply look at one individual event at a time (for example an event parser), some operations remember information across multiple events (for example window operators). These operations are called stateful.

Some examples of stateful operations:

When an application searches for certain event patterns, the state stores the sequence of events encountered so far.

When aggregating events per minute/hour/day, the state holds the pending aggregates.

When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.

When historic data needs to be managed, the state allows efficient access to events that occurred in the past.

Flink needs to be aware of the state in order to make it fault tolerant using checkpoints and savepoints. Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances.

Queryable state allows you to access state from outside of Flink during runtime.

When working with state, it might also be useful to read about Flink's state backends. Flink provides different state backends that specify how and where state is stored.

Keyed State

Keyed state is maintained in what can be thought of as an embedded key/value store. The state is partitioned and distributed strictly together with the streams that are read by the stateful operators. Hence, access to the key/value state is only possible on keyed streams, i.e. after a keyed/partitioned data exchange, and is restricted to the values associated with the current event's key.

Aligning the keys of streams and state makes sure that all state updates are local operations, guaranteeing consistency without transaction overhead. This alignment also allows Flink to redistribute the state and adjust the stream partitioning transparently.
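To make "restricted to the values associated with the current event's key" concrete, here is a toy Python model. `ToyKeyedState` and `running_count` are hypothetical names, not Flink classes. The `value()`/`update()` pair only echoes the shape of Flink's `ValueState`. The runtime sets the current key before each event, so user code reads and writes state without ever naming a key.

```python
class ToyKeyedState:
    """Hypothetical toy keyed state: one key/value map per parallel instance,
    but user code can only reach the entry for the current event's key."""

    def __init__(self):
        self._store = {}          # key -> value, local to this operator instance
        self._current_key = None  # set by the "runtime", not by user code

    def set_current_key(self, key):
        self._current_key = key

    def value(self):
        return self._store.get(self._current_key)

    def update(self, new_value):
        self._store[self._current_key] = new_value


def running_count(events, key_selector):
    """Per-key running count, written only against value()/update()."""
    state = ToyKeyedState()
    results = []
    for event in events:
        key = key_selector(event)
        state.set_current_key(key)        # runtime scopes state to this event's key
        count = (state.value() or 0) + 1  # user code never names the key explicitly
        state.update(count)
        results.append((key, count))
    return results


print(running_count(["click:a", "click:b", "click:a"], key_selector=lambda e: e.split(":")[1]))
# [('a', 1), ('b', 1), ('a', 2)]
```

All state for a key lives on the instance that receives that key's events, so the update is a purely local operation.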



Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution, each parallel instance of a keyed operator works with the keys for one or more Key Groups.
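The following sketch shows how Key Groups make rescaling cheap. It is written under two assumptions. First, the hash is a stand-in: `zlib.crc32` is used instead of Flink's murmur hash of `key.hashCode()`. Second, the contiguous-range assignment `key_group * parallelism // max_parallelism` is believed to mirror Flink's scheme but is shown here only as an illustration. All function names are hypothetical.

```python
import zlib


def key_group_for(key: str, max_parallelism: int) -> int:
    # Stand-in hash (assumption): Flink hashes key.hashCode() with a murmur hash;
    # crc32 is used here only to keep the sketch deterministic.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for(key_group: int, max_parallelism: int, parallelism: int) -> int:
    # Contiguous ranges of key groups map to each parallel instance.
    return key_group * parallelism // max_parallelism


def key_groups_of(operator_index: int, max_parallelism: int, parallelism: int) -> list:
    return [kg for kg in range(max_parallelism)
            if operator_index_for(kg, max_parallelism, parallelism) == operator_index]


MAX_PARALLELISM = 8
for parallelism in (2, 3):
    print(parallelism, [key_groups_of(i, MAX_PARALLELISM, parallelism) for i in range(parallelism)])
# 2 [[0, 1, 2, 3], [4, 5, 6, 7]]
# 3 [[0, 1, 2], [3, 4, 5], [6, 7]]
```

A key's Key Group depends only on the key and the maximum parallelism. When the job is rescaled from 2 to 3 instances, whole Key Groups move between instances, and no individual key is ever re-hashed.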

State Persistence

Flink implements fault tolerance using a combination of stream replay and checkpointing. A checkpoint marks a specific point in each of the input streams along with the corresponding state for each of the operators. A streaming dataflow can be resumed from a checkpoint while maintaining consistency (exactly-once processing semantics) by restoring the state of the operators and replaying the records from the point of the checkpoint.

The checkpoint interval is a means of trading off the overhead of fault tolerance during execution against the recovery time (the number of records that need to be replayed).
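The interval trade-off can be put into rough numbers. This back-of-the-envelope sketch assumes a constant input rate and ignores how long a checkpoint takes to complete, so the replay figure is only an approximation. `interval_tradeoff` is a hypothetical helper.

```python
def interval_tradeoff(records_per_second, interval_seconds, horizon_seconds=3600):
    """Rough, hypothetical estimate of the checkpoint-interval trade-off.

    Assumes a constant input rate and ignores the time a checkpoint takes
    to complete, so the replay figure is an approximation."""
    checkpoints_per_horizon = horizon_seconds / interval_seconds   # overhead side
    worst_case_replay = records_per_second * interval_seconds      # recovery side
    return checkpoints_per_horizon, worst_case_replay


for interval in (10, 60, 300):
    n, replay = interval_tradeoff(10_000, interval)
    print(f"interval={interval:>3}s  checkpoints/hour={n:>4.0f}  replay~{replay:>9,} records")
```

Shorter intervals mean more checkpoints per hour, which is more runtime overhead, but fewer records to replay after a failure.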

The fault tolerance mechanism continuously draws snapshots of the distributed streaming dataflow. For streaming applications with small state, these snapshots are very lightweight and can be drawn frequently without much impact on performance. The state of the streaming applications is stored at a configurable place, usually in a distributed file system.

In case of a program failure (due to a machine, network, or software failure), Flink stops the distributed streaming dataflow. The system then restarts the operators and resets them to the latest successful checkpoint. The input streams are reset to the point of the state snapshot. Any records that are processed as part of the restarted parallel dataflow are guaranteed not to have affected the previously checkpointed state.
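The following single-operator toy, written in Python with hypothetical names, traces that cycle of checkpoint, failure, reset, and replay. It assumes the source is a replayable list and that a checkpoint is simply the pair (source offset, operator state). One failure is injected. The state and the source position are always restored together, so every record affects the final result exactly once, even though some records are processed twice.

```python
def run_with_failure(records, checkpoint_every, fail_at):
    """Toy replay-plus-checkpoint loop (hypothetical, single operator).

    The source is a replayable list, the operator keeps a running sum, and
    a checkpoint is (source offset, operator state). One failure is injected
    when the source reaches offset `fail_at`."""
    checkpoint = (0, 0)            # initial checkpoint: offset 0, empty state
    offset, total = checkpoint
    processed = 0                  # counts every record handed to the operator
    failed = False
    while offset < len(records):
        if not failed and offset == fail_at:
            failed = True
            offset, total = checkpoint      # reset state AND rewind the source
            continue
        total += records[offset]
        offset += 1
        processed += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, total)    # offset and state are captured together
    return total, processed


total, processed = run_with_failure(list(range(1, 11)), checkpoint_every=4, fail_at=7)
print(total, processed)   # 55 13
```

The sum is 55, the same as in a failure-free run. Thirteen records were processed because the three records after the checkpoint at offset 4 were replayed. Their first processing was discarded together with the state.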

Note: By default, checkpointing is disabled. See Checkpointing for details on how to enable and configure checkpointing.

Note: For this mechanism to realize its full guarantees, the data stream source (such as a message queue or broker) needs to be able to rewind the stream to a defined recent point. Apache Kafka has this ability, and Flink's connector to Kafka exploits it. See Fault Tolerance Guarantees of Data Sources and Sinks for more information about the guarantees provided by Flink's connectors.

Note: Because Flink's checkpoints are realized through distributed snapshots, we use the words snapshot and checkpoint interchangeably. Often we also use the term snapshot to mean either checkpoint or savepoint.

Checkpointing

The central part of Flink's fault tolerance mechanism is drawing consistent snapshots of the distributed data stream and operator state. These snapshots act as consistent checkpoints to which the system can fall back in case of a failure. Flink's mechanism for drawing these snapshots is described in "Lightweight Asynchronous Snapshots for Distributed Dataflows". It is inspired by the standard Chandy-Lamport algorithm for distributed snapshots and is specifically tailored to Flink's execution model.

Keep in mind that everything to do with checkpointing can be done asynchronously. The checkpoint barriers don't travel in lock step, and operations can asynchronously snapshot their state.

Since Flink 1.11, checkpoints can be taken with or without alignment. In this section, we describe aligned checkpoints first.

Barriers

A core element in Flink's distributed snapshotting is the stream barrier. These barriers are injected into the data stream and flow with the records as part of the data stream. Barriers never overtake records; they flow strictly in line. A barrier separates the records in the data stream into the set of records that goes into the current snapshot and the records that go into the next snapshot. Each barrier carries the ID of the snapshot whose records it pushed in front of it. Barriers do not interrupt the flow of the stream and are hence very lightweight. Multiple barriers from different snapshots can be in the stream at the same time, which means that various snapshots may happen concurrently.
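Here is a small Python model of how in-line barriers partition a single stream. `Barrier` and `split_by_barriers` are hypothetical names, and the sketch assumes snapshot IDs start at 1 and increase by one. Records in front of barrier n belong to snapshot n. Records behind it belong to snapshot n+1, and because barriers never overtake records this split is unambiguous.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int  # ID of the snapshot whose records this barrier pushes ahead of it


def split_by_barriers(stream):
    """Assign each record to a snapshot: records in front of barrier n belong
    to snapshot n, records behind it to snapshot n+1 (or later)."""
    snapshots = {}
    current = 1  # assumption for the sketch: snapshot IDs start at 1 and grow by one
    for element in stream:
        if isinstance(element, Barrier):
            current = element.checkpoint_id + 1
        else:
            snapshots.setdefault(current, []).append(element)
    return snapshots


stream = ["a", "b", Barrier(1), "c", Barrier(2), "d", "e"]
print(split_by_barriers(stream))  # {1: ['a', 'b'], 2: ['c'], 3: ['d', 'e']}
```

Barriers 1 and 2 sit in the same stream at once, which corresponds to two snapshots being in progress concurrently.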



Stream barriers are injected into the parallel data flow at the stream sources. The point where the barriers for snapshot n are injected (let's call it Sn) is the position in the source stream up to which the snapshot covers the data. For example, in Apache Kafka, this position would be the last record's offset in the partition. This position Sn is reported to the checkpoint coordinator (Flink's JobManager).

The barriers then flow downstream. When an intermediate operator has received a barrier for snapshot n from all of its input streams, it emits a barrier for snapshot n into all of its outgoing streams. Once a sink operator (the end of a streaming DAG) has received barrier n from all of its input streams, it acknowledges snapshot n to the checkpoint coordinator. After all sinks have acknowledged a snapshot, it is considered completed.

Once snapshot n has been completed, the job will never again ask the source for records from before Sn, since at that point these records (and their descendant records) will have passed through the entire data flow topology.
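The coordinator's bookkeeping described above can be sketched in Python. `ToyCheckpointCoordinator` is a hypothetical toy and not the API of Flink's JobManager. It records each source's Sn, collects sink acknowledgements, and marks snapshot n complete only when every sink has acknowledged it. Only then does the source position before Sn become safe to forget.

```python
class ToyCheckpointCoordinator:
    """Hypothetical toy; not Flink's CheckpointCoordinator API."""

    def __init__(self, sink_ids):
        self.sink_ids = frozenset(sink_ids)
        self.source_positions = {}   # checkpoint id -> {source: S_n}
        self.acks = {}               # checkpoint id -> sinks that acknowledged
        self.completed = []          # completed checkpoint ids, in completion order

    def report_source_position(self, checkpoint_id, source, position):
        self.source_positions.setdefault(checkpoint_id, {})[source] = position

    def acknowledge(self, checkpoint_id, sink):
        acked = self.acks.setdefault(checkpoint_id, set())
        acked.add(sink)
        # A snapshot is complete only once *every* sink has acknowledged it.
        if acked == self.sink_ids and checkpoint_id not in self.completed:
            self.completed.append(checkpoint_id)

    def never_needed_before(self, source):
        """Position before which the source will never be asked for records again."""
        if not self.completed:
            return None
        return self.source_positions[self.completed[-1]][source]


coord = ToyCheckpointCoordinator(["sink-0", "sink-1"])
coord.report_source_position(1, "partition-0", 42)
coord.acknowledge(1, "sink-0")
print(coord.completed)                                            # []
coord.acknowledge(1, "sink-1")
print(coord.completed, coord.never_needed_before("partition-0"))  # [1] 42
```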

[Figure: Aligning data streams at operators with multiple inputs]

Operators that receive more than one input stream need to align the input streams on the snapshot barriers. The figure above illustrates this:

As soon as the operator receives snapshot barrier n from an incoming stream, it cannot process any further records from that stream until it has received barrier n from the other inputs as well. Otherwise, it would mix records that belong to snapshot n with records that belong to snapshot n+1.

Once the last stream has received barrier n, the operator emits all pending outgoing records, and then emits snapshot n barriers itself.

It snapshots the state and resumes processing records from all input streams, processing records from the input buffers before processing the records from the streams.

Finally, the operator writes the state asynchronously to the state backend.

Note that alignment is needed for all operators with multiple inputs, and for operators after a shuffle when they consume the output streams of multiple upstream subtasks.
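The alignment steps above can be traced with a toy summing operator in Python. `aligned_sum` is a hypothetical name, and the sketch makes two simplifications: only one checkpoint is in flight at a time, and barrier forwarding is left out. Records arriving on an input that has already delivered barrier n are held back. Once barrier n has arrived on every input, the state is snapshotted and the held-back records are processed first.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


def aligned_sum(arrivals, num_inputs):
    """Toy aligned checkpointing for a summing operator with several inputs.

    `arrivals` is a list of (input_index, element) in arrival order. Only one
    checkpoint is assumed to be in flight at a time (simplification)."""
    total = 0
    blocked = set()      # inputs that already delivered barrier n
    held_back = []       # records from blocked inputs (they belong to snapshot n+1)
    snapshots = {}
    for channel, element in arrivals:
        if isinstance(element, Barrier):
            blocked.add(channel)
            if len(blocked) == num_inputs:                  # barrier n seen on every input
                snapshots[element.checkpoint_id] = total   # snapshot state (forwarding omitted)
                blocked.clear()
                for record in held_back:                    # resume: buffered input first
                    total += record
                held_back.clear()
        elif channel in blocked:
            held_back.append(element)
        else:
            total += element
    return snapshots, total


arrivals = [(0, 1), (1, 10), (0, Barrier(1)), (0, 2), (1, 20), (1, Barrier(1)), (1, 30)]
print(aligned_sum(arrivals, num_inputs=2))  # ({1: 31}, 63)
```

Snapshot 1 is 31 (1 + 10 + 20). The record 2 arrived behind barrier 1 on input 0, so it is excluded from snapshot 1 and still counted afterwards.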

Snapshotting Operator State

When operators contain any form of state, this state must be part of the snapshots as well.

Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. At that point, all updates to the state from records before the barriers have been made, and no updates that depend on records from after the barriers have been applied. Because the state of a snapshot may be large, it is stored in a configurable state backend. By default, this is the JobManager's memory, but for production use a distributed reliable storage (such as HDFS) should be configured. After the state has been stored, the operator acknowledges the checkpoint, emits the snapshot barrier into the output streams, and proceeds.

The resulting snapshot now contains:
For each parallel stream data source, the offset/position in the stream when the snapshot was started.
For each operator, a pointer to the state that was stored as part of the snapshot.
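The two components of a completed snapshot can be written down as a plain data structure. `CompletedSnapshot`, the source and operator names, and the HDFS path are all hypothetical. The point is that the snapshot holds pointers to state stored in the backend, not the possibly large state itself.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class CompletedSnapshot:
    checkpoint_id: int
    # parallel source instance -> offset/position when the snapshot started
    source_positions: Dict[str, int] = field(default_factory=dict)
    # operator instance -> pointer (here: a URI string) to its stored state
    state_handles: Dict[str, str] = field(default_factory=dict)


snapshot = CompletedSnapshot(
    checkpoint_id=7,
    source_positions={"source-0": 1042, "source-1": 998},
    state_handles={"window-agg-0": "hdfs:///checkpoints/chk-7/window-agg-0"},  # hypothetical path
)
print(snapshot.source_positions["source-0"])  # 1042
```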


Recovery

Recovery under this mechanism is straightforward: upon a failure, Flink selects the latest completed checkpoint k. The system then re-deploys the entire distributed dataflow and gives each operator the state that was snapshotted as part of checkpoint k. The sources are set to start reading the stream from position Sk. For example, in Apache Kafka, that means telling the consumer to start fetching from offset Sk.

If state was snapshotted incrementally, the operators start with the state of the latest full snapshot and then apply a series of incremental snapshot updates to that state.

See Restart Strategies for more information.
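Both recovery steps appear in the following Python sketch: choosing the latest *completed* checkpoint, and rebuilding state from a full snapshot plus incremental updates. The per-key delta encoding, in which `None` marks a deleted key, is an assumption made for illustration only. Flink's real incremental checkpoints (for example with RocksDB) work at the level of storage files, not per-key deltas. The function names are hypothetical.

```python
def latest_completed(checkpoints):
    """Pick the newest checkpoint that actually completed (toy dict records)."""
    done = [c for c in checkpoints if c["completed"]]
    return max(done, key=lambda c: c["id"]) if done else None


def restore_incremental(full_snapshot, deltas):
    """Start from the latest full snapshot, then apply incremental updates in order.

    Toy encoding (assumption): each delta maps key -> new value, or key -> None
    for a key that was deleted."""
    state = dict(full_snapshot)
    for delta in deltas:
        for key, value in delta.items():
            if value is None:
                state.pop(key, None)
            else:
                state[key] = value
    return state


history = [{"id": 1, "completed": True}, {"id": 2, "completed": True}, {"id": 3, "completed": False}]
print(latest_completed(history)["id"])                                        # 2
print(restore_incremental({"a": 1, "b": 2}, [{"a": 5}, {"b": None, "c": 3}]))  # {'a': 5, 'c': 3}
```

Checkpoint 3 was still in progress when the failure happened, so recovery falls back to checkpoint 2.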

Unaligned Checkpointing

Starting with Flink 1.11, checkpointing can also be performed unaligned. The basic idea is that checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state.

Note that this approach is actually closer to the Chandy-Lamport algorithm, but Flink still inserts the barrier in the sources to avoid overloading the checkpoint coordinator.

[Figure: Unaligned checkpointing]

The figure depicts how an operator handles unaligned checkpoint barriers:

The operator reacts to the first barrier that is stored in its input buffers.

It immediately forwards the barrier to the downstream operator by adding it to the end of the output buffers.

The operator marks all overtaken records to be stored asynchronously and creates a snapshot of its own state.

Consequently, the operator only briefly stops the processing of input to mark the buffers, forward the barrier, and create the snapshot of the other state.
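Here is a toy Python view of the steps above. `on_first_barrier` is a hypothetical name, and the sketch performs no real I/O or asynchronous writing. On the first barrier, the operator forwards it right away. It then records, as part of the checkpoint, its own state plus every in-flight record the barrier overtook: input records queued in front of the barrier and output records not yet delivered.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


def on_first_barrier(input_buffer, output_buffer, operator_state):
    """Toy reaction of an operator to the first barrier in its input buffer.

    Returns (forwarded, checkpoint): the barrier that is forwarded right away,
    and a checkpoint holding the operator state plus every in-flight record
    the barrier overtook."""
    idx = next(i for i, e in enumerate(input_buffer) if isinstance(e, Barrier))
    barrier = input_buffer[idx]
    checkpoint = {
        "id": barrier.checkpoint_id,
        "operator_state": dict(operator_state),       # copy of the operator's own state
        "in_flight_input": list(input_buffer[:idx]),  # overtaken, not yet processed
        "in_flight_output": list(output_buffer),      # overtaken, not yet delivered
    }
    return barrier, checkpoint


forwarded, checkpoint = on_first_barrier([1, 2, Barrier(5), 3], [10], {"sum": 7})
print(forwarded, checkpoint["in_flight_input"], checkpoint["in_flight_output"])
# Barrier(checkpoint_id=5) [1, 2] [10]
```

No input waits for any other input. The record 3 behind the barrier is simply processed next as usual.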

Unaligned checkpointing ensures that barriers arrive at the sink as fast as possible. It is especially suited for applications with at least one slow-moving data path, where alignment times can reach hours. However, since it adds additional I/O pressure, it doesn't help when the I/O to the state backends is the bottleneck. See the more in-depth discussion in the ops documentation for other limitations.

Note that savepoints will always be aligned.

Unaligned Recovery

With unaligned checkpointing, operators first recover the in-flight data before starting to process any data from upstream operators. Aside from that, recovery performs the same steps as during the recovery of aligned checkpoints.
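The following Python sketch shows the ordering rule on the input side. It uses a hypothetical checkpoint dict, and `recover_unaligned` is a hypothetical name. The persisted in-flight input is replayed into the restored state before any new upstream record. Re-sending the persisted in-flight output downstream is left out.

```python
def recover_unaligned(checkpoint, upstream_records, process):
    """Toy unaligned recovery (hypothetical): restore operator state, replay
    the persisted in-flight input first, and only then consume upstream data.
    Re-sending the persisted in-flight output downstream is omitted here."""
    state = dict(checkpoint["operator_state"])
    for record in checkpoint["in_flight_input"]:   # 1. recovered in-flight data
        process(state, record)
    for record in upstream_records:                 # 2. new data from upstream operators
        process(state, record)
    return state


order = []


def add(state, record):
    order.append(record)
    state["sum"] = state.get("sum", 0) + record


checkpoint = {"operator_state": {"sum": 7}, "in_flight_input": [1, 2], "in_flight_output": [10]}
print(recover_unaligned(checkpoint, upstream_records=[3], process=add), order)
# {'sum': 13} [1, 2, 3]
```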

State Backends

The exact data structures in which the key/value indexes are stored depend on the chosen state backend. One state backend stores data in an in-memory hash map; another uses RocksDB as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint. State backends can be configured without changing your application logic.
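Swapping a backend leaves the application logic unchanged. The following Python sketch illustrates this with a hypothetical `KeyValueBackend` interface and two implementations: an in-memory hash map, and an embedded on-disk store. The second uses `sqlite3` purely as a stand-in for RocksDB, because it ships with Python. Neither class is a Flink API. Each backend also provides its own point-in-time `snapshot()`.

```python
import json
import sqlite3
from abc import ABC, abstractmethod


class KeyValueBackend(ABC):
    """Hypothetical backend interface: application code only sees get/put."""

    @abstractmethod
    def get(self, key): ...

    @abstractmethod
    def put(self, key, value): ...

    @abstractmethod
    def snapshot(self) -> dict: ...


class HashMapBackend(KeyValueBackend):
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def put(self, key, value):
        self._data[key] = value

    def snapshot(self):
        return dict(self._data)  # point-in-time copy


class SqliteBackend(KeyValueBackend):
    """Stand-in for an embedded store such as RocksDB (sqlite3 chosen only for portability)."""

    def __init__(self, path=":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute("CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)")

    def get(self, key):
        row = self._db.execute("SELECT v FROM kv WHERE k = ?", (key,)).fetchone()
        return None if row is None else json.loads(row[0])

    def put(self, key, value):
        self._db.execute("INSERT OR REPLACE INTO kv (k, v) VALUES (?, ?)", (key, json.dumps(value)))

    def snapshot(self):
        return {k: json.loads(v) for k, v in self._db.execute("SELECT k, v FROM kv")}


def count_words(words, backend: KeyValueBackend):
    # The application logic is identical whichever backend is passed in.
    for w in words:
        backend.put(w, (backend.get(w) or 0) + 1)
    return backend.snapshot()


print(count_words(["a", "b", "a"], HashMapBackend()))  # {'a': 2, 'b': 1}
```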


Savepoints

All programs that use checkpointing can resume execution from a savepoint. Savepoints allow both updating your programs and your Flink cluster without losing any state.

Savepoints are manually triggered checkpoints, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this.

Savepoints are similar to checkpoints except that they are triggered by the user and don't automatically expire when newer checkpoints are completed.
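The following Python toy, using a hypothetical `ToySnapshotStore`, models the retention difference. Checkpoints are subsumed once newer ones complete. Savepoints stay until the user removes them. The `retained_checkpoints` knob is an assumption of the sketch; Flink's `state.checkpoints.num-retained` option plays a similar role for real checkpoints.

```python
class ToySnapshotStore:
    """Hypothetical retention model: checkpoints are subsumed by newer ones,
    savepoints stay until the user deletes them."""

    def __init__(self, retained_checkpoints=1):
        if retained_checkpoints < 1:
            raise ValueError("retain at least one checkpoint")
        self.retained_checkpoints = retained_checkpoints
        self.checkpoints = []
        self.savepoints = []

    def complete_checkpoint(self, checkpoint_id):
        self.checkpoints.append(checkpoint_id)
        # Older checkpoints expire automatically once newer ones complete.
        del self.checkpoints[:-self.retained_checkpoints]

    def trigger_savepoint(self, savepoint_id):
        # Same snapshot mechanism, but user-triggered and never expired automatically.
        self.savepoints.append(savepoint_id)

    def delete_savepoint(self, savepoint_id):
        self.savepoints.remove(savepoint_id)  # only an explicit user action removes it


store = ToySnapshotStore()
store.complete_checkpoint(1)
store.trigger_savepoint("sp-before-upgrade")
store.complete_checkpoint(2)
store.complete_checkpoint(3)
print(store.checkpoints, store.savepoints)  # [3] ['sp-before-upgrade']
```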

Exactly Once vs. At Least Once

The alignment step may add latency to the streaming program. Usually, this extra latency is on the order of a few milliseconds, but we have seen cases where the latency of some outliers increased noticeably. For applications that require consistently super low latencies (a few milliseconds) for all records, Flink has a switch to skip the stream alignment during a checkpoint. Checkpoint snapshots are still drawn as soon as an operator has seen the checkpoint barrier from each input.

When the alignment is skipped, an operator keeps processing all inputs, even after some checkpoint barriers for checkpoint n have arrived. That way, the operator also processes elements that belong to checkpoint n+1 before the state snapshot for checkpoint n is taken. On a restore, these records will occur as duplicates, because they are both included in the state snapshot of checkpoint n and replayed as part of the data after checkpoint n.
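The duplicate effect can be reproduced in a small Python toy with hypothetical names. Here the operator's "state" is just the list of records it has processed. Without alignment, record 3 arrives behind barrier 1 on input 0 and is processed before the snapshot is taken. After a restore, each input is replayed from just behind its barrier (Sn), so record 3 is counted twice.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


def snapshot_without_alignment(arrivals, num_inputs):
    """Keep processing every input; take snapshot n once barrier n was seen on all inputs."""
    processed, seen = [], set()
    for channel, element in arrivals:
        if isinstance(element, Barrier):
            seen.add(channel)
            if len(seen) == num_inputs:
                return processed          # state snapshot = everything processed so far
        else:
            processed.append(element)     # no blocking, even after this input's barrier
    raise ValueError("barrier was not seen on every input")


def restore_and_replay(snapshot, inputs, checkpoint_id):
    """Restore the snapshot, then replay each input from just behind its barrier (S_n)."""
    barrier = Barrier(checkpoint_id)
    replayed = [e for elems in inputs.values() for e in elems[elems.index(barrier) + 1:]]
    return snapshot + replayed


inputs = {0: [1, 2, Barrier(1), 3], 1: [10, Barrier(1), 20]}
arrivals = [(0, 1), (0, 2), (0, Barrier(1)), (0, 3), (1, 10), (1, Barrier(1)), (1, 20)]
final = restore_and_replay(snapshot_without_alignment(arrivals, 2), inputs, checkpoint_id=1)
duplicates = [r for r, n in Counter(final).items() if n > 1]
print(final, duplicates)  # [1, 2, 3, 10, 3, 20] [3]
```

With alignment, record 3 would have been held back until barrier 1 arrived on input 1. It would therefore be missing from the snapshot, and replaying it would not create a duplicate.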

Note: Alignment happens only for operators with multiple predecessors (joins) as well as operators with multiple senders (after a stream repartitioning/shuffle). Because of that, dataflows with only embarrassingly parallel streaming operations (map(), flatMap(), filter(), …) actually give exactly-once guarantees even in at-least-once mode.

State and Fault Tolerance in Batch Programs

Flink executes batch programs as a special case of streaming programs, where the streams are bounded (finite number of elements). A DataSet is treated internally as a stream of data. The concepts above thus apply to batch programs in the same way as they apply to streaming programs, with minor exceptions:

Fault tolerance for batch programs does not use checkpointing. Recovery happens by fully replaying the streams. That is possible because inputs are bounded. This pushes the cost more towards the recovery, but makes the regular processing cheaper, because it avoids checkpoints.

Stateful operations in the DataSet API use simplified in-memory/out-of-core data structures, rather than key/value indexes.

The DataSet API introduces special synchronized (superstep-based) iterations, which are only possible on bounded streams. For details, check out the iterations documentation.
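For contrast with the streaming examples, here is a Python toy of checkpoint-free batch recovery. `run_batch` is hypothetical, and the failure is simulated halfway through the input on a chosen attempt. Because the input is finite, a failed attempt is retried from the very beginning. No snapshots are taken during normal processing.

```python
def run_batch(bounded_input, fail_on_attempt=None, max_attempts=3):
    """Toy batch job without checkpoints: on failure, recompute everything
    from the bounded input. A failure is simulated halfway through the input
    on attempt `fail_on_attempt` (hypothetical parameter)."""
    for attempt in range(1, max_attempts + 1):
        total = 0
        try:
            for i, record in enumerate(bounded_input):
                if attempt == fail_on_attempt and i == len(bounded_input) // 2:
                    raise RuntimeError("simulated task failure")
                total += record
            return total, attempt
        except RuntimeError:
            continue  # restart from the very beginning: the input can simply be re-read
    raise RuntimeError("job failed after max_attempts")


print(run_batch([1, 2, 3, 4], fail_on_attempt=1))  # (10, 2)
```

Normal processing pays nothing for fault tolerance. The whole cost lands on the retry, which re-reads all of the input.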