首页 » Netty实战 » Netty实战全文在线阅读

《Netty实战》第12章 WebSocket

关灯直达底部

本章主要内容

  • 实时Web的概念
  • WebSocket协议
  • 使用Netty构建一个基于WebSocket的聊天室服务器

如果你有跟进Web技术的最新进展,你很可能就遇到过“实时Web”这个短语,而如果你在工程领域中有实时应用程序的实战经验,那么你可能有点怀疑这个术语到底意味着什么。

因此,让我们首先澄清,这里并不是指所谓的硬实时服务质量(QoS),硬实时服务质量是保证计算结果将在指定的时间间隔内被递交。仅HTTP的请求/响应模式设计就使得其很难被支持,从过去所设计的各种方案中都没有提供一种能够提供令人满意的解决方案的事实中便可见一斑。

虽然已经有了一些关于正式定义实时Web服务[1]语义的学术讨论,但是被普遍接受的定义似乎还未出现。因此现在我们将采纳下面来自维基百科的非权威性描述:

实时Web利用技术和实践,使用户在信息的作者发布信息之后就能够立即收到信息,而不需要他们或者他们的软件周期性地检查信息源以获取更新。

简而言之,虽然全面的实时Web可能并不会马上到来,但是它背后的想法却助长了对于几乎瞬时获得信息的期望。我们将在本章中讨论的WebSocket[2]协议便是在这个方向上迈出的坚实的一步。

12.1 WebSocket简介

WebSocket协议是完全重新设计的协议,旨在为Web上的双向数据传输问题提供一个切实可行的解决方案,使得客户端和服务器之间可以在任意时刻传输消息,因此,这也就要求它们异步地处理消息回执。(作为HTML5客户端API的一部分,大部分最新的浏览器都已经支持了WebSocket。)

Netty对于WebSocket的支持包含了所有正在使用中的主要实现,因此在你的下一个应用程序中采用它将是简单直接的。和往常使用Netty一样,你可以完全使用该协议,而无需关心它内部的实现细节。我们将通过创建一个基于WebSocket的实时聊天应用程序来演示这一点。

12.2 我们的WebSocket示例应用程序

为了让示例应用程序展示它的实时功能,我们将通过使用WebSocket协议来实现一个基于浏览器的聊天应用程序,就像你可能在Facebook的文本消息功能中见到过的那样。我们将通过使得多个用户之间可以同时进行相互通信,从而更进一步。

图12-1说明了该应用程序的逻辑:

(1)客户端发送一个消息;

(2)该消息将被广播到所有其他连接的客户端。

图12-1 WebSocket应用程序逻辑

这正如你可能会预期的一个聊天室应当的工作方式:所有的人都可以和其他的人聊天。在示例中,我们将只实现服务器端,而客户端则是通过Web页面访问该聊天室的浏览器。正如同你将在接下来的几页中所看到的,WebSocket简化了编写这样的服务器的过程。

12.3 添加WebSocket支持

在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手[3]的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;它可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。

我们的应用程序将采用下面的约定:如果被请求的URL以/ws结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP/S。在连接已经升级完成之后,所有数据都将会使用WebSocket进行传输。图12-2说明了该服务器逻辑,一如在Netty中一样,它由一组ChannelHandler实现。我们将会在下一节中,解释用于处理HTTP以及WebSocket协议的技术时,描述它们。

图12-2 服务器逻辑

12.3.1 处理HTTP请求

首先,我们将实现该处理HTTP请求的组件。这个组件将提供用于访问聊天室并显示由连接的客户端发送的消息的网页。代码清单12-1给出了这个HttpRequestHandler对应的代码,其扩展了SimpleChannelInboundHandler以处理FullHttpRequest消息。需要注意的是,channelRead0方法的实现是如何转发任何目标URI为/ws的请求的。

代码清单12-1 HTTPRequestHandler

public class HttpRequestHandler  extends SimpleChannelInboundHandler<FullHttpRequest> {   ← --  扩展SimpleChannel-InboundHandler 以处理FullHttpRequest 消息  private final String wsUri;  private static final File INDEX;  static {    URL location = HttpRequestHandler.class      .getProtectionDomain      .getCodeSource.getLocation;    try {      String path = location.toURI + "index.html";      path = !path.contains("file:") ? path : path.substring(5);      INDEX = new File(path);    } catch (URISyntaxException e) {      throw new IllegalStateException(        "Unable to locate index.html", e);    }  }   public HttpRequestHandler(String wsUri) {    this.wsUri = wsUri;  }  @Override  public void channelRead0(ChannelHandlerContext ctx,    FullHttpRequest request) throws Exception {    if (wsUri.equalsIgnoreCase(request.getUri)) {   ← -- ❶如果请求了WebSocket协议升级,则增加引用计数(调用retain方法),并将它传递给下一个ChannelInboundHandler      ctx.fireChannelRead(request.retain);    } else {      if (HttpHeaders.is100ContinueExpected(request)) {  ← -- ❷ 处理100 Continue请求以符合HTTP1.1 规范        send100Continue(ctx);      }      RandomAccessFile file = new RandomAccessFile(INDEX, "r");  ← -- 读取index.html      HttpResponse response = new DefaultHttpResponse(        request.getProtocolVersion, HttpResponseStatus.OK);      response.headers.set(        HttpHeaders.Names.CONTENT_TYPE,        "text/html; charset=UTF-8");      boolean keepAlive = HttpHeaders.isKeepAlive(request);      if (keepAlive) {  ← -- 如果请求了keep-alive,则添加所需要的HTTP头信息          response.headers.set(          HttpHeaders.Names.CONTENT_LENGTH, file.length);        response.headers.set( HttpHeaders.Names.CONNECTION,          HttpHeaders.Values.KEEP_ALIVE);      }      ctx.write(response);   ← -- ❸将HttpResponse写到客户端      if (ctx.pipeline.get(SslHandler.class) == null) {   ← -- ❹将index.html写到客户端        ctx.write(new DefaultFileRegion(          file.getChannel, 0, file.length));      } else {        ctx.write(new ChunkedNioFile(file.getChannel));      }      ChannelFuture future = ctx.writeAndFlush(   ← -- ❺写LastHttpContent并冲刷至客户端        LastHttpContent.EMPTY_LAST_CONTENT);      if (!keepAlive) {   ← -- ❻如果没有请求keep-alive,则在写操作完成后关闭Channel        future.addListener(ChannelFutureListener.CLOSE);      }    }  }  private static void send100Continue(ChannelHandlerContext ctx) {    FullHttpResponse response = new DefaultFullHttpResponse(      HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);    ctx.writeAndFlush(response);  }  @Override  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)    throws Exception {    cause.printStackTrace;    ctx.close;  }}  

如果该HTTP请求指向了地址为/ws的URI,那么HttpRequestHandler将调用FullHttp-Request对象上的retain方法,并通过调用fireChannelRead(msg)方法将它转发给下一个ChannelInboundHandler❶。之所以需要调用retain方法,是因为调用channelRead方法完成之后,它将调用FullHttpRequest对象上的release方法以释放它的资源。(参见我们在第6章中对于SimpleChannelInboundHandler的讨论。)

如果客户端发送了HTTP 1.1的HTTP头信息Expect: 100-continue,那么Http-RequestHandler将会发送一个100 Continue❷响应。在该HTTP头信息被设置之后,Http-RequestHandler将会写回一个HttpResponse❸给客户端。这不是一个FullHttp-Response,因为它只是响应的第一个部分。此外,这里也不会调用writeAndFlush方法,在结束的时候才会调用。

如果不需要加密和压缩,那么可以通过将index.html❹的内容存储到DefaultFile-Region中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。为此,你可以检查一下,是否有SslHandler存在于在ChannelPipeline中。否则,你可以使用ChunkedNioFile

HttpRequestHandler将写一个LastHttpContent❺来标记响应的结束。如果没有请求keep-alive❻,那么HttpRequestHandler将会添加一个ChannelFutureListener到最后一次写出动作的ChannelFuture,并关闭该连接。在这里,你将调用writeAndFlush方法以冲刷所有之前写入的消息。

这部分代码代表了聊天服务器的第一个部分,它管理纯粹的HTTP请求和响应。接下来,我们将处理传输实际聊天消息的WebSocket帧。

WEBSOCKET帧 WebSocket以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

12.3.2 处理WebSocket帧

由IETF发布的WebSocket RFC,定义了6种帧,Netty为它们每种都提供了一个POJO实现。表12-1列出了这些帧类型,并描述了它们的用法。

表12-1 WebSocketFrame的类型

帧 类 型

描  述

BinaryWebSocketFrame

包含了二进制数据

TextWebSocketFrame

包含了文本数据

ContinuationWebSocketFrame

包含属于上一个BinaryWebSocketFrameTextWebSocket- Frame的文本数据或者二进制数据

CloseWebSocketFrame

表示一个CLOSE请求,包含一个关闭的状态码和关闭的原因

PingWebSocketFrame

请求传输一个PongWebSocketFrame

PongWebSocketFrame

作为一个对于PingWebSocketFrame的响应被发送

我们的聊天应用程序将使用下面几种帧类型:

  • CloseWebSocketFrame;
  • PingWebSocketFrame;
  • PongWebSocketFrame;
  • TextWebSocketFrame。

TextWebSocketFrame是我们唯一真正需要处理的帧类型。为了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。

代码清单12-2展示了我们用于处理TextWebSocketFrameChannelInboundHandler,其还将在它的ChannelGroup中跟踪所有活动的WebSocket连接。

代码清单12-2 处理文本帧

public class TextWebSocketFrameHandler  extends SimpleChannelInboundHandler<TextWebSocketFrame> {   ← --  扩展SimpleChannelInboundHandler,并处理TextWebSocketFrame 消息  private final ChannelGroup group;  public TextWebSocketFrameHandler(ChannelGroup group) {    this.group = group;  }  @Override  public void userEventTriggered(ChannelHandlerContext ctx,  ← --  重写userEventTriggered方法以处理自定义事件    Object evt) throws Exception {    if (evt == WebSocketServerProtocolHandler      .ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {      ctx.pipeline.remove(HttpRequestHandler.class);   ← --  如果该事件表示握手成功,则从该Channelipeline中移除Http-RequestHandler,因为将不会接收到任何HTTP 消息了      group.writeAndFlush(new TextWebSocketFrame(   ← --  ❶通知所有已经连接的WebSocket 客户端新的客户端已经连接上了         "Client " + ctx.channel + " joined"));  ← -- ❷将新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息      group.add(ctx.channel);      } else {      super.userEventTriggered(ctx, evt);    }   }   @Override   public void channelRead0(ChannelHandlerContext ctx,     TextWebSocketFrame msg) throws Exception {     group.writeAndFlush(msg.retain);   ← -- ❸增加消息的引用计数,并将它写到ChannelGroup 中所有已经连接的客户端  }}  

TextWebSocketFrameHandler只有一组非常少量的责任。当和新客户端的WebSocket握手成功完成之后❶,它将通过把通知消息写到ChannelGroup中的所有Channel来通知所有已经连接的客户端,然后它将把这个新Channel加入到该ChannelGroup中❷。

如果接收到了TextWebSocketFrame消息❸,TextWebSocketFrameHandler将调用TextWebSocketFrame消息上的retain方法,并使用writeAndFlush方法来将它传输给ChannelGroup,以便所有已经连接的WebSocket Channel都将接收到它。

和之前一样,对于retain方法的调用是必需的,因为当channelRead0方法返回时,TextWebSocketFrame的引用计数将会被减少。由于所有的操作都是异步的,因此,writeAnd-Flush方法可能会在channelRead0方法返回之后完成,而且它绝对不能访问一个已经失效的引用。

因为Netty在内部处理了大部分剩下的功能,所以现在剩下唯一需要做的事情就是为每个新创建的Channel初始化其ChannelPipeline。为此,我们将需要一个ChannelInitializer

12.3.3 初始化ChannelPipeline

正如你已经学习到的,为了将ChannelHandler安装到ChannelPipeline中,你扩展了ChannelInitializer,并实现了initChannel方法。代码清单12-3展示了由此生成的ChatServerInitializer的代码。

代码清单12-3 初始化ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {   ← --  扩展了ChannelInitializer  private final ChannelGroup group;  public ChatServerInitializer(ChannelGroup group) {    this.group = group;  }  @Override  protected void initChannel(Channel ch) throws Exception {   ← -- 将所有需要的ChannelHandler 添加到ChannelPipeline 中    ChannelPipeline pipeline = ch.pipeline;    pipeline.addLast(new HttpServerCodec);    pipeline.addLast(new ChunkedWriteHandler);    pipeline.addLast(new HttpObjectAggregator(64 * 1024));    pipeline.addLast(new HttpRequestHandler("/ws"));    pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));    pipeline.addLast(new TextWebSocketFrameHandler(group));  }}  

对于initChannel方法的调用,通过安装所有必需的ChannelHandler来设置该新注册的ChannelChannelPipeline。这些ChannelHandler以及它们各自的职责都被总结在了表12-2中。

表12-2 基于WebSocket聊天服务器的ChannelHandler

ChannelHandler

职  责

HttpServerCodec

将字节解码为HttpRequestHttpContentLastHttp- Content。并将HttpRequestHttpContentLast- HttpContent编码为字节

ChunkedWriteHandler

写入一个文件的内容

HttpObjectAggregator

将一个HttpMessage和跟随它的多个HttpContent聚合为单个FullHttpRequest或者FullHttpResponse(取决于它是被用来处理请求还是响应)。安装了这个之后,ChannelPipeline中的下一个ChannelHandler将只会收到完整的HTTP请求或响应

HttpRequestHandler

处理FullHttpRequest(那些不发送到/ws URI的请求)

WebSocketServerProtocolHandler

按照WebSocket规范的要求,处理WebSocket升级握手、PingWebSocketFramePongWebSocketFrameCloseWebSocketFrame

TextWebSocketFrameHandler

处理TextWebSocketFrame和握手完成事件

Netty的WebSocketServerProtocolHandler处理了所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,那么所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不再需要的ChannelHandler则将会被移除。

WebSocket协议升级之前的ChannelPipeline的状态如图12-3所示。这代表了刚刚被ChatServerInitializer初始化之后的ChannelPipeline

图12-3 WebSocket协议升级之前的ChannelPipeline

当WebSocket协议升级完成之后,WebSocketServerProtocolHandler将会把Http-RequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何不再被WebSocket连接所需要的ChannelHandler。这也包括了图12-3所示的 HttpObjectAggregatorHttpRequest-Handler

图12-4展示了这些操作完成之后的ChannelPipeline。需要注意的是,Netty目前支持4个版本的WebSocket协议,它们每个都具有自己的实现类。Netty将会根据客户端(这里指浏览器)所支持的版本[4],自动地选择正确版本的WebSocketFrameDecoderWebSocket-FrameEncoder

图12-4 WebSocket协议升级完成之后的ChannelPipeline

12.3.4 引导

这幅拼图最后的一部分是引导该服务器,并安装ChatServerInitializer的代码。这将由ChatServer类处理,如代码清单12-4所示。

代码清单12-4 引导服务器

public class ChatServer {  private final ChannelGroup channelGroup =    new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);   ← --  创建DefaultChannelGroup,其将保存所有已经连接的WebSocket Channel  private final EventLoopGroup group = new NioEventLoopGroup;  private Channel channel;  public ChannelFuture start(InetSocketAddress address) {  ← --  引导服务器    ServerBootstrap bootstrap = new ServerBootstrap;     bootstrap.group(group)      .channel(NioServerSocketChannel.class)      .childHandler(createInitializer(channelGroup));    ChannelFuture future = bootstrap.bind(address);    future.syncUninterruptibly;    channel = future.channel;    return future;  }  protected ChannelInitializer<Channel> createInitializer(   ← --  创建ChatServerInitializer     ChannelGroup group) {    return new ChatServerInitializer(group);  }  public void destroy {   ← --  处理服务器关闭,并释放所有的资源    if (channel != null) {      channel.close;    }    channelGroup.close;    group.shutdownGracefully;  }  public static void main(String args) throws Exception {    if (args.length != 1) {      System.err.println("Please give port as argument");      System.exit(1);    }    int port = Integer.parseInt(args[0]);    final ChatServer endpoint = new ChatServer;    ChannelFuture future = endpoint.start(      new InetSocketAddress(port));    Runtime.getRuntime.addShutdownHook(new Thread {      @Override      public void run {        endpoint.destroy;      }    });    future.channel.closeFuture.syncUninterruptibly;  }}  

这也就完成了该应用程序本身。现在让我们来测试它吧。

12.4 测试该应用程序

目录chapter12中的示例代码包含了你需要用来构建并运行该服务器的所有资源。(如果你还没有设置好你的包括Apache Maven在内的开发环境,参见第2章中的操作说明。)

我们将使用下面的Maven命令来构建和启动服务器:

mvn -PChatServer clean package exec:exec  

项目文件pom.xml被配置为在端口9999上启动服务器。如果要使用不同的端口,可以通过编辑文件中对应的值,或者使用一个System属性来对它进行重写:

mvn -PChatServer -Dport=1111 clean package exec:exec  

代码清单12-5展示了该命令主要的输出(无关紧要的行已经被删除了)。

代码清单12-5 编译并运行ChatServer

$ mvn -PChatServer clean package exec:exec[INFO] Scanning for projects...[INFO][INFO] ----------------------------------------------------------------[INFO] Building ChatServer 1.0-SNAPSHOT[INFO] ----------------------------------------------------------------...[INFO][INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---[INFO] Building jar: target/chat-server-1.0-SNAPSHOT.jar[INFO][INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ chat-server ---Starting ChatServer on port 9999  

你通过将自己的浏览器指向http://localhost:9999来访问该应用程序。图12-5展示了其在Chrome浏览器中的UI。

图中展示了两个已经连接的客户端。第一个客户端是使用上面的界面连接的,第二个客户端则是通过底部的Chrome浏览器的命令行工具连接的。[5]你会注意到,两个客户端都发送了消息,并且每个消息都显示在两个客户端中。

这是一个非常简单的演示,演示了WebSocket如何在浏览器中实现实时通信。

图12-5 基于WebSocket的ChatServer的演示

如何进行加密

在真实世界的场景中,你将很快就会被要求向该服务器添加加密。使用Netty,这不过是将一个SslHandler添加到ChannelPipeline中,并配置它的问题。代码清单12-6展示了如何通过扩展我们的ChatServerInitializer来创建一个SecureChatServerInitializer以完成这个需求。

代码清单12-6 为ChannelPipeline添加加密

public class SecureChatServerInitializer extends ChatServerInitializer {   ← --  扩展ChatServerInitializer以添加加密   private final SslContext context;  public SecureChatServerInitializer(ChannelGroup group,    SslContext context) {    super(group);    this.context = context;  }  @Override  protected void initChannel(Channel ch) throws Exception {    super.initChannel(ch);  ← --  调用父类的initChannel方法    SSLEng.ine engine = context.newEngine(ch.alloc);    engine.setUseClientMode(false);    ch.pipeline.addFirst(new SslHandler(engine));  ← --  将SslHandler 添加到ChannelPipeline 中    }}  

最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在Channel-Pipeline中安装SslHandler。这给了我们代码清单12-7中所展示的SecureChatServer

代码清单12-7 向ChatServer添加加密

public class SecureChatServer extends ChatServer {  ← --   SecureChatServer 扩展ChatServer 以支持加密  private final SslContext context;  public SecureChatServer(SslContext context) {    this.context = context;  }  @Override  protected ChannelInitializer<Channel> createInitializer(    ChannelGroup group) {    return new SecureChatServerInitializer(group, context);   ← --   返回之前创建的SecureChatServer-Initializer 以启用加密  }  public static void main(String args) throws Exception {    if (args.length != 1) {      System.err.println("Please give port as argument");      System.exit(1);    }    int port = Integer.parseInt(args[0]);    SelfSignedCertificate cert = new SelfSignedCertificate;    SslContext context = SslContext.newServerContext(    cert.certificate, cert.privateKey);    final SecureChatServer endpoint = new SecureChatServer(context);    ChannelFuture future = endpoint.start(new InetSocketAddress(port));    Runtime.getRuntime.addShutdownHook(new Thread {      @Override      public void run {        endpoint.destroy;      }    });    future.channel.closeFuture.syncUninterruptibly;  }}  

这就是为所有的通信启用SSL/TLS加密需要做的全部。和之前一样,可以使用Apache Maven来运行该应用程序,如代码清单12-8所示。它还将检索任何所需的依赖。

代码清单12-8 启动SecureChatServer

$ mvn -PSecureChatServer clean package exec:exec[INFO] Scanning for projects...[INFO][INFO] ----------------------------------------------------------------[INFO] Building ChatServer 1.0-SNAPSHOT[INFO] ----------------------------------------------------------------...[INFO][INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---[INFO] Building jar: target/chat-server-1.0-SNAPSHOT.jar[INFO][INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ chat-server ---Starting SecureChatServer on port 9999  

现在,你便可以从SecureChatServer的HTTPS URL地址https://localhost:9999访问它了。

12.5 小结

在本章中,你学习了如何使用Netty的WebSocket实现来管理Web应用程序中的实时数据。我们覆盖了其所支持的数据类型,并讨论了你可能会遇到的一些限制。尽管不可能在所有的情况下都使用WebSocket,但是仍然需要清晰地认识到,它代表了Web技术的一个重要进展。


[1] “Real-time Web Services Orchestration and Choreography”:http://ceur-ws.org/Vol-601/EOMAS10_paper13.pdf。

[2] IETF RFC 6455, The WebSocket Protocol: http://tools.ietf.org/html/rfc6455。

[3] Mozilla开发者网络,“Protocol upgrade mechanism”:https://developer.mozilla.org/en-US/docs/HTTP/ Protocol_upgrade_mechanism。

[4] 在这个例子中,我们假设使用了13版的WebSocket协议,所以图中展示的是WebSocketFrameDecoder13WebSocketFrameEncoder13

[5] 也可以通过在一个新的浏览器中访问http://localhost:9999来达到同样的目的,从而代替Chrome浏览器的开发者工具。——译者注