package io.dataease.provider.datasource; import com.alibaba.fastjson.JSONObject; import com.google.gson.Gson; import io.dataease.commons.utils.HttpClientConfig; import io.dataease.commons.utils.HttpClientUtil; import io.dataease.controller.request.datasource.es.EsReponse; import io.dataease.controller.request.datasource.es.Request; import io.dataease.controller.request.datasource.es.RequestWithCursor; import io.dataease.controller.request.datasource.DatasourceRequest; import io.dataease.dto.datasource.EsConfiguration; import io.dataease.dto.datasource.TableDesc; import io.dataease.dto.datasource.TableField; import io.dataease.exception.DataEaseException; import io.dataease.i18n.Translator; import io.dataease.provider.query.es.EsQueryProvider; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpHeaders; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; @Service("es") public class EsProvider extends DatasourceProvider { /** * 增加缓存机制 key 由 'provider_sql_' dsr.datasource.id dsr.table dsr.query共4部分组成,命中则使用缓存直接返回不再执行sql逻辑 * @param dsr * @return * @throws Exception */ /** * 这里使用声明式缓存不是很妥当 * 改为chartViewService中使用编程式缓存 * * @Cacheable( value = JdbcConstants.JDBC_PROVIDER_KEY, * key = "'provider_sql_' + #dsr.datasource.id + '_' + #dsr.table + '_' + #dsr.query", * condition = "#dsr.pageSize == null || #dsr.pageSize == 0L" * ) */ @Override public List getData(DatasourceRequest dsr) throws Exception { List list = new LinkedList<>(); try { EsConfiguration esConfiguration = new Gson().fromJson(dsr.getDatasource().getConfiguration(), EsConfiguration.class); HttpClientConfig httpClientConfig = new HttpClientConfig(); if (StringUtils.isNotEmpty(esConfiguration.getEsUsername())) { String auth = esConfiguration.getEsUsername() + ":" + esConfiguration.getEsPassword(); byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); httpClientConfig.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + new String(encodedAuth)); } Request request = new Request(); request.setQuery(dsr.getQuery()); request.setFetch_size(dsr.getFetchSize()); String url = esConfiguration.getUrl().endsWith("/") ? esConfiguration.getUrl() + esConfiguration.getUri() + "?format=json" : esConfiguration.getUrl() + "/" + esConfiguration.getUri() + "?format=json"; String response = HttpClientUtil.post(url, new Gson().toJson(request), httpClientConfig); EsReponse esReponse = new Gson().fromJson(response, EsReponse.class); list.addAll(fetchResult(esReponse)); if (dsr.isPageable()) { Integer realSize = dsr.getPage() * dsr.getPageSize() < list.size() ? dsr.getPage() * dsr.getPageSize() : list.size(); list = list.subList((dsr.getPage() - 1) * dsr.getPageSize(), realSize); } if (!dsr.isPreviewData()) { while (StringUtils.isNotEmpty(esReponse.getCursor())) { RequestWithCursor requstWithCursor = new RequestWithCursor(); requstWithCursor.setQuery(dsr.getQuery()); requstWithCursor.setFetch_size(dsr.getFetchSize()); requstWithCursor.setCursor(esReponse.getCursor()); response = HttpClientUtil.post(url, new Gson().toJson(requstWithCursor), httpClientConfig); esReponse = new Gson().fromJson(response, EsReponse.class); list.addAll(fetchResult(esReponse)); } } } catch (Exception e) { e.printStackTrace(); DataEaseException.throwException(e); } return list; } @Override public List fetchResult(DatasourceRequest datasourceRequest) throws Exception { List list = new LinkedList<>(); try { String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "?format=json"); list = fetchResult(response); } catch (Exception e) { DataEaseException.throwException(e); } return list; } @Override public List getTableFileds(DatasourceRequest datasourceRequest) throws Exception { datasourceRequest.setQuery("desc " + datasourceRequest.getTable()); List tableFields = new ArrayList<>(); try { String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "?format=json"); tableFields = fetchResultField4Table(response); } catch (Exception e) { DataEaseException.throwException(e); } return tableFields; } private List fetchResult(String response) throws Exception { EsReponse esReponse = new Gson().fromJson(response, EsReponse.class); return fetchResult(esReponse); } private List fetchResult(EsReponse esReponse) throws Exception { List list = new LinkedList<>(); if (esReponse.getError() != null) { throw new Exception(esReponse.getError().getReason()); } list.addAll(esReponse.getRows()); return list; } @Override public List fetchResultField(DatasourceRequest datasourceRequest) throws Exception { List tableFields = new ArrayList<>(); try { String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "?format=json"); tableFields = fetchResultField4Sql(response); } catch (Exception e) { DataEaseException.throwException(e); } return tableFields; } private List fetchResultField(String response) throws Exception { List fieldList = new ArrayList<>(); EsReponse esReponse = new Gson().fromJson(response, EsReponse.class); if (esReponse.getError() != null) { throw new Exception(esReponse.getError().getReason()); } for (String[] row : esReponse.getRows()) { TableField field = new TableField(); field.setFieldName(row[0]); field.setRemarks(row[0]); field.setFieldType(row[2]); field.setFieldSize(EsQueryProvider.transFieldTypeSize(row[2])); fieldList.add(field); } return fieldList; } private List fetchResultField4Sql(String response) throws Exception { List fieldList = new ArrayList<>(); EsReponse esReponse = new Gson().fromJson(response, EsReponse.class); if (esReponse.getError() != null) { throw new Exception(esReponse.getError().getReason()); } for (EsReponse.Column column : esReponse.getColumns()) { TableField field = new TableField(); field.setFieldName(column.getName()); field.setRemarks(column.getName()); field.setFieldType(column.getType()); field.setFieldSize(EsQueryProvider.transFieldTypeSize(column.getType())); fieldList.add(field); } return fieldList; } private List fetchResultField4Table(String response) throws Exception { List fieldList = new ArrayList<>(); EsReponse esReponse = new Gson().fromJson(response, EsReponse.class); if (esReponse.getError() != null) { throw new Exception(esReponse.getError().getReason()); } for (String[] row : esReponse.getRows()) { if(!row[1].equalsIgnoreCase("STRUCT")){ TableField field = new TableField(); field.setFieldName(row[0]); field.setRemarks(row[0]); field.setFieldType(row[2]); field.setFieldSize(EsQueryProvider.transFieldTypeSize(row[2])); fieldList.add(field); } } return fieldList; } @Override public Map fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception { Map result = new HashMap<>(); try { String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "?format=json"); result.put("dataList", fetchResult(response)); result.put("fieldList", fetchResultField4Sql(response)); } catch (Exception e) { DataEaseException.throwException(e); } return result; } @Override public void handleDatasource(DatasourceRequest datasourceRequest, String type) throws Exception { } @Override public List getTables(DatasourceRequest datasourceRequest) throws Exception { List tables = new ArrayList<>(); try { String response = exexQuery(datasourceRequest, "show tables", "?format=json"); tables = fetchTables(response); tables = tables.stream().filter(table -> !table.getName().startsWith(".")).collect(Collectors.toList()); } catch (Exception e) { DataEaseException.throwException(e); } return tables; } private List fetchTables(String response) throws Exception { List tables = new ArrayList<>(); EsReponse esReponse = new Gson().fromJson(response, EsReponse.class); if (esReponse.getError() != null) { throw new Exception(esReponse.getError().getReason()); } for (String[] row : esReponse.getRows()) { if (row.length == 3 && row[1].contains("TABLE") && row[2].equalsIgnoreCase("INDEX")) { TableDesc tableDesc = new TableDesc(); tableDesc.setName(row[0]); tables.add(tableDesc); } if (row.length == 2 && row[1].contains("TABLE")) { TableDesc tableDesc = new TableDesc(); tableDesc.setName(row[0]); tables.add(tableDesc); } } return tables; } @Override public List getSchema(DatasourceRequest datasourceRequest) throws Exception { return new ArrayList<>(); } @Override public String checkStatus(DatasourceRequest datasourceRequest) throws Exception { EsConfiguration esConfiguration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), EsConfiguration.class); String response = exexGetQuery(datasourceRequest); if (JSONObject.parseObject(response).getJSONObject("error") != null) { throw new Exception(JSONObject.parseObject(response).getJSONObject("error").getString("reason")); } String version = JSONObject.parseObject(response).getJSONObject("version").getString("number"); String[] versionList = version.split("\\."); if (Integer.valueOf(versionList[0]) < 7 && Integer.valueOf(versionList[1]) < 3) { throw new Exception(Translator.get("i18n_es_limit")); } if (Integer.valueOf(versionList[0]) == 6) { esConfiguration.setUri("_xpack/sql"); } if (Integer.valueOf(versionList[0]) == 7) { esConfiguration.setUri("_sql"); } datasourceRequest.getDatasource().setConfiguration(new Gson().toJson(esConfiguration)); getTables(datasourceRequest); return "Success"; } private String exexQuery(DatasourceRequest datasourceRequest, String sql, String uri) { EsConfiguration esConfiguration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), EsConfiguration.class); uri = esConfiguration.getUri() + uri; HttpClientConfig httpClientConfig = new HttpClientConfig(); if (StringUtils.isNotEmpty(esConfiguration.getEsUsername()) && StringUtils.isNotEmpty(esConfiguration.getEsPassword())) { String auth = esConfiguration.getEsUsername() + ":" + esConfiguration.getEsPassword(); byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); httpClientConfig.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + new String(encodedAuth)); } Request request = new Request(); request.setQuery(sql); request.setFetch_size(datasourceRequest.getFetchSize()); String url = esConfiguration.getUrl().endsWith("/") ? esConfiguration.getUrl() + uri : esConfiguration.getUrl() + "/" + uri; String response = HttpClientUtil.post(url, new Gson().toJson(request), httpClientConfig); return response; } private String exexGetQuery(DatasourceRequest datasourceRequest) { EsConfiguration esConfiguration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), EsConfiguration.class); HttpClientConfig httpClientConfig = new HttpClientConfig(); if (StringUtils.isNotEmpty(esConfiguration.getEsUsername()) && StringUtils.isNotEmpty(esConfiguration.getEsPassword())) { String auth = esConfiguration.getEsUsername() + ":" + esConfiguration.getEsPassword(); byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); httpClientConfig.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + new String(encodedAuth)); } String response = HttpClientUtil.get(esConfiguration.getUrl(), httpClientConfig); return response; } }