From 1fa8fae237b99164788060486b4999405552348e Mon Sep 17 00:00:00 2001 From: taojinlong Date: Tue, 22 Mar 2022 17:46:54 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=EF=BC=88doris=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dto/datasource/DorisConfiguration.java | 3 ++ .../io/dataease/provider/DDLProvider.java | 3 +- .../io/dataease/provider/DDLProviderImpl.java | 3 +- .../engine/doris/DorisDDLProvider.java | 16 ++++++-- .../engine/mysql/MysqlDDLProvider.java | 3 +- .../service/dataset/ExtractDataService.java | 2 +- .../service/datasource/DatasourceService.java | 5 +++ .../service/engine/EngineService.java | 37 +++++++++---------- frontend/src/lang/zh.js | 5 +++ .../system/SysParam/SimpleModeSetting.vue | 1 - frontend/src/views/system/SysParam/index.vue | 7 +++- 11 files changed, 55 insertions(+), 30 deletions(-) diff --git a/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java b/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java index 18f3f41806..f3edc95760 100644 --- a/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java +++ b/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java @@ -8,4 +8,7 @@ import lombok.Setter; public class DorisConfiguration extends MysqlConfiguration { private Integer httpPort; + + private Integer replicationNum = 1; + private Integer bucketNum = 10; } diff --git a/backend/src/main/java/io/dataease/provider/DDLProvider.java b/backend/src/main/java/io/dataease/provider/DDLProvider.java index 758e9fc925..ea238d8957 100644 --- a/backend/src/main/java/io/dataease/provider/DDLProvider.java +++ b/backend/src/main/java/io/dataease/provider/DDLProvider.java @@ -1,6 +1,7 @@ package io.dataease.provider; import io.dataease.base.domain.DatasetTableField; +import io.dataease.base.domain.Datasource; import java.util.List; @@ -17,7 +18,7 @@ public abstract class DDLProvider { public abstract String replaceTable(String name); - public abstract String createTableSql(String name, List datasetTableFields); + public abstract String createTableSql(String name, List datasetTableFields, Datasource engine); public abstract String insertSql(String name, List dataList, int page, int pageNumber); } diff --git a/backend/src/main/java/io/dataease/provider/DDLProviderImpl.java b/backend/src/main/java/io/dataease/provider/DDLProviderImpl.java index f90781fcb3..0e53560056 100644 --- a/backend/src/main/java/io/dataease/provider/DDLProviderImpl.java +++ b/backend/src/main/java/io/dataease/provider/DDLProviderImpl.java @@ -1,6 +1,7 @@ package io.dataease.provider; import io.dataease.base.domain.DatasetTableField; +import io.dataease.base.domain.Datasource; import io.dataease.commons.utils.Md5Utils; import java.util.Arrays; @@ -28,7 +29,7 @@ public class DDLProviderImpl extends DDLProvider { } @Override - public String createTableSql(String name, List datasetTableFields) { + public String createTableSql(String name, List datasetTableFields, Datasource engine) { return null; } diff --git a/backend/src/main/java/io/dataease/provider/engine/doris/DorisDDLProvider.java b/backend/src/main/java/io/dataease/provider/engine/doris/DorisDDLProvider.java index 1d9965e2ca..950e146898 100644 --- a/backend/src/main/java/io/dataease/provider/engine/doris/DorisDDLProvider.java +++ b/backend/src/main/java/io/dataease/provider/engine/doris/DorisDDLProvider.java @@ -1,7 +1,12 @@ package io.dataease.provider.engine.doris; +import com.google.gson.Gson; import io.dataease.base.domain.DatasetTableField; +import io.dataease.base.domain.Datasource; import io.dataease.commons.utils.TableUtils; +import io.dataease.dto.datasource.DorisConfiguration; +import io.dataease.dto.datasource.JdbcConfiguration; +import io.dataease.dto.datasource.MysqlConfiguration; import io.dataease.provider.DDLProviderImpl; import org.springframework.stereotype.Service; @@ -16,8 +21,8 @@ public class DorisDDLProvider extends DDLProviderImpl { private static final String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" + "Column_Fields" + "UNIQUE KEY(dataease_uuid)\n" + - "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" + - "PROPERTIES(\"replication_num\" = \"1\");"; + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS BUCKETS_NUM\n" + + "PROPERTIES(\"replication_num\" = \"ReplicationNum\");"; @Override public String createView(String name, String viewSQL) { @@ -41,9 +46,12 @@ public class DorisDDLProvider extends DDLProviderImpl { } @Override - public String createTableSql(String tableName, List datasetTableFields) { + public String createTableSql(String tableName, List datasetTableFields, Datasource engine) { + DorisConfiguration dorisConfiguration = new Gson().fromJson(engine.getConfiguration(), DorisConfiguration.class); String dorisTableColumnSql = createDorisTableColumnSql(datasetTableFields); - return creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql); + return creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql) + .replace("BUCKETS_NUM", dorisConfiguration.getBucketNum().toString()) + .replace("ReplicationNum", dorisConfiguration.getReplicationNum().toString()); } private String createDorisTableColumnSql(final List datasetTableFields) { diff --git a/backend/src/main/java/io/dataease/provider/engine/mysql/MysqlDDLProvider.java b/backend/src/main/java/io/dataease/provider/engine/mysql/MysqlDDLProvider.java index 91ba3ccf97..a920e7d01f 100644 --- a/backend/src/main/java/io/dataease/provider/engine/mysql/MysqlDDLProvider.java +++ b/backend/src/main/java/io/dataease/provider/engine/mysql/MysqlDDLProvider.java @@ -1,6 +1,7 @@ package io.dataease.provider.engine.mysql; import io.dataease.base.domain.DatasetTableField; +import io.dataease.base.domain.Datasource; import io.dataease.commons.utils.TableUtils; import io.dataease.provider.DDLProviderImpl; import org.springframework.stereotype.Service; @@ -43,7 +44,7 @@ public class MysqlDDLProvider extends DDLProviderImpl { } @Override - public String createTableSql(String tableName, List datasetTableFields) { + public String createTableSql(String tableName, List datasetTableFields, Datasource engine) { String dorisTableColumnSql = createDorisTableColumnSql(datasetTableFields); return creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql); } diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index 03afdf3c7e..e5f314a50f 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -605,7 +605,7 @@ public class ExtractDataService { DatasourceRequest datasourceRequest = new DatasourceRequest(); datasourceRequest.setDatasource(engine); DDLProvider ddlProvider = ProviderFactory.getDDLProvider(engine.getType()); - datasourceRequest.setQuery(ddlProvider.createTableSql(tableName, datasetTableFields)); + datasourceRequest.setQuery(ddlProvider.createTableSql(tableName, datasetTableFields, engine)); jdbcProvider.exec(datasourceRequest); } diff --git a/backend/src/main/java/io/dataease/service/datasource/DatasourceService.java b/backend/src/main/java/io/dataease/service/datasource/DatasourceService.java index fca4f1f6a1..05d54cc081 100644 --- a/backend/src/main/java/io/dataease/service/datasource/DatasourceService.java +++ b/backend/src/main/java/io/dataease/service/datasource/DatasourceService.java @@ -59,6 +59,11 @@ public class DatasourceService { @DeCleaner(DePermissionType.DATASOURCE) public Datasource addDatasource(Datasource datasource) throws Exception{ + try{ + DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType()); + }catch (Exception e){ + throw e; + } checkName(datasource); long currentTimeMillis = System.currentTimeMillis(); datasource.setId(UUID.randomUUID().toString()); diff --git a/backend/src/main/java/io/dataease/service/engine/EngineService.java b/backend/src/main/java/io/dataease/service/engine/EngineService.java index 181d1acc6b..db635e721b 100644 --- a/backend/src/main/java/io/dataease/service/engine/EngineService.java +++ b/backend/src/main/java/io/dataease/service/engine/EngineService.java @@ -9,6 +9,7 @@ import io.dataease.commons.utils.BeanUtils; import io.dataease.controller.ResultHolder; import io.dataease.controller.request.datasource.DatasourceRequest; import io.dataease.dto.DatasourceDTO; +import io.dataease.listener.util.CacheUtils; import io.dataease.provider.ProviderFactory; import io.dataease.provider.datasource.DatasourceProvider; import io.dataease.service.datasource.DatasourceService; @@ -31,8 +32,6 @@ public class EngineService { private DeEngineMapper deEngineMapper; @Resource private DatasourceService datasource; - static private Datasource ds = null; - public Boolean isLocalMode(){ return env.getProperty("engine_mode", "local").equalsIgnoreCase("local"); @@ -51,7 +50,13 @@ public class EngineService { } public DeEngine info(){ - List deEngines = deEngineMapper.selectByExampleWithBLOBs(new DeEngineExample()); + DeEngineExample deEngineExample = new DeEngineExample(); + if(isClusterMode()){ + deEngineExample.createCriteria().andTypeEqualTo("engine_doris"); + }else { + deEngineExample.createCriteria().andTypeEqualTo("engine_mysql"); + } + List deEngines = deEngineMapper.selectByExampleWithBLOBs(deEngineExample); if(CollectionUtils.isEmpty(deEngines)){ return new DeEngine(); } @@ -69,7 +74,7 @@ public class EngineService { datasourceProvider.checkStatus(datasourceRequest); return ResultHolder.success(datasource); }catch (Exception e){ - return ResultHolder.error("Datasource is invalid: " + e.getMessage()); + return ResultHolder.error("Engine is invalid: " + e.getMessage()); } } @@ -79,26 +84,23 @@ public class EngineService { deEngineMapper.insert(engine); }else { deEngineMapper.updateByPrimaryKeyWithBLOBs(engine); + datasource.handleConnectionPool(getDeEngine(), "delete"); } - datasource.handleConnectionPool(this.ds, "delete"); setDs(engine); - datasource.handleConnectionPool(this.ds, "add"); + datasource.handleConnectionPool(getDeEngine(), "add"); return ResultHolder.success(engine); } private void setDs(DeEngine engine){ - if(this.ds == null){ - this.ds = new Datasource(); - BeanUtils.copyBean(this.ds, engine); - }else { - BeanUtils.copyBean(this.ds, engine); - } + CacheUtils.put("ENGINE", "engine", engine, null, null); } public Datasource getDeEngine() throws Exception{ - if (this.ds != null) { - return this.ds; + Object catcheEngine = CacheUtils.get("ENGINE", "engine"); + if(catcheEngine != null){ + return (Datasource) catcheEngine; } + if(isLocalMode()){ JSONObject jsonObject = new JSONObject(); jsonObject.put("dataSourceType", "jdbc"); @@ -123,12 +125,7 @@ public class EngineService { } setDs(deEngines.get(0)); } -// if(isSimpleMode()){ -// -// } - - //TODO cluster mode - return this.ds; + return getDeEngine(); } diff --git a/frontend/src/lang/zh.js b/frontend/src/lang/zh.js index 8907f20014..62a6c24259 100644 --- a/frontend/src/lang/zh.js +++ b/frontend/src/lang/zh.js @@ -1301,6 +1301,7 @@ export default { user_name: '用户名', password: '密码', host: '主机名/IP地址', + doris_host: 'Doris 地址', port: '端口', datasource_url: '地址', please_input_datasource_url: '请输入 Elasticsearch 地址,如: http://es_host:es_port', @@ -1332,6 +1333,10 @@ export default { initial_pool_size: '初始连接数', min_pool_size: '最小连接数', max_pool_size: '最大连接数', + bucket_num: 'Bucket 数量', + replication_num: '副本数量', + please_input_bucket_num: '请输入 Bucket 数量', + please_input_replication_num: '请输入副本数量', max_idle_time: '最大空闲(秒)', acquire_increment: '增长数', connect_timeout: '连接超时(秒)', diff --git a/frontend/src/views/system/SysParam/SimpleModeSetting.vue b/frontend/src/views/system/SysParam/SimpleModeSetting.vue index 6871b83ac5..1057a2181e 100644 --- a/frontend/src/views/system/SysParam/SimpleModeSetting.vue +++ b/frontend/src/views/system/SysParam/SimpleModeSetting.vue @@ -1,6 +1,5 @@ @@ -37,13 +41,14 @@ import BasicSetting from './BasicSetting' import EmailSetting from './EmailSetting' import SimpleMode from './SimpleModeSetting' +import ClusterMode from './ClusterModeSetting' import LayoutContent from '@/components/business/LayoutContent' import PluginCom from '@/views/system/plugin/PluginCom' import { pluginLoaded } from '@/api/user' import { engineMode } from '@/api/system/engine' export default { - components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode}, + components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode, ClusterMode}, data() { return { activeName: 'zero', From a045f51515808116a07dd982cfce19a844e807e5 Mon Sep 17 00:00:00 2001 From: taojinlong Date: Tue, 22 Mar 2022 19:33:09 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=EF=BC=88doris=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dto/datasource/DorisConfiguration.java | 2 +- .../provider/datasource/JdbcProvider.java | 1 - .../query/mongodb/MongoQueryProvider.java | 1 - .../service/engine/EngineService.java | 82 +++-- frontend/src/lang/zh.js | 2 + .../system/SysParam/ClusterModeSetting.vue | 291 ++++++++++++++++++ 6 files changed, 356 insertions(+), 23 deletions(-) create mode 100644 frontend/src/views/system/SysParam/ClusterModeSetting.vue diff --git a/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java b/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java index f3edc95760..757c58b41a 100644 --- a/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java +++ b/backend/src/main/java/io/dataease/dto/datasource/DorisConfiguration.java @@ -7,7 +7,7 @@ import lombok.Setter; @Setter public class DorisConfiguration extends MysqlConfiguration { - private Integer httpPort; + private Integer httpPort = 8030; private Integer replicationNum = 1; private Integer bucketNum = 10; diff --git a/backend/src/main/java/io/dataease/provider/datasource/JdbcProvider.java b/backend/src/main/java/io/dataease/provider/datasource/JdbcProvider.java index af64cfca60..4786df662f 100644 --- a/backend/src/main/java/io/dataease/provider/datasource/JdbcProvider.java +++ b/backend/src/main/java/io/dataease/provider/datasource/JdbcProvider.java @@ -487,7 +487,6 @@ public class JdbcProvider extends DatasourceProvider { break; case impala: ImpalaConfiguration impalaConfiguration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), ImpalaConfiguration.class); - System.out.println(new Gson().toJson(impalaConfiguration)); username = impalaConfiguration.getUsername(); password = impalaConfiguration.getPassword(); driver = impalaConfiguration.getDriver(); diff --git a/backend/src/main/java/io/dataease/provider/query/mongodb/MongoQueryProvider.java b/backend/src/main/java/io/dataease/provider/query/mongodb/MongoQueryProvider.java index eec743a7eb..d7569fc37d 100644 --- a/backend/src/main/java/io/dataease/provider/query/mongodb/MongoQueryProvider.java +++ b/backend/src/main/java/io/dataease/provider/query/mongodb/MongoQueryProvider.java @@ -43,7 +43,6 @@ public class MongoQueryProvider extends QueryProvider { @Override public Integer transFieldType(String field) { - System.out.println(field); field = field.toUpperCase(); switch (field) { case "CHAR": diff --git a/backend/src/main/java/io/dataease/service/engine/EngineService.java b/backend/src/main/java/io/dataease/service/engine/EngineService.java index db635e721b..5eb454b702 100644 --- a/backend/src/main/java/io/dataease/service/engine/EngineService.java +++ b/backend/src/main/java/io/dataease/service/engine/EngineService.java @@ -1,14 +1,20 @@ package io.dataease.service.engine; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.google.gson.Gson; +import com.google.gson.JsonArray; import io.dataease.base.domain.Datasource; import io.dataease.base.domain.DeEngine; import io.dataease.base.domain.DeEngineExample; import io.dataease.base.mapper.DeEngineMapper; import io.dataease.commons.utils.BeanUtils; +import io.dataease.commons.utils.HttpClientConfig; +import io.dataease.commons.utils.HttpClientUtil; import io.dataease.controller.ResultHolder; import io.dataease.controller.request.datasource.DatasourceRequest; import io.dataease.dto.DatasourceDTO; +import io.dataease.dto.datasource.DorisConfiguration; import io.dataease.listener.util.CacheUtils; import io.dataease.provider.ProviderFactory; import io.dataease.provider.datasource.DatasourceProvider; @@ -20,7 +26,9 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; +import java.util.Base64; import java.util.List; +import java.util.Optional; import java.util.UUID; @Service @@ -33,38 +41,38 @@ public class EngineService { @Resource private DatasourceService datasource; - public Boolean isLocalMode(){ + public Boolean isLocalMode() { return env.getProperty("engine_mode", "local").equalsIgnoreCase("local"); } - public Boolean isSimpleMode(){ + public Boolean isSimpleMode() { return env.getProperty("engine_mode", "local").equalsIgnoreCase("simple"); } - public Boolean isClusterMode(){ + public Boolean isClusterMode() { return env.getProperty("engine_mode", "local").equalsIgnoreCase("cluster"); } - public String mode(){ + public String mode() { return env.getProperty("engine_mode", "local"); } - public DeEngine info(){ + public DeEngine info() { DeEngineExample deEngineExample = new DeEngineExample(); - if(isClusterMode()){ + if (isClusterMode()) { deEngineExample.createCriteria().andTypeEqualTo("engine_doris"); - }else { + } else { deEngineExample.createCriteria().andTypeEqualTo("engine_mysql"); } List deEngines = deEngineMapper.selectByExampleWithBLOBs(deEngineExample); - if(CollectionUtils.isEmpty(deEngines)){ + if (CollectionUtils.isEmpty(deEngines)) { return new DeEngine(); } return deEngines.get(0); } public ResultHolder validate(DatasourceDTO datasource) throws Exception { - if(StringUtils.isEmpty(datasource.getType()) || StringUtils.isEmpty(datasource.getConfiguration())){ + if (StringUtils.isEmpty(datasource.getType()) || StringUtils.isEmpty(datasource.getConfiguration())) { throw new Exception("未完整设置数据引擎"); } try { @@ -72,17 +80,49 @@ public class EngineService { DatasourceRequest datasourceRequest = new DatasourceRequest(); datasourceRequest.setDatasource(datasource); datasourceProvider.checkStatus(datasourceRequest); - return ResultHolder.success(datasource); - }catch (Exception e){ + } catch (Exception e) { return ResultHolder.error("Engine is invalid: " + e.getMessage()); } + + if (datasource.getType().equalsIgnoreCase("engine_doris")) { + DorisConfiguration dorisConfiguration = new Gson().fromJson(datasource.getConfiguration(), DorisConfiguration.class); + HttpClientConfig httpClientConfig = new HttpClientConfig(); + String authValue = "Basic " + Base64.getUrlEncoder().encodeToString((dorisConfiguration.getUsername() + + ":" + dorisConfiguration.getPassword()).getBytes()); + httpClientConfig.addHeader("Authorization", authValue); + String response; + try { + response = HttpClientUtil.get("http://" + dorisConfiguration.getHost() + ":" + dorisConfiguration.getHttpPort() + "/api/backends", httpClientConfig); + }catch (Exception e){ + return ResultHolder.error("Engine is invalid: " + e.getMessage()); + } + + JSONArray backends = Optional.ofNullable(JSONObject.parseObject(response).getJSONObject("data")).orElse(new JSONObject()).getJSONArray("backends"); + if(CollectionUtils.isEmpty(backends)){ + return ResultHolder.error("Engine is invalid: no backends found."); + } + + Integer alives = 0; + for (int i = 0; i < backends.size(); i++) { + JSONObject kv = backends.getJSONObject(i); + if (kv.getBoolean("is_alive")) { + alives ++; + } + } + + if(alives < dorisConfiguration.getReplicationNum()){ + return ResultHolder.error("Engine params is invalid: 副本数量不能大于节点数量."); + } + } + + return ResultHolder.success(datasource); } public ResultHolder save(DeEngine engine) throws Exception { - if(StringUtils.isEmpty(engine.getId())){ + if (StringUtils.isEmpty(engine.getId())) { engine.setId(UUID.randomUUID().toString()); deEngineMapper.insert(engine); - }else { + } else { deEngineMapper.updateByPrimaryKeyWithBLOBs(engine); datasource.handleConnectionPool(getDeEngine(), "delete"); } @@ -91,17 +131,19 @@ public class EngineService { return ResultHolder.success(engine); } - private void setDs(DeEngine engine){ - CacheUtils.put("ENGINE", "engine", engine, null, null); + private void setDs(DeEngine engine) { + Datasource datasource = new Datasource(); + BeanUtils.copyBean(datasource, engine); + CacheUtils.put("ENGINE", "engine", datasource, null, null); } - public Datasource getDeEngine() throws Exception{ + public Datasource getDeEngine() throws Exception { Object catcheEngine = CacheUtils.get("ENGINE", "engine"); - if(catcheEngine != null){ + if (catcheEngine != null) { return (Datasource) catcheEngine; } - if(isLocalMode()){ + if (isLocalMode()) { JSONObject jsonObject = new JSONObject(); jsonObject.put("dataSourceType", "jdbc"); jsonObject.put("dataBase", env.getProperty("doris.db", "doris")); @@ -118,9 +160,9 @@ public class EngineService { engine.setType("engine_doris"); engine.setConfiguration(jsonObject.toJSONString()); setDs(engine); - }else { + } else { List deEngines = deEngineMapper.selectByExampleWithBLOBs(new DeEngineExample()); - if(CollectionUtils.isEmpty(deEngines)){ + if (CollectionUtils.isEmpty(deEngines)) { throw new Exception("未设置数据引擎"); } setDs(deEngines.get(0)); diff --git a/frontend/src/lang/zh.js b/frontend/src/lang/zh.js index 532f27b309..a1089daca1 100644 --- a/frontend/src/lang/zh.js +++ b/frontend/src/lang/zh.js @@ -1303,6 +1303,8 @@ export default { host: '主机名/IP地址', doris_host: 'Doris 地址', port: '端口', + query_port: 'Query Port', + http_port: 'Http Port', datasource_url: '地址', please_input_datasource_url: '请输入 Elasticsearch 地址,如: http://es_host:es_port', please_input_data_base: '请输入数据库名称', diff --git a/frontend/src/views/system/SysParam/ClusterModeSetting.vue b/frontend/src/views/system/SysParam/ClusterModeSetting.vue new file mode 100644 index 0000000000..f0a5f9b2dc --- /dev/null +++ b/frontend/src/views/system/SysParam/ClusterModeSetting.vue @@ -0,0 +1,291 @@ + + + + + From e2d088a51ff4390cb425ee19a3b25b4638583fac Mon Sep 17 00:00:00 2001 From: taojinlong Date: Fri, 25 Mar 2022 16:59:22 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=EF=BC=88kettle=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commons/utils/ExcelXlsxReader.java | 2 +- .../dataset/DataSetGroupController.java | 5 +- .../controller/engine/EngineController.java | 2 + .../controller/engine/KettleController.java | 66 +++++ .../main/java/io/dataease/dto/KettleDTO.java | 11 + .../io/dataease/job/sechedule/Schedular.java | 8 + .../service/dataset/ExtractDataService.java | 50 +--- .../service/kettle/KettleService.java | 150 +++++++++++ frontend/src/api/system/kettle.js | 49 ++++ frontend/src/lang/en.js | 15 ++ frontend/src/lang/tw.js | 15 ++ frontend/src/lang/zh.js | 12 +- .../views/system/SysParam/KettleSetting.vue | 232 ++++++++++++++++++ frontend/src/views/system/SysParam/index.vue | 7 +- 14 files changed, 573 insertions(+), 51 deletions(-) create mode 100644 backend/src/main/java/io/dataease/controller/engine/KettleController.java create mode 100644 backend/src/main/java/io/dataease/dto/KettleDTO.java create mode 100644 backend/src/main/java/io/dataease/service/kettle/KettleService.java create mode 100644 frontend/src/api/system/kettle.js create mode 100644 frontend/src/views/system/SysParam/KettleSetting.vue diff --git a/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java b/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java index dda075e2c7..9ee28d1ad1 100644 --- a/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java +++ b/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java @@ -33,7 +33,7 @@ import java.util.*; public class ExcelXlsxReader extends DefaultHandler { /** - * 自定义获取表格某些信息 + * 自定义获取表格某些信 */ public Map map = new TreeMap(); /** diff --git a/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java b/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java index 38f63df659..876c7ab312 100644 --- a/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java +++ b/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java @@ -10,6 +10,7 @@ import io.dataease.controller.request.dataset.DataSetGroupRequest; import io.dataease.dto.dataset.DataSetGroupDTO; import io.dataease.service.dataset.DataSetGroupService; import io.dataease.service.dataset.ExtractDataService; +import io.dataease.service.kettle.KettleService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.shiro.authz.annotation.Logical; @@ -32,6 +33,8 @@ public class DataSetGroupController { private DataSetGroupService dataSetGroupService; @Resource private ExtractDataService extractDataService; + @Resource + private KettleService kettleService; @DePermissions(value = { @DePermission(type = DePermissionType.DATASET, value = "id"), @@ -71,6 +74,6 @@ public class DataSetGroupController { @ApiIgnore @PostMapping("/isKettleRunning") public boolean isKettleRunning() { - return extractDataService.isKettleRunning(); + return kettleService.isKettleRunning(); } } diff --git a/backend/src/main/java/io/dataease/controller/engine/EngineController.java b/backend/src/main/java/io/dataease/controller/engine/EngineController.java index c032534158..85572b13ec 100644 --- a/backend/src/main/java/io/dataease/controller/engine/EngineController.java +++ b/backend/src/main/java/io/dataease/controller/engine/EngineController.java @@ -42,4 +42,6 @@ public class EngineController { return engineService.save(engine); } + + } diff --git a/backend/src/main/java/io/dataease/controller/engine/KettleController.java b/backend/src/main/java/io/dataease/controller/engine/KettleController.java new file mode 100644 index 0000000000..11756093cd --- /dev/null +++ b/backend/src/main/java/io/dataease/controller/engine/KettleController.java @@ -0,0 +1,66 @@ +package io.dataease.controller.engine; + + +import com.github.pagehelper.Page; +import com.github.pagehelper.PageHelper; +import io.dataease.auth.annotation.DePermission; +import io.dataease.base.domain.DeEngine; +import io.dataease.commons.constants.DePermissionType; +import io.dataease.commons.constants.ResourceAuthLevel; +import io.dataease.commons.utils.PageUtils; +import io.dataease.commons.utils.Pager; +import io.dataease.controller.ResultHolder; +import io.dataease.dto.KettleDTO; +import io.dataease.plugins.common.entity.XpackConditionEntity; +import io.dataease.plugins.common.entity.XpackGridRequest; +import io.dataease.plugins.config.SpringContextUtil; +import io.dataease.plugins.xpack.auth.dto.request.DataSetColumnPermissionsDTO; +import io.dataease.plugins.xpack.auth.service.ColumnPermissionService; +import io.dataease.service.kettle.KettleService; +import io.swagger.annotations.ApiOperation; +import org.springframework.web.bind.annotation.*; +import springfox.documentation.annotations.ApiIgnore; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +@ApiIgnore +@RequestMapping("kettle") +@RestController +public class KettleController { + + @Resource + private KettleService kettleService; + + @ApiIgnore + @PostMapping("save") + public ResultHolder save(@RequestBody DeEngine engine) throws Exception{ + return kettleService.save(engine); + } + + + @ApiIgnore + @PostMapping("validate") + public void validate(@RequestBody KettleDTO kettleDTO) throws Exception{ + kettleService.validate(kettleDTO); + } + + @ApiIgnore + @PostMapping("validate/{id}") + public ResultHolder validate(@PathVariable String id) throws Exception{ + return kettleService.validate(id); + } + + @PostMapping("/pageList/{goPage}/{pageSize}") + public Pager> pageList( @PathVariable int goPage, @PathVariable int pageSize) { + Page page = PageHelper.startPage(goPage, pageSize, true); + return PageUtils.setPageInfo(page, kettleService.pageList()); + } + + @ApiIgnore + @DeleteMapping("delete/{id}") + public void delete(@PathVariable String id) throws Exception{ + kettleService.delete(id); + } +} diff --git a/backend/src/main/java/io/dataease/dto/KettleDTO.java b/backend/src/main/java/io/dataease/dto/KettleDTO.java new file mode 100644 index 0000000000..23e7fb2327 --- /dev/null +++ b/backend/src/main/java/io/dataease/dto/KettleDTO.java @@ -0,0 +1,11 @@ +package io.dataease.dto; + +import lombok.Data; + +@Data +public class KettleDTO { + private String carte; + private String port; + private String user; + private String passwd; +} diff --git a/backend/src/main/java/io/dataease/job/sechedule/Schedular.java b/backend/src/main/java/io/dataease/job/sechedule/Schedular.java index 87c0d4c3b6..6a1693e6ff 100644 --- a/backend/src/main/java/io/dataease/job/sechedule/Schedular.java +++ b/backend/src/main/java/io/dataease/job/sechedule/Schedular.java @@ -3,6 +3,7 @@ package io.dataease.job.sechedule; import com.fit2cloud.quartz.anno.QuartzScheduled; import io.dataease.service.datasource.DatasourceService; import io.dataease.service.dataset.DataSetTableService; +import io.dataease.service.kettle.KettleService; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -13,6 +14,8 @@ public class Schedular { private DataSetTableService dataSetTableService; @Resource private DatasourceService datasourceService; + @Resource + private KettleService kettleService; @QuartzScheduled(cron = "0 0/3 * * * ?") public void updateDatasetTableStatus() { @@ -24,4 +27,9 @@ public class Schedular { datasourceService.updateDatasourceStatus(); } + @QuartzScheduled(cron = "0 0/30 * * * ?") + public void updateKettleStatus() { + kettleService.updateKettleStatus(); + } + } diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index e5f314a50f..6d6f441a42 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -25,6 +25,7 @@ import io.dataease.exception.DataEaseException; import io.dataease.listener.util.CacheUtils; import io.dataease.provider.QueryProvider; import io.dataease.service.engine.EngineService; +import io.dataease.service.kettle.KettleService; import io.dataease.service.message.DeMsgutil; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; @@ -98,6 +99,8 @@ public class ExtractDataService { private ExtChartViewMapper extChartViewMapper; @Resource private EngineService engineService; + @Resource + private KettleService kettleService; private static final String lastUpdateTime = "${__last_update_time__}"; private static final String currentUpdateTime = "${__current_update_time__}"; @@ -107,14 +110,6 @@ public class ExtractDataService { @Value("${kettle.files.keep:false}") private boolean kettleFilesKeep; - @Value("${carte.host:127.0.0.1}") - private String carte; - @Value("${carte.port:8080}") - private String port; - @Value("${carte.user:cluster}") - private String user; - @Value("${carte.passwd:cluster}") - private String passwd; private static final String shellScript = "result=`curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load`\n" + "if [ $? -eq 0 ] ; then\n" + @@ -730,7 +725,7 @@ public class ExtractDataService { break; } - SlaveServer remoteSlaveServer = getSlaveServer(); + SlaveServer remoteSlaveServer = kettleService.getSlaveServer(); JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); jobExecutionConfiguration.setRemoteServer(remoteSlaveServer); jobExecutionConfiguration.setRepository(repository); @@ -738,7 +733,6 @@ public class ExtractDataService { TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration(); transExecutionConfiguration.setRepository(repository); transExecutionConfiguration.setRemoteServer(remoteSlaveServer); - String lastTranceId = Trans.sendToSlaveServer(transMeta, transExecutionConfiguration, repository, null); SlaveServerTransStatus transStatus = null; boolean executing = true; @@ -772,15 +766,6 @@ public class ExtractDataService { } } - private SlaveServer getSlaveServer() { - SlaveServer remoteSlaveServer = new SlaveServer(); - remoteSlaveServer.setHostname(carte);// 设置远程IP - remoteSlaveServer.setPort(port);// 端口 - remoteSlaveServer.setUsername(user); - remoteSlaveServer.setPassword(passwd); - return remoteSlaveServer; - } - private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFields) throws Exception { if (engineService.isSimpleMode()) { return; @@ -1251,33 +1236,6 @@ public class ExtractDataService { } } - public boolean isKettleRunning() { - try { - if (!InetAddress.getByName(carte).isReachable(1000)) { - return false; - } - } catch (Exception e) { - return false; - } - HttpGet getMethod = new HttpGet("http://" + carte + ":" + port); - HttpClientManager.HttpClientBuilderFacade clientBuilder = HttpClientManager.getInstance().createBuilder(); - clientBuilder.setConnectionTimeout(1); - clientBuilder.setCredentials(user, passwd); - try { - CloseableHttpClient httpClient = clientBuilder.build(); - HttpResponse httpResponse = httpClient.execute(getMethod); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - if (statusCode != -1 && statusCode < 400) { - httpResponse.getEntity().getContent().close(); - return true; - } else { - return false; - } - } catch (Exception e) { - return false; - } - } - private final static String handleBinaryType = " \t\tif(\"FIELD\".equalsIgnoreCase(filed)){\n" + " get(Fields.Out, filed).setValue(r, \"\");\n" + " get(Fields.Out, filed).getValueMeta().setType(2);\n" + diff --git a/backend/src/main/java/io/dataease/service/kettle/KettleService.java b/backend/src/main/java/io/dataease/service/kettle/KettleService.java new file mode 100644 index 0000000000..aff3dc667f --- /dev/null +++ b/backend/src/main/java/io/dataease/service/kettle/KettleService.java @@ -0,0 +1,150 @@ +package io.dataease.service.kettle; + +import com.google.gson.Gson; +import io.dataease.base.domain.DeEngine; +import io.dataease.base.domain.DeEngineExample; +import io.dataease.base.mapper.DeEngineMapper; +import io.dataease.commons.utils.HttpClientConfig; +import io.dataease.commons.utils.HttpClientUtil; +import io.dataease.controller.ResultHolder; +import io.dataease.dto.KettleDTO; +import io.dataease.service.engine.EngineService; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.checkerframework.checker.units.qual.K; +import org.pentaho.di.cluster.SlaveServer; +import org.pentaho.di.core.util.HttpClientManager; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.net.InetAddress; +import java.util.Base64; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +@Service +public class KettleService { + + @Resource + private Environment env; + @Resource + private DeEngineMapper deEngineMapper; + @Resource + private EngineService engineService; + + public ResultHolder save(DeEngine kettle) throws Exception { + try { + validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class)); + kettle.setStatus("Success"); + }catch (Exception e){ + kettle.setStatus("Error"); + } + + if (StringUtils.isEmpty(kettle.getId())) { + kettle.setId(UUID.randomUUID().toString()); + kettle.setType("kettle"); + deEngineMapper.insert(kettle); + } else { + deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle); + } + return ResultHolder.success(kettle); + } + + public void delete(String id){ + deEngineMapper.deleteByPrimaryKey(id); + } + + public void validate(KettleDTO kettleDTO) throws Exception { + HttpClientConfig httpClientConfig = new HttpClientConfig(); + String authValue = "Basic " + Base64.getUrlEncoder().encodeToString((kettleDTO.getUser() + + ":" + kettleDTO.getPasswd()).getBytes()); + httpClientConfig.addHeader("Authorization", authValue); + String response = HttpClientUtil.get("http://" + kettleDTO.getCarte() + ":" + kettleDTO.getPort() + "/kettle/status/", httpClientConfig); + } + + public ResultHolder validate(String id) { + DeEngine kettle = deEngineMapper.selectByPrimaryKey(id); + try { + validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class)); + kettle.setStatus("Success"); + deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle); + return ResultHolder.success(kettle); + }catch (Exception e){ + kettle.setStatus("Error"); + deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle); + return ResultHolder.error(e.getMessage()); + } + } + + public List pageList(){ + DeEngineExample deEngineExample = new DeEngineExample(); + deEngineExample.createCriteria().andTypeEqualTo("kettle"); + return deEngineMapper.selectByExampleWithBLOBs(deEngineExample); + } + + public void updateKettleStatus(){ + if(!engineService.isClusterMode()){ + return; + } + Listkettles = pageList(); + kettles.forEach(kettle -> { + validate(kettle.getId()); + }); + } + + public SlaveServer getSlaveServer() throws Exception{ + SlaveServer remoteSlaveServer = new SlaveServer(); + if(engineService.isLocalMode()){ + remoteSlaveServer.setHostname(env.getProperty("carte.host", "127.0.0.1")); + remoteSlaveServer.setPort(env.getProperty("carte.port", "8080")); + remoteSlaveServer.setUsername(env.getProperty("carte.user", "cluster")); + remoteSlaveServer.setPassword(env.getProperty("carte.passwd", "cluster")); + }else { + List kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success")) + .collect(Collectors.toList()); + if(CollectionUtils.isEmpty(kettles)){ + throw new Exception("No valid kettle service."); + } + DeEngine kettle = kettles.get(new Random().nextInt(kettles.size())); + KettleDTO kettleDTO = new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class); + remoteSlaveServer.setHostname(kettleDTO.getCarte()); + remoteSlaveServer.setPort(kettleDTO.getPort()); + remoteSlaveServer.setUsername(kettleDTO.getUser()); + remoteSlaveServer.setPort(kettleDTO.getPasswd()); + } + return remoteSlaveServer; + } + + public boolean isKettleRunning() { + if(engineService.isLocalMode()){ + try { + KettleDTO kettleDTO = new KettleDTO(); + kettleDTO.setCarte(env.getProperty("carte.host", "127.0.0.1")); + kettleDTO.setPort(env.getProperty("carte.port", "8080")); + kettleDTO.setUser(env.getProperty("carte.user", "cluster")); + kettleDTO.setPasswd(env.getProperty("carte.passwd", "cluster")); + validate(kettleDTO); + return true; + }catch (Exception e){ + return false; + } + } + if(engineService.isClusterMode()){ + List kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success")) + .collect(Collectors.toList()); + if(CollectionUtils.isEmpty(kettles)){ + return false; + }else { + return true; + } + } + return false; + } + +} diff --git a/frontend/src/api/system/kettle.js b/frontend/src/api/system/kettle.js new file mode 100644 index 0000000000..dc8ceea939 --- /dev/null +++ b/frontend/src/api/system/kettle.js @@ -0,0 +1,49 @@ +import request from '@/utils/request' +import {validateDs} from "@/api/system/datasource"; + + + +export function validate(data) { + return request({ + url: '/kettle/validate', + method: 'post', + loading: true, + data + }) +} + +export function validateById(id) { + return request({ + url: '/kettle/validate/' + id, + method: 'post', + loading: true + }) +} + +export function save(data) { + return request({ + url: '/kettle/save', + method: 'post', + loading: true, + data + }) +} + +export function deleteKettle(id) { + return request({ + url: '/delete/' + id, + method: 'delete', + loading: true + }) +} + + + +export function pageList(url, data) { + return request({ + url: url, + method: 'post', + loading: true, + data + }) +} diff --git a/frontend/src/lang/en.js b/frontend/src/lang/en.js index 515a63644f..d3f53e2be2 100644 --- a/frontend/src/lang/en.js +++ b/frontend/src/lang/en.js @@ -1328,6 +1328,13 @@ export default { min_pool_size: 'Minimum of connections', max_pool_size: 'Maximum connection', max_idle_time: 'Maximum idle (seconds)', + doris_host: 'Doris Address', + query_port: 'Query Port', + http_port: 'Http Port', + bucket_num: 'Bucket number', + replication_num: 'Replication number', + please_input_bucket_num: 'Please enter Bucket number', + please_input_replication_num: 'Please enter Replication number', acquire_increment: 'Growth number', connect_timeout: 'Connection timeout (seconds)', please_input_initial_pool_size: 'Please enter the number of initial connections', @@ -1927,5 +1934,13 @@ export default { email: 'Email:', tel: 'Tel:', web: 'Web:' + }, + kettle: { + add: 'Add Kettle', + status: 'Status', + carte: 'Kettle Address', + port: 'Port', + user: 'User', + passwd: 'Password' } } diff --git a/frontend/src/lang/tw.js b/frontend/src/lang/tw.js index 1d2826763c..4864991c48 100644 --- a/frontend/src/lang/tw.js +++ b/frontend/src/lang/tw.js @@ -1329,6 +1329,13 @@ export default { min_pool_size: '最小連接數', max_pool_size: '最大連接數', max_idle_time: '最大空閑(秒)', + doris_host: 'Doris 地址', + query_port: 'Query Port', + http_port: 'Http Port', + bucket_num: 'Bucket 數量', + replication_num: '副本數量', + please_input_bucket_num: '請輸入 Bucket 數量', + please_input_replication_num: '請輸入副本數量', acquire_increment: '增長數', connect_timeout: '連接超時(秒)', please_input_initial_pool_size: '請輸入初始連接數', @@ -1937,5 +1944,13 @@ export default { email: '郵箱:', tel: '電話:', web: '網址:' + }, + kettle: { + add: '添加 Kettle 服務', + status: '狀態', + carte: 'Kettle 地址', + port: '端口', + user: '用戶名', + passwd: '密碼' } } diff --git a/frontend/src/lang/zh.js b/frontend/src/lang/zh.js index a1089daca1..0e55f057c2 100644 --- a/frontend/src/lang/zh.js +++ b/frontend/src/lang/zh.js @@ -279,7 +279,7 @@ export default { id: 'ID', millisecond: '毫秒', cannot_be_null: '不能为空', - required: '{0}是必填的', + required: '必填', already_exists: '名称不能重复', modifier: '修改人', validate: '校验', @@ -1302,9 +1302,9 @@ export default { password: '密码', host: '主机名/IP地址', doris_host: 'Doris 地址', - port: '端口', query_port: 'Query Port', http_port: 'Http Port', + port: '端口', datasource_url: '地址', please_input_datasource_url: '请输入 Elasticsearch 地址,如: http://es_host:es_port', please_input_data_base: '请输入数据库名称', @@ -1952,5 +1952,13 @@ export default { email: '邮箱:', tel: '电话:', web: '网址:' + }, + kettle: { + add: '添加 Kettle 服务', + status: '状态', + carte: 'Kettle 地址', + port: '端口', + user: '用户名', + passwd: '密码' } } diff --git a/frontend/src/views/system/SysParam/KettleSetting.vue b/frontend/src/views/system/SysParam/KettleSetting.vue new file mode 100644 index 0000000000..4a43f68110 --- /dev/null +++ b/frontend/src/views/system/SysParam/KettleSetting.vue @@ -0,0 +1,232 @@ + + + + + diff --git a/frontend/src/views/system/SysParam/index.vue b/frontend/src/views/system/SysParam/index.vue index 760e7bf2da..f95f402272 100644 --- a/frontend/src/views/system/SysParam/index.vue +++ b/frontend/src/views/system/SysParam/index.vue @@ -34,6 +34,10 @@ + + + + @@ -42,13 +46,14 @@ import BasicSetting from './BasicSetting' import EmailSetting from './EmailSetting' import SimpleMode from './SimpleModeSetting' import ClusterMode from './ClusterModeSetting' +import KettleSetting from './KettleSetting' import LayoutContent from '@/components/business/LayoutContent' import PluginCom from '@/views/system/plugin/PluginCom' import { pluginLoaded } from '@/api/user' import { engineMode } from '@/api/system/engine' export default { - components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode, ClusterMode}, + components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode, ClusterMode, KettleSetting}, data() { return { activeName: 'zero',