管道流:
Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
1 package base.nio.threaddemo; 2 3 import java.io.IOException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.Pipe; 6 7 /** 8 * @program: Lear-Java 9 * @description: 10 * @author: Mr.Dai11 * @create: 2018-10-05 20:4312 **/13 public class ThreadSend {14 15 private Pipe pipe;16 17 18 private void init() throws Exception {19 this.pipe = Pipe.open();20 }21 22 23 class SendInner1 extends Thread {24 25 @Override26 public void run() {27 // 单向流 发送数据28 try {29 Pipe.SinkChannel sink = pipe.sink();30 sink.configureBlocking(false);31 32 while (true) {33 if (sink.isOpen()) {34 sink.write(ByteBuffer.wrap("abcd".getBytes()));35 }36 Thread.sleep(1000);37 }38 } catch (InterruptedException | IOException e) {39 e.printStackTrace();40 }41 }42 }43 44 class ReverInner extends Thread {45 @Override46 public void run() {47 try {48 // 单向流 拿到数据49 Pipe.SourceChannel source = pipe.source();50 51 source.configureBlocking(false);52 53 while (true) {54 if (source.isOpen()) {55 ByteBuffer buffer = ByteBuffer.allocate(10);56 buffer.clear();57 source.read(buffer);58 // 这里必须去掉 trim59 if(new String(buffer.array()).trim().equals("")){60 continue;61 }62 System.out.println(new String(buffer.array()).trim());63 }64 Thread.sleep(1000);65 }66 } catch (InterruptedException | IOException e) {67 e.printStackTrace();68 }69 }70 }71 72 public static void main(String[] args) throws Exception {73 ThreadSend send = new ThreadSend();74 75 send.init();76 77 SendInner1 sendI = send.new SendInner1();78 79 ReverInner revI = send.new ReverInner();80 81 sendI.start();82 revI.start();83 }84 85 86 }
套接字通道流
非阻塞模式
ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。如:
1 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 2 3 serverSocketChannel.socket().bind(new InetSocketAddress(9999)); 4 serverSocketChannel.configureBlocking(false); 5 6 while(true){ 7 SocketChannel socketChannel = 8 serverSocketChannel.accept(); 9 10 if(socketChannel != null){ 11 //do something with socketChannel... 12 } 13 }
server:
1 package base.nio.chatdemo; 2 3 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel;10 import java.util.Iterator;11 import java.util.Set;12 13 /**14 * @program: Lear-Java15 * @description: Nio 聊天服务端16 * @author: Mr.Dai17 * @create: 2018-10-05 16:3118 **/19 public class ChatServer {20 21 /**22 * 通道管理器23 */24 private Selector selector;25 26 private void initServer(int port) throws Exception{27 28 ServerSocketChannel serverChannel = ServerSocketChannel.open();29 30 serverChannel .socket().bind(new InetSocketAddress(port));31 // 配置非阻塞32 serverChannel .configureBlocking(false);33 34 35 this.selector=Selector.open();36 37 /**38 * 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件39 * 注册该事件后,当事件到达的时候,selector.select()会返回,40 * 如果事件没有到达selector.select()会一直阻塞41 * selector.selectNow() 立即返回 无论是否准备好 可能返回042 */43 serverChannel .register(this.selector, SelectionKey.OP_ACCEPT);44 45 }46 47 /**48 * 采用轮训的方式监听selector上是否有需要处理的事件,如果有,进行处理49 */50 public void listen() throws Exception {51 System.out.println("start------------------->");52 while (true){53 // 在没有注册事件来到时 将会一直阻塞54 selector.select();55 Setset = selector.selectedKeys();56 Iterator iterator = set.iterator();57 58 while (iterator.hasNext()){59 SelectionKey key = iterator.next();60 // 移除当前阻塞队列61 iterator.remove();62 if(key.isAcceptable()){63 ServerSocketChannel server = (ServerSocketChannel) key.channel();64 65 SocketChannel channel = server.accept();66 channel.configureBlocking(false);67 // 服务端发送数据68 channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));69 // 在客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限70 channel.register(this.selector,SelectionKey.OP_READ);71 72 }else if(key.isReadable()){73 SocketChannel channel = (SocketChannel) key.channel();74 75 ByteBuffer buffer = ByteBuffer.allocate(10);76 channel.read(buffer);77 78 String msg = new String(buffer.array()).trim();79 80 System.out.println("客户端发送过来的讯息:"+msg);81 // 在读取后 将柱塞队列数据 改变监听为Accept82 ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());83 channel.write(outBuffer);84 }85 }86 }87 88 }89 90 public static void main(String[] args) throws Exception{91 ChatServer server = new ChatServer();92 server.initServer(8989);93 server.listen();94 }95 96 }
clien:
1 package base.nio.chatdemo; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.SocketChannel; 9 import java.util.Iterator;10 11 /**12 * @program: Lear-Java13 * @description: nio 聊天客户端14 * @author: Mr.Dai15 * @create: 2018-10-05 16:3116 **/17 public class ChatClient {18 19 20 /**21 * 提供柱阻塞队列 管理器22 */23 private Selector selector;24 25 26 private void ininCliect(String ip,int port) throws Exception{27 28 SocketChannel channel = SocketChannel.open();29 30 channel .connect(new InetSocketAddress(ip,port));31 32 this.selector=Selector.open();33 34 channel .configureBlocking(false);35 36 37 channel .register(this.selector, SelectionKey.OP_CONNECT);38 39 }40 41 public void listen() throws Exception {42 43 while (true){44 45 selector.select();46 47 Iteratorite = selector.selectedKeys().iterator();48 49 while (ite.hasNext()){50 SelectionKey key = ite .next();51 ite .remove();52 if(key.isConnectable()){53 SocketChannel channel = (SocketChannel) key.channel();54 // 是否准备好连接55 if(channel.isConnectionPending()){56 channel.finishConnect();57 }58 channel.configureBlocking(false);59 // 向server 发送数据60 channel.write(ByteBuffer.wrap("向server 发送数据".getBytes()));61 62 channel.register(selector,SelectionKey.OP_READ);63 64 }else if(key.isReadable()){65 m1(key);66 }67 }68 }69 }70 71 private void m1(SelectionKey key) throws IOException {72 SocketChannel channel = (SocketChannel) key.channel();73 74 ByteBuffer buffer = ByteBuffer.allocate(10);75 channel.read(buffer);76 System.out.println("服务端的消息为:"+new String(buffer.array()));77 78 ByteBuffer outBuffer = ByteBuffer.wrap(new String("aaa").getBytes());79 channel.write(outBuffer);80 }81 82 public static void main(String[] args) throws Exception {83 ChatClient client = new ChatClient();84 85 client.ininCliect("127.0.0.1",8989);86 client.listen();87 }88 89 }