永发信息网

netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理

答案:1  悬赏:70  手机版
解决时间 2021-01-21 15:17
netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理
最佳答案
netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue
  Java代码
  public void eventSunk(
  ChannelPipeline pipeline, ChannelEvent e) throws Exception {
  if (e instanceof ChannelStateEvent) {
  ……
  } else if (e instanceof MessageEvent) {
  MessageEvent event = (MessageEvent) e;
  NioSocketChannel channel = (NioSocketChannel) event.getChannel();
  boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
  assert offered;
  channel.worker.writeFromUserCode(channel);
  }
  }
  WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
  Java代码
  public boolean offer(MessageEvent e) {
  boolean success = queue.offer(e);
  assert success;
  
  int messageSize = getMessageSize(e);
  int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
  int highWaterMark = getConfig().getWriteBufferHighWaterMark();
  
  if (newWriteBufferSize >= highWaterMark) {
  if (newWriteBufferSize - messageSize < highWaterMark) {
  highWaterMarkCounter.incrementAndGet();
  if (!notifying.get()) {
  notifying.set(Boolean.TRUE);
  fireChannelInterestChanged(AbstractNioChannel.this);
  notifying.set(Boolean.FALSE);
  }
  }
  }
  return true;
  }
  fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
  Java代码
  public boolean setOption(String key, Object value) {
  if (super.setOption(key, value)) {
  return true;
  }
  if ("writeBufferHighWaterMark".equals(key)) {
  setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
  } else if ("writeBufferLowWaterMark".equals(key)) {
  setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
  } else if ("writeSpinCount".equals(key)) {
  setWriteSpinCount(ConversionUtil.toInt(value));
  } else if ("receiveBufferSizePredictorFactory".equals(key)) {
  setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
  } else if ("receiveBufferSizePredictor".equals(key)) {
  setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
  } else {
  return false;
  }
  return true;
  }
  然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
  Java代码
  public MessageEvent poll() {
  MessageEvent e = queue.poll();
  if (e != null) {
  int messageSize = getMessageSize(e);
  int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
  int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
  
  
  if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
  if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
  highWaterMarkCounter.decrementAndGet();
  if (isConnected() && !notifying.get()) {
  notifying.set(Boolean.TRUE);
  fireChannelInterestChanged(AbstractNioChannel.this);
  notifying.set(Boolean.FALSE);
  }
  }
  }
  }
  return e;
  }
  超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
  Java代码
  public boolean isWritable() {
  return (getInterestOps() & OP_WRITE) == 0;
  }
  public int getInterestOps() {
  if (!isOpen()) {
  return Channel.OP_WRITE;
  }
  
  int interestOps = getRawInterestOps();
  int writeBufferSize = this.writeBufferSize.get();
  if (writeBufferSize != 0) {
  if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
  int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
  if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
  interestOps |= Channel.OP_WRITE;
  } else {
  interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
  }
  } else {//超过高水位counter<=0,意味着当前数据量小于高水位
  int highWaterMark = getConfig().getWriteBufferHighWaterMark();
  if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
  interestOps |= Channel.OP_WRITE;
  } else {
  interestOps &= ~Channel.OP_WRITE;
  }
  }
  } else {
  interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
  }
  
  return interestOps;
  }
  即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写

  如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
我要举报
如以上问答信息为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
大家都在看
kt knight空调显示E6
我跟他处对象,微信聊天,他总说那件事,我说
为什么开发商需要拥有一支足球队
胳膊上的东西都是什么
开个老棉被加工店生意好吗,有经验的网友可以
消防电气里的这根线算一根还是两根?DC24V联
王者荣耀2017年那些坦克英雄最厉害
嗯我是个外地媳妇,老公对我不好一生气就撵我
我减去160得540,这个数是多少
曹操最宠爱的儿子排名
豆腐放在冰箱里可以放几天?放在上面的冰冻
亲们,GTX750TI和GTX660选哪一个呢?哪一个性
m(m-2)²=8,求m的值
如图,在一棵树的10米高B处有两只猴子,其中
从来没有去省外办过什么账户,为什么会有记录
推荐资讯
腰椎盘求解
描写心理恐的词语
英语高手翻译
征途游戏攻击出个韧字是什么意思
如何欣赏美女
别人从关注的店铺里为什么点不开我的店铺
酸碱中和问题当碱液中滴加酸中和时,溶液太酸
请问这部漫画叫什么名字呀
我是一个准备去西班牙的劳工,厨师面试已一个
16年高二辍学,现在还能高考吗?
电脑怎么装dfs工具
我为债务方,对方已起诉,法院判决我败诉,可
正方形一边上任一点到这个正方形两条对角线的
阴历怎么看 ?