提交 b4c406aa authored 作者: songchuancai's avatar songchuancai

优化代码

上级 535adc44
......@@ -10,6 +10,7 @@ import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
/**
* @author : scc
......@@ -44,9 +45,9 @@ public class ColumnDataRangeFilter {
.append(".")
.append(field.getColumnName())
.append(" as ")
.append(this.getTableName())
.append(this.getTableName().toUpperCase(Locale.ROOT))
.append("_")
.append(field.getColumnName())
.append(field.getColumnName().toUpperCase(Locale.ROOT))
.append(",");
}
......@@ -83,9 +84,9 @@ public class ColumnDataRangeFilter {
head = new ArrayList<>();
headBuilder = new StringBuilder();
headBuilder
.append(this.getTableName())
.append(this.getTableName().toUpperCase(Locale.ROOT))
.append("_")
.append(field.getColumnName());
.append(field.getColumnName().toUpperCase(Locale.ROOT));
head.add(headBuilder.toString());
heads.add(head);
}
......
......@@ -29,11 +29,11 @@ public class QueryDataTask<T> implements Callable<T> {
public T call() throws Exception {
if (StringUtils.isEmpty(querySql)) {
return (T) JdbcUtil.executeCountSql(dataApiDataSource, countSql, jdbcParamValues);
Result<Long> result = JdbcUtil.executeCountSql(dataApiDataSource, countSql, jdbcParamValues);
return (T) result.getData();
} else {
List<List<Object>> data = new ArrayList<>();
Result<List<JSONObject>> dataResult = JdbcUtil.executeSql(dataApiDataSource, querySql, jdbcParamValues, queryDataTotalPerThread);
Result<List<List<Object>>> result = new Result<>();
List<JSONObject> dataJsonList = dataResult.getData();
for (JSONObject dataJson : dataJsonList) {
List<Object> lineData = new ArrayList<>();
......@@ -44,10 +44,7 @@ public class QueryDataTask<T> implements Callable<T> {
}
data.add(lineData);
}
result.setMsg(dataResult.getMsg());
result.setData(data);
result.setMsg(dataResult.getMsg());
return (T) result;
return (T) data;
}
}
}
......
......@@ -16,8 +16,8 @@ public class BaseExceptionController {
@ExceptionHandler({Exception.class})
@ResponseBody
public Result<String> exceptionHandler(HttpServletRequest request, Exception e) {
log.error("发生了异常:", e);
String msg = e.getMessage();
log.error("系统异常:", e);
String msg = "系统异常: " + e.getMessage();
return new Result(String.valueOf(HttpStatus.INTERNAL_SERVER_ERROR.value()),msg,null);
}
......@@ -25,8 +25,8 @@ public class BaseExceptionController {
@ExceptionHandler({IllegalArgumentException.class})
@ResponseBody
public Result<String> exceptionHandler(HttpServletRequest request, IllegalArgumentException e) {
log.error("参数不合法:", e);
String msg = e.getMessage();
log.error("系统异常,参数不合法:", e);
String msg = "系统异常,参数不合法:" + e.getMessage();
return new Result(String.valueOf(HttpStatus.BAD_REQUEST.value()),msg,null);
}
......
......@@ -67,6 +67,10 @@ public class DataApiModelConfig extends BaseEntity{
@Column(name = "sql", columnDefinition = "varchar2(1000)", nullable = true)
private String fullSql;
// SQL语句,记录生成的countSql
@Column(name = "count_sql", columnDefinition = "varchar2(1000)", nullable = true)
private String countSql;
// 根据订阅系统的配置生成sql行列筛选条件
// 生成的时候需要进行校验
......@@ -106,28 +110,29 @@ public class DataApiModelConfig extends BaseEntity{
this.setFromSql(fromSql);
this.setSelectSql(selectSql);
this.setWhereSql(whereSql);
this.setCountSql(generateDataCountSql());
return fullSql;
}
// 生成数据量大小查询sql
public String generateDataCountSql() {
// 已经通过校验行列筛选条件
StringBuilder fullSqlBuilder = new StringBuilder();
StringBuilder countSqlBuilder = new StringBuilder();
// 拼接select条件
fullSqlBuilder.append(" select count(1) ");
countSqlBuilder.append(" select count(1) ");
// 拼接from条件
String fromSql = this.getFromSql();
fullSqlBuilder.append(fromSql);
countSqlBuilder.append(fromSql);
// 拼接where条件
String whereSql = this.getWhereSql();
fullSqlBuilder.append(" where ").append(whereSql);
countSqlBuilder.append(" where ").append(whereSql);
// 生成完成的sql
String fullSql = fullSqlBuilder.toString();
return fullSql;
String countSql = countSqlBuilder.toString();
return countSql;
}
/**
......
package com.hisense.dataservice.exceptions;
/**
* @author : scc
* @date : 2023/03/21
**/
public class DataQueryException extends Exception{
public DataQueryException(String msg){
super("数据查询异常:" + msg);
}
public DataQueryException(String msg, Throwable cause){
super("数据查询异常:" + msg, cause);
}
}
......@@ -12,15 +12,15 @@ import com.hisense.dataservice.entity.DataApiModel;
import com.hisense.dataservice.entity.DataApiModelConfig;
import com.hisense.dataservice.entity.DataApiSubscribeConfig;
import com.hisense.dataservice.enums.*;
import com.hisense.dataservice.exceptions.DataQueryException;
import com.hisense.dataservice.library.model.Result;
import com.hisense.dataservice.repository.*;
import com.hisense.dataservice.service.DataApiCommonService;
import com.hisense.dataservice.util.FileUtil;
import com.hisense.dataservice.util.JdbcUtil;
import com.hisense.dataservice.util.ThreadPoolManagerUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
......@@ -28,7 +28,7 @@ import org.springframework.util.StringUtils;
import javax.servlet.http.HttpServletRequest;
import javax.transaction.Transactional;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
......@@ -78,152 +78,304 @@ public class DataApiCommonServiceImpl implements DataApiCommonService {
}
@Override
public Object queryData(String environment, String pattern, String apiId, HttpServletRequest request) {
// TODO 区分数据库连接异常和服务本身异常,保存到ES中
//1、根据url pattern从mapping rule里获取apiId
//
// // TODO 区分数据库连接异常和服务本身异常,保存到ES中
//
// //1、根据url pattern从mapping rule里获取apiId
JSONObject jsonResult = new JSONObject();
// Integer apiId = publishApiService.getApiIdBypattern(pattern);
// if(apiId == 0) {
//// Integer apiId = publishApiService.getApiIdBypattern(pattern);
//// if(apiId == 0) {
//// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
//// jsonResult.put("message", String.format("api with url %s not exist", pattern));
//// return jsonResult;
//// }
//// // 2.获取api信息(网关环境,api相关配置等信息)
//// PublishApi publishApi = publishApiRepository.findOne(apiId);
//// if (publishApi == null) {
//// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
//// jsonResult.put("message", "获取原Api失败!");
//// return jsonResult;
//// }
//// // 3.获取订阅系统ID(根据type+user_key/(appid+appsecure)+appid)
//// Integer systemId = publishApplicationService.findSystemByUserKeyOrAppId(request);
//// if (systemId == null) {
//// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
//// jsonResult.put("message", "获取订阅系统失败!");
//// return jsonResult;
//// }
// // 4.根据订阅系统ID获取自定义的配置
// // 4.1根据apiID查询创建的数据api模板
//
// Integer systemId = findSystemByUserKeyOrAppId(request);
// if (systemId == null) {
// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
// jsonResult.put("message", String.format("api with url %s not exist", pattern));
// jsonResult.put("message", "获取订阅系统失败!");
// return jsonResult;
// }
// // 2.获取api信息(网关环境,api相关配置等信息)
// PublishApi publishApi = publishApiRepository.findOne(apiId);
// if (publishApi == null) {
//
// Optional<DataApiModel> dataApiModelOptional = dataApiModelRepository.findByApiID(apiId);
// if (!dataApiModelOptional.isPresent()) {
// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
// jsonResult.put("message", "获取原Api失败!");
// jsonResult.put("message", String.format("data api with apiId %s not exist", apiId));
// return jsonResult;
// }
// // 3.获取订阅系统ID(根据type+user_key/(appid+appsecure)+appid)
// Integer systemId = publishApplicationService.findSystemByUserKeyOrAppId(request);
// if (systemId == null) {
// DataApiModel dataApiModel = dataApiModelOptional.get();
//
// // 4.2获取订阅的相关配置
// Optional<DataApiSubscribeConfig> dataApiSubscribeConfigOptional = dataApiSubscribeRepository.findBySubscribeSystemIdAndDataApiModelIdAndDeletedIsFalse(systemId, dataApiModel.getId());
// if (!dataApiSubscribeConfigOptional.isPresent()) {
// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
// jsonResult.put("message", "获取订阅系统失败!");
// jsonResult.put("message", "获取订阅系统自定义配置失败!");
// return jsonResult;
// }
// DataApiSubscribeConfig dataApiSubscribeConfig = dataApiSubscribeConfigOptional.get();
//
// // 获取行列筛选配置
// Optional<DataApiModelConfig> bySubscribeConfigIdAndType = dataApiModelConfigRepository.findByTypeIdAndType(dataApiSubscribeConfig.getId(), ModelConfigType.SUBSCRIBE);
// if (!bySubscribeConfigIdAndType.isPresent()) {
// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
// jsonResult.put("message", "获取订阅系统自定义行列配置失败!");
// return jsonResult;
// }
// DataApiModelConfig dataApiModelConfig = bySubscribeConfigIdAndType.get();
// String fullSql = dataApiModelConfig.getFullSql();
//// String countSql = dataApiModelConfig.generateDataCountSql();
//
// // 获取数据源
// Optional<DataApiDataSource> dataApiDataSourceOptional = dataApiDataSourceRepository.findById(dataApiModel.getDataSourceId());
// if (!dataApiDataSourceOptional.isPresent()) {
// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
// jsonResult.put("message", "获取数据源失败!");
// return jsonResult;
// }
// DataApiDataSource dataApiDataSource = dataApiDataSourceOptional.get();
//
// // 获取sql的填充参数
// List<Object> jdbcParamValues = new ArrayList<>();
// for (LineDataRangeFilter lineDataRangeFilter : dataApiModelConfig.getLineDataRangeConfig()) {
// for (FilterCondition filterCondition : lineDataRangeFilter.getFilterConditions()) {
// jdbcParamValues.addAll(filterCondition.getFilterValues());
// }
// }
// // 参数个数和sql需要填充个数是否一致 TODO
//
// // 数据临时文件名
// String tmpFileName = FileUtil.getPath() + dataApiSubscribeConfig.getId() + System.currentTimeMillis() + ".xlsx";
// // excel的列名
// List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig());
//
// Long dataPageTotal;
// try {
// // 获取数据总量
// dataPageTotal = getDataPageTotal(dataApiDataSource, countSql, jdbcParamValues);
//
// // 异步分页查询数据(页:行:列)
// List<List<List<Object>>> dataResult = asyncExecuteSql(dataPageTotal, heads, dataApiDataSource, fullSql, countSql, jdbcParamValues);
//
// // 保存数据到文件
// if (!saveTmpDataFile(tmpFileName, dataResult, heads)) {
// jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
// jsonResult.put("message", "系统异常:保存数据到文件中错误");
// return jsonResult;
// }
// // 上传文件到存储服务器中 TODO
//
// // 根据数据量返回数据,文件上传并获取下载地址(异步)并响应客户端数据 TODO
// if (dataResult.size() <= 10000) {
// // 直接返回数据 TODO
// log.info("数据少于10000条,返回数据");
// jsonResult.put("data", dataResult);
// } else {
// // 返回文件下载地址 TODO
// log.info("数据大于10000条,返回数据文件下载地址");
// jsonResult.put("data", "http://");
// }
// } catch (ExecutionException e) { // 异步线程可能报的错误: DataQueryException || RuntimeException
// jsonResult.put("status", "500");
// jsonResult.put("message", e.getMessage());
//
// if (e.getCause() instanceof DataQueryException) { // TODO
// // 数据查询异常,则记录到ES TODO
// log.info("记录数据到es....");
// }
// return jsonResult;
// } catch (Exception e) { // 其他业务逻辑异常
// jsonResult.put("status", "500");
// jsonResult.put("message", "系统异常:" + e.getMessage());
// return jsonResult;
// }
// 4.根据订阅系统ID获取自定义的配置
// 4.1根据apiID查询创建的数据api模板
Integer systemId = findSystemByUserKeyOrAppId(request);
if (systemId == null) {
jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
jsonResult.put("message", "获取订阅系统失败!");
return jsonResult;
}
public Result<JSONObject> queryDataLatest(String environment, String apiId) {
Result<JSONObject> result = new Result<>();
result.setCode(Result.FAIL);
JSONObject object = new JSONObject();
result.setData(object);
Integer systemId = 1; // TODO
Optional<DataApiModel> dataApiModelOptional = dataApiModelRepository.findByApiID(apiId);
if (!dataApiModelOptional.isPresent()) {
jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
jsonResult.put("message", String.format("data api with apiId %s not exist", apiId));
return jsonResult;
result.setMsg(String.format("data api with apiId %s not exist", apiId));
return result;
}
DataApiModel dataApiModel = dataApiModelOptional.get();
// 4.2获取订阅的相关配置
// 获取订阅的相关配置
Optional<DataApiSubscribeConfig> dataApiSubscribeConfigOptional = dataApiSubscribeRepository.findBySubscribeSystemIdAndDataApiModelIdAndDeletedIsFalse(systemId, dataApiModel.getId());
if (!dataApiSubscribeConfigOptional.isPresent()) {
jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
jsonResult.put("message", "获取订阅系统自定义配置失败!");
return jsonResult;
result.setMsg("获取订阅系统自定义配置失败!");
return result;
}
DataApiSubscribeConfig dataApiSubscribeConfig = dataApiSubscribeConfigOptional.get();
// 获取行列筛选配置
Optional<DataApiModelConfig> bySubscribeConfigIdAndType = dataApiModelConfigRepository.findByTypeIdAndType(dataApiSubscribeConfig.getId(), ModelConfigType.SUBSCRIBE);
if (!bySubscribeConfigIdAndType.isPresent()) {
jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
jsonResult.put("message", "获取订阅系统自定义行列配置失败!");
return jsonResult;
result.setMsg("获取订阅系统自定义行列配置失败!");
return result;
}
DataApiModelConfig dataApiModelConfig = bySubscribeConfigIdAndType.get();
String fullSql = dataApiModelConfig.getFullSql();
String countSql = dataApiModelConfig.generateDataCountSql();
// 获取数据源
Optional<DataApiDataSource> dataApiDataSourceOptional = dataApiDataSourceRepository.findById(dataApiModel.getDataSourceId());
if (!dataApiDataSourceOptional.isPresent()) {
jsonResult.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
jsonResult.put("message", "获取数据源失败!");
return jsonResult;
result.setMsg("获取数据源失败!");
return result;
}
DataApiDataSource dataApiDataSource = dataApiDataSourceOptional.get();
DataSourceTypeEnum dataSourceType = dataApiDataSource.getType();
// 获取sql的填充参数
List<Object> jdbcParamValues = new ArrayList<>();
for (LineDataRangeFilter lineDataRangeFilter : dataApiModelConfig.getLineDataRangeConfig()) {
for (FilterCondition filterCondition : lineDataRangeFilter.getFilterConditions()) {
jdbcParamValues.addAll(filterCondition.getFilterValues());
}
}
// 参数个数和sql需要填充个数是否一致 TODO
// 数据临时文件名
String tmpFileName = FileUtil.getPath() + dataApiSubscribeConfig.getId() + ".xlsx";
String tmpFileName = FileUtil.getPath() + System.currentTimeMillis() + ".xlsx";
// excel的列名
List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig());
// 异步分页查询数据(页:行:列)
List<List<List<Object>>> dataResult = asyncExecuteSql(heads, dataApiDataSource, fullSql, countSql, jdbcParamValues, dataSourceType);
Result<List<List<List<Object>>>> dataResult = doGetData(dataApiDataSource, dataApiModelConfig, heads);
if (dataResult.getCode().equals(Result.FAIL)) {
result.setMsg(dataResult.getMsg());
return result;
}
List<List<List<Object>>> data = dataResult.getData();
// 保存数据到文件
saveTmpDataFile(tmpFileName, dataResult, heads);
if (!saveTmpDataFile(tmpFileName, data, heads)) {
result.setMsg("系统异常:保存数据到文件中错误");
return result;
}
// 上传文件到存储服务器中 TODO
// 根据数据量返回数据,文件上传并获取下载地址(异步)并响应客户端数据 TODO
if (dataResult.size() <= 10000) {
if (data.size() <= 10000) {
// 直接返回数据 TODO
log.info("数据少于10000条,返回数据");
jsonResult.put("data", dataResult);
object.put("data", data);
object.put("heads", heads);
} else {
// 返回文件下载地址 TODO
log.info("数据大于10000条,返回数据文件下载地址");
jsonResult.put("data", "http://");
object.put("fileUrl", "http://test.com");
}
return jsonResult;
return result;
}
/**
* 异步线程去查询数据并写到excel文件中
* 注意:页码从0开始计算
*/
public List<List<List<Object>>> asyncExecuteSql(List<List<String>> heads, DataApiDataSource dataApiDataSource, String dataSql, String countSql, List<Object> jdbcParamValues, DataSourceTypeEnum dataSourceType) {
List<List<List<Object>>> results = new ArrayList<>();
// 获取全部数据
private Result<List<List<List<Object>>>> doGetData(DataApiDataSource dataApiDataSource, DataApiModelConfig dataApiModelConfig, List<List<String>> heads) {
Result<List<List<List<Object>>>> result = new Result<>();
result.setCode(Result.OK);
Long dataPageTotal;
String fullSql = dataApiModelConfig.getFullSql(); // TODO 前面需要判断该api是否已经审核通过了
String countSql = dataApiModelConfig.getCountSql();
if (StringUtils.isEmpty(fullSql) || StringUtils.isEmpty(countSql)) {
result.setCode(Result.FAIL);
result.setMsg("系统异常:未生成完整的sql");
return result;
}
try {
List<Object> jdbcParamValues = new ArrayList<>();
List<LineDataRangeFilter> lineDataRangeConfigs = dataApiModelConfig.getLineDataRangeConfig();
if (!CollectionUtils.isEmpty(lineDataRangeConfigs)) {
for (LineDataRangeFilter lineDataRangeFilter : lineDataRangeConfigs) {
for (FilterCondition filterCondition : lineDataRangeFilter.getFilterConditions()) {
jdbcParamValues.addAll(filterCondition.getFilterValues());
}
}
}
// 获取数据总量
dataPageTotal = getDataPageTotal(dataApiDataSource, countSql, jdbcParamValues);
Long pageNumber;
Long pageSize = queryDataTotalPerThread.longValue();
Long limitFirstValue;
Long limitSecondValue;
Long dataTotal = 1L;
// 异步分页查询数据(页:行:列)
List<List<List<Object>>> dataResult = asyncExecuteSql(dataPageTotal, heads, dataApiDataSource, fullSql, countSql, jdbcParamValues);
result.setData(dataResult);
result.setCode(Result.OK);
} catch (ExecutionException e) { // 异步线程可能报的错误: DataQueryException || RuntimeException
result.setCode(Result.FAIL);
result.setMsg(e.getMessage());
if (e.getCause() instanceof DataQueryException) {
// 数据查询异常,则记录到ES TODO
log.warn("记录数据到es....");
}
log.error("查询数据异步线程异常:", e);
} catch (Exception e) { // 其他业务逻辑异常
log.error("系统异常,获取数据异常:", e);
result.setCode(Result.FAIL);
result.setMsg("系统异常:" + e.getMessage());
}
return result;
}
try {
// 获取数据总页数
private Long getDataPageTotal(DataApiDataSource dataApiDataSource, String countSql, List<Object> jdbcParamValues) throws ExecutionException, InterruptedException {
// 获取数据总条数
QueryDataTask<Long> queryDataCountTask = new QueryDataTask<>();
queryDataCountTask.setDataApiDataSource(dataApiDataSource);
queryDataCountTask.setJdbcParamValues(jdbcParamValues);
queryDataCountTask.setCountSql(countSql);
Future<?> future = batchQueryDataThreadPool.addExecuteTask(queryDataCountTask);
dataTotal = (Long) future.get();
Long dataTotal = (Long) future.get();
// 计算页数(页码从0开始)
Long pageTotal = dataTotal / queryDataTotalPerThread;
log.info("数据总条数:{}, 数据总页数:{}", dataTotal, pageTotal + 1);
if ((dataTotal % queryDataTotalPerThread) != 0) {
pageTotal += 1; // 页码加1
}
log.info("数据总条数:{}, 数据总页数:{}", dataTotal, pageTotal);
return pageTotal;
}
/**
* 异步线程去查询数据并写到excel文件中
* 注意:页码从0开始计算
*/
public List<List<List<Object>>> asyncExecuteSql(Long pageTotal, List<List<String>> heads, DataApiDataSource dataApiDataSource, String dataSql, String countSql, List<Object> jdbcParamValues) throws InterruptedException, ExecutionException {
List<List<List<Object>>> results = new ArrayList<>();
Long pageNumber;
Long pageSize = queryDataTotalPerThread.longValue();
Long limitFirstValue;
Long limitSecondValue;
DataSourceTypeEnum dataSourceType = dataApiDataSource.getType();
// 根据数据页数封装多个任务并发分页查询数据库
List<QueryDataTask<List<List<Object>>>> queryDataTasks = new ArrayList<>();
List<Object> values;
while (pageTotal >= 0) {
Long index = 1L;
List<Future<List<List<Object>>>> futures = new ArrayList<>();
Boolean done = Boolean.FALSE;
try {
while (index <= pageTotal) {
// 封装成任务提交(sql,limit条件)
pageNumber = pageTotal;
pageNumber = index;
if (DataSourceTypeEnum.ORACLE.equals(dataSourceType)) {
limitFirstValue = (pageNumber + 1) * pageSize;
limitSecondValue = pageNumber * pageSize;
limitFirstValue = pageNumber* pageSize;
limitSecondValue = (pageNumber-1) * pageSize;
} else if (DataSourceTypeEnum.POSTGRESQL.equals(dataSourceType)) {
limitFirstValue = pageSize;
limitSecondValue = pageNumber;
limitSecondValue = pageNumber - 1;
} else if (DataSourceTypeEnum.MYSQL.equals(dataSourceType)) {
limitFirstValue = pageNumber;
limitFirstValue = pageNumber - 1;
limitSecondValue = pageSize;
} else {
log.info("不支持的数据库类型:{}", dataSourceType);
......@@ -241,24 +393,29 @@ public class DataApiCommonServiceImpl implements DataApiCommonService {
queryDataTask.setJdbcParamValues(values);
queryDataTask.setQueryDataTotalPerThread(queryDataTotalPerThread);
queryDataTasks.add(queryDataTask);
--pageTotal; // 页码减一
++index;
}
ThreadPoolExecutor threadPool = batchQueryDataThreadPool.getThreadPool();
List<Future<List<List<Object>>>> futures = threadPool.invokeAll(queryDataTasks);
futures = threadPool.invokeAll(queryDataTasks);
for (int i = 0; i < futures.size(); i++) {
List<List<Object>> dataJsonObjectList = futures.get(i).get();
results.add(dataJsonObjectList);
}
} catch (Exception e) {
log.error("异步获取数据错误:", e);
done = Boolean.TRUE;
} finally {
if (!done) {
log.warn("未完成查询数据的任务,开始关闭所有线程");
for (Future<List<List<Object>>> future : futures) {
future.cancel(true);
}
}
}
return results;
}
// 保存数据到临时文件中
public Boolean saveTmpDataFile(String excelFileName, List<List<List<Object>>> dataList, List<List<String>> heads) {
private Boolean saveTmpDataFile(String excelFileName, List<List<List<Object>>> dataList, List<List<String>> heads) {
// 等待每个并发任务完成,把查询到的数据写入excel表中
Boolean result = Boolean.TRUE;
try (ExcelWriter excelWriter = EasyExcel.write(excelFileName).build();) {
......
......@@ -29,6 +29,7 @@ import org.springframework.util.StringUtils;
import javax.persistence.criteria.Predicate;
import javax.transaction.Transactional;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
......@@ -489,26 +490,29 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
return failure("调试数据api服务失败--生成sql失败", null);
}
// 查询出前10条数据进行返回
Result<List<List<Object>>> executeSqlResult = new Result<>();
List<List<Object>> executeSqlDataList = new ArrayList<>();
List<List<Object>> data = new ArrayList<>();
List<Object> jdbcParams = getJdbcParams(dataApiModelConfig, dataApiDataSource.getType());
String errorMsg = "";
try{
executeSqlResult = executeDataQueryTask(fullSql, dataApiModelConfig, dataApiDataSource,jdbcParams);
executeSqlDataList = executeDataQueryTask(fullSql, dataApiModelConfig, dataApiDataSource,jdbcParams);
dataApiModel.setDebugStatus(DebugStatusEnum.SUCCESS);
data = executeSqlResult.getData();
}catch (Exception e){
log.error("执行数据查询失败:", e);
errorMsg = e.getMessage();
dataApiModel.setDebugStatus(DebugStatusEnum.FAIL);
}
// 保存调试结果
dataApiModelRepository.save(dataApiModel);
if(DebugStatusEnum.FAIL.equals(dataApiModel.getDebugStatus())){
return failure(errorMsg);
}
// 获取数据的列名
List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig());
result.put("data", data);
result.put("heads", heads);
if(DebugStatusEnum.FAIL.equals(dataApiModel.getDebugStatus())){
return failure(executeSqlResult.getMsg());
}
return success("数据api服务调试成功", result);
}
......@@ -553,9 +557,9 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
return success(dataApiModelDetailVo);
}
private Result<List<List<Object>>> executeDataQueryTask(String dataSql, DataApiModelConfig dataApiModelConfig, DataApiDataSource dataSource, List<Object> jdbcParamValues) throws Exception{
private List<List<Object>> executeDataQueryTask(String dataSql, DataApiModelConfig dataApiModelConfig, DataApiDataSource dataSource, List<Object> jdbcParamValues) throws ExecutionException,InterruptedException {
QueryDataTask queryDataTask = new QueryDataTask();
QueryDataTask<List<List<Object>>> queryDataTask = new QueryDataTask();
queryDataTask.setDataApiDataSource(dataSource);
queryDataTask.setQuerySql(dataSql);
queryDataTask.setPageNumber(1L);
......@@ -563,8 +567,8 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
queryDataTask.setJdbcParamValues(jdbcParamValues);
Future<?> future = debugQueryDataThreadPool.addExecuteTask(queryDataTask);
Result<List<List<Object>>> result = new Result<>();
result = (Result<List<List<Object>>>) future.get();
List<List<Object>> result = new ArrayList<>();
result = (List<List<Object>>) future.get();
return result;
}
private List<Object> getJdbcParams(DataApiModelConfig dataApiModelConfig, DataSourceTypeEnum dataSourceType){
......
......@@ -4,6 +4,7 @@ import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hisense.dataservice.entity.DataApiDataSource;
import com.hisense.dataservice.exceptions.DataQueryException;
import com.hisense.dataservice.library.model.Result;
import lombok.extern.slf4j.Slf4j;
......@@ -61,22 +62,22 @@ public class JdbcUtil {
log.info("thread: {}, 返回的数据:{}", threadName, result);
return new Result("200", "查询成功", result);
} catch (SQLException sqlException) {
log.error("[{}] 数据库连接失败:", datasource.getSourceName(), sqlException);
return new Result("300", "数据源异常:" + sqlException.getMessage(), result);
log.error("sql: {}, params: {},数据查询异常:", sql, jdbcParamValues, sqlException);
throw new DataQueryException(sqlException.getMessage(), sqlException.getCause());
} catch (Exception e) {
log.error("thread: {}, sql: {}, params: {}, 分页查询数据异常:", threadName, sql, jdbcParamValues, e);
return new Result("400", "系统异常:" + e.getMessage(), result);
log.error("sql: {}, params: {}, 分页查询数据异常:", sql, jdbcParamValues, e);
throw new RuntimeException("系统异常:" + e.getMessage());
} finally {
try {
if (connection != null)
connection.close();
} catch (SQLException e) {
log.error("关闭数据库连接异常信息:{}", e.getMessage());
log.error("关闭数据库连接异常:{}", e.getMessage());
}
}
}
public static Result<Long> executeCountSql(DataApiDataSource datasource, String sql, List<Object> jdbcParamValues) {
public static Result<Long> executeCountSql(DataApiDataSource datasource, String sql, List<Object> jdbcParamValues) throws DataQueryException {
log.info("sql:{}", sql);
DruidPooledConnection connection = null;
try {
......@@ -100,14 +101,15 @@ public class JdbcUtil {
log.info("返回的数据:{}", count);
return new Result("200", "查询成功", count);
} catch (SQLException e) {
log.error("异常信息:{}", e.getMessage());
return new Result().setError("500", e.getMessage());
throw new DataQueryException(e.getMessage());
}catch (Exception e){
throw new RuntimeException("系统异常:" + e.getMessage());
} finally {
try {
if (connection != null)
connection.close();
} catch (SQLException e) {
log.error("错误信息:{}", e.getMessage());
log.error("关闭数据库连接异常:{}", e.getMessage());
}
}
}
......
......@@ -48,7 +48,7 @@ public class ThreadPoolManagerUtil {
new NamedThreadFactoryImpl(poolName)){
public void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
printException(r, t);
// printException(r, t);
}
};
}
......
......@@ -9,6 +9,7 @@ import com.hisense.dataservice.entity.DataApiModel;
import com.hisense.dataservice.entity.DataApiModelConfig;
import com.hisense.dataservice.entity.DataApiSubscribeConfig;
import com.hisense.dataservice.enums.*;
import com.hisense.dataservice.library.model.Result;
import com.hisense.dataservice.repository.DataApiDataSourceRepository;
import com.hisense.dataservice.repository.DataApiModelConfigRepository;
import com.hisense.dataservice.repository.DataApiModelRepository;
......@@ -213,13 +214,13 @@ public class DataApiCommonServiceTest {
List<ColumnDataRangeFilter> columnDataRangeConfig = dataApiModelConfig.getColumnDataRangeConfig();
List<List<String>> lists = ColumnDataRangeFilter.generateExcelHead(columnDataRangeConfig);
List<List<List<Object>>> datas = dataApiModelService.asyncExecuteSql(lists, new DataApiDataSource(), "select * from A where name = ? ", "select count(1) from A where name = ?", jdbcParams, DataSourceTypeEnum.ORACLE);
List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig());
String tmpFileName = FileUtil.getPath() + System.currentTimeMillis() + ".xlsx";
dataApiModelService.saveTmpDataFile(tmpFileName, datas, heads);
log.info("时间:{} ms", System.currentTimeMillis() - start);
// List<List<List<Object>>> datas = dataApiModelService.asyncExecuteSql(lists, new DataApiDataSource(), "select * from A where name = ? ", "select count(1) from A where name = ?", jdbcParams, DataSourceTypeEnum.ORACLE);
// List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig());
//
// String tmpFileName = FileUtil.getPath() + System.currentTimeMillis() + ".xlsx";
//
// dataApiModelService.saveTmpDataFile(tmpFileName, datas, heads);
// log.info("时间:{} ms", System.currentTimeMillis() - start);
}
@Test
......@@ -485,5 +486,12 @@ public class DataApiCommonServiceTest {
}
}
@Test
public void testQueryDataLatest(){
Result<JSONObject> staging = dataApiModelService.queryDataLatest("staging", "448086965d904841b9deb8d93d54fd8a");
log.info(String.valueOf(staging));
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论