From a02df9d8436a0659060454b935e3786acda22fbe Mon Sep 17 00:00:00 2001
From: junjie
Date: Tue, 13 Apr 2021 18:21:40 +0800
Subject: [PATCH] feat(backend): initial implementation of the spark cache
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../commons/utils/CommonThreadPool.java       |  99 ++++++++++++++++
 .../java/io/dataease/config/CommonConfig.java |  27 ++---
 .../dataease/listener/AppStartListener.java   |   2 +
 .../listener/AppStartReadHBaseListener.java   |  58 +++++++++
 .../service/chart/ChartViewService.java       |  10 +-
 .../dataset/DataSetTableFieldsService.java    |   6 +
 .../service/dataset/ExtractDataService.java   |  86 +++++++-------
 .../io/dataease/service/spark/CacheUtil.java  |  53 +++++++++
 .../io/dataease/service/spark/SparkCalc.java  | 112 +++++++++---------
 9 files changed, 338 insertions(+), 115 deletions(-)
 create mode 100644 backend/src/main/java/io/dataease/commons/utils/CommonThreadPool.java
 create mode 100644 backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java
 create mode 100644 backend/src/main/java/io/dataease/service/spark/CacheUtil.java

diff --git a/backend/src/main/java/io/dataease/commons/utils/CommonThreadPool.java b/backend/src/main/java/io/dataease/commons/utils/CommonThreadPool.java
new file mode 100644
index 0000000000..1b56cb5a1f
--- /dev/null
+++ b/backend/src/main/java/io/dataease/commons/utils/CommonThreadPool.java
@@ -0,0 +1,99 @@
+package io.dataease.commons.utils;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.*;
+
+/**
+ * @Author gin
+ * @Date 2021/4/13 4:08 PM
+ */
+public class CommonThreadPool {
+
+    private int corePoolSize = 10;
+
+    private int maxQueueSize = 10;
+
+    private int keepAliveSeconds = 600;
+
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    @PostConstruct
+    public void init() {
+        scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize);
+        scheduledThreadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        if (scheduledThreadPoolExecutor != null) {
+            scheduledThreadPoolExecutor.shutdown();
+        }
+    }
+
+    /**
+     * Whether the pool is still available (the current queue size is below the configured maximum).
+     *
+     * @return true if available, false otherwise
+     */
+    public boolean available() {
+        return scheduledThreadPoolExecutor.getQueue().size() <= maxQueueSize;
+    }
+
+    /**
+     * Add a task without enforcing the queue limit.
+     *
+     * @param task task to run
+     */
+    public void addTask(Runnable task) {
+        scheduledThreadPoolExecutor.execute(task);
+    }
+
+    /**
+     * Schedule a delayed task without enforcing the queue limit.
+     *
+     * @param task  task to run
+     * @param delay delay before execution
+     * @param unit  time unit of the delay
+     */
+    public void scheduleTask(Runnable task, long delay, TimeUnit unit) {
+        scheduledThreadPoolExecutor.schedule(task, delay, unit);
+    }
+
+    /**
+     * Add a task with a timeout (a task that has not finished within the timeout is cancelled and removed from the pool, so a long-running task cannot occupy it indefinitely).
+     *
+     * @param task     task to run
+     * @param timeOut  timeout
+     * @param timeUnit time unit of the timeout
+     */
+    public void addTask(Runnable task, long timeOut, TimeUnit timeUnit) {
+        scheduledThreadPoolExecutor.execute(() -> {
+            ExecutorService executorService = Executors.newSingleThreadExecutor();
+            try {
+                Future<?> future = executorService.submit(task);
+                future.get(timeOut, timeUnit); // blocks until the task finishes or times out
+            } catch (TimeoutException timeoutException) {
+                LogUtil.getLogger().error("timeout to execute task", timeoutException);
+            } catch (Exception exception) {
+                LogUtil.getLogger().error("failed to execute task", exception);
+            } finally {
+                if (!executorService.isShutdown()) {
+                    executorService.shutdown();
+                }
+            }
}); + } + + public void setCorePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public void setKeepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + } +} diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java index 94e5fb0050..f22c749e62 100644 --- a/backend/src/main/java/io/dataease/config/CommonConfig.java +++ b/backend/src/main/java/io/dataease/config/CommonConfig.java @@ -1,6 +1,7 @@ package io.dataease.config; import com.fit2cloud.autoconfigure.QuartzAutoConfiguration; +import io.dataease.commons.utils.CommonThreadPool; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; @@ -33,31 +34,20 @@ public class CommonConfig { return configuration; } - @Bean @ConditionalOnMissingBean - public JavaSparkContext javaSparkContext() { + public SparkSession javaSparkSession() { SparkSession spark = SparkSession.builder() .appName(env.getProperty("spark.appName", "DataeaseJob")) .master(env.getProperty("spark.master", "local[*]")) .config("spark.scheduler.mode", "FAIR") .getOrCreate(); - JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - return sc; + return spark; } @Bean @ConditionalOnMissingBean - public SQLContext sqlContext(JavaSparkContext javaSparkContext) { - SQLContext sqlContext = new SQLContext(javaSparkContext); - sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1")); - sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1")); - return sqlContext; - } - - @Bean - @ConditionalOnMissingBean - public KettleFileRepository kettleFileRepository()throws Exception{ + public KettleFileRepository kettleFileRepository() throws Exception { KettleEnvironment.init(); KettleFileRepository repository = new KettleFileRepository(); KettleFileRepositoryMeta kettleDatabaseMeta = new KettleFileRepositoryMeta("KettleFileRepository", "repo", @@ -65,4 +55,13 @@ public class CommonConfig { repository.init(kettleDatabaseMeta); return repository; } + + @Bean(destroyMethod = "shutdown") + public CommonThreadPool resourcePoolThreadPool() { + CommonThreadPool commonThreadPool = new CommonThreadPool(); + commonThreadPool.setCorePoolSize(20); + commonThreadPool.setMaxQueueSize(100); + commonThreadPool.setKeepAliveSeconds(3600); + return commonThreadPool; + } } diff --git a/backend/src/main/java/io/dataease/listener/AppStartListener.java b/backend/src/main/java/io/dataease/listener/AppStartListener.java index f44b517d36..6193506241 100644 --- a/backend/src/main/java/io/dataease/listener/AppStartListener.java +++ b/backend/src/main/java/io/dataease/listener/AppStartListener.java @@ -6,12 +6,14 @@ import io.dataease.service.ScheduleService; import io.dataease.service.dataset.DataSetTableTaskService; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; @Component +@Order(value = 1) public class AppStartListener implements ApplicationListener { @Resource private ScheduleService scheduleService; diff --git a/backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java 
new file mode 100644
index 0000000000..2fc2f731d7
--- /dev/null
+++ b/backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java
@@ -0,0 +1,58 @@
+package io.dataease.listener;
+
+import io.dataease.base.domain.DatasetTable;
+import io.dataease.base.domain.DatasetTableExample;
+import io.dataease.base.domain.DatasetTableField;
+import io.dataease.base.domain.DatasetTableFieldExample;
+import io.dataease.base.mapper.DatasetTableFieldMapper;
+import io.dataease.base.mapper.DatasetTableMapper;
+import io.dataease.commons.utils.CommonBeanFactory;
+import io.dataease.commons.utils.CommonThreadPool;
+import io.dataease.service.dataset.DataSetTableFieldsService;
+import io.dataease.service.spark.SparkCalc;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.core.annotation.Order;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+@Component
+@Order(value = 2)
+public class AppStartReadHBaseListener implements ApplicationListener<ApplicationReadyEvent> {
+    @Resource
+    private CommonThreadPool commonThreadPool;
+    @Resource
+    private SparkCalc sparkCalc;
+    @Resource
+    private Environment env; // holds the configuration properties
+
+    @Resource
+    private DatasetTableMapper datasetTableMapper;
+    @Resource
+    private DataSetTableFieldsService dataSetTableFieldsService;
+
+    @Override
+    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        System.out.println("================= Read HBase start =================");
+        // on startup, find the datasets that are extracted on a schedule and load them from HBase into the cache
+        DatasetTableExample datasetTableExample = new DatasetTableExample();
+        datasetTableExample.createCriteria().andModeEqualTo(1);
+        List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
+        for (DatasetTable table : datasetTables) {
+            commonThreadPool.addTask(() -> {
+                try {
+                    List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
+                    sparkCalc.getHBaseDataAndCache(table.getId(), fields);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            });
+        }
+    }
+}
diff --git a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java
index cc0ee213f8..2100b7ada1 100644
--- a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java
+++ b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java
@@ -4,6 +4,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.dataease.base.domain.*;
 import io.dataease.base.mapper.ChartViewMapper;
+import io.dataease.base.mapper.DatasetTableFieldMapper;
 import io.dataease.commons.utils.AuthUtils;
 import io.dataease.commons.utils.BeanUtils;
 import io.dataease.controller.request.chart.ChartViewRequest;
@@ -16,6 +17,7 @@ import io.dataease.dto.chart.ChartViewDTO;
 import io.dataease.dto.chart.ChartViewFieldDTO;
 import io.dataease.dto.chart.Series;
 import io.dataease.dto.dataset.DataTableInfoDTO;
+import io.dataease.service.dataset.DataSetTableFieldsService;
 import io.dataease.service.dataset.DataSetTableService;
 import io.dataease.service.spark.SparkCalc;
 import org.apache.commons.collections4.CollectionUtils;
@@ -41,6 +43,8 @@ public class ChartViewService {
     private DatasourceService datasourceService;
     @Resource
     private SparkCalc sparkCalc;
+    @Resource
+    private DataSetTableFieldsService dataSetTableFieldsService;
 
     public ChartViewWithBLOBs save(ChartViewWithBLOBs chartView) {
         checkName(chartView);
@@ -121,9 +125,9 @@ public class ChartViewService {
             }
             data = datasourceProvider.getData(datasourceRequest);
         } else if (table.getMode() == 1) {// extracted dataset
-//            DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
-//            String tableName = dataTableInfoDTO.getTable() + "-" + table.getDataSourceId();// todo hBase table name maybe change
-            data = sparkCalc.getData(table.getId(), xAxis, yAxis, "tmp_" + view.getId().split("-")[0]);
+            // get the fields of the dataset
+            List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
+            data = sparkCalc.getData(table.getId(), fields, xAxis, yAxis, "tmp_" + view.getId().split("-")[0]);
         }
 
         // chart components can be extended further
diff --git a/backend/src/main/java/io/dataease/service/dataset/DataSetTableFieldsService.java b/backend/src/main/java/io/dataease/service/dataset/DataSetTableFieldsService.java
index b4367ae8ac..020684bc8e 100644
--- a/backend/src/main/java/io/dataease/service/dataset/DataSetTableFieldsService.java
+++ b/backend/src/main/java/io/dataease/service/dataset/DataSetTableFieldsService.java
@@ -60,4 +60,10 @@ public class DataSetTableFieldsService {
         datasetTableFieldExample.createCriteria().andIdIn(ids);
         return datasetTableFieldMapper.selectByExample(datasetTableFieldExample);
     }
+
+    public List<DatasetTableField> getFieldsByTableId(String id) {
+        DatasetTableFieldExample datasetTableFieldExample = new DatasetTableFieldExample();
+        datasetTableFieldExample.createCriteria().andTableIdEqualTo(id);
+        return datasetTableFieldMapper.selectByExample(datasetTableFieldExample);
+    }
 }
diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
index e252383830..d952cbaefe 100644
--- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
+++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
@@ -13,6 +13,7 @@ import io.dataease.datasource.constants.DatasourceTypes;
 import io.dataease.datasource.dto.MysqlConfigrationDTO;
 import io.dataease.dto.dataset.DataSetTaskLogDTO;
 import io.dataease.dto.dataset.DataTableInfoDTO;
+import io.dataease.service.spark.SparkCalc;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -97,7 +98,7 @@ public class ExtractDataService {
     private DataSetTableTaskService dataSetTableTaskService;
     @Resource
     private DatasourceMapper datasourceMapper;
-    private static ExecutorService pool = Executors.newScheduledThreadPool(50);    //connection pool
+    private static ExecutorService pool = Executors.newScheduledThreadPool(50);    //connection pool
     private Connection connection;
 
     private static String lastUpdateTime = "${__last_update_time__}";
@@ -120,6 +121,9 @@ public class ExtractDataService {
     @Value("${hbase.zookeeper.property.clientPort:2181}")
     private String zkPort;
 
+    @Resource
+    private SparkCalc sparkCalc;
+
     public void extractData(String datasetTableId, String taskId, String type) {
         DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
 
@@ -131,60 +135,62 @@ public class ExtractDataService {
             List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
             String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
             TableName hbaseTable = TableName.valueOf(datasetTableId);
-            switch (updateType){
+            switch (updateType) {
                 // full update
                 case all_scope:
                     writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
 
                     //check pentaho_mappings table
                     TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
-                    if(!admin.tableExists(pentaho_mappings)){
-                        creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns","key"));
+                    if (!admin.tableExists(pentaho_mappings)) {
+                        creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key"));
                     }
 
                     //check pentaho files
-                    if(!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")){
+                    if (!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")) {
                         generateTransFile("all_scope", datasetTable, datasource, table, datasetTableFields, null);
                         generateJobFile("all_scope", datasetTable);
                     }
 
-                    if(!admin.tableExists(hbaseTable)){
+                    if (!admin.tableExists(hbaseTable)) {
                         creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
                     }
                     admin.disableTable(hbaseTable);
                     admin.truncateTable(hbaseTable, true);
 
                     extractData(datasetTable, "all_scope");
+                    // after the sync completes, read the data from HBase into the cache
+                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
 
                     datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                     datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                     dataSetTableTaskLogService.save(datasetTableTaskLog);
                     break;
                 case add_scope:
                     // incremental update
-                    if(!admin.tableExists(hbaseTable)){
+                    if (!admin.tableExists(hbaseTable)) {
                         LogUtil.error("TableName error, dataaset: " + datasetTableId);
                         return;
                     }
                     DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
-                    if(datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())){
+                    if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
                         return;
                     }
                     DatasetTableTaskLog request = new DatasetTableTaskLog();
                     request.setTableId(datasetTableId);
                     request.setStatus(JobStatus.Completed.name());
                     List<DataSetTaskLogDTO> dataSetTaskLogDTOS = dataSetTableTaskLogService.list(request);
-                    if(CollectionUtils.isEmpty(dataSetTaskLogDTOS)){
+                    if (CollectionUtils.isEmpty(dataSetTaskLogDTOS)) {
                         return;
                     }
-                    writeDatasetTableTaskLog(datasetTableTaskLog,datasetTableId, taskId);
+                    writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
 
                     // incremental add
-                    if(StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))){
+                    if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
                         System.out.println("datasetTableIncrementalConfig.getIncrementalAdd(): " + datasetTableIncrementalConfig.getIncrementalAdd());
                         String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
                                 .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
 
-                        if(!isExitFile("job_add_" + datasetTableId + ".kjb") || !isExitFile("trans_add_" + datasetTableId + ".ktr")){
+                        if (!isExitFile("job_add_" + datasetTableId + ".kjb") || !isExitFile("trans_add_" + datasetTableId + ".ktr")) {
                             generateTransFile("incremental_add", datasetTable, datasource, table, datasetTableFields, sql);
                             generateJobFile("incremental_add", datasetTable);
                         }
@@ -193,39 +199,39 @@ public class ExtractDataService {
                     }
 
                     // incremental delete
-                    if( StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())){
+                    if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
                         String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
                                 .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
-                        if(!isExitFile("job_delete_" + datasetTableId + ".kjb") || !isExitFile("trans_delete_" + datasetTableId + ".ktr")){
+                        if (!isExitFile("job_delete_" + datasetTableId + ".kjb") || !isExitFile("trans_delete_" + datasetTableId + ".ktr")) {
                             generateTransFile("incremental_delete", datasetTable, datasource, table, datasetTableFields, sql);
                             generateJobFile("incremental_delete", datasetTable);
                         }
                         extractData(datasetTable, "incremental_delete");
                     }
-
+                    // after the sync completes, read the data from HBase into the cache
+                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
                     datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                     datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                     dataSetTableTaskLogService.save(datasetTableTaskLog);
                     break;
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
             LogUtil.error("ExtractData error, dataaset: " + datasetTableId);
             LogUtil.error(e.getMessage(), e);
             datasetTableTaskLog.setStatus(JobStatus.Error.name());
             datasetTableTaskLog.setEndTime(System.currentTimeMillis());
             dataSetTableTaskLogService.save(datasetTableTaskLog);
-        }
-        finally {
+        } finally {
             DatasetTableTask datasetTableTask = dataSetTableTaskService.get(taskId);
-            if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())){
+            if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
                 datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
                 dataSetTableTaskService.update(datasetTableTask);
             }
         }
     }
 
-    private void writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId){
+    private void writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId) {
         datasetTableTaskLog.setTableId(datasetTableId);
         datasetTableTaskLog.setTaskId(taskId);
         datasetTableTaskLog.setStatus(JobStatus.Underway.name());
@@ -233,7 +239,7 @@ public class ExtractDataService {
         dataSetTableTaskLogService.save(datasetTableTaskLog);
     }
 
-    private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily)throws Exception{
+    private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
         TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
         Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
         for (String s : columnFamily) {
@@ -245,11 +251,11 @@ public class ExtractDataService {
         admin.createTable(desc);
     }
 
-    private void extractData(DatasetTable datasetTable, String extractType)throws Exception{
+    private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
         KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
         RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree();
         JobMeta jobMeta = null;
-        switch (extractType){
+        switch (extractType) {
             case "all_scope":
                 jobMeta = repository.loadJob("job_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
                 break;
@@ -272,27 +278,27 @@ public class ExtractDataService {
         do {
             jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
         } while (jobStatus != null && jobStatus.isRunning());
-        if(jobStatus.getStatusDescription().equals("Finished")){
+        if (jobStatus.getStatusDescription().equals("Finished")) {
             return;
-        }else {
+        } else {
             throw new Exception(jobStatus.getLoggingString());
-        }
+        }
     }
 
-    private synchronized Connection getConnection() throws Exception{
-        if(connection == null || connection.isClosed()){
+    private synchronized Connection getConnection() throws Exception {
+        if (connection == null || connection.isClosed()) {
             Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
             connection = ConnectionFactory.createConnection(cfg, pool);
         }
         return connection;
     }
 
-    private boolean isExitFile(String fileName){
-        File file=new File(root_path + fileName);
+    private boolean isExitFile(String fileName) {
+        File file = new File(root_path + fileName);
         return file.exists();
     }
 
-    private SlaveServer getSlaveServer(){
+    private SlaveServer getSlaveServer() {
         SlaveServer remoteSlaveServer = new SlaveServer();
         remoteSlaveServer.setHostname(carte);// remote host IP
         remoteSlaveServer.setPort(port);// port
@@ -301,14 +307,14 @@ public class ExtractDataService {
         return remoteSlaveServer;
     }
 
-    private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception{
+    private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception {
         String jobName = null;
         switch (extractType) {
             case "all_scope":
                 jobName = "job_" + datasetTable.getId();
                 break;
             case "incremental_add":
-                jobName = "job_add_" + datasetTable.getId();
+                jobName = "job_add_" + datasetTable.getId();
                 break;
             case "incremental_delete":
                 jobName = "job_delete_" + datasetTable.getId();
@@ -323,7 +329,7 @@ public class ExtractDataService {
                 transName = "trans_" + datasetTable.getId();
                 break;
             case "incremental_add":
-                transName = "trans_add_" + datasetTable.getId();
+                transName = "trans_add_" + datasetTable.getId();
                 break;
             case "incremental_delete":
                 transName = "trans_delete_" + datasetTable.getId();
@@ -364,11 +370,11 @@ public class ExtractDataService {
         jobMeta.addJobHop(greenHop);
 
         String jobXml = jobMeta.getXML();
-        File file = new File( root_path + jobName + ".kjb");
+        File file = new File(root_path + jobName + ".kjb");
         FileUtils.writeStringToFile(file, jobXml, "UTF-8");
     }
 
-    private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception{
+    private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
         TransMeta transMeta = new TransMeta();
         String transName = null;
         switch (extractType) {
@@ -377,7 +383,7 @@ public class ExtractDataService {
                 selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
                 break;
             case "incremental_add":
-                transName = "trans_add_" + datasetTable.getId();
+                transName = "trans_add_" + datasetTable.getId();
                 break;
             case "incremental_delete":
                 transName = "trans_delete_" + datasetTable.getId();
@@ -450,11 +456,11 @@ public class ExtractDataService {
         RuntimeTestActionHandler defaultHandler = null;
 
         RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
-        RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>( Arrays.asList( mock( RuntimeTest.class ) ) ), mock( ExecutorService.class ), "modules");
+        RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules");
 
         Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
         for (DatasetTableField datasetTableField : datasetTableFields) {
-            put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
+            put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
         }
         put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
         TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
@@ -466,7 +472,7 @@ public class ExtractDataService {
         hBaseOutputMeta.setTargetMappingName("target_mapping");
         hBaseOutputMeta.setNamedCluster(clusterTemplate);
         hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
-        if(extractType.equalsIgnoreCase("incremental_delete")){
+        if (extractType.equalsIgnoreCase("incremental_delete")) {
             hBaseOutputMeta.setDeleteRowKey(true);
         }
         StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
diff --git a/backend/src/main/java/io/dataease/service/spark/CacheUtil.java b/backend/src/main/java/io/dataease/service/spark/CacheUtil.java
new file mode 100644
index 0000000000..56986f5cb5
--- /dev/null
+++ b/backend/src/main/java/io/dataease/service/spark/CacheUtil.java
@@ -0,0 +1,53 @@
+package io.dataease.service.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Author gin
+ * @Date 2021/4/13 12:32 PM
+ */
+public class CacheUtil {
+    private static CacheUtil cacheUtil;
+    private static Map<String, Dataset<Row>> cacheMap;
+
+    private CacheUtil(){
+        cacheMap = new HashMap<String, Dataset<Row>>();
+    }
+
+    public static CacheUtil getInstance(){
+        if (cacheUtil == null){
+            cacheUtil = new CacheUtil();
+        }
+        return cacheUtil;
+    }
+
+    /**
+     * add a dataset to the cache
+     * @param key
+     * @param obj
+     */
+    public void addCacheData(String key, Dataset<Row> obj){
+        cacheMap.put(key, obj);
+    }
+
+    /**
+     * get a dataset from the cache
+     * @param key
+     * @return
+     */
+    public Dataset<Row> getCacheData(String key){
+        return cacheMap.get(key);
+    }
+
+    /**
+     * remove a dataset from the cache
+     * @param key
+     */
+    public void removeCacheData(String key){
+        cacheMap.remove(key);
+    }
+}
diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
index 26f33525aa..936ddb8d57 100644
--- a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
+++ b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
@@ -1,8 +1,10 @@
 package io.dataease.service.spark;
 
+import io.dataease.base.domain.DatasetTableField;
 import io.dataease.commons.utils.CommonBeanFactory;
 import io.dataease.dto.chart.ChartViewFieldDTO;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Result;
@@ -42,21 +44,56 @@ public class SparkCalc {
     @Resource
     private Environment env; // holds the configuration properties
 
-    public List<String[]> getData(String hTable, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
+    public List<String[]> getData(String hTable, List<DatasetTableField> fields, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
+        // Spark Context
+        SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+
+        // Spark SQL Context
+//        SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        SQLContext sqlContext = new SQLContext(sparkContext);
+        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
+
+        Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
+        if (ObjectUtils.isEmpty(dataFrame)) {
+            dataFrame = getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
+        }
+
+        dataFrame.createOrReplaceTempView(tmpTable);
+        Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
+        // transform
+        List<String[]> data = new ArrayList<>();
+        List<Row> list = sql.collectAsList();
+        for (Row row : list) {
+            String[] r = new String[row.length()];
+            for (int i = 0; i < row.length(); i++) {
+                r[i] = row.get(i) == null ? "null" : row.get(i).toString();
+            }
+            data.add(r);
+        }
+        return data;
+    }
+
+    public Dataset<Row> getHBaseDataAndCache(String hTable, List<DatasetTableField> fields) throws Exception {
+        // Spark Context
+        SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+
+        // Spark SQL Context
+//        SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        SQLContext sqlContext = new SQLContext(sparkContext);
+        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
+        return getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
+    }
+
+    public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
         Scan scan = new Scan();
         scan.addFamily(column_family.getBytes());
         ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
         String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
 
-        // Spark Context
-//        JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
-        SparkSession spark = SparkSession.builder()
-                .appName(env.getProperty("spark.appName", "DataeaseJob"))
-                .master(env.getProperty("spark.master", "local[*]"))
-                .config("spark.scheduler.mode", "FAIR")
-                .getOrCreate();
-        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
-
         // HBase config
 //        Configuration conf = CommonBeanFactory.getBean(Configuration.class);
         org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
@@ -73,7 +110,7 @@ public class SparkCalc {
             while (tuple2Iterator.hasNext()) {
                 Result result = tuple2Iterator.next()._2;
                 List<Object> list = new ArrayList<>();
-                xAxis.forEach(x -> {
+                fields.forEach(x -> {
                     String l = Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()));
                     if (x.getDeType() == 0 || x.getDeType() == 1) {
                         list.add(l);
@@ -89,22 +126,6 @@ public class SparkCalc {
                         list.add(Double.valueOf(l));
                     }
                 });
-                yAxis.forEach(y -> {
-                    String l = Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes()));
-                    if (y.getDeType() == 0 || y.getDeType() == 1) {
-                        list.add(l);
-                    } else if (y.getDeType() == 2) {
-                        if (StringUtils.isEmpty(l)) {
-                            l = "0";
-                        }
-                        list.add(Long.valueOf(l));
-                    } else if (y.getDeType() == 3) {
-                        if (StringUtils.isEmpty(l)) {
-                            l = "0.0";
-                        }
-                        list.add(Double.valueOf(l));
-                    }
-                });
                 iterator.add(RowFactory.create(list.toArray()));
             }
             return iterator.iterator();
@@ -112,7 +133,7 @@ public class SparkCalc {
 
         List<StructField> structFields = new ArrayList<>();
         // struct order must be consistent with the rdd order
-        xAxis.forEach(x -> {
+        fields.forEach(x -> {
             if (x.getDeType() == 0 || x.getDeType() == 1) {
                 structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
             } else if (x.getDeType() == 2) {
@@ -121,40 +142,15 @@ public class SparkCalc {
                 structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true));
             }
         });
-        yAxis.forEach(y -> {
-            if (y.getDeType() == 0 || y.getDeType() == 1) {
-                structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.StringType, true));
-            } else if (y.getDeType() == 2) {
-                structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.LongType, true));
-            } else if (y.getDeType() == 3) {
-                structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.DoubleType, true));
-            }
-        });
         StructType structType = DataTypes.createStructType(structFields);
 
-        // Spark SQL Context
-//        SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
-        SQLContext sqlContext = new SQLContext(sparkContext);
-        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
-        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
-
-        Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
-        dataFrame.createOrReplaceTempView(tmpTable);
-        Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
-        // transform
-        List<String[]> data = new ArrayList<>();
-        List<Row> list = sql.collectAsList();
-        for (Row row : list) {
-            String[] r = new String[row.length()];
-            for (int i = 0; i < row.length(); i++) {
-                r[i] = row.get(i) == null ? "null" : row.get(i).toString();
-            }
-            data.add(r);
-        }
-        return data;
+        Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist();
+        CacheUtil.getInstance().addCacheData(hTable, dataFrame);
+        dataFrame.count();
+        return dataFrame;
     }
 
-    private String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table) {
+    public String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table) {
         // field aggregation, ordering, etc.
         String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
         String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
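
The patch above implements a cache-aside flow: SparkCalc.getData() first asks CacheUtil for a persisted Dataset<Row> keyed by the dataset table id and only falls back to getHBaseDataAndCache() on a miss, while AppStartReadHBaseListener (at startup) and ExtractDataService (after each extraction) warm that cache. The following self-contained sketch is an illustration of the same pattern, not part of the patch: the class and method names (CacheAsideSketch, loadAndCache) are hypothetical, and List<String[]> stands in for Spark's Dataset<Row>.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheAsideSketch {

    // analogous to CacheUtil's static cacheMap, keyed by the dataset table id
    private static final Map<String, List<String[]>> CACHE = new ConcurrentHashMap<>();

    // analogous to SparkCalc.getData(): serve from the cache, fall back to a load on a miss
    static List<String[]> getData(String tableId) {
        List<String[]> cached = CACHE.get(tableId);
        if (cached == null) {
            cached = loadAndCache(tableId); // plays the role of getHBaseDataAndCache()
        }
        return cached;
    }

    // analogous to SparkCalc.getHBaseDataAndCache(): load the rows, cache them, return them
    static List<String[]> loadAndCache(String tableId) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[]{tableId, "value"}); // placeholder for the HBase scan and row mapping
        CACHE.put(tableId, rows);
        return rows;
    }

    public static void main(String[] args) {
        getData("dataset-1");                            // first call loads and caches
        System.out.println(getData("dataset-1").size()); // second call is served from the cache
    }
}

In the patch itself the cached value is the persisted Dataset<Row> (dataFrame.persist() followed by dataFrame.count() to materialize it), and the warm-up calls are the sparkCalc.getHBaseDataAndCache(...) invocations in AppStartReadHBaseListener and ExtractDataService shown above.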