Merge pull request #7786 from dataease/pr@dev@fixSql
fix: 定时同步任务支持在历史数据变动后增量更新到doris中 #6276
This commit is contained in:
commit
2dde48fb54
@ -10,6 +10,7 @@ import io.dataease.provider.DDLProviderImpl;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -53,8 +54,16 @@ public class DorisDDLProvider extends DDLProviderImpl {
|
||||
.replace("BUCKETS_NUM", dorisConfiguration.getBucketNum().toString())
|
||||
.replace("ReplicationNum", dorisConfiguration.getReplicationNum().toString());
|
||||
if (dataTableInfoDTO.isSetKey() && CollectionUtils.isNotEmpty(dataTableInfoDTO.getKeys())) {
|
||||
sql = sql.replace("`UNIQUE_KEY`", "`" + String.join("`, `", dataTableInfoDTO.getKeys()) + "`")
|
||||
.replace("DISTRIBUTED_BY_HASH", dataTableInfoDTO.getKeys().get(0)).replace("Column_Fields", createDorisTableColumnSql(datasetTableFields, version));
|
||||
List<String> keys = new ArrayList<>();
|
||||
for (int i = 0; i < dataTableInfoDTO.getKeys().size(); i++) {
|
||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||
if (datasetTableField.getOriginName().equalsIgnoreCase(dataTableInfoDTO.getKeys().get(i))) {
|
||||
keys.add(datasetTableField.getDataeaseName());
|
||||
}
|
||||
}
|
||||
}
|
||||
sql = sql.replace("`UNIQUE_KEY`", "`" + String.join("`, `", keys) + "`")
|
||||
.replace("DISTRIBUTED_BY_HASH", keys.get(0)).replace("Column_Fields", createDorisTableColumnSql(datasetTableFields, version));
|
||||
} else {
|
||||
sql = sql.replace("UNIQUE_KEY", "dataease_uuid").replace("DISTRIBUTED_BY_HASH", "dataease_uuid").replace("Column_Fields", createDorisTableColumnSql(datasetTableFields, version));
|
||||
}
|
||||
|
||||
@ -176,16 +176,7 @@ public class ExtractDataService {
|
||||
if (datasetTableFields == null) {
|
||||
datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
|
||||
}
|
||||
datasetTableFields = datasetTableFields.stream().filter(datasetTableField -> datasetTableField.getExtField() == 0).collect(Collectors.toList());
|
||||
datasetTableFields.sort((o1, o2) -> {
|
||||
if (o1.getColumnIndex() == null) {
|
||||
return -1;
|
||||
}
|
||||
if (o2.getColumnIndex() == null) {
|
||||
return 1;
|
||||
}
|
||||
return o1.getColumnIndex().compareTo(o2.getColumnIndex());
|
||||
});
|
||||
datasetTableFields = sortDatasetTableFields(datasetTable, datasetTableFields);
|
||||
|
||||
DatasetTableTaskLog datasetTableTaskLog = writeExcelLog(datasetTableId, ops);
|
||||
switch (updateType) {
|
||||
@ -211,7 +202,7 @@ public class ExtractDataService {
|
||||
for (DatasetTableField oldField : oldFields) {
|
||||
boolean delete = true;
|
||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||
if (oldField.getDataeaseName().equalsIgnoreCase(datasetTableField.getDataeaseName()) && oldField.getType().equalsIgnoreCase(datasetTableField.getType())) {
|
||||
if (!oldField.getDataeaseName().equalsIgnoreCase("dataease_uuid") && oldField.getDataeaseName().equalsIgnoreCase(datasetTableField.getDataeaseName()) && oldField.getType().equalsIgnoreCase(datasetTableField.getType())) {
|
||||
delete = false;
|
||||
}
|
||||
}
|
||||
@ -422,8 +413,12 @@ public class ExtractDataService {
|
||||
}
|
||||
|
||||
public List<DatasetTableField> getDatasetTableFields(DatasetTable datasetTable) {
|
||||
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class);
|
||||
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
|
||||
return sortDatasetTableFields(datasetTable, datasetTableFields);
|
||||
}
|
||||
|
||||
private List<DatasetTableField> sortDatasetTableFields(DatasetTable datasetTable, List<DatasetTableField> datasetTableFields) {
|
||||
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class);
|
||||
datasetTableFields = datasetTableFields.stream().filter(datasetTableField -> datasetTableField.getExtField() == 0).collect(Collectors.toList());
|
||||
datasetTableFields.sort((o1, o2) -> {
|
||||
if (o1.getColumnIndex() == null) {
|
||||
@ -448,14 +443,15 @@ public class ExtractDataService {
|
||||
orderKeyDatasetTableFields.add(datasetTableField);
|
||||
}
|
||||
}
|
||||
return orderKeyDatasetTableFields;
|
||||
} else {
|
||||
DatasetTableField datasetTableField = new DatasetTableField();
|
||||
datasetTableField.setDeExtractType(0);
|
||||
datasetTableField.setDataeaseName("dataease_uuid");
|
||||
datasetTableField.setOriginName("dataease_uuid");
|
||||
datasetTableFields.add(0, datasetTableField);
|
||||
return datasetTableFields;
|
||||
}
|
||||
return datasetTableFields;
|
||||
}
|
||||
|
||||
private void extractData(DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String extractType, String selectSQL) throws Exception {
|
||||
@ -1103,11 +1099,12 @@ public class ExtractDataService {
|
||||
}
|
||||
|
||||
private Map<String, String> getSelectSQL(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) {
|
||||
List<DatasetTableField> fields = datasetTableFields.stream().filter(datasetTableField -> !datasetTableField.getDataeaseName().equalsIgnoreCase("dataease_uuid")).collect(Collectors.toList());
|
||||
Map<String, String> sql = new HashMap<>();
|
||||
if (extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase(DatasetType.DB.name())) {
|
||||
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
|
||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||
sql.put("selectSQL", qp.createRawQuerySQL(tableName, datasetTableFields, datasource));
|
||||
sql.put("selectSQL", qp.createRawQuerySQL(tableName, fields, datasource));
|
||||
sql.put("totalSql", qp.getTotalCount(true, tableName, datasource));
|
||||
}
|
||||
|
||||
@ -1119,13 +1116,13 @@ public class ExtractDataService {
|
||||
}
|
||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||
sql.put("totalSql", qp.getTotalCount(false, selectSQL, datasource));
|
||||
sql.put("selectSQL", qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields));
|
||||
sql.put("selectSQL", qp.createRawQuerySQLAsTmp(selectSQL, fields));
|
||||
}
|
||||
|
||||
if (!extractType.equalsIgnoreCase("all_scope")) {
|
||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||
sql.put("totalSql", qp.getTotalCount(false, selectSQL, datasource));
|
||||
sql.put("selectSQL", qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields));
|
||||
sql.put("selectSQL", qp.createRawQuerySQLAsTmp(selectSQL, fields));
|
||||
}
|
||||
|
||||
return sql;
|
||||
@ -1233,7 +1230,7 @@ public class ExtractDataService {
|
||||
textFileOutputMeta.setExtension(extension);
|
||||
|
||||
if (datasource.getType().equalsIgnoreCase(DatasourceTypes.oracle.name())) {
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size() + 1];
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size()];
|
||||
for (int i = 0; i < datasetTableFields.size(); i++) {
|
||||
TextFileField textFileField = new TextFileField();
|
||||
textFileField.setName(datasetTableFields.get(i).getOriginName());
|
||||
@ -1242,7 +1239,7 @@ public class ExtractDataService {
|
||||
}
|
||||
textFileOutputMeta.setOutputFields(outputFields);
|
||||
} else if (datasource.getType().equalsIgnoreCase(DatasourceTypes.sqlServer.name()) || datasource.getType().equalsIgnoreCase(DatasourceTypes.pg.name()) || datasource.getType().equalsIgnoreCase(DatasourceTypes.mysql.name())) {
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size() + 1];
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size()];
|
||||
for (int i = 0; i < datasetTableFields.size(); i++) {
|
||||
TextFileField textFileField = new TextFileField();
|
||||
textFileField.setName(datasetTableFields.get(i).getDataeaseName());
|
||||
@ -1257,7 +1254,7 @@ public class ExtractDataService {
|
||||
}
|
||||
textFileOutputMeta.setOutputFields(outputFields);
|
||||
} else if (datasource.getType().equalsIgnoreCase(DatasourceTypes.excel.name())) {
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size() + 1];
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size()];
|
||||
for (int i = 0; i < datasetTableFields.size(); i++) {
|
||||
TextFileField textFileField = new TextFileField();
|
||||
textFileField.setName(datasetTableFields.get(i).getDataeaseName());
|
||||
@ -1294,16 +1291,16 @@ public class ExtractDataService {
|
||||
|
||||
UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
|
||||
List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
|
||||
if(isSetKey){
|
||||
if (isSetKey) {
|
||||
UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("dataease_uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
|
||||
fields.add(fieldInfo);
|
||||
}
|
||||
userDefinedJavaClassMeta.setFieldInfo(fields);
|
||||
List<UserDefinedJavaClassDef> definitions = new ArrayList<>();
|
||||
String tmp_code = code.replace("handleWraps", handleWraps).replace("handleBinaryType", handleBinaryTypeCode.toString());
|
||||
if(isSetKey){
|
||||
if (isSetKey) {
|
||||
tmp_code.replace("handleDataease_uuid", "");
|
||||
}else {
|
||||
} else {
|
||||
tmp_code.replace("handleDataease_uuid", handleDataease_uuid);
|
||||
}
|
||||
String Column_Fields;
|
||||
@ -1458,7 +1455,7 @@ public class ExtractDataService {
|
||||
" get(Fields.Out, filed).setValue(r, tmp);\n" +
|
||||
" } \n";
|
||||
|
||||
private final static String handleDataease_uuid = "get(Fields.Out, \"dataease_uuid\").setValue(r, md5);" ;
|
||||
private final static String handleDataease_uuid = "get(Fields.Out, \"dataease_uuid\").setValue(r, md5);";
|
||||
private final static String handleCharset = "\tif(tmp != null && Arrays.asList(\"varcharFields\".split(\",\")).contains(filed)){\n" +
|
||||
" \t\t\ttry {\n" +
|
||||
"\t\t\t\tget(Fields.Out, filed).setValue(r, new String(tmp.getBytes(\"Datasource_Charset\"), \"Target_Charset\"));\n" +
|
||||
|
||||
Loading…
Reference in New Issue
Block a user