首页 » Netty实战 » Netty实战全文在线阅读

《Netty实战》第15章 案例研究,第二部分

关灯直达底部

本章主要内容

  • Facebook的案例研究

  • Twitter的案例研究

在本章中,我们将看到Facebook和Twitter(两个最流行的社交网络)是如何使用Netty的。他们都利用了Netty灵活和通用的设计来构建框架和服务,以满足对极端伸缩性以及可扩展性的需求。

这里所呈现的案例研究都是由那些负责设计和实现所述解决方案的工程师所撰写的。

15.1 Netty在Facebook的使用:Nifty和Swift[1]

Andrew Cox,Facebook软件工程师

在Facebook,我们在我们的几个后端服务中使用了Netty(用于处理来自手机应用程序的消息通信、用于HTTP客户端等),但是我们增长最快的用法还是通过我们所开发的用来构建Java的Thrift服务的两个新框架:Nifty和Swift。

15.1.1 什么是Thrift

Thrift是一个用来构建服务和客户端的框架,其通过远程过程调用(RPC)来进行通信。它最初是在Facebook开发的[2],用以满足我们构建能够处理客户端和服务器之间的特定类型的接口不匹配的服务的需要。这种方式十分便捷,因为服务器和它们的客户端通常不能全部同时升级。

Thrift的另一个重要的特点是它可以被用于多种语言。这使得在Facebook的团队可以为工作选择正确的语言,而不必担心他们是否能够找到和其他的服务相互交互的客户端代码。在Facebook,Thrift已经成为我们的后端服务之间相互通信的主要方式之一,同时它还被用于非RPC的序列化任务,因为它提供了一个通用的、紧凑的存储格式,能够被多种语言读取,以便后续处理。

自从Thrift在Facebook被开发以来,它已经作为一个Apache项目(http://thrift.apache.org/)开源了,在那里它将继续成长以满足服务开发人员的需要,不止在Facebook有使用,在其他公司也有使用,如Evernote和last.fm[3],以及主要的开源项目如Apache Cassandra和HBase等。

下面是Thrift的主要组件:

  • Thrift的接口定义语言(IDL)——用来定义你的服务,并且编排你的服务将要发送和接收的任何自定义类型;
  • 协议——用来控制将数据元素编码/解码为一个通用的二进制格式(如Thrift的二进制协议或者JSON);
  • 传输——提供了一个用于读/写不同媒体(如TCP套接字、管道、内存缓冲区)的通用接口;
  • Thrift编译器——解析Thrift的IDL文件以生成用于服务器和客户端的存根代码,以及在IDL中定义的自定义类型的序列化/反序列化代码;
  • 服务器实现——处理接受连接、从这些连接中读取请求、派发调用到实现了这些接口的对象,以及将响应发回给客户端;
  • 客户端实现——将方法调用转换为请求,并将它们发送给服务器。

15.1.2 使用Netty改善Java Thrift的现状

Thrift的Apache分发版本已经被移植到了大约20种不同的语言,而且还有用于其他语言的和Thrift相互兼容的独立框架(Twitter的用于Scala的Finagle便是一个很好的例子)。这些语言中的一些在Facebook多多少少有被使用,但是在Facebook最常用的用来编写Thrift服务的还是C++和Java。

当我加入Facebook时,我们已经在使用C++围绕着libevent,顺利地开发可靠的、高性能的、异步的Thrift实现了。通过libevent,我们得到了OS API之上的跨平台的异步I/O抽象,但是libevent并不会比,比如说,原始的Java NIO,更加容易使用。因此,我们也在其上构建了抽象,如异步的消息通道,同时我们还使用了来自Folly[4]的链式缓冲区尽可能地避免复制。这个框架还具有一个支持带有多路复用的异步调用的客户端实现,以及一个支持异步的请求处理的服务器实现。(该服务器可以启动一个异步任务来处理请求并立即返回,随后在响应就绪时调用一个回调或者稍后设置一个Future。)

同时,我们的Java Thrift框架却很少受到关注,而且我们的负载测试工具显示Java版本的性能明显落后于C++版本。虽然已经有了构建于NIO之上的Java Thrift框架,并且异步的基于NIO的客户端也可用。但是该客户端不支持流水线化以及请求的多路复用,而服务器也不支持异步的请求处理。由于这些缺失的特性,在Facebook,这里的Java Thrift服务开发人员遇到了那些在C++(的Thrift框架)中已经解决了的问题,并且它也成为了挫败感的源泉。

我们本来可以在NIO之上构建一个类似的自定义框架,并在那之上构建我们新的Java Thrift实现,就如同我们为C++版本的实现所做的一样。但是经验告诉我们,这需要巨大的工作量才能完成,不过碰巧,我们所需要的框架已经存在了,只等着我们去使用它:Netty。

我们很快地组装了一个服务器实现,并且将名字“Netty”和“Thrift”混在一起,为新的服务器实现提出了“Nifty”这个名字。相对于在C++版本中达到同样的效果我们所需要做的一切,那么少的代码便可以使得Nifty工作,这立即就让人印象深刻。

接下来,我们使用Nifty构建了一个简单的用于负载测试的Thrift服务器,并且使用我们的负载测试工具,将它和我们现有的服务器进行了对比。结果是显而易见的:Nifty的表现要优于其他的NIO服务器,而且和我们最新的C++版本的Thrift服务器的结果也不差上下。使用Netty就是为了要提高性能!

15.1.3 Nifty服务器的设计

Nifty(https://github.com/facebook/nifty)是一个开源的、使用Apache许可的、构建于Apache Thrift库之上的Thrift客户端/服务器实现。它被专门设计,以便无缝地从任何其他的Java Thrift服务器实现迁移过来:你可以重用相同的Thrift IDL文件、相同的Thrift代码生成器(与Apache Thrift库打包在一起),以及相同的服务接口实现。唯一真正需要改变的只是你的服务器的启动代码(Nifty的设置风格与Apache Thrift中的传统的Thrift服务器实现稍微有所不同)。

1.Nifty的编码器/解码器

默认的Nifty服务器能处理普通消息或者分帧消息(带有4字节的前缀)。它通过使用自定义的Netty帧解码器做到了这一点,其首先查看前几个字节,以确定如何对剩余的部分进行解码。然后,当发现了一个完整的消息时,解码器将会把消息的内容和一个指示了消息类型的字段包装在一起。服务器随后将会根据该字段来以相同的格式对响应进行编码。

Nifty还支持接驳你自己的自定义编解码器。例如,我们的一些服务使用了自定义的编解码器来从客户端在每条消息前面所插入的头部中读取额外的信息(包含可选的元数据、客户端的能力等)。解码器也可以被方便地扩展以处理其他类型的消息传输,如HTTP。

2.在服务器上排序响应

Java Thrift的初始版本使用了OIO套接字,并且服务器为每个活动连接都维护了一个线程。使用这种设置,在下一个响应被读取之前,每个请求都将在同一个线程中被读取、处理和应答。这保证了响应将总会以对应的请求所到达的顺序返回。

较新的异步I/O的服务器实现诞生了,其不需要每个连接一个线程,而且这些服务器可以处理更多的并发连接,但是客户端仍然主要使用同步I/O,因此服务器可以期望它在发送当前响应之前,不会收到下一个请求。这个请求/执行流如图15-1所示。

图15-1 同步的请求/响应流

客户端最初的伪异步用法开始于一些Thrift用户利用的一项事实:对于一个生成的客户端方法foo来说,方法send_foorecv_foo将会被单独暴露出来。这使得Thrift用户可以发送多个请求(无论是在多个客户端上,还是在同一个客户端上),然后调用相应的接收方法来开始等待并收集结果。

在这个新的场景下,服务器可能会在它完成处理第一个请求之前,从单个客户端读取多个请求。在一个理想的世界中,我们可以假设所有流水线化请求的异步Thrift客户端都能够处理以任意顺序到达的这些请求所对应的响应。然而,在现实生活中,虽然新的客户端可以处理这种情况,而那些旧一点的异步Thrift客户端可能会写出多个请求,但是必须要求按顺序接收响应。

这种问题可以通过使用Netty 4的EventExecutor或者Netty 3.x中的OrderedMemory-AwareThreadPoolExcecutor解决,其能够保证顺序地处理同一个连接上的所有传入消息,而不会强制所有这些消息都在同一个执行器线程上运行。

图15-2展示了流水线化的请求是如何被以正确的顺序处理的,这也就意味着对应于第一个请求的响应将会被首先返回,然后是对应于第二个请求的响应,以此类推。

图15-2 对于流水线化的请求的顺序化处理的请求/响应流

尽管Nifty有着特殊的要求:我们的目标是以客户端能够处理的最佳的响应顺序服务于每个客户端。我们希望允许用于来自于单个连接上的多个流水线化的请求的处理器能够被并行处理,但是那样我们又控制不了这些处理器完成的先后顺序。

相反,我们使用了一种涉及缓冲响应的方案;如果客户端要求保持顺序的响应,我们将会缓冲后续的响应,直到所有较早的响应也可用,然后我们将按照所要求的顺序将它们一起发送出去。见图15-3所示。

图15-3 对于流水线化的请求的并行处理的请求/响应流

当然,Nifty包括了实实在在支持无序响应的异步Channel(可以通过Swift使用)。当使用能够让客户端通知服务器此客户端的能力的自定义的传输时,服务器将会免除缓冲响应的负担,并且将以请求完成的任意顺序把它们发送回去。

15.1.4 Nifty异步客户端的设计

Nifty的客户端开发主要集中在异步客户端上。Nifty实际上也提供了一个针对Thrift的同步传输接口的Netty实现,但是它的使用相当受限,因为相对于Thrift所提供的标准的套接字传输,它并没有太多的优势。因此,用户应该尽可能地使用异步客户端。

1.流水线化

Thrift库拥有它自己的基于NIO的异步客户端实现,但是我们想要的一个特性是请求的流水线化。流水线化是一种在同一连接上发送多个请求,而不需要等待其响应的能力。如果服务器有空闲的工作线程,那么它便可以并行地处理这些请求,但是即使所有的工作线程都处于忙绿状态,流水线化仍然可以有其他方面的裨益。服务器将会花费更少的时间来等待读取数据,而客户端则可以在一个单一的TCP数据包里一起发送多个小请求,从而更好地利用网络带宽。

使用Netty,流水线化水到渠成。Netty做了所有管理各种NIO选择键的状态的艰涩的工作,Nifty则可以专注于解码请求以及编码响应。

2.多路复用

随着我们的基础设施的增长,我们开始看到在我们的服务器上建立起来了大量的连接。多路复用(为所有的连接来自于单一的来源的Thrift客户端共享连接)可以帮助减轻这种状况。但是在需要按序响应的客户端连接上进行多路复用会导致一个问题:该连接上的客户端可能会招致额外的延迟,因为它的响应必须要跟在对应于其他共享该连接的请求的响应之后。

基本的解决方案也相当简单:Thrift已经在发送每个消息时都捎带了一个序列标识符,所以为了支持无序响应,我们只需要客户端Channel维护一个从序列ID到响应处理器的一个映射,而不是一个使用队列。

但是问题的关键在于,在标准的同步Thrift客户端中,协议层将负责从消息中提取序列标识符,再由协议层协议调用传输层,而不是其他的方式。

对于同步客户端来说,这种简单的流程(如图15-4所示)能够良好地工作,其协议层可以在传输层上等待,以实际接收响应,但是对于异步客户端来说,其控制流就变得更加复杂了。客户端调用将会被分发到Swift库中,其将首先要求协议层将请求编码到一个缓冲区,然后将编码请求缓冲区传递给Nifty的Channel以便被写出。当该Channel收到来自服务器的响应时,它将会通知Swift库,其将再次使用协议层以对响应缓冲区进行解码。这就是图15-5中所展示的流程。

图15-4 多路复用/传输层

图15-5 派发

15.1.5 Swift:一种更快的构建Java Thrift服务的方式

我们新的Java Thrift框架的另一个关键部分叫作Swift。它使用了Nifty作为它的I/O引擎,但是其服务规范可以直接通过Java注解来表示,使得Thrift服务开发人员可以纯粹地使用Java进行开发。当你的服务启动时,Swift运行时将通过组合使用反射以及解析Swift的注解来收集所有相关服务以及类型的信息。通过这些信息,它可以构建出和Thrift编译器在解析Thrift IDL文件时构建的模型一样的模型。然后,它将使用这个模型,并通过从字节码生成用于序列化/反序列化这些自定义类型的新类,来直接运行服务器以及客户端(而不需要任何生成的服务器或者客户端的存根代码)。

跳过常规的Thrift代码生成,还能使添加新功能变得更加轻松,而无需修改IDL编译器,所以我们的许多新功能(如异步客户端)都是首先在Swift中得到支持。如果你感兴趣,可以查阅Swift的GitHub页面(https://github.com/facebook/swift)上的介绍信息。

15.1.6 结果

在下面的各节中,我们将量化一些我们使用Netty的过程中所观察到的一些成果。

1.性能比较

一种测量Thrift服务器性能的方式是对于空操作的基准测试。这种基准测试使用了长时间运行的客户端,这些客户端不间断地对发送回空响应的服务器进行Thrift调用。虽然这种测量方式对于大部分的实际Thrift服务来说,不是真实意义上的性能测试,但是它仍然很好地度量了Thrift服务的最大潜能,而且提高这一基准,通常也就意味着减少了该框架本身的CPU使用。

如表15-1所示,在这个基准测试下,Nifty的性能优于所有其他基于NIO的Thrift服务器(TNonblockingServer、TThreadedSelectorServer以及TThreadPoolServer)的实现。它甚至轻松地击败了我们以前的Java服务器实现(我们内部使用的一个Nifty之前的服务器实现,基于原始的NIO以及直接缓冲区)。

表15-1 不同实现的基准测试结果

Thrift服务器实现

空操作请求/秒

TNonblockingServer

~68 000

TThreadedSelectorServer

188 000

TThreadPoolServer

867 000

较老的Java服务器(使用NIO和直接缓冲区)

367 000

Nifty

963 000

较老的基于libevent的C++服务器

895 000

下一代基于libevent的C++服务器

1 150 000

我们所测试过的唯一能够和Nifty相提并论的Java服务器是TThreadPoolServer。这个服务器实现使用了原始的OIO,并且在一个专门的线程上运行每个连接。这使得它在处理少量的连接时表现不错;然而,使用OIO,当你的服务器需要处理大量的并发连接时,你将很容易遇到伸缩性问题。

Nifty甚至击败了之前的C++服务器实现,这是我们开始开发Nifty时最夺目的一点,虽然它相对于我们的下一代C++服务器框架还有一些差距,但至少也大致相当。

2.稳定性问题的例子

在Nifty之前,我们在Facebook的许多主要的Java服务都使用了一个较老的、自定义的基于NIO的Thrift服务器实现,它的工作方式类似于Nifty。该实现是一个较旧的代码库,有更多的时间成熟,但是由于它的异步I/O处理代码是从零开始构建的,而且因为Nifty是构建在Netty的异步I/O框架的坚实基础之上的,所以(相比之下)它的问题也就少了很多。

我们的一个自定义的消息队列服务是基于那个较旧的框架构建的,而它开始遭受一种套接字泄露。大量的连接都停留在了CLOSE_WAIT状态,这意味着服务器接收了客户端已经关闭了套接字的通知,但是服务器从来不通过其自身的调用来关闭套接字进行回应。这使得这些套接字都停滞在了CLOSE_WAIT状态。

问题发生得很慢;在处理这个服务的整个机器集群中,每秒可能有数以百万计的请求,但是通常在一个服务器上只有一个套接字会在一个小时之内进入这个状态。这不是一个迫在眉睫的问题,因为在那种速率下,在一个服务器需要重启前,将需要花费很长的时间,但是这也复杂化了追查原因的过程。彻底地挖掘代码也没有带来太大的帮助:最初的几个地方看起来可疑,但是最终都被排除了,而我们也并没有定位到问题所在。

最终,我们将该服务迁移到了Nifty之上。转换(包括在预发环境中进行测试)花了不到一天的时间,而这个问题就此消失了。使用Nifty,我们就真的再也没见过类似的问题了。

这只是在直接使用NIO时可能会出现的微妙bug的一个例子,而且它类似于那些在我们的C++ Thrift框架稳定的过程中,不得不一次又一次地解决的bug。但是我认为这是一个很好的例子,它说明了通过使用Netty是如何帮助我们利用它多年来收到的稳定性修复的。

3.改进C++实现的超时处理

Netty还通过为改进我们的C++框架提供一些启发间接地帮助了我们。一个这样的例子是基于散列轮的计时器。我们的C++框架使用了来自于libevent的超时事件来驱动客户端以及服务器的超时,但是为每个请求都添加一个单独的超时被证明是十分昂贵的,因此我们一直都在使用我们称之为超时集的东西。其思想是:一个到特定服务的客户端连接,对于由该客户端发出的每个请求,通常都具有相同的接收超时,因此对于一组共享了相同的时间间隔的超时集合,我们仅维护一个真正的计时器事件。每个新的超时都将被保证会在对于该超时集合的现存的超时被调度之后触发,因此当每个超时过期或者被取消时,我们将只会安排下一个超时。

然而,我们的用户偶尔想要为每个调用都提供单独的超时,为在相同连接上的不同的请求设置不同的超时值。在这种情况下,使用超时集合的好处就消失了,因此我们尝试了使用单独的计时器事件。在大量的超时被同时调度时,我们开始看到了性能问题。我们知道Nifty不会碰到这个问题,除了它不使用超时集的这个事实—— Netty通过它的HashedWheelTimer[5]解决了该问题。因此,带着来自Netty的灵感,我们为我们的C++ Thrift框架添加了一个基于散列轮的计时器,并解决了可变的每请求(per-request)超时时间间隔所带来的性能问题。

4.未来基于Netty 4的改进

Nifty目前运行在Netty 3上,这对我们来说已经很好了,但是我们已经有一个基于Netty 4的移植版本准备好了,现在第4版的Netty已经稳定下来了,我们很快就会迁移过去。我们热切地期待着Netty 4的API将会带给我们的一些益处。

一个我们计划如何更好地利用Netty 4的例子是实现更好地控制哪个线程将管理一个给定的连接。我们希望使用这项特性,可以使服务器的处理器方法能够从和该服务器调用所运行的I/O线程相同的线程开始异步的客户端调用。这是那些专门的C++服务器(如Thrift请求路由器)已经能够利用的特性。

从该例子延伸开来,我们也期待着能够构建更好的客户端连接池,使得能够把现有的池化连接迁移到期望的I/O工作线程上,这在第3版的Netty中是不可能做到的。

15.1.7 Facebook小结

在Netty的帮助下,我们已经能够构建更好的Java服务器框架了,其几乎能够与我们最快的C++ Thrift服务器框架的性能相媲美。我们已经将我们现有的一些主要的Java服务迁移到了Nifty,并解决了一些令人讨厌的稳定性和性能问题,同时我们还开始将一些来自Netty,以及Nifty和Swift开发过程中的思想,反馈到提高C++ Thrift的各个方面中。

不仅如此,使用Netty是令人愉悦的,并且它已经添加了大量的新特性,例如,对于Thrift客户端的内置SOCKS支持来说,添加起来小菜一碟。

但是我们并不止步于此。我们还有大量的性能调优工作要做,以及针对将来的大量的其他方面的改进计划。如果你对使用Java进行Thrift开发感兴趣,一定要关注哦!

15.2 Netty在Twitter的使用:Finagle

Jeff Smick,Twitter软件工程师

Finagle是Twitter构建在Netty之上的容错的、协议不可知的RPC框架。从提供用户信息、推特以及时间线的后端服务到处理HTTP请求的前端API端点,所有组成Twitter架构的核心服务都建立在Finagle之上。

15.2.1 Twitter成长的烦恼

Twitter最初是作为一个整体式的Ruby On Rails应用程序构建的,我们半亲切地称之为Monorail。随着Twitter开始经历大规模的成长,Ruby运行时以及Rails框架开始成为瓶颈。从计算机的角度来看,Ruby对资源的利用是相对低效的。从开发的角度来看,该Monorail开始变得难以维护。对一个部分的代码修改将会不透明地影响到另外的部分。代码的不同方面的所属权也不清楚。无关核心业务对象的小改动也需要一次完整的部署。核心业务对象也没有暴露清晰的API,其加剧了内部结构的脆弱性以及发生故障的可能性。

我们决定将该Monorail分拆为不同的服务,明确归属人并且精简API,使迭代更快速,维护更容易。每个核心业务对象都将由一个专门的团队维护,并且由它自己的服务提供支撑。公司内部已经有了在JVM上进行开发的先例—几个核心的服务已经从该Monorail中迁移出去,并已经用Scala重写了。我们的运维团队也有运维JVM服务的背景,并且知道如何运维它们。鉴于此,我们决定使用Java或者Scala在JVM上构建所有的新服务。大多数的服务开发团队都决定选用Scala作为他们的JVM语言。

15.2.2 Finagle的诞生

为了构建出这个新的架构,我们需要一个高性能的、容错的、协议不可知的、异步的RPC框架。在面向服务的架构中,服务花费了它们大部分的时间来等待来自其他上游的服务的响应。使用异步的库使得服务可以并发地处理请求,并且充分地利用硬件资源。尽管Finagle可以直接建立在NIO之上,但是Netty已经解决了许多我们可能会遇到的问题,并且它提供了一个简洁、清晰的API。

Twitter构建在几种开源的协议之上,主要是HTTP、Thrift、Memcached、MySQL以及Redis。我们的网络栈需要具备足够的灵活性,能够和任何的这些协议进行交流,并且具备足够的可扩展性,以便我们可以方便地添加更多的协议。Netty并没有绑定任何特定的协议。向它添加协议就像创建适当的ChannelHandler一样简单。这种扩展性也催生了许多社区驱动的协议实现,包括SPDY、PostgreSQL、WebSockets、IRC以及AWS[6]。

Netty的连接管理以及协议不可知的特性为构建Finagle提供了绝佳的基础。但是我们也有一些其他的Netty不能开箱即满足的需求,因为那些需求都更高级。客户端需要连接到服务器集群,并且需要做跨服务器集群的负载均衡。所有的服务都需要暴露运行指标(请求率、延迟等),其可以为调试服务的行为提供有价值的数据。在面向服务的架构中,一个单一的请求都可能需要经过数十种服务,使得如果没有一个由Dapper启发的跟踪框架,调试性能问题几乎是不可能的[7]。Finagle正是为了解决这些问题而构建的。

15.2.3 Finagle是如何工作的

Finagle的内部结构是非常模块化的。组件都是先独立编写,然后再堆叠在一起。根据所提供的配置,每个组件都可以被换入或者换出。例如,所有的跟踪器都实现了相同的接口,因此可以创建一个跟踪器用来将跟踪数据发送到本地文件、保存在内存中并暴露一个读取端点,或者将它写出到网络。

在Finagle栈的底部是Transport层。这个类表示了一个能够被异步读取和写入的对象流。Transport被实现为Netty的ChannelHandler,并被插入到了ChannelPipeline的尾端。来自网络的消息在被Netty接收之后,将经由ChannelPipeline,在那里它们将被编解码器解释,并随后被发送到Finagle的Transport层。从那里Finagle将会从Transport层读取消息,并且通过它自己的栈发送消息。

对于客户端的连接,Finagle维护了一个可以在其中进行负载均衡的传输(Transport)池。根据所提供的连接池的语义,Finagle将从Netty请求一个新连接或者复用一个现有的连接。当请求新连接时,将会根据客户端的编解码器创建一个Netty的ChannelPipeline。额外的用于统计、日志记录以及SSL的ChannelHandler将会被添加到该ChannelPipeline中。该连接随后将会被递给一个Finagle能够写入和读取的ChannelTransport[8]

在服务器端,创建了一个Netty服务器,然后向其提供一个管理编解码器、统计、超时以及日志记录的ChannelPipelineFactory。位于服务器ChannelPipeline尾端的ChannelHandler是一个Finagle的桥接器。该桥接器将监控新的传入连接,并为每一个传入连接创建一个新的Transport。该Transport将在新的Channel被递交给某个服务器的实现之前对其进行包装。随后从ChannelPipeline读取消息,并将其发送到已实现的服务器实例。

图15-6展示了Finagle的客户端和服务器之间的关系。

图15-6 Netty的使用

Netty/Finagle桥接器

代码清单15-1展示了一个使用默认选项的静态的ChannelFactory

代码清单15-1 设置ChannelFactory

object Netty3Transporter {  val channelFactory: ChannelFactory =    new NioClientSocketChannelFactory(    ← --  创建一个ChannelFactory的实例      Executor, 1 /*# boss threads*/, WorkerPool, DefaultTimer    ){      // no-op; unreleasable      override def releaseExternalResources =     }  val defaultChannelOptions: Map[String, Object] = Map(   ← -- 设置用于新Channel的选项    "tcpNoDelay" -> java.lang.Boolean.TRUE,    "reuseAddress" -> java.lang.Boolean.TRUE  )}  

这个ChannelFactory桥接了Netty的Channel和Finagle的Transport(为简洁起见,这里移除了统计代码)。当通过apply方法被调用时,这将创建一个新的Channel以及Transport。当该Channel已经连接或者连接失败时,将会返回一个被完整填充的Future

代码清单15-2展示了将Channel连接到远程主机的ChannelConnector

代码清单15-2 连接到远程服务器

private[netty3] class ChannelConnector[In, Out](  newChannel:  => Channel,  newTransport: Channel => Transport[In, Out]) extends (SocketAddress => Future[Transport[In, Out]]) {  def apply(addr: SocketAddress): Future[Transport[In, Out]] = {    require(addr != null)   ← --  如果Channel 创建失败,那么异常将会被包装在Future 中返回    val ch = try newChannel catch {       case NonFatal(exc) => return Future.exception(exc)    }    // Transport is now bound to the channel; this is done prior to    // it being connected so we don't lose any messages.    val transport = newTransport(ch)  ← -- 使用Channel创建一个新的Transport    val connectFuture = ch.connect(addr)   ← -- 创建一个新的Promise,以便在连接尝试完成时及时收到通知      val promise = new Promise[Transport[In, Out]]      promise setInterruptHandler { case _cause =>      // Propagate cancellations onto the netty future.      connectFuture.cancel    }    connectFuture.addListener(new ChannelFutureListener {      def operationComplete(f: ChannelFuture) {    ← -- 通过完全填充已经创建的Promise 来处理connectFuture的完成状态        if (f.isSuccess) {          promise.setValue(transport)        } else if (f.isCancelled) {          promise.setException(          WriteException(new CancelledConnectionException))        } else {          promise.setException(WriteException(f.getCause))        }      }    })    promise onFailure { _ => Channels.close(ch)    }  }}  

这个工厂提供了一个ChannelPipelineFactory,它是一个ChannelTransport的工厂。该工厂是通过apply方法调用的。一旦被调用,就会创建一个新的ChannelPipelinenewPipeline)。ChannelFactory将会使用这个ChannelPipeline来创建新的Channel,随后使用所提供的选项(newConfiguredChannel)对它进行配置。配置好的Channel将会被作为一个匿名的工厂传递给一个ChannelConnector。该连接器将会被调用,并返回一个Future[Transport]

代码清单15-3展示了细节[9]。

代码清单15-3 基于Netty 3的传输

case class Netty3Transporter[In, Out](  pipelineFactory: ChannelPipelineFactory,  newChannel: ChannelPipeline => Channel =    Netty3Transporter.channelFactory.newChannel(_),  newTransport: Channel => Transport[In, Out] =    new ChannelTransport[In, Out](_),  // various timeout/ssl options) extends (  (SocketAddress, StatsReceiver) => Future[Transport[In, Out]]){  private def newPipeline(    addr: SocketAddress,    statsReceiver: StatsReceiver  )={    val pipeline = pipelineFactory.getPipeline    // add stats, timeouts, and ssl handlers    pipeline    ← --  创建一个ChannelPipeline,并添加所需的ChannelHandler  }  private def newConfiguredChannel(    addr: SocketAddress,    statsReceiver: StatsReceiver  )={    val ch = newChannel(newPipeline(addr, statsReceiver))    ch.getConfig.setOptions(channelOptions.asJava)    ch  }  def apply(    addr: SocketAddress,    statsReceiver: StatsReceiver  ): Future[Transport[In, Out]] = {    val conn = new ChannelConnector[In, Out](       => newConfiguredChannel(addr, statsReceiver),      newTransport, statsReceiver)   ← --  创建一个内部使用的ChannelConnector    conn(addr)  }}  

Finagle服务器使用Listener将自身绑定到给定的地址。在这个示例中,监听器提供了一个ChannelPipelineFactory、一个ChannelFactory以及各种选项(为了简洁起见,这里没包括)。我们使用一个要绑定的地址以及一个用于通信的Transport调用了Listener。接着,创建并配置了一个Netty的ServerBootstrap。然后,创建了一个匿名的ServerBridge工厂,递给ChannelPipelineFactory,其将被递交给该引导服务器。最后,该服务器将会被绑定到给定的地址。

现在,让我们来看看基于Netty的Listener实现,如代码清单15-4所示。

代码清单15-4 基于Netty的Listener实现

case class Netty3Listener[In, Out](  pipelineFactory: ChannelPipelineFactory,  channelFactory: ServerChannelFactory  bootstrapOptions: Map[String, Object], ... // stats/timeouts/ssl config) extends Listener[In, Out] {  def newServerPipelineFactory(    statsReceiver: StatsReceiver, newBridge:  => ChannelHandler  ) = new ChannelPipelineFactory {    ← --  创建一个ChannelPipelineFactory    def getPipeline = {      val pipeline = pipelineFactory.getPipeline      ... // add stats/timeouts/ssl      pipeline.addLast("finagleBridge", newBridge)   ← --  将该桥接器添 加到ChannelPipeline 中      pipeline    }  }  def listen(addr: SocketAddress)(    serveTransport: Transport[In, Out] => Unit  ): ListeningServer =    new ListeningServer with CloseAwaitably {      val newBridge =  => new ServerBridge(serveTransport, ...)      val bootstrap = new ServerBootstrap(channelFactory)      bootstrap.setOptions(bootstrapOptions.asJava)      bootstrap.setPipelineFactory(        newServerPipelineFactory(scopedStatsReceiver, newBridge))      val ch = bootstrap.bind(addr)    }}  }  

当一个新的Channel打开时,该桥接器将会创建一个新的ChannelTransport并将其递回给Finagle服务器。代码清单15-5展示了所需的代码[10]。

代码清单15-5 桥接Netty和Finagle

class ServerBridge[In, Out](  serveTransport: Transport[In, Out] => Unit,) extends SimpleChannelHandler {  override def channelOpen(    ctx: ChannelHandlerContext,    e: ChannelStateEvent  ){    val channel = e.getChannel    val transport = new ChannelTransport[In, Out](channel)    ← --  创建一个ChannelTransport,以便在一个新Channel 被打开时桥接到Finagle    serveTransport(transport)    super.channelOpen(ctx, e)  }  override def exceptionCaught(    ctx: ChannelHandlerContext,    e: ExceptionEvent  ) { // log exception and close channel }}  

15.2.4 Finagle的抽象

Finagle的核心概念是一个从RequestFuture[Response]的[11]的简单函数(函数式编程语言是这里的关键)。

type Service[Req, Rep] = Req => Future[Rep][12]  

这种简单性释放了非常强大的组合性。Service是一种对称的API,同时代表了客户端以及服务器。服务器实现了该服务的接口。该服务器可以被具体地用于测试,或者Finagle也可以将它暴露到网络接口上。客户端将被提供一个服务实现,其要么是虚拟的,要么是某个远程服务器的具体表示。

例如,我们可以通过实现一个服务来创建一个简单的HTTP服务器,该服务接受HttpReq作为参数,返回一个代表最终响应的Future[HttpRep]

val s: Service[HttpReq, HttpRep] = new Service[HttpReq, HttpRep] {  def apply(req: HttpReq): Future[HttpRep] =    Future.value(HttpRep(Status.OK, req.body))}Http.serve(":80", s)  

随后,客户端将被提供一个该服务的对称表示。

val client: Service[HttpReq, HttpRep] = Http.newService("twitter.com:80")val f: Future[HttpRep] = client(HttpReq("/"))f map { rep => processResponse(rep) }  

这个例子将把该服务器暴露到所有网络接口的80端口上,并从twitter.com的80端口消费。

我们也可以选择不暴露该服务器,而是直接使用它。

server(HttpReq("/")) map { rep => processResponse(rep) }  

在这里,客户端代码有相同的行为,只是不需要网络连接。这使得测试客户端和服务器非常简单直接。

客户端以及服务器都提供了特定于应用程序的功能。但是,也有对和应用程序无关的功能的需求。这样的例子如超时、身份验证以及统计等。Filter为实现应用程序无关的功能提供了抽象。

过滤器接收一个请求和一个将被它组合的服务:

type Filter[Req, Rep] = (Req, Service[Req, Rep]) => Future[Rep]  

多个过滤器可以在被应用到某个服务之前链接在一起:

recordHandletime andThentraceRequest andThencollectJvmStats andThenmyService  

这允许了清晰的逻辑抽象以及良好的关注点分离。在内部,Finagle大量地使用了过滤器,其有助于提高模块化以及可复用性。它们已经被证明,在测试中很有价值,因为它们通过很小的模拟便可以被独立地单元测试。

过滤器可以同时修改请求和响应的数据以及类型。图15-7展示了一个请求,它在通过一个过滤器链之后到达了某个服务并返回。

图15-7 请求/响应流

我们可以使用类型修改来实现身份验证。

val auth: Filter[HttpReq, AuthHttpReq, HttpRes, HttpRes] =  { (req, svc) => authReq(req) flatMap { authReq => svc(authReq) } }val authedService: Service[AuthHttpReq, HttpRes] = ...val service: Service[HttpReq, HttpRes] =  auth andThen authedService  

这里我们有一个需要AuthHttpReq 的服务。为了满足这个需求,创建了一个能接收HttpReq并对它进行身份验证的过滤器。随后,该过滤器将和该服务进行组合,产生一个新的可以接受HttpReq并产生HttpRes的服务。这使得我们可以从该服务隔离,单独地测试身份验证过滤器。

15.2.5 故障管理

我们假设故障总是会发生;硬件会失效、网络会变得拥塞、网络链接会断开。对于库来说,如果它们正在上面运行的或者正在与之通信的系统发生故障,那么库所拥有的极高的吞吐量以及极低的延迟都将毫无意义。为此,Finagle是建立在有原则地管理故障的基础之上的。为了能够更好地管理故障,它牺牲了一些吞吐量以及延迟。

Finagle可以通过隐式地使用延迟作为启发式(算法的因子)来均衡跨集群主机的负载。Finagle客户端将在本地通过统计派发到单个主机的还未完成的请求数来追踪它所知道的每个主机上的负载。有了这些信息,Finagle会将新的请求(隐式地)派发给具有最低负载、最低延迟的主机。

失败的请求将导致Finagle关闭到故障主机的连接,并将它从负载均衡器中移除。在后台,Finagle将不断地尝试重新连接。只有在Finagle能够重新建立一个连接时,该主机才会被重新加入到负载均衡器中。然后,服务的所有者可以自由地关闭各个主机,而不会对下游的客户端造成负面的影响。

15.2.6 组合服务

Finagle的服务即函数(service-as-a-function)的观点允许编写简单但富有表现力的代码。例如,一个用户发出的对于他们的主页时间线的请求涉及了大量的服务,其中的核心是身份验证服务、时间线服务以及推特服务。这些关系可以被简洁地表达。

代码清单15-6 通过Finagle组合服务

val timelineSvc = Thrift.newIface[TimelineService](...)   ← --  为每个服务创建一个客户端val tweetSvc = Thrift.newIface[TweetService](...)val authSvc = Thrift.newIface[AuthService](...)val authFilter = Filter.mk[Req, AuthReq, Res, Res] { (req, svc) =>  ← --  创建一个新的过滤器,对传入的请求进行身份验证  authSvc.authenticate(req) flatMap svc(_)}val apiService = Service.mk[AuthReq, Res] { req =>   ← -- 创建一个服务,将已通过身份验证的时间线请求转换为一个JSON 响应    timelineSvc(req.userId) flatMap {tl =>    val tweets = tl map tweetSvc.getById(_)    Future.collect(tweets) map tweetsToJson(_)  }}Http.serve(":80", authFilter andThen apiService)   ← -- 使用该身份验证过滤器以及我们的服务在80 端口上启动一个新的HTTP 服务  

在这里,我们为时间线服务、推特服务以及身份验证服务都创建了客户端。并且,为了对原始的请求进行身份验证,创建了一个过滤器。最后,我们实现的服务,结合了身份验证过滤器,暴露在80端口上。

当收到请求时,身份验证过滤器将尝试对它进行身份验证。错误都会被立即返回,不会影响核心业务。身份验证成功之后,AuthReq将会被发送到API服务。该服务将会使用附加的userId通过时间线服务来查找该用户的时间线。然后,返回一组推特ID,并在稍后遍历。每个ID都会被用来请求与之相关联的推特。最后,这组推特请求会被收集起来,转换为一个JSON格式的响应。

正如你所看到的,我们定义了数据流,并且将并发的问题留给了Finagle。我们不必管理线程池,也不必担心竞态条件。这段代码既清晰又安全。

15.2.7 未来:Netty

为了改善Netty的各个部分,让Finagle以及更加广泛的社区都能够从中受益,我们一直在与Netty的维护者密切合作[13]。最近,Finagle的内部结构已经升级为更加模块化的结构,为升级到Netty 4铺平了道路。

15.2.8 Twitter小结

Finagle已经取得了辉煌的成绩。我们已经想方设法大幅度地提高了我们所能够处理的流量,同时也降低了延迟以及硬件需求。例如,在将我们的API端点从Ruby技术栈迁移到Finagle之后,我们看到,延迟从数百毫秒下降到了数十毫秒之内,同时还将所需要的机器数量从3位数减少到了个位数。我们新的技术栈已经使得我们达到了新的吞吐量记录。在撰写本文时,我们所记录的每秒的推特数是143 199[14]。这一数字对于我们的旧架构来说简直是难以想象的。

Finagle的诞生是为了满足Twitter横向扩展以支持全球数以亿计的用户的需求,而在当时支撑数以百万计的用户并保证服务在线已然是一项艰巨的任务了。使用Netty作为基础,我们能够快速地设计和建造Finagle,以解决我们的伸缩性难题。Finagle和Netty处理了Twitter所遇到的每一个请求。

15.3 小结

本章深入了解了对于像Facebook以及Twitter这样的大公司是如何使用Netty来保证最高水准的性能以及灵活性的。

  • Facebook的Nifty项目展示了,如何通过提供自定义的协议编码器以及解码器,利用Netty来替换现有的Thrift实现。
  • Twitter的Finagle展示了,如何基于Netty来构建你自己的高性能框架,并通过类似于负载均衡以及故障转移这样的特性来增强它的。

我们希望这里所提供的案例研究,能够成为你打造下一代杰作的时候的信息和灵感的来源。


[1] 本节所表达的观点都是本节作者的观点,并不一定反映了该作者的雇主的观点。

[2] 一份来自原始的Thrift的开发者的不旧不新的白皮书可以在http://thrift.apache.org/static/files/thrift-20070401. pdf找到。

[3] 可以在http://thrift.apache.org找到更多的例子。

[4] Folly是Facebook的开源C++公共库:https://www.facebook.com/notes/facebook-engineering/folly-the-facebook- open-source-library/10150864656793920。

[5] 有关HashedWheelTimer类的更多的信息,参见http://netty.io/4.1/api/io/netty/util/HashedWheel Timer.html。

[6] 关于SPDY的更多信息参见https://github.com/twitter/finagle/tree/master/finagle-spdy。关于PostgreSQL参见https://github.com/mairbek/finagle-postgres。关于WebSockets参见https://github.com/sprsquish/finagle- websocket。关于IRC参见https://github.com/sprsquish/finagle-irc。关于AWS参见https://github.com/sclasen/ finagle-aws。

[7] 有关Dapper的信息可以在http://research.google.com/pubs/pub36356.html找到。该分布式的跟踪框架是Zipkin,可以在https://github.com/twitter/zipkin找到。

[8] 相关的类可以在https://github.com/twitter/finagle/blob/develop/finagle-netty4/src/main/scala/com/twitter/finagle/ netty4/transport/ChannelTransport.scala找到。——译者注

[9] Finagle的源代码位于https://github.com/twitter/finagle。

[10] 完整的源代码在https://github.com/twitter/finagle。

[11] 这里的Future[Response]相当于Java 8 中的CompletionStage。——译者注

[12] 虽然不完全等价,但是可以理解为Java 8的public interface Service extends Function>{}。——译者注

[13] “Netty 4 at Twitter: Reduced GC Overhead”:https://blog.twitter.com/2013/netty-4-at-twitter-reduced-gc-overhead。

[14] “New Tweets per second record, and how!”:https://blog.twitter.com/2013/new-tweets-per-second-recordand-how。