diff --git a/backend/pom.xml b/backend/pom.xml index 177a028f21..1523947dec 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -326,6 +326,11 @@ hbase-common 2.4.1 + + org.apache.hbase + hbase-mapreduce + 2.4.1 + org.testng diff --git a/backend/src/main/java/io/dataease/base/domain/DatasetTableIncrementalConfig.java b/backend/src/main/java/io/dataease/base/domain/DatasetTableIncrementalConfig.java new file mode 100644 index 0000000000..33b94f2d03 --- /dev/null +++ b/backend/src/main/java/io/dataease/base/domain/DatasetTableIncrementalConfig.java @@ -0,0 +1,17 @@ +package io.dataease.base.domain; + +import java.io.Serializable; +import lombok.Data; + +@Data +public class DatasetTableIncrementalConfig implements Serializable { + private String id; + + private String tableId; + + private String incrementalDelete; + + private String incrementalAdd; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/backend/src/main/java/io/dataease/base/domain/DatasetTableIncrementalConfigExample.java b/backend/src/main/java/io/dataease/base/domain/DatasetTableIncrementalConfigExample.java new file mode 100644 index 0000000000..9750acef68 --- /dev/null +++ b/backend/src/main/java/io/dataease/base/domain/DatasetTableIncrementalConfigExample.java @@ -0,0 +1,480 @@ +package io.dataease.base.domain; + +import java.util.ArrayList; +import java.util.List; + +public class DatasetTableIncrementalConfigExample { + protected String orderByClause; + + protected boolean distinct; + + protected List oredCriteria; + + public DatasetTableIncrementalConfigExample() { + oredCriteria = new ArrayList(); + } + + public void setOrderByClause(String orderByClause) { + this.orderByClause = orderByClause; + } + + public String getOrderByClause() { + return orderByClause; + } + + public void setDistinct(boolean distinct) { + this.distinct = distinct; + } + + public boolean isDistinct() { + return distinct; + } + + public List getOredCriteria() { + return oredCriteria; + } + + public void or(Criteria criteria) { + oredCriteria.add(criteria); + } + + public Criteria or() { + Criteria criteria = createCriteriaInternal(); + oredCriteria.add(criteria); + return criteria; + } + + public Criteria createCriteria() { + Criteria criteria = createCriteriaInternal(); + if (oredCriteria.size() == 0) { + oredCriteria.add(criteria); + } + return criteria; + } + + protected Criteria createCriteriaInternal() { + Criteria criteria = new Criteria(); + return criteria; + } + + public void clear() { + oredCriteria.clear(); + orderByClause = null; + distinct = false; + } + + protected abstract static class GeneratedCriteria { + protected List criteria; + + protected GeneratedCriteria() { + super(); + criteria = new ArrayList(); + } + + public boolean isValid() { + return criteria.size() > 0; + } + + public List getAllCriteria() { + return criteria; + } + + public List getCriteria() { + return criteria; + } + + protected void addCriterion(String condition) { + if (condition == null) { + throw new RuntimeException("Value for condition cannot be null"); + } + criteria.add(new Criterion(condition)); + } + + protected void addCriterion(String condition, Object value, String property) { + if (value == null) { + throw new RuntimeException("Value for " + property + " cannot be null"); + } + criteria.add(new Criterion(condition, value)); + } + + protected void addCriterion(String condition, Object value1, Object value2, String property) { + if (value1 == null || value2 == null) { + throw new RuntimeException("Between values for " + property + " cannot be null"); + } + criteria.add(new Criterion(condition, value1, value2)); + } + + public Criteria andIdIsNull() { + addCriterion("id is null"); + return (Criteria) this; + } + + public Criteria andIdIsNotNull() { + addCriterion("id is not null"); + return (Criteria) this; + } + + public Criteria andIdEqualTo(String value) { + addCriterion("id =", value, "id"); + return (Criteria) this; + } + + public Criteria andIdNotEqualTo(String value) { + addCriterion("id <>", value, "id"); + return (Criteria) this; + } + + public Criteria andIdGreaterThan(String value) { + addCriterion("id >", value, "id"); + return (Criteria) this; + } + + public Criteria andIdGreaterThanOrEqualTo(String value) { + addCriterion("id >=", value, "id"); + return (Criteria) this; + } + + public Criteria andIdLessThan(String value) { + addCriterion("id <", value, "id"); + return (Criteria) this; + } + + public Criteria andIdLessThanOrEqualTo(String value) { + addCriterion("id <=", value, "id"); + return (Criteria) this; + } + + public Criteria andIdLike(String value) { + addCriterion("id like", value, "id"); + return (Criteria) this; + } + + public Criteria andIdNotLike(String value) { + addCriterion("id not like", value, "id"); + return (Criteria) this; + } + + public Criteria andIdIn(List values) { + addCriterion("id in", values, "id"); + return (Criteria) this; + } + + public Criteria andIdNotIn(List values) { + addCriterion("id not in", values, "id"); + return (Criteria) this; + } + + public Criteria andIdBetween(String value1, String value2) { + addCriterion("id between", value1, value2, "id"); + return (Criteria) this; + } + + public Criteria andIdNotBetween(String value1, String value2) { + addCriterion("id not between", value1, value2, "id"); + return (Criteria) this; + } + + public Criteria andTableIdIsNull() { + addCriterion("table_id is null"); + return (Criteria) this; + } + + public Criteria andTableIdIsNotNull() { + addCriterion("table_id is not null"); + return (Criteria) this; + } + + public Criteria andTableIdEqualTo(String value) { + addCriterion("table_id =", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdNotEqualTo(String value) { + addCriterion("table_id <>", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdGreaterThan(String value) { + addCriterion("table_id >", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdGreaterThanOrEqualTo(String value) { + addCriterion("table_id >=", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdLessThan(String value) { + addCriterion("table_id <", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdLessThanOrEqualTo(String value) { + addCriterion("table_id <=", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdLike(String value) { + addCriterion("table_id like", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdNotLike(String value) { + addCriterion("table_id not like", value, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdIn(List values) { + addCriterion("table_id in", values, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdNotIn(List values) { + addCriterion("table_id not in", values, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdBetween(String value1, String value2) { + addCriterion("table_id between", value1, value2, "tableId"); + return (Criteria) this; + } + + public Criteria andTableIdNotBetween(String value1, String value2) { + addCriterion("table_id not between", value1, value2, "tableId"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteIsNull() { + addCriterion("incremental_delete is null"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteIsNotNull() { + addCriterion("incremental_delete is not null"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteEqualTo(String value) { + addCriterion("incremental_delete =", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteNotEqualTo(String value) { + addCriterion("incremental_delete <>", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteGreaterThan(String value) { + addCriterion("incremental_delete >", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteGreaterThanOrEqualTo(String value) { + addCriterion("incremental_delete >=", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteLessThan(String value) { + addCriterion("incremental_delete <", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteLessThanOrEqualTo(String value) { + addCriterion("incremental_delete <=", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteLike(String value) { + addCriterion("incremental_delete like", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteNotLike(String value) { + addCriterion("incremental_delete not like", value, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteIn(List values) { + addCriterion("incremental_delete in", values, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteNotIn(List values) { + addCriterion("incremental_delete not in", values, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteBetween(String value1, String value2) { + addCriterion("incremental_delete between", value1, value2, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalDeleteNotBetween(String value1, String value2) { + addCriterion("incremental_delete not between", value1, value2, "incrementalDelete"); + return (Criteria) this; + } + + public Criteria andIncrementalAddIsNull() { + addCriterion("incremental_add is null"); + return (Criteria) this; + } + + public Criteria andIncrementalAddIsNotNull() { + addCriterion("incremental_add is not null"); + return (Criteria) this; + } + + public Criteria andIncrementalAddEqualTo(String value) { + addCriterion("incremental_add =", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddNotEqualTo(String value) { + addCriterion("incremental_add <>", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddGreaterThan(String value) { + addCriterion("incremental_add >", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddGreaterThanOrEqualTo(String value) { + addCriterion("incremental_add >=", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddLessThan(String value) { + addCriterion("incremental_add <", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddLessThanOrEqualTo(String value) { + addCriterion("incremental_add <=", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddLike(String value) { + addCriterion("incremental_add like", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddNotLike(String value) { + addCriterion("incremental_add not like", value, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddIn(List values) { + addCriterion("incremental_add in", values, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddNotIn(List values) { + addCriterion("incremental_add not in", values, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddBetween(String value1, String value2) { + addCriterion("incremental_add between", value1, value2, "incrementalAdd"); + return (Criteria) this; + } + + public Criteria andIncrementalAddNotBetween(String value1, String value2) { + addCriterion("incremental_add not between", value1, value2, "incrementalAdd"); + return (Criteria) this; + } + } + + public static class Criteria extends GeneratedCriteria { + + protected Criteria() { + super(); + } + } + + public static class Criterion { + private String condition; + + private Object value; + + private Object secondValue; + + private boolean noValue; + + private boolean singleValue; + + private boolean betweenValue; + + private boolean listValue; + + private String typeHandler; + + public String getCondition() { + return condition; + } + + public Object getValue() { + return value; + } + + public Object getSecondValue() { + return secondValue; + } + + public boolean isNoValue() { + return noValue; + } + + public boolean isSingleValue() { + return singleValue; + } + + public boolean isBetweenValue() { + return betweenValue; + } + + public boolean isListValue() { + return listValue; + } + + public String getTypeHandler() { + return typeHandler; + } + + protected Criterion(String condition) { + super(); + this.condition = condition; + this.typeHandler = null; + this.noValue = true; + } + + protected Criterion(String condition, Object value, String typeHandler) { + super(); + this.condition = condition; + this.value = value; + this.typeHandler = typeHandler; + if (value instanceof List>) { + this.listValue = true; + } else { + this.singleValue = true; + } + } + + protected Criterion(String condition, Object value) { + this(condition, value, null); + } + + protected Criterion(String condition, Object value, Object secondValue, String typeHandler) { + super(); + this.condition = condition; + this.value = value; + this.secondValue = secondValue; + this.typeHandler = typeHandler; + this.betweenValue = true; + } + + protected Criterion(String condition, Object value, Object secondValue) { + this(condition, value, secondValue, null); + } + } +} \ No newline at end of file diff --git a/backend/src/main/java/io/dataease/base/mapper/DatasetTableIncrementalConfigMapper.java b/backend/src/main/java/io/dataease/base/mapper/DatasetTableIncrementalConfigMapper.java new file mode 100644 index 0000000000..dc7637adf3 --- /dev/null +++ b/backend/src/main/java/io/dataease/base/mapper/DatasetTableIncrementalConfigMapper.java @@ -0,0 +1,22 @@ +package io.dataease.base.mapper; + +import io.dataease.base.domain.DatasetTableIncrementalConfig; +import io.dataease.base.domain.DatasetTableIncrementalConfigExample; +import java.util.List; +import org.apache.ibatis.annotations.Param; + +public interface DatasetTableIncrementalConfigMapper { + long countByExample(DatasetTableIncrementalConfigExample example); + + int deleteByExample(DatasetTableIncrementalConfigExample example); + + int insert(DatasetTableIncrementalConfig record); + + int insertSelective(DatasetTableIncrementalConfig record); + + List selectByExample(DatasetTableIncrementalConfigExample example); + + int updateByExampleSelective(@Param("record") DatasetTableIncrementalConfig record, @Param("example") DatasetTableIncrementalConfigExample example); + + int updateByExample(@Param("record") DatasetTableIncrementalConfig record, @Param("example") DatasetTableIncrementalConfigExample example); +} \ No newline at end of file diff --git a/backend/src/main/java/io/dataease/base/mapper/DatasetTableIncrementalConfigMapper.xml b/backend/src/main/java/io/dataease/base/mapper/DatasetTableIncrementalConfigMapper.xml new file mode 100644 index 0000000000..b21c1704da --- /dev/null +++ b/backend/src/main/java/io/dataease/base/mapper/DatasetTableIncrementalConfigMapper.xml @@ -0,0 +1,164 @@ + + + + + + + + + + + + + + + + + + and ${criterion.condition} + + + and ${criterion.condition} #{criterion.value} + + + and ${criterion.condition} #{criterion.value} and #{criterion.secondValue} + + + and ${criterion.condition} + + #{listItem} + + + + + + + + + + + + + + + + + + and ${criterion.condition} + + + and ${criterion.condition} #{criterion.value} + + + and ${criterion.condition} #{criterion.value} and #{criterion.secondValue} + + + and ${criterion.condition} + + #{listItem} + + + + + + + + + + + id, table_id, incremental_delete, incremental_add + + + select + + distinct + + + from dataset_table_incremental_config + + + + + order by ${orderByClause} + + + + delete from dataset_table_incremental_config + + + + + + insert into dataset_table_incremental_config (id, table_id, incremental_delete, + incremental_add) + values (#{id,jdbcType=VARCHAR}, #{tableId,jdbcType=VARCHAR}, #{incrementalDelete,jdbcType=VARCHAR}, + #{incrementalAdd,jdbcType=VARCHAR}) + + + insert into dataset_table_incremental_config + + + id, + + + table_id, + + + incremental_delete, + + + incremental_add, + + + + + #{id,jdbcType=VARCHAR}, + + + #{tableId,jdbcType=VARCHAR}, + + + #{incrementalDelete,jdbcType=VARCHAR}, + + + #{incrementalAdd,jdbcType=VARCHAR}, + + + + + select count(*) from dataset_table_incremental_config + + + + + + update dataset_table_incremental_config + + + id = #{record.id,jdbcType=VARCHAR}, + + + table_id = #{record.tableId,jdbcType=VARCHAR}, + + + incremental_delete = #{record.incrementalDelete,jdbcType=VARCHAR}, + + + incremental_add = #{record.incrementalAdd,jdbcType=VARCHAR}, + + + + + + + + update dataset_table_incremental_config + set id = #{record.id,jdbcType=VARCHAR}, + table_id = #{record.tableId,jdbcType=VARCHAR}, + incremental_delete = #{record.incrementalDelete,jdbcType=VARCHAR}, + incremental_add = #{record.incrementalAdd,jdbcType=VARCHAR} + + + + + \ No newline at end of file diff --git a/backend/src/main/java/io/dataease/commons/constants/ScheduleType.java b/backend/src/main/java/io/dataease/commons/constants/ScheduleType.java index 98b4fde4dd..18aeb65588 100644 --- a/backend/src/main/java/io/dataease/commons/constants/ScheduleType.java +++ b/backend/src/main/java/io/dataease/commons/constants/ScheduleType.java @@ -1,5 +1,5 @@ package io.dataease.commons.constants; public enum ScheduleType { - CRON, SIMPLE + CRON, SIMPLE, SIMPLE_COMPLETE } diff --git a/backend/src/main/java/io/dataease/commons/constants/UpdateType.java b/backend/src/main/java/io/dataease/commons/constants/UpdateType.java new file mode 100644 index 0000000000..0919e566af --- /dev/null +++ b/backend/src/main/java/io/dataease/commons/constants/UpdateType.java @@ -0,0 +1,5 @@ +package io.dataease.commons.constants; + +public enum UpdateType { + all_scope, add_scope +} diff --git a/backend/src/main/java/io/dataease/controller/dataset/DataSetTableController.java b/backend/src/main/java/io/dataease/controller/dataset/DataSetTableController.java index 827386e0bb..7a902aa505 100644 --- a/backend/src/main/java/io/dataease/controller/dataset/DataSetTableController.java +++ b/backend/src/main/java/io/dataease/controller/dataset/DataSetTableController.java @@ -2,6 +2,7 @@ package io.dataease.controller.dataset; import io.dataease.base.domain.DatasetTable; import io.dataease.base.domain.DatasetTableField; +import io.dataease.base.domain.DatasetTableIncrementalConfig; import io.dataease.controller.request.dataset.DataSetTableRequest; import io.dataease.datasource.dto.TableFiled; import io.dataease.service.dataset.DataSetTableService; @@ -70,4 +71,15 @@ public class DataSetTableController { public Map getSQLPreview(@RequestBody DataSetTableRequest dataSetTableRequest) throws Exception { return dataSetTableService.getSQLPreview(dataSetTableRequest); } + + @PostMapping("incrementalConfig") + public DatasetTableIncrementalConfig incrementalConfig(@RequestBody DatasetTableIncrementalConfig datasetTableIncrementalConfig) throws Exception { + return dataSetTableService.incrementalConfig(datasetTableIncrementalConfig); + } + + @PostMapping("save/incrementalConfig") + public void saveIncrementalConfig(@RequestBody DatasetTableIncrementalConfig datasetTableIncrementalConfig) throws Exception { + dataSetTableService.saveIncrementalConfig(datasetTableIncrementalConfig); + } + } diff --git a/backend/src/main/java/io/dataease/job/sechedule/DeScheduleJob.java b/backend/src/main/java/io/dataease/job/sechedule/DeScheduleJob.java index 8400020daa..3fa52a44d3 100644 --- a/backend/src/main/java/io/dataease/job/sechedule/DeScheduleJob.java +++ b/backend/src/main/java/io/dataease/job/sechedule/DeScheduleJob.java @@ -8,6 +8,7 @@ public abstract class DeScheduleJob implements Job { protected String datasetTableId; protected String expression; protected String taskId; + protected String updateType; @Override public void execute(JobExecutionContext context) throws JobExecutionException { @@ -16,6 +17,7 @@ public abstract class DeScheduleJob implements Job { this.datasetTableId = jobDataMap.getString("datasetTableId"); this.expression = jobDataMap.getString("expression"); this.taskId = jobDataMap.getString("taskId"); + this.updateType = jobDataMap.getString("updateType"); LogUtil.info(jobKey.getGroup() + " Running: " + datasetTableId); LogUtil.info("CronExpression: " + expression); diff --git a/backend/src/main/java/io/dataease/job/sechedule/ExtractDataJob.java b/backend/src/main/java/io/dataease/job/sechedule/ExtractDataJob.java index 2704f33a02..ec91fbdce7 100644 --- a/backend/src/main/java/io/dataease/job/sechedule/ExtractDataJob.java +++ b/backend/src/main/java/io/dataease/job/sechedule/ExtractDataJob.java @@ -16,7 +16,7 @@ public class ExtractDataJob extends DeScheduleJob{ @Override void businessExecute(JobExecutionContext context) { - extractDataService.extractData(datasetTableId, taskId); + extractDataService.extractData(datasetTableId, taskId, updateType); } } diff --git a/backend/src/main/java/io/dataease/job/sechedule/ScheduleManager.java b/backend/src/main/java/io/dataease/job/sechedule/ScheduleManager.java index 44bdf3aaad..53a2c4852d 100644 --- a/backend/src/main/java/io/dataease/job/sechedule/ScheduleManager.java +++ b/backend/src/main/java/io/dataease/job/sechedule/ScheduleManager.java @@ -369,11 +369,12 @@ public class ScheduleManager { addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null); } - public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String taskId) { + public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String taskId, String updateType) { JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("datasetTableId", resourceId); jobDataMap.put("taskId", taskId); jobDataMap.put("expression", expression); + jobDataMap.put("updateType", updateType); return jobDataMap; } diff --git a/backend/src/main/java/io/dataease/service/ScheduleService.java b/backend/src/main/java/io/dataease/service/ScheduleService.java index 5885687dcb..8aa4e85735 100644 --- a/backend/src/main/java/io/dataease/service/ScheduleService.java +++ b/backend/src/main/java/io/dataease/service/ScheduleService.java @@ -1,6 +1,7 @@ package io.dataease.service; import io.dataease.base.domain.DatasetTableTask; +import io.dataease.commons.constants.ScheduleType; import io.dataease.job.sechedule.ExtractDataJob; import io.dataease.job.sechedule.ScheduleManager; import org.apache.commons.lang3.StringUtils; @@ -21,12 +22,13 @@ public class ScheduleService { private ScheduleManager scheduleManager; public void addSchedule(DatasetTableTask datasetTableTask) throws Exception { - if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "0")) { + if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), ScheduleType.SIMPLE.toString())) { scheduleManager.addOrUpdateSingleJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()), new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()), ExtractDataJob.class, - new Date(datasetTableTask.getStartTime()), scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId())); - } else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "1")) { + new Date(datasetTableTask.getStartTime()), + scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId(), datasetTableTask.getType())); + } else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), ScheduleType.CRON.toString())) { Date endTime; if (datasetTableTask.getEndTime() == null || datasetTableTask.getEndTime() == 0) { endTime = null; @@ -38,7 +40,7 @@ public class ScheduleService { new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()), ExtractDataJob.class, datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime, - scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId())); + scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId(), datasetTableTask.getType())); } } diff --git a/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java b/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java index eb7e2576d4..01640780c9 100644 --- a/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java +++ b/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java @@ -2,10 +2,8 @@ package io.dataease.service.dataset; import com.google.gson.Gson; -import io.dataease.base.domain.DatasetTable; -import io.dataease.base.domain.DatasetTableExample; -import io.dataease.base.domain.DatasetTableField; -import io.dataease.base.domain.Datasource; +import io.dataease.base.domain.*; +import io.dataease.base.mapper.DatasetTableIncrementalConfigMapper; import io.dataease.base.mapper.DatasetTableMapper; import io.dataease.base.mapper.DatasourceMapper; import io.dataease.commons.utils.BeanUtils; @@ -40,6 +38,8 @@ public class DataSetTableService { private DataSetTableFieldsService dataSetTableFieldsService; @Resource private DataSetTableTaskService dataSetTableTaskService; + @Resource + private DatasetTableIncrementalConfigMapper datasetTableIncrementalConfigMapper; public void batchInsert(List datasetTable) throws Exception { for (DatasetTable table : datasetTable) { @@ -261,6 +261,20 @@ public class DataSetTableService { return data; } + public List getDataSetDataBySql(String datasourceId, String table, String sql) { + List data = new ArrayList<>(); + Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId); + DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); + DatasourceRequest datasourceRequest = new DatasourceRequest(); + datasourceRequest.setDatasource(ds); + datasourceRequest.setQuery(sql); + try { + return datasourceProvider.getData(datasourceRequest); + } catch (Exception e) { + } + return data; + } + public void saveTableField(DatasetTable datasetTable) throws Exception { Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); DataSetTableRequest dataSetTableRequest = new DataSetTableRequest(); @@ -349,4 +363,35 @@ public class DataSetTableService { return 0; } } + + public DatasetTableIncrementalConfig incrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig){ + if(StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())){return new DatasetTableIncrementalConfig();} + DatasetTableIncrementalConfigExample example = new DatasetTableIncrementalConfigExample(); + example.createCriteria().andTableIdEqualTo(datasetTableIncrementalConfig.getTableId()); + List configs = datasetTableIncrementalConfigMapper.selectByExample(example); + if(CollectionUtils.isNotEmpty(configs)){ + return configs.get(0); + }else { + return new DatasetTableIncrementalConfig(); + } + } + + public DatasetTableIncrementalConfig incrementalConfig(String datasetTableId){ + DatasetTableIncrementalConfig datasetTableIncrementalConfig = new DatasetTableIncrementalConfig(); + datasetTableIncrementalConfig.setTableId(datasetTableId); + return incrementalConfig(datasetTableIncrementalConfig); + } + + + public void saveIncrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig){ + if(StringUtils.isEmpty(datasetTableIncrementalConfig.getId())){ + datasetTableIncrementalConfig.setId(UUID.randomUUID().toString()); + datasetTableIncrementalConfigMapper.insertSelective(datasetTableIncrementalConfig); + }else{ + DatasetTableIncrementalConfigExample example = new DatasetTableIncrementalConfigExample(); + example.createCriteria().andTableIdEqualTo(datasetTableIncrementalConfig.getTableId()); + datasetTableIncrementalConfigMapper.updateByExample(datasetTableIncrementalConfig, example); + } + } + } diff --git a/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java b/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java index ba494c375a..5640cd3020 100644 --- a/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java +++ b/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java @@ -70,6 +70,10 @@ public class DataSetTableTaskService { return datasetTableTaskMapper.selectByPrimaryKey(id); } + public void update(DatasetTableTask datasetTableTask) { + datasetTableTaskMapper.updateByPrimaryKey(datasetTableTask); + } + public List list(DatasetTableTask datasetTableTask) { DatasetTableTaskExample datasetTableTaskExample = new DatasetTableTaskExample(); DatasetTableTaskExample.Criteria criteria = datasetTableTaskExample.createCriteria(); diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index 248d98f497..7078f03f5f 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -1,19 +1,23 @@ package io.dataease.service.dataset; import com.google.gson.Gson; -import io.dataease.base.domain.DatasetTable; -import io.dataease.base.domain.DatasetTableField; -import io.dataease.base.domain.DatasetTableTaskLog; +import io.dataease.base.domain.*; import io.dataease.commons.constants.JobStatus; +import io.dataease.commons.constants.ScheduleType; +import io.dataease.commons.constants.UpdateType; import io.dataease.commons.utils.CommonBeanFactory; import io.dataease.commons.utils.LogUtil; +import io.dataease.dto.dataset.DataSetTaskLogDTO; import io.dataease.dto.dataset.DataTableInfoDTO; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.security.MessageDigest; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -28,54 +32,74 @@ public class ExtractDataService { private DataSetTableFieldsService dataSetTableFieldsService; @Resource private DataSetTableTaskLogService dataSetTableTaskLogService; + @Resource + private DataSetTableTaskService dataSetTableTaskService; private Long pageSize = 10000l; private static ExecutorService pool = Executors.newScheduledThreadPool(50); //设置连接池 private Connection connection; + private static String lastUpdateTime = "${__last_update_time__}"; + private static String currentUpdateTime = "${__current_update_time__}"; + private static String column_family = "dataease"; - public void extractData(String datasetTableId, String taskId) { + public void extractData(String datasetTableId, String taskId, String type) { DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); + UpdateType updateType = UpdateType.valueOf(type); try { - datasetTableTaskLog.setTableId(datasetTableId); - datasetTableTaskLog.setTaskId(taskId); - datasetTableTaskLog.setStatus(JobStatus.Underway.name()); - datasetTableTaskLog.setStartTime(System.currentTimeMillis()); - dataSetTableTaskLogService.save(datasetTableTaskLog); Admin admin = getConnection().getAdmin(); DatasetTable datasetTable = dataSetTableService.get(datasetTableId); List datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build()); String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable(); TableName tableName = TableName.valueOf(table + "-" + datasetTable.getDataSourceId()); - if(!admin.tableExists(tableName)){ - TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName); - ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of("cf"); - descBuilder.setColumnFamily(hcd); - TableDescriptor desc = descBuilder.build(); - admin.createTable(desc); - } - admin.disableTable(tableName); - admin.truncateTable(tableName, true); - - Table tab = getConnection().getTable(tableName); - Long total = dataSetTableService.getDataSetTotalData(datasetTable.getDataSourceId(), table); - Long pageCount = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1; - - for (Long pageIndex = 1l; pageIndex <= pageCount; pageIndex++) { - List data = dataSetTableService.getDataSetPageData(datasetTable.getDataSourceId(), table, datasetTableFields, pageIndex, pageSize); - for (String[] d : data) { - for(int i=0;i dataSetTaskLogDTOS = dataSetTableTaskLogService.list(request); + if(CollectionUtils.isEmpty(dataSetTaskLogDTOS)){ + return; + } + writeDatasetTableTaskLog(datasetTableTaskLog,datasetTableId, taskId); + + // 增量添加 + if(StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd())){ + String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() + .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); + extractIncrementalData(tableName,table,datasetTable, datasetTableFields, sql, "add"); + } + + // 增量删除 + if( StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())){ + String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() + .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); + extractIncrementalData(tableName,table,datasetTable, datasetTableFields, sql, "delete"); + } + + datasetTableTaskLog.setStatus(JobStatus.Completed.name()); + datasetTableTaskLog.setEndTime(System.currentTimeMillis()); + dataSetTableTaskLogService.save(datasetTableTaskLog); + break; } - datasetTableTaskLog.setStatus(JobStatus.Completed.name()); - datasetTableTaskLog.setEndTime(System.currentTimeMillis()); - dataSetTableTaskLogService.save(datasetTableTaskLog); }catch (Exception e){ e.printStackTrace(); LogUtil.error("ExtractData error, dataaset: " + datasetTableId); @@ -84,8 +108,75 @@ public class ExtractDataService { datasetTableTaskLog.setEndTime(System.currentTimeMillis()); dataSetTableTaskLogService.save(datasetTableTaskLog); } + finally { + DatasetTableTask datasetTableTask = dataSetTableTaskService.get(taskId); + if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())){ + datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString()); + dataSetTableTaskService.update(datasetTableTask); + } + } } + private void writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId){ + datasetTableTaskLog.setTableId(datasetTableId); + datasetTableTaskLog.setTaskId(taskId); + datasetTableTaskLog.setStatus(JobStatus.Underway.name()); + datasetTableTaskLog.setStartTime(System.currentTimeMillis()); + dataSetTableTaskLogService.save(datasetTableTaskLog); + } + + private void creatHaseTable(TableName tableName, Admin admin)throws Exception{ + TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName); + ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(column_family); + descBuilder.setColumnFamily(hcd); + TableDescriptor desc = descBuilder.build(); + admin.createTable(desc); + } + + private void extractAllData(Admin admin, TableName tableName, String table, DatasetTable datasetTable, List datasetTableFields)throws Exception{ + admin.disableTable(tableName); + admin.truncateTable(tableName, true); + + Table tab = getConnection().getTable(tableName); + Long total = dataSetTableService.getDataSetTotalData(datasetTable.getDataSourceId(), table); + Long pageCount = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1; + + for (Long pageIndex = 1l; pageIndex <= pageCount; pageIndex++) { + List data = dataSetTableService.getDataSetPageData(datasetTable.getDataSourceId(), table, datasetTableFields, pageIndex, pageSize); + insertDataToHbaseTable(data,datasetTableFields,tab); + } + } + + private void extractIncrementalData(TableName tableName, String table, DatasetTable datasetTable, List datasetTableFields, String sql, String type)throws Exception{ + Table tab = getConnection().getTable(tableName); + List data = dataSetTableService.getDataSetDataBySql(datasetTable.getDataSourceId(), table, sql); + if (type.equalsIgnoreCase("add")){ + insertDataToHbaseTable(data,datasetTableFields,tab); + }else { + deleteDataFromHbaseTable(data,datasetTableFields,tab); + } + } + + private void insertDataToHbaseTable(List data, List datasetTableFields, Table tab)throws Exception{ + for (String[] d : data) { + Put put = new Put(md5(generateStr(datasetTableFields.size(), d)).getBytes()); + for(int i=0;i data, List datasetTableFields, Table tab)throws Exception{ + for (String[] d : data) { + Delete delete = new Delete(md5(generateStr(datasetTableFields.size(), d)).getBytes()); + tab.delete(delete); + } + } private synchronized Connection getConnection() throws Exception{ if(connection == null || connection.isClosed()){ @@ -94,4 +185,43 @@ public class ExtractDataService { } return connection; } + + + private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + private static final String UTF_8 = "UTF-8"; + + public static String md5(String src) { + return md5(src, UTF_8); + } + + public static String md5(String src, String charset) { + try { + byte[] strTemp = io.micrometer.core.instrument.util.StringUtils.isEmpty(charset) ? src.getBytes() : src.getBytes(charset); + MessageDigest mdTemp = MessageDigest.getInstance("MD5"); + mdTemp.update(strTemp); + + byte[] md = mdTemp.digest(); + int j = md.length; + char[] str = new char[j * 2]; + int k = 0; + + for (byte byte0 : md) { + str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf]; + str[k++] = HEX_DIGITS[byte0 & 0xf]; + } + + return new String(str); + } catch (Exception e) { + throw new RuntimeException("MD5 encrypt error:", e); + } + } + + public String generateStr(int size, String[] d ){ + String str = null; + for(int i=0;i - - - + + + + @@ -64,8 +64,13 @@ - - + + + + + + + diff --git a/frontend/src/lang/zh.js b/frontend/src/lang/zh.js index 7d864f6b1d..6da2829093 100644 --- a/frontend/src/lang/zh.js +++ b/frontend/src/lang/zh.js @@ -671,7 +671,12 @@ export default { add_sql_table: '添加SQL', preview: '预览', pls_input_name: '请输入名称', - connect_mode: '连接模式' + connect_mode: '连接模式', + incremental_update_type: '增量更新方式:', + incremental_add: '增量添加:', + incremental_delete: '增量删除:', + last_update_time: '上次更新时间:', + current_update_time: '当前更新时间:' }, datasource: { create: '新建数据连接', @@ -689,6 +694,7 @@ export default { please_input_port: '请输入端口', modify: '编辑数据连接', validate_success: '校验成功', + validate: '校验', delete: '删除组织', delete_confirm: '删除该组织会关联删除该组织下的所有资源(如:相关工作空间,项目,测试用例等),确定要删除吗?', input_name: '请输入名称', diff --git a/frontend/src/views/dataset/data/UpdateInfo.vue b/frontend/src/views/dataset/data/UpdateInfo.vue index 7d061ee51e..ad18afdfef 100644 --- a/frontend/src/views/dataset/data/UpdateInfo.vue +++ b/frontend/src/views/dataset/data/UpdateInfo.vue @@ -79,12 +79,11 @@ @@ -97,22 +96,22 @@ /> - + - + - + - {{ $t('dataset.execute_once') }} - {{ $t('dataset.cron_config') }} + {{ $t('dataset.execute_once') }} + {{ $t('dataset.execute_once') }} + {{ $t('dataset.cron_config') }} + + + + + {{ $t('dataset.incremental_update_type') }} + + + {{ $t('dataset.incremental_add') }} + {{ $t('incremental_delete.incremental_update_type') }} + + + + + + + + 参数: + + {{ $t('dataset.last_update_time') }} + {{ $t('dataset.current_update_time') }} + + + + + + + + + + + + @@ -201,9 +241,33 @@ @@ -369,4 +511,13 @@ export default { .el-form-item { margin-bottom: 10px; } + + .codemirror { + height: 160px; + overflow-y: auto; + } + .codemirror >>> .CodeMirror-scroll { + height: 160px; + overflow-y: auto; + } diff --git a/frontend/src/views/system/datasource/index.vue b/frontend/src/views/system/datasource/index.vue index 347f51134b..cb9a3e59ab 100644 --- a/frontend/src/views/system/datasource/index.vue +++ b/frontend/src/views/system/datasource/index.vue @@ -11,7 +11,7 @@ @search="search" > - + @@ -62,7 +62,7 @@ - + @@ -79,7 +79,8 @@ @@ -198,13 +199,12 @@ export default { this.$success(this.$t('commons.save_success')) this.search() this.dialogVisible = false - }) + }); } else { return false } }) }, - validaDatasource(datasourceForm) { this.$refs[datasourceForm].validate(valid => { if (valid) { @@ -230,7 +230,6 @@ export default { const result = {} if (condition && condition.quick) { for (const [key, value] of Object.entries(condition)) { - // console.log(`${key}`) if (`${key}` === 'quick') { const v_new = Object.assign({}, value) v_new['field'] = 'name'