zookeeper(14)源码分析-服务器(1)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了zookeeper(14)源码分析-服务器(1),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含6362字,纯文字阅读大概需要10分钟。
内容图文
ZooKeeperServer,为所有服务器的父类。
QuorumZooKeeperServer,其是所有参与选举的服务器的父类,是抽象类,其继承了ZooKeeperServer类。
LeaderZooKeeperServer,Leader服务器,继承了QuorumZooKeeperServer类,也会继承ZooKeeperServer中的很多方法。
LearnerZooKeeper,其是Learner服务器的父类,为抽象类,也继承了QuorumZooKeeperServer类。
FollowerZooKeeperServer,Follower服务器,继承了LearnerZooKeeper。
ObserverZooKeeperServer,Observer服务器,继承了LearnerZooKeeper。
ReadOnlyZooKeeperServer,只读服务器,不提供写服务,继承QuorumZooKeeperServer。
ZooKeeperServer
1、类的继承关系
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}
ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定义了expire方法(表示会话过期)和getServerId方法(表示获取服务器ID),而Provider则主要定义了获取服务器某些数据的方法。
2、类属性
protected static final Logger LOG;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
Environment.logEnv("Server environment:", LOG);
}
//jmx服务
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;
// 默认心跳频率
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// 最小会话过期时间
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;
// 最大会话过期时间
/** value of -1 indicates unset, use default */
protected int maxSessionTimeout = -1;
protected SessionTracker sessionTracker;
// 事务日志快照
private FileTxnSnapLog txnLogFactory = null;
// Zookeeper内存数据库
private ZKDatabase zkDb;
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
// 请求处理器
protected RequestProcessor firstProcessor;
protected volatile State state = State.INITIAL;
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR
}
/**
* This is the secret that we use to generate passwords. For the moment,
* it's more of a checksum that's used in reconnection, which carries no
* security weight, and is treated internally as if it carries no
* security weight.
*/
static final private long superSecret = 0XB3415C00L;
private final AtomicInteger requestsInProcess = new AtomicInteger(0);
// 未处理的ChangeRecord
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
// 记录path对应的ChangeRecord
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;
// 服务器统计数据
private final ServerStats serverStats;
private final ZooKeeperServerListener listener;
private ZooKeeperServerShutdownHandler zkShutdownHandler;
private volatile int createSessionTrackerServerId = 1;
3、核心函数
3.1 loadData
该函数用于加载数据,其首先会判断内存库是否已经加载设置zxid,之后会调用killSession函数删除过期的会话
if(zkDb.isInitialized()){ // 内存数据库已被初始化
// 设置为最后处理的Zxid
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
// 未被初始化,则加载数据库
setZxid(zkDb.loadDataBase());
}
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {// 遍历所有的会话
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
for (long session : deadSessions) { // 删除过期的会话
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
// Make a clean snapshot
//初始化一个快照
takeSnapshot();
3.2、submitRequest
提交请求,处理器进行处理
public void submitRequest(Request si) {
if (firstProcessor == null) {// 第一个处理器为空
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
//服务器调用链还未初始化完成
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
// 是否为合法的请求
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
//调用链第一处理器开始处理
firstProcessor.proce***equest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().proce***equest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
LeaderZooKeeperServer介绍
1、类属性
// 提交请求处理器
CommitProcessor commitProcessor;
//处理链请求第一个处理处理器
PrepRequestProcessor prepRequestProcessor;
2、构造方法
LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
}
直接调用父类QuorumZooKeeperServer的构造函数,然后再调用ZooKeeperServer的构造函数,逐级构造。
3、核心函数
3.1、setupRequestProcessors
@Override
protected void setupRequestProcessors() {
//创建FinalRequestProcessor,处理链最后一个处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
//创建ToBeAppliedRequestProcessor
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
// 创建CommitProcessor,提交处理器
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
// 启动CommitProcessor
commitProcessor.start();
// 创建ProposalRequestProcessor
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
// 初始化ProposalProcessor
proposalProcessor.initialize();
//创建PrepRequestProcessor,作为以第一个处理链处理器
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
// firstProcessor为PrepRequestProcessor
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
setupRequestProcessors函数表示创建处理链,可以看到其处理链的顺序为PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。
内容总结
以上是互联网集市为您收集整理的zookeeper(14)源码分析-服务器(1)全部内容,希望文章能够帮你解决zookeeper(14)源码分析-服务器(1)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。