diff --git a/backend/pom.xml b/backend/pom.xml
index 38822a30a2..6de71fa692 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -385,6 +385,42 @@
3.0.8
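+        <!-- Pentaho Kettle (PDI) engine, metastore and big-data HBase plugins used to generate and run the extraction jobs -->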
+        <dependency>
+            <groupId>pentaho-kettle</groupId>
+            <artifactId>kettle-core</artifactId>
+            <version>8.3.0.18-1084</version>
+        </dependency>
+        <dependency>
+            <groupId>pentaho-kettle</groupId>
+            <artifactId>kettle-engine</artifactId>
+            <version>8.3.0.18-1084</version>
+        </dependency>
+        <dependency>
+            <groupId>pentaho</groupId>
+            <artifactId>metastore</artifactId>
+            <version>8.3.0.18-1084</version>
+        </dependency>
+        <dependency>
+            <groupId>pentaho</groupId>
+            <artifactId>pentaho-big-data-kettle-plugins-hbase-meta</artifactId>
+            <version>8.3.0.18-1084</version>
+        </dependency>
+        <dependency>
+            <groupId>pentaho</groupId>
+            <artifactId>pentaho-big-data-kettle-plugins-hbase</artifactId>
+            <version>8.3.0.18-1084</version>
+        </dependency>
+        <dependency>
+            <groupId>pentaho</groupId>
+            <artifactId>pentaho-big-data-impl-cluster</artifactId>
+            <version>8.3.0.18-1084</version>
+        </dependency>
+        <dependency>
+            <groupId>org.pentaho.di.plugins</groupId>
+            <artifactId>pdi-engine-configuration-impl</artifactId>
+            <version>8.3.0.7-683</version>
+        </dependency>
+
@@ -521,4 +557,20 @@
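+    <!-- Pentaho/Kettle artifacts are resolved from the Pentaho public Nexus repository declared below, not from Maven Central. -->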
+
+    <repositories>
+        <repository>
+            <id>pentaho-public</id>
+            <name>Pentaho Public</name>
+            <url>http://nexus.pentaho.org/content/groups/omni</url>
+            <releases>
+                <enabled>true</enabled>
+                <updatePolicy>always</updatePolicy>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+                <updatePolicy>always</updatePolicy>
+            </snapshots>
+        </repository>
+    </repositories>
diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java
index e3a73048ed..94e5fb0050 100644
--- a/backend/src/main/java/io/dataease/config/CommonConfig.java
+++ b/backend/src/main/java/io/dataease/config/CommonConfig.java
@@ -4,6 +4,9 @@ import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
+import org.pentaho.di.core.KettleEnvironment;
+import org.pentaho.di.repository.filerep.KettleFileRepository;
+import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
@@ -18,7 +21,7 @@ public class CommonConfig {
@Resource
private Environment env; // holds the application configuration properties
-
+ private static String root_path = "/opt/dataease/data/kettle/";
@Bean
@ConditionalOnMissingBean
@@ -51,4 +54,15 @@ public class CommonConfig {
sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
return sqlContext;
}
+
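+ // Initialise the Kettle environment once at startup and expose a file-based repository
+ // rooted at root_path, where the generated .kjb/.ktr job and transformation files are stored.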
+ @Bean
+ @ConditionalOnMissingBean
+ public KettleFileRepository kettleFileRepository() throws Exception {
+ KettleEnvironment.init();
+ KettleFileRepository repository = new KettleFileRepository();
+ KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta("KettleFileRepository", "repo",
+ "dataease kettle repo", root_path);
+ repository.init(fileRepositoryMeta);
+ return repository;
+ }
}
diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
index 7078f03f5f..55bbf4d263 100644
--- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
+++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
@@ -1,28 +1,89 @@
package io.dataease.service.dataset;
import com.google.gson.Gson;
import io.dataease.base.domain.*;
+import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.commons.constants.JobStatus;
import io.dataease.commons.constants.ScheduleType;
import io.dataease.commons.constants.UpdateType;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.LogUtil;
+import io.dataease.datasource.constants.DatasourceTypes;
+import io.dataease.datasource.dto.MysqlConfigrationDTO;
import io.dataease.dto.dataset.DataSetTaskLogDTO;
import io.dataease.dto.dataset.DataTableInfoDTO;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
+import org.pentaho.big.data.api.cluster.NamedCluster;
+import org.pentaho.big.data.api.cluster.NamedClusterService;
+import org.pentaho.big.data.api.cluster.service.locator.NamedClusterServiceLocator;
+import org.pentaho.big.data.api.cluster.service.locator.impl.NamedClusterServiceLocatorImpl;
+import org.pentaho.big.data.api.initializer.ClusterInitializer;
+import org.pentaho.big.data.api.initializer.ClusterInitializerProvider;
+import org.pentaho.big.data.api.initializer.impl.ClusterInitializerImpl;
+import org.pentaho.big.data.impl.cluster.NamedClusterImpl;
+import org.pentaho.big.data.impl.cluster.NamedClusterManager;
+import org.pentaho.big.data.kettle.plugins.hbase.MappingDefinition;
+import org.pentaho.big.data.kettle.plugins.hbase.output.HBaseOutputMeta;
+import org.pentaho.di.cluster.SlaveServer;
+import org.pentaho.di.core.KettleEnvironment;
+import org.pentaho.di.core.database.DatabaseMeta;
+import org.pentaho.di.core.plugins.PluginRegistry;
+import org.pentaho.di.core.plugins.StepPluginType;
+import org.pentaho.di.core.util.EnvUtil;
+import org.pentaho.di.engine.configuration.impl.pentaho.DefaultRunConfiguration;
+import org.pentaho.di.job.Job;
+import org.pentaho.di.job.JobExecutionConfiguration;
+import org.pentaho.di.job.JobHopMeta;
+import org.pentaho.di.job.JobMeta;
+import org.pentaho.di.job.entries.special.JobEntrySpecial;
+import org.pentaho.di.job.entries.success.JobEntrySuccess;
+import org.pentaho.di.job.entries.trans.JobEntryTrans;
+import org.pentaho.di.job.entries.writetolog.JobEntryWriteToLog;
+import org.pentaho.di.job.entry.JobEntryCopy;
+import org.pentaho.di.repository.RepositoryDirectoryInterface;
+import org.pentaho.di.repository.filerep.KettleFileRepository;
+import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
+import org.pentaho.di.trans.TransConfiguration;
+import org.pentaho.di.trans.TransExecutionConfiguration;
+import org.pentaho.di.trans.TransHopMeta;
+import org.pentaho.di.trans.TransMeta;
+import org.pentaho.di.trans.step.StepMeta;
+import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
+import org.pentaho.di.trans.steps.userdefinedjavaclass.InfoStepDefinition;
+import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
+import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
+import org.pentaho.di.www.SlaveServerJobStatus;
+import org.pentaho.runtime.test.RuntimeTest;
+import org.pentaho.runtime.test.RuntimeTester;
+import org.pentaho.runtime.test.action.RuntimeTestActionHandler;
+import org.pentaho.runtime.test.action.RuntimeTestActionService;
+import org.pentaho.runtime.test.action.impl.RuntimeTestActionServiceImpl;
+import org.pentaho.runtime.test.impl.RuntimeTesterImpl;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import org.pentaho.di.core.row.ValueMetaInterface;
import javax.annotation.Resource;
+import java.io.File;
import java.security.MessageDigest;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.mockito.Mockito.mock;
+
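+ /**
+  * Extracts dataset data by generating Kettle job/transformation files, executing them on a
+  * remote Carte server, and writing the rows into HBase via the Pentaho HBase output step.
+  */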
@Service
public class ExtractDataService {
@@ -34,12 +95,31 @@ public class ExtractDataService {
private DataSetTableTaskLogService dataSetTableTaskLogService;
@Resource
private DataSetTableTaskService dataSetTableTaskService;
- private Long pageSize = 10000l;
+ @Resource
+ private DatasourceMapper datasourceMapper;
private static ExecutorService pool = Executors.newScheduledThreadPool(50); // worker thread pool
private Connection connection;
+
private static String lastUpdateTime = "${__last_update_time__}";
private static String currentUpdateTime = "${__current_update_time__}";
- private static String column_family = "dataease";
+ private static String dataease_column_family = "dataease";
+ private static String root_path = "/opt/dataease/data/kettle/";
+ private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml";
+ private static String pentaho_mappings = "pentaho_mappings";
+
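+ // Remote Carte server used to run the generated Kettle jobs, plus the ZooKeeper
+ // quorum that the HBase output step connects to.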
+ @Value("${carte.host:127.0.0.1}")
+ private String carte;
+ @Value("${carte.port:8080}")
+ private String port;
+ @Value("${carte.user:cluster}")
+ private String user;
+ @Value("${carte.passwd:cluster}")
+ private String passwd;
+ @Value("${hbase.zookeeper.quorum:zookeeper}")
+ private String zkHost;
+ @Value("${hbase.zookeeper.property.clientPort:2181}")
+ private String zkPort;
+
public void extractData(String datasetTableId, String taskId, String type) {
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
@@ -47,24 +127,41 @@ public class ExtractDataService {
try {
Admin admin = getConnection().getAdmin();
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
+ Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
- TableName tableName = TableName.valueOf(table + "-" + datasetTable.getDataSourceId());
+ TableName hbaseTable = TableName.valueOf(datasetTableId);
switch (updateType){
// full extraction
case all_scope:
- writeDatasetTableTaskLog(datasetTableTaskLog,datasetTableId, taskId);
- if(!admin.tableExists(tableName)){
- creatHaseTable(tableName, admin);
+ writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
+
+ // make sure the pentaho_mappings mapping table exists
+ TableName pentahoMappingsTable = TableName.valueOf(pentaho_mappings);
+ if (!admin.tableExists(pentahoMappingsTable)) {
+ createHBaseTable(pentahoMappingsTable, admin, Arrays.asList("columns", "key"));
}
- extractAllData(admin, tableName, table, datasetTable, datasetTableFields);
+
+ // generate the Kettle job/transformation files if they do not exist yet
+ if (!isExistFile("job_" + datasetTableId + ".kjb") || !isExistFile("trans_" + datasetTableId + ".ktr")) {
+ generateTransFile("all_scope", datasetTable, datasource, table, datasetTableFields, null);
+ generateJobFile("all_scope", datasetTable);
+ }
+
+ if(!admin.tableExists(hbaseTable)){
+ createHBaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
+ }
+ admin.disableTable(hbaseTable);
+ admin.truncateTable(hbaseTable, true);
+
+ extractData(datasetTable, "all_scope");
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
break;
case add_scope:
// incremental extraction
- if(!admin.tableExists(tableName)){
+ if(!admin.tableExists(hbaseTable)){
LogUtil.error("TableName error, dataset: " + datasetTableId);
return;
}
@@ -82,17 +179,28 @@ public class ExtractDataService {
writeDatasetTableTaskLog(datasetTableTaskLog,datasetTableId, taskId);
// incremental insert
- if(StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd())){
+ if (StringUtils.isNotBlank(datasetTableIncrementalConfig.getIncrementalAdd())) {
String sql = datasetTableIncrementalConfig.getIncrementalAdd()
.replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
- extractIncrementalData(tableName,table,datasetTable, datasetTableFields, sql, "add");
+
+ if (!isExistFile("job_add_" + datasetTableId + ".kjb") || !isExistFile("trans_add_" + datasetTableId + ".ktr")) {
+ generateTransFile("incremental_add", datasetTable, datasource, table, datasetTableFields, sql);
+ generateJobFile("incremental_add", datasetTable);
+ }
+
+ extractData(datasetTable, "incremental_add");
}
// incremental delete
if( StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())){
String sql = datasetTableIncrementalConfig.getIncrementalDelete()
.replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
- extractIncrementalData(tableName,table,datasetTable, datasetTableFields, sql, "delete");
+ if (!isExistFile("job_delete_" + datasetTableId + ".kjb") || !isExistFile("trans_delete_" + datasetTableId + ".ktr")) {
+ generateTransFile("incremental_delete", datasetTable, datasource, table, datasetTableFields, sql);
+ generateJobFile("incremental_delete", datasetTable);
+ }
+ extractData(datasetTable, "incremental_delete");
}
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
@@ -125,57 +233,50 @@ public class ExtractDataService {
dataSetTableTaskLogService.save(datasetTableTaskLog);
}
- private void creatHaseTable(TableName tableName, Admin admin)throws Exception{
+ private void createHBaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
- ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(column_family);
- descBuilder.setColumnFamily(hcd);
+ Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
+ for (String s : columnFamily) {
+ ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s);
+ families.add(hcd);
+ }
+ descBuilder.setColumnFamilies(families);
TableDescriptor desc = descBuilder.build();
admin.createTable(desc);
}
- private void extractAllData(Admin admin, TableName tableName, String table, DatasetTable datasetTable, List datasetTableFields)throws Exception{
- admin.disableTable(tableName);
- admin.truncateTable(tableName, true);
-
- Table tab = getConnection().getTable(tableName);
- Long total = dataSetTableService.getDataSetTotalData(datasetTable.getDataSourceId(), table);
- Long pageCount = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1;
-
- for (Long pageIndex = 1l; pageIndex <= pageCount; pageIndex++) {
- List data = dataSetTableService.getDataSetPageData(datasetTable.getDataSourceId(), table, datasetTableFields, pageIndex, pageSize);
- insertDataToHbaseTable(data,datasetTableFields,tab);
+ private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
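+ // Load the generated job from the Kettle file repository, send it to the remote Carte
+ // server for execution, and wait for it to finish.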
+ KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
+ RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree();
+ JobMeta jobMeta = null;
+ switch (extractType){
+ case "all_scope":
+ jobMeta = repository.loadJob("job_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
+ break;
+ case "incremental_add":
+ jobMeta = repository.loadJob("job_add_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
+ break;
+ case "incremental_delete":
+ jobMeta = repository.loadJob("job_delete_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
+ break;
+ default:
+ break;
}
- }
- private void extractIncrementalData(TableName tableName, String table, DatasetTable datasetTable, List datasetTableFields, String sql, String type)throws Exception{
- Table tab = getConnection().getTable(tableName);
- List data = dataSetTableService.getDataSetDataBySql(datasetTable.getDataSourceId(), table, sql);
- if (type.equalsIgnoreCase("add")){
- insertDataToHbaseTable(data,datasetTableFields,tab);
- }else {
- deleteDataFromHbaseTable(data,datasetTableFields,tab);
- }
- }
-
- private void insertDataToHbaseTable(List data, List datasetTableFields, Table tab)throws Exception{
- for (String[] d : data) {
- Put put = new Put(md5(generateStr(datasetTableFields.size(), d)).getBytes());
- for(int i=0;i<datasetTableFields.size();i++){
- private void deleteDataFromHbaseTable(List<String[]> data, List<DatasetTableField> datasetTableFields, Table tab)throws Exception{
- for (String[] d : data) {
- Delete delete = new Delete(md5(generateStr(datasetTableFields.size(), d)).getBytes());
- tab.delete(delete);
- }
+ SlaveServer remoteSlaveServer = getSlaveServer();
+ JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
+ jobExecutionConfiguration.setRemoteServer(remoteSlaveServer);
+ jobExecutionConfiguration.setRepository(repository);
+ String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null);
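+ // poll Carte for the job status until the job stops running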
+ SlaveServerJobStatus jobStatus = null;
+ do {
+ Thread.sleep(1000); // avoid busy-polling the Carte server
+ jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
+ } while (jobStatus != null && jobStatus.isRunning());
+ if (jobStatus == null) {
+ throw new Exception("Failed to fetch the job status from the Carte server");
+ }
+ if (jobStatus.getStatusDescription().equals("Finished")) {
+ return;
+ } else {
+ throw new Exception(jobStatus.getLoggingString());
+ }
}
private synchronized Connection getConnection() throws Exception{
@@ -186,42 +287,309 @@ public class ExtractDataService {
return connection;
}
-
- private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
- private static final String UTF_8 = "UTF-8";
-
- public static String md5(String src) {
- return md5(src, UTF_8);
+ private boolean isExistFile(String fileName) {
+ File file = new File(root_path + fileName);
+ return file.exists();
}
- public static String md5(String src, String charset) {
- try {
- byte[] strTemp = io.micrometer.core.instrument.util.StringUtils.isEmpty(charset) ? src.getBytes() : src.getBytes(charset);
- MessageDigest mdTemp = MessageDigest.getInstance("MD5");
- mdTemp.update(strTemp);
+ private SlaveServer getSlaveServer() {
+ SlaveServer remoteSlaveServer = new SlaveServer();
+ remoteSlaveServer.setHostname(carte); // remote Carte host
+ remoteSlaveServer.setPort(port); // Carte port
+ remoteSlaveServer.setUsername(user);
+ remoteSlaveServer.setPassword(passwd);
+ return remoteSlaveServer;
+ }
- byte[] md = mdTemp.digest();
- int j = md.length;
- char[] str = new char[j * 2];
- int k = 0;
+ private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception{
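+ // Derive the job and transformation file names from the extract type (full load vs. incremental add/delete).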
+ String jobName = null;
+ switch (extractType) {
+ case "all_scope":
+ jobName = "job_" + datasetTable.getId();
+ break;
+ case "incremental_add":
+ jobName = "job_add_" + datasetTable.getId();
+ break;
+ case "incremental_delete":
+ jobName = "job_delete_" + datasetTable.getId();
+ break;
+ default:
+ break;
+ }
- for (byte byte0 : md) {
- str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf];
- str[k++] = HEX_DIGITS[byte0 & 0xf];
- }
+ String transName = null;
+ switch (extractType) {
+ case "all_scope":
+ transName = "trans_" + datasetTable.getId();
+ break;
+ case "incremental_add":
+ transName = "trans_add_" + datasetTable.getId();
+ break;
+ case "incremental_delete":
+ transName = "trans_delete_" + datasetTable.getId();
+ break;
+ default:
+ break;
+ }
- return new String(str);
- } catch (Exception e) {
- throw new RuntimeException("MD5 encrypt error:", e);
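+ // Assemble the job as START -> Transformation -> Success and write it to the repository directory as a .kjb file.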
+ JobMeta jobMeta = new JobMeta();
+ jobMeta.setName(jobName);
+ JobEntrySpecial start = new JobEntrySpecial();
+ start.setName("START");
+ start.setStart(true);
+ JobEntryCopy startEntry = new JobEntryCopy(start);
+ startEntry.setDrawn(true);
+ startEntry.setLocation(100, 100);
+ jobMeta.addJobEntry(startEntry);
+
+ JobEntryTrans jobEntryTrans = new JobEntryTrans();
+ jobEntryTrans.setTransname(transName);
+ jobEntryTrans.setName("Transformation");
+ JobEntryCopy transEntry = new JobEntryCopy(jobEntryTrans);
+ transEntry.setDrawn(true);
+ transEntry.setLocation(300, 100);
+ jobMeta.addJobEntry(transEntry);
+
+ jobMeta.addJobHop(new JobHopMeta(startEntry, transEntry));
+
+ JobEntrySuccess success = new JobEntrySuccess();
+ success.setName("Success");
+ JobEntryCopy successEntry = new JobEntryCopy(success);
+ successEntry.setDrawn(true);
+ successEntry.setLocation(500, 100);
+ jobMeta.addJobEntry(successEntry);
+
+ JobHopMeta greenHop = new JobHopMeta(transEntry, successEntry);
+ greenHop.setEvaluation(true);
+ jobMeta.addJobHop(greenHop);
+
+ String jobXml = jobMeta.getXML();
+ File file = new File( root_path + jobName + ".kjb");
+ FileUtils.writeStringToFile(file, jobXml, "UTF-8");
+ }
+
+ private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
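+ // Build a three-step transformation (table input -> User Defined Java Class adding an md5-based "uuid" row key -> HBase output) and write it to the repository directory as a .ktr file.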
+ TransMeta transMeta = new TransMeta();
+ String transName = null;
+ switch (extractType) {
+ case "all_scope":
+ transName = "trans_" + datasetTable.getId();
+ selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
+ break;
+ case "incremental_add":
+ transName = "trans_add_" + datasetTable.getId();
+ break;
+ case "incremental_delete":
+ transName = "trans_delete_" + datasetTable.getId();
+ break;
+ default:
+ break;
+ }
+
+ transMeta.setName(transName);
+ DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
+ DatabaseMeta dataMeta = null;
+ switch (datasourceType) {
+ case mysql:
+ MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigrationDTO.class);
+ dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigrationDTO.getHost(), mysqlConfigrationDTO.getDataBase(), mysqlConfigrationDTO.getPort().toString(), mysqlConfigrationDTO.getUsername(), mysqlConfigrationDTO.getPassword());
+ transMeta.addDatabase(dataMeta);
+ break;
+ default:
+ break;
+
+ }
+ // the PluginRegistry assigns each step its plugin id
+ PluginRegistry registry = PluginRegistry.getInstance();
+ // step 1: table input (TableInputMeta)
+ TableInputMeta tableInput = new TableInputMeta();
+
+ // attach the DatabaseMeta so the table input step can connect to the database
+ DatabaseMeta database_bjdt = transMeta.findDatabase("db");
+ tableInput.setDatabaseMeta(database_bjdt);
+ tableInput.setSQL(selectSQL);
+ // add the TableInputMeta step to the transformation
+ String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
+ StepMeta fromStep = new StepMeta(tableInputPluginId, "Data Input", tableInput);
+ // set where the step is drawn in the Spoon UI
+ fromStep.setDraw(true);
+ fromStep.setLocation(100, 100);
+ transMeta.addStep(fromStep);
+
+ // step 2: User Defined Java Class
+ UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
+ List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
+ UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
+ fields.add(fieldInfo);
+ userDefinedJavaClassMeta.setFieldInfo(fields);
+ List<UserDefinedJavaClassDef> definitions = new ArrayList<>();
+ UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code);
+ userDefinedJavaClassDef.setActive(true);
+ definitions.add(userDefinedJavaClassDef);
+ userDefinedJavaClassMeta.replaceDefinitions(definitions);
+
+ StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
+ userDefinedJavaClassStep.setLocation(300, 100);
+ userDefinedJavaClassStep.setDraw(true);
+ transMeta.addStep(userDefinedJavaClassStep);
+
+ // step 3: HBase output (HBaseOutputMeta)
+ NamedClusterService namedClusterService = new NamedClusterManager();
+ NamedCluster clusterTemplate = new NamedClusterImpl();
+ clusterTemplate.setName("hadoop");
+ clusterTemplate.setZooKeeperHost(zkHost);
+ clusterTemplate.setZooKeeperPort(zkPort);
+ clusterTemplate.setStorageScheme("HDFS");
+ namedClusterService.setClusterTemplate(clusterTemplate);
+
+ List<ClusterInitializerProvider> providers = new ArrayList<>();
+ ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers);
+ NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer);
+
+ List<RuntimeTestActionHandler> runtimeTestActionHandlers = new ArrayList<>();
+ RuntimeTestActionHandler defaultHandler = null;
+
+ RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
+ RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>( Arrays.asList( mock( RuntimeTest.class ) ) ), mock( ExecutorService.class ), "modules");
+
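+ // Store the HBase column mapping for this dataset table in the pentaho_mappings table;
+ // the HBaseOutput step resolves its target columns and the "uuid" row key from the
+ // mapping named "target_mapping".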
+ Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
+ for (DatasetTableField datasetTableField : datasetTableFields) {
+ put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
+ }
+ put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
+ TableName pentahoMappingsTable = TableName.valueOf(pentaho_mappings);
+ Table tab = getConnection().getTable(pentahoMappingsTable);
+ tab.put(put);
+
+ HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester);
+ hBaseOutputMeta.setTargetTableName(datasetTable.getId());
+ hBaseOutputMeta.setTargetMappingName("target_mapping");
+ hBaseOutputMeta.setNamedCluster(clusterTemplate);
+ hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
+ if(extractType.equalsIgnoreCase("incremental_delete")){
+ hBaseOutputMeta.setDeleteRowKey(true);
+ }
+ StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
+ tostep.setLocation(600, 100);
+
+ tostep.setDraw(true);
+ transMeta.addStep(tostep);
+ TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep);
+ TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep);
+ transMeta.addTransHop(hi1);
+ transMeta.addTransHop(hi2);
+
+ String transXml = transMeta.getXML();
+ File file = new File(root_path + transName + ".ktr");
+ FileUtils.writeStringToFile(file, transXml, "UTF-8");
+ }
+
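+ // map the DataEase field deType to the column type recorded in the HBase mapping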
+ public String transToColumnType(Integer field) {
+ switch (field) {
+ case 0:
+ return "String";
+ case 1:
+ return "Date";
+ case 2:
+ return "Long";
+ default:
+ return "String";
}
}
- public String generateStr(int size, String[] d ){
- String str = null;
- for(int i=0;i<size;i++){
+ " List<ValueMetaInterface> valueMetaList = data.outputRowMeta.getValueMetaList();\n" +
+ " for (ValueMetaInterface valueMetaInterface : valueMetaList) {\n" +
+ "\t if(!valueMetaInterface.getName().equalsIgnoreCase(\"uuid\")){\n" +
+ " str = str + get(Fields.In, valueMetaInterface.getName()).getString(r);\n" +
+ " }\n" +
+ " }\n" +
+ "\n" +
+ " String md5 = md5(str);\n" +
+ " get(Fields.Out, \"uuid\").setValue(r, md5);\n" +
+ "\n" +
+ " putRow(data.outputRowMeta, r);\n" +
+ "\n" +
+ " return true;\n" +
+ "}\n" +
+ "\n" +
+ " private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};\n" +
+ " private static final String UTF_8 = \"UTF-8\";\n" +
+ " public static String md5(String src) {\n" +
+ " return md5(src, UTF_8);\n" +
+ " }\n" +
+ " public static String md5(String src, String charset) {\n" +
+ " try {\n" +
+ " byte[] strTemp = charset == null || charset.equals(\"\") ? src.getBytes() : src.getBytes(charset);\n" +
+ " MessageDigest mdTemp = MessageDigest.getInstance(\"MD5\");\n" +
+ " mdTemp.update(strTemp);\n" +
+ "\n" +
+ " byte[] md = mdTemp.digest();\n" +
+ " int j = md.length;\n" +
+ " char[] str = new char[j * 2];\n" +
+ " int k = 0;\n" +
+ "\n" +
+ " for (byte byte0 : md) {\n" +
+ " str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf];\n" +
+ " str[k++] = HEX_DIGITS[byte0 & 0xf];\n" +
+ " }\n" +
+ "\n" +
+ " return new String(str);\n" +
+ " } catch (Exception e) {\n" +
+ " throw new RuntimeException(\"MD5 encrypt error:\", e);\n" +
+ " }\n" +
+ " }\n" +
+ "\n" +
+ " public String generateStr(int size, String[] d ){\n" +
+ " String str = null;\n" +
+ " for(int i=0;i