江民钦的博客

show me code

Netty快速入门

Netty是由JBOSS提供的一个java开源的、基于NIO的、异步的、事件驱动的网络应用程序框架和工具,用来快速开发高性能、高并发、高可靠的网络服务器和客户端程序,大多数用于服务端开发,为Java游戏服务器开发必学框架之一。
简单来说,Netty是一个基于NIO(Non-blocking IO,非阻塞IO)的网络框架,提供了对TCP、UDP、HTTP以及文件传输的支持,通过Reactor反应器模式,可以快速开发高性能的应用程序。

上一个简单的demo:

Server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty TCP server. Every accepted connection is configured by
 * {@link ChildChannelHandler}.
 */
public class NettyServer {

    /**
     * Binds to {@code port} and blocks the calling thread until the server
     * channel is closed or the thread is interrupted. Both event loop groups
     * are shut down before this method returns.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        // Passed to ServerBootstrap.group(parent, child): the first group accepts
        // connections, the second serves the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Configuration lives inside the try so the groups are released even
            // if setting up the bootstrap fails.
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            // Wait for the bind to complete, then park until the channel closes.
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Starts the server on port 8989. */
    public static void main(String[] args) {
        int port = 8989;
        new NettyServer().bind(port);
    }
}
/**
 * Pipeline initializer for accepted connections: installs a
 * {@link NettyServerHandler} and prints a running connection number.
 */
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    /** Number printed for the next initialized channel; starts at 1. */
    public static int count = 1;

    /**
     * Adds the business handler to the new channel's pipeline and logs the
     * sequence number of this connection.
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().addLast(new NettyServerHandler());

        int connectionNumber = count++;
        System.out.println("connect server number: " + connectionNumber);
    }
}
/**
 * Server-side business handler: prints each received buffer as UTF-8 text and
 * answers with the fixed greeting {@code "hello client"}.
 */
class NettyServerHandler extends ChannelHandlerAdapter {

    /**
     * Reads all readable bytes of the inbound buffer, prints them, and queues a
     * reply. The reply is only written here; it is flushed in
     * {@link #channelReadComplete(ChannelHandlerContext)}.
     *
     * @param ctx handler context used to write the reply
     * @param msg inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("receive: " + body);
        } finally {
            // This handler consumes the message and does not pass it on, so it
            // must drop the reference itself or the buffer leaks.
            buf.release();
        }

        // Encode with the same charset that is used for decoding above instead
        // of the platform default.
        byte[] reply = "hello client".getBytes("utf-8");
        ByteBuf resp = Unpooled.buffer(reply.length);
        resp.writeBytes(reply);
        ctx.write(resp);
    }

    /** Flushes the replies queued by {@code channelRead}. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Reports the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of closing silently, so failures are diagnosable.
        System.err.println("server handler error: " + cause);
        ctx.close();
    }
}

Client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.util.logging.Logger;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Minimal Netty TCP client. Each connection gets a {@link NettyClientHandler}
 * in its pipeline.
 */
public class NettyClient {

    /**
     * Connects to {@code host:port} and blocks the calling thread until the
     * channel is closed or the thread is interrupted. The event loop group is
     * shut down before this method returns.
     *
     * @param host remote host name or address
     * @param port remote TCP port
     */
    public void connect(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Configuration lives inside the try so the group is released even
            // if setting up the bootstrap fails.
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            // Wait for the connect to complete, then park until the channel closes.
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Connects to 127.0.0.1:8989. */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 8989;
        new NettyClient().connect(host, port);
    }
}
/**
 * Client-side business handler: sends {@code "hello server"} once the channel
 * becomes active and prints every reply as UTF-8 text.
 */
class NettyClientHandler extends ChannelHandlerAdapter {
    private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());

    /** Greeting written once in {@link #channelActive(ChannelHandlerContext)}. */
    private ByteBuf firstMessage = null;

    /** Pre-builds the greeting buffer. */
    public NettyClientHandler() {
        // Encode explicitly as UTF-8 (the charset used for decoding replies)
        // rather than relying on the platform default. Charset.forName avoids
        // the checked exception of getBytes(String), keeping the constructor
        // signature unchanged.
        byte[] req = "hello server".getBytes(java.nio.charset.Charset.forName("utf-8"));
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    /** Sends the greeting as soon as the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    /**
     * Prints the content of the inbound buffer.
     *
     * @param ctx handler context (unused)
     * @param msg inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Now is: " + body);
        } finally {
            // This handler consumes the message and does not pass it on, so it
            // must drop the reference itself or the buffer leaks.
            buf.release();
        }
    }

    /** Logs the failure message and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warning("Logger Warning: " + cause.getMessage()); // print the exception message
        ctx.close();
    }
}

Server端和Client端的代码都极其相似,主要是Server用了两个NioEventLoopGroup,一个用来接受client连接,一个用来进行对SocketChannel读写。ServerBootstrap用来启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。简单说一下Channel和SocketChannel,常规的TCP通信是基于Socket的,然后通过字节流的方式进行传输,效率很低,在NIO中,通过Channel这种以块的模式进行传输数据,SocketChannel是Channel中的一种用以Socket传输的方式。

在netty中,不管是客户端还是服务端,一旦建立连接,首先初始化channel,通过new一个ChannelInitializer这个类或者自己写一个外部类然后继承于这个类,并重写其中的initChannel方法,获得SocketChannel后可以通过管道的方式添加一个handler(可以是一个外部类通过继承ChannelHandlerAdapter,来实现具体的业务逻辑处理),这里还可以添加其它中间件,比如new StringDecoder(),用来将传输的字节流数据自动转换成字符串;以及new LineBasedFrameDecoder()或者new DelimiterBasedFrameDecoder()等用来处理TCP粘包/拆包。

在上边自定义ChannelHandlerAdapter的子类中,Server端中channelRead()方法是在接收到数据时运行,channelReadComplete()方法是在数据读取完成时运行,exceptionCaught是在发生异常时运行。Client不同的是
channelActive()方法是在连接成功后运行。其中发送数据都是ctx.writeAndFlush(ByteBuf),注意是发送字节,不能直接发送字符串,不然会发送不出去。

关于TCP粘包/拆包

简而言之,在网络传输中,数据是基于字节流的,一次最大能发送多少个字节的数据,跟操作系统以及硬件配置有关,发送的数据与数据之间是没有间隔的,是连接在一起的,会出现一次发送一条数据外加另一条数据的一部分等情况,那么怎么让接收方知道读取到某个地方时该是下一条数据,通常用\n或者\r\n来分辨,LineBasedFrameDecoder这个中间件就是通过\n和\r\n来区分。DelimiterBasedFrameDecoder则是以一个自定义的分隔符进行区分。

使用LineBasedFrameDecoder以及StringDecoder

Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
 * Minimal Netty TCP server for the line-based example. Every accepted
 * connection is configured by {@link ChildChannelHandler}.
 */
public class NettyServer {

    /**
     * Binds to {@code port} and blocks the calling thread until the server
     * channel is closed or the thread is interrupted. Both event loop groups
     * are shut down before this method returns.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        // Passed to ServerBootstrap.group(parent, child): the first group accepts
        // connections, the second serves the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Configuration lives inside the try so the groups are released even
            // if setting up the bootstrap fails.
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            // Wait for the bind to complete, then park until the channel closes.
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Starts the server on port 8989. */
    public static void main(String[] args) {
        int port = 8989;
        new NettyServer().bind(port);
    }
}
/**
 * Pipeline initializer for accepted connections in the line-based example:
 * splits the inbound byte stream into lines, decodes each line to a
 * {@code String}, and hands it to {@link NettyServerHandler}.
 */
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    /** Number printed for the next initialized channel; starts at 1. */
    public static int count = 1;

    /**
     * Installs the decoders and the business handler, then logs the sequence
     * number of this connection.
     */
    @Override
    protected void initChannel(SocketChannel arg0) throws Exception {
        // Order matters: inbound data passes through handlers in the order they
        // were added. The frame decoder must see the raw bytes first so it can
        // cut them into lines (max 1024 bytes each); only then can StringDecoder
        // turn each complete frame into a String. With the decoders reversed the
        // frame decoder receives Strings and no framing takes place.
        arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
        arg0.pipeline().addLast(new StringDecoder());
        arg0.pipeline().addLast(new NettyServerHandler());
        System.out.println("connect server number: " + ChildChannelHandler.count++);
    }
}
/**
 * Server-side business handler for the line-based example: prints each
 * received text message and replies with a newline-terminated greeting.
 */
class NettyServerHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message and queues the reply; the reply is flushed in
     * {@link #channelReadComplete(ChannelHandlerContext)}.
     *
     * @param ctx handler context used to write the reply
     * @param msg inbound message; cast to {@code String}, so an earlier handler
     *            must already have decoded it
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String received = (String) msg;
        System.out.println("receive: " + received);

        // Outbound data is still sent as bytes; the trailing "\n" marks the end
        // of the message for a line-based decoder on the other side.
        final byte[] payload = "hello client\n".getBytes();
        final ByteBuf reply = Unpooled.buffer(payload.length);
        reply.writeBytes(payload);
        ctx.write(reply);
    }

    /** Flushes the replies queued by {@code channelRead}. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Closes the connection when any error reaches this handler. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Minimal Netty TCP client for the line-based example. Inbound data is split
 * into lines, decoded to {@code String}, and passed to
 * {@link NettyClientHandler}.
 */
public class NettyClient {

    /**
     * Connects to {@code host:port} and blocks the calling thread until the
     * channel is closed or the thread is interrupted. The event loop group is
     * shut down before this method returns.
     *
     * @param host remote host name or address
     * @param port remote TCP port
     */
    public void connect(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Configuration lives inside the try so the group is released even
            // if setting up the bootstrap fails.
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Order matters: the frame decoder must see the raw
                            // bytes first and cut them into lines (max 1024
                            // bytes each); StringDecoder then converts each
                            // complete frame. Reversed, the frame decoder would
                            // receive Strings and no framing would take place.
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            // Wait for the connect to complete, then park until the channel closes.
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Connects to 127.0.0.1:8989. */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 8989;
        new NettyClient().connect(host, port);
    }
}
/**
 * Client-side business handler for the line-based example: sends a
 * newline-terminated greeting once the channel is active and prints every
 * text message it receives.
 */
class NettyClientHandler extends ChannelHandlerAdapter {
    private static final Logger LOG = Logger.getLogger(NettyClientHandler.class.getName());

    /** Greeting written once in {@link #channelActive(ChannelHandlerContext)}. */
    private final ByteBuf greeting;

    /** Pre-builds the greeting buffer; the trailing "\n" terminates the line. */
    public NettyClientHandler() {
        final byte[] payload = "hello server\n".getBytes();
        greeting = Unpooled.buffer(payload.length);
        greeting.writeBytes(payload);
    }

    /** Sends the greeting as soon as the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(greeting);
    }

    /**
     * Prints the received message.
     *
     * @param ctx handler context (unused)
     * @param msg inbound message; cast to {@code String}, so an earlier handler
     *            must already have decoded it
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String text = (String) msg;
        System.out.println("Now is: " + text);
    }

    /** Logs the failure message and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.warning("Logger Warning: " + cause.getMessage());
        ctx.close();
    }
}

注意使用LineBasedFrameDecoder后记得在发送的数据后边加\n,使用StringDecoder后,在channelRead方法里边的msg自动被转换成了字符串,但是发送数据时,仍然要发送字节流数据。