netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理
答案:1 悬赏:70 手机版
解决时间 2021-01-21 15:17
- 提问者网友:嘚啵嘚啵
- 2021-01-20 20:51
netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理
最佳答案
- 五星知识达人网友:荒野風
- 2021-01-20 21:49
netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue
Java代码
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
……
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
Java代码
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
Java代码
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if ("writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if ("writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if ("writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if ("receiveBufferSizePredictorFactory".equals(key)) {
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
Java代码
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
Java代码
public boolean isWritable() {
return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
}
} else {//超过高水位counter<=0,意味着当前数据量小于高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
}
return interestOps;
}
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
Java代码
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
……
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
Java代码
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
Java代码
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if ("writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if ("writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if ("writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if ("receiveBufferSizePredictorFactory".equals(key)) {
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
Java代码
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
Java代码
public boolean isWritable() {
return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
}
} else {//超过高水位counter<=0,意味着当前数据量小于高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
}
return interestOps;
}
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
我要举报
如以上问答信息为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
大家都在看
推荐资讯