From 6fb10cdfff552c4e3f2b7bcb0130275951c3e341 Mon Sep 17 00:00:00 2001 From: junjie Date: Mon, 29 Mar 2021 15:09:16 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(=E6=95=B0=E6=8D=AE=E9=9B=86):=20Spark?= =?UTF-8?q?=E5=88=9D=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/pom.xml | 12 ++ .../java/io/dataease/config/CommonConfig.java | 17 +- .../dataease/dto/chart/ChartViewFieldDTO.java | 3 +- .../service/chart/ChartViewService.java | 45 +++-- .../io/dataease/service/spark/SparkCalc.java | 169 ++++++++++++++++++ frontend/src/views/dataset/data/ViewTable.vue | 2 +- 6 files changed, 223 insertions(+), 25 deletions(-) create mode 100644 backend/src/main/java/io/dataease/service/spark/SparkCalc.java diff --git a/backend/pom.xml b/backend/pom.xml index 0add4657ae..38822a30a2 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -371,6 +371,18 @@ org.apache.spark spark-sql_2.12 ${spark.version} + + + janino + org.codehaus.janino + + + + + + org.codehaus.janino + janino + 3.0.8 diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java index 229ee6a069..09bc21dad6 100644 --- a/backend/src/main/java/io/dataease/config/CommonConfig.java +++ b/backend/src/main/java/io/dataease/config/CommonConfig.java @@ -3,12 +3,12 @@ package io.dataease.config; import com.fit2cloud.autoconfigure.QuartzAutoConfiguration; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; import org.springframework.core.env.Environment; import javax.annotation.Resource; @@ -23,7 +23,7 @@ public class CommonConfig { @Bean @ConditionalOnMissingBean - public org.apache.hadoop.conf.Configuration configuration(){ + public org.apache.hadoop.conf.Configuration configuration() { org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum")); configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort")); @@ -34,10 +34,19 @@ public class CommonConfig { @Bean @ConditionalOnMissingBean - public JavaSparkContext javaSparkContext(){ - SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob") ).setMaster(env.getProperty("spark.master", "local[*]") ); + public JavaSparkContext javaSparkContext() { + SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob")).setMaster(env.getProperty("spark.master", "local[*]")); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); return sc; } + + @Bean + @ConditionalOnMissingBean + public SQLContext sqlContext(JavaSparkContext javaSparkContext) { + SQLContext sqlContext = new SQLContext(javaSparkContext); + sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1")); + sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1")); + return sqlContext; + } } diff --git a/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java b/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java index 9eae0f4a58..e4a7127d77 100644 --- a/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java +++ b/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java @@ -2,6 +2,7 @@ package io.dataease.dto.chart; import lombok.Data; +import java.io.Serializable; import java.util.List; /** @@ -9,7 +10,7 @@ import java.util.List; * @Date 2021/3/11 1:18 下午 */ @Data -public class ChartViewFieldDTO { +public class ChartViewFieldDTO implements Serializable { private String id; private String tableId; diff --git a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java index a283c131d1..202f1d9a17 100644 --- a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java +++ b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java @@ -2,7 +2,10 @@ package io.dataease.service.chart; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import io.dataease.base.domain.*; +import io.dataease.base.domain.ChartViewExample; +import io.dataease.base.domain.ChartViewWithBLOBs; +import io.dataease.base.domain.DatasetTable; +import io.dataease.base.domain.Datasource; import io.dataease.base.mapper.ChartViewMapper; import io.dataease.commons.utils.BeanUtils; import io.dataease.controller.request.chart.ChartViewRequest; @@ -15,8 +18,8 @@ import io.dataease.dto.chart.ChartViewDTO; import io.dataease.dto.chart.ChartViewFieldDTO; import io.dataease.dto.chart.Series; import io.dataease.dto.dataset.DataTableInfoDTO; -import io.dataease.service.dataset.DataSetTableFieldsService; import io.dataease.service.dataset.DataSetTableService; +import io.dataease.service.spark.SparkCalc; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -25,7 +28,6 @@ import javax.annotation.Resource; import java.math.BigDecimal; import java.text.MessageFormat; import java.util.*; -import java.util.stream.Collectors; /** * @Author gin @@ -40,7 +42,7 @@ public class ChartViewService { @Resource private DatasourceService datasourceService; @Resource - private DataSetTableFieldsService dataSetTableFieldsService; + private SparkCalc sparkCalc; public ChartViewWithBLOBs save(ChartViewWithBLOBs chartView) { long timestamp = System.currentTimeMillis(); @@ -102,22 +104,27 @@ public class ChartViewService { // 获取数据集 DatasetTable table = dataSetTableService.get(view.getTableId()); - // todo 判断连接方式,直连或者定时抽取 table.mode - Datasource ds = datasourceService.get(table.getDataSourceId()); - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(ds); - DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class); - if (StringUtils.equalsIgnoreCase(table.getType(), "db")) { - datasourceRequest.setTable(dataTableInfoDTO.getTable()); - datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis)); - } else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) { - datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis)); + // 判断连接方式,直连或者定时抽取 table.mode + List data = new ArrayList<>(); + if (table.getMode() == 0) {// 直连 + Datasource ds = datasourceService.get(table.getDataSourceId()); + DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); + DatasourceRequest datasourceRequest = new DatasourceRequest(); + datasourceRequest.setDatasource(ds); + DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class); + if (StringUtils.equalsIgnoreCase(table.getType(), "db")) { + datasourceRequest.setTable(dataTableInfoDTO.getTable()); + datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis)); + } else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) { + datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis)); + } + data = datasourceProvider.getData(datasourceRequest); + } else if (table.getMode() == 1) {// 抽取 + DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class); + data = sparkCalc.getData(dataTableInfoDTO.getTable() + "-" + table.getDataSourceId(), xAxis, yAxis, "tmp");// todo hBase table name maybe change } - List data = datasourceProvider.getData(datasourceRequest); - - // todo 处理结果,目前做一个单系列图表,后期图表组件再扩展 + // 图表组件可再扩展 for (ChartViewFieldDTO y : yAxis) { Series series1 = new Series(); series1.setName(y.getName()); @@ -163,7 +170,7 @@ public class ChartViewService { } public String transMysqlSQL(String table, List xAxis, List yAxis) { - // TODO 字段汇总 排序等 + // 字段汇总 排序等 String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new); String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new); String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none")) diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java new file mode 100644 index 0000000000..1006d898bf --- /dev/null +++ b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java @@ -0,0 +1,169 @@ +package io.dataease.service.spark; + +import io.dataease.commons.utils.CommonBeanFactory; +import io.dataease.dto.chart.ChartViewFieldDTO; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.springframework.stereotype.Service; +import scala.Tuple2; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.reflections8.Reflections.collect; + +/** + * @Author gin + * @Date 2021/3/26 3:49 下午 + */ +@Service +public class SparkCalc { + private static String column_family = "dataease"; + + public List getData(String hTable, List xAxis, List yAxis, String tmpTable) throws Exception { + Scan scan = new Scan(); + scan.addFamily(column_family.getBytes()); + ClientProtos.Scan proto = ProtobufUtil.toScan(scan); + String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray())); + + JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class); + Configuration conf = CommonBeanFactory.getBean(Configuration.class); + conf.set(TableInputFormat.INPUT_TABLE, hTable); + conf.set(TableInputFormat.SCAN, scanToString); + + JavaPairRDD pairRDD = sparkContext.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); + JavaRDD rdd = pairRDD.map((Function, Row>) immutableBytesWritableResultTuple2 -> + { + Result result = immutableBytesWritableResultTuple2._2; + List list = new ArrayList<>(); + xAxis.forEach(x -> { + if (x.getDeType() == 0 || x.getDeType() == 1) { + list.add(Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()))); + } else if (x.getDeType() == 2) { + list.add(Long.valueOf(Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes())))); + } + }); + yAxis.forEach(y -> { + if (y.getDeType() == 0 || y.getDeType() == 1) { + list.add(Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes()))); + } else if (y.getDeType() == 2) { + list.add(Long.valueOf(Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes())))); + } + }); + return RowFactory.create(list.toArray()); + }); + + List structFields = new ArrayList<>(); + // struct顺序要与rdd顺序一致 + xAxis.forEach(x -> { + if (x.getDeType() == 0 || x.getDeType() == 1) { + structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true)); + } else if (x.getDeType() == 2) { + structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true)); + } + }); + yAxis.forEach(y -> { + if (y.getDeType() == 0 || y.getDeType() == 1) { + structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.StringType, true)); + } else if (y.getDeType() == 2) { + structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.LongType, true)); + } + }); + StructType structType = DataTypes.createStructType(structFields); + + SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class); + Dataset dataFrame = sqlContext.createDataFrame(rdd, structType); + dataFrame.createOrReplaceTempView(tmpTable); + + Dataset sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable)); + + List data = new ArrayList<>(); + + // transform + JavaRDD rowJavaRDD = sql.javaRDD(); + List list = rowJavaRDD.collect(); + for (Row row : list) { + String[] r = new String[row.length()]; + for (int i = 0; i < row.length(); i++) { + r[i] = row.get(i).toString(); + } + data.add(r); + } + + return data; + } + + private String getSQL(List xAxis, List yAxis, String table) { + // 字段汇总 排序等 + String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new); + String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new); + String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none")) + .map(y -> "_" + y.getSummary() + "_" + y.getOriginName() + " " + y.getSort()).toArray(String[]::new); + + String sql = MessageFormat.format("SELECT {0},{1} FROM {2} WHERE 1=1 {3} GROUP BY {4} ORDER BY null,{5}", + StringUtils.join(group, ","), + StringUtils.join(field, ","), + table, + "", + StringUtils.join(group, ","), + StringUtils.join(order, ",")); + if (sql.endsWith(",")) { + sql = sql.substring(0, sql.length() - 1); + } + // 如果是对结果字段过滤,则再包裹一层sql + String[] resultFilter = yAxis.stream().filter(y -> CollectionUtils.isNotEmpty(y.getFilter()) && y.getFilter().size() > 0) + .map(y -> { + String[] s = y.getFilter().stream().map(f -> "AND _" + y.getSummary() + "_" + y.getOriginName() + transFilterTerm(f.getTerm()) + f.getValue()).toArray(String[]::new); + return StringUtils.join(s, " "); + }).toArray(String[]::new); + if (resultFilter.length == 0) { + return sql; + } else { + String filterSql = MessageFormat.format("SELECT * FROM {0} WHERE 1=1 {1}", + "(" + sql + ") AS tmp", + StringUtils.join(resultFilter, " ")); + return filterSql; + } + } + + public String transFilterTerm(String term) { + switch (term) { + case "eq": + return " = "; + case "not_eq": + return " <> "; + case "lt": + return " < "; + case "le": + return " <= "; + case "gt": + return " > "; + case "ge": + return " >= "; + case "null": + return " IS NULL "; + case "not_null": + return " IS NOT NULL "; + default: + return ""; + } + } +} diff --git a/frontend/src/views/dataset/data/ViewTable.vue b/frontend/src/views/dataset/data/ViewTable.vue index 2505d6386b..b95df5ae63 100644 --- a/frontend/src/views/dataset/data/ViewTable.vue +++ b/frontend/src/views/dataset/data/ViewTable.vue @@ -27,7 +27,7 @@ 关联视图 TODO - + From e38e60c21c4bdf033c43668a2d29ad3428c7353b Mon Sep 17 00:00:00 2001 From: junjie Date: Mon, 29 Mar 2021 15:37:01 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(=E6=95=B0=E6=8D=AE=E9=9B=86):=20Spark?= =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/dataease/dto/chart/ChartViewFieldFilterDTO.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldFilterDTO.java b/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldFilterDTO.java index b5303899a3..09211a9cdf 100644 --- a/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldFilterDTO.java +++ b/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldFilterDTO.java @@ -3,13 +3,15 @@ package io.dataease.dto.chart; import lombok.Getter; import lombok.Setter; +import java.io.Serializable; + /** * @Author gin * @Date 2021/3/25 10:31 上午 */ @Getter @Setter -public class ChartViewFieldFilterDTO { +public class ChartViewFieldFilterDTO implements Serializable { private String term; private String value; }