feat: extract data to doris

taojinlong 2021-04-27 17:32:26 +08:00
parent f91661e7fe
commit 0db59a7fbe
14 changed files with 525 additions and 1093 deletions

View File

@@ -17,7 +17,6 @@
 <java.version>1.8</java.version>
 <graalvm.version>20.1.0</graalvm.version>
 <jwt.version>3.12.1</jwt.version>
-<spark.version>3.1.1</spark.version>
 </properties>
 <dependencies>
@@ -315,70 +314,12 @@
 <artifactId>ehcache</artifactId>
 <version>2.9.1</version>
 </dependency>
-<!-- hbase -->
-<dependency>
-<groupId>org.apache.hbase</groupId>
-<artifactId>hbase-client</artifactId>
-<version>2.4.1</version>
-</dependency>
-<dependency>
-<groupId>org.apache.hbase</groupId>
-<artifactId>hbase-common</artifactId>
-<version>2.4.1</version>
-</dependency>
-<dependency>
-<groupId>org.apache.hbase</groupId>
-<artifactId>hbase-mapreduce</artifactId>
-<version>2.4.1</version>
-</dependency>
 <dependency>
 <groupId>org.testng</groupId>
 <artifactId>testng</artifactId>
 <version>6.8</version>
 <scope>test</scope>
 </dependency>
-<dependency>
-<groupId>org.apache.spark</groupId>
-<artifactId>spark-core_2.12</artifactId>
-<version>${spark.version}</version>
-<exclusions>
-<exclusion>
-<groupId>org.slf4j</groupId>
-<artifactId>slf4j-log4j12</artifactId>
-</exclusion>
-<exclusion>
-<groupId>log4j</groupId>
-<artifactId>log4j</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.objenesis</groupId>
-<artifactId>objenesis</artifactId>
-</exclusion>
-</exclusions>
-<scope>provided</scope>
-</dependency>
-<dependency>
-<groupId>org.apache.spark</groupId>
-<artifactId>spark-streaming_2.12</artifactId>
-<version>${spark.version}</version>
-<scope>provided</scope>
-</dependency>
-<dependency>
-<groupId>org.apache.spark</groupId>
-<artifactId>spark-sql_2.12</artifactId>
-<version>${spark.version}</version>
-<exclusions>
-<exclusion>
-<artifactId>janino</artifactId>
-<groupId>org.codehaus.janino</groupId>
-</exclusion>
-</exclusions>
-</dependency>
 <dependency>
 <groupId>org.codehaus.janino</groupId>
 <artifactId>janino</artifactId>
@@ -400,27 +341,16 @@
 <artifactId>metastore</artifactId>
 <version>8.3.0.18-1084</version>
 </dependency>
-<dependency>
-<groupId>pentaho</groupId>
-<artifactId>pentaho-big-data-kettle-plugins-hbase-meta</artifactId>
-<version>8.3.0.18-1084</version>
-</dependency>
-<dependency>
-<groupId>pentaho</groupId>
-<artifactId>pentaho-big-data-kettle-plugins-hbase</artifactId>
-<version>8.3.0.18-1084</version>
-</dependency>
-<dependency>
-<groupId>pentaho</groupId>
-<artifactId>pentaho-big-data-impl-cluster</artifactId>
-<version>8.3.0.18-1084</version>
-</dependency>
 <dependency>
 <groupId>org.pentaho.di.plugins</groupId>
 <artifactId>pdi-engine-configuration-impl</artifactId>
 <version>8.3.0.7-683</version>
 </dependency>
+<dependency>
+<groupId>c3p0</groupId>
+<artifactId>c3p0</artifactId>
+<version>0.9.1.2</version>
+</dependency>
 </dependencies>
 <build>

View File

@@ -18,13 +18,15 @@ public class DatasetTableField implements Serializable {
 private String type;
+private Integer size;
+private Integer deType;
 private Boolean checked;
 private Integer columnIndex;
 private Long lastSyncTime;
-private Integer deType;
 private static final long serialVersionUID = 1L;
 }
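The entity now records each column's display size next to its DataEase type code, which the extraction service later uses to size the Doris columns. A minimal sketch of a populated field, assuming the Lombok builder this class already exposes (the sample values are illustrative only):

    // hypothetical VARCHAR(255) column; deType 0/1 map to varchar, 2 to bigint, 3 to double
    DatasetTableField field = DatasetTableField.builder()
            .tableId("table-uuid")   // assumed id
            .originName("user_name")
            .name("user_name")
            .type("VARCHAR")
            .size(255)               // new field: column display size
            .deType(0)               // moved up next to size
            .checked(true)
            .columnIndex(0)
            .build();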

View File

@@ -454,6 +454,126 @@ public class DatasetTableFieldExample {
 return (Criteria) this;
 }
+public Criteria andSizeIsNull() {
+addCriterion("`size` is null");
+return (Criteria) this;
+}
+public Criteria andSizeIsNotNull() {
+addCriterion("`size` is not null");
+return (Criteria) this;
+}
+public Criteria andSizeEqualTo(Integer value) {
+addCriterion("`size` =", value, "size");
+return (Criteria) this;
+}
+public Criteria andSizeNotEqualTo(Integer value) {
+addCriterion("`size` <>", value, "size");
+return (Criteria) this;
+}
+public Criteria andSizeGreaterThan(Integer value) {
+addCriterion("`size` >", value, "size");
+return (Criteria) this;
+}
+public Criteria andSizeGreaterThanOrEqualTo(Integer value) {
+addCriterion("`size` >=", value, "size");
+return (Criteria) this;
+}
+public Criteria andSizeLessThan(Integer value) {
+addCriterion("`size` <", value, "size");
+return (Criteria) this;
+}
+public Criteria andSizeLessThanOrEqualTo(Integer value) {
+addCriterion("`size` <=", value, "size");
+return (Criteria) this;
+}
+public Criteria andSizeIn(List<Integer> values) {
+addCriterion("`size` in", values, "size");
+return (Criteria) this;
+}
+public Criteria andSizeNotIn(List<Integer> values) {
+addCriterion("`size` not in", values, "size");
+return (Criteria) this;
+}
+public Criteria andSizeBetween(Integer value1, Integer value2) {
+addCriterion("`size` between", value1, value2, "size");
+return (Criteria) this;
+}
+public Criteria andSizeNotBetween(Integer value1, Integer value2) {
+addCriterion("`size` not between", value1, value2, "size");
+return (Criteria) this;
+}
+public Criteria andDeTypeIsNull() {
+addCriterion("de_type is null");
+return (Criteria) this;
+}
+public Criteria andDeTypeIsNotNull() {
+addCriterion("de_type is not null");
+return (Criteria) this;
+}
+public Criteria andDeTypeEqualTo(Integer value) {
+addCriterion("de_type =", value, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeNotEqualTo(Integer value) {
+addCriterion("de_type <>", value, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeGreaterThan(Integer value) {
+addCriterion("de_type >", value, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeGreaterThanOrEqualTo(Integer value) {
+addCriterion("de_type >=", value, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeLessThan(Integer value) {
+addCriterion("de_type <", value, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeLessThanOrEqualTo(Integer value) {
+addCriterion("de_type <=", value, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeIn(List<Integer> values) {
+addCriterion("de_type in", values, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeNotIn(List<Integer> values) {
+addCriterion("de_type not in", values, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeBetween(Integer value1, Integer value2) {
+addCriterion("de_type between", value1, value2, "deType");
+return (Criteria) this;
+}
+public Criteria andDeTypeNotBetween(Integer value1, Integer value2) {
+addCriterion("de_type not between", value1, value2, "deType");
+return (Criteria) this;
+}
 public Criteria andCheckedIsNull() {
 addCriterion("`checked` is null");
 return (Criteria) this;
@@ -633,66 +753,6 @@ public class DatasetTableFieldExample {
 addCriterion("last_sync_time not between", value1, value2, "lastSyncTime");
 return (Criteria) this;
 }
-public Criteria andDeTypeIsNull() {
-addCriterion("de_type is null");
-return (Criteria) this;
-}
-public Criteria andDeTypeIsNotNull() {
-addCriterion("de_type is not null");
-return (Criteria) this;
-}
-public Criteria andDeTypeEqualTo(Integer value) {
-addCriterion("de_type =", value, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeNotEqualTo(Integer value) {
-addCriterion("de_type <>", value, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeGreaterThan(Integer value) {
-addCriterion("de_type >", value, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeGreaterThanOrEqualTo(Integer value) {
-addCriterion("de_type >=", value, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeLessThan(Integer value) {
-addCriterion("de_type <", value, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeLessThanOrEqualTo(Integer value) {
-addCriterion("de_type <=", value, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeIn(List<Integer> values) {
-addCriterion("de_type in", values, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeNotIn(List<Integer> values) {
-addCriterion("de_type not in", values, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeBetween(Integer value1, Integer value2) {
-addCriterion("de_type between", value1, value2, "deType");
-return (Criteria) this;
-}
-public Criteria andDeTypeNotBetween(Integer value1, Integer value2) {
-addCriterion("de_type not between", value1, value2, "deType");
-return (Criteria) this;
-}
 }
 public static class Criteria extends GeneratedCriteria {

View File

@@ -7,10 +7,11 @@
 <result column="origin_name" jdbcType="VARCHAR" property="originName" />
 <result column="name" jdbcType="VARCHAR" property="name" />
 <result column="type" jdbcType="VARCHAR" property="type" />
+<result column="size" jdbcType="INTEGER" property="size" />
+<result column="de_type" jdbcType="INTEGER" property="deType" />
 <result column="checked" jdbcType="BIT" property="checked" />
 <result column="column_index" jdbcType="INTEGER" property="columnIndex" />
 <result column="last_sync_time" jdbcType="BIGINT" property="lastSyncTime" />
-<result column="de_type" jdbcType="INTEGER" property="deType" />
 </resultMap>
 <sql id="Example_Where_Clause">
 <where>
@@ -71,8 +72,8 @@
 </where>
 </sql>
 <sql id="Base_Column_List">
-id, table_id, origin_name, `name`, `type`, `checked`, column_index, last_sync_time,
-de_type
+id, table_id, origin_name, `name`, `type`, `size`, de_type, `checked`, column_index,
+last_sync_time
 </sql>
 <select id="selectByExample" parameterType="io.dataease.base.domain.DatasetTableFieldExample" resultMap="BaseResultMap">
 select
@@ -106,11 +107,13 @@
 </delete>
 <insert id="insert" parameterType="io.dataease.base.domain.DatasetTableField">
 insert into dataset_table_field (id, table_id, origin_name,
-`name`, `type`, `checked`, column_index,
-last_sync_time, de_type)
+`name`, `type`, `size`, de_type,
+`checked`, column_index, last_sync_time
+)
 values (#{id,jdbcType=VARCHAR}, #{tableId,jdbcType=VARCHAR}, #{originName,jdbcType=VARCHAR},
-#{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{checked,jdbcType=BIT}, #{columnIndex,jdbcType=INTEGER},
-#{lastSyncTime,jdbcType=BIGINT}, #{deType,jdbcType=INTEGER})
+#{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{size,jdbcType=INTEGER}, #{deType,jdbcType=INTEGER},
+#{checked,jdbcType=BIT}, #{columnIndex,jdbcType=INTEGER}, #{lastSyncTime,jdbcType=BIGINT}
+)
 </insert>
 <insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTableField">
 insert into dataset_table_field
@@ -130,6 +133,12 @@
 <if test="type != null">
 `type`,
 </if>
+<if test="size != null">
+`size`,
+</if>
+<if test="deType != null">
+de_type,
+</if>
 <if test="checked != null">
 `checked`,
 </if>
@@ -139,9 +148,6 @@
 <if test="lastSyncTime != null">
 last_sync_time,
 </if>
-<if test="deType != null">
-de_type,
-</if>
 </trim>
 <trim prefix="values (" suffix=")" suffixOverrides=",">
 <if test="id != null">
@@ -159,6 +165,12 @@
 <if test="type != null">
 #{type,jdbcType=VARCHAR},
 </if>
+<if test="size != null">
+#{size,jdbcType=INTEGER},
+</if>
+<if test="deType != null">
+#{deType,jdbcType=INTEGER},
+</if>
 <if test="checked != null">
 #{checked,jdbcType=BIT},
 </if>
@@ -168,9 +180,6 @@
 <if test="lastSyncTime != null">
 #{lastSyncTime,jdbcType=BIGINT},
 </if>
-<if test="deType != null">
-#{deType,jdbcType=INTEGER},
-</if>
 </trim>
 </insert>
 <select id="countByExample" parameterType="io.dataease.base.domain.DatasetTableFieldExample" resultType="java.lang.Long">
@@ -197,6 +206,12 @@
 <if test="record.type != null">
 `type` = #{record.type,jdbcType=VARCHAR},
 </if>
+<if test="record.size != null">
+`size` = #{record.size,jdbcType=INTEGER},
+</if>
+<if test="record.deType != null">
+de_type = #{record.deType,jdbcType=INTEGER},
+</if>
 <if test="record.checked != null">
 `checked` = #{record.checked,jdbcType=BIT},
 </if>
@@ -206,9 +221,6 @@
 <if test="record.lastSyncTime != null">
 last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT},
 </if>
-<if test="record.deType != null">
-de_type = #{record.deType,jdbcType=INTEGER},
-</if>
 </set>
 <if test="_parameter != null">
 <include refid="Update_By_Example_Where_Clause" />
@@ -221,10 +233,11 @@
 origin_name = #{record.originName,jdbcType=VARCHAR},
 `name` = #{record.name,jdbcType=VARCHAR},
 `type` = #{record.type,jdbcType=VARCHAR},
+`size` = #{record.size,jdbcType=INTEGER},
+de_type = #{record.deType,jdbcType=INTEGER},
 `checked` = #{record.checked,jdbcType=BIT},
 column_index = #{record.columnIndex,jdbcType=INTEGER},
-last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT},
-de_type = #{record.deType,jdbcType=INTEGER}
+last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT}
 <if test="_parameter != null">
 <include refid="Update_By_Example_Where_Clause" />
 </if>
@@ -244,6 +257,12 @@
 <if test="type != null">
 `type` = #{type,jdbcType=VARCHAR},
 </if>
+<if test="size != null">
+`size` = #{size,jdbcType=INTEGER},
+</if>
+<if test="deType != null">
+de_type = #{deType,jdbcType=INTEGER},
+</if>
 <if test="checked != null">
 `checked` = #{checked,jdbcType=BIT},
 </if>
@@ -253,9 +272,6 @@
 <if test="lastSyncTime != null">
 last_sync_time = #{lastSyncTime,jdbcType=BIGINT},
 </if>
-<if test="deType != null">
-de_type = #{deType,jdbcType=INTEGER},
-</if>
 </set>
 where id = #{id,jdbcType=VARCHAR}
 </update>
@@ -265,10 +281,11 @@
 origin_name = #{originName,jdbcType=VARCHAR},
 `name` = #{name,jdbcType=VARCHAR},
 `type` = #{type,jdbcType=VARCHAR},
+`size` = #{size,jdbcType=INTEGER},
+de_type = #{deType,jdbcType=INTEGER},
 `checked` = #{checked,jdbcType=BIT},
 column_index = #{columnIndex,jdbcType=INTEGER},
-last_sync_time = #{lastSyncTime,jdbcType=BIGINT},
-de_type = #{deType,jdbcType=INTEGER}
+last_sync_time = #{lastSyncTime,jdbcType=BIGINT}
 where id = #{id,jdbcType=VARCHAR}
 </update>
 </mapper>

View File

@@ -1,6 +1,8 @@
 package io.dataease.config;
+import com.alibaba.fastjson.JSONObject;
 import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
+import io.dataease.base.domain.Datasource;
 import io.dataease.commons.utils.CommonThreadPool;
 import org.pentaho.di.core.KettleEnvironment;
 import org.pentaho.di.repository.filerep.KettleFileRepository;
@@ -21,41 +23,26 @@ public class CommonConfig {
 private Environment env; // holds the application configuration properties
 private static String root_path = "/opt/dataease/data/kettle/";
-// @Bean
-// @ConditionalOnMissingBean
-// public org.apache.hadoop.conf.Configuration configuration() {
-// org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-// configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
-// configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
-// configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
-// return configuration;
-// }
-// @Bean
-// @ConditionalOnMissingBean
-// public SparkSession javaSparkSession() {
-// SparkSession spark = SparkSession.builder()
-// .appName(env.getProperty("spark.appName", "DataeaseJob"))
-// .master(env.getProperty("spark.master", "local[*]"))
-// .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
-//// .config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
-//// .config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
-//// .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
-//// .config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
-//// .config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
-//// .config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
-//// .config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
-//// .config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
-//// .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
-//// .config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
-//// .config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
-//// .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
-//// .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
-//// .config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
-//// .config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
-// .getOrCreate();
-// return spark;
-// }
+@Bean(name = "DorisDatasource")
+@ConditionalOnMissingBean
+public Datasource configuration() {
+JSONObject jsonObject = new JSONObject();
+jsonObject.put("dataSourceType", "jdbc");
+jsonObject.put("dataBase", env.getProperty("doris.db", "doris"));
+jsonObject.put("username", env.getProperty("doris.user", "root"));
+jsonObject.put("password", env.getProperty("doris.password", "dataease"));
+jsonObject.put("host", env.getProperty("doris.host", "doris"));
+jsonObject.put("port", env.getProperty("doris.port", "9030"));
+Datasource datasource = new Datasource();
+datasource.setId("doris");
+datasource.setName("doris");
+datasource.setDesc("doris");
+datasource.setType("mysql");
+datasource.setConfiguration(jsonObject.toJSONString());
+return datasource;
+}
 @Bean
 @ConditionalOnMissingBean
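Doris's frontend speaks the MySQL wire protocol, which is why the bean above registers Doris as a plain mysql-type Datasource pointed at the FE query port (9030 by default). A rough sketch of consuming this bean, mirroring how ExtractDataService resolves it later in this commit; the SHOW TABLES query is only illustrative, and the exact JDBC URL is assembled from the configuration DTO:

    // with the defaults above the pool would connect to roughly jdbc:mysql://doris:9030/doris
    Datasource doris = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
    JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
    DatasourceRequest request = new DatasourceRequest();
    request.setDatasource(doris);
    request.setQuery("SHOW TABLES"); // illustrative query
    List<String[]> rows = jdbcProvider.fetchResult(request);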

View File

@@ -8,6 +8,7 @@ import io.dataease.datasource.request.DatasourceRequest;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 public abstract class DatasourceProvider {
@@ -15,8 +16,6 @@ public abstract class DatasourceProvider {
 abstract public List<String[]> getData(DatasourceRequest datasourceRequest) throws Exception;
-abstract public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception;
 abstract public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception;
 public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
@@ -27,13 +26,11 @@
 getData(datasourceRequest);
 }
-abstract public Long count(DatasourceRequest datasourceRequest) throws Exception;
-abstract public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception;
-abstract public List<String[]> fetchResult(ResultSet rs) throws Exception;
-abstract public List<TableFiled> fetchResultField(ResultSet rs) throws Exception;
-abstract public void initConnectionPool(DatasourceRequest datasourceRequest) throws Exception;
+abstract public List<String[]> fetchResult(DatasourceRequest datasourceRequest) throws Exception;
+abstract public List<TableFiled> fetchResultField(DatasourceRequest datasourceRequest) throws Exception;
+abstract public Map<String, List> fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception;
+abstract public void initDataSource(DatasourceRequest datasourceRequest) throws Exception;
 }
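Provider methods now take a DatasourceRequest rather than a live ResultSet, so each call manages its own pooled connection. A sketch of the combined call, following the usage in DataSetTableService further down (the query string is hypothetical):

    DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
    DatasourceRequest datasourceRequest = new DatasourceRequest();
    datasourceRequest.setDatasource(ds);
    datasourceRequest.setQuery("SELECT * FROM demo_table"); // hypothetical query
    Map<String, List> result = datasourceProvider.fetchResultAndField(datasourceRequest);
    List<String[]> data = result.get("dataList");      // row values
    List<TableFiled> fields = result.get("fieldList"); // column metadata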

View File

@@ -1,26 +1,25 @@
 package io.dataease.datasource.provider;
 import com.google.gson.Gson;
-import io.dataease.base.domain.DatasetTableField;
+import com.mchange.v2.c3p0.ComboPooledDataSource;
 import io.dataease.datasource.constants.DatasourceTypes;
 import io.dataease.datasource.dto.MysqlConfigrationDTO;
 import io.dataease.datasource.dto.SqlServerConfigration;
 import io.dataease.datasource.dto.TableFiled;
 import io.dataease.datasource.request.DatasourceRequest;
-import org.apache.arrow.util.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
+import java.beans.PropertyVetoException;
 import java.sql.*;
-import java.text.MessageFormat;
 import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
 @Service("jdbc")
 public class JdbcProvider extends DatasourceProvider {
-private static Map<String, ArrayBlockingQueue<Connection>> jdbcConnection = new HashMap<>();
-private static int poolSize = 20;
+private static Map<String, ComboPooledDataSource> jdbcConnection = new HashMap<>();
+private static int initPoolSize = 5;
+private static int maxConnections = 200;
 @Override
 public List<String[]> getData(DatasourceRequest datasourceRequest) throws Exception {
 List<String[]> list = new LinkedList<>();
@@ -35,67 +34,46 @@ public class JdbcProvider extends DatasourceProvider {
 } catch (Exception e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } finally {
-returnSource(connection, datasourceRequest.getDatasource().getId());
+connection.close();
 }
 return list;
 }
-@VisibleForTesting
 public void exec(DatasourceRequest datasourceRequest) throws Exception {
 Connection connection = null;
 try {
 connection = getConnectionFromPool(datasourceRequest);
 Statement stat = connection.createStatement();
-stat.execute(datasourceRequest.getQuery());
-stat.close();
+Boolean result = stat.execute(datasourceRequest.getQuery());
 } catch (SQLException e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } catch (Exception e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } finally {
-returnSource(connection, datasourceRequest.getDatasource().getId());
+connection.close();
 }
 }
 @Override
-public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception {
+public List<String[]> fetchResult(DatasourceRequest datasourceRequest) throws Exception {
 ResultSet rs;
 Connection connection = null;
 try {
 connection = getConnectionFromPool(datasourceRequest);
 Statement stat = connection.createStatement();
 rs = stat.executeQuery(datasourceRequest.getQuery());
+return fetchResult(rs);
 } catch (SQLException e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } catch (Exception e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } finally {
-returnSource(connection, datasourceRequest.getDatasource().getId());
+connection.close();
 }
-return rs;
 }
-@Override
-public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception {
-List<String[]> list = new LinkedList<>();
-Connection connection = null;
-try {
-connection = getConnectionFromPool(datasourceRequest);
-Statement stat = connection.createStatement();
-ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()));
-list = fetchResult(rs);
-} catch (SQLException e) {
-throw new Exception("ERROR:" + e.getMessage(), e);
-} catch (Exception e) {
-throw new Exception("ERROR:" + e.getMessage(), e);
-} finally {
-returnSource(connection, datasourceRequest.getDatasource().getId());
-}
-return list;
-}
-@Override
-public List<String[]> fetchResult(ResultSet rs) throws Exception {
+private List<String[]> fetchResult(ResultSet rs) throws Exception {
 List<String[]> list = new LinkedList<>();
 ResultSetMetaData metaData = rs.getMetaData();
 int columnCount = metaData.getColumnCount();
@@ -104,7 +82,7 @@ public class JdbcProvider extends DatasourceProvider {
 for (int j = 0; j < columnCount; j++) {
 int columType = metaData.getColumnType(j + 1);
 switch (columType) {
-case java.sql.Types.DATE:
+case Types.DATE:
 row[j] = rs.getDate(j + 1).toString();
 break;
 default:
@@ -118,7 +96,49 @@ public class JdbcProvider extends DatasourceProvider {
 }
 @Override
-public List<TableFiled> fetchResultField(ResultSet rs) throws Exception {
+public List<TableFiled> fetchResultField(DatasourceRequest datasourceRequest) throws Exception {
+ResultSet rs;
+Connection connection = null;
+try {
+connection = getConnectionFromPool(datasourceRequest);
+Statement stat = connection.createStatement();
+rs = stat.executeQuery(datasourceRequest.getQuery());
+return fetchResultField(rs);
+} catch (SQLException e) {
+throw new Exception("ERROR:" + e.getMessage(), e);
+} catch (Exception e) {
+throw new Exception("ERROR:" + e.getMessage(), e);
+} finally {
+connection.close();
+}
+}
+@Override
+public Map<String, List> fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception {
+ResultSet rs;
+Map<String, List> result = new HashMap<>();
+Connection connection = null;
+List<String[]> dataList = new LinkedList<>();
+List<TableFiled> fieldList = new ArrayList<>();
+try {
+connection = getConnectionFromPool(datasourceRequest);
+Statement stat = connection.createStatement();
+rs = stat.executeQuery(datasourceRequest.getQuery());
+dataList = fetchResult(rs);
+fieldList = fetchResultField(rs);
+result.put("dataList", dataList);
+result.put("fieldList", fieldList);
+return result;
+} catch (SQLException e) {
+throw new Exception("ERROR:" + e.getMessage(), e);
+} catch (Exception e) {
+throw new Exception("ERROR:" + e.getMessage(), e);
+} finally {
+connection.close();
+}
+}
+private List<TableFiled> fetchResultField(ResultSet rs) throws Exception {
 List<TableFiled> fieldList = new ArrayList<>();
 ResultSetMetaData metaData = rs.getMetaData();
 int columnCount = metaData.getColumnCount();
@@ -130,6 +150,7 @@ public class JdbcProvider extends DatasourceProvider {
 field.setFieldName(l);
 field.setRemarks(l);
 field.setFieldType(t);
+field.setFieldSize(metaData.getColumnDisplaySize(j + 1));
 fieldList.add(field);
 }
 return fieldList;
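The new fieldSize comes straight from JDBC metadata: getColumnDisplaySize reports the column's maximum display width (for a VARCHAR(255) column this is typically 255), and that number later sizes the generated Doris columns. A one-line illustration of the call, assuming an open ResultSet rs and a 0-based loop index j as above:

    int displaySize = rs.getMetaData().getColumnDisplaySize(j + 1); // JDBC columns are 1-based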
@@ -142,16 +163,18 @@ public class JdbcProvider extends DatasourceProvider {
 Connection con = null;
 try {
 con = getConnectionFromPool(datasourceRequest);
-Statement ps = con.createStatement();
-ResultSet resultSet = ps.executeQuery(queryStr);
+Statement statement = con.createStatement();
+ResultSet resultSet = statement.executeQuery(queryStr);
 while (resultSet.next()) {
 tables.add(resultSet.getString(1));
 }
+resultSet.close();
+statement.close();
 return tables;
 } catch (Exception e) {
 throw new Exception("ERROR: " + e.getMessage(), e);
 } finally {
-returnSource(con, datasourceRequest.getDatasource().getId());
+con.close();
 }
 }
@@ -175,17 +198,19 @@ public class JdbcProvider extends DatasourceProvider {
 remarks = colName;
 }
 tableFiled.setRemarks(remarks);
+tableFiled.setFieldSize(Integer.valueOf(resultSet.getString("COLUMN_SIZE")));
 String dbType = resultSet.getString("TYPE_NAME");
 tableFiled.setFieldType(dbType);
 list.add(tableFiled);
 }
 }
+resultSet.close();
 } catch (SQLException e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } catch (Exception e) {
 throw new Exception("ERROR:" + e.getMessage(), e);
 } finally {
-returnSource(connection, datasourceRequest.getDatasource().getId());
+connection.close();
 }
 return list;
 }
@@ -198,6 +223,8 @@ public class JdbcProvider extends DatasourceProvider {
 con = getConnection(datasourceRequest);
 Statement ps = con.createStatement();
 ResultSet resultSet = ps.executeQuery(queryStr);
+resultSet.close();
+ps.close();
 } catch (Exception e) {
 throw new Exception("ERROR: " + e.getMessage(), e);
 } finally {
@@ -217,45 +244,43 @@ public class JdbcProvider extends DatasourceProvider {
 } catch (Exception e) {
 throw new Exception("ERROR: " + e.getMessage(), e);
 } finally {
-returnSource(con, datasourceRequest.getDatasource().getId());
+con.close();
 }
 return 0L;
 }
-private void returnSource(Connection connection, String dataSourceId) throws Exception{
-if(connection != null && !connection.isClosed()){
-ArrayBlockingQueue<Connection> connections = jdbcConnection.get(dataSourceId);
-connections.put(connection);
-}
-}
 private Connection getConnectionFromPool(DatasourceRequest datasourceRequest)throws Exception {
-ArrayBlockingQueue<Connection> connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
-if (connections == null) {
-initConnectionPool(datasourceRequest);
-}
-connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
-Connection co = connections.take();
+ComboPooledDataSource dataSource = jdbcConnection.get(datasourceRequest.getDatasource().getId());
+if (dataSource == null) {
+initDataSource(datasourceRequest);
+}
+dataSource = jdbcConnection.get(datasourceRequest.getDatasource().getId());
+Connection co = dataSource.getConnection();
 return co;
 }
 @Override
-public void initConnectionPool(DatasourceRequest datasourceRequest)throws Exception{
-ArrayBlockingQueue<Connection> connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
-if (connections == null) {
-connections = new ArrayBlockingQueue<>(poolSize);
-for (int i = 0; i < poolSize ; i++) {
-Connection connection = getConnection(datasourceRequest);
-connections.add(connection);
-}
-jdbcConnection.put(datasourceRequest.getDatasource().getId(), connections);
-}else {
-for (int i = 0; i < poolSize ; i++) {
-Connection connection = connections.take();
-connection.close();
-connection = getConnection(datasourceRequest);
-connections.add(connection);
-}
+public void initDataSource(DatasourceRequest datasourceRequest)throws Exception{
+ComboPooledDataSource dataSource = jdbcConnection.get(datasourceRequest.getDatasource().getId());
+if (dataSource == null) {
+dataSource = new ComboPooledDataSource();
+setCredential(datasourceRequest, dataSource);
+dataSource.setMaxIdleTime(30); // max idle time in seconds
+dataSource.setAcquireIncrement(5); // connections acquired per increment
+dataSource.setInitialPoolSize(initPoolSize); // initial pool size
+dataSource.setMinPoolSize(initPoolSize); // minimum pool size
+dataSource.setMaxPoolSize(maxConnections); // maximum pool size
+dataSource.setAcquireRetryAttempts(30); // retry attempts when acquiring a connection
+dataSource.setIdleConnectionTestPeriod(60); // test idle connections every 60s
+dataSource.setMaxStatements(0); // c3p0 global PreparedStatement cache size (0 disables it)
+dataSource.setBreakAfterAcquireFailure(false); // if true, an acquisition failure marks the data source broken and permanently closed; false keeps it alive and retries on the next getConnection(). Default: false
+dataSource.setTestConnectionOnCheckout(false); // validate the connection on every checkout
+dataSource.setTestConnectionOnCheckin(true); // validate the connection when it is returned to the pool
+dataSource.setCheckoutTimeout(60000); // timeout in ms when fetching a connection from the pool; 0 waits indefinitely. Default: 0
+dataSource.setPreferredTestQuery("SELECT 1");
+dataSource.setDebugUnreturnedConnectionStackTraces(true);
+dataSource.setUnreturnedConnectionTimeout(3600);
+jdbcConnection.put(datasourceRequest.getDatasource().getId(), dataSource);
 }
 }
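With c3p0, Connection.close() checks the connection back into the pool instead of tearing it down, which is what lets the hand-rolled returnSource() queue above be deleted. A sketch of the resulting calling pattern, with try-with-resources added here for illustration (the committed code closes in finally blocks):

    // dataSource is the ComboPooledDataSource registered for this datasource id
    try (Connection conn = dataSource.getConnection();
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT 1")) { // trivial test query
        while (rs.next()) {
            System.out.println(rs.getLong(1));
        }
    } // close() here returns the connection to the pool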
@@ -293,6 +318,29 @@ public class JdbcProvider extends DatasourceProvider {
 return DriverManager.getConnection(jdbcurl, props);
 }
+private void setCredential(DatasourceRequest datasourceRequest, ComboPooledDataSource dataSource) throws PropertyVetoException {
+DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType());
+switch (datasourceType) {
+case mysql:
+MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class);
+dataSource.setUser(mysqlConfigrationDTO.getUsername());
+dataSource.setDriverClass(mysqlConfigrationDTO.getDriver());
+dataSource.setPassword(mysqlConfigrationDTO.getPassword());
+dataSource.setJdbcUrl(mysqlConfigrationDTO.getJdbc());
+break;
+case sqlServer:
+SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class);
+dataSource.setUser(sqlServerConfigration.getUsername());
+dataSource.setDriverClass(sqlServerConfigration.getDriver());
+dataSource.setPassword(sqlServerConfigration.getPassword());
+dataSource.setJdbcUrl(sqlServerConfigration.getJdbc());
+break;
+default:
+break;
+}
+}
 private String getDatabase(DatasourceRequest datasourceRequest) {
 DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType());
 switch (datasourceType) {

View File

@@ -41,7 +41,6 @@ public class DatasourceService {
 datasource.setUpdateTime(currentTimeMillis);
 datasource.setCreateTime(currentTimeMillis);
 datasourceMapper.insertSelective(datasource);
-initConnectionPool(datasource);
 return datasource;
 }
@@ -71,7 +70,6 @@
 datasource.setCreateTime(null);
 datasource.setUpdateTime(System.currentTimeMillis());
 datasourceMapper.updateByPrimaryKeySelective(datasource);
-initConnectionPool(datasource);
 }
 public void validate(Datasource datasource) throws Exception {
@@ -92,31 +90,4 @@
 public Datasource get(String id) {
 return datasourceMapper.selectByPrimaryKey(id);
 }
-private void initConnectionPool(Datasource datasource){
-commonThreadPool.addTask(() ->{
-try {
-DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
-DatasourceRequest datasourceRequest = new DatasourceRequest();
-datasourceRequest.setDatasource(datasource);
-datasourceProvider.initConnectionPool(datasourceRequest);
-}catch (Exception e){}
-});
-}
-public void initAllDataSourceConnectionPool(){
-List<Datasource> datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample());
-datasources.forEach(datasource -> {
-commonThreadPool.addTask(() ->{
-try {
-DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
-DatasourceRequest datasourceRequest = new DatasourceRequest();
-datasourceRequest.setDatasource(datasource);
-datasourceProvider.initConnectionPool(datasourceRequest);
-}catch (Exception e){
-e.printStackTrace();
-}
-});
-});
-}
 }

View File

@@ -1,22 +0,0 @@
-package io.dataease.listener;
-import io.dataease.datasource.service.DatasourceService;
-import org.springframework.boot.context.event.ApplicationReadyEvent;
-import org.springframework.context.ApplicationListener;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Component;
-import javax.annotation.Resource;
-@Component
-@Order(value = 2)
-public class AppStartInitDataSourceListener implements ApplicationListener<ApplicationReadyEvent> {
-@Resource
-private DatasourceService datasourceService;
-@Override
-public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
-System.out.println("================= Init datasource connection pool =================");
-// On startup, find the scheduled-extraction tables among the datasets, read them from HBase and load them into the cache
-datasourceService.initAllDataSourceConnectionPool();
-}
-}

View File

@@ -240,9 +240,9 @@ public class DataSetTableService {
 datasourceRequest.setDatasource(ds);
 String sql = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getSql();
 datasourceRequest.setQuery(sql);
-ResultSet dataResultSet = datasourceProvider.getDataResultSet(datasourceRequest);
-List<String[]> data = datasourceProvider.fetchResult(dataResultSet);
-List<TableFiled> fields = datasourceProvider.fetchResultField(dataResultSet);
+Map<String, List> result = datasourceProvider.fetchResultAndField(datasourceRequest);
+List<String[]> data = result.get("dataList");
+List<TableFiled> fields = result.get("fieldList");
 String[] fieldArray = fields.stream().map(TableFiled::getFieldName).toArray(String[]::new);
 List<Map<String, Object>> jsonArray = new ArrayList<>();
@@ -263,67 +263,6 @@ public class DataSetTableService {
 return map;
 }
-public List<String[]> getDataSetData(String datasourceId, String table, List<DatasetTableField> fields) {
-List<String[]> data = new ArrayList<>();
-Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
-DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
-DatasourceRequest datasourceRequest = new DatasourceRequest();
-datasourceRequest.setDatasource(ds);
-String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
-datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0, 10");
-try {
-data.addAll(datasourceProvider.getData(datasourceRequest));
-} catch (Exception e) {
-}
-return data;
-}
-public Long getDataSetTotalData(String datasourceId, String table) {
-List<String[]> data = new ArrayList<>();
-Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
-DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
-DatasourceRequest datasourceRequest = new DatasourceRequest();
-datasourceRequest.setDatasource(ds);
-datasourceRequest.setQuery("select count(*) from " + table);
-try {
-return datasourceProvider.count(datasourceRequest);
-} catch (Exception e) {
-}
-return 0l;
-}
-public List<String[]> getDataSetPageData(String datasourceId, String table, List<DatasetTableField> fields, Long startPage, Long pageSize) {
-List<String[]> data = new ArrayList<>();
-Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
-DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
-DatasourceRequest datasourceRequest = new DatasourceRequest();
-datasourceRequest.setDatasource(ds);
-String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
-datasourceRequest.setPageSize(pageSize);
-datasourceRequest.setStartPage(startPage);
-datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray));
-try {
-return datasourceProvider.getData(datasourceRequest);
-} catch (Exception e) {
-}
-return data;
-}
-public List<String[]> getDataSetDataBySql(String datasourceId, String table, String sql) {
-List<String[]> data = new ArrayList<>();
-Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
-DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
-DatasourceRequest datasourceRequest = new DatasourceRequest();
-datasourceRequest.setDatasource(ds);
-datasourceRequest.setQuery(sql);
-try {
-return datasourceProvider.getData(datasourceRequest);
-} catch (Exception e) {
-}
-return data;
-}
 public void saveTableField(DatasetTable datasetTable) throws Exception {
 Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
 DataSetTableRequest dataSetTableRequest = new DataSetTableRequest();
@@ -338,8 +277,7 @@
 DatasourceRequest datasourceRequest = new DatasourceRequest();
 datasourceRequest.setDatasource(ds);
 datasourceRequest.setQuery(new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getSql());
-ResultSet dataResultSet = datasourceProvider.getDataResultSet(datasourceRequest);
-fields = datasourceProvider.fetchResultField(dataResultSet);
+fields = datasourceProvider.fetchResultField(datasourceRequest);
 } else if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) {
 DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class);
 String path = dataTableInfoDTO.getData();
@@ -367,6 +305,7 @@
 } else {
 datasetTableField.setDeType(transFieldType(ds.getType(), filed.getFieldType()));
 }
+datasetTableField.setSize(filed.getFieldSize());
 datasetTableField.setChecked(true);
 datasetTableField.setColumnIndex(i);
 datasetTableField.setLastSyncTime(syncTime);

View File

@@ -7,20 +7,20 @@ import io.dataease.commons.constants.JobStatus;
 import io.dataease.commons.constants.ScheduleType;
 import io.dataease.commons.constants.UpdateType;
 import io.dataease.commons.utils.CommonBeanFactory;
+import io.dataease.commons.utils.DorisTableUtils;
 import io.dataease.commons.utils.LogUtil;
 import io.dataease.datasource.constants.DatasourceTypes;
 import io.dataease.datasource.dto.MysqlConfigrationDTO;
+import io.dataease.datasource.provider.JdbcProvider;
+import io.dataease.datasource.request.DatasourceRequest;
 import io.dataease.dto.dataset.DataSetTaskLogDTO;
 import io.dataease.dto.dataset.DataTableInfoDTO;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.pentaho.di.cluster.SlaveServer;
 import org.pentaho.di.core.database.DatabaseMeta;
-import org.pentaho.di.core.plugins.PluginRegistry;
-import org.pentaho.di.core.plugins.StepPluginType;
+import org.pentaho.di.core.row.ValueMetaInterface;
 import org.pentaho.di.job.Job;
 import org.pentaho.di.job.JobExecutionConfiguration;
 import org.pentaho.di.job.JobHopMeta;
@@ -34,18 +34,22 @@ import org.pentaho.di.repository.filerep.KettleFileRepository;
 import org.pentaho.di.trans.TransHopMeta;
 import org.pentaho.di.trans.TransMeta;
 import org.pentaho.di.trans.step.StepMeta;
+import org.pentaho.di.trans.steps.sql.ExecSQLMeta;
 import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
-import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
-import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
+import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
+import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
+import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
 import org.pentaho.di.www.SlaveServerJobStatus;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
@Service @Service
public class ExtractDataService { public class ExtractDataService {
@@ -61,7 +65,6 @@ public class ExtractDataService {
 @Resource
 private DatasourceMapper datasourceMapper;
 private static ExecutorService pool = Executors.newScheduledThreadPool(50); // thread pool for extraction jobs
-private Connection connection;
 private static String lastUpdateTime = "${__last_update_time__}";
 private static String currentUpdateTime = "${__current_update_time__}";
@@ -79,61 +82,83 @@ public class ExtractDataService {
 private String user;
 @Value("${carte.passwd:cluster}")
 private String passwd;
-@Value("${hbase.zookeeper.quorum:zookeeper}")
-private String zkHost;
-@Value("${hbase.zookeeper.property.clientPort:2181}")
-private String zkPort;
-// @Resource
-// private SparkCalc sparkCalc;
+private static String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME" +
+"Column_Fields" +
+"PROPERTIES(\"replication_num\" = \"1\");";
+private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){
+String Column_Fields = "dataease_uuid varchar(50),";
+for (DatasetTableField datasetTableField : datasetTableFields) {
+Column_Fields = Column_Fields + datasetTableField.getOriginName() + " ";
+switch (datasetTableField.getDeType()){
+case 0:
+Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",";
+break;
+case 1:
+Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",";
+break;
+case 2:
+Column_Fields = Column_Fields + "bigint(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",";
+break;
+case 3:
+Column_Fields = Column_Fields + "DOUBLE(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",";
+break;
+default:
+Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",";
+break;
+}
+}
+Column_Fields = Column_Fields.substring(0, Column_Fields.length() -1 );
+Column_Fields = "(" + Column_Fields + ")" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n";
+return Column_Fields;
+}
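To make the string assembly concrete: for a hypothetical dataset with a name column (deType 0, size 50) and an amount column (deType 2, size 20), substituting into creatTableSql would produce roughly the statement below. The actual table name comes from DorisTableUtils.dorisName and is assumed here; every row also gets a synthetic dataease_uuid key that drives the hash distribution, and replication_num = 1 keeps a single replica.

    CREATE TABLE IF NOT EXISTS ds_xxx
    (dataease_uuid varchar(50),name varchar(50),amount bigint(20))
    DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10
    PROPERTIES("replication_num" = "1");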
+private void createDorisTable(String dorisTableName, String dorisTablColumnSql) throws Exception{
+Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
+JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
+DatasourceRequest datasourceRequest = new DatasourceRequest();
+datasourceRequest.setDatasource(dorisDatasource);
+datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql));
+jdbcProvider.exec(datasourceRequest);
+}
+private void replaceTable (String dorisTableName) throws Exception{
+Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
+JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
+DatasourceRequest datasourceRequest = new DatasourceRequest();
+datasourceRequest.setDatasource(dorisDatasource);
+datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.doristmpName(dorisTableName)));
+jdbcProvider.exec(datasourceRequest);
+}
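The full refresh below loads into a temporary table and then swaps it in atomically; with 'swap' = 'false', Doris replaces the target table with the temporary one and discards the old data rather than keeping it under the temporary name. Assuming DorisTableUtils derives a _tmp-style suffix (the exact naming lives in that utility class), the issued statement would look like:

    ALTER TABLE ds_xxx REPLACE WITH TABLE ds_xxx_tmp PROPERTIES('swap' = 'false');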
    public void extractData(String datasetTableId, String taskId, String type) {
        DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
        UpdateType updateType = UpdateType.valueOf(type);
        try {
            DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
            Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
            List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
            String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
            String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
            switch (updateType) {
                // full refresh
                case all_scope:
                    writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
                    // TODO before: check doris table column type
                    createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
                    createDorisTable(DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
                    generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null);
                    generateJobFile("all_scope", datasetTable);
                    extractData(datasetTable, "all_scope");
                    replaceTable(DorisTableUtils.dorisName(datasetTableId));
                    datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                    datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                    dataSetTableTaskLogService.save(datasetTableTaskLog);
                    break;
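                // A full refresh therefore loads into the temp table via the Kettle job and only
                // swaps it in (replaceTable above) once extraction completes.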
                case add_scope:
                    // incremental update
                    DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
                    if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
                        return;
@@ -149,15 +174,10 @@ public class ExtractDataService {
                    // incremental add
                    if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
                        String sql = datasetTableIncrementalConfig.getIncrementalAdd()
                                .replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString())
                                .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
                        generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql);
                        generateJobFile("incremental_add", datasetTable);
                        extractData(datasetTable, "incremental_add");
                    }
@@ -165,14 +185,10 @@ public class ExtractDataService {
                    if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
                        String sql = datasetTableIncrementalConfig.getIncrementalDelete()
                                .replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString())
                                .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
                        generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql);
                        generateJobFile("incremental_delete", datasetTable);
                        extractData(datasetTable, "incremental_delete");
                    }
                    datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                    datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                    dataSetTableTaskLogService.save(datasetTableTaskLog);
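                    // Both incremental paths regenerate the .ktr/.kjb files on every run, so
                    // edits to the incremental SQL take effect on the next execution.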
@@ -202,18 +218,6 @@ public class ExtractDataService {
            dataSetTableTaskLogService.save(datasetTableTaskLog);
        }
    private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
        KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
        RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree();
@@ -248,14 +252,6 @@ public class ExtractDataService {
        }
    }
    private boolean isExitFile(String fileName) {
        File file = new File(root_path + fileName);
        return file.exists();
@@ -338,33 +334,18 @@ public class ExtractDataService {
    }
    private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
        datasetTableFields.sort((o1, o2) -> {
            if (o1.getOriginName() == null) {
                return -1;
            }
            if (o2.getOriginName() == null) {
                return 1;
            }
            return o1.getOriginName().compareTo(o2.getOriginName());
        });

        TransMeta transMeta = new TransMeta();
        String dorisOutputTable = DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTable.getId()));
        DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
        DatabaseMeta dataMeta = null;
        switch (datasourceType) {
@@ -377,124 +358,126 @@ public class ExtractDataService {
                break;
        }
        Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
        MysqlConfigrationDTO dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigrationDTO.class);
        DatabaseMeta dorisDataMeta = new DatabaseMeta("doris", "MYSQL", "Native", dorisConfigration.getHost(), dorisConfigration.getDataBase(), dorisConfigration.getPort().toString(), dorisConfigration.getUsername(), dorisConfigration.getPassword());
        transMeta.addDatabase(dorisDataMeta);
        StepMeta inputStep = null;
        StepMeta outputStep = null;
        StepMeta udjcStep = null;
        TransHopMeta hi1 = null;
        TransHopMeta hi2 = null;
        String transName = null;
        switch (extractType) {
            case "all_scope":
                transName = "trans_" + datasetTable.getId();
                selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
                transMeta.setName(transName);
                inputStep = inputStep(transMeta, selectSQL);
                udjcStep = udjc(datasetTableFields);
                outputStep = outputStep(transMeta, dorisOutputTable);
                hi1 = new TransHopMeta(inputStep, udjcStep);
                hi2 = new TransHopMeta(udjcStep, outputStep);
                transMeta.addTransHop(hi1);
                transMeta.addTransHop(hi2);
                transMeta.addStep(inputStep);
                transMeta.addStep(udjcStep);
                transMeta.addStep(outputStep);
                break;
            case "incremental_add":
                transName = "trans_add_" + datasetTable.getId();
                dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
                transMeta.setName(transName);
                inputStep = inputStep(transMeta, selectSQL);
                udjcStep = udjc(datasetTableFields);
                outputStep = outputStep(transMeta, dorisOutputTable);
                hi1 = new TransHopMeta(inputStep, udjcStep);
                hi2 = new TransHopMeta(udjcStep, outputStep);
                transMeta.addTransHop(hi1);
                transMeta.addTransHop(hi2);
                transMeta.addStep(inputStep);
                transMeta.addStep(udjcStep);
                transMeta.addStep(outputStep);
                break;
            case "incremental_delete":
                dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
                transName = "trans_delete_" + datasetTable.getId();
                transMeta.setName(transName);
                inputStep = inputStep(transMeta, selectSQL);
                udjcStep = udjc(datasetTableFields);
                outputStep = execSqlStep(transMeta, dorisOutputTable, datasetTableFields);
                hi1 = new TransHopMeta(inputStep, udjcStep);
                hi2 = new TransHopMeta(udjcStep, outputStep);
                transMeta.addTransHop(hi1);
                transMeta.addTransHop(hi2);
                transMeta.addStep(inputStep);
                transMeta.addStep(udjcStep);
                transMeta.addStep(outputStep);
                break;
            default:
                break;
        }
        String transXml = transMeta.getXML();
        File file = new File(root_path + transName + ".ktr");
        FileUtils.writeStringToFile(file, transXml, "UTF-8");
    }
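    // All three variants wire the same three-step pipeline -- TableInput -> UserDefinedJavaClass
    // -> output -- and differ only in the output step (TableOutput for full and incremental
    // loads, ExecSQL for incremental deletes) and in the target table: the full refresh writes
    // to the temp table, while the incremental cases write to the serving table directly.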
    private StepMeta inputStep(TransMeta transMeta, String selectSQL) {
        TableInputMeta tableInput = new TableInputMeta();
        DatabaseMeta database = transMeta.findDatabase("db");
        tableInput.setDatabaseMeta(database);
        tableInput.setSQL(selectSQL);
        StepMeta fromStep = new StepMeta("TableInput", "Data Input", tableInput);
        fromStep.setDraw(true);
        fromStep.setLocation(100, 100);
        return fromStep;
    }
    private StepMeta outputStep(TransMeta transMeta, String dorisOutputTable) {
        TableOutputMeta tableOutputMeta = new TableOutputMeta();
        DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
        tableOutputMeta.setDatabaseMeta(dorisDatabaseMeta);
        tableOutputMeta.setTableName(dorisOutputTable);
        tableOutputMeta.setCommitSize(10000);
        tableOutputMeta.setUseBatchUpdate(true);
        StepMeta outputStep = new StepMeta("TableOutput", "TableOutput", tableOutputMeta);
        outputStep.setLocation(600, 100);
        outputStep.setDraw(true);
        return outputStep;
    }
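    // Batching (commit size 10000, batch updates on) keeps the MySQL-protocol load into Doris
    // from degenerating into row-at-a-time autocommit inserts during a full sync.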
    private StepMeta udjc(List<DatasetTableField> datasetTableFields) {
        UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
        List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
        UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("dataease_uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
        fields.add(fieldInfo);
        userDefinedJavaClassMeta.setFieldInfo(fields);
        List<UserDefinedJavaClassDef> definitions = new ArrayList<>();
        UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor",
                code.replace("Column_Fields", String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()))));
        userDefinedJavaClassDef.setActive(true);
        definitions.add(userDefinedJavaClassDef);
        userDefinedJavaClassMeta.replaceDefinitions(definitions);
        StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
        userDefinedJavaClassStep.setLocation(300, 100);
        userDefinedJavaClassStep.setDraw(true);
        return userDefinedJavaClassStep;
    }
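    // The UDJC step appends a dataease_uuid field to every row: the generated Processor class
    // (the `code` template below, with Column_Fields patched to the real column list)
    // concatenates the column values and MD5-hashes them. That field is both the table's
    // hash-distribution key and the row identity used by incremental deletes.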
    private StepMeta execSqlStep(TransMeta transMeta, String dorisOutputTable, List<DatasetTableField> datasetTableFields) {
        ExecSQLMeta execSQLMeta = new ExecSQLMeta();
        DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
        execSQLMeta.setDatabaseMeta(dorisDatabaseMeta);
        String deleteSql = "delete from DORIS_TABLE where dataease_uuid='?';".replace("DORIS_TABLE", dorisOutputTable);
        execSQLMeta.setSql(deleteSql);
        execSQLMeta.setExecutedEachInputRow(true);
        execSQLMeta.setArguments(new String[]{"dataease_uuid"});
        StepMeta execSQLStep = new StepMeta("ExecSQL", "ExecSQL", execSQLMeta);
        execSQLStep.setLocation(600, 100);
        execSQLStep.setDraw(true);
        return execSQLStep;
    }
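    // With setExecutedEachInputRow(true), Kettle's Execute SQL step substitutes each row's
    // dataease_uuid argument into the ? placeholder as plain text (which is presumably why the
    // template quotes the ?), issuing one DELETE per extracted row against Doris.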
    private static String code = "import org.pentaho.di.core.row.ValueMetaInterface;\n" +
@@ -506,29 +489,11 @@ public class ExtractDataService {
            "import java.util.List;\n" +
            "import java.util.concurrent.ExecutorService;\n" +
            "import java.util.concurrent.Executors;\n" +
            "\n" +
            "public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {\n" +
            "    if (first) {\n" +
            "        first = false;\n" +
            "    }\n" +
            "\n" +
            "    Object[] r = getRow();\n" +
@@ -538,19 +503,17 @@ public class ExtractDataService {
            "        return false;\n" +
            "    }\n" +
            "\n" +
            "    r = createOutputRow(r, data.outputRowMeta.size());\n" +
            "    String str = \"\";\n" +
            "\n" +
            "    List<String> fields = Arrays.asList(\"Column_Fields\".split(\",\"));\n" +
            "    for (String field : fields) {\n" +
            "        String tmp = get(Fields.In, field).getString(r);\n" +
            "        str = str + tmp;\n" +
            "    }\n" +
            "\n" +
            "    String md5 = md5(str);\n" +
            "    get(Fields.Out, \"dataease_uuid\").setValue(r, md5);\n" +
            "\n" +
            "    putRow(data.outputRowMeta, r);\n" +
            "\n" +
@@ -590,6 +553,6 @@ public class ExtractDataService {
            "        str = str + d[i];\n" +
            "    }\n" +
            "    return str;\n" +
            "    }";
}
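// The Processor source above is compiled by the UserDefinedJavaClass step at runtime; the
// literal "Column_Fields" is a placeholder that udjc() replaces with the comma-separated
// column list, so the split(",") yields the actual field names for each dataset.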

View File

@@ -1,53 +0,0 @@
package io.dataease.service.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author gin
 * @Date 2021/4/13 12:32 PM
 */
public class CacheUtil {
    private static CacheUtil cacheUtil;
    private static Map<String, Dataset<Row>> cacheMap;

    private CacheUtil(){
        cacheMap = new HashMap<String, Dataset<Row>>();
    }

    public static CacheUtil getInstance(){
        if (cacheUtil == null){
            cacheUtil = new CacheUtil();
        }
        return cacheUtil;
    }

    /**
     * add to the cache
     * @param key
     * @param obj
     */
    public void addCacheData(String key, Dataset<Row> obj){
        cacheMap.put(key, obj);
    }

    /**
     * fetch from the cache
     * @param key
     * @return
     */
    public Dataset<Row> getCacheData(String key){
        return cacheMap.get(key);
    }

    /**
     * evict from the cache
     * @param key
     */
    public void removeCacheData(String key){
        cacheMap.remove(key);
    }
}

View File

@@ -1,407 +0,0 @@
//package io.dataease.service.spark;
//
//import io.dataease.base.domain.DatasetTableField;
//import io.dataease.commons.utils.CommonBeanFactory;
//import io.dataease.controller.request.chart.ChartExtFilterRequest;
//import io.dataease.dto.chart.ChartViewFieldDTO;
//import org.antlr.analysis.MachineProbe;
//import org.apache.commons.collections4.CollectionUtils;
//import org.apache.commons.lang3.ObjectUtils;
//import org.apache.commons.lang3.StringUtils;
//import org.apache.hadoop.hbase.client.Result;
//import org.apache.hadoop.hbase.client.Scan;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
//import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
//import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
//import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.spark.api.java.JavaPairRDD;
//import org.apache.spark.api.java.JavaRDD;
//import org.apache.spark.api.java.JavaSparkContext;
//import org.apache.spark.api.java.function.FlatMapFunction;
//import org.apache.spark.api.java.function.Function;
//import org.apache.spark.sql.*;
//import org.apache.spark.sql.types.DataTypes;
//import org.apache.spark.sql.types.StructField;
//import org.apache.spark.sql.types.StructType;
//import org.apache.spark.storage.StorageLevel;
//import org.springframework.core.env.Environment;
//import org.springframework.stereotype.Service;
//import scala.Tuple2;
//
//import javax.annotation.Resource;
//import java.math.BigDecimal;
//import java.text.MessageFormat;
//import java.util.*;
//
///**
// * @Author gin
// * @Date 2021/3/26 3:49 PM
// */
//@Service
//public class SparkCalc {
// private static String column_family = "dataease";
// private static String data_path = "/opt/dataease/data/db/";
// @Resource
// private Environment env; // holds properties loaded from the config files
//
// public List<String[]> getData(String hTable, List<DatasetTableField> fields, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable, List<ChartExtFilterRequest> requestList) throws Exception {
// // Spark Context
// SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
// JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
//
// // Spark SQL Context
// SQLContext sqlContext = new SQLContext(sparkContext);
// sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
// sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
//
// /*Map<String, BigDecimal> dataFrame = getData(sparkContext, sqlContext, hTable, fields);
// List<String[]> data = new ArrayList<>();
// Iterator<Map.Entry<String, BigDecimal>> iterator = dataFrame.entrySet().iterator();
// while (iterator.hasNext()) {
// String[] r = new String[2];
// Map.Entry<String, BigDecimal> next = iterator.next();
// String key = next.getKey();
// BigDecimal value = next.getValue();
// r[0] = key;
// r[1] = value.toString();
// data.add(r);
// }*/
//
//// Dataset<Row> dataFrame = getData(sparkContext, sqlContext, hTable, fields);
// Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
// if (ObjectUtils.isEmpty(dataFrame)) {
// dataFrame = getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
// }
//
// dataFrame.createOrReplaceTempView(tmpTable);
// Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable, requestList));
// // transform
// List<String[]> data = new ArrayList<>();
// List<Row> list = sql.collectAsList();
// for (Row row : list) {
// String[] r = new String[row.length()];
// for (int i = 0; i < row.length(); i++) {
// r[i] = row.get(i) == null ? "null" : row.get(i).toString();
// }
// data.add(r);
// }
// return data;
// }
//
// public Dataset<Row> getHBaseDataAndCache(String hTable, List<DatasetTableField> fields) throws Exception {
// // Spark Context
// SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
// JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
//
// // Spark SQL Context
// SQLContext sqlContext = new SQLContext(sparkContext);
// sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
// sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
// return getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
// }
//
// public Map<String, BigDecimal> getData(JavaSparkContext sparkContext, SQLContext sqlContext, String tableId, List<DatasetTableField> fields) throws Exception {
// fields.sort((o1, o2) -> {
// if (o1.getOriginName() == null) {
// return -1;
// }
// if (o2.getOriginName() == null) {
// return 1;
// }
// return o1.getOriginName().compareTo(o2.getOriginName());
// });
//
// JavaRDD<String> pairRDD = sparkContext.textFile(data_path + tableId + ".txt");
//// System.out.println(pairRDD.count());
//
//// JavaRDD<Map.Entry<String, BigDecimal>> rdd = pairRDD.map((Function<String, Map.Entry<String, BigDecimal>>) v1 -> {
//// Map<String, BigDecimal> map = new HashMap<>();
//// String[] items = v1.split(";");
//// String day = null;
//// BigDecimal res = new BigDecimal(0);
//// for (int i = 0; i < items.length; i++) {
//// String l = items[i];
//// DatasetTableField x = fields.get(i);
//// if (x.getOriginName().equalsIgnoreCase("sync_day")) {
//// day = l;
//// }
//// if (x.getOriginName().equalsIgnoreCase("usage_cost")) {
//// res = new BigDecimal(l);
//// }
//// }
//// BigDecimal bigDecimal = map.get(day);
//// if (bigDecimal == null) {
//// map.put(day, res);
//// } else {
//// map.put(day, bigDecimal.add(res));
//// }
//// return map.entrySet().iterator().next();
//// });
//
// JavaRDD<Map.Entry<String, BigDecimal>> rdd = pairRDD.mapPartitions((FlatMapFunction<java.util.Iterator<String>, Map.Entry<String, BigDecimal>>) tuple2Iterator -> {
// Map<String, BigDecimal> map = new HashMap<>();
// while (tuple2Iterator.hasNext()) {
// String[] items = tuple2Iterator.next().split(";");
// String day = null;
// BigDecimal res = new BigDecimal(0);
// for (int i = 0; i < items.length; i++) {
// String l = items[i];
// DatasetTableField x = fields.get(i);
// if (x.getOriginName().equalsIgnoreCase("sync_day")) {
// day = l;
// }
// if (x.getOriginName().equalsIgnoreCase("usage_cost")) {
// res = new BigDecimal(l);
// }
// }
// BigDecimal bigDecimal = map.get(day);
// if (bigDecimal == null) {
// map.put(day, res);
// } else {
// map.put(day, bigDecimal.add(res));
// }
// }
// return map.entrySet().iterator();
// });
//
//
//// System.out.println(rdd.count());
//
// Map<String, BigDecimal> map = new HashMap<>();
// List<Map.Entry<String, BigDecimal>> collect = rdd.collect();
//// System.out.println(collect.size());
//
// collect.forEach(stringBigDecimalEntry -> {
// String key = stringBigDecimalEntry.getKey();
// BigDecimal value = stringBigDecimalEntry.getValue();
//
// BigDecimal bigDecimal = map.get(key);
// if (bigDecimal == null) {
// map.put(key, value);
// } else {
// map.put(key, bigDecimal.add(value));
// }
// });
//
// return map;
// }
//
//// public Dataset<Row> getData(JavaSparkContext sparkContext, SQLContext sqlContext, String tableId, List<DatasetTableField> fields) throws Exception {
//// fields.sort((o1, o2) -> {
//// if (o1.getOriginName() == null) {
//// return -1;
//// }
//// if (o2.getOriginName() == null) {
//// return 1;
//// }
//// return o1.getOriginName().compareTo(o2.getOriginName());
//// });
////
//// JavaRDD<String> pairRDD = sparkContext.textFile(data_path + tableId + ".txt");
////
//// JavaRDD<Row> rdd = pairRDD.mapPartitions((FlatMapFunction<java.util.Iterator<String>, Row>) tuple2Iterator -> {
//// List<Row> iterator = new ArrayList<>();
//// while (tuple2Iterator.hasNext()) {
//// String[] items = tuple2Iterator.next().split(";");
//// List<Object> list = new ArrayList<>();
//// for (int i = 0; i < items.length; i++) {
//// String l = items[i];
//// DatasetTableField x = fields.get(i);
//// if (x.getDeType() == 0 || x.getDeType() == 1) {
//// list.add(l);
//// } else if (x.getDeType() == 2) {
//// if (StringUtils.isEmpty(l)) {
//// l = "0";
//// }
//// if (StringUtils.equalsIgnoreCase(l, "Y")) {
//// l = "1";
//// }
//// if (StringUtils.equalsIgnoreCase(l, "N")) {
//// l = "0";
//// }
//// list.add(Long.valueOf(l));
//// } else if (x.getDeType() == 3) {
//// if (StringUtils.isEmpty(l)) {
//// l = "0.0";
//// }
//// list.add(Double.valueOf(l));
//// }
//// }
//// iterator.add(RowFactory.create(list.toArray()));
//// }
//// return iterator.iterator();
//// });
////
//// List<StructField> structFields = new ArrayList<>();
//// // struct order must match the rdd order
//// fields.forEach(x -> {
//// if (x.getDeType() == 0 || x.getDeType() == 1) {
//// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
//// } else if (x.getDeType() == 2) {
//// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true));
//// } else if (x.getDeType() == 3) {
//// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true));
//// }
//// });
//// StructType structType = DataTypes.createStructType(structFields);
////
//// Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
//// return dataFrame;
//// }
//
// public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
// Scan scan = new Scan();
// scan.addFamily(Bytes.toBytes(column_family));
// for (DatasetTableField field : fields) {
// scan.addColumn(Bytes.toBytes(column_family), Bytes.toBytes(field.getOriginName()));
// }
// ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
// String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
//
// // HBase config
// org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
// conf.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
// conf.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
// conf.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
// conf.set(TableInputFormat.INPUT_TABLE, hTable);
// conf.set(TableInputFormat.SCAN, scanToString);
//
// JavaPairRDD<ImmutableBytesWritable, Result> pairRDD = sparkContext.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
//
// JavaRDD<Row> rdd = pairRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Row>) tuple2Iterator -> {
// List<Row> iterator = new ArrayList<>();
// while (tuple2Iterator.hasNext()) {
// Result result = tuple2Iterator.next()._2;
// List<Object> list = new ArrayList<>();
// fields.forEach(x -> {
// String l = Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()));
// if (x.getDeType() == 0 || x.getDeType() == 1) {
// list.add(l);
// } else if (x.getDeType() == 2) {
// if (StringUtils.isEmpty(l)) {
// l = "0";
// }
// list.add(Long.valueOf(l));
// } else if (x.getDeType() == 3) {
// if (StringUtils.isEmpty(l)) {
// l = "0.0";
// }
// list.add(Double.valueOf(l));
// }
// });
// iterator.add(RowFactory.create(list.toArray()));
// }
// return iterator.iterator();
// });
//
// List<StructField> structFields = new ArrayList<>();
// // struct order must match the rdd order
// fields.forEach(x -> {
// if (x.getDeType() == 0 || x.getDeType() == 1) {
// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
// } else if (x.getDeType() == 2) {
// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true));
// } else if (x.getDeType() == 3) {
// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true));
// }
// });
// StructType structType = DataTypes.createStructType(structFields);
//
// Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist(StorageLevel.MEMORY_AND_DISK_SER());
// CacheUtil.getInstance().addCacheData(hTable, dataFrame);
// dataFrame.count();
// return dataFrame;
// }
//
// public String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table, List<ChartExtFilterRequest> extFilterRequestList) {
// // field aggregation, sorting, etc.
// String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
// String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
// String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none"))
// .map(y -> "_" + y.getSummary() + "_" + y.getOriginName() + " " + y.getSort()).toArray(String[]::new);
//
// String sql = MessageFormat.format("SELECT {0},{1} FROM {2} WHERE 1=1 {3} GROUP BY {4} ORDER BY null,{5}",
// StringUtils.join(group, ","),
// StringUtils.join(field, ","),
// table,
// transExtFilter(extFilterRequestList),// origin field filter and panel field filter,
// StringUtils.join(group, ","),
// StringUtils.join(order, ","));
// if (sql.endsWith(",")) {
// sql = sql.substring(0, sql.length() - 1);
// }
// // if filtering on result fields, wrap the sql in one more layer
// String[] resultFilter = yAxis.stream().filter(y -> CollectionUtils.isNotEmpty(y.getFilter()) && y.getFilter().size() > 0)
// .map(y -> {
// String[] s = y.getFilter().stream().map(f -> "AND _" + y.getSummary() + "_" + y.getOriginName() + transFilterTerm(f.getTerm()) + f.getValue()).toArray(String[]::new);
// return StringUtils.join(s, " ");
// }).toArray(String[]::new);
// if (resultFilter.length == 0) {
// return sql;
// } else {
// String filterSql = MessageFormat.format("SELECT * FROM {0} WHERE 1=1 {1}",
// "(" + sql + ") AS tmp",
// StringUtils.join(resultFilter, " "));
// return filterSql;
// }
// }
//
// public String transFilterTerm(String term) {
// switch (term) {
// case "eq":
// return " = ";
// case "not_eq":
// return " <> ";
// case "lt":
// return " < ";
// case "le":
// return " <= ";
// case "gt":
// return " > ";
// case "ge":
// return " >= ";
// case "in":
// return " IN ";
// case "not in":
// return " NOT IN ";
// case "like":
// return " LIKE ";
// case "not like":
// return " NOT LIKE ";
// case "null":
// return " IS NULL ";
// case "not_null":
// return " IS NOT NULL ";
// default:
// return "";
// }
// }
//
// public String transExtFilter(List<ChartExtFilterRequest> requestList) {
// if (CollectionUtils.isEmpty(requestList)) {
// return "";
// }
// StringBuilder filter = new StringBuilder();
// for (ChartExtFilterRequest request : requestList) {
// List<String> value = request.getValue();
// if (CollectionUtils.isEmpty(value)) {
// continue;
// }
// DatasetTableField field = request.getDatasetTableField();
// filter.append(" AND ")
// .append(field.getOriginName())
// .append(" ")
// .append(transFilterTerm(request.getOperator()))
// .append(" ");
// if (StringUtils.containsIgnoreCase(request.getOperator(), "in")) {
// filter.append("('").append(StringUtils.join(value, "','")).append("')");
// } else if (StringUtils.containsIgnoreCase(request.getOperator(), "like")) {
// filter.append("'%").append(value.get(0)).append("%'");
// } else {
// filter.append("'").append(value.get(0)).append("'");
// }
// }
// return filter.toString();
// }
//}

View File

@@ -67,7 +67,7 @@
<!-- <table tableName="datasource"/>--> <!-- <table tableName="datasource"/>-->
<!-- <table tableName="sys_dict"/>--> <!-- <table tableName="sys_dict"/>-->
<!-- <table tableName="sys_dict_item"/>--> <!-- <table tableName="sys_dict_item"/>-->
<table tableName="panel_template"/> <table tableName="dataset_table_field"/>
<!-- <table tableName="panel_design"/>--> <!-- <table tableName="panel_design"/>-->