首页 » Netty实战 » Netty实战全文在线阅读

《Netty实战》第13章 使用UDP广播事件

关灯直达底部

本章主要内容

  • UDP概述
  • 一个示例广播应用程序

到目前为止,你所见过的绝大多数的例子都使用了基于连接的协议,如TCP。在本章中,我们将会把重点放在一个无连接协议即用户数据报协议(UDP)上,它通常用在性能至关重要并且能够容忍一定的数据包丢失的情况下[1]。

我们将会首先概述UDP的特性以及它的局限性。在这之后,我们将描述本章的示例应用程序,其将演示如何使用UDP的广播能力。我们还会使用一个编码器和一个解码器来处理作为广播消息格式的POJO。在本章的结束时候,你将能够在自己的应用程序中使用UDP。

13.1 UDP的基础知识

面向连接的传输(如TCP)管理了两个网络端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输,以及最后,连接的有序终止。相比之下,在类似于UDP这样的无连接协议中,并没有持久化连接这样的概念,并且每个消息(一个UDP数据报)都是一个单独的传输单元。

此外,UDP也没有TCP的纠错机制,其中每个节点都将确认它们所接收到的包,而没有被确认的包将会被发送方重新传输。

通过类比,TCP连接就像打电话,其中一系列的有序消息将会在两个方向上流动。相反,UDP则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。

UDP的这些方面可能会让你感觉到严重的局限性,但是它们也解释了为何它会比TCP快那么多:所有的握手以及消息管理机制的开销都已经被消除了。显然,UDP很适合那些能够处理或者容忍消息丢失的应用程序,但可能不适合那些处理金融交易的应用程序[2]。

13.2 UDP广播

到目前为止,我们所有的例子采用的都是一种叫作单播[3]的传输模式,定义为发送消息给一个由唯一的地址所标识的单一的网络目的地。面向连接的协议和无连接协议都支持这种模式。

UDP提供了向多个接收者发送消息的额外传输模式:

  • 多播——传输到一个预定义的主机组;
  • 广播——传输到网络(或者子网)上的所有主机。

本章中的示例应用程序将通过发送能够被同一个网络中的所有主机所接收的消息来演示UDP广播的使用。为此,我们将使用特殊的受限广播地址或者零网络地址255.255.255.255。发送到这个地址的消息都将会被定向给本地网络(0.0.0.0)上的所有主机,而不会被路由器转发给其他的网络。

接下来,我们将讨论该应用程序的设计。

13.3 UDP示例应用程序

我们的示例程序将打开一个文件,随后将会通过UDP把每一行都作为一个消息广播到一个指定的端口。如果你熟悉类UNIX操作系统,你可能会认识到这是标准的syslog实用程序的一个非常简化的版本。UDP非常适合于这样的应用程序,因为考虑到日志文件本身已经被存储在了文件系统中,因此,偶尔丢失日志文件中的一两行是可以容忍的。此外,该应用程序还提供了极具价值的高效处理大量数据的能力。

接收方是怎么样的呢?通过UDP广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。需要注意的是,这样的轻松访问性也带来了潜在的安全隐患,这也就是为何在不安全的环境中并不倾向于使用UDP广播的原因之一。出于同样的原因,路由器通常也会阻止广播消息,并将它们限制在它们的来源网络上。

发布/订阅模式 类似于syslog这样的应用程序通常会被归类为发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。

图13-1展示了整个系统的一个高级别视图,其由一个广播者以及一个或者多个事件监视器所组成。广播者将监听新内容的出现,当它出现时,则通过UDP将它作为一个广播消息进行传输。

图13-1 广播系统概览

所有的在该UDP端口上监听的事件监视器都将会接收到广播消息。

为了简单起见,我们将不会为我们的示例程序添加身份认证、验证或者加密。但是,要加入这些功能并使得其成为一个健壮的、可用的实用程序应该也不难。

在下一节中,我们将开始探讨该广播者组件的设计以及实现细节。

13.4 消息POJO: LogEvent

在消息处理应用程序中,数据通常由POJO表示,除了实际上的消息内容,其还可以包含配置或处理信息。在这个应用程序中,我们将会把消息作为事件处理,并且由于该数据来自于日志文件,所以我们将它称为LogEvent。代码清单13-1展示了这个简单的POJO的详细信息。

代码清单13-1 LogEvent消息

public final class LogEvent {  public static final byte SEPARATOR = (byte) ':';  private final InetSocketAddress source;  private final String logfile;  private final String msg;  private final long received;  public LogEvent(String logfile, String msg) {   ← --  用于传出消息的构造函数    this(null, -1, logfile, msg);  }  public LogEvent(InetSocketAddress source, long received,  ← --  用于传入消息的构造函数    String logfile, String msg) {    this.source = source;    this.logfile = logfile;    this.msg = msg;    this.received = received;  }  public InetSocketAddress getSource {  ← --   返回发送LogEvent 的源的InetSocketAddress    return source;  }  public String getLogfile {   ← -- 返回所发送的LogEvent的日志文件的名称    return logfile;  }  public String getMsg {   ← -- 返回消息内容    return msg;  }  public long getReceivedTimestamp {    ← -- 返回接收LogEvent的时间    return received;  }}  

定义好了消息组件,我们便可以实现该应用程序的广播逻辑了。在下一节中,我们将研究用于编码和传输LogEvent消息的Netty框架类。

13.5 编写广播者

Netty提供了大量的类来支持UDP应用程序的编写。表13-1列出了我们将要使用的主要的消息容器以及Channel类型。

表13-1 在广播者中使用的Netty的UDP相关类

名  称

描  述

interface AddressedEnvelope
  <M, A extends SocketAddress>
  extends ReferenceCounted

定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中M是消息类型;A是地址类型

class DefaultAddressedEnvelope
  <M, A extends SocketAddress>
  implements AddressedEnvelope<M,A>

提供了interface AddressedEnvelope的默认实现

class DatagramPacket
  extends DefaultAddressedEnvelope
    <ByteBuf, InetSocketAddress>
  implements ByteBufHolder

扩展了DefaultAddressedEnvelope以使用ByteBuf作为消息数据容器

interface DatagramChannel
  extends Channel

扩展了Netty的Channel抽象以支持UDP的多播组管理

class NioDatagramChannnel
  extends AbstractNioMessageChannel
  implements DatagramChannel

定义了一个能够发送和接收Addressed- Envelope消息的Channel类型

Netty的DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

要将LogEvent消息转换为DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展Netty的MessageToMessageEncoder,在第10章和第11章中我们已经使用过了。

图13-2展示了正在广播的3个日志条目,每一个都将通过一个专门的DatagramPacket进行广播。

图13-2 通过DatagramPacket发送的日志条目

图13-3呈现了该LogEventBroadcasterChannelPipeline的一个高级别视图,展示了LogEvent消息是如何流经它的。

图13-3 LogEventBroadcasterChannelPipelineLogEvent事件流

正如你所看到的,所有的将要被传输的数据都被封装在了LogEvent消息中。LogEvent-Broadcaster将把这些写入到Channel中,并通过ChannelPipeline发送它们,在那里它们将会被转换(编码)为DatagramPacket消息。最后,他们都将通过UDP被广播,并由远程节点(监视器)所捕获。

代码清单13-2展示了我们自定义版本的MessageToMessageEncoder,其将执行刚才所描述的转换。

代码清单13-2 LogEventEncoder

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {  private final InetSocketAddress remoteAddress;  public LogEventEncoder(InetSocketAddress remoteAddress) {   ← --  LogEventEncoder 创建了即将被发送到指定的InetSocketAddress 的DatagramPacket 消息    this.remoteAddress = remoteAddress;  }  @Override  protected void encode(ChannelHandlerContext channelHandlerContext,    LogEvent logEvent, List<Object> out) throws Exception {    byte file = logEvent.getLogfile.getBytes(CharsetUtil.UTF_8);    byte msg = logEvent.getMsg.getBytes(CharsetUtil.UTF_8);    ByteBuf buf = channelHandlerContext.alloc      .buffer(file.length + msg.length + 1);    buf.writeBytes(file);   ← -- 将文件名写入到ByteBuf 中    buf.writeByte(LogEvent.SEPARATOR);  ← -- 添加一个SEPARATOR     buf.writeBytes(msg); ← -- 将日志消息写入ByteBuf 中     out.add(new DatagramPacket(buf, remoteAddress)); ← -- 将一个拥有数据和目的地地址的新DatagramPacket添加到出站的消息列表中  }}  

LogEventEncoder被实现之后,我们已经准备好了引导该服务器,其包括设置各种各样的ChannelOption,以及在ChannelPipeline中安装所需要的ChannelHandler。这将通过主类LogEventBroadcaster完成,如代码清单13-3所示。

代码清单13-3 LogEventBroadcaster

public class LogEventBroadcaster {  private final EventLoopGroup group;  private final Bootstrap bootstrap;  private final File file;  public LogEventBroadcaster(InetSocketAddress address, File file) {    group = new NioEventLoopGroup;    bootstrap = new Bootstrap;    bootstrap.group(group).channel(NioDatagramChannel.class)   ← --  引导该NioDatagram-Channel(无连接的)      .option(ChannelOption.SO_BROADCAST, true)   ← --  设置SO_BROADCAST套接字选项      .handler(new LogEventEncoder(address));    this.file = file;  }  public void run throws Exception {    Channel ch = bootstrap.bind(0).sync.channel;   ← --  绑定Channel     long pointer = 0;    for (;;) {  ← -- 启动主处理循环       long len = file.length;      if (len < pointer) {        // file was reset        pointer = len;  ← -- 如果有必要,将文件指针设置到该文件的最后一个字节        } else if (len > pointer) {        // Content was added        RandomAccessFile raf = new RandomAccessFile(file, "r");        raf.seek(pointer);  ← -- 设置当前的文件指针,以确保没有任何的旧日志被发送        String line;        while ((line = raf.readLine) != null) {          ch.writeAndFlush(new LogEvent(null, -1, ← -- 对于每个日志条目,写入一个LogEvent到Channel 中          file.getAbsolutePath, line));        }        pointer = raf.getFilePointer;  ← -- 存储其在文件中的当前位置          raf.close;      }      try {        Thread.sleep(1000);        } catch (InterruptedException e) {  ← -- 休眠1 秒,如果被中断,则退出循环;否则重新处理它        Thread.interrupted;        break;      }    }  }  public void stop {    group.shutdownGracefully;  }  public static void main(String args) throws Exception {    if (args.length != 2) {      throw new IllegalArgumentException;    }    LogEventBroadcaster broadcaster = new LogEventBroadcaster(  ← -- 创建并启动一个新的LogEventBroadcaster的实例      new InetSocketAddress("255.255.255.255",        Integer.parseInt(args[0])), new File(args[1]));    try {      broadcaster.run;    }    finally {      broadcaster.stop;    }  }}  

这样就完成了该应用程序的广播者组件。对于初始测试,你可以使用netcat程序。在UNIX/Linux系统中,你能发现它已经作为nc被预装了。用于Windows的版本可以从http://nmap.org/ncat获取[4]。

netcat非常适合于对这个应用程序进行基本的测试;它只是监听某个指定的端口,并且将所有接收到的数据打印到标准输出。可以通过下面所示的方式,将其设置为监听UDP端口9999上的数据:

$ nc -l -u -p 9999  

现在我们需要启动我们的LogEventBroadcaster。代码清单13-4展示了如何使用mvn来编译和运行该广播者应用程序。pom.xml文件中的配置指向了一个将被频繁更新的文件,/var/log/messages(假设是一个UNIX/Linux环境),并将端口设置为了9999。该文件中的条目将会通过UDP广播到那个端口,并在你启动了netcat的终端上打印出来。

代码清单13-4 编译和启动LogEventBroadcaster

$ chapter13> mvn clean package exec:exec LogEventBroadcaster[INFO] Scanning for projects...[INFO][INFO] --------------------------------------------------------------------[INFO] Building UDP Broadcast 1.0-SNAPSHOT[INFO] --------------------------------------------------------------------......[INFO][INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---[INFO] Building jar: target/chapter13-1.0-SNAPSHOT.jar[INFO][INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action – LogEventBroadcaster running  

要改变该日志文件和端口值,可以在启动mvn的时候通过System属性来指定它们。代码清单13-5展示了如何将日志文件设置为/var/log/mail.log,并将端口设置为8888

代码清单13-5 编译和启动LogEventBroadcaster

$ chapter13> mvn clean package exec:exec -PLogEventBroadcaster /-Dlogfile=/var/log/mail.log –Dport=8888 –........[INFO][INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action –LogEventBroadcaster running  

当你看到LogEventBroadcaster running时,你便知道它已经成功地启动了。如果有错误发生,将会打印一个异常消息。一旦这个进程运行起来,它就会广播任何新被添加到该日志文件中的日志消息。

使用netcat对于测试来说是足够了,但是它并不适合于生产系统。这也就有了我们的应用程序的第二个部分——我们将在下一节中实现的广播监视器。

13.6 编写监视器

我们的目标是将netcat替换为一个更加完整的事件消费者,我们称之为LogEventMonitor。这个程序将:

(1)接收由LogEventBroadcaster广播的UDP DatagramPacket

(2)将它们解码为LogEvent消息;

(3)将LogEvent消息写出到System.out

和之前一样,该逻辑由一组自定义的ChannelHandler实现——对于我们的解码器来说,我们将扩展MessageToMessageDecoder。图13-4描绘了LogEventMonitorChannel-Pipeline,并且展示了LogEvent是如何流经它的。

图13-4 LogEventMonitor

ChannelPipeline中的第一个解码器LogEventDecoder负责将传入的DatagramPacket解码为LogEvent消息(一个用于转换入站数据的任何Netty应用程序的典型设置)。代码清单13-6展示了该实现。

代码清单13-6 LogEventDecoder

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {  @Override  protected void decode(ChannelHandlerContext ctx,    DatagramPacket datagramPacket, List<Object> out) throws Exception {   ← --  获取对DatagramPacket 中的数据(ByteBuf)的引用    ByteBuf data = datagramPacket.content;      int idx = data.indexOf(0, data.readableBytes,   ← --  获取该SEPARATOR的索引      LogEvent.SEPARATOR);    String filename = data.slice(0, idx)  ← --  提取文件名       .toString(CharsetUtil.UTF_8);    String logMsg = data.slice(idx + 1,  ← -- 提取日志消息       data.readableBytes).toString(CharsetUtil.UTF_8);    LogEvent event = new LogEvent(datagramPacket.sender,   ← -- 构建一个新的LogEvent 对象,并且将它添加到(已经解码的消息的)列表中      System.currentTimeMillis, filename, logMsg);    out.add(event);  }}  

第二个ChannelHandler的工作是对第一个ChannelHandler所创建的LogEvent消息执行一些处理。在这个场景下,它只是简单地将它们写出到System.out。在真实世界的应用程序中,你可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。代码清单13-7展示了LogEventHandler,其说明了需要遵循的基本步骤。

代码清单13-7 LogEventHandler

public class LogEventHandler  extends SimpleChannelInboundHandler<LogEvent> {   ← --  扩展SimpleChannelInbound-Handler 以处理LogEvent 消息  @Override  public void exceptionCaught(ChannelHandlerContext ctx,    Throwable cause) throws Exception {    cause.printStackTrace;  ← --  当异常发生时,打印栈跟踪信息,并关闭对应的Channel    ctx.close;  }  @Override  public void channelRead0(ChannelHandlerContext ctx,    LogEvent event) throws Exception {    StringBuilder builder = new StringBuilder;  ← --   创建StringBuilder,并且构建输出的字符串    builder.append(event.getReceivedTimestamp);    builder.append(" [");    builder.append(event.getSource.toString);    builder.append("] [");    builder.append(event.getLogfile);    builder.append("] : ");    builder.append(event.getMsg);    System.out.println(builder.toString);  ← --  打印LogEvent的数据   }}  

LogEventHandler将以一种简单易读的格式打印LogEvent消息,包括以下的各项:

  • 以毫秒为单位的被接收的时间戳;
  • 发送方的InetSocketAddress,其由IP地址和端口组成;
  • 生成LogEvent消息的日志文件的绝对路径名;
  • 实际上的日志消息,其代表日志文件中的一行。

现在我们需要将我们的LogEventDecoderLogEventHandler安装到ChannelPipeline中,如图13-4所示。代码清单13-8展示了如何通过LogEventMonitor主类来做到这一点。

代码清单13-8 LogEventMonitor

public class LogEventMonitor {  private final EventLoopGroup group;  private final Bootstrap bootstrap;  public LogEventMonitor(InetSocketAddress address) {    group = new NioEventLoopGroup;    bootstrap = new Bootstrap;    bootstrap.group(group)   ← --  引导该NioDatagramChannel      .channel(NioDatagramChannel.class)      .option(ChannelOption.SO_BROADCAST, true)   ← --  设置套接字选项SO_BROADCAST      .handler( new ChannelInitializer<Channel> {        @Override        protected void initChannel(Channel channel)          throws Exception {          ChannelPipeline pipeline = channel.pipeline;          pipeline.addLast(new LogEventDecoder);   ← --  将LogEventDecoder 和LogEventHandler 添加到ChannelPipeline 中          pipeline.addLast(new LogEventHandler);        }      } )      .localAddress(address);  }  public Channel bind {    return bootstrap.bind.syncUninterruptibly.channel;  ← -- 绑定Channel。 注意,DatagramChannel 是无连接的  }  public void stop {    group.shutdownGracefully;  }  public static void main(String main) throws Exception {    if (args.length != 1) {      throw new IllegalArgumentException(      "Usage: LogEventMonitor <port>");    }    LogEventMonitor monitor = new LogEventMonitor(   ← -- 构造一个新的LogEventMonitor      new InetSocketAddress(Integer.parseInt(args[0])));    try {      Channel channel = monitor.bind;      System.out.println("LogEventMonitor running");      channel.closeFuture.sync;    } finally {      monitor.stop;    }  }}  

13.7 运行LogEventBroadcaster和LogEventMonitor

和之前一样,我们将使用Maven来运行该应用程序。这一次你将需要打开两个控制台窗口,每个都将运行一个应用程序。每个应用程序都将会在直到你按下了Ctrl+C组合键来停止它之前一直保持运行。

首先,你需要启动LogEventBroadcaster,因为你已经构建了该工程,所以下面的命令应该就足够了(使用默认值):

$ chapter13> mvn exec:exec -PLogEventBroadcaster  

和之前一样,这将通过UDP协议广播日志消息。

现在,在一个新窗口中,构建并且启动LogEventMonitor以接收和显示广播消息,如代码清单13-9所示。

代码清单13-9 编译并启动LogEventBroadcaster

$ chapter13> mvn clean package exec:exec -PLogEventMonitor[INFO] Scanning for projects...[INFO][INFO] --------------------------------------------------------------------[INFO] Building UDP Broadcast 1.0-SNAPSHOT[INFO] --------------------------------------------------------------------[INFO][INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---[INFO] Building jar: target/chapter14-1.0-SNAPSHOT.jar[INFO][INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action ---LogEventMonitor running  

当你看到LogEventMonitor running时,你将知道它已经成功地启动了。如果有错误发生,则将会打印异常信息。

如代码清单13-10所示,当任何新的日志事件被添加到该日志文件中时,该终端都会显示它们。消息的格式则是由LogEventHandler创建的。

代码清单13-10 LogEventMonitor的输出

1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08   dev-linux dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254   port 671364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08   dev-linux dhclient: DHCPACK of 192.168.0.50 from 192.168.0.2541364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08   dev-linux dhclient: bound to 192.168.0.50 -- renewal in 270 seconds.1364217299382 [/192.168.0.38:63182] [[/var/log/messages] : Mar 25 13:59:38   dev-linux dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254   port 671364217299382 [/192.168.0.38:63182] [/[/var/log/messages] : Mar 25 13:59:38   dev-linux dhclient: DHCPACK of 192.168.0.50 from 192.168.0.2541364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:59:38   dev-linux dhclient: bound to 192.168.0.50 -- renewal in 259 seconds.1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57   dev-linux dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254   port 671364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57   dev-linux dhclient: DHCPACK of 192.168.0.50 from 192.168.0.2541364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57   dev-linux dhclient: bound to 192.168.0.50 -- renewal in 285 seconds.  

如果你不能访问UNIX的syslog,那么你可以创建一个自定义的文件,并手动提供内容以观测该应用程序的反应。以使用touch命令来创建一个空文件作为开始,下面所展示的步骤使用了UNIX命令。

$ touch ~/mylog.log  

现在再次启动LogEventBroadcaster,并通过设置系统属性来将其指向该文件:

$ chapter13> mvn exec:exec -PLogEventBroadcaster -Dlogfile=~/mylog.log  

一旦LogEventBroadcaster运行,你就可以手动将消息添加到该文件中,以在LogEventMonitor终端中查看广播输出。使用echo命令并将输出重定向到该文件,如下所示:

$ echo 'Test log entry' >> ~/mylog.log  

你可以根据需要启动任意多的监视器实例,它们每一个都将接收并显示相同的消息。

13.8 小结

在本章中,我们使用UDP作为例子介绍了无连接协议。我们构建了一个示例应用程序,其将日志条目转换为UDP数据报并广播它们,随后这些被广播出去的消息将被订阅的监视器客户端所捕获。我们的实现使用了一个POJO来表示日志数据,并通过一个自定义的编码器来将这个消息格式转换为Netty的DatagramPacket。这个例子说明了Netty的UDP应用程序可以很轻松地被开发和扩展用以支持专业化的用途。

在接下来的两章中,我们将把目光投向由知名公司的用户所提供的案例研究上,他们已使用Netty构建了工业级别的应用程序。


[1] 最有名的基于UDP的协议之一便是域名服务(DNS),其将完全限定的名称映射为数字的IP地址。

[2] 基于UDP协议实现的一些可靠传输协议可能不在此范畴内,如Quic、Aeron和UDT。——译者注

[3] 参见http://en.wikipedia.org/wiki/Unicast。

[4] 也可以使用scoop install netcat。——译者注