trino-435:dynamic catalog
一、背景:
trino对于数据源的注册方式为静态注册,在服务启动前需要配置好相关数据源的信息,当添加新的数据源时需要停止服务进行数据源的静态注册然后在重启服务;由于该操作可能会中断正在执行的任务,因此生产环境中这种方式是不可取的。为此再生产环境中需要进行数据源的动态配置,以满足生产环境的需求。在github trino issue Dynamic Catalogs #12709(https://github.com/trinodb/trino/issues/12709)中有相关trino动态数据源的相关推送,我们也依次为依据进行trino动态数据源的调研。
二、动态数据源存储核心接口
trino435版本有关数据源的存储分为静态存储和动态存储,其中动态存储的核心接口为:CatalogStore,包含如下方法:
方法名 | 返回值类型 | 形参 | 描述 |
---|---|---|---|
getCatalogs | Collection<StoredCatalog> | 无 | 获取全部的catalog |
createCatalogProperties | CatalogProperties | String catalogName, ConnectorName connectorName, Map<String, String> properties | 从原始的properties创建catalog properties |
addOrReplaceCatalog | void | CatalogProperties catalogProperties | 新增或更新catalog properties |
removeCatalog | void | String catalogName | 如果存在删除catalog |
该接口中还包含一个内部接口StoredCatalog,有如下方法:
方法名 | 返回值类型 | 形参 | 描述 |
---|---|---|---|
getName | Collection<StoredCatalog> | 无 | 获取全部的catalog |
loadProperties | CatalogProperties | 无 | 从原始的properties创建catalog properties。服务启动时会触发调用,触发入口:Server.class中调用io.trino.connector.ConnectorServicesProvider#loadInitialCatalogs |
1、数据库存储
为了实现catalogs的持久化,我们需要实现catalog数据库存储方便我们检索和项目上的使用。当我们要加入新的catalogs存储方式时,需要实现这两个接口。下面我们就讲解下将catalogs存储到数据库中的实现方式(以mysql存储为例)。
我们要实现catalogs的数据库的存储的一个前提是服务需要知道要存储数据库的url、用户名、密码等连接信息,因此需要声明对应的配置类(JdbcCatalogStoreConfig),并声明如下变量:
catalog.config-db-url
catalog.config-db-user
catalog.config-db-password
声明JdbcCatalogStore数据存储类实现了CatalogStore接口,完成一下流程改造:
- 并在构造方法中实现从数据库中加载catalogs信息,catalogs存储的表结构(动态数据源存储表结构)如下:
CREATE TABLE IF NOT EXISTS `catalogs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL COMMENT 'catalog名称',
`properties` text,
`create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY index_name (`name`)
)
- 由于需要和mysql数据库交互,因此需要引入mysql驱动jar,并加载驱动
# 引入jar
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.2.0</version>
<scope>runtime</scope>
</dependency>
# 加载驱动
loaderJdbcDriver(this.getClass().getClassLoader(), "com.mysql.cj.jdbc.Driver", catalogsUrl);
private static void loaderJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl) {
try {
final Class<?> clazz = Class.forName(driverClassName, true, classLoader);
final Driver driver = (Driver) clazz.newInstance();
if (!driver.acceptsURL(catalogUrl)) {
log.error("Jdbc driver loading error. Driver {} cannot accept url.", driverClassName);
throw new RuntimeException("Jdbc driver loading error.");
}
} catch (final Exception e) {
throw new RuntimeException("Jdbc driver loading error.", e);
}
}
- 实现loadProperties方法,完成String到CatalogProperties的转变。
- 其他方法的实现可参考InMemoryCatalogStore和FileCatalogStore这两个类,需要注意的是在实现addOrReplaceCatalog方法时,需要实现Properties转String来存储到数据库,然而trino-server自带的JSONObject类转String时存在对“/”转义的情况,因此引入阿里巴巴的JSON转换工具,如下:
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.23</version>
</dependency>
2、动态数据源的restful API
主要完成catalog的注册与删除接口开发,下面分别对catalog注册和删除结构实现方式进行阐述。
(1)catalog的注册接口
- 将catalog保存到数据库后如何使其在trino服务中生效
- 如何将catalog注册信息有coordinator同步给worker节点
对于核心问题1,对io.trino.server.Server类的分析,新增catalog需要改造该类中的私有方法io.trino.server.Server#updateConnectorIds,实现将新增的catalog相关信息添加到Announcer类中。
对于核心问题2,对于coordinator节点提供了io.trino.failuredetector.HeartbeatFailureDetector类,可以获取worker节点地址信息,因此可以通过发送http请求向每个worker节点注册catalog(注意:对于worker节点catalog的注册不需要持久化操作,仅仅在内存中保存即可)
(2)catalog的删除接口
该接口主要完成catalog从内存和数据库中删除,并更新Announcer类。另外该接口并不需要和worker节点通信,因为在435版本中提供了catalog的修剪功能,该功能的实现类为io.trino.connector.CatalogPruneTask#CatalogPruneTask
有待优化点
- 数据源注册完成后并没有立即生效,需要服务的需要对worker节点的探活完成后才生效
- 在并发操作的情况下存在新加入的worker节点的数据源与coordinator不持一致的情况。
- 数据源注册完成后coordinator节点通过failureDetector.getStats().values()的形式获取worker节点的uri存在重复的情况,需要优化。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!