Spring Cloud(二十二):服务容错 Hystrix 框架核心原理

Netflix Github 官方文档:https://github.com/Netflix/Hystrix/wiki

在分布式环境中,系统所依赖的服务的稳定性存在不可控因素,不可避免地会存在失败。

Hystrix 通过隔离服务之间的访问点、阻止它们之间的级联故障并提供回退选项来做到这一点,所有这些都可以提高系统的整体弹性。

线程池隔离

Hystrix 通过命令模式将每个类型的业务请求封装成对应的命令处理器。把每一步操作封装成命令处理器,每个类型的 Command 都会初始化一个自己的线程池。创建好的线程池放入 ConcurrentHashMap 中。

抽象命令类是 AbstractCommand和抽象子类 HystrixCommand

Hystrix 线程池接口类 HystrixThreadPool ,内部一个静态工厂类 Factory用于创建线程池,一个默认的线程池实现类 HystrixThreadPoolDefault。线程池创建和维护原码如下:

AbstractCommand.initThreadPool:实现抽象命令AbstractCommand 的具体命令会调用抽象命令的构造方法来初始化命令对应的

1
2
3
4
5
6
7
8
9
// 初始化线程池
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}

HystrixThreadPool.Factory.getInstance:获取线程池,第一次使有会创建线程池并维护到 ConcurrentHashMap 中,后续使用从缓存中取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// 获取线程池的Key
String key = threadPoolKey.name();

// 缓存的线程池
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}

// 走到这里说明是第一次使用, 需要初始化
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}

HystrixConcurrencyStrategy.getThreadPool:创建线程池是由 Hystrix 的抽象并发策略类负责创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public abstract class HystrixConcurrencyStrategy {
// 获取线程池
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

final int dynamicCoreSize = corePoolSize.get();
final int dynamicMaximumSize = maximumPoolSize.get();

if (dynamicCoreSize > dynamicMaximumSize) {
// 如果核心线程数大于最大线程数, 给出错误提示并创建线程池, 最大线程数即为核心线程数
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
}
}
// 获取线程池
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
// 如果核心线程数大于最大线程数, 给出错误提示并创建线程池, 最大线程数即为核心线程数
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
// 线程工厂,创建线程
private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}

};
} else {
return PlatformSpecific.getAppEngineThreadFactory();
}
}

/**
* 线程池的阻塞队列
*/
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
if (maxQueueSize <= 0) {
return new SynchronousQueue<Runnable>();
} else {
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}

/**
* 提供在执行前包装/修饰{@code Callable<T>}的机会。
* 这可以用来注入额外的行为,比如复制线程状态(比如{@link ThreadLocal})。
* 默认实现, 可被子类重写
*/
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}

/**
* 请求参数,作用域是请求,不是 ThreadLocal
*/
public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
return new HystrixLifecycleForwardingRequestVariable<T>(rv);
}

}

信号量隔离

服务熔断

降级回退

Spring Cloud(二十二):服务容错 Hystrix 框架核心原理

http://blog.gxitsky.com/2022/02/21/SpringCloud-22-Hystrix-Circuit-Breaker-core-tec/

作者

光星

发布于

2022-02-21

更新于

2022-06-17

许可协议

评论