预置的 ChannelHandler 和编解码器

大彬大约 6 分钟

预置的 ChannelHandler 和编解码器

SSL/TLS

Java 提供了 javax.net.ssl 支持 SSL/TSL,用以实现数据安全。

添加 SSL/TLS 支持:

public class SslChannelInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean startTls;

    public SslChannelInitializer(SslContext context, boolean startTls) {
        this.context = context;
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        SSLEngine engine = context.newEngine(channel.alloc());//alloc返回channel所配置的ByteBufAllocator
        channel.pipeline().addFirst("ssl",
                new SslHandler(engine, startTls));//大多数情况SslHandler是第一个ChannelHandler
                                                  //这确保了所有其他的ChannelHandler处理数据之后,才会进行加密。
    }
}

HTTP/HTTPS 应用程序

完整的 HTTP 请求(FullHttpRequest)包括请求头信息、若干个 HTTPContent 和 LastHttpContent。

完整的 HTTP 响应(FullHttpResponse)包括响应头信息、若干个 HTTPContent 和 LastHttpContent。

所有类型的 HTTP 消息都实现了 HttpObject 接口。

HTTP 编解码器:HttpRequestEncoder、HttpResponseEncoder、HttpReqeustDecoder 和 HttpResponseDecoder。

HttpResponseDecoder:将字节解码为 HttpResponse、HttpContent 和 LastHttpContent。

添加 HTTP 支持

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    private final boolean client;

    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }
    
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        if (client) {
            pipeline.addLast("decoder", new HttpResponseDecoder());
            pipeline.addLast("encoder", new HttpRequestEncoder());
        } else {
            pipeline.addLast("decoder", new HttpRequestDecoder());
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

判断是否是客户端,如果是客户端,则添加 HttpResponseDecoder 对服务器响应进行解码。

聚合 HTTP 消息

由于 HTTP 请求和响应可能由多个部分组成,需要将它们聚合成完整的消息。Netty 提供了一个聚合器,可以将多个消息部分合并成 FullHttpRequest 或者 FullHttpResponse 消息。

自动聚合 HTTP 的消息片段:

public class HttpAggregarotInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;

    public HttpAggregarotInitializer(boolean isClient) {
        this.isClient = isClient;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
        pipeline.addLast("aggregator", //最大消息大小是512kb
                new HttpObjectAggregator(512*1024));
    }
}

HttpServerCodec 里面组合了HttpResponseEncoder和HttpRequestDecoder。

HttpClientCodec 里面组合了HttpRequestEncoder和HttpResponseDecoder。

HTTP 压缩

当使用HTTP 时,建议服务器端开启压缩功能以尽可能多地减小传输数据的大小。Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码。

自动压缩 HTTP 消息:

public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    
    public HttpCompressionInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
            pipeline.addLast("decompressor",
                    new HttpContentDecompressor());//处理来自服务器的压缩内容
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
            pipeline.addLast("compressor", 
                    new HttpContentCompressor());//服务器端压缩数据
        }
    }
}

HTTPS

启动 HTTPS 只需要将 SslHandler 添加到 ChannelPipeline。

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean isClient;
    
    public HttpsCodecInitializer(SslContext context, boolean isClient) {
        this.context = context;
        this.isClient = isClient;
    }
    
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        SSLEngine engine = context.newEngine(channel.alloc());
        pipeline.addFirst("ssl", new SslHandler(engine));//添加SslHandler之后可以使用https
        
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
    }
}

WebSocket

WebSocket 在客户端和服务器之间提供了真正的双向数据交换。

WebSocketFrame 类型:

名称描述
BinaryWebSocketFrame二进制数据帧
TextWebSocketFrame文本数据帧
ContinuationWebSocketFrame二进制和文本数据帧结合体
CloseWebSocketFrame控制帧:一个close请求、关闭的状态码以及关闭的原因
PingWebSocketFrame控制帧:请求一个PongWebSocketFrame
PongWebSocketFrame控制帧:对PingWebSocketFrame请求的响应

WebSocketServerProtocolHandler 处理协议升级握手,以及三种控制帧--Close、Ping 和 Pong。Text和Binary数据帧将会被传递给下一个 ChannelHandler 进行处理。

public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(65536),
                new WebSocketServerProtocolHandler("/websocket"),//升级握手
                new TextFrameHandler(),
                new BinaryFrameHandler(),
                new ContinuationFrameHandler());

    }
    public static final class TextFrameHandler extends
            SimpleChannelInboundHandler<TextWebSocketFrame> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
            //handle text frame
        }
    }

    public static final class BinaryFrameHandler extends
            SimpleChannelInboundHandler<BinaryWebSocketFrame> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame) throws Exception {
            //handle binary frame
        }
    }

    public static final class ContinuationFrameHandler extends
            SimpleChannelInboundHandler<ContinuationWebSocketFrame> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ContinuationWebSocketFrame continuationWebSocketFrame) throws Exception {
            //handle continuation frame
        }
    }
}

要想为WebSocket 添加安全性,只需要将SslHandler 作为第一个ChannelHandler 添加到ChannelPipeline 中。

空闲的连接和超时

用于空闲连接以及超时的 ChannelHandler。

发送心跳:

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //60s没有接受或发送数据,IdelStateHandler会使用IdleStateEvent调用fireUserEventTriggered()
        pipeline.addLast(new IdleStateHandler(
                0, 0, 60, TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends
            ChannelInboundHandlerAdapter {
        //发送到远程节点的心跳信息
        private static final ByteBuf HEARTBEAT_SEQUENCE =
                Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
                        "HEARTBEAT", CharsetUtil.ISO_8859_1));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                //连接空闲时间太长时,发送心跳消息,并在发送失败时关闭该连接
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);//传递给下一个ChannelInboundHandler
            }
        }
    }
}

使用 IdleStateHandler 测试远程节点是否还活着,失活时关闭连接释放资源。

基于分隔符的协议

基于分隔符的协议的解码器

名称描述
DelimiterBasedFrameDecoder使用自定义分隔符提取帧的通用解码器
LineBasedFrameDecoder提取由行尾符分隔的解码器,速度比DeimiterBasedFrameDecoder快

分隔符提取帧:

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //提取帧,并传给下一个ChannelHandler
        pipeline.addLast(new LineBasedFrameDecoder(64*1024));
        pipeline.addLast(new FrameHandler());//接收数据帧
    }
    
    public static final class FrameHandler extends
            SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            //处理LineBasedFrameDecoder传进的帧
        }
    }

示例:1.每个帧都由换行符(\n)分隔;2.每个帧由一系列的元素组成,每个元素都由的单个空格字符分隔;3.一个帧内容代表一个命令,定义为一个命令名称后面跟着数目可变的参数。

public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
    static final byte SPACE = (byte)' ';

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new CmdDecoder(64 * 1024));
        pipeline.addLast(new CmdHandler());
    }

    public static final class Cmd {
        private final ByteBuf name;
        private final ByteBuf args;

        public Cmd(ByteBuf name, ByteBuf args) {
            this.name = name;
            this.args = args;
        }

        public ByteBuf getName() {
            return name;
        }

        public ByteBuf getArgs() {
            return args;
        }
    }

    public static final class CmdDecoder
            extends LineBasedFrameDecoder {

        public CmdDecoder(int maxLength) {
            super(maxLength);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
            ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
            if (frame == null) {
                return null;
            }
            //查找第一个空格字符的索引,空格前是命令名称,后面是参数
            int index = frame.indexOf(frame.readerIndex(),
                    frame.writerIndex(), SPACE);
            return new Cmd(frame.slice(frame.readerIndex(), index),
                    frame.slice(index + 1, frame.writerIndex()));
        }
    }

    public static final class CmdHandler extends
            SimpleChannelInboundHandler<Cmd> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Cmd cmd) throws Exception {
            //处理cmd
        }
    }
}

基于长度的协议

基于长度的协议的解码器:

名称描述
FixedLengthFrameDecoder提取固定长度的帧
LengthFieldBasedFrameDecoder根据帧头部中的长度值提取帧;该字段的偏移量以及长度在构造函数中指定

变长帧:

public class LengthBasedInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeLine = channel.pipeline();
        //帧起始的前8字节是帧长度
        pipeLine.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
        pipeLine.addLast(new FrameHandler());
    }
    
    public static class FrameHandler extends
            SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            //处理帧
        }
    }
}

写大型数据

当写大型数据到远程节点时,如果连接速度比较慢,数据依然不断的往内存写,可能导致内存耗尽。利用 NIO 的零拷贝特性,可以消除将文件内容从文件系统移动到网络栈的复制过程。应用程序需要做的就是实现一个 FileRegion 的接口。

利用零拷贝特性(FileRegion)来传输一个文件的内容。

FileInputStream in = new FileInputStream(File);
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
channel.writeAndFlush(region).addListener(
	new ChannelFuture(region).addListener(
    	new ChannelFutureListener() {
            
        }
    )
);
Loading...