当前位置: 首页 > news >正文

【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理

Sharding-JDBC系列

1、Sharding-JDBC分库分表的基本使用

2、Sharding-JDBC分库分表之SpringBoot分片策略

3、Sharding-JDBC分库分表之SpringBoot主从配置

4、SpringBoot集成Sharding-JDBC-5.3.0分库分表

5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表

6、【源码】Sharding-JDBC源码分析之JDBC

7、【源码】Sharding-JDBC源码分析之SPI机制

8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理

9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)

10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)

11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理

12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理

前言

在【源码】Sharding-JDBC源码分析之JDBC-CSDN博客中分析了ShardingSphere框架中自定义ShardingSphereDataSource实现了JDBC中的javax.sql.DataSource,本文从源码角度分析ShardingSphereDataSource的实现。

ShardingSphereDataSource创建准备回顾

ShardingSphere框架创建ShardingSphereDataSource之前,主要执行如下:

1)解析分片配置yaml文件,解析后的对象为YamlRootConfiguration;

2)从YamlRootConfiguration中获取配置的dataSources,根据配置的dataSourceClassName,使用反射机制,创建对应的dataSource,并进行赋值。保存在Map<String, DataSource>中;

3)从YamlRootConfiguration中获取配置的rules、mode,通过对应的Swapper转换器,分别转换为Collection<RuleConfiguration>和ModeConfiguration;

以上代码的实现详见《Sharding-JDBC系列》的前几篇源码介绍。源码入口:

package org.apache.shardingsphere.driver.api.yaml;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class YamlShardingSphereDataSourceFactory {/*** 创建ShardingSphereDataSource*/private static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration rootConfig) throws SQLException {// mode配置转换ModeConfiguration modeConfig = null == rootConfig.getMode() ? null : new YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());// rules配置转换Collection<RuleConfiguration> ruleConfigs = SWAPPER_ENGINE.swapToRuleConfigurations(rootConfig.getRules());// 创建ShardingSphereDataSourcereturn ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, rootConfig.getProps());}
}

最后调用ShardingSphereDataSourceFactory的createDataSource()方法,创建DataSource。源码如下:

package org.apache.shardingsphere.driver.api;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingSphereDataSourceFactory {/*** 创建DataSource,返回ShardingSphereDataSource对象*/public static DataSource createDataSource(final String databaseName, final ModeConfiguration modeConfig,final Map<String, DataSource> dataSourceMap, final Collection<RuleConfiguration> configs, final Properties props) throws SQLException {return new ShardingSphereDataSource(getDatabaseName(databaseName), modeConfig, dataSourceMap, null == configs ? new LinkedList<>() : configs, props);}/*** 获取数据库名称,默认为DefaultDatabase.LOGIC_NAME,为字符串"logic_db"*/private static String getDatabaseName(final String databaseName) {return Strings.isNullOrEmpty(databaseName) ? DefaultDatabase.LOGIC_NAME : databaseName;}
}

ShardingSphereDataSource

ShardingSphereDataSource的源码如下:

package org.apache.shardingsphere.driver.jdbc.core.datasource;/*** ShardingSphere的数据源*/
public final class ShardingSphereDataSource extends AbstractDataSourceAdapter implements AutoCloseable {// 数据库名称private final String databaseName;// 上下文管理,包括元数据上下文、实例上下文、执行引擎private final ContextManager contextManager;// JDBC上下文。从DataSource中获取一个Connection,获取数据库信息封装成CachedDatabaseMetaData。如url、username、最大连接数、是否支持分组等private final JDBCContext jdbcContext;public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig) throws SQLException {this.databaseName = databaseName;contextManager = createContextManager(databaseName, modeConfig, new HashMap<>(), new LinkedList<>(), new Properties());jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));}/**** @param databaseName 默认为logic_db* @param modeConfig 模式,单例或集群部署* @param dataSourceMap 数据源Map,key为定义的逻辑数据源名称、value为HikariDataSource等* @param ruleConfigs 规则配置* @param props props配置*/public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {this.databaseName = databaseName;contextManager = createContextManager(databaseName, modeConfig, dataSourceMap, ruleConfigs, null == props ? new Properties() : props);jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));}/*** 创建上下文*/private ContextManager createContextManager(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {// 返回JDBCInstanceMetaData对象。JDBC实例元数据。记录id、ip和版本InstanceMetaData instanceMetaData = TypedSPIRegistry.getRegisteredService(InstanceMetaDataBuilder.class, "JDBC").build(-1);// 全局的规则配置Collection<RuleConfiguration> globalRuleConfigs = ruleConfigs.stream().filter(each -> each instanceof GlobalRuleConfiguration).collect(Collectors.toList());// 数据源规则配置信息,从ruleConfigs中拷贝一份Collection<RuleConfiguration> databaseRuleConfigs = new LinkedList<>(ruleConfigs);databaseRuleConfigs.removeAll(globalRuleConfigs);// 记录 ContextManagerBuilder的参数信息ContextManagerBuilderParameter param = new ContextManagerBuilderParameter(modeConfig, Collections.singletonMap(databaseName,new DataSourceProvidedDatabaseConfiguration(dataSourceMap, databaseRuleConfigs)), globalRuleConfigs, props, Collections.emptyList(), instanceMetaData, false);// 根据配置的mode,使用SPI,创建ContextManagerBuilderContextManagerBuilder contextManagerBuilder = null == modeConfig// 通过类,从SPI(基于Java的SPI机制)中获取对应实现类,且执行init()方法? RequiredSPIRegistry.getRegisteredService(ContextManagerBuilder.class)// 通过类,从SPI(基于Java的SPI机制)中获取对应实现类,且类型名称为配置的mode。如集群为Cluster,单机为Standalone: TypedSPIRegistry.getRegisteredService(ContextManagerBuilder.class, modeConfig.getType());return contextManagerBuilder.build(param);}/*** 默认返回一个ShardingSphereConnection对象* @return*/@Overridepublic Connection getConnection() {return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);}@Overridepublic Connection getConnection(final String username, final String password) {return getConnection();}/*** 关闭dataSource*/public void close(final Collection<String> dataSourceNames) throws Exception {Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);for (String each : dataSourceNames) {close(dataSourceMap.get(each));}contextManager.close();}private void close(final DataSource dataSource) throws Exception {if (dataSource instanceof AutoCloseable) {((AutoCloseable) dataSource).close();}}@Overridepublic void close() throws Exception {close(contextManager.getDataSourceMap(databaseName).keySet());}@Overridepublic int getLoginTimeout() throws SQLException {Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);return dataSourceMap.isEmpty() ? 0 : dataSourceMap.values().iterator().next().getLoginTimeout();}@Overridepublic void setLoginTimeout(final int seconds) throws SQLException {for (DataSource each : contextManager.getDataSourceMap(databaseName).values()) {each.setLoginTimeout(seconds);}}
}

3.1 构造方法

在ShardingSphereDataSource的构造方法主要执行如下:

3.1.1 保存databaseName

保存传入的databaseName,默认为DefaultDatabase.LOGIC_NAME,即logic_db。

3.1.2 创建ContextManager

调用createContextManager()创建一个ContextManager对象。

1)通过SPI获得类型为JDBC的InstanceMetaDataBuilder,即JDBCInstanceMetaDataBuilder,执行build()方法,获取一个InstanceMetaData对象。该对象记录id、ip和ShardingSphere的版本;

2)从rules规则中的Collection<RuleConfiguration>过滤出GlobalRuleConfiguration,即全局的rule规则,如SQL解析、事务等规则;

3)移除全局规则,余下的规则为数据源的配置;

4)创建DataSourceProvidedDatabaseConfiguration,保存数据源相关的配置信息;

5)创建ContextManagerBuilderParameter对象,记录 ContextManagerBuilder的参数信息;

6)根据配置的mode,使用SPI,创建ContextManagerBuilder;

如果没有配置mode,则执行RequiredSPIRegistry.getRegisteredService(ContextManagerBuilder.class)。因为StandaloneContextManagerBuilder的isDefault()为true,所以默认返回StandaloneContextManagerBuilder对象;

7)执行ContextManagerBuilder的build()创建ContextManager对象;

在build()方法中,主要执行如下:

a)根据配置的mode中的provider,持久化存储配置信息。在对应的provider中创建一张repository表,而后将配置信息,根据数据源、规则、属性等信息持久化到repository表中;

b)根据配置的数据源信息,创建ShardingSphereDatabase对象。该对象记录了配置的数据源的元数据信息。如数据源中创建的表、视图、表中的列、索引、约束等;

c)创建InstanceContext对象,该对象保存了当前系统的计算节点、集群节点、mode配置、锁上下文等信息;

d)根据配置的数据源信息,创建MetaDataContexts对象。该对象保存上面a和b的信息;

e)创建ContextManager对象。该对象保存了c和d的信息;

3.1.3 创建JDBCContext

创建一个JDBCContext对象,该对象从DataSource中获取一个Connection,获取数据库的DatabaseMetaData对象,将对象的信息获取出来,存放在CachedDatabaseMetaData中。如url、username、最大连接数、是否支持分组等。

JDBCContext的源码如下:

package org.apache.shardingsphere.driver.jdbc.context;/*** JDBC上下文。从DataSource中获取一个Connection,获取数据库信息封装成CachedDatabaseMetaData。如url、username、最大连接数、是否支持分组等*/
@Getter
public final class JDBCContext {// 存放Database的配置信息。如url、username、最大连接数、是否支持分组等private volatile CachedDatabaseMetaData cachedDatabaseMetaData;public JDBCContext(final Map<String, DataSource> dataSources) throws SQLException {cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSources).orElse(null);}@Subscribepublic synchronized void refreshCachedDatabaseMetaData(final DataSourceChangedEvent event) throws SQLException {cachedDatabaseMetaData = createCachedDatabaseMetaData(DataSourcePoolCreator.create(event.getDataSourcePropertiesMap())).orElse(null);}/*** 从Datasource中获取一个Connection对象,只获取Connection的元数据,封装成CachedDatabaseMetaData*/private Optional<CachedDatabaseMetaData> createCachedDatabaseMetaData(final Map<String, DataSource> dataSources) throws SQLException {if (dataSources.isEmpty()) {return Optional.empty();}try (Connection connection = dataSources.values().iterator().next().getConnection()) {return Optional.of(new CachedDatabaseMetaData(connection.getMetaData()));}}
}

3.2 getConnection()

该方法执行DriverStateContext的getConnection()用于返回一个 java.sql.Connection 连接对象。

DriverStateContext的源码如下:

package org.apache.shardingsphere.driver.state;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DriverStateContext {/*** 通过SPI,获取DriverState状态的实现类(锁、熔断限流、OK三种状态)对应的DataSource,执行getConnection()获取连接*/public static Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {return TypedSPIRegistry.getRegisteredService(DriverState.class, contextManager.getInstanceContext().getInstance().getState().getCurrentState().name()).getConnection(databaseName, contextManager, jdbcContext);}
}

getConnection()方法执行如下:

1)根据当前实例的状态,通过SPI,获取DriverState的实现类;

a)实例的状态有三种,在StateType枚举类中定义,分别为OK、LOCK、CIRCUIT_BREAK,即正常状态、锁状态和断路器状态(流量治理,限流、熔断),默认为OK;

b)三种实例状态对应三个DriverState实现类,分别为OKDriverState、LockDriverState和CircuitBreakDriverState;

2)获取具体的DriverState实现类,执行DriverState.getConnection()获取一个 java.sql.Connection 连接对象,不同的DriverState实现处理方式不同;

3.2.1 OKDriverState

OKDriverState的源码如下:

package org.apache.shardingsphere.driver.state.ok;public final class OKDriverState implements DriverState {/*** 返回一个ShardingSphereConnection对象*/@Overridepublic Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {return new ShardingSphereConnection(databaseName, contextManager, jdbcContext);}@Overridepublic String getType() {return "OK";}
}

在getConnection()方法中,返回一个ShardingSphereConnection对象。

3.2.2 LockDriverState

LockDriverState的源码如下:

package org.apache.shardingsphere.driver.state.lock;public final class LockDriverState implements DriverState {@Overridepublic Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {throw new UnsupportedSQLOperationException("LockDriverState");}@Overridepublic String getType() {return "LOCK";}
}

暂时不支持LockDriverState。

3.2.3 CircuitBreakDriverState

CircuitBreakDriverState的源码如下:

package org.apache.shardingsphere.driver.state.circuit;public final class CircuitBreakDriverState implements DriverState {/*** 从CircuitBreakerDataSource对象中的getConnection()方法返回一个Connection连接*/@Overridepublic Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {return new CircuitBreakerDataSource().getConnection();}@Overridepublic String getType() {return "CIRCUIT_BREAK";}
}

在getConnection()方法中,新创建一个CircuitBreakerDataSource,从CircuitBreakerDataSource的getConnection()方法中返回一个Connection连接。

CircuitBreakerDataSource同ShardingSphereDataSource一样继承AbstractDataSourceAdapter,在getConnection()方法中返回CircuitBreakerConnection对象。此时系统处于熔断状态,不支持数据库的操作,所以CircuitBreakerConnection是一个空的Connection对象,创建的CircuitBreakerPreparedStatement不执行任何操作。

小结

限于篇幅,本篇先分享到这里,以下做一个小结:

1)程序启动时,执行ShardingSphereDriver的connect()方法;

2)解析分片配置yaml文件,解析后的对象为YamlRootConfiguration;

3)从YamlRootConfiguration中获取配置的dataSources,根据配置的dataSourceClassName,使用反射机制,创建对应的dataSource,并进行赋值。保存在Map<String, DataSource>中;

4)从YamlRootConfiguration中获取配置的rules、mode,通过对应的Swapper转换器,分别转换为Collection<RuleConfiguration>和ModeConfiguration;

5)根据配置的分片信息,创建ShardingSphereDataSource对象。该对象的两个核心方法为:

5.1)构造方法。在构造方法中,创建ContextManager、JDBCContext;

ContextManager存储了配置的数据源的元数据信息(如分片信息、数据源中的表、视图、表的列、索引等元数据)、实例上下文(如维护当前系统的计算节点、集群节点、元数据模型、元数据存储仓库、锁,及相关数据的维护,并提供计算节点状态修改);

JDBCContext从DataSource中获取一个Connection,获取数据库的DatabaseMetaData对象,将对象的信息获取出来,存放在CachedDatabaseMetaData中。如url、username、最大连接数、是否支持分组等;

5.2)getConnection()方法。该方法根据当前实例的StateType,从对应的DriverState中获取一个Connection连接对象;

a)实例的状态有三种,在StateType枚举类中定义,分别为OK、LOCK、CIRCUIT_BREAK,即正常状态、锁状态和断路器状态(流量治理,限流、熔断),默认为OK;

b)三种实例状态对应三个DriverState实现类,分别为OKDriverState、LockDriverState和CircuitBreakDriverState;

c)三个DriverState实现类的getConnection()方法分别返回ShardingSphereConnection、报异常、CircuitBreakConnection,其中CircuitBreakConnection为空的Connection对象,即不会执行真正的数据库操作;

以上为本篇分享的全部内容。

关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。


http://www.mrgr.cn/news/4707.html

相关文章:

  • Linux系统管理体系-服务管理
  • 探索RAG与Multi-Agent的结合:解决复杂任务的新方法
  • ArcGIS Pro基础:设置2个窗口同步联动界面
  • NVM安装管理node.js版本(简单易懂)
  • Java之文件操作和IO
  • 【网络编程】使用函数汇总总结
  • PythonStudio 控件使用常用方式(三十四)TTaskBar
  • day35
  • 远程供水无障碍,管线车助力全面消防防护_鼎跃安全
  • http 请求-02-Ajax XHR 的替代方案 fetch api 入门介绍
  • 供应商管理系统如何提升管理效能?
  • 企业网中网关的部署位置浅析
  • 计算机毕业设计选什么题目好? springboot 大学志愿填报系统
  • Java使用Easy Excel对Excel进行操作
  • Android:使用Gson常见问题(包含解决将Long型转化为科学计数法的问题)
  • 黑神话孙悟空:超燃视频混剪制作
  • 到底什么是虫情测报系统?
  • 前端性能优化的指标
  • 计算机毕业设计选题推荐-二手房价分析与预测-Python爬虫可视化-算法
  • 什么叫日志门面