|
|
|
|
@ -10,7 +10,8 @@ import io.dataease.commons.utils.CommonBeanFactory;
|
|
|
|
|
import io.dataease.commons.utils.DorisTableUtils;
|
|
|
|
|
import io.dataease.commons.utils.LogUtil;
|
|
|
|
|
import io.dataease.datasource.constants.DatasourceTypes;
|
|
|
|
|
import io.dataease.datasource.dto.MysqlConfigrationDTO;
|
|
|
|
|
import io.dataease.datasource.dto.DorisConfigration;
|
|
|
|
|
import io.dataease.datasource.dto.MysqlConfigration;
|
|
|
|
|
import io.dataease.datasource.provider.JdbcProvider;
|
|
|
|
|
import io.dataease.datasource.request.DatasourceRequest;
|
|
|
|
|
import io.dataease.dto.dataset.DataSetTaskLogDTO;
|
|
|
|
|
@ -25,6 +26,7 @@ import org.pentaho.di.job.Job;
|
|
|
|
|
import org.pentaho.di.job.JobExecutionConfiguration;
|
|
|
|
|
import org.pentaho.di.job.JobHopMeta;
|
|
|
|
|
import org.pentaho.di.job.JobMeta;
|
|
|
|
|
import org.pentaho.di.job.entries.shell.JobEntryShell;
|
|
|
|
|
import org.pentaho.di.job.entries.special.JobEntrySpecial;
|
|
|
|
|
import org.pentaho.di.job.entries.success.JobEntrySuccess;
|
|
|
|
|
import org.pentaho.di.job.entries.trans.JobEntryTrans;
|
|
|
|
|
@ -36,7 +38,8 @@ import org.pentaho.di.trans.TransMeta;
|
|
|
|
|
import org.pentaho.di.trans.step.StepMeta;
|
|
|
|
|
import org.pentaho.di.trans.steps.sql.ExecSQLMeta;
|
|
|
|
|
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
|
|
|
|
|
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
|
|
|
|
|
import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
|
|
|
|
|
import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
|
|
|
|
|
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
|
|
|
|
|
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
|
|
|
|
|
import org.pentaho.di.www.SlaveServerJobStatus;
|
|
|
|
|
@ -68,11 +71,9 @@ public class ExtractDataService {
|
|
|
|
|
|
|
|
|
|
private static String lastUpdateTime = "${__last_update_time__}";
|
|
|
|
|
private static String currentUpdateTime = "${__current_update_time__}";
|
|
|
|
|
private static String dataease_column_family = "dataease";
|
|
|
|
|
private static String separator = "|";
|
|
|
|
|
private static String extention = "txt";
|
|
|
|
|
private static String root_path = "/opt/dataease/data/kettle/";
|
|
|
|
|
private static String data_path = "/opt/dataease/data/db/";
|
|
|
|
|
private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml";
|
|
|
|
|
private static String pentaho_mappings = "pentaho_mappings";
|
|
|
|
|
|
|
|
|
|
@Value("${carte.host:127.0.0.1}")
|
|
|
|
|
private String carte;
|
|
|
|
|
@ -85,8 +86,14 @@ public class ExtractDataService {
|
|
|
|
|
|
|
|
|
|
private static String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME" +
|
|
|
|
|
"Column_Fields" +
|
|
|
|
|
"UNIQUE KEY(dataease_uuid)\n" +
|
|
|
|
|
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" +
|
|
|
|
|
"PROPERTIES(\"replication_num\" = \"1\");";
|
|
|
|
|
|
|
|
|
|
private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" +
|
|
|
|
|
// "rm -rf %s\n" +
|
|
|
|
|
"return $?";
|
|
|
|
|
|
|
|
|
|
private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){
|
|
|
|
|
String Column_Fields = "dataease_uuid varchar(50),";
|
|
|
|
|
for (DatasetTableField datasetTableField : datasetTableFields) {
|
|
|
|
|
@ -110,7 +117,7 @@ public class ExtractDataService {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Column_Fields = Column_Fields.substring(0, Column_Fields.length() -1 );
|
|
|
|
|
Column_Fields = "(" + Column_Fields + ")" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n";
|
|
|
|
|
Column_Fields = "(" + Column_Fields + ")\n";
|
|
|
|
|
return Column_Fields;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -120,6 +127,7 @@ public class ExtractDataService {
|
|
|
|
|
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
|
|
|
|
datasourceRequest.setDatasource(dorisDatasource);
|
|
|
|
|
datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql));
|
|
|
|
|
System.out.println(datasourceRequest.getQuery());
|
|
|
|
|
jdbcProvider.exec(datasourceRequest);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -128,7 +136,7 @@ public class ExtractDataService {
|
|
|
|
|
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);;
|
|
|
|
|
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
|
|
|
|
datasourceRequest.setDatasource(dorisDatasource);
|
|
|
|
|
datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.doristmpName(dorisTableName)));
|
|
|
|
|
datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.dorisTmpName(dorisTableName)));
|
|
|
|
|
jdbcProvider.exec(datasourceRequest);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -139,6 +147,16 @@ public class ExtractDataService {
|
|
|
|
|
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
|
|
|
|
|
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
|
|
|
|
|
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
|
|
|
|
|
datasetTableFields.sort((o1, o2) -> {
|
|
|
|
|
if (o1.getOriginName() == null) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
if (o2.getOriginName() == null) {
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
return o1.getOriginName().compareTo(o2.getOriginName());
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
|
|
|
|
|
String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
|
|
|
|
|
switch (updateType) {
|
|
|
|
|
@ -147,9 +165,9 @@ public class ExtractDataService {
|
|
|
|
|
writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
|
|
|
|
// TODO before: check doris table column type
|
|
|
|
|
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
|
|
|
|
|
createDorisTable(DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
|
|
|
|
|
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
|
|
|
|
|
generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null);
|
|
|
|
|
generateJobFile("all_scope", datasetTable);
|
|
|
|
|
generateJobFile("all_scope", datasetTable, datasetTableFields);
|
|
|
|
|
extractData(datasetTable, "all_scope");
|
|
|
|
|
replaceTable(DorisTableUtils.dorisName(datasetTableId));
|
|
|
|
|
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
|
|
|
|
|
@ -177,7 +195,7 @@ public class ExtractDataService {
|
|
|
|
|
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
|
|
|
|
|
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
|
|
|
|
|
generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql);
|
|
|
|
|
generateJobFile("incremental_add", datasetTable);
|
|
|
|
|
generateJobFile("incremental_add", datasetTable, datasetTableFields);
|
|
|
|
|
extractData(datasetTable, "incremental_add");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -186,7 +204,7 @@ public class ExtractDataService {
|
|
|
|
|
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
|
|
|
|
|
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
|
|
|
|
|
generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql);
|
|
|
|
|
generateJobFile("incremental_delete", datasetTable);
|
|
|
|
|
generateJobFile("incremental_delete", datasetTable, datasetTableFields);
|
|
|
|
|
extractData(datasetTable, "incremental_delete");
|
|
|
|
|
}
|
|
|
|
|
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
|
|
|
|
|
@ -242,9 +260,14 @@ public class ExtractDataService {
|
|
|
|
|
jobExecutionConfiguration.setRepository(repository);
|
|
|
|
|
String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null);
|
|
|
|
|
SlaveServerJobStatus jobStatus = null;
|
|
|
|
|
do {
|
|
|
|
|
boolean running = true;
|
|
|
|
|
while(running) {
|
|
|
|
|
jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
|
|
|
|
|
} while (jobStatus != null && jobStatus.isRunning());
|
|
|
|
|
running = jobStatus.isRunning();
|
|
|
|
|
if(!running)
|
|
|
|
|
break;
|
|
|
|
|
Thread.sleep(1000);
|
|
|
|
|
}
|
|
|
|
|
if (jobStatus.getStatusDescription().equals("Finished")) {
|
|
|
|
|
return;
|
|
|
|
|
} else {
|
|
|
|
|
@ -266,16 +289,25 @@ public class ExtractDataService {
|
|
|
|
|
return remoteSlaveServer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception {
|
|
|
|
|
private void generateJobFile(String extractType, DatasetTable datasetTable, List<DatasetTableField> datasetTableFields) throws Exception {
|
|
|
|
|
String dorisOutputTable = null;
|
|
|
|
|
String jobName = null;
|
|
|
|
|
String script = null;
|
|
|
|
|
Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
|
|
|
|
|
DorisConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfigration.class);
|
|
|
|
|
String columns = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList())) + ",dataease_uuid";
|
|
|
|
|
|
|
|
|
|
switch (extractType) {
|
|
|
|
|
case "all_scope":
|
|
|
|
|
jobName = "job_" + datasetTable.getId();
|
|
|
|
|
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
|
|
|
|
|
break;
|
|
|
|
|
case "incremental_add":
|
|
|
|
|
jobName = "job_add_" + datasetTable.getId();
|
|
|
|
|
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
|
|
|
|
|
break;
|
|
|
|
|
case "incremental_delete":
|
|
|
|
|
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
|
|
|
|
|
jobName = "job_delete_" + datasetTable.getId();
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
@ -286,12 +318,15 @@ public class ExtractDataService {
|
|
|
|
|
switch (extractType) {
|
|
|
|
|
case "all_scope":
|
|
|
|
|
transName = "trans_" + datasetTable.getId();
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
|
|
|
|
|
break;
|
|
|
|
|
case "incremental_add":
|
|
|
|
|
transName = "trans_add_" + datasetTable.getId();
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
|
|
|
|
|
break;
|
|
|
|
|
case "incremental_delete":
|
|
|
|
|
transName = "trans_delete_" + datasetTable.getId();
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
@ -299,6 +334,8 @@ public class ExtractDataService {
|
|
|
|
|
|
|
|
|
|
JobMeta jobMeta = new JobMeta();
|
|
|
|
|
jobMeta.setName(jobName);
|
|
|
|
|
|
|
|
|
|
//start
|
|
|
|
|
JobEntrySpecial start = new JobEntrySpecial();
|
|
|
|
|
start.setName("START");
|
|
|
|
|
start.setStart(true);
|
|
|
|
|
@ -307,6 +344,7 @@ public class ExtractDataService {
|
|
|
|
|
startEntry.setLocation(100, 100);
|
|
|
|
|
jobMeta.addJobEntry(startEntry);
|
|
|
|
|
|
|
|
|
|
//trans
|
|
|
|
|
JobEntryTrans transrans = new JobEntryTrans();
|
|
|
|
|
transrans.setTransname(transName);
|
|
|
|
|
transrans.setName("Transformation");
|
|
|
|
|
@ -317,41 +355,52 @@ public class ExtractDataService {
|
|
|
|
|
|
|
|
|
|
jobMeta.addJobHop(new JobHopMeta(startEntry, transEntry));
|
|
|
|
|
|
|
|
|
|
//exec shell
|
|
|
|
|
JobEntryShell shell = new JobEntryShell();
|
|
|
|
|
shell.setScript(script);
|
|
|
|
|
shell.insertScript = true;
|
|
|
|
|
shell.setName("shell");
|
|
|
|
|
JobEntryCopy shellEntry = new JobEntryCopy(shell);
|
|
|
|
|
shellEntry.setDrawn(true);
|
|
|
|
|
shellEntry.setLocation(500, 100);
|
|
|
|
|
jobMeta.addJobEntry(shellEntry);
|
|
|
|
|
|
|
|
|
|
JobHopMeta transHop = new JobHopMeta(transEntry, shellEntry);
|
|
|
|
|
transHop.setEvaluation(true);
|
|
|
|
|
jobMeta.addJobHop(transHop);
|
|
|
|
|
|
|
|
|
|
//success
|
|
|
|
|
JobEntrySuccess success = new JobEntrySuccess();
|
|
|
|
|
success.setName("Success");
|
|
|
|
|
JobEntryCopy successEntry = new JobEntryCopy(success);
|
|
|
|
|
successEntry.setDrawn(true);
|
|
|
|
|
successEntry.setLocation(500, 100);
|
|
|
|
|
successEntry.setLocation(700, 100);
|
|
|
|
|
jobMeta.addJobEntry(successEntry);
|
|
|
|
|
|
|
|
|
|
JobHopMeta greenHop = new JobHopMeta(transEntry, successEntry);
|
|
|
|
|
|
|
|
|
|
JobHopMeta greenHop = new JobHopMeta(shellEntry, successEntry);
|
|
|
|
|
greenHop.setEvaluation(true);
|
|
|
|
|
jobMeta.addJobHop(greenHop);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(!extractType.equals("incremental_delete")){
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String jobXml = jobMeta.getXML();
|
|
|
|
|
File file = new File(root_path + jobName + ".kjb");
|
|
|
|
|
FileUtils.writeStringToFile(file, jobXml, "UTF-8");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
|
|
|
|
|
datasetTableFields.sort((o1, o2) -> {
|
|
|
|
|
if (o1.getOriginName() == null) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
if (o2.getOriginName() == null) {
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
return o1.getOriginName().compareTo(o2.getOriginName());
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
TransMeta transMeta = new TransMeta();
|
|
|
|
|
String dorisOutputTable = DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTable.getId()));
|
|
|
|
|
String dorisOutputTable = null;
|
|
|
|
|
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
|
|
|
|
|
DatabaseMeta dataMeta = null;
|
|
|
|
|
switch (datasourceType) {
|
|
|
|
|
case mysql:
|
|
|
|
|
MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigrationDTO.class);
|
|
|
|
|
dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigrationDTO.getHost(), mysqlConfigrationDTO.getDataBase(), mysqlConfigrationDTO.getPort().toString(), mysqlConfigrationDTO.getUsername(), mysqlConfigrationDTO.getPassword());
|
|
|
|
|
MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class);
|
|
|
|
|
dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword());
|
|
|
|
|
transMeta.addDatabase(dataMeta);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
@ -359,7 +408,7 @@ public class ExtractDataService {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
|
|
|
|
|
MysqlConfigrationDTO dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigrationDTO.class);
|
|
|
|
|
MysqlConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigration.class);
|
|
|
|
|
DatabaseMeta dorisDataMeta = new DatabaseMeta("doris", "MYSQL", "Native", dorisConfigration.getHost(), dorisConfigration.getDataBase(), dorisConfigration.getPort().toString(), dorisConfigration.getUsername(), dorisConfigration.getPassword());
|
|
|
|
|
transMeta.addDatabase(dorisDataMeta);
|
|
|
|
|
StepMeta inputStep = null;
|
|
|
|
|
@ -368,56 +417,39 @@ public class ExtractDataService {
|
|
|
|
|
TransHopMeta hi1 = null;
|
|
|
|
|
TransHopMeta hi2 = null;
|
|
|
|
|
String transName = null;
|
|
|
|
|
|
|
|
|
|
switch (extractType) {
|
|
|
|
|
case "all_scope":
|
|
|
|
|
transName = "trans_" + datasetTable.getId();
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
|
|
|
|
|
selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
|
|
|
|
|
transMeta.setName(transName);
|
|
|
|
|
inputStep = inputStep(transMeta, selectSQL);
|
|
|
|
|
udjcStep = udjc(datasetTableFields);
|
|
|
|
|
outputStep = outputStep(transMeta, dorisOutputTable);
|
|
|
|
|
hi1 = new TransHopMeta(inputStep, udjcStep);
|
|
|
|
|
hi2 = new TransHopMeta(udjcStep, outputStep);
|
|
|
|
|
transMeta.addTransHop(hi1);
|
|
|
|
|
transMeta.addTransHop(hi2);
|
|
|
|
|
transMeta.addStep(inputStep);
|
|
|
|
|
transMeta.addStep(udjcStep);
|
|
|
|
|
transMeta.addStep(outputStep);
|
|
|
|
|
break;
|
|
|
|
|
case "incremental_add":
|
|
|
|
|
transName = "trans_add_" + datasetTable.getId();
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
|
|
|
|
|
transMeta.setName(transName);
|
|
|
|
|
inputStep = inputStep(transMeta, selectSQL);
|
|
|
|
|
udjcStep = udjc(datasetTableFields);
|
|
|
|
|
outputStep = outputStep(transMeta, dorisOutputTable);
|
|
|
|
|
hi1 = new TransHopMeta(inputStep, udjcStep);
|
|
|
|
|
hi2 = new TransHopMeta(udjcStep, outputStep);
|
|
|
|
|
transMeta.addTransHop(hi1);
|
|
|
|
|
transMeta.addTransHop(hi2);
|
|
|
|
|
transMeta.addStep(inputStep);
|
|
|
|
|
transMeta.addStep(udjcStep);
|
|
|
|
|
transMeta.addStep(outputStep);
|
|
|
|
|
break;
|
|
|
|
|
case "incremental_delete":
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
|
|
|
|
|
dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
|
|
|
|
|
transName = "trans_delete_" + datasetTable.getId();
|
|
|
|
|
transMeta.setName(transName);
|
|
|
|
|
inputStep = inputStep(transMeta, selectSQL);
|
|
|
|
|
udjcStep = udjc(datasetTableFields);
|
|
|
|
|
outputStep = execSqlStep(transMeta, dorisOutputTable, datasetTableFields);
|
|
|
|
|
hi1 = new TransHopMeta(inputStep, udjcStep);
|
|
|
|
|
hi2 = new TransHopMeta(udjcStep, outputStep);
|
|
|
|
|
transMeta.addTransHop(hi1);
|
|
|
|
|
transMeta.addTransHop(hi2);
|
|
|
|
|
transMeta.addStep(inputStep);
|
|
|
|
|
transMeta.addStep(udjcStep);
|
|
|
|
|
transMeta.addStep(outputStep);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inputStep = inputStep(transMeta, selectSQL);
|
|
|
|
|
udjcStep = udjc(datasetTableFields);
|
|
|
|
|
outputStep = outputStep(dorisOutputTable);
|
|
|
|
|
hi1 = new TransHopMeta(inputStep, udjcStep);
|
|
|
|
|
hi2 = new TransHopMeta(udjcStep, outputStep);
|
|
|
|
|
transMeta.addTransHop(hi1);
|
|
|
|
|
transMeta.addTransHop(hi2);
|
|
|
|
|
transMeta.addStep(inputStep);
|
|
|
|
|
transMeta.addStep(udjcStep);
|
|
|
|
|
transMeta.addStep(outputStep);
|
|
|
|
|
|
|
|
|
|
String transXml = transMeta.getXML();
|
|
|
|
|
File file = new File(root_path + transName + ".ktr");
|
|
|
|
|
FileUtils.writeStringToFile(file, transXml, "UTF-8");
|
|
|
|
|
@ -434,14 +466,15 @@ public class ExtractDataService {
|
|
|
|
|
return fromStep;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private StepMeta outputStep(TransMeta transMeta, String dorisOutputTable){
|
|
|
|
|
TableOutputMeta tableOutputMeta = new TableOutputMeta();
|
|
|
|
|
DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
|
|
|
|
|
tableOutputMeta.setDatabaseMeta(dorisDatabaseMeta);
|
|
|
|
|
tableOutputMeta.setTableName(dorisOutputTable);
|
|
|
|
|
tableOutputMeta.setCommitSize(10000);
|
|
|
|
|
tableOutputMeta.setUseBatchUpdate(true);
|
|
|
|
|
StepMeta outputStep = new StepMeta("TableOutput", "TableOutput", tableOutputMeta);
|
|
|
|
|
private StepMeta outputStep(String dorisOutputTable){
|
|
|
|
|
TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta();
|
|
|
|
|
textFileOutputMeta.setEncoding("UTF-8");
|
|
|
|
|
textFileOutputMeta.setHeaderEnabled(false);
|
|
|
|
|
textFileOutputMeta.setFilename(root_path + dorisOutputTable);
|
|
|
|
|
textFileOutputMeta.setSeparator(separator);
|
|
|
|
|
textFileOutputMeta.setExtension(extention);
|
|
|
|
|
textFileOutputMeta.setOutputFields(new TextFileField[0]);
|
|
|
|
|
StepMeta outputStep = new StepMeta("TextFileOutput", "TextFileOutput", textFileOutputMeta);
|
|
|
|
|
outputStep.setLocation(600, 100);
|
|
|
|
|
outputStep.setDraw(true);
|
|
|
|
|
return outputStep;
|
|
|
|
|
|