dubbo 服务消费原理分析之服务目录
文章目录
- 前言
- 一、RegistryDirectory
- 1、DynamicDirectory
- 2、RegistryProtocol.doCreateInvoker
- 2、RegistryProtocol.subscribe
- 3、ListenerRegistryWrapper.subscribe
- 4、FailbackRegistry.subscribe
- 5、ZookeeperRegistry.doSubscribe
- 6、RegistryDirectory.notify
- 7、RegistryDirectory.refreshOverrideAndInvoker
- 8、RegistryDirectory.toInvokers
- 二、Protocol
- 1、DubboProtocol.refer
- 2、DubboProtocol.getClients
- 3、NettyTransporter.connect
- 4、NettyClient
前言
文章基于3.1.0版本进行分析
<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>
一、RegistryDirectory
RegistryDirectory是Dubbo中的服务目录,服务目录中存储了一些和服务提供者有关的信息。通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。
通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。
创建RegistryDirectory会出初始化父类DynamicDirectory ,初始化接口、分组、失败策略等,从DynamicDirectory 开始分析
1、DynamicDirectory
创建服务目录与初始化的一些信息,比如接口、分组、失败策略
public DynamicDirectory(Class<T> serviceType, URL url) {super(url, true);ModuleModel moduleModel = url.getOrDefaultModuleModel();// 容错适配器,Cluster$Adaptive 默认的容错机制是失效转移 failoverthis.cluster = moduleModel.getExtensionLoader(Cluster.class).getAdaptiveExtension();// 路由工厂适配器RouterFactory$Adaptivethis.routerFactory = moduleModel.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();if (serviceType == null) {throw new IllegalArgumentException("service type is null.");}if (StringUtils.isEmpty(url.getServiceKey())) {throw new IllegalArgumentException("registry serviceKey is null.");}this.shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);this.shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);// 接口类型 org.sjl.dubbo.AsyncProviderthis.serviceType = serviceType;// 接口key org.sjl.dubbo.AsyncProviderthis.serviceKey = super.getConsumerUrl().getServiceKey();this.directoryUrl = consumerUrl;String group = directoryUrl.getGroup("");this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));// 默认快速失败this.shouldFailFast = Boolean.parseBoolean(ConfigurationUtils.getProperty(moduleModel, Constants.SHOULD_FAIL_FAST_KEY, "true"));}
2、RegistryProtocol.doCreateInvoker
ClusterInvoker的创建过程
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {// 初始化服务目录的注册中心和协议directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());// urlToRegistry = consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer&background=false&dubbo=2.0.2// &interface=org.sjl.dubbo.AsyncProvider&methods=sayHiAsync,sayHello,sayHelloAsync&pid=15072&qos.enable=false&release=3.1.0// &side=consumer&sticky=false&timeout=30000×tamp=1725462541548URL urlToRegistry = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),parameters.remove(REGISTER_IP_KEY),0,getPath(parameters, type),parameters);urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());if (directory.isShouldRegister()) {directory.setRegisteredConsumerUrl(urlToRegistry);// 消费者配置到注册中心registry.register(directory.getRegisteredConsumerUrl());}// 路由调用链directory.buildRouterChain(urlToRegistry);// 服务发现并订阅的逻辑 核心逻辑directory.subscribe(toSubscribeUrl(urlToRegistry));return (ClusterInvoker<T>) cluster.join(directory, true);}
2、RegistryProtocol.subscribe
public void subscribe(URL url) {setSubscribeUrl(url);registry.subscribe(url, this);}
3、ListenerRegistryWrapper.subscribe
进行包装,对订阅完成后的后置处理
public void subscribe(URL url, NotifyListener listener) {try {if (registry != null) {registry.subscribe(url, listener);}} finally {if (CollectionUtils.isNotEmpty(listeners)) {RuntimeException exception = null;for (RegistryServiceListener registryListener : listeners) {if (registryListener != null) {try {registryListener.onSubscribe(url, registry);} catch (RuntimeException t) {logger.error(t.getMessage(), t);exception = t;}}}if (exception != null) {throw exception;}}}}
4、FailbackRegistry.subscribe
failbackRegistry这个类,从名字就可以看出,它的主要作用就是实现具有故障恢复功能的服务订阅机制,简单来说就是如果在订阅服务注册中心时出现异常,会触发重试机制。
public void subscribe(URL url, NotifyListener listener) {super.subscribe(url, listener);// 移除失效的监听removeFailedSubscribed(url, listener);try {// Sending a subscription request to the server side// 发送订阅到服务端doSubscribe(url, listener);} catch (Exception e) {Throwable t = e;// *** 异常处理// Record a failed registration request to a failed list, retry regularlyaddFailedSubscribed(url, listener);}}
5、ZookeeperRegistry.doSubscribe
所有Service层发起订阅,类似于监控中心发起的订阅
指定的Service层发起的订阅,服务消费者的订阅
@Overridepublic void doSubscribe(final URL url, final NotifyListener listener) {try {checkDestroyed();// 针对通用服务接口if (ANY_VALUE.equals(url.getServiceInterface())) {// ***}// 针对指定服务,正常走到这里else {CountDownLatch latch = new CountDownLatch(1);try {List<URL> urls = new ArrayList<>();// ["/dubbo/org.sjl.dubbo.GreetingsProvider/providers", ".../configurators", ".../routers"]// 需要订阅以上三个节点下数据变化for (String path : toCategoriesPath(url)) {// 构建一个listeners集合,其中key是NotifyListener,// value表示针对这个RegistryDirectory注册的子节点监听。ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));if (zkListener instanceof RegistryChildListenerImpl) {((RegistryChildListenerImpl) zkListener).setLatch(latch);}// 创建非临时节点,不存在时候会创建 // /dubbo/org.sjl.dubbo.GreetingsProvider/providers// /dubbo/org.sjl.dubbo.GreetingsProvider/configurators// /dubbo/org.sjl.dubbo.GreetingsProvider/routerszkClient.create(path, false);// 服务目录创建完毕之后创建一个监听器用来监听子目录,同时要返回一个path目录的子子节点,// 比如providers下面的提供者节点列表,如果有多个可以返回多个List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {urls.addAll(toUrlsWithEmpty(url, path, children));}}// 调用notify方法触发回调通知notify(url, listener, urls);} finally {// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}}
children里面值实例,包括ip、port、接口等参数
dubbo://192.168.0.101:20880/org.sjl.dubbo.GreetingsProvider?anyhost=true
&application=dubbo-springboot-start-provider
&background=false
&deprecated=false
&dubbo=2.0.2
&dynamic=true
&generic=false
&interface=org.sjl.dubbo.GreetingsProvider
&methods=sayHello,sayHi
&pid=30316&release=3.1.0
&service-name-mapping=true
&side=provider&timeout=30000
×tamp=1725293330893
notify回调通知,链路结构为
FailbackRegistry -> AbstractRegistry -> RegistryDirectory
6、RegistryDirectory.notify
服务地址发生变化和更新时,调用Directory.notify来更新,意味着更新后的信息,会同步到Directory
public synchronized void notify(List<URL> urls) {if (isDestroyed()) {return;}// 过滤不需要的数据Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull).filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.groupingBy(this::judgeCategory));// 假设当前进来的通知是 providers节点// 判断configurator是否为空,这个节点下的配置,是在dubbo-admin控制台上修改配置时,会先创建一个配置节点到这个路径下,// 注册中心收到这个变化时会通知服务消费者,服务消费者会根据新的配置重新构建InvokerList<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);// 判断路由规则配置是否为空,如果不为空,同样将路由规则添加到url中。List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());toRouters(routerURLs).ifPresent(this::addRouters);// providers// 得到服务提供者的地址列表List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL addressExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners != null && !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);}}// 刷新并覆盖注册中心里面的配置、url、Invoker,refreshOverrideAndInvoker(providerURLs);}
7、RegistryDirectory.refreshOverrideAndInvoker
路由调用refreshInvoker方法
当注册中心的服务地址发生变化时,会触发更新。而更新之后并不是直接把url地址存储到内存,而是把
url转化为invoker进行存储,这个invoker是作为通信的调用器来构建的领域对象,所以如果地址发生变
化,那么需要把老的invoker销毁,然后用心的invoker替代。
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullrefreshInvoker(urls);}private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");// 如果只有一个服务提供者,并且如果是空协议,那么这个时候直接返回进制访问,并且销毁所有的invokersif (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {this.forbidden = true; // Forbid to accessrouterChain.setInvokers(BitList.emptyList());destroyAllInvokers(); // Close all invokers} else {this.forbidden = false; // Allow to access// 第一次初始化的时候,invokerUrls为空。if (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().// 获取invoker缓存Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {// 1-4 Empty address.logger.warn("1-4", "configuration ", "","Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");invokerUrls.addAll(localCachedInvokerUrls);} else {localCachedInvokerUrls = new HashSet<>();localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparisonthis.cachedInvokerUrls = localCachedInvokerUrls;}if (invokerUrls.isEmpty()) {return;}// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}// 将URL转换为Invoker 这里会做一些协议的指定过滤操作Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.* eg: consumer protocol = dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {// 3-1 - Failed to convert the URL address into Invokers.logger.error("3-1", "inconsistency between the client protocol and the protocol of the server","", "urls to invokers error",new IllegalStateException("urls to invokers error. invokerUrls.size :" +invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));// pre-route and build cacherouterChain.setInvokers(this.getInvokers());this.urlInvokerMap = newUrlInvokerMap;try {// 销毁不使用或者已经被删除的 Invoker。例如某个提供者的代码删除了destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}// notify invokers refreshedthis.invokersChanged();}}
将invokerURL列表转换为调用器映射。转换规则如下:
- 如果URL已转换为invoker,它将不再被重新引用并直接从缓存中获取,请注意,URL中的任何参数更改都将被重新引用。
- 如果传入调用器列表不为空,则表示它是最新的调用器列表。
- 如果传入invokerUrl的列表为空,则意味着该规则只是一个覆盖规则或路由规则,需要重新对比以决定是否重新引用。
8、RegistryDirectory.toInvokers
协议过滤,配置合并都会在这里面去做。url转换为调用程序,如果url已被引用,则不会重新引用。将放入newUrlInvokeMap的项将从OldUrlInvoceMap中删除。
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}// 获取指定的协议String queryProtocols = this.queryMap.get(PROTOCOL_KEY);// 遍历所有提供者列表进行转换for (URL providerUrl : urls) {// If protocol is configured at the reference side, only the matching protocol is selectedif (queryProtocols != null && queryProtocols.length() > 0) {boolean accept = false;String[] acceptProtocols = queryProtocols.split(",");for (String acceptProtocol : acceptProtocols) {if (providerUrl.getProtocol().equals(acceptProtocol)) {accept = true;break;}}if (!accept) {continue;}}// 空协议 直接跳过if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {continue;}// 查询扩展是否存在,这个要注意如果是自定义扩展或者像webservice这样的扩展一般需要额外引入扩展包才可以if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {// 4-1 - Unsupported protocollogger.error("4-1", "typo in URL", "", "Unsupported protocol.",new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));continue;}// 合并url参数。顺序是:覆盖协议配置(比如禁用服务有时候就这样做)> -D(JVM参数) > 消费者 > 提供者URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer againInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) {// 消费者引用服务提供者invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error("4-2", "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error("4-3", "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {newUrlInvokerMap.put(url, invoker);}}return newUrlInvokerMap;}
到这里我们可以看到核心功能已经展示出来了
- 指定注册中心的三个路径的子节点变更事件
- 触发时间变更之后,把变更的节点信息转化为Invoker
这时候得到的Invoker是最新的服务提供者地址,远程访问得到我们实际的结果
二、Protocol
根据协议创建通信连接
调用链
QosProtocolWrapper -> ProtocolFilterWrapper -> ProtocolListenerWrapper -> DubboProtocol
1、DubboProtocol.refer
@Overridepublic <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {checkDestroyed();return protocolBindingRefer(type, url);}@Overridepublic <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {checkDestroyed();optimizeSerialization(url);// create rpc invoker.// 创建通信client,封装成DubboInvokerDubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;}
2、DubboProtocol.getClients
创建通信客户端
- 判断是否为共享连接,默认是共享同一个连接进行通信
- 是否配置了多个连接通道 connections,默认只有一个共享连接
private ExchangeClient[] getClients(URL url) {int connections = url.getParameter(CONNECTIONS_KEY, 0);// whether to share connection// if not configured, connection is shared, otherwise, one connection for one service// 如果没有配置的情况下,默认是采用共享连接,否则,就是针对一个服务提供一个连接。// 共享长连接,多路复用if (connections == 0) {/** The xml configuration should have a higher priority than properties.*/String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS): url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(shareConnectionsStr);// 返回共享连接List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);ExchangeClient[] clients = new ExchangeClient[connections];Arrays.setAll(clients, shareClients::get);return clients;}// 如果不是使用共享连接,则初始化一个新的客户端连接进行返回ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {clients[i] = initClient(url);}return clients;}
最后创建的共享客户端是netty客户端
org.apache.dubbo.remoting.transport.netty4.NettyTransporter,外层由HeaderExchanger包装
3、NettyTransporter.connect
@Overridepublic Client connect(URL url, ChannelHandler handler) throws RemotingException {return new NettyClient(url, handler);}
4、NettyClient
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));}public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {// 初始化父类构造器,指定编码、超时时间等参数super(url, handler);// set default needReconnect true when channel is not connectedneedReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);// 初始化线程池 默认为FixedThreadPoolinitExecutor(url);try {// 启动netty的核心代码初始化Bootstrap// 默认走的是NioSocketChannel// 初始化默认连接超时时间为3秒doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect.// 前面是初始化netty的客户端启动类Bootstrap 这里是执行连接的代码:bootstrap.connect(getConnectAddress());// 等待3秒连接失败则抛出异常connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {// ** 异常处理} catch (Throwable t) {// ** 异常处理}}
RegistryProtocol.refer过程中有一个关键步骤,即在监听到服务提供者url时触发RegistryDirectory.notify()方法。
RegistryDirectory.notify()方法调用 refreshInvoker()方法将服务提供者urls转换为对应的远程invoker,最终调用到 DubboProtocol.refer()方法生成对应的 DubboInvoker。
DubboInvoker的构造方法中有一项入参 ExchangeClient[] clients,对应网络客户端 Client,通过调用 client.request()方法完成网络通信的请求发送和响应接收功能。