From e2d088a51ff4390cb425ee19a3b25b4638583fac Mon Sep 17 00:00:00 2001 From: taojinlong Date: Fri, 25 Mar 2022 16:59:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=EF=BC=88kettle=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commons/utils/ExcelXlsxReader.java | 2 +- .../dataset/DataSetGroupController.java | 5 +- .../controller/engine/EngineController.java | 2 + .../controller/engine/KettleController.java | 66 +++++ .../main/java/io/dataease/dto/KettleDTO.java | 11 + .../io/dataease/job/sechedule/Schedular.java | 8 + .../service/dataset/ExtractDataService.java | 50 +--- .../service/kettle/KettleService.java | 150 +++++++++++ frontend/src/api/system/kettle.js | 49 ++++ frontend/src/lang/en.js | 15 ++ frontend/src/lang/tw.js | 15 ++ frontend/src/lang/zh.js | 12 +- .../views/system/SysParam/KettleSetting.vue | 232 ++++++++++++++++++ frontend/src/views/system/SysParam/index.vue | 7 +- 14 files changed, 573 insertions(+), 51 deletions(-) create mode 100644 backend/src/main/java/io/dataease/controller/engine/KettleController.java create mode 100644 backend/src/main/java/io/dataease/dto/KettleDTO.java create mode 100644 backend/src/main/java/io/dataease/service/kettle/KettleService.java create mode 100644 frontend/src/api/system/kettle.js create mode 100644 frontend/src/views/system/SysParam/KettleSetting.vue diff --git a/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java b/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java index dda075e2c7..9ee28d1ad1 100644 --- a/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java +++ b/backend/src/main/java/io/dataease/commons/utils/ExcelXlsxReader.java @@ -33,7 +33,7 @@ import java.util.*; public class ExcelXlsxReader extends DefaultHandler { /** - * 自定义获取表格某些信息 + * 自定义获取表格某些信 */ public Map map = new TreeMap(); /** diff --git a/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java b/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java index 38f63df659..876c7ab312 100644 --- a/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java +++ b/backend/src/main/java/io/dataease/controller/dataset/DataSetGroupController.java @@ -10,6 +10,7 @@ import io.dataease.controller.request.dataset.DataSetGroupRequest; import io.dataease.dto.dataset.DataSetGroupDTO; import io.dataease.service.dataset.DataSetGroupService; import io.dataease.service.dataset.ExtractDataService; +import io.dataease.service.kettle.KettleService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.shiro.authz.annotation.Logical; @@ -32,6 +33,8 @@ public class DataSetGroupController { private DataSetGroupService dataSetGroupService; @Resource private ExtractDataService extractDataService; + @Resource + private KettleService kettleService; @DePermissions(value = { @DePermission(type = DePermissionType.DATASET, value = "id"), @@ -71,6 +74,6 @@ public class DataSetGroupController { @ApiIgnore @PostMapping("/isKettleRunning") public boolean isKettleRunning() { - return extractDataService.isKettleRunning(); + return kettleService.isKettleRunning(); } } diff --git a/backend/src/main/java/io/dataease/controller/engine/EngineController.java b/backend/src/main/java/io/dataease/controller/engine/EngineController.java index c032534158..85572b13ec 100644 --- a/backend/src/main/java/io/dataease/controller/engine/EngineController.java +++ b/backend/src/main/java/io/dataease/controller/engine/EngineController.java @@ -42,4 +42,6 @@ public class EngineController { return engineService.save(engine); } + + } diff --git a/backend/src/main/java/io/dataease/controller/engine/KettleController.java b/backend/src/main/java/io/dataease/controller/engine/KettleController.java new file mode 100644 index 0000000000..11756093cd --- /dev/null +++ b/backend/src/main/java/io/dataease/controller/engine/KettleController.java @@ -0,0 +1,66 @@ +package io.dataease.controller.engine; + + +import com.github.pagehelper.Page; +import com.github.pagehelper.PageHelper; +import io.dataease.auth.annotation.DePermission; +import io.dataease.base.domain.DeEngine; +import io.dataease.commons.constants.DePermissionType; +import io.dataease.commons.constants.ResourceAuthLevel; +import io.dataease.commons.utils.PageUtils; +import io.dataease.commons.utils.Pager; +import io.dataease.controller.ResultHolder; +import io.dataease.dto.KettleDTO; +import io.dataease.plugins.common.entity.XpackConditionEntity; +import io.dataease.plugins.common.entity.XpackGridRequest; +import io.dataease.plugins.config.SpringContextUtil; +import io.dataease.plugins.xpack.auth.dto.request.DataSetColumnPermissionsDTO; +import io.dataease.plugins.xpack.auth.service.ColumnPermissionService; +import io.dataease.service.kettle.KettleService; +import io.swagger.annotations.ApiOperation; +import org.springframework.web.bind.annotation.*; +import springfox.documentation.annotations.ApiIgnore; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +@ApiIgnore +@RequestMapping("kettle") +@RestController +public class KettleController { + + @Resource + private KettleService kettleService; + + @ApiIgnore + @PostMapping("save") + public ResultHolder save(@RequestBody DeEngine engine) throws Exception{ + return kettleService.save(engine); + } + + + @ApiIgnore + @PostMapping("validate") + public void validate(@RequestBody KettleDTO kettleDTO) throws Exception{ + kettleService.validate(kettleDTO); + } + + @ApiIgnore + @PostMapping("validate/{id}") + public ResultHolder validate(@PathVariable String id) throws Exception{ + return kettleService.validate(id); + } + + @PostMapping("/pageList/{goPage}/{pageSize}") + public Pager> pageList( @PathVariable int goPage, @PathVariable int pageSize) { + Page page = PageHelper.startPage(goPage, pageSize, true); + return PageUtils.setPageInfo(page, kettleService.pageList()); + } + + @ApiIgnore + @DeleteMapping("delete/{id}") + public void delete(@PathVariable String id) throws Exception{ + kettleService.delete(id); + } +} diff --git a/backend/src/main/java/io/dataease/dto/KettleDTO.java b/backend/src/main/java/io/dataease/dto/KettleDTO.java new file mode 100644 index 0000000000..23e7fb2327 --- /dev/null +++ b/backend/src/main/java/io/dataease/dto/KettleDTO.java @@ -0,0 +1,11 @@ +package io.dataease.dto; + +import lombok.Data; + +@Data +public class KettleDTO { + private String carte; + private String port; + private String user; + private String passwd; +} diff --git a/backend/src/main/java/io/dataease/job/sechedule/Schedular.java b/backend/src/main/java/io/dataease/job/sechedule/Schedular.java index 87c0d4c3b6..6a1693e6ff 100644 --- a/backend/src/main/java/io/dataease/job/sechedule/Schedular.java +++ b/backend/src/main/java/io/dataease/job/sechedule/Schedular.java @@ -3,6 +3,7 @@ package io.dataease.job.sechedule; import com.fit2cloud.quartz.anno.QuartzScheduled; import io.dataease.service.datasource.DatasourceService; import io.dataease.service.dataset.DataSetTableService; +import io.dataease.service.kettle.KettleService; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -13,6 +14,8 @@ public class Schedular { private DataSetTableService dataSetTableService; @Resource private DatasourceService datasourceService; + @Resource + private KettleService kettleService; @QuartzScheduled(cron = "0 0/3 * * * ?") public void updateDatasetTableStatus() { @@ -24,4 +27,9 @@ public class Schedular { datasourceService.updateDatasourceStatus(); } + @QuartzScheduled(cron = "0 0/30 * * * ?") + public void updateKettleStatus() { + kettleService.updateKettleStatus(); + } + } diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index e5f314a50f..6d6f441a42 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -25,6 +25,7 @@ import io.dataease.exception.DataEaseException; import io.dataease.listener.util.CacheUtils; import io.dataease.provider.QueryProvider; import io.dataease.service.engine.EngineService; +import io.dataease.service.kettle.KettleService; import io.dataease.service.message.DeMsgutil; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; @@ -98,6 +99,8 @@ public class ExtractDataService { private ExtChartViewMapper extChartViewMapper; @Resource private EngineService engineService; + @Resource + private KettleService kettleService; private static final String lastUpdateTime = "${__last_update_time__}"; private static final String currentUpdateTime = "${__current_update_time__}"; @@ -107,14 +110,6 @@ public class ExtractDataService { @Value("${kettle.files.keep:false}") private boolean kettleFilesKeep; - @Value("${carte.host:127.0.0.1}") - private String carte; - @Value("${carte.port:8080}") - private String port; - @Value("${carte.user:cluster}") - private String user; - @Value("${carte.passwd:cluster}") - private String passwd; private static final String shellScript = "result=`curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load`\n" + "if [ $? -eq 0 ] ; then\n" + @@ -730,7 +725,7 @@ public class ExtractDataService { break; } - SlaveServer remoteSlaveServer = getSlaveServer(); + SlaveServer remoteSlaveServer = kettleService.getSlaveServer(); JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); jobExecutionConfiguration.setRemoteServer(remoteSlaveServer); jobExecutionConfiguration.setRepository(repository); @@ -738,7 +733,6 @@ public class ExtractDataService { TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration(); transExecutionConfiguration.setRepository(repository); transExecutionConfiguration.setRemoteServer(remoteSlaveServer); - String lastTranceId = Trans.sendToSlaveServer(transMeta, transExecutionConfiguration, repository, null); SlaveServerTransStatus transStatus = null; boolean executing = true; @@ -772,15 +766,6 @@ public class ExtractDataService { } } - private SlaveServer getSlaveServer() { - SlaveServer remoteSlaveServer = new SlaveServer(); - remoteSlaveServer.setHostname(carte);// 设置远程IP - remoteSlaveServer.setPort(port);// 端口 - remoteSlaveServer.setUsername(user); - remoteSlaveServer.setPassword(passwd); - return remoteSlaveServer; - } - private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFields) throws Exception { if (engineService.isSimpleMode()) { return; @@ -1251,33 +1236,6 @@ public class ExtractDataService { } } - public boolean isKettleRunning() { - try { - if (!InetAddress.getByName(carte).isReachable(1000)) { - return false; - } - } catch (Exception e) { - return false; - } - HttpGet getMethod = new HttpGet("http://" + carte + ":" + port); - HttpClientManager.HttpClientBuilderFacade clientBuilder = HttpClientManager.getInstance().createBuilder(); - clientBuilder.setConnectionTimeout(1); - clientBuilder.setCredentials(user, passwd); - try { - CloseableHttpClient httpClient = clientBuilder.build(); - HttpResponse httpResponse = httpClient.execute(getMethod); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - if (statusCode != -1 && statusCode < 400) { - httpResponse.getEntity().getContent().close(); - return true; - } else { - return false; - } - } catch (Exception e) { - return false; - } - } - private final static String handleBinaryType = " \t\tif(\"FIELD\".equalsIgnoreCase(filed)){\n" + " get(Fields.Out, filed).setValue(r, \"\");\n" + " get(Fields.Out, filed).getValueMeta().setType(2);\n" + diff --git a/backend/src/main/java/io/dataease/service/kettle/KettleService.java b/backend/src/main/java/io/dataease/service/kettle/KettleService.java new file mode 100644 index 0000000000..aff3dc667f --- /dev/null +++ b/backend/src/main/java/io/dataease/service/kettle/KettleService.java @@ -0,0 +1,150 @@ +package io.dataease.service.kettle; + +import com.google.gson.Gson; +import io.dataease.base.domain.DeEngine; +import io.dataease.base.domain.DeEngineExample; +import io.dataease.base.mapper.DeEngineMapper; +import io.dataease.commons.utils.HttpClientConfig; +import io.dataease.commons.utils.HttpClientUtil; +import io.dataease.controller.ResultHolder; +import io.dataease.dto.KettleDTO; +import io.dataease.service.engine.EngineService; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.checkerframework.checker.units.qual.K; +import org.pentaho.di.cluster.SlaveServer; +import org.pentaho.di.core.util.HttpClientManager; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.net.InetAddress; +import java.util.Base64; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +@Service +public class KettleService { + + @Resource + private Environment env; + @Resource + private DeEngineMapper deEngineMapper; + @Resource + private EngineService engineService; + + public ResultHolder save(DeEngine kettle) throws Exception { + try { + validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class)); + kettle.setStatus("Success"); + }catch (Exception e){ + kettle.setStatus("Error"); + } + + if (StringUtils.isEmpty(kettle.getId())) { + kettle.setId(UUID.randomUUID().toString()); + kettle.setType("kettle"); + deEngineMapper.insert(kettle); + } else { + deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle); + } + return ResultHolder.success(kettle); + } + + public void delete(String id){ + deEngineMapper.deleteByPrimaryKey(id); + } + + public void validate(KettleDTO kettleDTO) throws Exception { + HttpClientConfig httpClientConfig = new HttpClientConfig(); + String authValue = "Basic " + Base64.getUrlEncoder().encodeToString((kettleDTO.getUser() + + ":" + kettleDTO.getPasswd()).getBytes()); + httpClientConfig.addHeader("Authorization", authValue); + String response = HttpClientUtil.get("http://" + kettleDTO.getCarte() + ":" + kettleDTO.getPort() + "/kettle/status/", httpClientConfig); + } + + public ResultHolder validate(String id) { + DeEngine kettle = deEngineMapper.selectByPrimaryKey(id); + try { + validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class)); + kettle.setStatus("Success"); + deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle); + return ResultHolder.success(kettle); + }catch (Exception e){ + kettle.setStatus("Error"); + deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle); + return ResultHolder.error(e.getMessage()); + } + } + + public List pageList(){ + DeEngineExample deEngineExample = new DeEngineExample(); + deEngineExample.createCriteria().andTypeEqualTo("kettle"); + return deEngineMapper.selectByExampleWithBLOBs(deEngineExample); + } + + public void updateKettleStatus(){ + if(!engineService.isClusterMode()){ + return; + } + Listkettles = pageList(); + kettles.forEach(kettle -> { + validate(kettle.getId()); + }); + } + + public SlaveServer getSlaveServer() throws Exception{ + SlaveServer remoteSlaveServer = new SlaveServer(); + if(engineService.isLocalMode()){ + remoteSlaveServer.setHostname(env.getProperty("carte.host", "127.0.0.1")); + remoteSlaveServer.setPort(env.getProperty("carte.port", "8080")); + remoteSlaveServer.setUsername(env.getProperty("carte.user", "cluster")); + remoteSlaveServer.setPassword(env.getProperty("carte.passwd", "cluster")); + }else { + List kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success")) + .collect(Collectors.toList()); + if(CollectionUtils.isEmpty(kettles)){ + throw new Exception("No valid kettle service."); + } + DeEngine kettle = kettles.get(new Random().nextInt(kettles.size())); + KettleDTO kettleDTO = new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class); + remoteSlaveServer.setHostname(kettleDTO.getCarte()); + remoteSlaveServer.setPort(kettleDTO.getPort()); + remoteSlaveServer.setUsername(kettleDTO.getUser()); + remoteSlaveServer.setPort(kettleDTO.getPasswd()); + } + return remoteSlaveServer; + } + + public boolean isKettleRunning() { + if(engineService.isLocalMode()){ + try { + KettleDTO kettleDTO = new KettleDTO(); + kettleDTO.setCarte(env.getProperty("carte.host", "127.0.0.1")); + kettleDTO.setPort(env.getProperty("carte.port", "8080")); + kettleDTO.setUser(env.getProperty("carte.user", "cluster")); + kettleDTO.setPasswd(env.getProperty("carte.passwd", "cluster")); + validate(kettleDTO); + return true; + }catch (Exception e){ + return false; + } + } + if(engineService.isClusterMode()){ + List kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success")) + .collect(Collectors.toList()); + if(CollectionUtils.isEmpty(kettles)){ + return false; + }else { + return true; + } + } + return false; + } + +} diff --git a/frontend/src/api/system/kettle.js b/frontend/src/api/system/kettle.js new file mode 100644 index 0000000000..dc8ceea939 --- /dev/null +++ b/frontend/src/api/system/kettle.js @@ -0,0 +1,49 @@ +import request from '@/utils/request' +import {validateDs} from "@/api/system/datasource"; + + + +export function validate(data) { + return request({ + url: '/kettle/validate', + method: 'post', + loading: true, + data + }) +} + +export function validateById(id) { + return request({ + url: '/kettle/validate/' + id, + method: 'post', + loading: true + }) +} + +export function save(data) { + return request({ + url: '/kettle/save', + method: 'post', + loading: true, + data + }) +} + +export function deleteKettle(id) { + return request({ + url: '/delete/' + id, + method: 'delete', + loading: true + }) +} + + + +export function pageList(url, data) { + return request({ + url: url, + method: 'post', + loading: true, + data + }) +} diff --git a/frontend/src/lang/en.js b/frontend/src/lang/en.js index 515a63644f..d3f53e2be2 100644 --- a/frontend/src/lang/en.js +++ b/frontend/src/lang/en.js @@ -1328,6 +1328,13 @@ export default { min_pool_size: 'Minimum of connections', max_pool_size: 'Maximum connection', max_idle_time: 'Maximum idle (seconds)', + doris_host: 'Doris Address', + query_port: 'Query Port', + http_port: 'Http Port', + bucket_num: 'Bucket number', + replication_num: 'Replication number', + please_input_bucket_num: 'Please enter Bucket number', + please_input_replication_num: 'Please enter Replication number', acquire_increment: 'Growth number', connect_timeout: 'Connection timeout (seconds)', please_input_initial_pool_size: 'Please enter the number of initial connections', @@ -1927,5 +1934,13 @@ export default { email: 'Email:', tel: 'Tel:', web: 'Web:' + }, + kettle: { + add: 'Add Kettle', + status: 'Status', + carte: 'Kettle Address', + port: 'Port', + user: 'User', + passwd: 'Password' } } diff --git a/frontend/src/lang/tw.js b/frontend/src/lang/tw.js index 1d2826763c..4864991c48 100644 --- a/frontend/src/lang/tw.js +++ b/frontend/src/lang/tw.js @@ -1329,6 +1329,13 @@ export default { min_pool_size: '最小連接數', max_pool_size: '最大連接數', max_idle_time: '最大空閑(秒)', + doris_host: 'Doris 地址', + query_port: 'Query Port', + http_port: 'Http Port', + bucket_num: 'Bucket 數量', + replication_num: '副本數量', + please_input_bucket_num: '請輸入 Bucket 數量', + please_input_replication_num: '請輸入副本數量', acquire_increment: '增長數', connect_timeout: '連接超時(秒)', please_input_initial_pool_size: '請輸入初始連接數', @@ -1937,5 +1944,13 @@ export default { email: '郵箱:', tel: '電話:', web: '網址:' + }, + kettle: { + add: '添加 Kettle 服務', + status: '狀態', + carte: 'Kettle 地址', + port: '端口', + user: '用戶名', + passwd: '密碼' } } diff --git a/frontend/src/lang/zh.js b/frontend/src/lang/zh.js index a1089daca1..0e55f057c2 100644 --- a/frontend/src/lang/zh.js +++ b/frontend/src/lang/zh.js @@ -279,7 +279,7 @@ export default { id: 'ID', millisecond: '毫秒', cannot_be_null: '不能为空', - required: '{0}是必填的', + required: '必填', already_exists: '名称不能重复', modifier: '修改人', validate: '校验', @@ -1302,9 +1302,9 @@ export default { password: '密码', host: '主机名/IP地址', doris_host: 'Doris 地址', - port: '端口', query_port: 'Query Port', http_port: 'Http Port', + port: '端口', datasource_url: '地址', please_input_datasource_url: '请输入 Elasticsearch 地址,如: http://es_host:es_port', please_input_data_base: '请输入数据库名称', @@ -1952,5 +1952,13 @@ export default { email: '邮箱:', tel: '电话:', web: '网址:' + }, + kettle: { + add: '添加 Kettle 服务', + status: '状态', + carte: 'Kettle 地址', + port: '端口', + user: '用户名', + passwd: '密码' } } diff --git a/frontend/src/views/system/SysParam/KettleSetting.vue b/frontend/src/views/system/SysParam/KettleSetting.vue new file mode 100644 index 0000000000..4a43f68110 --- /dev/null +++ b/frontend/src/views/system/SysParam/KettleSetting.vue @@ -0,0 +1,232 @@ + + + + + diff --git a/frontend/src/views/system/SysParam/index.vue b/frontend/src/views/system/SysParam/index.vue index 760e7bf2da..f95f402272 100644 --- a/frontend/src/views/system/SysParam/index.vue +++ b/frontend/src/views/system/SysParam/index.vue @@ -34,6 +34,10 @@ + + + + @@ -42,13 +46,14 @@ import BasicSetting from './BasicSetting' import EmailSetting from './EmailSetting' import SimpleMode from './SimpleModeSetting' import ClusterMode from './ClusterModeSetting' +import KettleSetting from './KettleSetting' import LayoutContent from '@/components/business/LayoutContent' import PluginCom from '@/views/system/plugin/PluginCom' import { pluginLoaded } from '@/api/user' import { engineMode } from '@/api/system/engine' export default { - components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode, ClusterMode}, + components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode, ClusterMode, KettleSetting}, data() { return { activeName: 'zero',