Luckylau's Blog

微服务架构之Dubbo服务发布(1)

​ 这一部分主要梳理Dubbo的服务发布的原理,会涉及dubbo-config等模块内容,从以下几个方面来一步一步深入学习。

dubbo-config模块包括dubbo-config-api 和dubbo-config-spring 。

dubbo-config-api中重要的类包括ReferenceConfig和ServiceConfig,分别处理消费端的配置(服务引用)和提供服务端的配置(服务发布)

dubbo-config-spring 主要是扩展spring配置标签的扩展;

服务发布的开始还是要从ServiceBean说起,我们利用idea的工具查看ServiceBean的继承体系图。

里面有许多与spring相关的东西,其中重要的关注点是InitializeBean和ApplicationListener。我们会遇到这样的配置

1
2
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" delay="5000"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" delay="-1"/>

当我们设置为delay=“5000”时,意味着设置延时暴露,这个时候就是通过回调InitializingBean接口的afterPropertySet()方法进行服务发布的。

1
2
3
4
5
6
7
8
9
10
11
if (!isDelay()) {
export();
}//这里的isDelay()会返回false
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay == -1);
}

当我们设置为delay=“-1”或者不设置时,意味着延迟到 Spring 初始化完成后, 再暴露服务,这个时候会通知实现了ApplicationListener的接口进行回调onApplicationEvent方法进行服务发布。

1
2
3
4
5
6
7
8
9
public void onApplicationEvent(ContextRefreshedEvent event) {
//条件是没有设置delay且没有暴露过
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}

重要的方法出现了,就是ServiceBean继承ServiceConfig的export()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//版本是2.6
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();//配置export, 默认是true暴露
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}//delay = "5000"的逻辑
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport(); //delay = "-1"或者没有配置逻辑
}
}

接着就是doExport()方法,前面是做了一系列的检查,provider,application,module,registries,monitor这些参数是否为空,是否是GenericService类型的服务,检查要注册的bean的引用和方法等。在方法的最后会调用doExportUrls方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected synchronized void doExport() {
......
......
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
private void doExportUrls() {
//加载所有的注册中心,服务有可能注册在多个注册中心,因为我们暴露服务需要注册到注册中心中去。
List<URL> registryURLs = loadRegistries(true);
//不同协议的注册。dubbo/hessian/rmi.....
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo"; //没有配置默认为dubbo
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());//我的是2.6
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
//暴露服务指定了服务的方法,retry次数逻辑处理
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
//方法的参数设置
if (arguments != null && arguments.size() > 0) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
//如果是泛化实现,generic=true,method=*表示任意方法
if (ProtocolUtils.isGeneric(generic)) {
map.put("generic", generic);
map.put("methods", Constants.ANY_VALUE);
} else {
//方法的版本信息
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
//令牌验证
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put("token", UUID.randomUUID().toString());
} else {
map.put("token", token);
}
}
// injvm不需要暴露服务,标注notify=false
if ("injvm".equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 根据参数创建url对象
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 如果url使用的协议存在扩展,调用对应的扩展来修改原url。目前扩展有override,absent
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//如果我们没有配置scope ,scope为null,此时同时进行本地暴露和远程暴露
String scope = url.getParameter(Constants.SCOPE_KEY);
//配置为none不暴露
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url); //本地暴露
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}

总结整个服务暴露的流程:

首先会检查各种配置信息,填充各种属性,总之就是保证我在开始暴露服务之前,所有的东西都准备好了,并且是正确的。doExport()

加载所有的注册中心,因为我们要暴露的服务需要注册到注册中心中去。doExportUrls()

根据配置的协议和注册中心url分别进行导出。doExportUrlsFor1Protocol()

根据配置筛入一些信息,重点在scope, 如果没有配置scope,即scope =null 时候,同时进行本地和远程暴露;如果scope = “none” 则不进行暴露;如果scope=”remote”只进行远程暴露;如果scope=”local”只进行本地暴露;

后面详细讲解不同暴露的流程。

本地暴露

远程暴露

netty的启动

zoopkeeper连接,注册与监听

Luckylau wechat
如果对您有价值,看官可以打赏的!