龙空技术网

数据密集型应用

好肉一只皮卡丘 119

前言:

现时咱们对“拓扑排序算法应用”都比较关怀,姐妹们都需要剖析一些“拓扑排序算法应用”的相关知识。那么小编在网络上汇集了一些关于“拓扑排序算法应用””的相关文章,希望咱们能喜欢,我们一起来了解一下吧!

1、数据系统基石1.1 可靠性reliabiity 可扩展性scalability 可维护性maintainability

很多应用程序都是数据密集型(data-intensive)而不是计算密集型(compute-intensive)。因此CPU很少成为这类应用瓶颈,问题主要来自数据量、数据复杂性以及数据变更速度。

存储数据:database记住开销昂贵的操作结果,加快读取速度 cache按关键字搜索,各种方式过滤(es solar)向其他进程发消息,进行异步处理(stream processing)定期处理大批量数据(batch processing)

数据库、消息队列、缓存表面有一些相似性(存储一段时间的数据),但他们有不同的访问模式,意味着迥异的性能特征和实现手段。

可靠性应用程序表现出用户期望的功能允许用户犯错,允许用户以出乎意料的方式使用软件在预期负载和数据量下,性能满足要求系统能防止未经授权的访问和滥用fault-tolerant容错性,有硬件错误、软件错误、人为错误。硬件错误包括硬盘崩溃、内存出错、断电。硬盘平均无故障时间(MTTF mean time to failure)约为10-50年。在拥有1w个磁盘的存储集群上,平均每天就有1个磁盘出故障。应对:磁盘raid,双路电源、热插拔CPU。软件错误:接受特定错误输入,导致所有应用服务器实例崩溃。比如12年6-30的闰秒,linux内核一个错误,许多应用同时挂了。失控进程占用一些共享资源,包括cpu、内存、磁盘空间、网络带宽。系统依赖的服务变慢;级联故障,一个小组件故掌触发另一个组件中的故障;解决方法:仔细考虑系统中假设和交互;彻底测试;进程隔离;允许进程崩溃并重启;测量、监控分析系统行为。扩展性

性能:

增加负载参数并保持系统资源(CPU 内存 网络带宽)不变,系统性能受到什么影响增加负载参数并保持性能不变,需要增加多少系统资源

系统性能:吞吐量(每秒处理请求量 每秒处理数据量)、延迟、响应时间。

百分点通常用于服务级别目标(SLO service level objectives)和服务级别协议(SLA service level agreements),即定义服务预期性和可用性合同。SLA可能会声明:如果服务响应时间中位数小于200ms,且99.9百分位点小于1s,则认为服务工作正常。

实践中的百分位点在多重调用的后端服务中,高百分位数十分重要。即使并行调用,只有一个调用变慢,整个请求就会变慢,称为尾部延迟放大效应。如果想将响应时间百分点添加到服务监视仪表板,需要有效的计算。简单的实现是在时间窗口内保存所有请求响应时间列表,每分钟对列表进行排序。大数据量情况下可能效率太低。有些算法如前向衰减、t-digest、HdrHistogram、hyperLogLog来计算百分位近似值。

适应某个级别负载的架构不太可能应付10倍于此的负载。当负载发生数量级增长,需要重新考虑架构。通常有纵向扩展(scaling up)【垂直扩展vertical scaling转向更强大的机器】和横向扩展scaling out【水平扩展 horizontal scaling,将负载分布到多台小机器上】。跨多台机器分配负载也称为无共享架构share-nothing。有些系统是弹性的,意味着可以检测到负载增加时自动增加计算资源。跨多台机器部署无状态服务非常简单,将带状态的数据系统从单节点变为分布式配置会引入许多额外复杂度。

可维护性可操作性(Operability)便于运维团队平稳运行简单性(Simplicity)从系统中消除尽可能多的复杂度,使新工程师也能轻松理解系统可演化性(evolability)也称为可扩展性extensibility1.2数据模型与查询语言

数据模型层次:

观察世界:人、货、行为、资金,我们可以用对象、数据结构以及操控那些数据的API进行建模;存储这些数据结构可以用json/xml,关系数据库中的表、图来存储;数据库要选择如何以内存、磁盘或网络上字节表示json/xml/关系/图数据,这类表示形式使数据可能有各种方式来查询、搜索、操纵和处理。更低层次上,硬件工程师可以使用电流、光脉冲、磁场或其他东西来表示字节。

关系模型与文档模型

关系模型:即传统的关系数据库SQL;文档模型:NoSQL。网络模型:是层次模型的推广,每条记录可能有多个父节点。

文档模型中的架构灵活性文档数据库有时称为无模式(schemaless),具有误导性,因为读取数据的代码通常假定某种结构,更精确的是schema-on-read 数据是隐含的,只有在数据被读取时才被解释。传统关系数据库是schema-on-write写时模式。读时模式类似编程语言中动态(运行时)类型检查,而写时模式类似静态(编译时)类型检查。就像静态和动态类型检查的相对优点具有很大争议一样。

查询的数据局部性文档通常以单个连续字符串形式进行存储,如果应用程序经常访问整个文档,那么存储局部性会带来性能优势。局部性仅仅适用于同时需要文档绝大部分内容的情况。即使之访问其中一小部分,数据库通常需要加载整个文档,十分浪费。更新文档需要整个重写。为了局部性而分组集合相关数据不局限于文档模型,spanner在关系数据模型中提供了同样的局部性属性,允许模式声明一个表的行应该交错嵌套在附父表内。orable中的多表索引集群表multi-table index cluster tables。bigtable中cassandra、hbase中的列族概念与管理局部性类似。随着时间推移,关系数据库和文档数据库变的越来越相似,模型相互补充。

数据查询

SQL声明式查询语言(编程语言是命令式的)MapReduce查询,用于在多台机器上批量处理大规模数据。是一个相当底层的编程模型,用于计算机集群分布式执行。图数据模型 neo4j cypher1.3 存储与检索

主要有两种存储引擎:日志结构(log-structured)、面向页面(page-oriented)B树。数据存储的诸多问题:文件格式、删除记录、崩溃恢复、部分写入、并发控制。为什么追加日志:追加和分段合并是顺序写入,比随机写快。某种程度上顺序写基于闪存的固态硬盘SSD也是很好的;如果段文件是附加的或不可变得,并发和崩溃恢复就简单了;合并旧段可以避免数据文件随着时间推移分散的问题。哈希索引缺点

散列表必须可以放进内存范围查询效率不高SSTables和LSM树

每个日志结构存储段都是一系列键值对。假设要求键值对的序列按键排序,就把这种格式称为排序字符串表(Sorted String Table),简称SSTable。压缩保证每个键只在每个合并段文件出现一次。优点:

合并简单高效。类似归并排序为了在文件中找到一个特定的键,不需要保存内存中所有键的索引。

在磁盘上维护有序结构是可能的(B树),但是内存中更容易。比如红黑树、AVL树。那么构建存储引擎的思路:

写入时,添加到内存中的AVL树,这个内存树被称为内存表(memtable)当内存表大于某个阈值(几兆),将其作为SSTable写入磁盘。写入后,继续写入到一个新的内存表;为了提供读取请求,首先尝试在内存表中找到关键字,然后在最近的磁盘段中,在下一个较旧的段中找到该关键字;有时在后台运行合并、压缩过程以组合段文件并丢弃覆盖或删除的值。 如果数据库崩溃,则最近写入的内存表则会丢失,所以在写入日志时,日记追加到磁盘上,以便崩溃后恢复。

用SSTable制作LSM树:算法本质上是LevelDB、RocksDB中使用关键值存储引擎库,被设计嵌入到其他应用程序中。Cassandra、HBase使用了类似存储引擎,都是受到SSTable、memtable启发。lucene是ES、Solr使用的一种全文搜索引擎,使用类似方法它的词典。全文索引比键值索引复杂,基于类似想法:在搜索查询中给出一个单词,找到提及的单词的所有文档。通过键值结构实现,键是单词(term),值包含单词(文章列表)的所有文档的ID列表。

性能优化当查找数据库中不存在的键时,LSM树算法可能很慢:必须检查内存表,可能会从磁盘读取每一个,才能确定键是否存在。为了优化这种访问,存储引擎通常使用额外的bloom过滤器。不同策略确定SSTable如何被压缩、合并顺序时间。分层压缩、平坦压缩,比如levelDB使用平坦压缩,HBase使用大小分层,cassandra同时支持。规模级别的调整中,更新和更小的SSTable先后被合并到更老、更大的SSTable中。水平压缩,关键范围被拆分成更小的SSTable,较旧的数据被移动到单独的水平上,使得压缩能够递增进行,使用更少的磁盘空间。

B树

像SSTable一样,B树保持按键排序的键值对,允许高效的键值查找和范围查询。日志结构索引将数据库分解为可变大小的段,总是按顺序编写段。相比B树将数据库分解为固定大小的块或页,传统上为4kb(有时更大),并且一次只能读取或写入一个页面。这种设计更接近底层硬件,因为磁盘也被安排在固定大小的块中。让B树更可靠B树基本底层写操作是用新数据覆盖磁盘上页面,假定覆盖不改变页面位置。这与日志结构索引LSM形成对比,LSM只附加到文件,删除过时文件。硬盘上发生的操作:磁头移到正确位置,转到正确位置后,用新的数据覆盖适当扇区。固态硬盘上,SSD必须一次擦除和重写相当大的存储芯片块。为了防止数据库崩溃数据丢失,B树也实现了一个额外的磁盘数据结构:预写式日志(WAL write-ahead-log redo log)。仅追加的文件,多个线程修改的时候,需要进行并发控制。

比较B树和LSM树

LSM树写入速度更快,B树读取速度更快。读取LSM需要压缩不同阶段检查几个不同数据结构和SSTable。LSM树优点

B树索引至少两次写入每一段数据:一次写入预先写入日志,一次写入树页面本身。即使页面几个字节发生变化,也需要一次编写整个页面。由于反复压缩、合并SSTable,日志结构也会重写数据。在数据库的生命周期中写入数据库导致对磁盘的多次写入,被称为写放大write amplification。LSM树通常能够比B树支持更高的写入吞吐量,具有较低的写放大,因为顺序写入紧凑的SSTable不是覆盖树中几个页面。LSM树可以被压缩的更好,比B树在磁盘上产生更小的文件。B树存储引擎会由于分割留下一些未使用的磁盘空间:当页面被拆分或某行不能放入现有页面,页面中某些空间仍未被使用。由于LSM树不是面向页的,并且定期重写SSTable去除碎片,所以有较小存储开销,特别是使用平坦压缩。

LSM缺点

压缩过程有时会干扰正在进行的读写操作。尽管存储引擎尝试逐步执行压缩而不影响并发访问,但是磁盘资源有限,很容易发生请求需要等待磁盘完成昂贵压缩操作。对吞吐量和平均响应时间影响通常很小。压缩另一个问题出现在高写入吞吐量:磁盘的有限写入带宽需要在初始写入(记录和刷新内存表到磁盘)和后台运行的压缩线程间共享。数据库越大,压缩所需的磁盘带宽就越多。如果写入吞吐量很高,并且压缩没有仔细配置,压缩跟不上写入速度。这种情况下,磁盘上未合并段数量不断增加,直到磁盘空间用完,读取速度也会减慢。通常情况,即使压缩无法跟上,基于SSTable的存储引擎也不会限制写入速度,需要进行监控。

B树一个优点是:每个键只存在于索引一个位置,而日志结构化存储引擎可能在不同段中有相同键多个副本,B树可以更容易实现事务。

主键索引 二级索引多列索引 将多个字段组合成一个键全文搜索和模糊索引 Lucene为其词典使用了一个类似SSTable的结构,这个结构需要一个小的内存索引,告诉查询在排序文件中哪个偏移量查找关键字。Lucene中内存中索引是键中字符的有限状态自动机,类似于Trie,支持在给定编辑距离内有效搜索单词。缓存 内存数据库。内存数据库重启需要从磁盘、网络从副本加载状态。列存储 不要讲所有来自一行的值存储在一起,将来自每列的值存储在一起。还可以进一步通过压缩数据来进一步降低对磁盘吞吐量的需求,面向列存储很适合压缩,使用位图。cassandra和Hbase有列族的概念,从BigTable继承,面向列存储是有误导性的,因为在每个列族中,将一行中的所有列与行键一起存储,并且不适用列压缩,因此BigTable模型仍然主要面向行的。

内存带宽和向量处理

需要扫描数百万行的数据仓库查询,巨大的瓶颈是从磁盘获取数据到内存的带宽。另一个地方是:如何有效利用主存储器带宽到CPU缓存的带宽,避免CPU指令处理流水线中的分支错误预测和泡沫,以及现代中使用单指令多数据(SIMD)指定CPU。除了减少从磁盘加载的数据量外,面向列存储也可以有效利用CPU周期。比如查询引擎可以将大量压缩的列数据放在L1缓存中,然后在紧密的循环中循环(没有函数调用)。前面描述的按位“与”和“或”运算可以被设计为直接在这样的压缩列数据块上操作。这种技术被称为矢量化处理。2 分布式数据共享内存架构

许多处理器,内存和磁盘可以在同一个OS下相互连接,快速的相互连接允许任意处理器访问内存、磁盘的任意部分。在这种共享内存架构(share-memory),所有组件都可以看作一台单独的机器。(大型机中,尽管任意处理器可以访问内存任意部分,但总有一些区域与一些处理器更接近,称为非均匀内存访问NUMA,为了更有效利用这种架构特性,需要对处理进行细分,以便每个处理器主要访问临近内存,分区partitioning仍是必要的)共享内存问题:成本增长速度快于线性增长。双倍处理器、双倍内存不足以处理双倍载荷。共享内存架构提供有限的容错能力,尽管高端机器可以使用热插拔组件(不更换磁盘、内存、甚至处理器)

共享磁盘shared-disk

使用多台具有独立处理器和内存的机器,但将数据存储在机器之间的共享磁盘整列上,磁盘通过网络连接,这种架构用于某些数据仓库,但竞争和锁定的开销限制了共享磁盘方法的可扩展性。

无共享架构shared-nothing

有时称为水平扩展scale out。每个节点只使用各自处理器、内存、磁盘。节点间的任何协调,都是在软件层面使用传统网络实现。无共享架构不需要使用特殊硬件,可以使用任意机器。

复制replication VS 分区partitioning

数据分布在多个节点上有两种常见方式:复制,在几个不同节点保存相同副本,复制提供冗余。分区:将一个大型数据库拆分成较小子集,从而不同分区指派给不同节点node,也称为分片shard。

为什么数据需要复制使得数据与用户在地理上接近(从而减少延迟)系统一部分出现故障,系统也能继续工作(提高可用性)扩展可以接受请求的机器数量(提高读吞吐量)

复制困难之处在于处理复制数据的变更,主要有三种流行的复制算法:单领导者(single leader) 多领导者(multi leader) 无领导者(leaderless)。复制时需要考虑,同步复制还是异步复制,如何处理失败的副本。

处理异常:

从库失效:追赶恢复主库失效:故障切换。将其中一个从库提升为新的主库,以将他们的写操作发送给新主库,这个过程称为failover。复制日志的实现

基于语句的复制主库记录执行的每个请求(语句statement),并将该语句日志发送给其从库。对于关系数据库来说,就是每个insert update delete语句。但是也有一些问题:

任何调用非确定性函数(nondeterministic)语句,可能会在每个副本上生成不同的值,比如Now(),rand()。如果语句使用了自增列(auto increment),或者依赖现有数据,则必须在每个副本按照完全相同的顺序执行,否则会产生不同的效果。当有多个并发执行事务时,可能成为一个限制。有副作用语句(触发器、存储过程)可能会在每个副本产生不同副作用。解决方法:主库可以用固定返回值替代确定的函数调用,以便从库获得相同的值。

传输预写式日志

对于日志结构存储引擎,日志段在后台压缩,并进行垃圾回收对于覆写单个磁盘块的B树,每次修改都会写入预写式日志(WAL)

任何情况日志都包含所有数据库写入的仅追加字节序列,可以使用完全相同的日志在另一个节点上构建副本。除了将日志写入磁盘,主库还可以通过网络将其发给从库。

Postgre和Oracle使用这种复制方法,缺点是:日志记录非常底层,WAL包含哪些磁盘块中,哪些字节发生更改。这使复制与存储引擎紧密耦合。

逻辑日志复制(基于行)复制和存储引擎使用不同的日志格式,可以使复制日志从存储引擎内部分离出来。这种复制日志称为逻辑日志,以将其与存储引擎(物理)的数据表区分开。

对于插入的行,日志包含所有列的新值对于删除的行,日志包含足够的信息标识已删除的行对于更新的行,日志包含更新的行,以及更新的新值。

基于触发器复制相比其他复制具有更高的开销,更容易出错,也有很多限制,但更灵活。

多主复制应用场景:数据库的副本分布在不同数据中心,常规基于领导者复制,主库必须位于其中一个数据中心,所有写入都要经过该数据中心。要解决不同数据中心同时修改相同数据,需处理冲突。另一种场景:应用程序在断网后仍需要继续工作。协同编辑场景。避免冲突:通过一些手段指定固定的leader。或者通过唯一ID、时间戳。自定义冲突解决逻辑,写时执行或读时执行,让应用自己选择解决。复制拓扑有环形拓扑、星型拓扑、雪花拓扑。

无主复制单主复制、多主复制都是基于这样想法:客户端向一个主库发送写请求,数据库系统负责将写入复制到其他副本,主库决定写入的顺序,从库按相同顺序应用主库写入。最早一些复制系统是无领导的leaderless,现在又开始流行起来。

读修复和反熵读修复:客户端并行读取多个节点,可以检测到任何陈旧响应,判定新值,写会复制品。反熵过程:数据存储的后台不断查找副本间数据差异,将任何缺少的数据从一个副本复制到另一个副本。

分区

数据在不同节点上的副本,对于非常大的数据集成本很高(吞吐量很大),仅仅复制是不够的。还需要将数据进行分区partitions,也成为分片sharding。【与网络分区不是同一个概念】

分区概念在MongoDB、ES、Solor中称为分片shard,HBase中称为region,Bigtable中是tablet,Cassandra是vnode虚节点。

分区主要是为了扩展性,不同分区可以放在不共享集群中不同节点上,因此大数据集可以分布在多个磁盘上。分区通常与复制结合使用,使得每个分区副本存储在多个节点上,意味着每条记录属于一个分去,仍然可以存储在多个不同节点上获得容错能力。

键值数据如何分区,哪些节点存储哪些数据,如果分区不公平,会导致形成高负载的分区,也成为热点hot spot。

根据键的范围分区

为每个分区指定一块连续的键范围,如果知道范围边界,则可以轻松确定哪个分区包含某个值。key range分区缺点是某些特定模式会导致热点。

根据键的散列分区

根据散列函数、哈希、加密算法,在分区之间分配键,分区边界可以是均匀间隔的,也可以是伪随机选择的,有时也可以称之为一致性哈希。

通过使用Key散列进行分区,失去了键范围分区的一个很好的属性:高效执行范围查询的能力。组合索引为一对多关系提供了一个优雅的数据模型,比如主键(userId,updateTime)。

负载倾斜与消除热点

极端情况,所有读写可能是针对同一个键的,所有请求都会被路由到同一分区。大数据系统无法自动补偿这种高度偏斜的负载,应用程序有责任减少偏斜。比如在火爆ID的前后加一些数字,保证其分布在不同分区。

分片与次级索引

前面的分区方案依赖键值数据模型,如果只通过主键访问记录,可以确定键分区。如果涉及次级索引,会更加复杂。通常辅助索引不能唯一标识记录。次级索引是关系数据库基础,在文档数据库也很普遍。次级索引不能整齐映射到分区,有两种用二级索引对数据库进行分区的方法:基于文档的分区document-based 基于关键词term-based

按文档的二级索引

假如有一个汽车列表,每个列表有一个唯一ID(文档ID),利用ID对数据库进行分区,可以通过颜色、厂商过滤,所以需要在颜色、厂商建次级索引(文档数据库中是字段field,关系数据库中是列column)。比如有俩分区

partition0:191 -> {color:"red",make:"Honda"} 214 ->{color:"black",make:"Dodge"} 306 ->{color:"red",make:"Ford"}次级索引: color:black -> [214] color:red -> [191,306] color:yellow -> []Partition1:515 -> {color:"silver"} 768 -> {color:"red"} 893 -> {color:"silver"}color:black -> [] color:red -> [768] color:siler -> [515,893]

这种索引方法每个分区独立,分区维护自己的二级索引。文档分区索引也被称为本地索引local index。查询的时候需要发送到所有分区,这种查询称为分散/聚集(scatter gather),并且可能会使二级索引上读取查询很昂贵,即时并行查询分区,分散/聚集也容易导致尾部延迟放大。cassandra、es、solor中广泛使用。

根据关键词(Term)的二级索引

也可以构建一个覆盖所有分区数据的全局索引,而不是给每个分区创建自己的次级索引。但是不能只把这个索引存储在一个节点上,可能会成为瓶颈,违背分区目的。全局索引也必须进行分区,可以采用与主键不同分区方式。

partition0: color:black -> [214] color:red -> [191,306,768]partition1: color:silver -> [515,893] color:yellow -> []

可以通过关键词本身或散列进行索引分区,根据其本身对范围扫描非常有用(数字,汽车报价),而对关键词的哈希分区提供了负载均衡能力。

分区再平衡

随着时间推移,数据库的变化:

查询吞吐量增加,增加更多CPU数据集大小增加,增加磁盘 RAM机器故障,其他机器来接管

这些更改都需要将数据和请求从一个节点移动到另一个节点,将负载从集群一个节点移向另一个节点称为再平衡reblancing。

再平衡需要满足:

负载在集群节点间公平共享再平衡发生时 数据库继续接受读取写入节点之间只移动必须的数据,以便快速再平衡,减少网络和磁盘IO平衡策略固定数量分区:创建比节点更多的分区,每个节点分配多个分区。如运行在10个节点的集群可能被拆为1000个分区,大约有100个分区分配给每个节点。如果一个节点添加到集群中,则可以从旧节点分配一些区给新节点。分区数量不变,还可以解决硬件不匹配问题:为更强大的节点分配更多区,承受更多负载,如ES couchHbase动态分区 对于使用键范围分区的数据库,具有固定边界的固定数量的分区将非常不便。按键范围进行分区的数据库(如HBase)会动态创建分区,分区增长超过配置大小,会被分成两个分区。数据量小的分区可以合并。按节点比例分区。以上两种策略中,分区的数量与节点数量无关。cassandra使用的是分区数与节点数成正比,每个节点具有固定数量分区。一个新节点加入集群,随机选择固定数量现有分区进行拆分,然后占有这些拆分分区中每个分区一半。可能产生不公平分割。请求路由

客户发出请求如何知道连接哪个节点,概括为服务发现service discovery。

允许客户连接任何节点(如Round-Robin load balancer)。如果该节点恰巧拥有请求的分区,则处理返回;否则将请求转发到适当节点;将客户端请求转发到路由层,决定怎么转发,负责负载均衡客户端知道分区和节点分配。HBase、Solor、Kafka使用zk跟踪分区分配,mongoDB依赖自身配置服务config server,cassandra使用gossip协议。索引和快照隔离

索引如何在多版本数据库工作,一种是使索引简单指向对象所有版本,并且需要索引查询来过滤掉当前事务不可见的任何对象版本。当垃圾手机删除任何事务,相应索引条目也可以被删除。实践中许多实现细节决定了多版本并发控制的性能。在CouchDB、LMDB,使用的是B树,但是使用的是仅追加/写时拷贝,更新时不覆盖树的页面,而是创建一个副本。从父页面直到树根级联更新。仅追加B树,每个写入事务都会创建一棵新的B树。当创建时,从该特定树根生长的树就是数据库一个一致性快照。没必要根据事务ID过滤对象,因为不能修改现有的B树。这种方法也需要一个负责压缩和垃圾收集的后台进程。

分布式系统的麻烦云计算与超级计算机一种是一端是高性能计算(HPC)。具有数千个CPU的超级计算机,通常用于计算密集型科学计算任务,如天气预报、分子动力学。另一个极端是云计算,通常与多租户数据中心,连接IP网络的商品计算机,弹性/按需资源分配以及计量计费相关联;传统企业数据中心位于两个极端之间

不同哲学会有不同故障处理方式。超级计算机通常会将计算状态持久化到磁盘,一个节点故障简单的停止整个集群工作。故障修复后,从上一个检查点重新开始。互联网服务系统有很大不同:

互联网应用程序都是在线的,需要能够随时以低延迟服务用户。超级计算机通常由专用硬件构成,每个节点相当可靠,节点通过共享内存、远程直接内存RDMA进行通信。云服务节点是由商品机器构建而成,较低成本提供相同性能,具有较高故障率。大型数据中心网络通常基于IP和以太网,以闭合拓扑排列,提供更高的二等分带宽。超级计算机通常使用专门的网络拓扑结构,例如多维网格和环面,这为已知通信模式的hPC工作负载提供更好的性能

不可靠网络分布式系统是无共享系统,通过网络连接一堆机器,一台机器不能访问另一台机器内存或磁盘。无共享并不是构建系统的唯一方式,但是已成为主要方式。相对便宜、不需要特殊的硬件、利用商品化的云计算服务、通过跨多个地理分布的数据中心进行冗余可以实现高可用。

互联网和数据中心中大多数内部网络都是异步分组网络,不保证消息可靠送达。

同步网络:即使数据经过多个路由器,也不会受到排队影响,因为呼叫的16位空间已经在网络的下一挑中保留了下来。而且由于没有排队,网络的最大端到端延迟是固定的。电话网络中的电路与TCP连接有很大不同:电路是固定数量预留带宽,电路建立时没有其他人可以使用,而TCP连接的数据包机会性的使用任何网络带宽。可以给TCP一个可变大小的数据块(网页 电子邮件),尽可能在最短的时间内传输,TCP连接空间时,不使用任何带宽(除了keepalive包)。

为什么数据中心和互联网使用分组交换?针对突发流量,进行优化。一个电路适用于音频或视频通话,通话期间传送相当数量的比特;请求网页没有特定的带宽要求,只需要尽快完成即可。如果想通过电路传输文件,不得不猜测一个带宽分配。太高电路无法建立,太低,会有电路浪费网络容量。相比TCP动态调整数据传输速率以适应可用的网络容量

延迟和资源利用:一般来讲可以将延迟变化为动态资源分区的结果。如果资源是静态分区的,则在某些环境中可以实现延迟保证,但是以降低利用率为代价的。网络中的可变延迟不是一种自然规律,而只是成本/收益权衡的结果。

不可靠的时钟分布式系统中,时间很棘手,因为通信不是即时的,很难确定在涉及多台机器时发生事情的顺序。而且网络上每台机器都有自己的时钟,这是一个硬件设备,通常是石英晶体振荡器。而且是不完全准确的,每台机器都有自己的时间概念。可以在一定程度上同步时钟,最常用的机制是网络时间协议NTP,允许根据一组服务器报告的时间来调整计算机时钟,更精确的可以使用时间源,如GPS机获取时间

单调钟与时钟

时钟 根据某个日历(也称为挂钟时间wall-clock time)返回当前日期和时间,如linux上clock_gettime。java中system.currentTImeMills返回自epoch。虽然时钟被称为实时时钟,但它与OS无关。时钟通常与NTP同步,意味着一台机器时间戳与另一台机器时间戳相同。当本地时钟比NTP服务器跑的快,可能会被强制重置,好像跳回了先前的时间点。这些跳跃以及经常忽略闰秒的事实,使时钟不能用于测量经过的时间。单调钟适用于测量持续时间(时间间隔),如超时或服务响应时间,java中system.nanoTime。具有多个CPU插槽的服务器,每个cpu可能有一个单独计时器,不一定与其他cpu同步。os会补偿所有差异,明智做法不要太信任单调性保证。NTP协议检测到计算机本地石英钟比NTP服务器更快、更慢,可以调整单调钟向前走的频率。NTP允许时钟速率增加或减慢最高至0.05%。NTP不能使单调使用向前向后跳转。

时钟同步与准确性单调钟不需要同步,始终需要根据NTP服务器或其他外部时间源来设置才有用。硬件时钟和NTP可能变幻莫测,计算机中石英钟不够精确,会漂移drifts。漂移取决与机器温度。google假设其服务器每30s与服务器重新同步一次的时钟漂移为6ms,或者每天重新同步的时钟漂移为17s。

如果计算机时钟与NTP服务器时钟差别太大,可能拒绝同步或者本地时钟将被强制重置。NTP服务器意外阻塞,可能在一段时间内忽略错误配置NTP同步只能和网络延迟一样。通过互联网同步,35ms的最小误差是可以实现的。较大的网络延迟会导致NTP客户端完全放弃。闰秒导致59分钟或许有61s。闰秒已经使许多大型系统崩溃说明关于时钟的假设是多么容易溜进系统,处理闰秒的最佳方法是通过在一天中逐渐执行闰秒调整。虚拟机中,硬件时钟会被虚拟化。当一个cpu核心在虚拟机间共享,每个虚拟机都会暂停几十毫秒,而另一个虚拟机正在运行。从应用角度看,这种停顿表现为时钟突然向前跳跃。如果在未完全控制的设备上运行软件(移动、嵌入式设备),坑完全不信任该设备硬件时钟。一些用户估计将其硬件时钟设置为不正确日期和时间,例如为了规避游戏中时间限制,时钟可能会被设置到很远的过去或将来。

欧洲金融机构要求所有高频交易基金在UTC在时间100微秒内同步时钟,以便调试闪崩等市场异常现象,帮助检测市场操纵。很多东西都会导致时钟同步错误,如果NTP守护进程配置错误,或者防火墙阻止了NTP通信,由漂移引起的时钟误差可能很快就变大。

有序事件的时间戳

有的分布式系统会依赖时钟,多个节点存在时钟误差的情况下,会发生冲突。有一种解决冲突的策略:最后写入为准LWW。在多领导者、无领导者数据库如cassandra中被广泛使用。有些是在客户端生成时间戳,但是不能改变LWW基本问题:

数据库写入神秘消失:具有滞后时钟的节点无法用快速时钟覆盖之前由节点写入的值,直到节点之间的时钟偏差过去LWW无法区分高频顺序写入(客户端B的增量操作一定发生在客户端A写入滞后)和真正并发写如。需要额外因果关系跟踪机制(版本向量),以防止因果关系冲突。

所谓逻辑时钟是基于递增计数器而不是震荡石英晶体,对于排序事件是更安全的选择。逻辑时钟不测量一天中的时间或经过的秒数,而仅测量事件相对顺序。相反用来测量实际经过时间的时钟和单调钟被称为物理时钟。

spanner的trueTime API会报告本地时钟的置信区间,当询问当前时间,会得到【最早,最晚】

全局快照的同步时钟

当数据库分布在许多机器上,由于需要协调全局单调递增的事务ID可能很难生成,事务ID必须反映因果关系。在有大量小规模、高频率的事务情景下,分布式系统中创建事务ID成为一个瓶颈。Twitter的snowflake,通过将ID空间的块分配给不同节点近似单调的增加唯一ID。但是无法保证与因果关系一致的排序。

可以使用同步时钟的时间戳作为事务ID,如果能够获得足够好的同步性,这种方法具有很合适的属性,更晚的事务具有更大的时间戳。**spanner以这种方式实现快照隔离,使用TrueTime APi报告时钟置信区间,如果有两个置信区间,每个区间包含最早、最晚时间戳。nodeA=[a,b],nodeb=[c,e]。如果两个区间不重合,肯定有先后顺序。如果重叠,在提交事务时故意等待置信区间长度的时间。**为了保持尽可能短的等待时间,spanner需要保持尽可能小的时钟不确定性。为此google在每个数据中心都部署的一个GPS接收器或原子钟,允许时钟在大约7ms内同步。

暂停进程假设一个数据库,每个分区只有一个领导者,只有领导允许接受写入,一个节点如何知道它仍是领导者,并且可以安全的接受写入。

一种选择是领导者从其他节点获得一个租约lease,类似一个带超时的锁。任一时刻只有一个节点可以持有该租约。租约类就是leader,到期后节点必须周期性续约;这样的设计会依赖同步时钟:租约到期时间由另一台机器设置(如当前时间加上30s,计算到期时间)。并将其与本地系统时钟进行比较,如果时钟超过几秒不同步,就开始做奇怪的事情。 如果颁发者的时钟比接收者的时钟慢,则当接收者认为 lease 已经过期的时候,颁发者依旧认为 lease 有效。接收者可以用在 lease 到期前申请新的 lease 的方式解决这个问题。 如果颁发者的时钟比接收者的时钟快,则当颁发者认为 lease 已经过期的时候,接收者依旧认为 lease 有效,颁发者可能将 lease颁发给其他节点,造成承诺失效,影响系统的正确性。对于这种时钟不同步,实践中的通常做法是将颁发者的有效期设置得比接收者的略大,只需大过时钟误差就可以避免对 lease 的有效性的影响即使协议改为使用本地单调时钟,存在的问题:代码假定在执行剩余时间检查currentTimeMills和实际执行请求process中时间间隔非常短,10s足够确保租约在请求处理到一半不会过期,但是程序执行中出现意外停顿,假如lease.isValid周围停止15s,然后终止。这种情况下,在请求被处理的时候,租约可能已经过期,另一个节点已经接管了leader。直到循环到下一个迭代,可能已经做了一些不安全处理请求。引起线程暂停的原因:gc、虚拟化环境中可以挂起虚拟机并恢复,这个暂停可以在进程执行的任何时候发生,可以持续任意长时间,这个功能可用于虚拟机从一个主机写到另一个主机的实时迁移,不需重启。暂停长度取决于进程写入内存的速率;笔记本关盖子;OS上下文切换;程序同步访问磁盘,等待IO完成;

while(true){    request=getRequest()    if(lease.expireTIme-currentTImeMills<10000){    lease.renew()}    if(lease.isValid()){        process(request)    }}
响应时间保证

某些软件运行环境要求很高,如飞机主控计算机,火箭、机器人。在这些系统中,软件必须有一个特定截止时间,如果截止时间不满足,可能导致整个系统故障,这就是所谓的硬实时。实时操作系统允许在指定时间间隔内保证cpu时间分配,库函数必须记录最坏情况下的执行时间;动态内存分配可能受到限制或完全不允许(实时垃圾收集器存在,但是应用程序仍然必须确保它不会给GC太多负担),必须进行大量测试和测量;这些都需要大量额外的工作,严重限制了可以使用的编程语言、库和工具范围。

限制垃圾收集的影响语言运行在计算垃圾回收时有一定灵活性,可以跟踪对象分配的速度和随着时间的推移剩余的空闲内存。新型的想法是将GC暂停视为一个节点的短暂计划中断,并让其他节点处理来自客户端的请求,同时一个节点正在收集垃圾。一些对延迟敏感的金融交易系统使用这种方法。

知识、真相与谎言

分布式系统与单机不同:没有共享内存,只能通过不可靠网络传递消息,系统可能遭受部分失效,不可靠的时钟和处理暂停。

网络中一个节点无法确切的知道任何事情,只能根据网络接收到的消息进行猜测。节点只能通过交换消息来找出另一个节点所处的状态(存储了哪些数据、是否正确运行)。

这些系统的讨论与哲学有关:系统中什么是真什么是假?如果感知和测量的机制都不可靠,那么关于这些知识又能有多少确定。软件系统应该遵循对物理世界所期望的法则、因果关系么

分布式系统中,可以陈诉关于行为的假设,并以满足这些假设的方式设计实际系统。算法可以被证明在某个系统模型中正确运行。意味底层系统模型提供很少保证也可以实现可靠行为。

分布式算法要具有安全性和活性。唯一性、单调序列就是安全属性,可用性就是活性。安全性和活性对于推理分布式算法的正确性非常有用。

兰伯特时间戳

这种方法可以产生因果关系一致的时间戳。基本思想:每个节点和客户端都跟踪迄今为止见到的最大计数器的值,并在每个请求中包含这个最大计数器的值。一个节点收到最大计数器值大于自身计数器值得请求或响应时,它立即将自己的计数器设置为这个最大值。

光有时间戳排序还不够,比如两个人不能建相同的账户,但是确保全局唯一,还得知道全序何时落地。正确的全序广播算法必须始终保证可靠性、有序性,即使节点或网络出现故障,网络中断,消息无法传递,但是算法可以重试,或者网络修复时,消息能及时送达。

etcd和zk这样的共识服务实现了全序广播,这正是数据库复制所需的,每个消息都代表一次数据库写入,每个副本按相同顺序处理相同写入,副本间保持一致。这个原理是状态机复制。

3、 衍生数据

从高层次看,存储和处理数据的系统可以分为两大类:

记录系统(System of record)。也被称为source of truth,持有数据的权威版本。当新的数据进入首先会记录在这里。衍生数据系统derived data system。通常是另一个系统中现有数据以某种方式进行转换或处理的结果。如果丢失衍生数据,可以从原始源重新创建。

从技术上讲,衍生数据是冗余的,因为重复了已有信息。但是衍生数据对于获得良好的只读性能通常很重要。大多数数据库、存储引擎和查询语言,本质上既不是记录系统也不是衍生系统。数据库只是一个工具,如何使用取决于个人;记录系统和衍生系统之间区别不在于工具,而在于应用程序的使用方式。

批处理

有三种不同系统:

服务(在线系统)。服务等待客户请求或指令到达,每收到一个,服务会试图尽快处理,并发回一个响应。响应时间通常是服务性能的主要衡量指标,可用性非常重要。批处理系统(离线系统)。一个批处理系统有大量输入数据,跑一个作业job来处理,并生成一些输出数据,往往需要一段时间。批量作业通常会定期运行。批处理作业的主要性能衡量标准通常是吞吐量。流处理系统(准实时系统)。像批处理系统一样,流处理消费输入并产生输出,但是流式作业在事件发生后不久就会对事件进行操作,而批处理作业则需要等待固定一组输入数据。

批处理是一种很古老的计算方式,早在可编程数字计算机出现前,打孔卡制表机实现了半机械化的批处理形式。MapReduce和其他一些批处理算法和框架,并探索在现代数据系统中的作用。Unix的思想和经验教训可以迁移到大规模、异构的分布式数据库系统中。

Unix中cat、awk、head等工具能在积分中内处理几GB日志文件。另一方面如果作业工作集大于可用内存,则排序方法的优点是可以高效使用磁盘。与在SSTable和LSM树种讨论的原理一样;数据块可以在内存中排序作为段文件写入磁盘,灾后合并为一个更大的排序文件。归并排序具有在磁盘上运行酿好的顺序访问模式。linux中sort通过溢出至磁盘的方式来自动应对大于内存的数据集,并同时使用多个CPU核进行并行排序。意味着unix命令很容易扩展到大数据集,且不会耗尽内存。瓶颈可能是从磁盘读取输入文件的速度。

Unix哲学

Unix管道的发明者描述:当需要将消息从一个程序传递到另一个程序时,需要一种类似水管的拼接程序的方式。I/O也应该按照这种方式进行。Unix哲学:

每个程序做好一件事。要做一件新的工作,写一个新程序,而不是通过添加功能让老程序复杂化;期待每个程序的输出称为另一个程序的输入。不要将无关信息混入输出,避免使用严格的列数据或二进制输入格式,不要坚持交互式输入;设计和构建软件、OS,要尽早尝试,最好几周内完成。扔掉笨拙的部分。优先使用工具来减轻编程任务。即使必须曲线救国编写工具。

这种自动化、快速原型设计、增量式迭代、对试验友好,将大型项目分解成可管理的块,很像现在的敏捷开发、DevOps。

统一的接口

如果想要一个程序的输出称为另一个程序的输入,意味这些程序必须使用相同的数据格式--一个兼容的接口。如果想要将任何程序的输出连接到任何程序的输入,意味所有程序都必须使用相同IO接口。Unix中这种接口是一个文件file(一个文件描述符)。一个文件只是一串有序的字节序列。统一接口的另一个例子是URL和HTTP。

MapReduce和分布式文件系统

和大多数Unix工具一样,运行MapReduce作业通常不会修改输入,除了生成输出外没有任何副作用。虽然unix使用stdin和stout作为输入、输出,但是MR在HDFS上读写文件,一个google文件系统GFS的开源实现。与网络连接存储NAS和存储区域网络SAN架构等的共享磁盘方案相比,HDFS基于无共享原则。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(光纤通信到),而无共享不需要特殊硬件

处理倾斜如果存在与单个键关联的大量数据,在单个reducer中收集与某个名流相关的所有活动可能导致严重倾斜,也称为热点hot spot。由于MR作业只有在所有Mapper和Reducer都完成时才完成,所以后续作业必须等待最慢的Reducer才能进行。

Pig中首先运行一个抽样作业来确定哪些是热键。Mapper会将热键关联记录随机到几个reducer之一,与热键相关的记录需要被复制到所有处理该键的reducer上。这样可以更好的并行化,代价是需要连接另一侧的输入记录复制到多个reducer上。Hive采用另一个方法,在表格元数据中显示指定热键,并将与这些键相关的记录单独存放,与其他文件分开,对于热键,使用Map端连接。Map端连接

Mapper扮演着预处理输入数据的角色:从每个输入记录中提取键值,将键值对分配给reducer分区,按键值排序。Reduce端的优点是不需要对输入数据做任何假设,无论其属性如何,mapper都可以对其预处理以备连接。不利的是,排序、复制至reducer以及合并reducer输入,开销巨大。

如果能对输入数据做出某些假设,通过使用所谓的map端连接加快连接速度是可行的,这种方法使用了一个阉掉reduce与排序Mapreduce作业,每个mapper只是简单从分布式文件系统读取一个输入文件快,然后将输出文件写入文件系统。

广播散列连接

适用于Map端连接的最简单的场景是最大数据集与小数据集连接的情况。小数据集需要足够小,可以将其全部加载到每个Mapper的内存中。参与连接的较大输入的每个文件各有一个Mapper。这种简单有效的连接叫做广播散列连接:每个连接较大输入端分区的Mapper都会将较小输入端数据集整个读入内存。Pig中是复制链接,hive中叫MapJoin。也被Impala的数据仓库查询引擎使用。另一种方法是将较小输入存储在本地磁盘上的只读索引中,索引中经常使用的部分保留在OS的页面缓存中,几乎可以提供与内存散列表几乎一样的随机查找性能

广播散列连接

如果map端连接的输入以相同的方式进行区分,则散列连接方法可以独立应用于每个分区。比如可以用根据用户ID最后一位十进制数字来对活动事件和用户数据库进行分区,如Mapper3首先将所有具有3结尾的ID的用户加载到散列表中,然后扫描ID为3的每个用户活动事件。如果分区无误,可能需要连接的记录都落在同一个编号的分区中。因此每个Mapper只需要从输入两端各读取一个分区就足够,好处是每个Mapper都可以在内存散列表中少放点数据。这种方法只有连接两端输入有相同的分区树,且两侧记录使用相同的键和相同哈希函数才使用。

Map端合并连接

如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,还可以使用另一种map端连接的变体。这种情况下,输入是否小到能放入内存并不重要,因为这时候Mapper同样可以执行归并操作:按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。如果能进行Map端合并连接,通常意味着前一个MR作业可能一开始已经把输入数据做了分区并进行排序。

MR工作流与Map端连接

当下游作业使用MR连接的输出时,选择map端连接或reduce端连接会影响输出的结构,reduce端连接的输出是按照连接键进行分区和排序的,而map端连接的输出则按照与较大输入相同的方式进行分区和排序。(无论是使用分区连接还是广播连接,连接较大输入端的每个文件块都会启动一个map任务)

Map端连接也对输入集的大小、有序性和分区做出了很多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局很重要,仅仅知道编码格式和数据存储目录是不够的,还必须知道数据是按哪些键做的分区和排序、以及分区的数量。

批处理工作流的输出

批处理不属于事务处理,也不是分析,批处理通常会扫过输入数据集绝大部分,输出的通常不是报表而是一些其他类型的结构。

建立搜索引擎。google最初使用MR为其搜索引擎建索引,至今MR仍是为Lucene/Solr构建索引的好方法。如果索引文档集合发生改变,一种选择是定期重跑整个索引工作流,并在完成后用新的索引文件批量替换以前的索引文件。优点:文档进、索引出。另一种选择,增量建立索引,如果在索引中添加、删除、更新文档,lucene会写新的段文件,并在后台异步合并压缩段文件。键值存储作为批处理输出。另一个用途是构建机器学习系统,比如分类器(垃圾邮件过滤、异常检测、图像识别)与推荐系统。批处理输出的哲学

Unix哲学中鼓励以显示指明数据流的方式进行试验,程序读取输入并写入输出。这一过程中,输入保持不变,任何先前的输出都被新输出完全替换,没有其他副作用。

MR作业输出处理遵循同样的原理,通过将输入视为不可变且避免副作用,批处理作业不仅实现良好的性能,而且更容易维护:

代码中引入了错误,输出错误,简单回滚到先前版本重新运行该作业即可。由于回滚容易,这种最小化不可逆性的原则有利于敏捷软件开发。如果Map或Reduce任务失败,MR框架会自动重新调度,并在同样的输入上再次运行。如果是代码错误,会在几次重试后失败。同一组文件可用作各种不同作业的输入,包括计算指标的监控作业可以评估作业的输出。与Unix类似,MR作业将逻辑与布线(配置输入和输出目录)分离,使得关注点分离,可以重用代码。存储多样性

数据库要求根据特定的模型(关系或文档)来构造数据,而分布式文件系统中文件只是字节序列,可以使用任何数据模型和编码来编写,可以使记录的集合、文本、图像、视频、稀疏矩阵、特征向量。

Hadoop开放了将数据不加区分的存储到HDFS的可能性,允许后续进一步处理,相比将数据导入数据库专有存储格式前,MPP数据库通常需要对数据和查询模式进行仔细的前期建模。

Hadoop这种方式与数据仓库类似,将大型组织各个部分的数据集中在一起是很有价值的,可以跨越以前分离的数据集进行连接,mapp数据库要求的谨慎模式设计拖慢了集中式数据收集速度。以原始形式收集数据,再操心模式的设计,能使数据收集速度加快,有时也被称为数据湖data lake或企业数据中心。

处理模型多样性

MPP数据库是单体的、紧密集成的软件,负责磁盘上的存储布局、查询计划、调度和执行。

另一方面,并非所有类型的处理都可以合理的表达为SQL查询,比如要构建机器学习、推荐系统或者使用相关性排名模型的全文搜索索引、或者图像分析,很可能需要更一般的数据处理模型。

针对直接使用MR的困难,MR上有很多高级编程模型被创造出来如Pig、Hive、Cascading、Crunch。但是MR执行模型本身也有一些问题,这些问题并没有通过增加另一个抽象层次而解决,对于某些类型的处理,表现的很差经。

一方面,MR非常稳健,可以使用它在任务会频繁终止的多租户系统上处理几乎任意大量级的数据。另外对于某些类型的处理而言1,其他工具有时会快几个数量级

很多情况下,知道一个作业的输出只能用作另一个作业的输入,这些作业由一个团队维护,这种情况下,HDFS上的文件只是中间状态:一种将数据从一个作业传递到下一个作业的方式,一个构建推荐系统的作业可能由50-100个MR作业组成的复杂工作流,存在很多中间状态。将这个中间状态写入文件的过程称为物化,**意味着对某个操作结果立即求职并写出来,而不是请求时按需计算。**Unix管道讲一个命令的输出与另一个命令输入连接起来,管道并没有完全物化中间状态,而只是使用一个小的内存缓冲区,将输出增量地流向输出。

与Unix管道相比,MR完全物化中间状态的不足之处:

MR只有在前驱作业所有任务完成才能启动,而unix管道连接的进行会同时启动,输出一旦生成就会被消费。不同机器上的数据倾斜会拖慢整个工作流的执行Mapper通常是多余的,仅仅是读取刚刚由reducer写入的同样的文件,为下一个阶段的分区和排序做准备。许多情况下,Mapper可能是前驱reducer一部分,如果reducer和mapper输出有着相同的分区和排序方式,那么reducer就可以直接串在一起,而不用与mapper相互交织。中间状态存储在HDFS中,意味着要复制到多个节点,完全没必要数据流引擎

为了解决MR这些问题,几种用于分布式批处理的新执行引擎被开发出来,spark、Tez、Flink,设计方式有很多区别,但是有一个共同点:把整个工作流作为单个作业来处理,而不是把它分解为独立子作业。

由于它们将工作流显示建模为数据从几个处理阶段穿过,所以这些系统被称为数据流引擎,像MR它们在一条线上通过反复调用用户定义的函数来处理记录,通过分区来并行化载荷,通过网络将一个函数的输出复制到另一个函数的输入。

与MR不同,这些功能不需要严格扮演交织的Map与Reduce的角色,而是可以用更灵活的方式进行组合。称这些函数为算子operators,数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入:

一种选项是对记录按键重新分区并排序,就像在MR的混洗阶段一样,这种功能可用于实现排序合并连接和分组,就像在MR中一样另一种是接受多个输入,以相同方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,省去了分区散列连接的工作,因为构建散列表会把顺序随机打乱;对于广播散列连接,可以将一个算子的输出发送到连接算子的所有分区;

这种类型的处理引擎与MR模型相比,有几个优点:

排序等昂贵的工作只需要在实际需要的地方执行,而不是默认在每个map和reduce阶段之间出现没有不必要的map任务,因为Mapper所做的工作通常可以合并到前面的reduce算子中(因为Mapper不会更改数据集的分区)由于工作流中所有连接和数据依赖都是显示声明的,因此调度程序能够纵览全局,知道哪里需要哪些数据,因而能够利用局部性进行优化。例如可以尝试将消费某些数据的任务放在与生成这些数据的任务的相同机器上,从而数据可以通过共享内存缓冲区传输,不必通过网络复制;算子的中间状态足以保存在内存或写入本地磁盘,比写入HDFS需要更少的IO(复制到多台机器)。MR已经对Mapper的输出做了这种优化,但是数据流引擎将这种思想推广至所有的中间状态;算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成再开始;与MR(为每个任务启动一个新的jvm)相比,现有jvm进程可以重用来运行新算子,减少启动开销;

可以使用数据流引擎执行与MR工作流同样的计算,由于所做的优化速度也会快很多;既然算子是Map和reduce的泛化,那么相同的处理代码可以在任一执行引擎上运行。

容错

完全物化中间状态是分布式文件系统的一个优点:具有持久性,使得MR的容错很容易,一个任务失败,可以在另一台机器上重启,并从文件系统重新读取相同的输入。spark、flink、tez避免将中间状态写入HDFS,采取了不同策略容错,如果一台机器发生故障,会从其他可用的数据重新计算。

为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的--使用了哪些分区、应用哪些算子。spark使用弹性分布式数据集RDD的抽象来跟踪数据的谱系,flink对算子的状态存档,允许恢复运行在执行过程中遇到的错误算子。

在重新计算数据时,重要的是要知道计算是否是确定性的:给定相同的输入数据,算子是否始终产生相同的输出,如果一些丢失的数据已经发送给下游算子,这个问题就很重要。如果算子重新启动,重新计算的数据与原有的丢失数据不一致,下游算子很难解决新旧数据之间的矛盾。对于不确定的算子,解决方法:通常是杀死下游算子,再重跑新数据。

为了避免这种级联故障,最好让算子具有确定性,非确定性行为如:迭代哈希表的元素时不能对顺序做出保证、许多概率和统计算法显示依赖于使用随机数,以及用到系统时钟或外部数据源,这些都是不确定行为。为了能可靠的从故障中恢复,需要消除这种不确定因素,比如使用固定种子生成伪随机数。

通过重计算数据来从故障中恢复并不总是正确的答案:如果中间状态数据比源数据小得多或者计算量非常大,那么中间数据物化为文件可能要比重新计算廉价的多。

关于物化的讨论

MR就像是将每个命令的输出写入临时文件,而数据流引擎更像是Unix管道,尤其Flink是基于管道执行的思想而建立的,即将算子的输出增量的传递给其他算子,不待输入完成就开始处理。

排序算子需要消费全部输入后才能生成任何输入,因为输入中最后一条输入记录可能具有最小的键,因此需要作为第一条记录输出。因此任何需要排序的算子都需要至少暂时的累计状态,工作流的许多其他部分都可以以流水线方式执行。

图与迭代处理

批处理上下文的图很有趣,目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用或排序系统中。如最著名的图形分析算法之一的PageRank,试图根据链接到某个网页的其他网页来估计该网页的流行度,作为配方的一部分,用于确定网络搜索引擎呈现结果的排序。

像Spark、Flink、Tez这样的数据流引擎通常将算子作为有向无环图DAG的一部分安排在作业中,与图处理不一样:在数据流引擎中,从一个算子到另一个算子的数据流被构造成一个图,而数据本身通常由关系型元祖构成,在图处理中,数据本身具有图的形式。

许多图算法是通过一次遍历一条边来表示,将一个顶点与近邻的顶点连接起来,以传播一些信息不断重复,直到满足一些条件为止,如直到没有更多的边要跟进或直到一些指标收敛。比如重复跟进标明地点归属关系的边,生成数据库中北美包含的所有地点列表。(这种算法也被称为闭包传递)

可以在分布式文件系统中存储图(包含顶点和边的列表的文件),但是这种重复至完成的想法不能用MR表示,因为只扫过一趟数据,通常以迭代风格实现:

外部调度程序运行批处理来计算算法的一个步骤当批处理过程完成时,调度器检查它是否完成(基于完成条件,如没有更多的边要跟进,或与上次迭代相比的变化低于某个阈值)如果尚未完成,则调度程序返回到步骤1运行另一轮批处理

这种方法有效但是MR实现往往很低效,因为MR没考虑迭代性质:它总是读取整个输入数据集,即使与上次迭代相比改变的仅仅是图中一小部分。

Pregel处理模型

针对图批处理的优化---批量同步并行BSP计算模型已经开始流行起来,其中Apache Giraph、Spark的Graph X和Flink的Gelly API实现了它,也被称为Pregel模型,因为google的pregel论文推广了这种处理图的方法。

MR中,mapper在概念上向reducer的特定调用发送消息,因为框架将所有具有相同键的mapper输出集中在一起,Pregel后面有一个类似想法:一个顶点可以向另一个顶点发送消息,通常这些消息是沿着图的边发送的

每次迭代中,为买个顶点调用一个函数,将所有发送给它的消息传递给它,就像调用reducer一样,与MR不同之处在于,在Pregel模型中,顶点在一次迭代到下一次迭代过程中会记住它的状态,所以这个函数只需要处理新的传入消息。如果图的某个部分没有被发送消息,那里就不需要做任何工作。

与Actor模型有些相似,除了顶点状态和顶点之间的消息具有容错性和耐久性,且通信以固定的方式进行:在每次迭代中,框架递送上次迭代中发送的所有消息,actor通常没有这样的时间保证。

容错

顶点只能通过消息传递有助于提高pregel作业的性能,因为消息可以批处理。唯一的等待是在迭代之间:由于pregel模型保证所有在一轮迭代中发送的消息都在下轮迭代中送达,所以在下一轮迭代开始前,先前的迭代必须完全完成,而所有消息必须在网络上完成复制。即使底层网络可能丢失、重复或任意延迟消息,pregel的实现能保证在后续迭代中,消息在其目标顶点恰好处理一次。即使底层网络可能丢失、重复、任意延迟,也能保证在后续迭代中消息在其目标顶点恰好处理一次。像MR一样,框架能从故障中透明的恢复,以简化在pregel上实现算法的编程模型。

这种容错通过在迭代结束,定期存档所有顶点状态实现,即将其全部状态写入持久化存储。如果某个节点发生故障且其内存中状态丢失,简单的解决方法是将整个图计算回滚到上一个存档点

并行执行

顶点不需要知道它在哪台物理机器上执行,当它向其他顶点发送消息时,只是简单将消息发往某个顶点ID。图的分区取决于框架,即哪个顶点运行在哪台机器上,以及如何通过网络路由消息。理想情况需要大量通信,最好能被分区到同一台机器上,实践中经常按照任意分配顶点ID分区,图算法通常会有跨机通信的额外开销,而中间状态往往比原始图大,通过网络发送消息的开销会显著拖慢分布式图算法速度。

声明式查询语言

与硬写连接代码相比,指定连接关系算子的优点:框架可以分析连接输入的属性,并自动决定哪种上述连接算法最适合当前任务。甚至可以改变连接顺序,最小化中间状态的数量。

MR及其后继者数据流在其他方面与SQL的完全声明式查询模型有很大区别,MR是围绕着回调函数的概念产生的:对于每条记录或一组记录,调用一个用户定义的函数,并且该函数可以自由的调用任意代码来决定输出什么。优点:可以基于大量已有库的生态系统创作:解析、自然语言分析、图像分析以及运行数值算法或统计计算法。

**自由运行任意代码长期以来都是传统MR批处理系统与MPP数据库的区别所在。**虽然数据库具有编写用户定义函数的功能,但是使用麻烦,而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统兼容不佳。(java的mvn js的npm ruby的gems)

数据流引擎支持除连接之外的更多声明特性还有其他优势,如果一个回调函数只包含一个简单过滤条件、或者只是从一条记录选择一些字段,那么在为每条记录调用函数时会有相当大CPU开销。如果以声明式方式表示这些简单的过滤和映射操作,那么查询优化器可以利用面向列的存储布局,只从磁盘读取所需的列。hive、spark dataframes和Impala还使用了向量化执行,在对cpu缓存友好的内部循环中迭代数据,避免函数调用。Spark生成jvm字节码,impala使用LLVM为这些内部循环生成本机代码。 通过在高级API引入声明式部分,批处理框架看起来越来越像MPP数据库了;同时通过拥有运行任意代码和以任意格式读取数据的可扩展性,保持了灵活性的优势。

流处理

“流”是指随着时间推移可用的数据,这个概念出现在很多的地方:Unix的stdin、stdout,编程语言的惰性列表、文件系统API(如java的FileInputStream)、Tcp连接,通过互联网传递音频和视频。将事件流event stream视为一种数据管理机制:无界限 增量处理。

发布/订阅模式中,不同系统采取各种各样方法,并没有针对所有的目的的通用答案:

如果生产者发送消息的速度比消费者处理的速度快,有三种选择:系统丢掉消息,将消息放入缓冲队列或使用背压(backpressure)【也称为流量控制flow control】,即阻塞生产者,以免发送更多消息。例如Unix和TCP使用背压:它们有一个固定大小的小缓冲区,如果填满,发送者会被阻塞,直到接收者从缓冲区取出数据。如果节点崩溃或暂时脱机,是否有消息丢失?与数据库一样,持久性可能需要写入磁盘/或复制的某种组合。如果能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延迟。

直接从生产者传递给消费者(不通过中间节点):

UDP组播广泛用于金融行业,例如股票市场,其中低时延很重要,虽然UDP本身不可靠,但应用层协议可以恢复丢失的数据包无代理的消息库,如ZeroMQ和nanomsg采取类似的方法,通过TCP或IP多播实现发布/订阅消息传递;StatsD和Brubeck使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。如果消费者在网络上公开了服务,生产者可以直接发送HTTP或RPC请求。

尽管这些直接消息传递在设计它们的环境中运行良好,但是它们通常要求应用代码意识到消息丢失的可能性。容错程度极为有限:即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。如果消费者处于脱机状态,可能会丢失其不可达时发送的消息。

消息代理

一种广泛的替代方法是通过消息代理发送消息,消息代理实质上是一种针对处理消息流而优化的数据库。排队的结果是,消费者通常是异步的,会有一定的延迟。

周期性的完整数据转储过于缓慢,有时会使用双写double write,其中应用代码在数据变更时明确写入每个系统,例如先写入数据库,再更新搜索引擎,然后使缓存失效。双写会产生竞争条件。

变更数据捕获

数十年来,许多数据库根本没有记录在档的,获取变更日志的方式。由于这个原因,捕获数据库中的所有变更,然后将其复制到其他存储引擎中(搜索索引、缓存、数据仓库)。

处理流以产生其它衍生流,处理这样的流的代码片段,被称为算法operator或作业job。数据流的模式是相似的:一个流出器以只读的方式使用输入流,并将其输出已追加的方式写入一个不同的位置。

流处理的应用,一直用于监控目的,如果某个事件发生,单位希望得到警报:

欺诈检测交易系统的价格变化制造系统需要监控工厂机器的状态军事和情报系统需要跟踪潜在的侵略者活动流分析

使用流处理的另一个领域是对流进行分析,CEP(complex event and processing)符合事件处理与流分析的边界是模糊的,分析往往对找出特定事件序列并不关心,而更关注大量事件上的聚合和统计指标:

测量某种类型事件的速率滚动计算一段时间窗口内某个值得平均值将当前统计值与先前的时间区间的值对比(如检测趋势 当指标与上周同比异常偏离或偏低时报警)

这些统计值通常是在固定时间区间内进行计算的,如过去5min内服务每秒查询次数的均值,以及此时间段内响应时间的第99百分位点,**有时系统会使用概率算法,如Bloom filter、HyperLogLog用于基数估计以及各种百分比估计算法。**概率算法产出近似值的结果,比起精确算法的优点是内存使用要少得多。

窗口的类型滚动窗口Tumbling window。滚动窗口有着固定长度,比如10:00和10:05都会被分组到一个窗口跳动窗口(Hopping Window)。跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑。窗口之间有交叉。滑动窗口。并不断从窗口移除过期的旧事件。会话窗口。将同一用户出现时间相近的所有时间分组在一起。

流式连接(窗口连接):在流中搜索结果。 流表连接,如用户活动事件和用户档案数据库。表表连接。

数据系统的未来

数据集成的目标,确保数据最终能在所有正确的地方表现出正确的形式。需要消费输入、转换、连接、过滤、聚合、训练模型、评估。批处理和流处理的输出是衍生数据集,例如搜索索引、物化视图,向用户显示的建议,聚合指标等等;

spark在批处理引擎上执行流处理,将流分解为微批次,flink则在流处理引擎上执行批处理。原则上一种类型的处理可以用另一种类型来模拟,但是性能特征会有所不同。

Lambda架构

Lambda架构的核心思想是通过将不可变事件附加到不断增长的数据集来记录传入数据,这类似于时间溯源。为了从这些事件中衍生出读取优化的视图,Lambda架构建议并行运行两个不同系统:批处理系统和独立的流处理系统。

标签: #拓扑排序算法应用