〈Inca〉incarcerated?

2023-03-28 05:05 32次浏览 财经

在Netflix,我们的实时数据基础设施采用了多集群Kafka架构和Flink支持的流处理平台,这使我们能够每天发送数万亿条消息。上面的博客详细描述了这种体系结构,并在下面的图表中进行了说明。

在Netflix,我们的实时数据基础设施采用了多集群Kafka架构,该架构以"流"概念为中心,使我们能够提供服务层,为不同的用例提供不同的消息交付保证。

最近,我们启动了一项新的工作,在这个流数据基础设施中创建分布式实时消息跟踪系统。我们把这个追踪系统命名为Inca,这是南美洲一条著名的山路。这个名字——背离了我们习惯将Everest和Rocky等山的名字用于数据基础设施——反映了我们的信念,即这是一条可以帮助我们爬得更高的道路。

为什么需要消息跟踪系统?

从历史上看,对于Netflix的大多数数据,我们会优化消息发布中的可用性。我们通常为每个Kafka分区配置两个副本,并启用dirty leader election[1]来最大化Kafka的可用性。代价是代理上潜在的数据丢失,原因是日志截断,补偿重置导致消费者消息重复。

unclean的leader选举不是数据不一致的唯一原因。Kafka峰会[2]上的这篇演讲指出,Kafka复制协议中的一些边缘情况也可能导致数据丢失。

我们的大多数数据分析工作可以容忍少量的数据丢失和重复。然而,对我们来说,最重要的是提供精确的度量来证明这种设计权衡的合理性。Kafka对信息发布的认可,使得我们可以很容易地提供消息发布的成功率。但请记住,Kafka承认后,信息可能会丢失。在Inca创建之前,对于端到端度量,我们只能依赖聚合的发送/接收计数。考虑到计数中可能存在重复,这可能会产生误导。

更重要的是,我们有来自studio production的新用例,其中变更数据捕获(change data capture, CDC)扮演着重要的角色。这要求更高级别的消息交付保证和对丢失消息的审计功能。

本质上,我们试图用Inca去解决以下问题:

· 如果我们的基础设施中的任何一个系统声称有确定的数据交付保证(例如,至少一次),它真的遵守它的承诺吗?

· 假设某些配置可以接受数据丢失,那么我们是否可以识别丢失的数据,以便有机会恢复它们?

· 我们能否提供一个良好的度量方法来衡量消息丢失率、重复率和延迟,以作为服务质量的指标?

在进程间通信中,Inca与传统跟踪的一个重要区别是,我们的重点是检测消息丢失和重复。它不打算生成数据流图谱(data flow graph)和stream lineage。相反,我们的托管流平台已经可以提供流谱系信息,我们可以利用这些信息作为Inca中的输入。

设计

简而言之,我们希望为系统中传输的消息生成"跟踪",并分析这些跟踪,以检测丢失并获得所需的度量。

在我们的系统中,我们用一个惟一的ID (UUID)标识每个消息,因此我们期望每个消息ID具有多个跟踪。

让我们从一个简单但常见的情况开始,如下所示。

假设流处理作业进行了简单的转换或充实,而不会改变消息的标识,则会为所跟踪的每条消息生成以下类型的跟踪:

· 表示事件生成器在将消息生成到Kafka集群A时成功发送事件的跟踪

· 当流处理作业使用Kafka集群A中的消息时,表示成功接收到的事件的跟踪

· 在流处理作业中生成发送到Kafka集群B的消息时表示成功发送事件的跟踪

注意,我们不会为不成功的消息发布生成任何跟踪。这种度量标准已经可用,应用程序可以简单地依赖Kafka的确认来找到它,申请重试并根据自己的判断采取行动。这意味着Inca只跟踪在发布时收到成功确认的消息。

跟踪消息将包括以下属性:

· 正在跟踪的消息的ID

· 生成跟踪的位置

· 对应的Kafka集群名称

· 跟踪类型:发送或接收

· 时间戳

· 正在跟踪的消息的Kafka TopicPartition和偏移量

· 任何自定义属性都可以帮助恢复被标识为丢失的消息,例如RDBMS中的行ID

我们决定使用Kafka作为消息代理进行跟踪。原因是Kafka已经被证明能够在客户端以较小的占用空间处理大量的流量。一旦数据在Kafka中,它就为数据流处理甚至批处理打开了所有的可能性,如果数据被传输到数据仓库。

下图显示了如何使用消息流进行跟踪:

生成跟踪消息

跟踪在源端上启用,它是我们的事件生成器。我们的基础设施允许流的所有者指定被跟踪的消息的百分比,最高为100%。如果需要审计流中所有丢失的消息,则需要打开100%的消息跟踪。这可能非常昂贵,但在实践中,此类流通常只有很小的通信量。另一方面,每秒数百万条消息的流通常关心的是交付指标,而不是单个丢失消息的标识。

指示消息必须沿着数据流路径跟踪的信号被设置为Kafka记录头,其中头的值是消息ID。每个处理组件都检查这个记录头,并保证它在所有处理器中传播。

以下是如何以及何时发送跟踪消息:

· Kafka生产者:我们通过连接到生产者发送回调来发送跟踪消息。当代理接收到成功的确认,并且根据采样率随机选择要跟踪的消息时,我们发送跟踪。为了在用Flink编写的流处理应用程序中实现这一点,我们修改了Flink source,使其标准的FlinkKafkaProducer可以用一个可选的用户回调工厂构造[3],该工厂负责创建一个生产者发送回调来生成和发送跟踪。

· Kafka使用者:我们实现了使用者拦截器来发送跟踪消息。我们选择依赖于消费者拦截器,因为它是消费者参与跟踪的最低障碍,这与添加拦截器类的消费者配置一样简单。使用者拦截器可以访问整个记录以及头,这使得跟踪的实现成为可能。

有几个设计方案值得讨论。其中之一是我们是否应该使用ZipKin[4]来生成跟踪。我们选择不使用ZipKin的原因如下:

· 正如在开始时提到的,我们跟踪的目标很简单——检测数据丢失/重复并生成相应的度量,但不是为了创建数据流图。因此,像ZipKin这样一个成熟的跟踪框架似乎有些矫枉过正。

· 现有的ZipKin Kafka客户端(https://github.com/openzipkin/brave)只与Kafka记录交互。这使得利用我们为数据基础设施设计的记录抽象变得困难。

跟踪数据处理

Inca使用Flink流处理来分析跟踪数据。使用流处理使我们能够几乎实时地发布结果,这样我们就可以立即采取行动来改进服务,而不是等待数小时或数天。

Inca的流处理作业按照跟踪id(与消息id相同)对跟踪数据进行分组,以便对同一消息的跟踪进行分析。该作业调用数据基础设施的平台的控制面板来获取每个流的路由信息。如果路径上缺少预期的跟踪,则可以检测到丢失。同样,如果在路由的特定位置发现多个相同类型的跟踪,则可以检测到重复。延迟度量可以通过比较跟踪消息在每个发送/接收点的时间戳来计算,除了我们的分布式系统中的小时钟漂移(ntp server 也会有时间差)。

例如,假设(Event Producer→Kafka Cluster A→Stream Process→Kafka Cluster B)的消息路由如前所述,我们可以声明如下

· 如果缺少对远程位置Kafka集群A接收到的类型的跟踪,则会丢失消息

· 如果发现多个远程位置Kafka集群A接收到的类型跟踪,则复制消息

这里最大的挑战是窗口[5]的选择。直观地说,会话窗口似乎非常适合这个用例。然而,很难确定会话窗口的间隔。如果差距太大,则意味着我们必须长时间保存状态,这将导致在跟踪大量潜在消息的情况下,我们的作业的磁盘空间非常大。另一方面,如果差距太小,作业将产生大量的数据丢失信号,这是假阳性。

我们的流数据基础设施的实际情况是,99%的数据将在数据生成后一分钟内被处理并交付到最终目的地。但对于剩下的1%,可能需要一两个小时。当流经历了数据量的显著增加,但是流处理工作还没有扩展,因此明显落后时,就会发生这种情况。考虑到这种性质,采用统一的窗口显然效率不高。

因此,问题变成了:是否有一种方法可以聪明地确定应该继续等待跟踪消息到达,还是干脆尽早放弃?这就像赶一辆不总是按时刻表的火车。当你冲到月台时,火车已经开走了,你会不会想,你是不是在等一辆可能永远不会到站的火车呢?

幸运的是,有一些外部信号可以帮助我们做出明智的决定:Kafka消费者补偿。我们的流处理作业将消费者偏移量提交给Kafka。对于每个消息,生产者发送的第一个跟踪将告诉我们它的Kafka偏移量,我们可以将处理作业的当前偏移量位置与之进行比较。如果更大,则意味着流处理作业应该已经处理了它,跟踪消息应该已经到达或即将到达。否则我们应该耐心等待。

我们最终选择了Flink的带有自定义触发器的全局窗口(GlobalWindow),以获得极大的灵活性。当作业处理每个跟踪消息并触发事件计时器时,整个处理逻辑就变成了一个状态机。在消费者补偿的帮助下,我们可以通过快捷方式快速减少状态,节省大量资源,特别是磁盘空间,并能够在丢失发生后几分钟内创建消息丢失信号。

下面的代码片段显示了构建在我们的流处理平台(SpaaS)和记录抽象之上的Flink作业的框架:

SingleOutputStreamOperator<Record<TraceContext>> sourceStream =
getSourceBuilder().fromKafka("traces").withOutputType).build();
env.setStreamTimeCharacteristic);
(
new BoundedOutOfOrdernessTimestampExtractor<Record<TraceContext>>(30)) {
@Override
public long extractTimestamp(Record<TraceContext> element) {
return element.getAttribute).get().getTimestamp();
}
}
)
.map(r -> new TraceContextRecord(r))
.keyBy((KeySelector<TraceContextRecord, String>) value -> value.getTraceContext().getTraceId())
.window())
.allowedLateness()))
.trigger(createTraceTrigger())
.process(new ProcessWindowFunction<TraceContextRecord, Record<Map<String, Object>>, String, GlobalWindow>() {
@Override
public void process(String s, Context context, Iterable<TraceContextRecord> elements,
Collector<Record<Map<String, Object>>> out) {
out.collect(createMessageLossRecord(s, elements));
}
})
.addSink(getSinkBuilder().toKafka("lost_messages").build());

下图展示了Inca的完整架构:

关于 lost traces

跟踪从数据基础结构中的不同组件发送到Kafka。跟踪的传递对跟踪处理的准确性至关重要。为了确保跟踪交付,我们采用了以下措施

· 用于跟踪的Kafka集群与其他集群隔离,并且供应过多

· 集群的配置具有很高的持久性和一致性

· 三个副本和两个最小的同步副本

· unclean leader选举残疾人士

· 由AWS EBS支持的代理实例

· ack=all,以确保一致性

在测试期间,我们发现重新启动流处理作业常常导致丢失跟踪。原因是,当作业关闭时,跟踪生成器没有得到刷新缓冲区的机会。解决这个问题的方法是利用Flink的检查点机制。当一个Flink任务被关闭时,它必须创建一个检查点,在这个检查点上,它需要为它已经成功处理并发送到接收器的所有消息提交Kafka使用者偏移量。在检查点期间为跟踪生成器添加刷新,确保跟踪消息将具有与Flink处理的消息相同的交付保证。

但是,请记住,Inca只在我们的数据基础设施中发挥辅助作用。当系统受到压力或跟踪集群变得无响应时,我们别无选择,只能停止发送跟踪,将资源分配给更关键的功能。事实上,我们花了大量的精力来确保跟踪永远不会成为瓶颈。因此,丢失跟踪消息是不可避免的,甚至在某些时候是需要的。

但是,失去踪迹是否会让Inca人变得不可信?没关系。假设我们的基础设施将丢失所有消息的0.01%和跟踪消息的0.005%(由于其高持久性配置),下面的表格显示了所有场景的概率:

我们可以做以下几点观察:

· 当跟踪丢失但消息被传递时,我们创建一个消息丢失信号,该信号为false positive。这代表了⅓声明的所有信息丢失的情况,可能会导致一些不必要的操作负担。好消息是,对于所有传输的消息,只有0.005%的几率发生这种情况。由于这是一个非常小的数量,并且考虑到我们的系统大多数都是幂等的,或者能够容忍小的重复,一个选择就是重新发送这些消息,而不分析它们是否为假阳性。

· 如果原始消息和跟踪消息都被发送到网关,那么我们就失去了捕获它的机会,而消息实际上是丢失了,没有机会恢复。但它只发生在概率为0.00005%的情况下。这意味着即使跟踪不是完美的,它也可以将消息损失率降低200倍。伟大的成就!

这仅仅是个开始

我们已经将Inca投入生产,用于一系列选定的数据流,并让它跟踪大约5亿条消息,每天处理20亿条跟踪消息。很快它就被证明是有效的。我们发现,大多数消息丢失是由于Kafka复制中的边缘情况造成的,其中复制因子被设置为2,并且分区leader发生了更改。在一个有趣的例子中,一个broker出现了硬件问题,导致时钟严重偏移,并开始以很高的频率截断日志。多亏了Inca,我们立即收到消息丢失的警报,并终止了异常值代理以避免进一步的损失。

下面是一个特定流的典型度量仪表板,它显示消息丢失/重复和延迟的速度和比率。

Inca还有很多事情要做。例如,我们有流处理应用程序,它从多个流消费,并向多个接收器生产。我们如何跟踪这个复杂的数据流?一些应用程序不仅希望跟踪源和接收操作符中的数据流,还希望跟踪中间处理操作符中的数据流。实现这一目标的最佳途径是什么?我们能否进一步提高准确率,降低资源利用率?

[1]:https://kafka.apache.org/documentation/#design_uncleanleader

[2]:https://kafka-summit.org/sessions/hardening-kafka-replication/

[3]:https://issues.apache.org/jira/browse/FLINK-11320

[4]:https://zipkin.io/

[5]:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

ref:

相关推荐