Luckylau's Blog

微服务架构之Dubbo服务发布(2)

本地暴露

上一篇中微服务架构之Dubbo服务发布(1)我们提到本地暴露,是在exportLocal(URL url)中

传进来的url如图所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void exportLocal(URL url) {
//如果协议不是injvm的我们就需要处理,例如传进来是dubbo,我们发现后面的逻辑就是将协议转化为injvm
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);//转化为本地url ,即injvm协议的
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}

经过这段代码后

其中protocol.export的实现来自InjvmProtocol,在调用InjvmProtocol类的export方法之前会先调用ProtocolFilterWrapper类的export方法由JavassistProxyFactory提供的getInvoker方法返回的Invoker转化为Exporter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//InjvmProtocol
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);//这里有个缓存
}
//JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

流程图为:

这就是本地暴露的过程,那么如何测试本地暴露呢?我们需要这么配置,但之前要保证服务提供方与服务消费方启动在同一个 JVM

服务消费方配置:

1
2
3
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" scope="local" registry="N/A"/>
或者
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" injvm="true" registry="N/A"/>

服务提供方配置:

1
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" registry="N/A" scope="local"/>

从消费方到提供方,消费方主要会涉及Dubbo的服务引用,提供方主要会涉及Dubbo的服务发布,看完这2大块内容,就会明白本地暴露的消费流程。

远程暴露

首先我们接着看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
//注册中心的URL地址
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
//将前面合成的url添加新的参数dynamic
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
// 如果有monitor信息,则在url上增加monitor配置
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
//没有注册中心
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}

经过上面代码后

其中protocol.export的实现来自RegistryProtocol,在调用RegisterProtocol类的export方法之前会先调用ProtocolFilterWrapper类的export方法, 将由JavassistProxyFactory提供的getInvoker方法返回的Invoker转化为Exporter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
//这里会开启netty,具体流程会单独说明
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//获取注册中心的地址,并连接,注册
URL registryUrl = getRegistryUrl(originInvoker);
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 订阅override数据 ,FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//提供者向注册中心订阅注册服务的覆盖配置,当由此服务的覆盖配置注册进来,就推送消息给提供者,重新暴露服务
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}

流程图为:

整体总结

Luckylau wechat
如果对您有价值,看官可以打赏的!