Luckylau's Blog

Microservice Architecture: Dubbo Cluster Fault Tolerance (3)

Centered on Invoker, this series analyzes the Cluster, Directory, Router, and LoadBalance interfaces in turn.

Cluster

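Cluster disguises the multiple Invokers in a Directory as a single Invoker, transparent to the upper layer. The disguise includes fault-tolerance logic, for example retrying another Invoker after a call fails.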

MergeableCluster

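Official description: merges results by group. Take a menu service: the interface is the same but there are several implementations, distinguished by group. The consumer calls each group once and merges the results, which yields an aggregated list of menu items.

Configuration:

Provider side: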

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService1" group="gro_1"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService2" group="gro_2"/>

Consumer side:

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" group="gro_1,gro_2" merger="true"/>

Source analysis:

public Result invoke(final Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    // If merger is empty, just return the result from one of the groups.
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    if (ConfigUtils.isEmpty(merger)) {
        for (final Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        return invokers.iterator().next().invoke(invocation);
    }
    Class<?> returnType; // return type of the invoked method
    try {
        returnType = getInterface().getMethod(
                invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }
    // Invoke every group asynchronously.
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for (final Invoker<T> invoker : invokers) {
        Future<Result> future = executor.submit(new Callable<Result>() {
            public Result call() throws Exception {
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        results.put(invoker.getUrl().getServiceKey(), future);
    }
    Object result = null;
    List<Result> resultList = new ArrayList<Result>(results.size());
    // Call timeout for this method.
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // Collect each result within the timeout; a result that carries an exception is logged and skipped.
    for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) {
                log.error(new StringBuilder(32).append("Invoke ")
                                .append(getGroupDescFromServiceKey(entry.getKey()))
                                .append(" failed: ")
                                .append(r.getException().getMessage()).toString(),
                        r.getException());
            } else {
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Failed to invoke service ")
                    .append(entry.getKey())
                    .append(": ")
                    .append(e.getMessage()).toString(),
                    e);
        }
    }
    if (resultList.size() == 0) {
        return new RpcResult((Object) null); // no successful result: return null
    } else if (resultList.size() == 1) {
        return resultList.iterator().next(); // only one result: return it as-is
    }
    if (returnType == void.class) {
        return new RpcResult((Object) null); // void method: nothing to merge
    }
    if (merger.startsWith(".")) { // e.g. merger=".addAll"
        merger = merger.substring(1);
        Method method;
        try {
            // Look up the public method named by merger on the return type,
            // e.g. returnType is String and merger is ".compareTo".
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Can not merge result because missing method [ ")
                    .append(merger)
                    .append(" ] in class [ ")
                    .append(returnType.getClass().getName())
                    .append(" ]")
                    .toString());
        }
        if (method != null) {
            // Make the method accessible if it is not public.
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result: ")
                                .append(e.getMessage()).toString(),
                        e);
            }
        } else {
            throw new RpcException(
                    new StringBuilder(32)
                            .append("Can not merge result because missing method [ ")
                            .append(merger)
                            .append(" ] in class [ ")
                            .append(returnType.getClass().getName())
                            .append(" ]")
                            .toString());
        }
    } else {
        Merger resultMerger; // merger = "true"
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType); // pick the built-in Merger for the return type
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); // user-defined Merger
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0))); // merge the results
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    return new RpcResult(result);
}
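
The flow above (call every group in parallel, wait for each result within a timeout, then merge) can be illustrated with a small standalone sketch. It does not use Dubbo classes: `MergeSketch`, `invokeAndMerge`, and the use of a `Callable` per group are hypothetical stand-ins for the Invokers, and only the list-concatenation case is shown. Unlike the source above, any failed group aborts the whole call here instead of being logged and skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: each Callable stands in for the Invoker of one group.
class MergeSketch {

    // Submits one task per group, waits up to timeoutMs for each,
    // and concatenates the returned lists in group order.
    static List<String> invokeAndMerge(Map<String, Callable<List<String>>> groups, long timeoutMs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Map<String, Future<List<String>>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Callable<List<String>>> e : groups.entrySet()) {
                futures.put(e.getKey(), executor.submit(e.getValue()));
            }
            List<String> merged = new ArrayList<>();
            for (Map.Entry<String, Future<List<String>>> e : futures.entrySet()) {
                try {
                    merged.addAll(e.getValue().get(timeoutMs, TimeUnit.MILLISECONDS));
                } catch (Exception ex) {
                    // Simplification: any failure or timeout aborts the merge.
                    throw new RuntimeException("Failed to invoke group " + e.getKey(), ex);
                }
            }
            return merged;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<String, Callable<List<String>>> groups = new LinkedHashMap<>();
        groups.put("gro_1", () -> Arrays.asList("File", "Edit"));
        groups.put("gro_2", () -> Arrays.asList("View"));
        System.out.println(invokeAndMerge(groups, 1000)); // prints [File, Edit, View]
    }
}
```

A `LinkedHashMap` keeps the groups in insertion order, so the merged list is deterministic in this sketch.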

AvailableCluster

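As the name suggests, this cluster simply picks an Invoker that is available. It is mainly used when generating the proxy object during service registration. A later post covers how to register with the registry and fetch the provider's information. It can also be configured explicitly: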

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" cluster="available"/>

The source is simple:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new AbstractClusterInvoker<T>(directory) {
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            throw new RpcException("No provider available in " + invokers);
        }
    };
}
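
As a rough standalone illustration of the same "first available wins" rule, here is a minimal sketch. `AvailableSketch`, `Node`, and `node(...)` are hypothetical names invented for this example and are not Dubbo types.

```java
import java.util.Arrays;
import java.util.List;

class AvailableSketch {

    // Hypothetical stand-in for a Dubbo Invoker.
    interface Node {
        boolean isAvailable();
        String invoke(String request);
    }

    // Calls the first node that reports itself available; fails if none does.
    static String invokeFirstAvailable(List<Node> nodes, String request) {
        for (Node node : nodes) {
            if (node.isAvailable()) {
                return node.invoke(request);
            }
        }
        throw new IllegalStateException("No provider available");
    }

    // Small factory for demo nodes with a fixed availability flag.
    static Node node(final String name, final boolean up) {
        return new Node() {
            public boolean isAvailable() { return up; }
            public String invoke(String request) { return name + ":" + request; }
        };
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(node("a", false), node("b", true));
        System.out.println(invokeFirstAvailable(nodes, "ping")); // prints b:ping
    }
}
```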

ForkingCluster

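Calls multiple servers in parallel and returns as soon as one succeeds. Typically used for read operations with strict latency requirements, at the cost of extra server resources. The maximum parallelism can be set with forks="2" (the official docs say so, but I still have not worked out how to configure it). Judging from the code, the default is 2.

Source analysis: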

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Throws if invokers is null or empty.
    checkInvokers(invokers, invocation);
    final List<Invoker<T>> selected;
    // forks defaults to 2
    final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    if (forks <= 0 || forks >= invokers.size()) {
        selected = invokers; // use all invokers
    } else {
        selected = new ArrayList<Invoker<T>>();
        for (int i = 0; i < forks; i++) {
            // TODO. Add some comment here, refer chinese version for more details.
            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
            if (!selected.contains(invoker)) { // Avoid add the same invoker several times.
                selected.add(invoker);
            }
        }
    }
    RpcContext.getContext().setInvokers((List) selected);
    final AtomicInteger count = new AtomicInteger();
    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
    for (final Invoker<T> invoker : selected) {
        executor.execute(new Runnable() {
            public void run() {
                try {
                    Result result = invoker.invoke(invocation);
                    ref.offer(result);
                } catch (Throwable e) {
                    // Only publish the exception once every fork has failed.
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            }
        });
    }
    try {
        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); // take the head of the queue and return it
        if (ret instanceof Throwable) {
            Throwable e = (Throwable) ret;
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
        return (Result) ret;
    } catch (InterruptedException e) {
        throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    }
}
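
The core of the forking idea (a shared queue where the first successful result wins, and an exception is only published once every fork has failed) can be sketched without Dubbo. `ForkingSketch` and `forkingInvoke` are hypothetical names, and each `Callable` stands in for one selected Invoker. One deliberate difference: `poll` returns null on timeout, and this sketch turns that into an exception rather than returning null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ForkingSketch {

    // Runs all calls in parallel and returns the first successful result.
    static String forkingInvoke(List<Callable<String>> calls, long timeoutMs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicInteger failures = new AtomicInteger();
        BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        try {
            for (Callable<String> call : calls) {
                executor.execute(() -> {
                    try {
                        ref.offer(call.call());
                    } catch (Throwable t) {
                        // Publish the error only when the last fork has failed.
                        if (failures.incrementAndGet() >= calls.size()) {
                            ref.offer(t);
                        }
                    }
                });
            }
            Object first = ref.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first == null) {
                throw new IllegalStateException("Timed out waiting for any fork");
            }
            if (first instanceof Throwable) {
                throw new IllegalStateException("All forks failed", (Throwable) first);
            }
            return (String) first;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> calls = new ArrayList<>();
        calls.add(() -> { throw new IllegalStateException("provider A down"); });
        calls.add(() -> "reply from B");
        System.out.println(forkingInvoke(calls, 1000)); // prints reply from B
    }
}
```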

FailfastCluster

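Fail fast: only one call is made, and a failure is reported immediately. Typically used for non-idempotent write operations, such as inserting a new record.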

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation); // again, check that invokers is not null or empty
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null); // select one invoker and call it
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
    }
}

FailoverCluster

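Automatic failover: when a call fails, retry on another server. Typically used for read operations, though retries add latency. The retry count (excluding the first call) can be set with retries="2". This is the default cluster; see Microservice Architecture: Dubbo Cluster Fault Tolerance (2) for details.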
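
Since the source is not repeated in this post, here is a minimal sketch of the retry rule only, under simplifying assumptions. It is not Dubbo's implementation: `FailoverSketch` and `failoverInvoke` are hypothetical names, providers are tried in list order rather than chosen by a load balancer, and the number of attempts is capped by the number of providers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class FailoverSketch {

    // Tries providers in order. "retries" is the number of extra attempts
    // after the first call, so at most retries + 1 providers are tried.
    static String failoverInvoke(List<Callable<String>> providers, int retries) {
        int attempts = Math.min(retries + 1, providers.size());
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return providers.get(i).call();
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempt(s)", last);
    }

    public static void main(String[] args) {
        List<Callable<String>> providers = new ArrayList<>();
        providers.add(() -> { throw new IllegalStateException("provider A down"); });
        providers.add(() -> "reply from B");
        System.out.println(failoverInvoke(providers, 2)); // prints reply from B
    }
}
```

With `retries` set to 0 the sketch behaves like a single call with no retry, so the same two providers would fail on provider A.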

FailbackCluster

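Automatic failback: failed requests are recorded in the background and resent on a timer. Typically used for message-notification operations.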

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
        addFailed(invocation, this); // the key point: see addFailed below
        return new RpcResult(); // ignore
    }
}

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    // Lazily start the retry timer once, using double-checked locking.
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        // collect retry statistics
                        try {
                            retryFailed(); // retry
                        } catch (Throwable t) { // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    failed.put(invocation, router);
}

void retryFailed() {
    if (failed.size() == 0) {
        return;
    }
    // Iterate over a copy so entries can be removed from failed during the loop.
    for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
            failed).entrySet()) {
        Invocation invocation = entry.getKey();
        Invoker<?> invoker = entry.getValue();
        try {
            invoker.invoke(invocation);
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
        }
    }
}
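
The record-then-retry bookkeeping can be sketched in isolation. `FailbackSketch`, its `invoke`, `retryFailed`, and `pending` methods, and the string ids are hypothetical names for this example. To keep it deterministic, the retry pass is triggered by hand here; in the source above it is driven by `scheduleWithFixedDelay`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class FailbackSketch {

    // Failed calls waiting for a retry, keyed by a caller-chosen id.
    private final Map<String, Callable<String>> failed = new ConcurrentHashMap<>();

    // Returns the result, or null after recording the failed call for a later retry.
    String invoke(String id, Callable<String> call) {
        try {
            return call.call();
        } catch (Exception e) {
            failed.put(id, call);
            return null; // the failure is swallowed, as in the failback strategy
        }
    }

    // One retry pass over a snapshot; calls that succeed are removed.
    void retryFailed() {
        for (Map.Entry<String, Callable<String>> e : new HashMap<>(failed).entrySet()) {
            try {
                e.getValue().call();
                failed.remove(e.getKey());
            } catch (Exception ex) {
                // Still failing: keep it for the next pass.
            }
        }
    }

    int pending() {
        return failed.size();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        FailbackSketch cluster = new FailbackSketch();
        // Fails on the first attempt and succeeds afterwards.
        cluster.invoke("notify-1", () -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("provider down");
            }
            return "sent";
        });
        System.out.println(cluster.pending()); // prints 1
        cluster.retryFailed();
        System.out.println(cluster.pending()); // prints 0
    }
}
```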

FailsafeCluster

Fail safe: when an exception occurs, it is simply ignored. Typically used for operations such as writing audit logs.

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        return new RpcResult(); // ignore
    }
}
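
Reduced to its essence, the strategy is a catch-all around a single call. In this minimal sketch `FailsafeSketch` and `failsafeInvoke` are hypothetical names, and a null return stands in for the empty `RpcResult` of the source above.

```java
import java.util.concurrent.Callable;

class FailsafeSketch {

    // Makes one call; any failure is logged and replaced by a null result.
    static String failsafeInvoke(Callable<String> call) {
        try {
            return call.call();
        } catch (Throwable t) {
            System.err.println("Failsafe ignore exception: " + t.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsafeInvoke(() -> "audit written")); // prints audit written
        System.out.println(failsafeInvoke(() -> { throw new IllegalStateException("log store down"); })); // prints null
    }
}
```

The caller cannot tell a swallowed failure from a genuinely empty result, which is why this strategy suits calls whose outcome the caller does not depend on.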

BroadcastCluster

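Broadcasts the call to all providers, invoking them one by one; if any provider fails, the call fails. Typically used to tell every provider to refresh local resources such as caches or logs.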

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation); // again, check that invokers is not null or empty
    RpcContext.getContext().setInvokers((List) invokers);
    RpcException exception = null;
    Result result = null;
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation); // call each provider in turn
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    if (exception != null) {
        throw exception;
    }
    return result;
}
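
One detail worth noting in the loop above is that a failure does not stop the broadcast: every provider is still called, and the last exception is thrown only after the loop. A minimal standalone sketch of that behavior follows; `BroadcastSketch` and `broadcast` are hypothetical names, and each `Callable` stands in for one provider.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class BroadcastSketch {

    // Calls every provider in turn. If any call failed, the last failure is
    // rethrown after the loop; otherwise the last result is returned.
    static String broadcast(List<Callable<String>> providers) {
        RuntimeException last = null;
        String result = null;
        for (Callable<String> provider : providers) {
            try {
                result = provider.call();
            } catch (Exception e) {
                last = new RuntimeException(e.getMessage(), e);
            }
        }
        if (last != null) {
            throw last;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Callable<String>> providers = new ArrayList<>();
        providers.add(() -> "cache cleared on A");
        providers.add(() -> "cache cleared on B");
        System.out.println(broadcast(providers)); // prints cache cleared on B
    }
}
```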

Reference:

https://www.jianshu.com/u/f7daa458b874
