博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
漫谈流式计算的一致性
阅读量:6959 次
发布时间:2019-06-27

本文共 4184 字,大约阅读时间需要 13 分钟。

参考,

 

对于batch分析,fault-tolerant很容易做,失败只需要replay,就可以完美做到容错。

对于streaming分析, 数据流本身是动态,没有所谓的开始或结束,虽然可以replay buffer的部分数据,但fault-tolerant做起来会复杂的多

当前主流的一些streaming分析平台,都有一些各自特有的fault-tolerant的机制,在此分析和总结一下,

无状态流数据处理,

这是种比较简单的流式数据的场景,典型的应用是数据ETL,数据存储,数据流过是没有状态的

保证at least once语义,

分钟级别,Storm的acker机制,就可以很好的保证,
message没有被正确处理,收到ack时,可以选择重发,这样每条message对可以保证被处理到,但可能会被重复处理

小时,天级别,利用kafka的replay,一般达到天级别的cache

保证exactly once语义,

对于无状态数据流,其实只要依赖最终存储的去重性(deduplication), 就可以达到exactly once
比如对于数据库,通过unique key和insert ignore就可以解决这个问题,无论你之前重复处理多少次,最终我只存储一次。

如果最终存储不支持去重,或者场景比较复杂不仅仅是存储,比如做叠加计数 或 update

做叠加计数,当前的机制,你无法知道这个message是否加过
做update的时候,更新的时序性很重要,这个是ack机制无法保证的

Storm 0.7就提供transactional topology特性,

首先给message加上transaction id,这样有两个好处,可以保证时序性,在写入存储的时候,可以按transaction id顺序写入

并且在可以外部存储上记录当前最新的transaction id,保证相同的transaction,不会被重复写入
这个是transactional topology的核心思路,这样确实是可以保证强一致性,exactly once语义
但这个方案只适用于无状态,或是依赖外部存储的,状态必须要存储在外部存储上

至于使用batch,或将topology分为processing和commit阶段,都是对性能的优化,并不会提升一致性的保障

但由于使用micro-batch是必须的,所以也称这类方案是micro-batch方案,除了transactional topology,还有Apache Spark Streaming
micro-batch的坏处,
1. 改变编程模型,伪流式
2. windows based聚合的限制,只能是micro-batch的倍数,比如micro-batch是3分钟,你想做个5分钟聚合,没法做
2. 延迟变大,如果本身秒级别,但如果micro-batch是1分钟,那延迟就至少1分钟

有状态流数据处理,

典型的场景,就是windows-based的聚合或计算,比如计算1分钟内的计数或平均值,这样会有部分数据需要cache在内存中

这样当fail-over时,如何可以恢复cache,并保证exactly once语义

最直接的想法,

局部的snapshot

每个component对cache定期做snapshot,然后在fail-over后,各自恢复自己的cache,

这样做的问题,
1. snapshot很难增量做,如果cache比较大,成本会比较高
2. snapshot只能定期做,会有部分丢失
3. 最关键的,对于分布式系统,各个compoent独立的进行snapshot,很难达到同一个状态,每个component的处理速度都是不一样的,有的处理到n做了snapshot,而有的可能做到n+1才做,
缺乏一个统一的参照系。

 

change-log

每个 component,当接收到一个 message 的时候,产生一条 change log 记录该 message 和更新的状态,存入 transactional log 和数据库
当做 fail-over 的时候,只需要每个 component 将数据库中的 log,拿出来 replay 即可
这种方式使用的平台如 Google Cloud Dataflow,Apache Samza

对于 Apache Samza,会将 change log 放入kafka中,

image

当fail-over后,每个task从相应的kafka topic里面读出change-log,完成local state的replay

这样做的好处,是不用直接去snapshot local cache,如果cache比较大的话,这样是比较划算的

但是如果数据流很big的话,这样做也不合适了,因为change-log会非常大

 

Distributed Snapshots (Apache Flink),全局的 snapshot

针对前面提到的局部 snapshot 最关键的问题,提出全局 snapshot 的方法,

其实最大的问题仍然是分布式系统的根本问题,统一参照系的问题,如何让每个 component 在同一的状态下,进行 snapshot

这个原理来自 Chandy and Lamport, 1985,的paper “Distributed Snapshots: Determining Global States of Distributed Systems”

局部的snapshot会有的问题,

状态丢失,如下图,但状态中传输的时候,对P和Q进行snapshot,会导致队列中的绿蓝橙状态丢失

状态重复,brown状态中P和Q的snapshot里面同时出现

怎么解这样的问题?分布式系统中缺乏统一参照系的情况下,只有通过通信才能确定偏序的问题

所以这里使用marker来做组件间的同步,并防止丢失状态,会同时对组件,以及队列同时做snapshot, 如下图

P做snapshot,然后发送marker到Q

Q收到marker的时候,知道P做了snapshot,那么我也要做snapshot
同时还要对PQ channel做snapshot,此时channel中有个green,但是由于green是在marker后面的,说明它在P的snapshot里面已经做过,不需要再做,所以此时PQ的snapshot为空
Q在做完snapshot后,还需要把marker返回给P,因为在过程中orange从Q被发送到P
当P收到Q返回的marker时,由于P的snapshot已经做过,无法改变
所以把orange放在QP channel的snapshot中

最终做出的全局的snapshot为,

P(red, green, blue)channel PQ ()Q(brown, pink)channel QP (orange)
这样就解决了状态丢或重复的问题

 

Flink’s distributed snapshotting实现基于stream barriers

可见,barrier可以将流拆分成一段段的数据,每个barrier都是一个snapshot点,但是这种拆分不同于micro-batch,并不会影响到正常的流式处理

在DAG,即有向无环图的case下,是不需要对channel做snapshot的,场景会比较简单
只是每个组件收到barrier的时候去做snapshot就好,该算法的几个前提:
1. 网络可靠,消息FIFO;
2. channel可以block,unblock,支持对所有output channel进行广播
3. 可自动识别注入的barrier

完成过程如图,这是个有两条入边的case,相对复杂些

当收到一条channel的barrier时,需要先block该channel,然后等待另一个channel中的barrier
当两条channel的barrier都到达时,说明达到统一状态,进行checkpoint
然后unblock之前block的channel,并对所有的output channel广播该barrier

当DAG上的所有组件都完成snapshot时,那么一个全局的snapshot就完成了,以barrier为唯一标识

比较抽象,下图以kafka为例子解释一下,

对于kafka而言,不同的partition需要不同的线程读,

图中,4个source thread分别从4个partition读取数据
其中由唯一的master来发起checkpoint流程,
过程是,
1. Master给所有的source thread发checkpoint请求
2. source thread接收到cp请求后,会记录当前的offset,比如5791,并做该offset的message前发出streaming barrier
    并将offset返回给master

3. 这样master收到所有source的ack offset,就相当于对source做了snapshot,恢复时只需要将相应的source置到该offset即可

4. 中间每个组件,当收到所有input channel的barrier时,将cp存入数据库,并通知Master
5. 层层下去,直到所有Sink节点,最终节点,完成snapshot

6. master接收到所有节点的做完cp的ack,知道这次checkpoint全部完成

这个方案的最大的问题是,当多个input channel时,需要等所有的barrier到齐,这个明显会增加latency

Flink的优化是,不等,看到barrier就打snapshot,这样的问题就是无法保证exactly once,会重复,
因为后来的barrier打checkpoint时会覆盖先前的cp,
此时barrier先到的channel已经处理了一些barrier之后的数据,这部分结果也会存在cp中

但当fail-over的时候,因为replay是根据你发送barrier的offset来重发的,所以这部分会重复

转载地址:http://ywmil.baihongyu.com/

你可能感兴趣的文章
数据中心的那些未来技术
查看>>
如何善用产品设计的三个层级
查看>>
如何在Amazon AWS上设置一台Linux服务器
查看>>
网站优化遇到死链怎么合理的处理?
查看>>
全球智慧城市进入快速发展阶段
查看>>
AI 黑客会大规模进军网络安全领域吗?为时尚早,因为太贵了
查看>>
科通芯城康敬伟:不照抄别人美国也没这模式
查看>>
每个平安城市的背后,都需要一个默默付出的“她”!
查看>>
外资赴中国设厂加剧竞争,中国晶圆代工厂今年全力冲刺 28 纳米制程
查看>>
《Servlet和JSP学习指南》一1.10 处理HTML表单
查看>>
联发科Helio X30信息曝光 十核大杀器再升级!
查看>>
旬邑多举措推进智慧城市建设工作
查看>>
探寻虹膜识别背后的身份密码 | 硬创公开课
查看>>
科技巨头有钱怎么花?大规模并购挖掘增长动力
查看>>
欧洲工业无线网络技术采用中国标准
查看>>
信息通信研究院徐志发:区块链金融的“一升一降三创新” | 数博会2017
查看>>
我国云计算产业进入加速成长期 四大主线布局
查看>>
运用大数据构建“数字生态”体系
查看>>
自然语言处理第一番之文本分类器
查看>>
RFID全新风貌展现物联网主导实力
查看>>