不识Netty真面目,只缘未读此真经

Netty官网:https://netty.io/

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

Java技术栈方向的朋友应该或多或少都听说过Netty是对Java中nio ( Non Blocking IO )的封装,让我们能快速开发出性能更高、扩展性更好的网络应用程序。那么Netty究竟对nio做了怎样的封装呢?本文主要从源码角度揭开这层面纱。

不识Netty真面目,只缘未读此真经

源码追踪中,我使用阿里的语雀产品的思维图记录主要方法调用,上面的图片是部分截图,完整原貌见:

预备知识

在初探Netty源码之前,至少需要理解Reactor Pattern、java.nio基本使用、Netty基本使用,这样后面才能把Netty的源码与java.nio对比着来看。

Reactor Pattern

不识Netty真面目,只缘未读此真经。Doug Lea (java.util.concurrent包的作者) 在《Scalable IO in Java》中循序渐进地分析了如何构建可伸缩的高性能IO服务以及服务模型的演变与进化。文中描述的Reactor Pattern,也被Netty等大多数高性能IO服务框架所借鉴。因此仔细阅读《Scalable IO in Java》有助于更好地理解Netty框架的架构与设计。详情见:

传统的服务模式

Server端为每一个Client端的连接请求都开启一个独立线程,也就是所谓的BIO (Blocking IO),即java.net.ServerSocket包下api的使用。

不识Netty真面目,只缘未读此真经

基于事件驱动模式

不识Netty真面目,只缘未读此真经


不识Netty真面目,只缘未读此真经

Reactor模式

Reactor responds to IO events by dispatching the appropriate handler (Similar to AWT thread)

Handlers perform non-blocking actions (Similar to AWT ActionListeners)

Manage by binding handlers to events (Similar to AWT addActionListener)

(1) 单线程版本

不识Netty真面目,只缘未读此真经

(2) 多线程版本

不识Netty真面目,只缘未读此真经

(3) 多Reactor版本 (一主多从、多主多从)

不识Netty真面目,只缘未读此真经


Netty正是借鉴了这种多Reactor版本的设计。

快速上手java.nio

Channels:Connections to files, sockets etc that support non-blocking reads

Buffers:Array-like objects that can be directly read or written by Channels

Selectors:Tell which of a set of Channels have IO events

SelectionKeys:Maintain IO event status and bindings

注意:以下Demo仅专注于主逻辑,没有处理异常,也没有关闭资源。

Server端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; public class NIOServer { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public static void main(String[] args) throws IOException { // ServerSocketChannel.open() ServerSocketChannel serverSocketChannel = DEFAULT_SELECTOR_PROVIDER.openServerSocketChannel(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // Selector.open() Selector selector = DEFAULT_SELECTOR_PROVIDER.openSelector(); // register this serverSocketChannel with the selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // selector.select() while (!Thread.interrupted()) { selector.select(); Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); // handle IO events handle(key); } } } private static void handle(SelectionKey key) throws IOException { if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(key.selector(), SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); // read client data ByteBuffer buffer = ByteBuffer.allocate(1024); int len = socketChannel.read(buffer); if (len != -1) { String msg = String.format("recv client[%s] data:%s", socketChannel.getRemoteAddress(), new String(buffer.array(), 0, len)); System.out.println(msg); } // response client ByteBuffer data = ByteBuffer.wrap("Hello, NIOClient!".getBytes()); socketChannel.write(data); key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (key.isWritable()) { // ... } } } Client端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; public class NIOClient { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public static void main(String[] args) throws IOException { // SocketChannel.open() SocketChannel socketChannel = DEFAULT_SELECTOR_PROVIDER.openSocketChannel(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080)); // Selector.open() Selector selector = DEFAULT_SELECTOR_PROVIDER.openSelector(); // register this socketChannel with the selector socketChannel.register(selector, SelectionKey.OP_CONNECT); // selector.select() while (!Thread.interrupted()) { selector.select(); Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); // handle IO events if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key.channel(); if (channel.isConnectionPending()) { channel.finishConnect(); } channel.configureBlocking(false); // request server ByteBuffer buffer = ByteBuffer.wrap("Hello, NIOServer!".getBytes()); channel.write(buffer); channel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); // read server data ByteBuffer buffer = ByteBuffer.allocate(1024); int len = channel.read(buffer); if (len != -1) { String msg = String.format("recv server[%s] data:%s", channel.getRemoteAddress(), new String(buffer.array(), 0, len)); System.out.println(msg); } } } } } } 快速上手Netty

更多官方example,请参考:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfjxs.html