Flink Official Documentation Notes 13: A Basic Introduction to Streaming Analytics

Streaming Analytics

Event Time and Watermarks

Introduction

Flink explicitly supports three different notions of time:

  • event time: the time when an event occurred, as recorded by the device producing (or storing) the event
  • ingestion time: a timestamp recorded by Flink at the moment it ingests the event
  • processing time: the time when a specific operator in your pipeline is processing the event

For reproducible results, e.g., when computing the maximum price a stock reached during the first hour of trading on a given day, you should use event time.

In this way the result won’t depend on when the calculation is performed.

This kind of real-time application is sometimes performed using processing time, but then the results are determined by the events that happen to be processed during that hour, rather than the events that occurred then.

Computing analytics based on processing time causes inconsistencies, and makes it difficult to re-analyze historic data or test new implementations.

Working with Event Time

By default, Flink will use processing time. To change this, you can set the Time Characteristic:

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Required before Flink 1.12; from 1.12 on this call is deprecated and event time is the default.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

If you want to use event time, you will also need to supply a Timestamp Extractor and Watermark Generator that Flink will use to track the progress of event time.

This will be covered in the section below on Working with Watermarks, but first we should explain what watermarks are.

Watermarks

Let’s work through a simple example that will show why watermarks are needed, and how they work.

In this example you have a stream of timestamped events that arrive somewhat out of order, as shown below.

The numbers shown are timestamps that indicate when these events actually occurred.

The first event to arrive happened at time 4, and it is followed by an event that happened earlier, at time 2, and so on:

··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →

Now imagine that you are trying to create a stream sorter.

This is meant to be an application that processes each event from a stream as it arrives, and emits a new stream containing the same events, but ordered by their timestamps.

Some observations:

(1) The first element your stream sorter sees is the 4, but you can’t just immediately release it as the first element of the sorted stream. It may have arrived out of order, and an earlier event might yet arrive. In fact, you have the benefit of some god-like knowledge of this stream’s future, and you can see that your stream sorter should wait at least until the 2 arrives before producing any results.

Some buffering, and some delay, is necessary.

(2) If you do this wrong, you could end up waiting forever. First the sorter saw an event from time 4, and then an event from time 2. Will an event with a timestamp less than 2 ever arrive? Maybe. Maybe not. You could wait forever and never see a 1.

Eventually you have to be courageous and emit the 2 as the start of the sorted stream.

(3) What you need then is some sort of policy that defines when, for any given timestamped event, to stop waiting for the arrival of earlier events.

This is precisely what watermarks do: they define when to stop waiting for earlier events.

Event time processing in Flink depends on watermark generators that insert special timestamped elements into the stream, called watermarks.

A watermark for time t is an assertion that the stream is (probably) now complete up through time t.

Q: When should this stream sorter stop waiting, and push out the 2 to start the sorted stream?

A: When a watermark arrives with a timestamp of 2, or greater.
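The stream sorter discussed above can be sketched in a few lines of plain Java. This is a toy model, not Flink’s API: the class `WatermarkSorter` and its methods are hypothetical, timestamps are bare `long`s, and watermarks are handed in explicitly instead of being generated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy, single-threaded stream sorter driven by explicit watermarks (not Flink code).
class WatermarkSorter {
    // Buffered event timestamps, smallest first.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    // Every arriving event is only buffered; nothing is emitted here.
    void onEvent(long timestamp) {
        buffer.add(timestamp);
    }

    // A watermark for time t asserts the stream is (probably) complete up through t,
    // so every buffered event with timestamp <= t can be released in sorted order.
    List<Long> onWatermark(long t) {
        List<Long> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= t) {
            out.add(buffer.poll());
        }
        return out;
    }
}
```

Fed the events 4 and 2 followed by a watermark of 2, it releases only the 2 and keeps the 4 buffered until a watermark of 4 or more arrives.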

(4) You might imagine different policies for deciding how to generate watermarks.

Each event arrives after some delay, and these delays vary, so some events are delayed more than others.

One simple approach is to assume that these delays are bounded by some maximum delay.

Flink refers to this strategy as bounded-out-of-orderness watermarking.

It is easy to imagine more complex approaches to watermarking, but for most applications a fixed delay works well enough.
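A minimal sketch of bounded-out-of-orderness watermarking, assuming a fixed maximum delay and ignoring everything else a real generator handles (Flink, for instance, emits these watermarks periodically rather than on demand). The class name is hypothetical; the point is only that the watermark trails the largest timestamp seen so far by the assumed bound.

```java
// Hypothetical, Flink-independent sketch of bounded-out-of-orderness watermarking.
class BoundedOutOfOrdernessSketch {
    private final long maxDelay;                // assumed upper bound on how late an event can be
    private long maxTimestamp = Long.MIN_VALUE; // largest event timestamp seen so far
    private boolean seenAny = false;

    BoundedOutOfOrdernessSketch(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        seenAny = true;
    }

    // Watermark = largest timestamp seen minus the bound.
    // Before the first event no progress can be claimed (and subtracting would overflow).
    long currentWatermark() {
        return seenAny ? maxTimestamp - maxDelay : Long.MIN_VALUE;
    }
}
```

With a bound of 3 and the arrivals 4, 2, 7, the watermark is 7 - 3 = 4; a later out-of-order event such as 5 does not move it.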

Latency vs. Completeness

Another way to think about watermarks is that they give you, the developer of a streaming application, control over the tradeoff between latency and completeness.

Unlike in batch processing, where one has the luxury of being able to have complete knowledge of the input before producing any results, with streaming you must eventually stop waiting to see more of the input, and produce some sort of result.

You can either configure your watermarking aggressively, with a short bounded delay, and thereby take the risk of producing results with rather incomplete knowledge of the input: a possibly wrong result, produced quickly.

Or you can wait longer, and produce results that take advantage of having more complete knowledge of the input stream(s).

It is also possible to implement hybrid solutions that produce initial results quickly, and then supply updates to those results as additional (late) data is processed.

This is a good approach for some applications.

Lateness

Lateness is defined relative to the watermarks.

A Watermark(t) asserts that the stream is complete up through time t; any event following this watermark whose timestamp is ≤ t is late.
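To make the latency/completeness tradeoff and the definition of lateness concrete, the sketch below replays the example stream from above, in arrival order (4, 2, 7, 11, ...), through a toy watermark of "largest timestamp seen minus a bound", emitted after every event. It counts the events whose timestamp is ≤ the watermark in effect when they arrive. Emitting a watermark per event and the class name are assumptions made for illustration.

```java
// Toy model: after each event the watermark becomes (max timestamp seen - bound);
// an event is late if its timestamp is <= the watermark in effect when it arrives.
class LatenessCounter {
    static int countLate(long[] arrivals, long bound) {
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // no watermark before the first event
        int late = 0;
        for (long ts : arrivals) {
            if (ts <= watermark) {
                late++; // Watermark(t) already asserted completeness up through t
            }
            maxSeen = Math.max(maxSeen, ts);
            // maxSeen is a real timestamp here, so the subtraction cannot overflow
            watermark = Math.max(watermark, maxSeen - bound);
        }
        return late;
    }
}
```

With this arrival order, a bound of 0 marks 8 of the 15 events late, a bound of 5 marks only the 19 late, and a bound of 6 marks none late, at the cost of holding results back longer.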

Working with Watermarks

In order to perform event-time-based event processing, Flink needs to know the time associated with each event, and it also needs the stream to include watermarks.

The Taxi data sources used in the hands-on exercises take care of these details for you.

But in your own applications you will have to take care of this yourself, which is usually done by implementing a class that extracts the timestamps from the events, and generates watermarks on demand.

The easiest way to do this is by using a WatermarkStrategy:

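import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Event> stream = ...

// Tolerate events that arrive up to 20 seconds out of order, and read each
// event's timestamp (epoch milliseconds) from its `timestamp` field.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);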

Windows

Flink features very expressive window semantics.

In this section you will learn:

  • how windows are used to compute aggregates on unbounded streams,
  • which types of windows Flink supports, and
  • how to implement a DataStream program with a windowed aggregation

Introduction

It is natural when doing stream processing to want to compute aggregated analytics on bounded subsets of the streams in order to answer questions like these:

  • number of page views per minute
  • number of sessions per user per week
  • maximum temperature per sensor per minute

Computing windowed analytics with Flink depends on two principal abstractions: Window Assigners that assign events to windows (creating new window objects as necessary), and Window Functions that are applied to the events assigned to a window.

Flink’s windowing API also has notions of Triggers, which determine when to call the window function, and Evictors, which can remove elements collected in a window.

In its basic form, you apply windowing to a keyed stream like this:

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce|aggregate|process(<window function>)

You can also use windowing with non-keyed streams, but keep in mind that in this case, the processing will not be done in parallel:

stream
    .windowAll(<window assigner>)
    .reduce|aggregate|process(<window function>)

Window Assigners

Flink has several built-in types of window assigners, which are illustrated below:

[Figure: Flink’s built-in window assigners]

Here are some examples of what these window assigners might be used for, and how to specify them:

Tumbling time windows

 page views per minute
 TumblingEventTimeWindows.of(Time.minutes(1))

Sliding time windows

 page views per minute computed every 10 seconds
 SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))

Session windows

 page views per session, where sessions are defined by a gap of at least 30 minutes between sessions
 EventTimeSessionWindows.withGap(Time.minutes(30))

Durations can be specified using one of Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), and Time.days(n).

The time-based window assigners (including session windows) come in both event time and processing time flavors.

There are significant tradeoffs between these two types of time windows.

With processing time windowing you have to accept these limitations:

  • cannot correctly process historic data,
  • cannot correctly handle out-of-order data,
  • results will be non-deterministic,

but with the advantage of lower latency.

When working with count-based windows, keep in mind that these windows will not fire until a batch is complete.

There’s no option to time-out and process a partial window, though you could implement that behavior yourself with a custom Trigger.
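The "no timeout" behavior of count windows can be illustrated with a hypothetical per-key counter that only releases a batch once it holds `size` elements. This is plain Java written for illustration, not Flink’s count-window implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-key tumbling count window: fires only once `size` elements arrived for a key.
class CountWindowSketch {
    private final int size;
    private final Map<String, List<Integer>> pending = new HashMap<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    // Returns the completed batch for this key, or null while the batch is still partial.
    List<Integer> onElement(String key, int value) {
        List<Integer> batch = pending.computeIfAbsent(key, k -> new ArrayList<>());
        batch.add(value);
        if (batch.size() < size) {
            return null; // no timeout: a partial window simply keeps waiting
        }
        pending.remove(key);
        return batch;
    }

    // Number of elements buffered for a key whose window has not fired yet.
    int pendingFor(String key) {
        List<Integer> batch = pending.get(key);
        return batch == null ? 0 : batch.size();
    }
}
```

A key that stops receiving data (like "b" in a run with only one "b" element) keeps its partial batch forever; emitting it after a timeout would require something like the custom Trigger mentioned above.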

A global window assigner assigns every event (with the same key) to the same global window.

This is only useful if you are going to do your own custom windowing, with a custom Trigger.

In many cases where this might seem useful, you will be better off using a ProcessFunction as described in another section.

Window Functions

You have three basic options for how to process the contents of your windows:

  1. as a batch, using a ProcessWindowFunction that will be passed an Iterable with the window’s contents;
  2. incrementally, with a ReduceFunction or an AggregateFunction that is called as each event is assigned to the window;
  3. or with a combination of the two, wherein the pre-aggregated results of a ReduceFunction or an AggregateFunction are supplied to a ProcessWindowFunction when the window is triggered.

Here are examples of approaches 1 and 3. Each implementation finds the peak value from each sensor in 1-minute event time windows, and produces a stream of Tuples containing (key, end-of-window-timestamp, max_value).

ProcessWindowFunction Example

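import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<SensorReading> input = ...

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
        SensorReading,                  // input type
        Tuple3<String, Long, Integer>,  // output type
        String,                         // key type
        TimeWindow> {                   // window type

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> events,
            Collector<Tuple3<String, Long, Integer>> out) {

        // Start from the smallest int so that all-negative readings are handled correctly
        int max = Integer.MIN_VALUE;
        // Iterates over every buffered event of this key and window when the window fires
        for (SensorReading event : events) {
            max = Math.max(event.value, max);
        }
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}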

A couple of things to note in this implementation:

  • All of the events assigned to the window have to be buffered in keyed Flink state until the window is triggered. This is potentially quite expensive.
  • Our ProcessWindowFunction is being passed a Context object which contains information about the window. Its interface looks like this:
public abstract class Context implements java.io.Serializable {
    public abstract W window();
    
    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
}

windowState and globalState are places where you can store per-key, per-window state, or global per-key state that is shared by all windows of that key.

This might be useful, for example, if you want to record something about the current window and use that when processing a subsequent window.

Incremental Aggregation Example

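import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<SensorReading> input = ...

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // MyReducingMax runs as each event arrives; MyWindowFunction runs once when the window fires
    .reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
    @Override
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        // Keep the reading with the larger value, so only one element per key and window is stored
        return r1.value > r2.value ? r1 : r2;
    }
}

private static class MyWindowFunction extends ProcessWindowFunction<
    SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> maxReading,
            Collector<Tuple3<String, Long, SensorReading>> out) {

        // The Iterable holds exactly one element: the pre-aggregated maximum
        SensorReading max = maxReading.iterator().next();
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}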

Notice that the Iterable<SensorReading> will contain exactly one reading: the pre-aggregated maximum computed by MyReducingMax.

Late Events

By default, when using event time windows, late events are dropped.

There are two optional parts of the window API that give you more control over this.

You can arrange for the events that would be dropped to be collected to an alternate output stream instead, using a mechanism called side outputs.

Here is an example of what that might look like:

// The anonymous subclass ({}) lets Flink capture the generic type of the tag
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream
    .keyBy(...)
    .window(...)
    .sideOutputLateData(lateTag)
    .process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);

You can also specify an interval of allowed lateness during which the late events will continue to be assigned to the appropriate window(s) (whose state will have been retained).

By default each late event will cause the window function to be called again (sometimes called a late firing).

By default the allowed lateness is 0. In other words, elements behind the watermark are dropped (or sent to the side output).

For example:

stream
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);

When the allowed lateness is greater than zero, only those events that are so late that they would be dropped are sent to the side output (if it has been configured).
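The rules above can be condensed into a small decision function for an event-time window [start, end). This is a sketch under stated assumptions: the window’s last timestamp is taken to be end - 1 (windows are end-exclusive), the default event-time trigger is assumed, and the class and enum names are hypothetical.

```java
// Hypothetical classifier for an event that belongs to the event-time window [start, windowEnd).
class LatenessClassifier {
    enum Outcome { ADDED_BEFORE_FIRING, LATE_FIRING, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long watermark, long allowedLateness) {
        long maxTimestamp = windowEnd - 1; // last timestamp that still belongs to the window
        if (maxTimestamp > watermark) {
            return Outcome.ADDED_BEFORE_FIRING;   // window has not fired yet
        }
        if (maxTimestamp + allowedLateness > watermark) {
            return Outcome.LATE_FIRING;           // state retained; window function called again
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT;    // too late even with the allowed lateness
    }
}
```

For a one-minute window ending at 60 000 ms with 10 seconds of allowed lateness, a watermark of 65 000 still produces a late firing, while a watermark of 70 000 sends the event to the side output (or drops it). With the default lateness of 0, any event whose window has already fired is dropped.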

Surprises

Some aspects of Flink’s windowing API may not behave in the way you would expect.

Based on frequently asked questions on the user mailing list and elsewhere, here are some facts about windows that may surprise you.

Sliding Windows Make Copies

Sliding window assigners can create lots of window objects, and will copy each event into every relevant window.

For example, if you have sliding windows every 15 minutes that are 24 hours in length, each event will be copied into 4 * 24 = 96 windows.
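The 96 copies follow from how a sliding assigner enumerates windows. Below is a plain-Java sketch, assuming windows aligned to the epoch with no offset and all durations in milliseconds. The class and method names are hypothetical, though Flink’s built-in sliding assigner works along similar lines, walking back from the latest window start.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment (epoch-aligned, offset 0, milliseconds).
class SlidingAssignment {
    // Start of the latest window that contains the timestamp.
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    // Every window [start, start + size) containing the timestamp; the event is copied into each.
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

When the size is a multiple of the slide, each event lands in size / slide windows: 24 h / 15 min = 96, or 6 for a one-minute window sliding every 10 seconds.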

Time Windows are Aligned to the Epoch

Just because you are using hour-long processing-time windows and start your application running at 12:05 does not mean that the first window will close at 1:05.

The first window will be 55 minutes long and close at 1:00.

Note, however, that the tumbling and sliding window assigners take an optional offset parameter that can be used to change the alignment of the windows.

See Tumbling Windows and Sliding Windows for details.
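A sketch of epoch alignment, assuming millisecond timestamps counted from the Unix epoch (UTC). The expression `timestamp - floorMod(timestamp - offset, size)` is one way to compute the start of the aligned window; the class name is hypothetical. It shows why a 12:05 start lands in the 12:00 to 13:00 window, and how a five-minute offset shifts the boundaries to 12:05.

```java
// Start of the tumbling window of `size` ms containing `timestamp`,
// with boundaries at multiples of `size` (counted from the epoch) shifted by `offset`.
class TumblingAlignment {
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the result correct for timestamps before the epoch as well
        return timestamp - Math.floorMod(timestamp - offset, size);
    }
}
```

Because alignment is relative to the UTC epoch, window boundaries only coincide with local hours in time zones whose UTC offset is a whole number of hours; the offset parameter can compensate for that.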

Windows Can Follow Windows

For example, it works to do this:

stream
    .keyBy(t -> t.key)
    .timeWindow(<time specification>)
    .reduce(<reduce function>)
    .timeWindowAll(<same time specification>)
    .reduce(<same reduce function>)

You might expect Flink’s runtime to be smart enough to do this parallel pre-aggregation for you (provided you are using a ReduceFunction or AggregateFunction), but it’s not.

The reason why this works is that the events produced by a time window are assigned timestamps based on the time at the end of the window.

So, for example, all of the events produced by an hour-long window will have timestamps marking the end of an hour.

Any subsequent window consuming those events should have a duration that is the same as, or a multiple of, the previous window.
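A small sketch of why the second window lines up, assuming that a window result carries the last millisecond of its window (end - 1) as its timestamp, which is what Flink assigns to window results. The class and method names are hypothetical.

```java
// Sketch: timestamps of window results and the epoch-aligned windows they fall into.
class CascadedWindows {
    // Timestamp carried by a window's result: its last millisecond (end - 1).
    static long resultTimestamp(long windowStart, long size) {
        return windowStart + size - 1;
    }

    // Start of the epoch-aligned tumbling window of the given size that contains t.
    static long windowStart(long t, long size) {
        return t - Math.floorMod(t, size);
    }
}
```

The result of the hourly window [3h, 4h) has timestamp 4h - 1 ms, so a following one-hour window puts it back into [3h, 4h). A following two-hour window collects the results of [2h, 3h) and [3h, 4h) together in [2h, 4h).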

No Results for Empty TimeWindows

Windows are only created when events are assigned to them.

So if there are no events in a given time frame, no results will be reported.
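This plain-Java illustration uses a hypothetical class name. Windows are represented only as entries in a map, and an entry is created the first time an event lands in that window. A minute with no events therefore has no entry at all and produces no result, not even a zero count.

```java
import java.util.TreeMap;

// Windows exist only once an event has been assigned to them.
class LazyWindows {
    // Maps each tumbling window's start to the number of events assigned to it.
    static TreeMap<Long, Integer> countPerWindow(long[] timestamps, long size) {
        TreeMap<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, size);
            counts.merge(start, 1, Integer::sum); // window is created on first assignment
        }
        return counts;
    }
}
```

For events at 5 s, 20 s and 130 s with one-minute windows, only the windows starting at 0 and at 120 000 ms exist. The window starting at 60 000 ms never appears.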

Late Events Can Cause Late Merges

Session windows are based on an abstraction of windows that can merge.

Each element is initially assigned to a new window, after which windows are merged whenever the gap between them is small enough.

In this way, a late event can bridge the gap separating two previously separate sessions, producing a late merge.
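A simplified sketch of session merging, assuming each event opens a window [ts, ts + gap) and that windows which overlap or touch are merged. The class name is hypothetical, and for simplicity the sketch recomputes all sessions from scratch. It shows a late event joining two sessions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified session windows: one window per event, then overlapping/touching windows merge.
class SessionMerging {
    // Returns sessions as [start, end) pairs, sorted by start.
    static List<long[]> sessions(List<Long> timestamps, long gap) {
        List<long[]> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new long[] {ts, ts + gap}); // every element starts in its own window
        }
        windows.sort((a, b) -> Long.compare(a[0], b[0]));
        List<long[]> merged = new ArrayList<>();
        for (long[] w : windows) {
            if (!merged.isEmpty() && w[0] <= merged.get(merged.size() - 1)[1]) {
                long[] last = merged.get(merged.size() - 1);
                last[1] = Math.max(last[1], w[1]); // close enough: merge into the previous session
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With a gap of 10, the events 0, 5 and 22 form two sessions, [0, 15) and [22, 32). A late event at 14 bridges them into a single session [0, 32).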

Hands-on

The hands-on exercise that goes with this section is the Hourly Tips Exercise.