Netty5 序列化方式(Jboss Marshalling)

Netty作为高性能的底层通讯工具,被很多开发框架应用在底层。今天来说说常用的序列化工具:JBoss的Marshalling。

直接上代码,Marshalling的工厂类

package com.netty.serialize.marshalling;

import io.netty.handler.codec.marshalling.*;

import org.jboss.marshalling.MarshallerFactory;

import org.jboss.marshalling.Marshalling;

import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Factory for the Netty codec handlers that serialize and deserialize objects
 * with JBoss Marshalling.
 *
 * <p>Both builders use the same marshaller factory name and the same
 * configuration version, so an encoder and a decoder obtained from this class
 * are configured consistently.
 *
 * Created by sdc on 2017/8/28.
 */
public class MarshallingCodeCFactory {

    /** Name of the marshaller factory to look up; "serial" selects the Java-serialization factory. */
    private static final String MARSHALLER_NAME = "serial";

    /** Version number set on the marshalling configuration. */
    private static final int MARSHALLING_VERSION = 5;

    /** Maximum length of a single serialized message accepted by the decoder. */
    private static final int MAX_OBJECT_SIZE = 1024;

    /**
     * Builds the inbound handler that decodes received bytes back into objects.
     *
     * @return a new {@link MarshallingDecoder} limited to {@value #MAX_OBJECT_SIZE} per message
     * @throws IllegalStateException if the marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        // Two arguments: the provider and the maximum length of one serialized message.
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Builds the outbound handler that serializes POJOs implementing
     * {@code Serializable} into binary form.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the marshaller factory cannot be found
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /**
     * Looks up the marshaller factory through the static {@link Marshalling} utility.
     * Fails fast with a descriptive message instead of letting a null factory
     * surface later as a NullPointerException.
     */
    private static MarshallerFactory lookupMarshallerFactory() {
        final MarshallerFactory marshallerFactory =
                Marshalling.getProvidedMarshallerFactory(MARSHALLER_NAME);
        if (marshallerFactory == null) {
            throw new IllegalStateException(
                    "No JBoss Marshalling factory named \"" + MARSHALLER_NAME
                            + "\" was found; is jboss-marshalling-serial on the classpath?");
        }
        return marshallerFactory;
    }

    /** Creates a fresh marshalling configuration with the version shared by encoder and decoder. */
    private static MarshallingConfiguration newConfiguration() {
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(MARSHALLING_VERSION);
        return configuration;
    }
}

这个是Marshalling的序列化方式,Marshalling自带编解码,所以不用担心中途编解码半包的问题。

服务端的Server实现:

package com.netty.serialize.server;

import com.netty.serialize.coder.MsgDecoder;

import com.netty.serialize.coder.MsgEncoder;

import com.netty.serialize.handler.ServerHandler;

import com.netty.serialize.marshalling.MarshallingCodeCFactory;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty server that exchanges objects with clients through the JBoss
 * Marshalling codec handlers and processes them with {@link ServerHandler}.
 *
 * Created by sdc on 2017/8/26.
 */
public class MsgServer {

    /**
     * Binds the server to the given port and blocks until the server channel is closed.
     * Both event loop groups are shut down when this method exits.
     *
     * @param port the local port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Reuse the named initializer instead of duplicating the pipeline setup inline.
                    .childHandler(new ChildChannelHandler())
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = sb.bind(port).sync();
            System.out.println("服务端已启动");

            // Block here until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Sets up the pipeline of every accepted channel: marshalling decoder,
     * marshalling encoder, then the business handler.
     */
    public static class ChildChannelHandler extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ServerHandler());
        }
    }

    /**
     * Starts the server on port 8080.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            new MsgServer().bind(8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

package com.netty.serialize.handler;

import com.netty.serialize.message.Message;

import com.netty.serialize.message.MsgHeader;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler used to test the server implementation: prints every
 * received {@link Message} and answers it with a fixed reply.
 *
 * Created by sdc on 2017/8/29.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /** Delegates to the superclass; no extra work is done when a channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Prints the stack trace and then delegates to the superclass implementation.
     * The channel is not closed here.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Prints the incoming message and writes a reply message back on the same channel.
     *
     * @param ctx the handler context used to write the reply
     * @param msg the decoded inbound object; cast to {@link Message}
     * @throws ClassCastException if {@code msg} is not a {@link Message}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message newMsg = (Message) msg;
        System.out.println("获取客户端里的内容:" + newMsg);

        String msgStr = "客户端接受到通知";

        MsgHeader header = new MsgHeader();
        // Plain byte literal instead of the deprecated Byte(String) constructor.
        header.setStartTag((byte) 0);
        // Explicit charset so the encoded bytes do not depend on the platform default.
        header.setCmdCode("1234".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // NOTE(review): this is the character count of the body, not its encoded
        // byte length - confirm which one the receiver expects.
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes(java.nio.charset.StandardCharsets.UTF_8));

        Message message = new Message();
        message.setBody(msgStr);
        message.setHeader(header);

        ctx.writeAndFlush(message);
    }
}

客户端的实现:

package com.netty.serialize.client;

import com.netty.serialize.coder.MsgDecoder;

import com.netty.serialize.coder.MsgEncoder;

import com.netty.serialize.handler.ClientHandler;

import com.netty.serialize.handler.ServerHandler;

import com.netty.serialize.marshalling.MarshallingCodeCFactory;

import com.netty.serialize.message.Message;

import com.netty.serialize.message.MsgHeader;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

/**

* Created by sdc on 2017/8/26.

*/

public class MsgClient {

public void connect(String ip, int port) throws Exception {

EventLoopGroup workerGroup = new NioEventLoopGroup();

//        Message message = new Message();

//        String msgStr = "我想发送一条消息";

//        MsgHeader header = new MsgHeader();

//        header.setStartTag(new Byte("0"));

//        header.setCmdCode("1234".getBytes());

//        header.setLength(msgStr.length());

//        header.setVersion("11".getBytes());

//

//        message.setBody(msgStr);

//        message.setHeader(header);

try {

Bootstrap bs = new Bootstrap();

bs.group(workerGroup)

.channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true)//

.handler(new ChildChannelHandler());

ChannelFuture f = bs.connect(ip,port).sync();

//写入消息

//            f.channel().writeAndFlush(message).sync();

f.channel().closeFuture().sync();

} finally {

workerGroup.shutdownGracefully();

}

}

public static class ChildChannelHandler extends ChannelInitializer {

protected void initChannel(Channel channel) throws Exception {

channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());

channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());

channel.pipeline().addLast(new ClientHandler());

}

}

public static void main(String[] args){

try {

new MsgClient().connect("127.0.0.1", 8080);

} catch (Exception e) {

e.printStackTrace();

}

}

}


 

package com.netty.serialize.handler;

import com.netty.serialize.message.Message;

import com.netty.serialize.message.MsgHeader;

import io.netty.channel.*;

import io.netty.util.ReferenceCountUtil;

/**
 * Client-side handler: sends one {@link Message} as soon as the channel is
 * active and prints every message received from the server.
 *
 * Created by sdc on 2017/8/29.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Builds the initial message, writes it to the channel and reports on the
     * console whether the write succeeded.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msgStr = "我想发送一条消息";

        MsgHeader header = new MsgHeader();
        // Plain byte literal instead of the deprecated Byte(String) constructor.
        header.setStartTag((byte) 0);
        // Explicit charset so the encoded bytes do not depend on the platform default.
        header.setCmdCode("1234".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // NOTE(review): this is the character count of the body, not its encoded
        // byte length - confirm which one the receiver expects.
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes(java.nio.charset.StandardCharsets.UTF_8));

        Message message = new Message();
        message.setBody(msgStr);
        message.setHeader(header);

        ctx.writeAndFlush(message).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("成功发送到服务端消息");
                } else {
                    System.out.println("失败服务端消息失败");
                    // Surface the reason for the failure instead of discarding it.
                    Throwable cause = channelFuture.cause();
                    if (cause != null) {
                        cause.printStackTrace();
                    }
                }
            }
        });
    }

    /** Delegates to the superclass implementation; the channel is not closed here. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Prints the message received from the server and then releases it.
     *
     * @param ctx the handler context
     * @param msg the decoded inbound object; cast to {@link Message}
     * @throws ClassCastException if {@code msg} is not a {@link Message}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Message newMsg = (Message) msg;
            System.out.println("收到服务端的内容" + newMsg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

传输的POJO的类,是自定义的封装好的信息。

package com.netty.serialize.message;

import java.io.Serializable;

/**
 * Serializable envelope sent between client and server: a {@link MsgHeader}
 * plus an arbitrary body object.
 *
 * <p>A checksum field ({@code crcCode}) is not part of the message at present.
 *
 * Created by sdc on 2017/8/26.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 4923081103118853877L;

    private MsgHeader header;

    private Object body;

    /**
     * @param header the header to attach to this message
     */
    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    /**
     * @return the header of this message, or {@code null} if none was set
     */
    public MsgHeader getHeader() {
        return header;
    }

    /**
     * @param body the payload to carry
     */
    public void setBody(Object body) {
        this.body = body;
    }

    /**
     * @return the payload of this message, or {@code null} if none was set
     */
    public Object getBody() {
        return body;
    }

    /**
     * @return a text form listing the header and the body
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append("header=").append(header);
        text.append(", body=").append(body);
        text.append('}');
        return text.toString();
    }
}

package com.netty.serialize.message;

import java.io.Serializable;

import java.util.Arrays;

/**
 * Serializable header of a {@code Message}: start tag, command code, version
 * and a length value.
 *
 * <p>The byte arrays are stored and returned by reference; no copies are made.
 *
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable {

    private static final long serialVersionUID = 4923081103118853877L;

    /** Fixed header tag. */
    private byte startTag;

    /** Command code; the original note describes it as 4 positions long. */
    private byte[] cmdCode;

    /** Version; the original note describes it as 2 positions long. */
    private byte[] version;

    /** Length value carried in the header. */
    private int length;

    /** @return the fixed header tag */
    public byte getStartTag() {
        return startTag;
    }

    /** @param startTag the fixed header tag */
    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    /** @return the command code array as stored (not a copy) */
    public byte[] getCmdCode() {
        return cmdCode;
    }

    /** @param cmdCode the command code array; stored as given (not copied) */
    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    /** @return the version array as stored (not a copy) */
    public byte[] getVersion() {
        return version;
    }

    /** @param version the version array; stored as given (not copied) */
    public void setVersion(byte[] version) {
        this.version = version;
    }

    /** @return the length value */
    public int getLength() {
        return length;
    }

    /** @param length the length value */
    public void setLength(int length) {
        this.length = length;
    }

    /**
     * @return a text form listing all four fields, with the arrays rendered
     *         by {@link Arrays#toString(byte[])}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MsgHeader{");
        text.append("startTag=").append(startTag);
        text.append(", cmdCode=").append(Arrays.toString(cmdCode));
        text.append(", version=").append(Arrays.toString(version));
        text.append(", length=").append(length);
        text.append('}');
        return text.toString();
    }
}

到此就完事了。下面是我使用的netty和marshalling的版本,换成其他版本我不清楚会不会有什么错误。

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.11</version>

<scope>test</scope>

</dependency>

<!--netty -->

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>5.0.0.Alpha2</version>

</dependency>

<!--jboss-marshalling -->

<dependency>

<groupId>org.jboss.marshalling</groupId>

<artifactId>jboss-marshalling-serial</artifactId>

<version>2.0.0.Beta2</version>

</dependency>

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/f23dab5f0e25be827aea42c9bc5c886c.html