Netty Channel 接口名词理解

Netty Channel 接口名词理解

使用非阻塞ServerSocketChannel、SocketChannel代替ServerSocket和Socket

在使用传统的ServerSocket和Socket的时候 很多时候程序是会阻塞的
比如 serversocket.accept() , socket.getInputStream().read() 的时候都会阻塞 accept()方法除非等到客户端socket的连接或者被异常中断 否则会一直等待下去
read()方法也是如此 除非在输入流中有了足够的数据 否则该方法也会一直等待下去知道数据的到来.在ServerSocket与Socket的方式中 服务器端往往要为每一个客户端(socket)分配一个线程,而每一个线程都有可能处于长时间的阻塞状态中.而过多的线程也会影响服务器的性能.在JDK1.4引入了非阻塞的通信方式,这样使得服务器端只需要一个线程就能处理所有客户端socket的请求.
下面是几个需要用到的核心类
ServerSocketChannel: ServerSocket 的替代类, 支持阻塞通信与非阻塞通信.
SocketChannel: Socket 的替代类, 支持阻塞通信与非阻塞通信.
Selector: 为ServerSocketChannel 监控接收客户端连接就绪事件, 为 SocketChannel 监控连接服务器就绪, 读就绪和写就绪事件.
SelectionKey: 代表 ServerSocketChannel 及 SocketChannel 向 Selector 注册事件的句柄. 当一个 SelectionKey 对象位于Selector 对象的 selected-keys 集合中时, 就表示与这个 SelectionKey 对象相关的事件发生了.在SelectionKey 类中有几个静态常量
SelectionKey.OP_ACCEPT       ->客户端连接就绪事件 等于监听serversocket.accept()返回一个socket
SelectionKey.OP_CONNECT   ->准备连接服务器就绪          跟上面类似,只不过是对于socket的 相当于监听了 socket.connect()
SelectionKey.OP_READ            ->读就绪事件,  表示输入流中已经有了可读数据, 可以执行读操作了
SelectionKey.OP_WRITE          ->写就绪事件

下面是服务器端:
Selector selector = Selector.open();         //静态方法 实例化selector
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);  //设置为非阻塞方式,如果为true 那么就为传统的阻塞方式
serverChannel.socket().bind(new InetSocketAddress(port));  //绑定IP 及 端口
serverChannel.register(selector, SelectionKey.OP_ACCEPT); //注册 OP_ACCEPT事件
new ServerThread().start(); //开启一个线程 处理所有请求
ServerThread中的run方法
view plainprint?
public void run()  
 {  
  while(true)  
  {  
   try  
   {  
    selector.select();  
    Set<SelectionKey> keys = selector.selectedKeys();  
    Iterator<SelectionKey> iter = keys.iterator();  
    SocketChannel sc ;  
    while(iter.hasNext())  
    {  
     SelectionKey key = iter.next();  
     if(key.isAcceptable());  // 新的连接  
      else if(key.isReadable()) ;// 可读      
      iter.remove(); //处理完事件的要从keys中删去        
    }  
   catch (Exception e)  
   {  
    e.printStackTrace();  
   }  
  }  
 }  
其中在 isAcceptable()中 通过        ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel  sc = ssc.accept(); 得到客户端的SocketChannel
在isReadable()中SocketChannel   sc = (SocketChannel) key.channel(); 得到SocketChannel .
在SocketChannel 对象中可以用write() read() 进行读写操作 只不过操作的对象不再是byte[] String之类 而是ByteBuffer

客户端基本一样
 selector = Selector.open();
 channel = SocketChannel.open(new InetSocketAddress(port));
 channel.configureBlocking(false);
 channel.register(selector,SelectionKey.OP_CONNECT);
 new ClientThread().start();
run方法
   while (true)
   {
    selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iter = keys.iterator();
    while(iter.hasNext())
    {
     SelectionKey key = iter.next();
     if(key.isConnectable());//连接成功&正常
     else  if(key.isReadable())//可读
     iter.remove();
   }
可以通过key.channel();方法得到当前的socketchannel对象

总结 其实这里将阻塞变为非阻塞实际是用一个while死循环来处理的
首先通过seleector.select()重新得到事件 只要有事件无论是什么 都交给循环体去处理 在循环体中分别进行不同的处理
而多个socket通过一个seleector进行同意管理
while(一直等待, 直到有接收连接就绪事件, 读就绪事件或写就绪事件发生){             //阻塞
             if(有客户连接)
                  接收客户的连接;                                                    //非阻塞
             if(某个 Socket 的输入流中有可读数据)
                  从输入流中读数据;                                                 //非阻塞
             if(某个 Socket 的输出流可以写数据)
                  向输出流写数据;                                                    //非阻塞
      }
类似这样  以上处理流程采用了轮询的工作方式, 当某一种操作就绪时, 就执行该操作, 否则就查看是否还有其他就绪的操作可以执行. 线程不会因为某一个操作还没有就绪, 就进入阻塞状态, 一直傻傻地在那里等待这个操作就绪.

-----------------------------------------------------

---1.Channel
channel 是负责数据读,写的对象,有点类似于老的io里面的stream。它和stream的区别,channel是双向的,既可以write 也可以read,而stream要分outstream和inputstream。而且在NIO中用户不应该直接从channel中读写数据,而是应该通过buffer,通过buffer再将数据读写到channel中。
一个channel 可以提供给用户下面几个信息
(1)channel的当前状态,比如open 还是closed
(2)ChannelConfig对象,表示channel的一些参数,比如bufferSize

(3)channel支持的所有i/o操作(比如read,write,connect.bind)以及ChannelPipeLine(下面解释)


2.ChannelConfig
channel的参数,以Map 数据结构来存储


3.ChannelEvent
ChannelEvent广义的认为Channel相关的事件,它是否分Upstream events和downstream events两大块,这里需要注意的,若是以server为主体的话,从client的数据到server的过程是Upstream;而server到client的数据传输过程叫downstream;而如果以client为主体的话,从server到client的过程对client来说是Upstream,而client到server的过程对client来说就是downstream。
Upstream events包括:
messageReceived:信息被接受时 ---MessageEvent
exceptionCaught:产生异常时 ---ExceptionEvent
channelOpen:channel被开启时 ---ChannelStateEvent
channelClosed:channel被关闭时 ---ChannelStateEvent
channelBound:channel被开启并准备去连接但还未连接上的时候 ---ChannelStateEvent
channelUnbound:channel被开启不准备去连接时候 ---ChannelStateEvent
channelConnected:channel被连接上的时候 ---ChannelStateEvent
channelDisconnected:channel连接断开的时候 ---ChannelStateEvent
channelInterestChanged:Channel的interestOps被改变的时候 ------ChannelStateEvent
writeComplete:写到远程端完成的时候 --WriteCompletionEvent

Downstream events包括:
write:发送信息给channel的时候 --MessageEvent
bind:绑定一个channel到指定的本地地址 --ChannelStateEvent
unbind:解除当前本地端口的绑定--ChannelStateEvent
connect:将channel连接到远程的机 --ChannelStateEvent
disconnect:将channel与远程的机连接断开 --ChannelStateEvent
close:关闭channel --ChannelStateEvent

需要注意的是,这里没有open event,这是因为当一个channel被channelFactory创建的话,channel总是已经被打开了。

此外还有两个事件类型是当父channel存在子channel的情况
childChannelOpen:子channel被打开 ---ChannelStateEvent
childChannelClosed:子channel被关闭 ---ChannelStateEvent

4.ChannelHandler
channel是负责传送数据的载体,那么数据肯定需要根据要求进行加工处理,那么这个时候就用到ChannelHandler
不同的加工可以构建不同的ChannelHandler,然后放入ChannelPipeline中
此外需要有ChannelEvent触发后才能到达ChannelHandler,因此根据event不同有下面两种的sub接口ChannelUpstreamHandler
和ChannelDownstreamHandler。
一个ChannelHandler通常需要存储一些状态信息作为判断信息,常用做法定义一个变量
比如

public class DataServerHandler extends {@link SimpleChannelHandler} {
*
*     <b>private boolean loggedIn;</b>
*
*     {@code @Override}
*     public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
*         {@link Channel} ch = e.getChannel();
*         Object o = e.getMessage();
*         if (o instanceof LoginMessage) {
*             authenticate((LoginMessage) o);
*             <b>loggedIn = true;</b>
*         } else (o instanceof GetDataMessage) {
*             if (<b>loggedIn</b>) {
*                 ch.write(fetchSecret((GetDataMessage) o));
*             } else {
*                 fail();
*             }
*         }
*     }
*     ...
* }

// Create a new handler instance per channel.
* // See {@link Bootstrap#setPipelineFactory(ChannelPipelineFactory)}.
* public class DataServerPipelineFactory implements {@link ChannelPipelineFactory} {
*     public {@link ChannelPipeline} getPipeline() {
*         return {@link Channels}.pipeline(<b>new DataServerHandler()</b>);
*     }
* }


除了这种,每个ChannelHandler都可以从ChannelHandlerContext中获取或设置数据,那么下面的做法就是利用ChannelHandlerContext
设置变量

* {@code @Sharable}
* public class DataServerHandler extends {@link SimpleChannelHandler} {
*
*     {@code @Override}
*     public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
*         {@link Channel} ch = e.getChannel();
*         Object o = e.getMessage();
*         if (o instanceof LoginMessage) {
*             authenticate((LoginMessage) o);
*             <b>ctx.setAttachment(true)</b>;
*         } else (o instanceof GetDataMessage) {
*             if (<b>Boolean.TRUE.equals(ctx.getAttachment())</b>) {
*                 ch.write(fetchSecret((GetDataMessage) o));
*             } else {
*                 fail();
*             }
*         }
*     }
*     ...
* }

* public class DataServerPipelineFactory implements {@link ChannelPipelineFactory} {
*
*     private static final DataServerHandler <b>SHARED</b> = new DataServerHandler();
*
*     public {@link ChannelPipeline} getPipeline() {
*         return {@link Channels}.pipeline(<b>SHARED</b>);
*     }
* }
这两种做法还是有区别的,上面的变量做法,每个new的handler 对象,变量是不共享的,而下面的ChannelHandlerContext是共享的

如果需要不同的handler之间共享数据,那怎么办,那就用ChannelLocal
例子:
 public final class DataServerState {
*
*     <b>public static final {@link ChannelLocal}&lt;Boolean&gt; loggedIn = new {@link ChannelLocal}&lt;Boolean&gt;() {
*         protected Boolean initialValue(Channel channel) {
*             return false;
*         }
*     }</b>
*     ...
* }
*
* {@code @Sharable}
* public class DataServerHandler extends {@link SimpleChannelHandler} {
*
*     {@code @Override}
*     public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
*         Channel ch = e.getChannel();
*         Object o = e.getMessage();
*         if (o instanceof LoginMessage) {
*             authenticate((LoginMessage) o);
*             <b>DataServerState.loggedIn.set(ch, true);</b>
*         } else (o instanceof GetDataMessage) {
*             if (<b>DataServerState.loggedIn.get(ch)</b>) {
*                 ctx.getChannel().write(fetchSecret((GetDataMessage) o));
*             } else {
*                 fail();
*             }
*         }
*     }
*     ...
* }
*
* // Print the remote addresses of the authenticated clients:
* {@link ChannelGroup} allClientChannels = ...;
* for ({@link Channel} ch: allClientChannels) {
*     if (<b>DataServerState.loggedIn.get(ch)</b>) {
*         System.out.println(ch.getRemoteAddress());
*     }
* }
* </pre>

5.ChannelPipeline
channelPipeline是一系列channelHandler的集合,他参照J2ee中的Intercepting Filter模式来实现的,让用户完全掌握如果在一个handler中处理事件,同时让pipeline里面的多个handler可以相互交互。

Intercepting Filter:http://java.sun.com/blueprints/corej2eepatterns/Patterns/InterceptingFilter.html 对于每一个channel都需要有相应的channelPipeline,当为channel设置了channelPipeline后就不能再为channel重新设置 channelPipeline。此外建议的做法的通过Channels 这个帮助类来生成ChannelPipeline 而不是自己去构建ChannelPipeline

通常pipeLine 添加多个handler,是基于业务逻辑的

比如下面
{@link ChannelPipeline} p = {@link Channels}.pipeline();
* p.addLast("1", new UpstreamHandlerA());
* p.addLast("2", new UpstreamHandlerB());
* p.addLast("3", new DownstreamHandlerA());
* p.addLast("4", new DownstreamHandlerB());
* p.addLast("5", new SimpleChannelHandler());
upstream event 执行的handler按顺序应该是 125
downstream event 执行的handler按顺序应该是 543
SimpleChannelHandler 是同时实现了 ChannelUpstreamHandler和ChannelDownstreamHandler的类
上面只是具有逻辑,如果数据需要通过格式来进行编码的话,那需要这些写
* {@link ChannelPipeline} pipeline = {@link Channels#pipeline() Channels.pipeline()};
* pipeline.addLast("decoder", new MyProtocolDecoder());
* pipeline.addLast("encoder", new MyProtocolEncoder());
* pipeline.addLast("executor", new {@link ExecutionHandler}(new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576)));
* pipeline.addLast("handler", new MyBusinessLogicHandler());
其中:
Protocol Decoder - 将binary转换为java对象
Protocol Encoder - 将java对象转换为binary
ExecutionHandler - applies a thread model.
Business Logic Handler - performs the actual business logic(e.g. database access)
虽然不能为channel重新设置channelPipeline,但是channelPipeline本身是thread-safe,因此你可以在任何时候为channelPipeline添加删除channelHandler

需要注意的是,下面的代码写法不能达到预期的效果
* public class FirstHandler extends {@link SimpleChannelUpstreamHandler} {
*
*     {@code @Override}
*     public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
*         // Remove this handler from the pipeline,
*         ctx.getPipeline().remove(this);
*         // And let SecondHandler handle the current event.
*         ctx.getPipeline().addLast("2nd", new SecondHandler());
*         ctx.sendUpstream(e);
*     }
* }
前提现在Pipeline只有最后一个FirstHandler,
上面明显是想把FirstHandler从Pipeline中移除,然后添加SecondHandler。而pipeline需要有一个Handler,因此如果想到到达这个效果,那么可以
先添加SecondHandler,然后在移除FirstHandler。

6.ChannelFactory
channel的工厂类,也就是用来生成channel的类,ChannelFactory根据指定的通信和网络来生成相应的channel,比如
NioServerSocketChannelFactory生成的channel是基于NIO server socket的。
当一个channel创建后,ChannelPipeline将作为参数附属给该channel。
对于channelFactory的关闭,需要做两步操作
第一,关闭所有该factory产生的channel包括子channel。通常调用ChannelGroup#close()。
第二,释放channelFactory的资源,调用releaseExternalResources()

7.ChannelGroup
channel的组集合,他包含一个或多个open的channel,closed channel会自动从group中移除,一个channel可以在一个或者多个channelGroup
如果想将一个消息广播给多个channel,可以利用group来实现
比如:
{@link ChannelGroup} recipients = new {@link DefaultChannelGroup}()
recipients.add(channelA);
recipients.add(channelB);
recipients.write(ChannelBuffers.copiedBuffer("Service will shut down for maintenance in 5 minutes.",CharsetUtil.UTF_8));

当ServerChannel和非ServerChannel同时都在channelGroup中的时候,任何io请求的操作都是先在ServerChannel中执行再在其他Channel中执行。
这个规则对关闭一个server非常适用。

8.ChannelFuture
在netty中,所有的io传输都是异步,所有那么在传送的时候需要数据+状态来确定是否全部传送成功,而这个载体就是ChannelFuture。


9.ChannelGroupFuture
针对一次ChannelGroup异步操作的结果,他和ChannelFuture一样,包括数据和状态。不同的是他由channelGroup里面channel的所有channelFuture 组成。

10.ChannelGroupFutureListener
针对ChannelGroupFuture的监听器,同样建议使用ChannelGroupFutureListener而不是await();

11.ChannelFutureListener
ChannelFuture监听器,监听channelFuture的结果。

12.ChannelFutureProgressListener
监听ChannelFuture处理过程,比如一个大文件的传送。而ChannelFutureListener只监听ChannelFuture完成未完成

13.ChannelHandlerContext
如何让handler和他的pipeLine以及pipeLine中的其他handler交换,那么就要用到ChannelHandlerContext,ChannelHandler可以通过ChannelHandlerContext的sendXXXstream(ChannelEvent)将event传给最近的handler ,可以通过ChannelHandlerContext的getPipeline来得到Pipeline,并修改他,ChannelHandlerContext还可以存放一下状态信息attments。
一个ChannelHandler实例可以有一个或者多个ChannelHandlerContext

14.ChannelPipelineFactory
产生ChannelPipe的工厂类

15.ChannelState
记载channel状态常量

-----------

Read more

(Python实现)干货 | 滴滴 数据分析原来是这样做的!

(Python实现)干货 | 滴滴 数据分析原来是这样做的!

作者:Rilke Yang 公众号:凹凸数据 hi,我是 Rilke Yang 这是一篇我关于滴滴的数据实战,之前首发在和鲸,这次投稿到凹凸数据,希望能够帮助到大家~ 原文链接:https://www.kesci.com/home/project/5f06b0193af6a6002d0fa357 随着企业日常经营活动的进行,企业内部必然产生了各式各样的数据,如何利用这些数据得出有益的见解,并支持我们下一步的产品迭代以及领导决策就显得尤为重要。 A/B测试是互联网企业常用的一种基于数据的产品迭代方法,它的主要思想是在控制其他条件不变的前提下对不同(或同一、同质)样本设计不同实验水平(方案),并根据最终的数据变现来判断自变量对因变量的影响;A/B测试的理论基础主要源于数理统计中的假设检验部分,此部分统计学知识读者可自行探索。 长话短说,本次实战用到的数据集分为两个Excel文件,其中test.xlsx为滴滴出行某次A/B测试结果数据,city.xlsx为某城市运营数据。 数据说明 test.xlsxcity.xlsxdate:日期date:日期group:组别(

By Ne0inhk
中国程序员最容易发音错误的单词

中国程序员最容易发音错误的单词

来源:github.com/shimohq/chinese-programmer-wrong-pronunciation 一千个人眼里有一千个哈姆雷特,一千个程序员嘴里有一千种发音 你有没有遇到同一个单词,你的发音总是和同事的不一样,甚至互相听不到对方说的是哪个单词,我以前就真遇到过。 快来一起看看这些日常单词的正确发音究竟是怎么拼写的吧。 单词正确发音错误发音Linux✅ ['lɪnəks]❌ [ˈlɪnʌks; ˈlɪnjuːks]Ubuntu✅ [ʊ'bʊntʊ]❌ [juː'bʊntʊ]ASCII✅ ['æski]❌ [ɑːsk]Django✅ [ˈdʒæŋɡoʊ]❌ [diˈdʒæŋɡoʊ]Apache✅ [ə'pætʃɪ]❌ [ʌpʌtʃ]AJAX✅ ['eidʒæks]❌ [ə'dʒʌks]app✅ [æp]❌ [eipi'pi]SQL✅ [ˈsiːkwəl]/[ˈesˈkjuːˈel]

By Ne0inhk
入门级数据分析师,该掌握哪些技能

入门级数据分析师,该掌握哪些技能

接地气的陈老师 |  作者 接地气学堂 |  来源 很多同学很困惑:想做数据分析师,结果学了一大堆ESP软件操作,看了一堆统计学、机器学习书、跑了很多数据集,结果入职以后每天都在取数——而且还是很基础的数据。那到底自己算不算入门?啥水平才算是真正的数据分析师?今天系统讲解一下。 本质上看,问题来自于:网上对数据分析的描写太过理想化,把原本需要综合技能的工作,抽象成一些列简单操作,从而造成了一种错觉:只要我对着案例抄一遍代码,会做几道sql题,会把模型代码输入sklearn跑一遍就算是数据分析了。可实际上,作为一个工作,数据分析需要在具体企业上班,面对具体业务问题,应付具体的系统状况,和各色同事打交道。需要的远不止基础操作(如下图)。 况且,刚入门新兵,干的最多的就是跑数,就是脏活累活。招你进来不干脏活累活,难道让老鸟们干吗。原本抱着“数据驱动业务”“成为数据科学家”的理想,一下办成扫地抹桌倒尿罐,巨大的心理落差肯定让新人接受不了。唯一的问题是:如何在枯燥烦闷的基础工作中积累四大技能,尽快让自己脱颖而出。 1 第一,业务理解

By Ne0inhk
Python 实现Excel自动化办公《下》

Python 实现Excel自动化办公《下》

作者|无量测试之道 来源|无量测试之道 上一讲我们讲到了Python 针对Excel 里面的特殊数据处理以及各种数据统计,本讲我们将引入Pandas 这个第三方库来实现数据的统计,只要一个方法就可以统计到上一讲的数据统计内容,本讲也会扩展讲讲Pandas所涉及到的相关使用方法。 统计输出 import pandas as pd pd1=pd.read_excel("test1.xls") pd2=pd.read_excel("test2.xls",skiprows=2) #skiprows=2表示忽略前几行,skip_footer用来省略尾部的行数 #统计输出 print(pd1.describe()) #数字类型的统计输出,它是DateFrame类型 print(pd1.min()) #输出每一列里面最小值 print(pd1.

By Ne0inhk