diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java
index 6e98754712..cfe2d068f9 100644
--- a/backend/src/main/java/io/dataease/config/CommonConfig.java
+++ b/backend/src/main/java/io/dataease/config/CommonConfig.java
@@ -22,15 +22,15 @@ public class CommonConfig {
     private Environment env; // 保存了配置文件的信息
     private static String root_path = "/opt/dataease/data/kettle/";
 
-    @Bean
-    @ConditionalOnMissingBean
-    public org.apache.hadoop.conf.Configuration configuration() {
-        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-        configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
-        configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
-        configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
-        return configuration;
-    }
+//    @Bean
+//    @ConditionalOnMissingBean
+//    public org.apache.hadoop.conf.Configuration configuration() {
+//        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+//        configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
+//        configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
+//        configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
+//        return configuration;
+//    }
 
     @Bean
     @ConditionalOnMissingBean
diff --git a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
index 6517233c45..e3868bc2d2 100644
--- a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
+++ b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
@@ -7,6 +7,7 @@ import io.dataease.datasource.dto.MysqlConfigrationDTO;
 import io.dataease.datasource.dto.SqlServerConfigration;
 import io.dataease.datasource.dto.TableFiled;
 import io.dataease.datasource.request.DatasourceRequest;
+import org.apache.arrow.util.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 
@@ -39,6 +40,23 @@ public class JdbcProvider extends DatasourceProvider {
         return list;
     }
 
+    @VisibleForTesting
+    public void exec(DatasourceRequest datasourceRequest) throws Exception {
+        Connection connection = null;
+        try {
+            connection = getConnectionFromPool(datasourceRequest);
+            Statement stat = connection.createStatement();
+            stat.execute(datasourceRequest.getQuery());
+        } catch (SQLException e) {
+            throw new Exception("ERROR:" + e.getMessage(), e);
+        } catch (Exception e) {
+            throw new Exception("ERROR:" + e.getMessage(), e);
+        }finally {
+            returnSource(connection, datasourceRequest.getDatasource().getId());
+        }
+    }
+
+
     @Override
     public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception {
         ResultSet rs;
@@ -47,7 +65,6 @@ public class JdbcProvider extends DatasourceProvider {
             connection = getConnectionFromPool(datasourceRequest);
             Statement stat = connection.createStatement();
             rs = stat.executeQuery(datasourceRequest.getQuery());
-            returnSource(connection, datasourceRequest.getDatasource().getId());
         } catch (SQLException e) {
             throw new Exception("ERROR:" + e.getMessage(), e);
         } catch (Exception e) {
@@ -66,7 +83,6 @@ public class JdbcProvider extends DatasourceProvider {
             connection = getConnectionFromPool(datasourceRequest);
             Statement stat = connection.createStatement();
             ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()));
-            returnSource(connection, datasourceRequest.getDatasource().getId());
             list = fetchResult(rs);
         } catch (SQLException e) {
             throw new Exception("ERROR:" + e.getMessage(), e);
@@ -174,8 +190,6 @@ public class JdbcProvider extends DatasourceProvider {
         return list;
     }
 
-    ;
-
     @Override
     public void test(DatasourceRequest datasourceRequest) throws Exception {
         String queryStr = getTablesSql(datasourceRequest);
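The new exec() hands the pooled connection back in a finally block, so it is returned even when createStatement() or execute() throws, while the query methods above stop returning the connection in the middle of the try block before the ResultSet has been consumed. A minimal sketch of that release pattern as it would sit inside JdbcProvider (not part of the patch): the try-with-resources Statement and the null guard are extra precautions added here, on the assumption that returnSource() is not meant to receive null when getConnectionFromPool() itself fails.

    // Sketch of the release pattern exec() follows: always return the pooled
    // connection in finally; the null check covers the case where
    // getConnectionFromPool() throws before the connection is assigned.
    public void execSketch(DatasourceRequest datasourceRequest) throws Exception {
        java.sql.Connection connection = null;
        try {
            connection = getConnectionFromPool(datasourceRequest);
            try (java.sql.Statement stat = connection.createStatement()) {
                stat.execute(datasourceRequest.getQuery());
            }
        } catch (java.sql.SQLException e) {
            throw new Exception("ERROR:" + e.getMessage(), e);
        } finally {
            if (connection != null) {
                returnSource(connection, datasourceRequest.getDatasource().getId());
            }
        }
    }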
diff --git a/backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java b/backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java
index d8dcc2e3e6..f7ba6a1a7d 100644
--- a/backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java
+++ b/backend/src/main/java/io/dataease/listener/AppStartReadHBaseListener.java
@@ -33,20 +33,20 @@ public class AppStartReadHBaseListener implements ApplicationListener<ApplicationReadyEvent> {
 
     @Override
     public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
-        System.out.println("================= Read HBase start =================");
-        // 项目启动,从数据集中找到定时抽取的表,从HBase中读取放入缓存
-        DatasetTableExample datasetTableExample = new DatasetTableExample();
-        datasetTableExample.createCriteria().andModeEqualTo(1);
-        List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
-        for (DatasetTable table : datasetTables) {
-//            commonThreadPool.addTask(() -> {
-            try {
-                List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
-                sparkCalc.getHBaseDataAndCache(table.getId(), fields);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-//            });
-        }
+//        System.out.println("================= Read HBase start =================");
+//        // 项目启动,从数据集中找到定时抽取的表,从HBase中读取放入缓存
+//        DatasetTableExample datasetTableExample = new DatasetTableExample();
+//        datasetTableExample.createCriteria().andModeEqualTo(1);
+//        List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
+//        for (DatasetTable table : datasetTables) {
+////            commonThreadPool.addTask(() -> {
+//            try {
+//                List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
+//                sparkCalc.getHBaseDataAndCache(table.getId(), fields);
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+////            });
+//        }
     }
 }
diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
index 8c88a6afe2..f9a9c5820b 100644
--- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
+++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
@@ -56,6 +56,9 @@ import org.pentaho.di.trans.TransHopMeta;
 import org.pentaho.di.trans.TransMeta;
 import org.pentaho.di.trans.step.StepMeta;
 import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
+import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
+import org.pentaho.di.trans.steps.textfileoutput.TextFileOutput;
+import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
 import org.pentaho.di.trans.steps.userdefinedjavaclass.InfoStepDefinition;
 import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
 import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
@@ -105,6 +108,7 @@ public class ExtractDataService {
     private static String currentUpdateTime = "${__current_update_time__}";
     private static String dataease_column_family = "dataease";
     private static String root_path = "/opt/dataease/data/kettle/";
+    private static String data_path = "/opt/dataease/data/db/";
     private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml";
     private static String pentaho_mappings = "pentaho_mappings";
 
@@ -129,7 +133,7 @@ public class ExtractDataService {
         DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
         UpdateType updateType = UpdateType.valueOf(type);
         try {
-            Admin admin = getConnection().getAdmin();
+//            Admin admin = getConnection().getAdmin();
             DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
             Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
             List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
@@ -141,10 +145,10 @@ public class ExtractDataService {
                     writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
 
                     //check pentaho_mappings table
-                    TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
-                    if (!admin.tableExists(pentaho_mappings)) {
-                        creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key"));
-                    }
+//                    TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
+//                    if (!admin.tableExists(pentaho_mappings)) {
+//                        creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key"));
+//                    }
 
                     //check pentaho files
                     if (!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")) {
@@ -152,25 +156,25 @@
                         generateTransFile("all_scope", datasetTable, datasource, table, datasetTableFields, null);
                         generateJobFile("all_scope", datasetTable);
                     }
 
-                    if (!admin.tableExists(hbaseTable)) {
-                        creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
-                    }
-                    admin.disableTable(hbaseTable);
-                    admin.truncateTable(hbaseTable, true);
+//                    if (!admin.tableExists(hbaseTable)) {
+//                        creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
+//                    }
+//                    admin.disableTable(hbaseTable);
+//                    admin.truncateTable(hbaseTable, true);
 
                     extractData(datasetTable, "all_scope");
                     // after sync complete,read data to cache from HBase
-                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
+//                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
                     datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                     datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                     dataSetTableTaskLogService.save(datasetTableTaskLog);
                     break;
                 case add_scope: // 增量更新
-                    if (!admin.tableExists(hbaseTable)) {
-                        LogUtil.error("TableName error, dataaset: " + datasetTableId);
-                        return;
-                    }
+//                    if (!admin.tableExists(hbaseTable)) {
+//                        LogUtil.error("TableName error, dataaset: " + datasetTableId);
+//                        return;
+//                    }
                     DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
                     if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
                         return;
                     }
@@ -209,7 +213,7 @@ public class ExtractDataService {
                         extractData(datasetTable, "incremental_delete");
                     }
                     // after sync complete,read data to cache from HBase
-                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
+//                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
                     datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                     datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                     dataSetTableTaskLogService.save(datasetTableTaskLog);
@@ -239,17 +243,17 @@
         dataSetTableTaskLogService.save(datasetTableTaskLog);
     }
 
-    private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
-        TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
-        Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
-        for (String s : columnFamily) {
-            ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s);
-            families.add(hcd);
-        }
-        descBuilder.setColumnFamilies(families);
-        TableDescriptor desc = descBuilder.build();
-        admin.createTable(desc);
-    }
+//    private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
+//        TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
+//        Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
+//        for (String s : columnFamily) {
+//            ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s);
+//            families.add(hcd);
+//        }
+//        descBuilder.setColumnFamilies(families);
+//        TableDescriptor desc = descBuilder.build();
+//        admin.createTable(desc);
+//    }
 
     private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
         KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
@@ -285,13 +289,13 @@
         }
     }
 
-    private synchronized Connection getConnection() throws Exception {
-        if (connection == null || connection.isClosed()) {
-            Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
-            connection = ConnectionFactory.createConnection(cfg, pool);
-        }
-        return connection;
-    }
+//    private synchronized Connection getConnection() throws Exception {
+//        if (connection == null || connection.isClosed()) {
+//            Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
+//            connection = ConnectionFactory.createConnection(cfg, pool);
+//        }
+//        return connection;
+//    }
 
     private boolean isExitFile(String fileName) {
         File file = new File(root_path + fileName);
@@ -380,6 +384,15 @@
         switch (extractType) {
             case "all_scope":
                 transName = "trans_" + datasetTable.getId();
+                datasetTableFields.sort((o1, o2) -> {
+                    if (o1.getOriginName() == null) {
+                        return -1;
+                    }
+                    if (o2.getOriginName() == null) {
+                        return 1;
+                    }
+                    return o1.getOriginName().compareTo(o2.getOriginName());
+                });
                 selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
                 break;
            case "incremental_add":
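The sort added above gives the extracted columns a deterministic, null-first ordering by originName; SparkCalc.getData() later in this diff applies the same comparator, so the position of each value in a ';'-separated line can be matched back to a field by index. A roughly equivalent formulation with the JDK comparator combinators, shown only as a sketch (the patch's hand-written comparator returns -1 when both names are null, whereas nullsFirst treats that case as equal):

    // Sketch: the same null-first ordering expressed with JDK comparator
    // combinators; DatasetTableField is the project's own model class.
    private static void sortByOriginName(java.util.List<DatasetTableField> fields) {
        fields.sort(java.util.Comparator.comparing(
                DatasetTableField::getOriginName,
                java.util.Comparator.nullsFirst(java.util.Comparator.naturalOrder())));
    }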
@@ -422,70 +435,90 @@
         fromStep.setLocation(100, 100);
         transMeta.addStep(fromStep);
 
-        //第二个 (User defined Java class)
-        UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
-        List fields = new ArrayList<>();
-        UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
-        fields.add(fieldInfo);
-        userDefinedJavaClassMeta.setFieldInfo(fields);
-        List definitions = new ArrayList();
-        UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code);
-        userDefinedJavaClassDef.setActive(true);
-        definitions.add(userDefinedJavaClassDef);
-        userDefinedJavaClassMeta.replaceDefinitions(definitions);
+        //第二个 (TextFileOutput)
+        TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta();
+        textFileOutputMeta.setFilename(data_path + datasetTable.getId());
+        textFileOutputMeta.setExtension("txt");
+        textFileOutputMeta.setSeparator(";");
+        textFileOutputMeta.setFileCompression("None");
+        textFileOutputMeta.setEnclosure("\"");
+        textFileOutputMeta.setEncoding("UTF-8");
+        TextFileField[] outputFields = new TextFileField[1];
+        outputFields[0] = new TextFileField();
+        textFileOutputMeta.setOutputFields(outputFields);
 
-        StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
-        userDefinedJavaClassStep.setLocation(300, 100);
-        userDefinedJavaClassStep.setDraw(true);
-        transMeta.addStep(userDefinedJavaClassStep);
-
-        //第三个 (HBaseOutputMeta)
-        NamedClusterService namedClusterService = new NamedClusterManager();
-        NamedCluster clusterTemplate = new NamedClusterImpl();
-        clusterTemplate.setName("hadoop");
-        clusterTemplate.setZooKeeperHost(zkHost);
-        clusterTemplate.setZooKeeperPort(zkPort);
-        clusterTemplate.setStorageScheme("HDFS");
-        namedClusterService.setClusterTemplate(clusterTemplate);
-
-        List providers = new ArrayList<>();
-        ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers);
-        NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer);
-
-        List runtimeTestActionHandlers = new ArrayList<>();
-        RuntimeTestActionHandler defaultHandler = null;
-
-        RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
-        RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules");
-
-        Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
-        for (DatasetTableField datasetTableField : datasetTableFields) {
-            put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
-        }
-        put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
-        TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
-        Table tab = getConnection().getTable(pentaho_mappings);
-        tab.put(put);
-
-        HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester);
-        hBaseOutputMeta.setTargetTableName(datasetTable.getId());
-        hBaseOutputMeta.setTargetMappingName("target_mapping");
-        hBaseOutputMeta.setNamedCluster(clusterTemplate);
-        hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
-        hBaseOutputMeta.setDisableWriteToWAL(true);
-        hBaseOutputMeta.setWriteBufferSize("31457280"); //30M
-        if (extractType.equalsIgnoreCase("incremental_delete")) {
-            hBaseOutputMeta.setDeleteRowKey(true);
-        }
-        StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
+        StepMeta tostep = new StepMeta("TextFileOutput", "TextFileOutput", textFileOutputMeta);
         tostep.setLocation(600, 100);
-        tostep.setDraw(true);
         transMeta.addStep(tostep);
-        TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep);
-        TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep);
+        TransHopMeta hi1 = new TransHopMeta(fromStep, tostep);
         transMeta.addTransHop(hi1);
-        transMeta.addTransHop(hi2);
+
+
+//        //第二个 (User defined Java class)
+//        UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
+//        List fields = new ArrayList<>();
+//        UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
+//        fields.add(fieldInfo);
+//        userDefinedJavaClassMeta.setFieldInfo(fields);
+//        List definitions = new ArrayList();
+//        UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code);
+//        userDefinedJavaClassDef.setActive(true);
+//        definitions.add(userDefinedJavaClassDef);
+//        userDefinedJavaClassMeta.replaceDefinitions(definitions);
+//
+//        StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
+//        userDefinedJavaClassStep.setLocation(300, 100);
+//        userDefinedJavaClassStep.setDraw(true);
+//        transMeta.addStep(userDefinedJavaClassStep);
+//
+//        //第三个 (HBaseOutputMeta)
+//        NamedClusterService namedClusterService = new NamedClusterManager();
+//        NamedCluster clusterTemplate = new NamedClusterImpl();
+//        clusterTemplate.setName("hadoop");
+//        clusterTemplate.setZooKeeperHost(zkHost);
+//        clusterTemplate.setZooKeeperPort(zkPort);
+//        clusterTemplate.setStorageScheme("HDFS");
+//        namedClusterService.setClusterTemplate(clusterTemplate);
+//
+//        List providers = new ArrayList<>();
+//        ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers);
+//        NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer);
+//
+//        List runtimeTestActionHandlers = new ArrayList<>();
+//        RuntimeTestActionHandler defaultHandler = null;
+//
+//        RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
+//        RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules");
+//
+//        Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
+//        for (DatasetTableField datasetTableField : datasetTableFields) {
+//            put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
+//        }
+//        put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
+//        TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
+//        Table tab = getConnection().getTable(pentaho_mappings);
+//        tab.put(put);
+//
+//        HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester);
+//        hBaseOutputMeta.setTargetTableName(datasetTable.getId());
+//        hBaseOutputMeta.setTargetMappingName("target_mapping");
+//        hBaseOutputMeta.setNamedCluster(clusterTemplate);
+//        hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
+//        hBaseOutputMeta.setDisableWriteToWAL(true);
+//        hBaseOutputMeta.setWriteBufferSize("31457280"); //30M
+//        if (extractType.equalsIgnoreCase("incremental_delete")) {
+//            hBaseOutputMeta.setDeleteRowKey(true);
+//        }
+//        StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
+//        tostep.setLocation(600, 100);
+//
+//        tostep.setDraw(true);
+//        transMeta.addStep(tostep);
+//        TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep);
+//        TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep);
+//        transMeta.addTransHop(hi1);
+//        transMeta.addTransHop(hi2);
 
         String transXml = transMeta.getXML();
         File file = new File(root_path + transName + ".ktr");
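The TextFileOutput step above and the Spark reader in SparkCalc.java below share an implicit contract: each dataset is written to the data directory as '<datasetTableId>.txt', one row per line, values separated by ';' and emitted in the same null-first originName order used on both sides. A hypothetical helper, shown only to spell out the naming side of that contract (it is not part of the patch; the constant mirrors the data_path fields added in both classes):

    // Hypothetical helper, for illustration only: both the Kettle writer
    // (setFilename(data_path + id) + setExtension("txt")) and the Spark
    // reader (textFile(data_path + id + ".txt")) resolve to this path.
    public final class ExtractedDataFiles {

        private static final String DATA_PATH = "/opt/dataease/data/db/";
        private static final String SEPARATOR = ";";

        private ExtractedDataFiles() {
        }

        public static String pathFor(String datasetTableId) {
            return DATA_PATH + datasetTableId + ".txt";
        }

        public static String[] splitLine(String line) {
            // Values are written in the sorted field order, so position i
            // corresponds to fields.get(i) on the reading side.
            return line.split(SEPARATOR);
        }
    }

One caveat worth noting: String.split(";") with no limit drops trailing empty tokens, so a row whose last columns are empty can yield fewer items than fields; split(";", -1) would preserve them.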
diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
index 0824f2258e..53f8dc0116 100644
--- a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
+++ b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
@@ -41,6 +41,7 @@ import java.util.List;
 @Service
 public class SparkCalc {
     private static String column_family = "dataease";
+    private static String data_path = "/opt/dataease/data/db/";
     @Resource
     private Environment env; // 保存了配置文件的信息
 
@@ -54,12 +55,13 @@ public class SparkCalc {
         sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
         sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
 
-        Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
-        if (ObjectUtils.isEmpty(dataFrame)) {
-            dataFrame = getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
-        }
+        Dataset<Row> dataFrame = getData(sparkContext, sqlContext, hTable, fields);
+//        Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
+//        if (ObjectUtils.isEmpty(dataFrame)) {
+//            dataFrame = getData(sparkContext, sqlContext, hTable, fields);
+//        }
 
-        dataFrame.createOrReplaceTempView(tmpTable);
+        dataFrame.createOrReplaceTempView( tmpTable);
         Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable, requestList));
         // transform
         List<String[]> data = new ArrayList<>();
@@ -86,6 +88,69 @@ public class SparkCalc {
         return getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
     }
 
+    public Dataset<Row> getData(JavaSparkContext sparkContext, SQLContext sqlContext, String tableId, List<DatasetTableField> fields) throws Exception {
+        fields.sort((o1, o2) -> {
+            if (o1.getOriginName() == null) {
+                return -1;
+            }
+            if (o2.getOriginName() == null) {
+                return 1;
+            }
+            return o1.getOriginName().compareTo(o2.getOriginName());
+        });
+
+        JavaRDD<String> pairRDD = sparkContext.textFile(data_path + tableId + ".txt");
+
+        JavaRDD<Row> rdd = pairRDD.mapPartitions( (FlatMapFunction<java.util.Iterator<String>, Row>) tuple2Iterator -> {
+            List<Row> iterator = new ArrayList<>();
+            while (tuple2Iterator.hasNext()) {
+                String[] items = tuple2Iterator.next().split(";");
+                List<Object> list = new ArrayList<>();
+                for (int i = 0; i < items.length; i++) {
+                    String l = items[i];
+                    DatasetTableField x = fields.get(i);
+                    if (x.getDeType() == 0 || x.getDeType() == 1) {
+                        list.add(l);
+                    } else if (x.getDeType() == 2) {
+                        if (StringUtils.isEmpty(l)) {
+                            l = "0";
+                        }
+                        list.add(Long.valueOf(l));
+                    } else if (x.getDeType() == 3) {
+                        if (StringUtils.isEmpty(l)) {
+                            l = "0.0";
+                        }
+                        list.add(Double.valueOf(l));
+                    }
+                }
+                iterator.add(RowFactory.create(list.toArray()));
+            }
+            return iterator.iterator();
+        });
+
+        List<StructField> structFields = new ArrayList<>();
+        // struct顺序要与rdd顺序一致
+        fields.forEach(x -> {
+            if (x.getDeType() == 0 || x.getDeType() == 1) {
+                structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
+            } else if (x.getDeType() == 2) {
+                structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true));
+            } else if (x.getDeType() == 3) {
+                structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true));
+            }
+        });
+        StructType structType = DataTypes.createStructType(structFields);
+
+        Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
+        return dataFrame;
+    }
+
     public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
         Scan scan = new Scan();
         scan.addFamily(Bytes.toBytes(column_family));
@@ -145,7 +210,7 @@ public class SparkCalc {
         StructType structType = DataTypes.createStructType(structFields);
 
         Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist(StorageLevel.MEMORY_AND_DISK_SER());
-        CacheUtil.getInstance().addCacheData(hTable, dataFrame);
+//        CacheUtil.getInstance().addCacheData(hTable, dataFrame);
         dataFrame.count();
         return dataFrame;
     }
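Because getData() already assembles the schema as a StructType from the sorted field list, the ';'-separated file could also be loaded through Spark's CSV reader instead of the hand-rolled mapPartitions parsing. A sketch of that alternative, assuming the Spark version in use provides DataFrameReader.csv and accepting that it maps empty numeric cells to null rather than the 0/0.0 substitution performed above; structType, tableId and data_path are the same names used in getData():

    // Sketch of an alternative reader: explicit schema, ';' separator.
    // It does not reproduce the zero-fill behaviour for empty numeric cells.
    Dataset<Row> dataFrame = sqlContext.read()
            .schema(structType)
            .option("sep", ";")
            .option("header", "false")
            .csv(data_path + tableId + ".txt");

Whether the enclosure character written by TextFileOutput needs a matching quote option depends on the extracted data, so the manual parser in the patch may remain the safer default here.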