zookeeper(13)源码分析-请求处理链(3)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了zookeeper(13)源码分析-请求处理链(3),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2569字,纯文字阅读大概需要4分钟。
内容图文
![zookeeper(13)源码分析-请求处理链(3)](/upload/InfoBanner/zyjiaocheng/1127/534d7ec047e849a6b3a148030a54ba90.jpg)
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
}
FinalRequestProcessor只实现了RequestProcessor接口,需要实现process Request方法和shutdown方法。
核心属性为zks,表示Zookeeper服务器,可以通过zks访问到Zookeeper内存数据库。
我们看一下核心方法process Request代码:
同步代码块
synchronized (zks.outstandingChanges) {
// Need to process local session requests
// 当前节点,处理请求,若为事务性请求,则提交到ZooKeeper内存数据库中。
// 对于processTxn函数而言,其最终会调用DataTree的processTxn
rc = zks.processTxn(request);
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
//只有写请求才会有消息头
if (request.getHdr() != null) {
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
long zxid = hdr.getZxid();
//当outstandingChanges不为空且其首元素的zxid小于等于请求的zxid时,
// 就会一直从outstandingChanges中取出首元素,并且对outstandingChangesForPath做相应的操作
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove();
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid
+ " is less than current " + zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
//判断是否为事务性请求则是通过调用isQuorum函数
//只将quorum包(事务性请求)添加进队列
//addCommittedProposal函数将请求添加至ZKDatabase的committedLog结构中
if (request.isQuorum()) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
如果请求是ping
根据请求的创建时间来更新Zookeeper服务器的延迟,updateLatency函数中会记录最大延迟、最小延迟、总的延迟和延迟次数。
然后更新响应中的状态,如请求创建到响应该请求总共花费的时间、最后的操作类型等。然后设置响应后返回
case OpCode.ping: {
//更新延迟
zks.serverStats().updateLatency(request.createTime);
lastOp = "PING";
// 更新响应的状态
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime());
// 设置响应
cnxn.sendResponse(new ReplyHeader(-2,
zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
return;
}
其他请求与此类似,
最后会根据其他请求再次更新服务器的延迟,设置响应的状态等
// 获取最后处理的zxid
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
// 响应头
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue());
// 更新服务器延迟
zks.serverStats().updateLatency(request.createTime);
// 更新状态
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, Time.currentElapsedTime());
最后使用sendResponse函数将响应发送给请求方。
try {
//返回相应
cnxn.sendResponse(hdr, rsp, "response");
if (request.type == OpCode.closeSession) {
//关闭会话
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
原文:https://blog.51cto.com/janephp/2458874
内容总结
以上是互联网集市为您收集整理的zookeeper(13)源码分析-请求处理链(3)全部内容,希望文章能够帮你解决zookeeper(13)源码分析-请求处理链(3)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。