探秘微服务治理之Spring Cloud Netflix Eureka
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了探秘微服务治理之Spring Cloud Netflix Eureka,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含17550字,纯文字阅读大概需要26分钟。
内容图文
说起微服务框架,可能大家首先想到的就是Spring Cloud的大家族了。微服务简单理解就是很小、功能相对集中的服务。在微服务的思想下,原有功能都集中在一起实现的业务,现在拆分出来很多的服务模块,那服务之间的互相调用就不可避免,同时服务的高可用也要能够有所保证。要解决上述需要,少不了微服务中至关重要的内容,即服务治理。
服务治理包括三大组件:服务注册中心、服务的注册、服务的发现。举个例子:就好比我们租房,房东呢,想要把房子租出去;房客想要租房。房东把自己的房子信息挂在租房平台上,房客通过租房平台了解到出租房屋的信息,进而可以给房东打电话租房。
这里呢租房平台就相当于服务注册中心,房东在平台上登记出租房屋相当于房屋的注册,房客从平台上获取出租信息相当于服务的发现。进入到程序的世界,Spring Cloud Eureka组件就提供了服务治理的功能。Eureka的服务端就是注册中心;客户端既可以是消费者也可以是提供者,承担着服务的注册和发现。
简单总结各种解决要提供的主要功能:
服务端:
接收注册。
接收续期。
提供服务下线。
提供服务列表
客户端:
注册服务。
续期。
获取服务列表。
维持心跳。
大体上了解之后,我们就来探索一下在底层是如何实现的。
搭建Eureka环境(单实例环境)
基于Springboot我们可以轻松的搭建Eureka的客户端和服务端。
一、服务端搭建:
引入依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>
主类上添加注解:
@SpringBootApplication
@EnableEurekaServer
public class EurekaServer_6001 {
? ?public static void main(String[] args) {
? ? ? ?SpringApplication.run(EurekaServer_6001.class, args);
? ?}
}
在application.properties中相关Eureka的配置:
eureka: instance: hostname: eurekapeer1 #eureka的注册实例名称 client: register-with-eureka: false # 服务注册,不让自己注册到eureka中 fetch-registry: false # 服务的发现 , 自己不从服务中发现信息 service-url: defaultZone: http://localhost:6001/eureka/
启动服务,使用浏览器访问配置的端口,可以看到Eureka的服务端的页面:
二、客户端搭建:
引入依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
主类添加注解:
@EnableDiscoveryClient@SpringBootApplicationpublic class ManagerZuulFilter { public static void main(String[] args) { SpringApplication.run(ManagerZuulFilter.class,args); }}
在application.properties中相关Eureka的配置:
eureka: client: register-with-eureka: true #服务注册开关 fetch-registry: true #服务发现开关 serviceUrl: defaultZone: http://localhost:6001/eureka/
启动服务,刷新服务端Eureka的页面,看到客户端服务注册到Eureka的页面。
关闭客户端服务,服务端Eureka的管理页面,客户端的服务从列表中下线。
源码详解
一、客户端源码分析
客户端服务的治理包括:服务的注册、服务的续期、服务的下线、服务列表的发现等。
Eureka的注册和发现的内容在DiscoveryClient类中。
@Inject
? ?DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
? ? ? ?//...省略一些初始化的内容
? ? ? //不注册到Eureka server并且不从Eureka server中获取服务列表,相当于单节点的Eureka服务端
? ? ? ?if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
? ? ? ? ? ?logger.info("Client configured to neither register nor query for data.");
? ? ? ? ? ?this.scheduler = null;
? ? ? ? ? ?this.heartbeatExecutor = null;
? ? ? ? ? ?this.cacheRefreshExecutor = null;
? ? ? ? ? ?this.eurekaTransport = null;
? ? ? ? ? ?this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());
? ? ? ? ? ?DiscoveryManager.getInstance().setDiscoveryClient(this);
? ? ? ? ? ?DiscoveryManager.getInstance().setEurekaClientConfig(config);
? ? ? ? ? ?this.initTimestampMs = System.currentTimeMillis();
? ? ? ? ? ?logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
? ? ? ?} else {
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?//注册到Eureka并且从Eureka中获取服务列表的逻辑处理
? ? ? ? ? ? ? ?this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build());
? ? ? ? ? ? ? ?//创建心跳定时执行任务
? ? ? ? ? ? ? ?this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());
? ? ? ? ? ? ? ?//创建刷新缓存定时执行任务
? ? ? ? ? ? ? ?this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());
? ? ? ? ? ? //注册前处理器,可以在服务注册到Eureka之前做些操作
? ? ? ? ? ?if (this.preRegistrationHandler != null) {
? ? ? ? ? ? ? ?this.preRegistrationHandler.beforeRegistration();
? ? ? ? ? ?}
? ? ? ? ? ?//是否强制初始阶段就注册到Eureka
? ? ? ? ? ?if (this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldEnforceRegistrationAtInit()) {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?if (!this.register()) {
? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalStateException("Registration error at startup. Invalid server response.");
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?} catch (Throwable var8) {
? ? ? ? ? ? ? ? ? ?logger.error("Registration error at startup: {}", var8.getMessage());
? ? ? ? ? ? ? ? ? ?throw new IllegalStateException(var8);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? ?//执行定时任务
? ? ? ? ? ?this.initScheduledTasks();
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?Monitors.registerObject(this);
? ? ? ? ? ?} catch (Throwable var7) {
? ? ? ? ? ? ? ?logger.warn("Cannot register timers", var7);
? ? ? ? ? ?}
? ? ? ? ? ?DiscoveryManager.getInstance().setDiscoveryClient(this);
? ? ? ? ? ?DiscoveryManager.getInstance().setEurekaClientConfig(config);
? ? ? ? ? ?this.initTimestampMs = System.currentTimeMillis();
? ? ? ? ? ?logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
? ? ? ?}
? ?}
//初始化定时任务
private void initScheduledTasks() {
? ? ? ?int renewalIntervalInSecs;
? ? ? ?int expBackOffBound;
? ? ? //定时获取服务列表任务开启
? ? ? ?if (this.clientConfig.shouldFetchRegistry()) {
? ? ? ? ? ?renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
? ? ? ? ? ?expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
? ? ? ? ? ?this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
? ? ? ?}
? ? ? ?//注册到Eureka,心跳定时任务开始。要让Eureka Server知道客户端还活着
? ? ? ?if (this.clientConfig.shouldRegisterWithEureka()) {
? ? ? ? ? ?renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
? ? ? ? ? ?expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
? ? ? ? ? ?logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
? ? ? ? ? ?this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
? ? ? ? ? ?this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
? ? ? ? ? ?this.statusChangeListener = new StatusChangeListener() {
? ? ? ? ? ? ? ?public String getId() {
? ? ? ? ? ? ? ? ? ?return "statusChangeListener";
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?public void notify(StatusChangeEvent statusChangeEvent) {
? ? ? ? ? ? ? ? ? ?if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
? ? ? ? ? ? ? ? ? ? ? ?DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
? ? ? ? ? ? ? ? ? ?} else {
? ? ? ? ? ? ? ? ? ? ? ?DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ?DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
? ? ? ? ? ? ? ?}
? ? ? ? ? ?};
? ? ? ? ? ?if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
? ? ? ? ? ? ? ?this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
? ? ? ? ? ?}
? ? ? ?this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
? ? ? ?} else {
? ? ? ? ? ?logger.info("Not registering with Eureka server per configuration");
? ? ? ?}
? ?}
//注册发送,REST请求,注册服务到Eureka Server中。
boolean register() throws Throwable {
? ? ? ?logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
? ? ? ?EurekaHttpResponse httpResponse;
? ? ? ?try {
? ? ? ? ? ?httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
? ? ? ?} catch (Exception var3) {
? ? ? ? ? ?logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
? ? ? ? ? ?throw var3;
? ? ? ?}
? ? ? ?if (logger.isInfoEnabled()) {
? ? ? ? ? ?logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
? ? ? ?}
? ? ? ?return httpResponse.getStatusCode() == 204;
? ?}
查看线程,发现确实有上述几类线程。
二、服务端源码分析
服务端的主要逻辑就是:接收客户端的注册/续期/下线等请求、提供服务列表、维护注册信息等。
#eureka server端的功能代码主要是在eureka-core包中主要涉及类和接口有:
EurekaServerBootstrap
InstanceRegistry
下面主要为大家讲一下客户端注册在服务端如何处理:eureka-server包下org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap初始化eurekserver相关的内容:
public class EurekaServerBootstrap { public void contextInitialized(ServletContext context) { try { this.initEurekaEnvironment();//初始化eureka的部署环境,默认test this.initEurekaServerContext();//初始化eureka server的上下文 context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable var3) { log.error("Cannot bootstrap eureka server :", var3); throw new RuntimeException("Cannot bootstrap eureka server :", var3); } }}
eureka-server包下的org.springframework.cloud.netflix.eureka.server.InstanceRegistry中方法为跟服务治理有关系的内容。
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
? ?public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
? ? ? ?this.handleRegistration(info, leaseDuration, isReplication);
? ? ? ?super.register(info, leaseDuration, isReplication);//带有租期的注册,默认租期90s
? ?}
}
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
? ? //存放注册到eureka server上的服务的数据接口
? ?//第一层的String key是指服务名
? ?//第二层的String key是指每个服务的instanceId:默认hostmame:applicationName:port
? ?//最里面的InstanceInfo就是一个服务的详细信息
? ?private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
? ?//客户端注册到服务端的具体处理过程
? ?public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
? ? ? ?try {
? ? ? ? ? ?this.read.lock();
? ? ? ? ? ?//该Map存储的就是eureka server上的服务信息
? ? ? ? ? ?//根据应用名获取该应用名下的列表
? ? ? ? ? ?Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
? ? ? ? ? ?EurekaMonitors.REGISTER.increment(isReplication);
? ? ? ? ? ?if (gMap == null) {
? ? ? ? ? ? ? ?//没有注册过,则新创建
? ? ? ? ? ? ? ?ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
? ? ? ? ? ? ? ?gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
? ? ? ? ? ? ? ?if (gMap == null) {
? ? ? ? ? ? ? ? ? ?gMap = gNewMap;
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? //通过instanceId获取具体的服务
? ? ? ? ? ?Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
? ? ? ? ? ?if (existingLease != null && existingLease.getHolder() != null) {
? ? ? ? ? ? ? ?Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
? ? ? ? ? ? ? ?Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
? ? ? ? ? ? ? ?logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
? ? ? ? ? ? ? ?if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
? ? ? ? ? ? ? ? ? ?logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
? ? ? ? ? ? ? ? ? ?logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
? ? ? ? ? ? ? ? ? ?registrant = (InstanceInfo)existingLease.getHolder();
? ? ? ? ? ? ? ?}
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ?synchronized(this.lock) {
? ? ? ? ? ? ? ? ? ?if (this.expectedNumberOfRenewsPerMin > 0) {
? ? ? ? ? ? ? ? ? ? ? ?this.expectedNumberOfRenewsPerMin += 2;
? ? ? ? ? ? ? ? ? ? ? ?this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold());
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?logger.debug("No previous lease information found; it is new registration");
? ? ? ? ? ?}
? ? ? ? ? ?//创建新的租期信息
? ? ? ? ? ?Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
? ? ? ? ? ?if (existingLease != null) {
? ? ? ? ? ? ? ?lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
? ? ? ? ? ?}
? ? ? ? ? ?//注册服务或者更新服务
? ? ? ? ? ?((Map)gMap).put(registrant.getId(), lease);
? ? ? ? ? ?synchronized(this.recentRegisteredQueue) {
? ? ? ? ? ? ? ?this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")"));
? ? ? ? ? ?}
? ? ? ? ? ?if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
? ? ? ? ? ? ? ?logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId());
? ? ? ? ? ? ? ?if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) {
? ? ? ? ? ? ? ? ? ?logger.info("Not found overridden id {} and hence adding it", registrant.getId());
? ? ? ? ? ? ? ? ? ?this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? ?InstanceStatus overriddenStatusFromMap = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId());
? ? ? ? ? ?if (overriddenStatusFromMap != null) {
? ? ? ? ? ? ? ?logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
? ? ? ? ? ? ? ?registrant.setOverriddenStatus(overriddenStatusFromMap);
? ? ? ? ? ?}
? ? ? ? ? ?InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication);
? ? ? ? ? ?registrant.setStatusWithoutDirty(overriddenInstanceStatus);
? ? ? ? ? ?if (InstanceStatus.UP.equals(registrant.getStatus())) {
? ? ? ? ? ? ? ?lease.serviceUp();
? ? ? ? ? ?}
? ? ? ? ? ?registrant.setActionType(ActionType.ADDED);
? ? ? ? ? ?this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
? ? ? ? ? ?registrant.setLastUpdatedTimestamp();
? ? ? ? ? ?this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
? ? ? ? ? ?logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication});
? ? ? ?} finally {
? ? ? ? ? ?this.read.unlock();
? ? ? ?}
? ?}
}
配置类
常用配置说明:客户端、服务端、以及实例配置可以直接在配置类的属性中查到:
客户端配置:前缀为eureka.client
org.springframework.cloud.netflix.eureka.EurekaClientConfigBean
服务端配置:前缀为eureka.server
org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean
实例配置:前缀eureka.instance
org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean
常用配置
注册中心配置:
#关闭注册中心的保护机制,Eureka 会统计15分钟之内心跳失败的比例低于85%将会触发保护机制,不剔除服务提供者eureka.server.enable-self-preservation = false
服务实例配置:
#不使用主机名来定义注册中心的地址,而使用IP地址的形式eureka.instance.prefer-ip-address = true
服务注册类配置:
#获取服务注册列表eureka.client.fetch-registery = true#注册服务到Eureka Servereureka.client.register-with-eureka = true#从Eureka服务器端获取注册信息的间隔时间,单位:秒eureka.client.registery-fetch-interval-seconds = 30#读取EurekaServer信息的超时时间,单位:秒eureka.client.eureka-server-read-timeout-seconds = 8#连接EurekaServer的超时时间,单位:秒eureka.client.eureka-server-connect-timeout-seconds = 5
通用配置:
eureka.client.service-url.defaultZone = http://localhost:8761/eureka
Spring cloud Eureka中还提供了其他的一些功能,比如认证和限流的内容,有兴趣的可以了解一下。除Eureka提供的服务治理外,还有其他的框架也可以实现服务治理,如Dubbo+Zookeeper,有兴趣可以对比看一下。
框架可以算是一种工具,了解工具的内部是为了更好的让工具为我们服务。优秀框架的设计思想是我们要学习的重点,抽丝剥茧之后,原理其实很基础。
内容总结
以上是互联网集市为您收集整理的探秘微服务治理之Spring Cloud Netflix Eureka全部内容,希望文章能够帮你解决探秘微服务治理之Spring Cloud Netflix Eureka所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。