fix: 定时同步任务支持在历史数据变动后增量更新到doris中 #6276
This commit is contained in:
parent
06f0d69bcc
commit
7a934daafd
@ -20,8 +20,8 @@ import java.util.List;
|
||||
public class DorisDDLProvider extends DDLProviderImpl {
|
||||
private static final String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
|
||||
"Column_Fields" +
|
||||
"UNIQUE KEY(`dataease_uuid`)\n" +
|
||||
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS BUCKETS_NUM\n" +
|
||||
"UNIQUE KEY(`UNIQUE_KEY`)\n" +
|
||||
"DISTRIBUTED BY HASH(`DISTRIBUTED_BY_HASH`) BUCKETS BUCKETS_NUM\n" +
|
||||
"PROPERTIES(\"replication_num\" = \"ReplicationNum\");";
|
||||
|
||||
@Override
|
||||
@ -48,18 +48,22 @@ public class DorisDDLProvider extends DDLProviderImpl {
|
||||
@Override
|
||||
public String createTableSql(DataTableInfoDTO dataTableInfoDTO, String tableName, List<DatasetTableField> datasetTableFields, Datasource engine, String version) {
|
||||
DorisConfiguration dorisConfiguration = new Gson().fromJson(engine.getConfiguration(), DorisConfiguration.class);
|
||||
String dorisTableColumnSql = createDorisTableColumnSql(datasetTableFields, version);
|
||||
String sql = creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql)
|
||||
|
||||
String sql = creatTableSql.replace("TABLE_NAME", tableName)
|
||||
.replace("BUCKETS_NUM", dorisConfiguration.getBucketNum().toString())
|
||||
.replace("ReplicationNum", dorisConfiguration.getReplicationNum().toString());
|
||||
if (dataTableInfoDTO.isSetKey() && CollectionUtils.isNotEmpty(dataTableInfoDTO.getKeys())) {
|
||||
sql = sql.replace("`dataease_uuid`", "`" + String.join("`, `", dataTableInfoDTO.getKeys()) + "`");
|
||||
sql = sql.replace("`UNIQUE_KEY`", "`" + String.join("`, `", dataTableInfoDTO.getKeys()) + "`")
|
||||
.replace("DISTRIBUTED_BY_HASH", dataTableInfoDTO.getKeys().get(0)).replace("Column_Fields", createDorisTableColumnSql(datasetTableFields, version));
|
||||
} else {
|
||||
sql = sql.replace("UNIQUE_KEY", "dataease_uuid").replace("DISTRIBUTED_BY_HASH", "dataease_uuid").replace("Column_Fields", createDorisTableColumnSql(datasetTableFields, version));
|
||||
}
|
||||
|
||||
return sql;
|
||||
}
|
||||
|
||||
private String createDorisTableColumnSql(final List<DatasetTableField> datasetTableFields, String version) {
|
||||
StringBuilder Column_Fields = new StringBuilder("dataease_uuid varchar(50), `");
|
||||
StringBuilder Column_Fields = new StringBuilder("`");
|
||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||
Column_Fields.append(datasetTableField.getDataeaseName()).append("` ");
|
||||
Integer size = datasetTableField.getSize() * 3;
|
||||
|
||||
@ -78,7 +78,7 @@ public class MysqlDDLProvider extends DDLProviderImpl {
|
||||
}
|
||||
|
||||
private String createDorisTableColumnSql(final List<DatasetTableField> datasetTableFields) {
|
||||
StringBuilder Column_Fields = new StringBuilder("dataease_uuid varchar(50), `");
|
||||
StringBuilder Column_Fields = new StringBuilder("`");
|
||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||
Column_Fields.append(datasetTableField.getDataeaseName()).append("` ");
|
||||
Integer size = datasetTableField.getSize() * 4;
|
||||
|
||||
@ -205,7 +205,7 @@ public class ExtractDataService {
|
||||
saveSuccessLog(datasetTableTaskLog, false);
|
||||
updateTableStatus(datasetTableId, JobStatus.Completed, execTime);
|
||||
if (ops.equalsIgnoreCase("替换")) {
|
||||
List<DatasetTableField> oldFields = getDatasetTableFields(datasetTable.getId());
|
||||
List<DatasetTableField> oldFields = getDatasetTableFields(datasetTable);
|
||||
List<DatasetTableField> toAdd = new ArrayList<>();
|
||||
List<DatasetTableField> toDelete = new ArrayList<>();
|
||||
for (DatasetTableField oldField : oldFields) {
|
||||
@ -311,7 +311,7 @@ public class ExtractDataService {
|
||||
} else {
|
||||
datasource.setType(datasetTable.getType());
|
||||
}
|
||||
List<DatasetTableField> datasetTableFields = getDatasetTableFields(datasetTable.getId());
|
||||
List<DatasetTableField> datasetTableFields = getDatasetTableFields(datasetTable);
|
||||
boolean msg = false;
|
||||
JobStatus lastExecStatus = JobStatus.Completed;
|
||||
Long execTime = null;
|
||||
@ -421,8 +421,9 @@ public class ExtractDataService {
|
||||
|
||||
}
|
||||
|
||||
public List<DatasetTableField> getDatasetTableFields(String datasetTableId) {
|
||||
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTableId).build());
|
||||
public List<DatasetTableField> getDatasetTableFields(DatasetTable datasetTable) {
|
||||
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class);
|
||||
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
|
||||
datasetTableFields = datasetTableFields.stream().filter(datasetTableField -> datasetTableField.getExtField() == 0).collect(Collectors.toList());
|
||||
datasetTableFields.sort((o1, o2) -> {
|
||||
if (o1.getColumnIndex() == null) {
|
||||
@ -433,6 +434,27 @@ public class ExtractDataService {
|
||||
}
|
||||
return o1.getColumnIndex().compareTo(o2.getColumnIndex());
|
||||
});
|
||||
if (dataTableInfoDTO.isSetKey() && CollectionUtils.isNotEmpty(dataTableInfoDTO.getKeys()) && !engineService.isSimpleMode()) {
|
||||
List<DatasetTableField> orderKeyDatasetTableFields = new ArrayList<>();
|
||||
for (int i = 0; i < dataTableInfoDTO.getKeys().size(); i++) {
|
||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||
if (datasetTableField.getOriginName().equalsIgnoreCase(dataTableInfoDTO.getKeys().get(i))) {
|
||||
orderKeyDatasetTableFields.add(datasetTableField);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||
if (!dataTableInfoDTO.getKeys().contains(datasetTableField.getOriginName())) {
|
||||
orderKeyDatasetTableFields.add(datasetTableField);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
DatasetTableField datasetTableField = new DatasetTableField();
|
||||
datasetTableField.setDeExtractType(0);
|
||||
datasetTableField.setDataeaseName("dataease_uuid");
|
||||
datasetTableField.setOriginName("dataease_uuid");
|
||||
datasetTableFields.add(0, datasetTableField);
|
||||
}
|
||||
return datasetTableFields;
|
||||
}
|
||||
|
||||
@ -483,7 +505,7 @@ public class ExtractDataService {
|
||||
|
||||
Datasource engine = engineService.getDeEngine();
|
||||
DorisConfiguration dorisConfiguration = new Gson().fromJson(engine.getConfiguration(), DorisConfiguration.class);
|
||||
String columns = datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.joining(",")) + ",dataease_uuid";
|
||||
String columns = datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.joining(","));
|
||||
|
||||
String dataFile = null;
|
||||
String script = null;
|
||||
@ -874,7 +896,7 @@ public class ExtractDataService {
|
||||
String script = null;
|
||||
Datasource dorisDatasource = engineService.getDeEngine();
|
||||
DorisConfiguration dorisConfiguration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfiguration.class);
|
||||
String columns = columnFields + ",dataease_uuid";
|
||||
String columns = columnFields;
|
||||
String streamLoadScript = "";
|
||||
if (kettleFilesKeep) {
|
||||
streamLoadScript = shellScript;
|
||||
@ -962,6 +984,8 @@ public class ExtractDataService {
|
||||
if (engineService.isSimpleMode()) {
|
||||
return;
|
||||
}
|
||||
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class);
|
||||
boolean isSetKey = dataTableInfoDTO.isSetKey() && CollectionUtils.isNotEmpty(dataTableInfoDTO.getKeys());
|
||||
TransMeta transMeta = new TransMeta();
|
||||
String outFile = null;
|
||||
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
|
||||
@ -989,7 +1013,7 @@ public class ExtractDataService {
|
||||
}
|
||||
transMeta.addDatabase(dataMeta);
|
||||
inputSteps = inputStep(transMeta, selectSQL, mysqlConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.mysql, mysqlConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.mysql, mysqlConfiguration, isSetKey);
|
||||
break;
|
||||
case sqlServer:
|
||||
SqlServerConfiguration sqlServerConfiguration = new Gson().fromJson(datasource.getConfiguration(), SqlServerConfiguration.class);
|
||||
@ -1216,11 +1240,6 @@ public class ExtractDataService {
|
||||
textFileField.setType("String");
|
||||
outputFields[i] = textFileField;
|
||||
}
|
||||
TextFileField textFileField = new TextFileField();
|
||||
textFileField.setName("dataease_uuid");
|
||||
textFileField.setType("String");
|
||||
outputFields[datasetTableFields.size()] = textFileField;
|
||||
|
||||
textFileOutputMeta.setOutputFields(outputFields);
|
||||
} else if (datasource.getType().equalsIgnoreCase(DatasourceTypes.sqlServer.name()) || datasource.getType().equalsIgnoreCase(DatasourceTypes.pg.name()) || datasource.getType().equalsIgnoreCase(DatasourceTypes.mysql.name())) {
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size() + 1];
|
||||
@ -1236,11 +1255,6 @@ public class ExtractDataService {
|
||||
|
||||
outputFields[i] = textFileField;
|
||||
}
|
||||
TextFileField textFileField = new TextFileField();
|
||||
textFileField.setName("dataease_uuid");
|
||||
textFileField.setType("String");
|
||||
outputFields[datasetTableFields.size()] = textFileField;
|
||||
|
||||
textFileOutputMeta.setOutputFields(outputFields);
|
||||
} else if (datasource.getType().equalsIgnoreCase(DatasourceTypes.excel.name())) {
|
||||
TextFileField[] outputFields = new TextFileField[datasetTableFields.size() + 1];
|
||||
@ -1256,10 +1270,6 @@ public class ExtractDataService {
|
||||
|
||||
outputFields[i] = textFileField;
|
||||
}
|
||||
TextFileField textFileField = new TextFileField();
|
||||
textFileField.setName("dataease_uuid");
|
||||
textFileField.setType("String");
|
||||
outputFields[datasetTableFields.size()] = textFileField;
|
||||
|
||||
textFileOutputMeta.setOutputFields(outputFields);
|
||||
} else {
|
||||
@ -1272,7 +1282,7 @@ public class ExtractDataService {
|
||||
return outputStep;
|
||||
}
|
||||
|
||||
private StepMeta udjc(List<DatasetTableField> datasetTableFields, DatasourceTypes datasourceType, JdbcConfiguration jdbcConfiguration) {
|
||||
private StepMeta udjc(List<DatasetTableField> datasetTableFields, DatasourceTypes datasourceType, JdbcConfiguration jdbcConfiguration, boolean isSetKey) {
|
||||
StringBuilder handleBinaryTypeCode = new StringBuilder();
|
||||
String excelCompletion = "";
|
||||
|
||||
@ -1284,12 +1294,18 @@ public class ExtractDataService {
|
||||
|
||||
UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
|
||||
List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
|
||||
if(isSetKey){
|
||||
UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("dataease_uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
|
||||
fields.add(fieldInfo);
|
||||
}
|
||||
userDefinedJavaClassMeta.setFieldInfo(fields);
|
||||
List<UserDefinedJavaClassDef> definitions = new ArrayList<>();
|
||||
String tmp_code = code.replace("handleWraps", handleWraps).replace("handleBinaryType", handleBinaryTypeCode.toString());
|
||||
|
||||
if(isSetKey){
|
||||
tmp_code.replace("handleDataease_uuid", "");
|
||||
}else {
|
||||
tmp_code.replace("handleDataease_uuid", handleDataease_uuid);
|
||||
}
|
||||
String Column_Fields;
|
||||
|
||||
if (datasourceType.equals(DatasourceTypes.oracle) || datasourceType.equals(DatasourceTypes.db2)) {
|
||||
@ -1442,12 +1458,14 @@ public class ExtractDataService {
|
||||
" get(Fields.Out, filed).setValue(r, tmp);\n" +
|
||||
" } \n";
|
||||
|
||||
private final static String handleDataease_uuid = "get(Fields.Out, \"dataease_uuid\").setValue(r, md5);" ;
|
||||
private final static String handleCharset = "\tif(tmp != null && Arrays.asList(\"varcharFields\".split(\",\")).contains(filed)){\n" +
|
||||
" \t\t\ttry {\n" +
|
||||
"\t\t\t\tget(Fields.Out, filed).setValue(r, new String(tmp.getBytes(\"Datasource_Charset\"), \"Target_Charset\"));\n" +
|
||||
" \t\t}catch (Exception e){}\n" +
|
||||
"\t\t}";
|
||||
|
||||
|
||||
private final static String code = "import org.pentaho.di.core.row.ValueMetaInterface;\n" +
|
||||
"import java.util.List;\n" +
|
||||
"import java.io.File;\n" +
|
||||
@ -1487,7 +1505,7 @@ public class ExtractDataService {
|
||||
" }\n" +
|
||||
"\n" +
|
||||
" String md5 = md5(str);\n" +
|
||||
" get(Fields.Out, \"dataease_uuid\").setValue(r, md5);\n" +
|
||||
" handleDataease_uuid\n" +
|
||||
"\n" +
|
||||
" putRow(data.outputRowMeta, r);\n" +
|
||||
"\n" +
|
||||
|
||||
@ -280,7 +280,7 @@ public class PanelAppTemplateService {
|
||||
public void createDorisTable(List<DatasetTable> datasetTablesInfo) throws Exception {
|
||||
for (DatasetTable datasetTable : datasetTablesInfo) {
|
||||
if (1 == datasetTable.getMode() && !(DatasetType.CUSTOM.name().equalsIgnoreCase(datasetTable.getType()) || DatasetType.UNION.name().equalsIgnoreCase(datasetTable.getType()))) {
|
||||
List<DatasetTableField> fields = extractDataService.getDatasetTableFields(datasetTable.getId());
|
||||
List<DatasetTableField> fields = extractDataService.getDatasetTableFields(datasetTable);
|
||||
extractDataService.createEngineTable(datasetTable.getInfo(), TableUtils.tableName(datasetTable.getId()), fields);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user