【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理
Sharding-JDBC系列
1、Sharding-JDBC分库分表的基本使用
2、Sharding-JDBC分库分表之SpringBoot分片策略
3、Sharding-JDBC分库分表之SpringBoot主从配置
4、SpringBoot集成Sharding-JDBC-5.3.0分库分表
5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表
6、【源码】Sharding-JDBC源码分析之JDBC
7、【源码】Sharding-JDBC源码分析之SPI机制
8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理
9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)
10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)
11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理
12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理
前言
在【源码】Sharding-JDBC源码分析之JDBC-CSDN博客中分析了ShardingSphere框架中自定义ShardingSphereDataSource实现了JDBC中的javax.sql.DataSource,本文从源码角度分析ShardingSphereDataSource的实现。
ShardingSphereDataSource创建准备回顾
ShardingSphere框架创建ShardingSphereDataSource之前,主要执行如下:
1)解析分片配置yaml文件,解析后的对象为YamlRootConfiguration;
2)从YamlRootConfiguration中获取配置的dataSources,根据配置的dataSourceClassName,使用反射机制,创建对应的dataSource,并进行赋值。保存在Map<String, DataSource>中;
3)从YamlRootConfiguration中获取配置的rules、mode,通过对应的Swapper转换器,分别转换为Collection<RuleConfiguration>和ModeConfiguration;
以上代码的实现详见《Sharding-JDBC系列》的前几篇源码介绍。源码入口:
package org.apache.shardingsphere.driver.api.yaml;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class YamlShardingSphereDataSourceFactory {/*** 创建ShardingSphereDataSource*/private static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration rootConfig) throws SQLException {// mode配置转换ModeConfiguration modeConfig = null == rootConfig.getMode() ? null : new YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());// rules配置转换Collection<RuleConfiguration> ruleConfigs = SWAPPER_ENGINE.swapToRuleConfigurations(rootConfig.getRules());// 创建ShardingSphereDataSourcereturn ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, rootConfig.getProps());}
}
最后调用ShardingSphereDataSourceFactory的createDataSource()方法,创建DataSource。源码如下:
package org.apache.shardingsphere.driver.api;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingSphereDataSourceFactory {/*** 创建DataSource,返回ShardingSphereDataSource对象*/public static DataSource createDataSource(final String databaseName, final ModeConfiguration modeConfig,final Map<String, DataSource> dataSourceMap, final Collection<RuleConfiguration> configs, final Properties props) throws SQLException {return new ShardingSphereDataSource(getDatabaseName(databaseName), modeConfig, dataSourceMap, null == configs ? new LinkedList<>() : configs, props);}/*** 获取数据库名称,默认为DefaultDatabase.LOGIC_NAME,为字符串"logic_db"*/private static String getDatabaseName(final String databaseName) {return Strings.isNullOrEmpty(databaseName) ? DefaultDatabase.LOGIC_NAME : databaseName;}
}
ShardingSphereDataSource
ShardingSphereDataSource的源码如下:
package org.apache.shardingsphere.driver.jdbc.core.datasource;/*** ShardingSphere的数据源*/
public final class ShardingSphereDataSource extends AbstractDataSourceAdapter implements AutoCloseable {// 数据库名称private final String databaseName;// 上下文管理,包括元数据上下文、实例上下文、执行引擎private final ContextManager contextManager;// JDBC上下文。从DataSource中获取一个Connection,获取数据库信息封装成CachedDatabaseMetaData。如url、username、最大连接数、是否支持分组等private final JDBCContext jdbcContext;public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig) throws SQLException {this.databaseName = databaseName;contextManager = createContextManager(databaseName, modeConfig, new HashMap<>(), new LinkedList<>(), new Properties());jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));}/**** @param databaseName 默认为logic_db* @param modeConfig 模式,单例或集群部署* @param dataSourceMap 数据源Map,key为定义的逻辑数据源名称、value为HikariDataSource等* @param ruleConfigs 规则配置* @param props props配置*/public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {this.databaseName = databaseName;contextManager = createContextManager(databaseName, modeConfig, dataSourceMap, ruleConfigs, null == props ? new Properties() : props);jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));}/*** 创建上下文*/private ContextManager createContextManager(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {// 返回JDBCInstanceMetaData对象。JDBC实例元数据。记录id、ip和版本InstanceMetaData instanceMetaData = TypedSPIRegistry.getRegisteredService(InstanceMetaDataBuilder.class, "JDBC").build(-1);// 全局的规则配置Collection<RuleConfiguration> globalRuleConfigs = ruleConfigs.stream().filter(each -> each instanceof GlobalRuleConfiguration).collect(Collectors.toList());// 数据源规则配置信息,从ruleConfigs中拷贝一份Collection<RuleConfiguration> databaseRuleConfigs = new LinkedList<>(ruleConfigs);databaseRuleConfigs.removeAll(globalRuleConfigs);// 记录 ContextManagerBuilder的参数信息ContextManagerBuilderParameter param = new ContextManagerBuilderParameter(modeConfig, Collections.singletonMap(databaseName,new DataSourceProvidedDatabaseConfiguration(dataSourceMap, databaseRuleConfigs)), globalRuleConfigs, props, Collections.emptyList(), instanceMetaData, false);// 根据配置的mode,使用SPI,创建ContextManagerBuilderContextManagerBuilder contextManagerBuilder = null == modeConfig// 通过类,从SPI(基于Java的SPI机制)中获取对应实现类,且执行init()方法? RequiredSPIRegistry.getRegisteredService(ContextManagerBuilder.class)// 通过类,从SPI(基于Java的SPI机制)中获取对应实现类,且类型名称为配置的mode。如集群为Cluster,单机为Standalone: TypedSPIRegistry.getRegisteredService(ContextManagerBuilder.class, modeConfig.getType());return contextManagerBuilder.build(param);}/*** 默认返回一个ShardingSphereConnection对象* @return*/@Overridepublic Connection getConnection() {return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);}@Overridepublic Connection getConnection(final String username, final String password) {return getConnection();}/*** 关闭dataSource*/public void close(final Collection<String> dataSourceNames) throws Exception {Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);for (String each : dataSourceNames) {close(dataSourceMap.get(each));}contextManager.close();}private void close(final DataSource dataSource) throws Exception {if (dataSource instanceof AutoCloseable) {((AutoCloseable) dataSource).close();}}@Overridepublic void close() throws Exception {close(contextManager.getDataSourceMap(databaseName).keySet());}@Overridepublic int getLoginTimeout() throws SQLException {Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);return dataSourceMap.isEmpty() ? 0 : dataSourceMap.values().iterator().next().getLoginTimeout();}@Overridepublic void setLoginTimeout(final int seconds) throws SQLException {for (DataSource each : contextManager.getDataSourceMap(databaseName).values()) {each.setLoginTimeout(seconds);}}
}
3.1 构造方法
在ShardingSphereDataSource的构造方法主要执行如下:
3.1.1 保存databaseName
保存传入的databaseName,默认为DefaultDatabase.LOGIC_NAME,即logic_db。
3.1.2 创建ContextManager
调用createContextManager()创建一个ContextManager对象。
1)通过SPI获得类型为JDBC的InstanceMetaDataBuilder,即JDBCInstanceMetaDataBuilder,执行build()方法,获取一个InstanceMetaData对象。该对象记录id、ip和ShardingSphere的版本;
2)从rules规则中的Collection<RuleConfiguration>过滤出GlobalRuleConfiguration,即全局的rule规则,如SQL解析、事务等规则;
3)移除全局规则,余下的规则为数据源的配置;
4)创建DataSourceProvidedDatabaseConfiguration,保存数据源相关的配置信息;
5)创建ContextManagerBuilderParameter对象,记录 ContextManagerBuilder的参数信息;
6)根据配置的mode,使用SPI,创建ContextManagerBuilder;
如果没有配置mode,则执行RequiredSPIRegistry.getRegisteredService(ContextManagerBuilder.class)。因为StandaloneContextManagerBuilder的isDefault()为true,所以默认返回StandaloneContextManagerBuilder对象;
7)执行ContextManagerBuilder的build()创建ContextManager对象;
在build()方法中,主要执行如下:
a)根据配置的mode中的provider,持久化存储配置信息。在对应的provider中创建一张repository表,而后将配置信息,根据数据源、规则、属性等信息持久化到repository表中;
b)根据配置的数据源信息,创建ShardingSphereDatabase对象。该对象记录了配置的数据源的元数据信息。如数据源中创建的表、视图、表中的列、索引、约束等;
c)创建InstanceContext对象,该对象保存了当前系统的计算节点、集群节点、mode配置、锁上下文等信息;
d)根据配置的数据源信息,创建MetaDataContexts对象。该对象保存上面a和b的信息;
e)创建ContextManager对象。该对象保存了c和d的信息;
3.1.3 创建JDBCContext
创建一个JDBCContext对象,该对象从DataSource中获取一个Connection,获取数据库的DatabaseMetaData对象,将对象的信息获取出来,存放在CachedDatabaseMetaData中。如url、username、最大连接数、是否支持分组等。
JDBCContext的源码如下:
package org.apache.shardingsphere.driver.jdbc.context;/*** JDBC上下文。从DataSource中获取一个Connection,获取数据库信息封装成CachedDatabaseMetaData。如url、username、最大连接数、是否支持分组等*/
@Getter
public final class JDBCContext {// 存放Database的配置信息。如url、username、最大连接数、是否支持分组等private volatile CachedDatabaseMetaData cachedDatabaseMetaData;public JDBCContext(final Map<String, DataSource> dataSources) throws SQLException {cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSources).orElse(null);}@Subscribepublic synchronized void refreshCachedDatabaseMetaData(final DataSourceChangedEvent event) throws SQLException {cachedDatabaseMetaData = createCachedDatabaseMetaData(DataSourcePoolCreator.create(event.getDataSourcePropertiesMap())).orElse(null);}/*** 从Datasource中获取一个Connection对象,只获取Connection的元数据,封装成CachedDatabaseMetaData*/private Optional<CachedDatabaseMetaData> createCachedDatabaseMetaData(final Map<String, DataSource> dataSources) throws SQLException {if (dataSources.isEmpty()) {return Optional.empty();}try (Connection connection = dataSources.values().iterator().next().getConnection()) {return Optional.of(new CachedDatabaseMetaData(connection.getMetaData()));}}
}
3.2 getConnection()
该方法执行DriverStateContext的getConnection()用于返回一个 java.sql.Connection 连接对象。
DriverStateContext的源码如下:
package org.apache.shardingsphere.driver.state;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DriverStateContext {/*** 通过SPI,获取DriverState状态的实现类(锁、熔断限流、OK三种状态)对应的DataSource,执行getConnection()获取连接*/public static Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {return TypedSPIRegistry.getRegisteredService(DriverState.class, contextManager.getInstanceContext().getInstance().getState().getCurrentState().name()).getConnection(databaseName, contextManager, jdbcContext);}
}
getConnection()方法执行如下:
1)根据当前实例的状态,通过SPI,获取DriverState的实现类;
a)实例的状态有三种,在StateType枚举类中定义,分别为OK、LOCK、CIRCUIT_BREAK,即正常状态、锁状态和断路器状态(流量治理,限流、熔断),默认为OK;
b)三种实例状态对应三个DriverState实现类,分别为OKDriverState、LockDriverState和CircuitBreakDriverState;
2)获取具体的DriverState实现类,执行DriverState.getConnection()获取一个 java.sql.Connection 连接对象,不同的DriverState实现处理方式不同;
3.2.1 OKDriverState
OKDriverState的源码如下:
package org.apache.shardingsphere.driver.state.ok;public final class OKDriverState implements DriverState {/*** 返回一个ShardingSphereConnection对象*/@Overridepublic Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {return new ShardingSphereConnection(databaseName, contextManager, jdbcContext);}@Overridepublic String getType() {return "OK";}
}
在getConnection()方法中,返回一个ShardingSphereConnection对象。
3.2.2 LockDriverState
LockDriverState的源码如下:
package org.apache.shardingsphere.driver.state.lock;public final class LockDriverState implements DriverState {@Overridepublic Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {throw new UnsupportedSQLOperationException("LockDriverState");}@Overridepublic String getType() {return "LOCK";}
}
暂时不支持LockDriverState。
3.2.3 CircuitBreakDriverState
CircuitBreakDriverState的源码如下:
package org.apache.shardingsphere.driver.state.circuit;public final class CircuitBreakDriverState implements DriverState {/*** 从CircuitBreakerDataSource对象中的getConnection()方法返回一个Connection连接*/@Overridepublic Connection getConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {return new CircuitBreakerDataSource().getConnection();}@Overridepublic String getType() {return "CIRCUIT_BREAK";}
}
在getConnection()方法中,新创建一个CircuitBreakerDataSource,从CircuitBreakerDataSource的getConnection()方法中返回一个Connection连接。
CircuitBreakerDataSource同ShardingSphereDataSource一样继承AbstractDataSourceAdapter,在getConnection()方法中返回CircuitBreakerConnection对象。此时系统处于熔断状态,不支持数据库的操作,所以CircuitBreakerConnection是一个空的Connection对象,创建的CircuitBreakerPreparedStatement不执行任何操作。
小结
限于篇幅,本篇先分享到这里,以下做一个小结:
1)程序启动时,执行ShardingSphereDriver的connect()方法;
2)解析分片配置yaml文件,解析后的对象为YamlRootConfiguration;
3)从YamlRootConfiguration中获取配置的dataSources,根据配置的dataSourceClassName,使用反射机制,创建对应的dataSource,并进行赋值。保存在Map<String, DataSource>中;
4)从YamlRootConfiguration中获取配置的rules、mode,通过对应的Swapper转换器,分别转换为Collection<RuleConfiguration>和ModeConfiguration;
5)根据配置的分片信息,创建ShardingSphereDataSource对象。该对象的两个核心方法为:
5.1)构造方法。在构造方法中,创建ContextManager、JDBCContext;
ContextManager存储了配置的数据源的元数据信息(如分片信息、数据源中的表、视图、表的列、索引等元数据)、实例上下文(如维护当前系统的计算节点、集群节点、元数据模型、元数据存储仓库、锁,及相关数据的维护,并提供计算节点状态修改);
JDBCContext从DataSource中获取一个Connection,获取数据库的DatabaseMetaData对象,将对象的信息获取出来,存放在CachedDatabaseMetaData中。如url、username、最大连接数、是否支持分组等;
5.2)getConnection()方法。该方法根据当前实例的StateType,从对应的DriverState中获取一个Connection连接对象;
a)实例的状态有三种,在StateType枚举类中定义,分别为OK、LOCK、CIRCUIT_BREAK,即正常状态、锁状态和断路器状态(流量治理,限流、熔断),默认为OK;
b)三种实例状态对应三个DriverState实现类,分别为OKDriverState、LockDriverState和CircuitBreakDriverState;
c)三个DriverState实现类的getConnection()方法分别返回ShardingSphereConnection、报异常、CircuitBreakConnection,其中CircuitBreakConnection为空的Connection对象,即不会执行真正的数据库操作;
以上为本篇分享的全部内容。
关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。
