关于什么是Protobuf和用Protobuf能做点什么,需要看这篇文章

1. 介绍

Netty官方是提供了方法来集成使用Protobuf的,主要在于4个类,2个framer,2个codec:

使用encoder和decoder来编码和解码protobuf的消息,使用framer来加长度头和截断消息。

2. 问题

在Netty中使用Protobuf会遇到一个问题。当一个服务器使用ServerBootstrap启动的时候,它绑定的PiplineFactory是唯一的,那么它在处理消息的时候,只能进行一次定制,绑定一种消息,而无法处理多种消息。

这反映在代码上是:

[codesyntax lang="java"]

return new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        //...
        pipeline.addLast("protobufDecoder", new ProtobufDecoder(YourMessage.getDefaultInstance()));
        //...
    }
};

[/codesyntax]

我们只能在decoder上绑定一种protobuf的message。在服务器收到消息的时候,只能解析出一种消息。而实际上的应用绝对不可能只有一种消息。那么,我们应该怎么办?

3. 解决

主要的解决思路来自:stackoverflow >> How best to specify a Protobuf for use with Netty (preferably using the built-in protobuf support)

在官方文档中,这种处理方式被称为Union Types:https://developers.google.com/protocol-buffers/docs/techniques#union

4. 例子

下面的代码包含在我的github开源项目XJFXJF-Sample中。

Communication.proto

protobuf的消息定义:

[codesyntax lang="java"]

option java_package = "com.xenojoshua.xjf.netty.protobuf.protos";

//-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-
//-* Models
//-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-
message Player {
    required int32 id = 1;
    required string name = 2;
    required string password = 3;
}

message PlayerGroup {
    required int32 id = 1;
    required string name = 2;
    repeated Player players = 3;
}

//-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-
//-* APIs
//-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-
message I1001 {
    required Player player = 1;
}

message I1002 {
    required PlayerGroup group = 1;
}

//-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-
//-* Messages Object
//-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-
enum XjfMessageType {
    T1001 = 1;
    T1002 = 2;
}

message XjfMessage {
    required XjfMessageType type = 1;

    optional I1001 i1001 = 2;
    optional I1002 i1002 = 3;
}

[/codesyntax]

XjfNettyServerImpl.java

服务器端的PiplineFactory绑定:

[codesyntax lang="java"]

private static ExecutionHandler executionHandler = new ExecutionHandler(
    new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)
);

//...

@Override
ChannelPipelineFactory buildPiplineFactory() {
    return new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
            pipeline.addLast("protobufDecoder", new ProtobufDecoder(Communication.XjfMessage.getDefaultInstance()));

            pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast("protobufEncoder", new ProtobufEncoder());

            pipeline.addLast("executor", executionHandler);
            pipeline.addLast("handler", new XjfNettyServerHandler());

            return pipeline;
        }
    };
}

[/codesyntax]

XjfNettyServerHandler.java

服务器端的消息接收解析:

[codesyntax lang="java"]

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    Communication.XjfMessage messages = (Communication.XjfMessage) e.getMessage();

    // handle message type
    switch (messages.getType()) {
        case T1001:
            Communication.Player player = messages.getI1001().getPlayer();
            XjfLogger.get().debug(String.format("Player: id: %d, name: %s, password: %s", player.getId(), player.getName(), player.getPassword()));
            break;
        case T1002:
            Communication.PlayerGroup group = messages.getI1002().getGroup();
            XjfLogger.get().debug(String.format("Group: id: %d, name: %s", group.getId(), group.getName()));

            List<Communication.Player> playerList = group.getPlayersList();
            int listSize = playerList.size();
            XjfLogger.get().debug(String.format("Group member count: %d", listSize));
            if (listSize > 0) {
                XjfLogger.get().debug("Group Players: ");
                Iterator<Communication.Player> iterator = playerList.iterator();
                while (iterator.hasNext()) {
                    Communication.Player groupPlayer = iterator.next();
                    XjfLogger.get().debug(String.format("Player: id: %d, name: %s, password: %s", groupPlayer.getId(), groupPlayer.getName(), groupPlayer.getPassword()));
                }
            }
            break;
        default:
            break;
    }

    XjfLogger.get().debug("[xjf-netty-server] Message received!");
}

[/codesyntax]

Runner.java

客户端发送消息:

[codesyntax lang="java"]

XjfNettyClient client = new XjfNettyClientImpl(host, port);

// build players
Communication.Player.Builder playerBuilder = Communication.Player.newBuilder();
Communication.Player jonathan = playerBuilder.setId(292514701).setName("jonathan").setPassword(XjfUtil.md5("mypassword")).build();

Communication.Player luke = playerBuilder.clear().setId(287138441).setName("luke").setPassword(XjfUtil.md5("123")).build();

// build group
Communication.PlayerGroup.Builder groupBuilder = Communication.PlayerGroup.newBuilder();
Communication.PlayerGroup programmers = groupBuilder.setId(100).setName("Programmer").addPlayers(jonathan).addPlayers(luke).build();

// build & send I1001
Communication.I1001.Builder I1001Builder = Communication.I1001.newBuilder();
Communication.I1001 i1001 = I1001Builder.setPlayer(jonathan).build();

Communication.XjfMessage.Builder messagesBuilder = Communication.XjfMessage.newBuilder();
Communication.XjfMessage messageI1001 = messagesBuilder.setType(Communication.XjfMessageType.T1001).setI1001(i1001).build();

client.send(messageI1001);

// build & send I1002
Communication.I1002.Builder I1002Builder = Communication.I1002.newBuilder();
Communication.I1002 i1002 = I1002Builder.setGroup(programmers).build();

Communication.XjfMessage messageI1002 = messagesBuilder.clear().setType(Communication.XjfMessageType.T1002).setI1002(i1002).build();

client.send(messageI1002);

client.run();

[/codesyntax]