提交 535adc44 authored 作者: songchuancai's avatar songchuancai

优化代码

上级 5d11c0d7
...@@ -29,16 +29,12 @@ public class QueryDataTask<T> implements Callable<T> { ...@@ -29,16 +29,12 @@ public class QueryDataTask<T> implements Callable<T> {
public T call() throws Exception { public T call() throws Exception {
if (StringUtils.isEmpty(querySql)) { if (StringUtils.isEmpty(querySql)) {
Result<Long> dataResult = JdbcUtil.executeCountSql(dataApiDataSource, countSql, jdbcParamValues); return (T) JdbcUtil.executeCountSql(dataApiDataSource, countSql, jdbcParamValues);
return (T) dataResult.getData();
// return (T) JdbcUtil.testCountSql().getData(); // TODO FOR TEST
} else { } else {
List<List<Object>> result = new ArrayList<>(); List<List<Object>> data = new ArrayList<>();
Result<List<JSONObject>> dataResult = JdbcUtil.executeSql(dataApiDataSource, querySql, jdbcParamValues, queryDataTotalPerThread); Result<List<JSONObject>> dataResult = JdbcUtil.executeSql(dataApiDataSource, querySql, jdbcParamValues, queryDataTotalPerThread);
Result<List<List<Object>>> result = new Result<>();
List<JSONObject> dataJsonList = dataResult.getData(); List<JSONObject> dataJsonList = dataResult.getData();
// TODO 针对错误信息保存到es
for (JSONObject dataJson : dataJsonList) { for (JSONObject dataJson : dataJsonList) {
List<Object> lineData = new ArrayList<>(); List<Object> lineData = new ArrayList<>();
for (List<String> head : columnHeads) { for (List<String> head : columnHeads) {
...@@ -46,11 +42,12 @@ public class QueryDataTask<T> implements Callable<T> { ...@@ -46,11 +42,12 @@ public class QueryDataTask<T> implements Callable<T> {
Object columnValue = dataJson.get(columnName); Object columnValue = dataJson.get(columnName);
lineData.add(columnValue); // 获取列名对应的值 lineData.add(columnValue); // 获取列名对应的值
} }
result.add(lineData); data.add(lineData);
} }
result.setMsg(dataResult.getMsg());
result.setData(data);
result.setMsg(dataResult.getMsg());
return (T) result; return (T) result;
// return (T) JdbcUtil.testExecuteSql(pageNumber).getData(); // TODO FOR TEST
} }
} }
} }
......
package com.hisense.dataservice.controller;
import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink;
import com.hisense.dataservice.service.DataServiceLogManagementService;
import com.hisense.dataservice.vo.CallLogVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @author : scc
* @date : 2023/03/20
**/
@Api(tags = "数据api服务-日志管理")
@RestController
@RequestMapping("/api/v1/dataService/logs")
public class DataServiceLogManagementController extends BaseController{
@Autowired
private DataServiceLogManagementService dataServiceLogManagementService;
@ApiImplicitParams({
@ApiImplicitParam(name = "environment", value = "环境", required = true),
@ApiImplicitParam(name = "systemId", value = "系统ID", required = true),
@ApiImplicitParam(name = "nextToken", value = "下一页token", required = false),
@ApiImplicitParam(name = "pageSize", value = "指定返回结果中每页显示的记录数量", defaultValue = "10", required = false),
@ApiImplicitParam(name = "page", value = "指定显示返回结果中的第几页", defaultValue = "1", required = false),
@ApiImplicitParam(name = "sortProperty", value = "根据哪个字段排序", required = false, allowableValues = "timestamp, status", defaultValue = "timestamp"),
@ApiImplicitParam(name = "sortOrder", value = "排序方向", required = false, allowableValues = "ASC,DESC")
})
@GetMapping("/callLog/{environment}/{systemId}")
public PageData<CallLogVo> getCallLog(@PathVariable String environment,
@PathVariable Long systemId,
@RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize,
@RequestParam(value = "page", required = false, defaultValue = "1") Integer page,
@RequestParam(value = "sortProperty", required = false) String sortProperty,
@RequestParam(value = "sortOrder", required = false) String sortOrder){
PageLink pageLink = createPageLink(pageSize, page, "", sortProperty, sortOrder);
return dataServiceLogManagementService.getCallLog(environment, systemId, pageLink);
}
}
package com.hisense.dataservice.controller; package com.hisense.dataservice.controller;
import com.alibaba.fastjson.JSONObject;
import com.hisense.dataservice.dto.DataApiModelDto; import com.hisense.dataservice.dto.DataApiModelDto;
import com.hisense.dataservice.enums.ModelConfigType;
import com.hisense.dataservice.library.model.Result; import com.hisense.dataservice.library.model.Result;
import com.hisense.dataservice.library.model.page.PageData; import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink; import com.hisense.dataservice.library.model.page.PageLink;
...@@ -94,5 +96,19 @@ public class DataServiceManagementController extends BaseController { ...@@ -94,5 +96,19 @@ public class DataServiceManagementController extends BaseController {
return null; return null;
} }
@ApiOperation(value = "数据服务管理-服务调试")
@ApiImplicitParams({
@ApiImplicitParam(name = "environment", value = "环境", required = true),
@ApiImplicitParam(name = "dataModelId", value = "数据服务ID", required = true),
@ApiImplicitParam(name = "type", value = "调试类型(发布/订阅)", required = true, allowableValues = "PUBLISH/SUBSCRIBE")
})
@GetMapping("/debugService/{environment}/{dataModelId}/{type}")
public Result<JSONObject> debugService(@PathVariable String environment,
@PathVariable Long dataModelId,
@PathVariable ModelConfigType type) {
return dataApiServiceManagementService.debugDataApiService(environment, dataModelId,type);
}
} }
...@@ -50,6 +50,8 @@ public class DataApiModel extends BaseEntity{ ...@@ -50,6 +50,8 @@ public class DataApiModel extends BaseEntity{
@Column(nullable = false) @Column(nullable = false)
private Long publishSystemId; private Long publishSystemId;
// 发布系统名称
private String publishSystemName;
// 内外网 // 内外网
@Enumerated(EnumType.STRING) @Enumerated(EnumType.STRING)
@Column(name = "network_env", columnDefinition = "varchar2(10)", nullable = false) @Column(name = "network_env", columnDefinition = "varchar2(10)", nullable = false)
......
...@@ -7,12 +7,7 @@ package com.hisense.dataservice.enums; ...@@ -7,12 +7,7 @@ package com.hisense.dataservice.enums;
**/ **/
public enum OperateEnum { public enum OperateEnum {
GREATER_THAN("大于"),
LESS_THAN("小于"),
EQUAL("等于"), EQUAL("等于"),
NOT_EQUAL("不等于"),
GT_EQ("大于等于"),
LT_EQ("小于等于"),
LIKE("模糊匹配"), LIKE("模糊匹配"),
IN("集合"), IN("集合"),
BETWEEN_AND("范围"); BETWEEN_AND("范围");
......
package com.hisense.dataservice.es.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import java.util.Arrays;
import java.util.Objects;
/**
* @author : scc
* @date : 2023/01/05
**/
@Configuration
@ConfigurationProperties(prefix = "es")
@Slf4j
@Data
public class ElasticsearchConfig {
private String hosts;
private String username;
private String password;
private int connectTimeout;
private int socketTimeout;
private int connectionRequestTimeout;
@Bean
public RestHighLevelClient client() {
log.info("elasticsearch init start ");
Assert.hasLength(hosts, "elasticsearch host is null");
HttpHost[] httpHosts = Arrays.stream(hosts.split(",")).map(host -> {
Assert.hasLength(host, "elasticsearch host is null");
String[] h = host.split(":");
if (h.length != 2) throw new RuntimeException("host ip or port formate is error");
return new HttpHost(h[0], Integer.parseInt(h[1]));
}).filter(Objects::nonNull).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(httpHosts);
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider))
.setRequestConfigCallback(r -> {
r.setConnectTimeout(connectTimeout);
r.setSocketTimeout(socketTimeout);
r.setConnectionRequestTimeout(connectionRequestTimeout);
return r;
});
return new RestHighLevelClient(builder);
}
}
\ No newline at end of file
package com.hisense.dataservice.es.data;
import lombok.Data;
/**
* @author : scc
* @date : 2023/01/05
**/
@Data
public class EsBaseData {
//索引
public String indexName;
//id
public String esId;
}
\ No newline at end of file
package com.hisense.dataservice.es.data;
import com.hisense.dataservice.es.enums.FieldTypeEnum;
import lombok.Data;
/**
* @author : scc
* @date : 2023/01/05
**/
@Data
public class EsField {
//字段对应操作功能
private FieldTypeEnum fieldTypeEnum;
//查询字段名称
private String field;
//查询字段对应的值
private String value;
public EsField(){
}
public EsField(String field ,String value , FieldTypeEnum fieldTypeEnum){
this.value = value;
this.field = field;
this.fieldTypeEnum = fieldTypeEnum;
}
}
\ No newline at end of file
package com.hisense.dataservice.es.data;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @author : scc
* @date : 2023/01/05
**/
public class EsPage<T> {
//当前页的数量
private int size;
/**
* 当前页
*/
private int pageNum;
/**
* 每页显示多少条
*/
private int pageSize;
/**
* 总记录数
*/
private int total;
/**
* 本页的数据列表
*/
private List<T> list;
/**
* 总页数
*/
private int pages;
/**
* 页码列表的开始索引(包含)
*/
private int beginPageIndex;
/**
* 页码列表的结束索引(包含)
*/
private int endPageIndex;
/**
* 只接受前4个必要的属性,会自动的计算出其他3个属性的值
*
* @param pageNum
* @param pageSize
* @param total
* @param list
*/
public EsPage(int pageNum, int pageSize, int total, List<T> list) {
this.pageNum = pageNum;
this.pageSize = pageSize;
this.total = total;
this.list = list;
this.size = CollectionUtils.isEmpty(list)?0:list.size();
// 计算总页码
pages =pageSize==0?0:(total + pageSize - 1) / pageSize;
// 计算 beginPageIndex 和 endPageIndex
// >> 总页数不多于10页,则全部显示
if (pages <= 10) {
beginPageIndex = 1;
endPageIndex = pages;
}
// >> 总页数多于10页,则显示当前页附近的共10个页码
else {
// 当前页附近的共10个页码(前4个 + 当前页 + 后5个)
beginPageIndex = pageNum - 4;
endPageIndex = pageNum + 5;
// 当前面的页码不足4个时,则显示前10个页码
if (beginPageIndex < 1) {
beginPageIndex = 1;
endPageIndex = 10;
}
// 当后面的页码不足5个时,则显示后10个页码
if (endPageIndex > pages) {
endPageIndex = pages;
beginPageIndex = pages - 10 + 1;
}
}
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public int getPages() {
return pages;
}
public void setPages(int pages) {
this.pages = pages;
}
public int getBeginPageIndex() {
return beginPageIndex;
}
public void setBeginPageIndex(int beginPageIndex) {
this.beginPageIndex = beginPageIndex;
}
public int getEndPageIndex() {
return endPageIndex;
}
public void setEndPageIndex(int endPageIndex) {
this.endPageIndex = endPageIndex;
}
public List<T> getList() {
return list;
}
public void setList(List<T> list) {
this.list = list;
}
public int getSize() {
return size;
}
public void setSize(int size) {
this.size = size;
}
public int getPageNum() {
return pageNum;
}
public void setPageNum(int pageNum) {
this.pageNum = pageNum;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
}
package com.hisense.dataservice.es.data;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* @author : scc
* @date : 2023/01/10
**/
@Data
@AllArgsConstructor
public class EsSearchData {
private String tenantId;
private String deviceId;
private String productId;
private String deviceType;
private String topic;
private String ts;
private String data;
private Long nextToken;
}
package com.hisense.dataservice.es.enums;
/**
* @author : scc
* @date : 2023/01/05
**/
public enum FieldTypeEnum {
//精准查询
PRECISE_QUERY,
//模糊查询
VAGUE_QUERY,
//排序_升序
ORDER_ASC,
//排序_降序
ORDER_ESC,
//时间范围查询
TS_RANGE_QUERY
;
}
package com.hisense.dataservice.es.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hisense.dataservice.es.data.EsBaseData;
import com.hisense.dataservice.es.data.EsField;
import com.hisense.dataservice.es.data.EsPage;
import com.hisense.dataservice.es.data.EsSearchData;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import springfox.documentation.spring.web.json.Json;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* @author : scc
* @date : 2023/01/05
**/
@Component
@Slf4j
public class ElasticsearchUtils {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 添加索引
*
* @param indexName
* @return
* @throws IOException
*/
public boolean addIndex(String indexName) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
CreateIndexResponse createIndexResponse = null;
try {
//1.使用client获取操作索引对象
IndicesClient indices = restHighLevelClient.indices();
//2.具体操作获取返回值
//2.1 设置索引名称
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexResponse = indices.create(createIndexRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
// e.printStackTrace();
log.error("elasticsearch addindex error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch addindex error , meassage=" + e.getMessage());
}
//3.根据返回值判断结果
return createIndexResponse.isAcknowledged();
}
/**
* 删除索引
*
* @param indexName
* @return
* @throws IOException
*/
public boolean deleteIndex(String indexName) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
AcknowledgedResponse deleteRespone = null;
try {
//1.使用client获取操作索引对象
IndicesClient indices = restHighLevelClient.indices();
//2.具体操作获取返回值
//2.1 设置索引名称
DeleteIndexRequest request = new DeleteIndexRequest(indexName);//指定要删除的索引名称
deleteRespone = indices.delete(request, RequestOptions.DEFAULT);
} catch (Exception e) {
// e.printStackTrace();
log.error("elasticsearch deleteIndex error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch deleteIndex error , meassage=" + e.getMessage());
}
//3.根据返回值判断结果
return deleteRespone.isAcknowledged();
}
/**
* 创建数据
*
* @param indexName
* @param id
* @param data
* @throws IOException
*/
public boolean addData(String indexName, String id, Object data) {
Assert.notNull(data, "Elasticsearch exception data null");
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
IndexResponse indexResponse = null;
try {
//准备文档
String jsonString = JSONObject.toJSONString(data);
Map jsonMap = JSONObject.parseObject(jsonString, Map.class);
//创建请求
IndexRequest indexRequest = new IndexRequest(indexName).id(id);
//指定文档内容
indexRequest.source(jsonMap);
//true 当存在相同的_id时,插入会出现异常; false 当存在相同_id时,插入会进行覆盖;
indexRequest.create(true);
//通过client进行http请求
indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch addOrUpdateDoc error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch addOrUpdateDoc error , meassage=" + e.getMessage());
}
return indexResponse.getResult().equals(DocWriteResponse.Result.CREATED);
}
/**
* 创建数据
*
*/
public boolean addData(String indexName, String jsonString) {
Assert.notNull(jsonString, "Elasticsearch exception data null");
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
IndexResponse indexResponse = null;
try {
//准备文档
Map jsonMap = JSONObject.parseObject(jsonString, Map.class);
//创建请求
IndexRequest indexRequest = new IndexRequest(indexName).id(UUID.randomUUID().toString());
//指定文档内容
indexRequest.source(jsonMap);
//true 当存在相同的_id时,插入会出现异常; false 当存在相同_id时,插入会进行覆盖;
indexRequest.create(true);
indexRequest.type("_doc");
//通过client进行http请求
indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch addOrUpdateDoc error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch addOrUpdateDoc error , meassage=" + e.getMessage());
}
return indexResponse.getResult().equals(DocWriteResponse.Result.CREATED);
}
/**
* 创建文档id存在则更新文档
*
* @param indexName
* @param id
* @param data
* @throws IOException
*/
public boolean addOrUpdateData(String indexName, String id, Object data) {
Assert.notNull(data, "Elasticsearch exception data null");
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
IndexResponse indexResponse = null;
try {
//准备文档
String jsonString = JSONObject.toJSONString(data);
Map jsonMap = JSONObject.parseObject(jsonString, Map.class);
//创建请求
IndexRequest indexRequest = new IndexRequest(indexName).id(id);
//指定文档内容
indexRequest.source(jsonMap);
//通过client进行http请求
indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch addOrUpdateDoc error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch addOrUpdateDoc error , meassage=" + e.getMessage());
}
return indexResponse.getResult().equals(DocWriteResponse.Result.CREATED);
}
/**
* 单条更新
*
* @param indexName
* @param id
* @param data
* @return
* @throws IOException
*/
public boolean updateData(String indexName, String id, Object data) throws IOException {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//准备文档
String jsonString = JSONObject.toJSONString(data);
Map jsonMap = JSONObject.parseObject(jsonString, Map.class);
updateRequest.doc(jsonMap);
updateRequest.timeout(TimeValue.timeValueSeconds(1));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//数据为存储而不是更新
UpdateResponse update = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
return update.getGetResult().equals(DocWriteResponse.Result.UPDATED);
}
/**
* 批量新增数据
*
* @param index
* @param datas
* @return
*/
public boolean addBatchData(String index, List<? extends EsBaseData> datas) {
Assert.hasLength(index, "Elasticsearch exception indexName null");
Assert.notEmpty(datas, "addBatchData elastaicsearch exception datas is null");
if (datas.size() > 100000) {
log.error("es add batch data too large{}", datas.size());
throw new RuntimeException("es add batch data too large" + datas.size());
}
BulkResponse bulk = null;
try {
BulkRequest request = new BulkRequest();
datas.forEach(data -> {
String source = JSON.toJSONString(data);
request.add(new IndexRequest(data.getIndexName()).id(data.esId).source(source, XContentType.JSON));
});
bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch addBatchData error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch addBatchData error , meassage=" + e.getMessage());
}
return !bulk.hasFailures();
}
/**
* 通过id删除数据
*
* @param indexName
* @param id
* @return
*/
public boolean deleteDataById(String indexName, String id) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
DeleteResponse response = null;
try {
response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("elasticsearch deleteDocById error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch deleteDataById error , meassage=" + e.getMessage());
}
return response.getResult().equals(DocWriteResponse.Result.DELETED);
}
/**
* 通过条件删除数据
*
* @param indexName
* @param conditionFileds
* @return
*/
public boolean deleteDataByCondition(String indexName, List<EsField> conditionFileds) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
Assert.notEmpty(conditionFileds, "Elasticsearch exception conditionFileds null");
BulkByScrollResponse resp = null;
try {
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
// SearchSourceBuilder searchSourceBuilder = buildSearchSourceBuilder(conditionFileds);
// request.getSearchRequest().source(searchSourceBuilder);
// 更新时版本冲突
request.setConflicts("proceed");
//构建条件
setDeletCondition(conditionFileds, request);
// 刷新索引
request.setRefresh(true);
resp = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch deleteDataByCondition error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch deleteDataByCondition error , meassage=" + e.getMessage());
}
return resp.getStatus().getDeleted() > 0;
}
/**
* 通过条件更新数据
*
* @param indexName
* @param conditionFileds
* @return
*/
public boolean updateDataByCondition(String indexName, List<EsField> conditionFileds, Object data) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
Assert.notEmpty(conditionFileds, "Elasticsearch exception conditionFileds null");
Assert.notNull(data, "elasticsearch updateDataByCondition data is null ");
BulkByScrollResponse resp = null;
try {
UpdateByQueryRequest request = new UpdateByQueryRequest(indexName);
//设置分片并行
request.setSlices(2);
//设置版本冲突时继续执行
request.setConflicts("proceed");
//构建条件
setUpdateConfition(conditionFileds, request);
//设置更新完成后刷新索引 ps很重要如果不加可能数据不会实时刷新
request.setRefresh(true);
StringBuffer scriptContext = buildScriptContext(data);
//设置要修改的内容可以多个值多个用;隔开
request.setScript(new Script(scriptContext.toString()));
resp = restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch updateDataByCondition error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch updateDataByCondition error , meassage=" + e.getMessage());
}
return resp.getStatus().getUpdated() > 0;
}
/**
* 根据id查询文档
*/
public <T> T selectDataById(String indexName, String id, Class<T> c) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
Assert.hasLength(id, "Elasticsearch exception id null");
GetResponse response = null;
try {
//设置查询的索引、文档
GetRequest indexRequest = new GetRequest(indexName, id);
response = restHighLevelClient.get(indexRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("elasticsearch selectDataById error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch selectDataById error , meassage=" + e.getMessage());
}
String res = response.getSourceAsString();
return JSONObject.parseObject(res, c);
}
/**
* 条件查询
*
* @param indexName
* @param conditionFileds 条件
* @param c 返回对象类型
* @return
*/
public <T> List<T> selectDataList(String indexName, List<EsField> conditionFileds, Class<T> c) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
Assert.notNull(c, "Class<T> is null ");
List<T> res = null;
try {
// 创建检索请求
SearchRequest searchRequest = new SearchRequest();
// 指定索引
searchRequest.indices(indexName);
SearchSourceBuilder searchSourceBuilder = buildSearchSourceBuilder(conditionFileds);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//分析结果
SearchHit[] hits = searchResponse.getHits().getHits();
res = new ArrayList<>();
for (SearchHit hit : hits
) {
String data = hit.getSourceAsString();
T t = JSONObject.parseObject(data, c);
log.info("data={}", data);
res.add(t);
}
} catch (Exception e) {
log.error("elasticsearch selectDataList error , meassage = {}", e.getMessage());
//打印轨迹
log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch selectDataList error , meassage=" + e.getMessage());
}
return res;
}
/**
* 条件分页查询
*
* @param indexName
* @param conditionFileds 条件
* @param c 返回对象类型
* @return
*/
public <T> EsPage<T> selectDataPage(String indexName, Integer pageNum, Integer pageSize, List<EsField> conditionFileds, Class<T> c) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
Assert.notNull(c, "Class<T> is null ");
List<T> res = null;
//总记录数
Integer total = 0;
try {
// 创建检索请求
SearchRequest searchRequest = new SearchRequest();
// 指定索引
searchRequest.indices(indexName);
SearchSourceBuilder searchSourceBuilder = buildSearchSourceBuilder(conditionFileds);
//设置分页
searchSourceBuilder.from((pageNum - 1) * pageSize).size(pageSize);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//分析结果
SearchHit[] hits = searchResponse.getHits().getHits();
total = new Long(searchResponse.getHits().getTotalHits().value).intValue();
res = new ArrayList<>();
for (SearchHit hit : hits
) {
String data = hit.getSourceAsString();
T t = JSONObject.parseObject(data, c);
log.info("data={}", data);
res.add(t);
}
} catch (Exception e) {
log.error("elasticsearch selectDataPage error , meassage = {}", e.getMessage());
//打印轨迹
// log.error(e.getMessage(), e);
throw new RuntimeException("elasticsearch selectDataPage error , meassage=" + e.getMessage());
}
return new EsPage<>(pageNum, pageSize, total, res);
}
/**
* 条件查询
*
* @param indexName
* @param conditionFileds 条件
* @param c 返回对象类型
* @return
*/
public <T> List<T> selectDataListsByPreciseQuery(String indexName, List<EsField> conditionFileds, Class<T> c, SortOrder sortOrder, String[] includeFields, String[] excludeFields, Integer pageSize, Long nextTime) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
Assert.notNull(c, "Class<T> is null ");
List<T> res = null;
try {
// 创建检索请求
SearchRequest searchRequest = new SearchRequest();
// 指定索引
searchRequest.indices(indexName);
// 构建查询条件
QueryBuilder queryBuilder = buildBoolMustQueryBuilder(conditionFileds);
// 构建DSL
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
// 获取的字段(列)和不需要获取的列
searchSourceBuilder.fetchSource(includeFields, excludeFields);
//设置分页
searchSourceBuilder.size(pageSize);
log.info("es 查询条件:{}", searchSourceBuilder.toString());
// 设置查询条件
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//分析结果
SearchHit[] hits = searchResponse.getHits().getHits();
res = new ArrayList<>();
for (SearchHit hit : hits
) {
String data = hit.getSourceAsString();
T t = JSONObject.parseObject(data, c);
log.info("es查询到:data={}", data);
res.add(t);
}
} catch (Exception e) {
log.error("elasticsearch selectDataListsByPreciseQuery error , message: ", e);
throw new RuntimeException("elasticsearch selectDataList error , message: " + e.getMessage());
}
return res;
}
/**
* 条件查询-根据排序字段进行分页的查询操作
*
* @return
*/
public <T> List<T> selectDataListsBySearchAfterPage(String indexName, List<EsField> conditionFileds, SortOrder sortOrder, String[] includeFields, String[] excludeFields, Integer pageSize, Long nextToken, String pageSortFiled, Class<T> c) {
Assert.hasLength(indexName, "Elasticsearch exception indexName null");
List<T> res = null;
try {
// 创建检索请求
SearchRequest searchRequest = new SearchRequest();
// 指定索引
searchRequest.indices(indexName);
// 构建查询条件
QueryBuilder queryBuilder = buildBoolMustQueryBuilder(conditionFileds);
// 构建DSL
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
// 设置排序
searchSourceBuilder.sort(new FieldSortBuilder(pageSortFiled).order(sortOrder));
// 获取的字段(列)和不需要获取的列
searchSourceBuilder.fetchSource(includeFields, excludeFields);
//设置分页
searchSourceBuilder.size(pageSize);
searchSourceBuilder.from(0);
searchSourceBuilder.searchAfter(new Object[]{nextToken});
log.info("es 查询条件:{}", searchSourceBuilder.toString());
// 设置查询条件
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//分析结果
SearchHit[] hits = searchResponse.getHits().getHits();
res = new ArrayList<>();
for (SearchHit hit : hits
) {
Object[] sortValues = hit.getSortValues();
Object nextTimeBySort = sortValues[0];
String data = hit.getSourceAsString();
JSONObject dataJson = JSONObject.parseObject(data);
dataJson.put("nextToken", nextTimeBySort);
T t = JSONObject.parseObject(dataJson.toJSONString(), c);
log.info("es查询到:data={}", data);
res.add(t);
}
} catch (Exception e) {
log.error("elasticsearch selectDataListsByPreciseQuery error , message: ", e);
throw new RuntimeException("elasticsearch selectDataList error , message: " + e.getMessage());
}
return res;
}
private static QueryBuilder buildBoolMustQueryBuilder(List<EsField> conditionFileds){
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//构建条件
if (!CollectionUtils.isEmpty(conditionFileds)) {
for (EsField condFiled :
conditionFileds) {
switch (condFiled.getFieldTypeEnum()) {
case VAGUE_QUERY:
// 模糊查询
boolQueryBuilder.must(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()).fuzziness(Fuzziness.AUTO));
break;
case PRECISE_QUERY:
// 精准查询
boolQueryBuilder.must(QueryBuilders.matchQuery(condFiled.getField()+".keyword", condFiled.getValue()));
break;
case TS_RANGE_QUERY:
String[] values = condFiled.getValue().split(";");
boolQueryBuilder.must(QueryBuilders.rangeQuery(condFiled.getField())
.from(values[0])
.to(values[1])
.includeLower(true)
.includeUpper(true)
.boost(1.0f));
break;
default:
//默认精准查询
boolQueryBuilder.must(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()));
break;
}
}
}
return boolQueryBuilder;
}
/**
* 创建搜索条件
*
* @param conditionFileds
* @return
*/
private static SearchSourceBuilder buildSearchSourceBuilder(List<EsField> conditionFileds) {
// 指定DSL
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//构建条件
if (!CollectionUtils.isEmpty(conditionFileds)) {
for (EsField condFiled :
conditionFileds) {
switch (condFiled.getFieldTypeEnum()) {
case ORDER_ASC:
//升序
searchSourceBuilder.sort(condFiled.getField(), SortOrder.ASC);
break;
case ORDER_ESC:
//降序
searchSourceBuilder.sort(condFiled.getField(), SortOrder.DESC);
break;
case VAGUE_QUERY:
// 模糊查询
searchSourceBuilder.query(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()).fuzziness(Fuzziness.AUTO));
break;
case PRECISE_QUERY:
searchSourceBuilder.query(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()));
break;
default:
//默认精准查询
searchSourceBuilder.query(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()));
break;
}
}
}
return searchSourceBuilder;
}
/**
* 构建删除条件
*
* @param conditionFileds
* @param request
*/
private void setDeletCondition(List<EsField> conditionFileds, DeleteByQueryRequest request) {
if (!CollectionUtils.isEmpty(conditionFileds)) {
for (EsField condFiled :
conditionFileds) {
switch (condFiled.getFieldTypeEnum()) {
case VAGUE_QUERY:
// 模糊查询
request.setQuery(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()).fuzziness(Fuzziness.AUTO));
break;
case PRECISE_QUERY:
request.setQuery(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()));
break;
default:
log.error("field type error ,only supprt VAGUE_QUERY and PRECISE_QUERY");
throw new RuntimeException(" field type error ,only supprt VAGUE_QUERY and PRECISE_QUERY");
}
}
}
}
/**
* 构建修改条件
*
* @param conditionFileds
* @param request
*/
private void setUpdateConfition(List<EsField> conditionFileds, UpdateByQueryRequest request) {
if (!CollectionUtils.isEmpty(conditionFileds)) {
for (EsField condFiled :
conditionFileds) {
switch (condFiled.getFieldTypeEnum()) {
case VAGUE_QUERY:
// 模糊查询
request.setQuery(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()).fuzziness(Fuzziness.AUTO));
break;
case PRECISE_QUERY:
request.setQuery(QueryBuilders.matchQuery(condFiled.getField(), condFiled.getValue()));
break;
default:
log.error("field type error ,only supprt VAGUE_QUERY and PRECISE_QUERY");
throw new RuntimeException(" field type error ,only supprt VAGUE_QUERY and PRECISE_QUERY");
}
}
}
}
/**
* 创建修改script内容
*
* @param data
* @return
*/
private StringBuffer buildScriptContext(Object data) {
//准备文档
String jsonString = JSONObject.toJSONString(data);
Map<String, String> jsonMap = JSONObject.parseObject(jsonString, Map.class);
String tem = "ctx._source['key']='value'";
StringBuffer scriptContext = new StringBuffer();
for (String key :
jsonMap.keySet()) {
String value = jsonMap.get(key);
scriptContext.append(tem.replace("key", key).replace("value", value))
.append(";");
}
return scriptContext;
}
}
...@@ -6,6 +6,7 @@ import org.springframework.data.domain.Page; ...@@ -6,6 +6,7 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification; import org.springframework.data.jpa.domain.Specification;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import java.util.Optional; import java.util.Optional;
...@@ -24,4 +25,7 @@ public interface DataApiModelRepository extends JpaRepository<DataApiModel,Long> ...@@ -24,4 +25,7 @@ public interface DataApiModelRepository extends JpaRepository<DataApiModel,Long>
Optional<DataApiModel> findByIdAndSystemEnv(Long id, SystemEnv systemEnv); Optional<DataApiModel> findByIdAndSystemEnv(Long id, SystemEnv systemEnv);
boolean existsByNameAndSystemEnv(String name, SystemEnv env); boolean existsByNameAndSystemEnv(String name, SystemEnv env);
@Query(value = "select name from system_info where id = :systemId", nativeQuery = true)
String findSystemNameBySystemId(Long systemId);
} }
...@@ -2,6 +2,7 @@ package com.hisense.dataservice.service; ...@@ -2,6 +2,7 @@ package com.hisense.dataservice.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.hisense.dataservice.dto.DataApiModelDto; import com.hisense.dataservice.dto.DataApiModelDto;
import com.hisense.dataservice.enums.ModelConfigType;
import com.hisense.dataservice.library.model.Result; import com.hisense.dataservice.library.model.Result;
import com.hisense.dataservice.library.model.page.PageData; import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink; import com.hisense.dataservice.library.model.page.PageLink;
...@@ -22,7 +23,7 @@ public interface DataApiServiceManagementService extends BaseService { ...@@ -22,7 +23,7 @@ public interface DataApiServiceManagementService extends BaseService {
// 创建或修改未发布的数据api服务(草稿状态) // 创建或修改未发布的数据api服务(草稿状态)
Result<Long> createDataApi(String environment, DataApiModelDto dataApiModelDto); Result<Long> createDataApi(String environment, DataApiModelDto dataApiModelDto);
Result<JSONObject> debugDataApiService(String environment, Long dataModelId); Result<JSONObject> debugDataApiService(String environment, Long dataModelId, ModelConfigType type);
Result<PageData<DataApiModelItemVo>> queryDataApiList(String environment, Integer status, PageLink pageLink); Result<PageData<DataApiModelItemVo>> queryDataApiList(String environment, Integer status, PageLink pageLink);
......
package com.hisense.dataservice.service;
import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink;
import com.hisense.dataservice.vo.CallLogVo;
/**
* @author : scc
* @date : 2023/03/20
**/
public interface DataServiceLogManagementService {
PageData<CallLogVo> getCallLog(String environment, Long systemId, PageLink pageLink);
}
...@@ -5,6 +5,7 @@ import com.hisense.dataservice.bo.*; ...@@ -5,6 +5,7 @@ import com.hisense.dataservice.bo.*;
import com.hisense.dataservice.dto.DataApiModelDto; import com.hisense.dataservice.dto.DataApiModelDto;
import com.hisense.dataservice.entity.*; import com.hisense.dataservice.entity.*;
import com.hisense.dataservice.enums.*; import com.hisense.dataservice.enums.*;
import com.hisense.dataservice.es.utils.ElasticsearchUtils;
import com.hisense.dataservice.library.model.Result; import com.hisense.dataservice.library.model.Result;
import com.hisense.dataservice.library.model.page.PageData; import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink; import com.hisense.dataservice.library.model.page.PageLink;
...@@ -58,6 +59,9 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -58,6 +59,9 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
private ThreadPoolManagerUtil debugQueryDataThreadPool = new ThreadPoolManagerUtil("debugQueryDataThread"); private ThreadPoolManagerUtil debugQueryDataThreadPool = new ThreadPoolManagerUtil("debugQueryDataThread");
@Autowired
private ElasticsearchUtils elasticsearchUtils;
@Transactional @Transactional
public Result<Boolean> createOrUpdateDataApi(DataApiModelDto dataApiModelDto) { public Result<Boolean> createOrUpdateDataApi(DataApiModelDto dataApiModelDto) {
// 获取数据源 // 获取数据源
...@@ -170,6 +174,7 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -170,6 +174,7 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
if(null == dataApiModelDto.getPublishSystemId()){ if(null == dataApiModelDto.getPublishSystemId()){
return failure("发布系统ID不能为空", null); return failure("发布系统ID不能为空", null);
} }
dataApiModel.setPublishSystemName(dataApiModelRepository.findSystemNameBySystemId(dataApiModelDto.getPublishSystemId()));
dataApiModel.setNetworkEnv(NetworkEnv.valueOf(dataApiModelDto.getNetworkEnv())); dataApiModel.setNetworkEnv(NetworkEnv.valueOf(dataApiModelDto.getNetworkEnv()));
dataApiModel.setSystemEnv(SystemEnv.valueOf(dataApiModelDto.getSystemEnv())); dataApiModel.setSystemEnv(SystemEnv.valueOf(dataApiModelDto.getSystemEnv()));
dataApiModel.setPublishSystemId(dataApiModelDto.getPublishSystemId()); dataApiModel.setPublishSystemId(dataApiModelDto.getPublishSystemId());
...@@ -450,7 +455,7 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -450,7 +455,7 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
} }
return tableIdAndColumnIdToSelfMap; return tableIdAndColumnIdToSelfMap;
} }
public Result<JSONObject> debugDataApiService(String environment, Long dataModelId){ public Result<JSONObject> debugDataApiService(String environment, Long dataModelId, ModelConfigType type){
JSONObject result = new JSONObject(); JSONObject result = new JSONObject();
Optional<DataApiModel> dataApiModelOptional = dataApiModelRepository.findById(dataModelId); Optional<DataApiModel> dataApiModelOptional = dataApiModelRepository.findById(dataModelId);
...@@ -467,37 +472,44 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -467,37 +472,44 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
} }
DataApiDataSource dataApiDataSource = dataApiDataSourceOptional.get(); DataApiDataSource dataApiDataSource = dataApiDataSourceOptional.get();
List<TableJoinRelationConfig> tableJoinRelationConfigs = dataApiModel.getTableJoinRelationConfig();
if(CollectionUtils.isEmpty(tableJoinRelationConfigs)){
return failure("调试数据api服务失败--数据表关系配置不存在", null);
}
DataApiModelConfig dataApiModelConfig; DataApiModelConfig dataApiModelConfig;
Optional<DataApiModelConfig> dataApiModelConfigOptional = dataApiModelConfigRepository.findByTypeIdAndType(dataApiModel.getId(), ModelConfigType.PUBLISH); Optional<DataApiModelConfig> dataApiModelConfigOptional = dataApiModelConfigRepository.findByTypeIdAndType(dataApiModel.getId(), type);
if (!dataApiModelConfigOptional.isPresent()) { if (!dataApiModelConfigOptional.isPresent()) {
return failure("调试数据api服务失败--行列配置不存在", null); return failure("调试数据api服务失败--行列配置不存在", null);
} }
dataApiModelConfig = dataApiModelConfigOptional.get(); dataApiModelConfig = dataApiModelConfigOptional.get();
List<TableJoinRelationConfig> tableJoinRelationConfigs = dataApiModel.getTableJoinRelationConfig();
if(CollectionUtils.isEmpty(tableJoinRelationConfigs)){
return failure("调试数据api服务失败--数据表关系配置不存在", null);
}
String fullSql = dataApiModelConfig.generateAndSetFullSql(tableJoinRelationConfigs, dataApiDataSource.getType()); String fullSql = dataApiModelConfig.generateAndSetFullSql(tableJoinRelationConfigs, dataApiDataSource.getType());
if(StringUtils.isEmpty(fullSql)){ if(StringUtils.isEmpty(fullSql)){
return failure("调试数据api服务失败--生成sql失败", null); return failure("调试数据api服务失败--生成sql失败", null);
} }
// 查询出前10条数据进行返回 // 查询出前10条数据进行返回
Result<List<List<Object>>> executeSqlResult = new Result<>();
List<List<Object>> data = new ArrayList<>(); List<List<Object>> data = new ArrayList<>();
List<Object> jdbcParams = getJdbcParams(dataApiModelConfig, dataApiDataSource.getType());
try{ try{
data = executeDataQueryTask(fullSql, dataApiModelConfig, dataApiDataSource); executeSqlResult = executeDataQueryTask(fullSql, dataApiModelConfig, dataApiDataSource,jdbcParams);
dataApiModel.setDebugStatus(DebugStatusEnum.SUCCESS); dataApiModel.setDebugStatus(DebugStatusEnum.SUCCESS);
data = executeSqlResult.getData();
}catch (Exception e){ }catch (Exception e){
log.error("执行数据查询失败:", e); log.error("执行数据查询失败:", e);
dataApiModel.setDebugStatus(DebugStatusEnum.FAIL); dataApiModel.setDebugStatus(DebugStatusEnum.FAIL);
} }
// 保存调试结果
// 设置调试结果
dataApiModelRepository.save(dataApiModel); dataApiModelRepository.save(dataApiModel);
// 获取数据的列名
List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig()); List<List<String>> heads = ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig());
result.put("data", data); result.put("data", data);
result.put("heads", heads); result.put("heads", heads);
return success("数据api服务调试", result); if(DebugStatusEnum.FAIL.equals(dataApiModel.getDebugStatus())){
return failure(executeSqlResult.getMsg());
}
return success("数据api服务调试成功", result);
} }
@Override @Override
...@@ -537,13 +549,25 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -537,13 +549,25 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
dataApiModelDetailVo.setColumnDataRangeConfig(item.getColumnDataRangeConfig()); dataApiModelDetailVo.setColumnDataRangeConfig(item.getColumnDataRangeConfig());
dataApiModelDetailVo.setLineDataRangeConfig(item.getLineDataRangeConfig()); dataApiModelDetailVo.setLineDataRangeConfig(item.getLineDataRangeConfig());
}); });
// TODO 设置发布系统相关信息 dataApiModelDetailVo.setSystemLeader(dataApiModel.getCreator());
// dataApiModelDetailVo.setPublishSystem();
// dataApiModelDetailVo.setSystemLeader();
return success(dataApiModelDetailVo); return success(dataApiModelDetailVo);
} }
private List<List<Object>> executeDataQueryTask(String dataSql, DataApiModelConfig dataApiModelConfig, DataApiDataSource dataSource) throws Exception{ private Result<List<List<Object>>> executeDataQueryTask(String dataSql, DataApiModelConfig dataApiModelConfig, DataApiDataSource dataSource, List<Object> jdbcParamValues) throws Exception{
QueryDataTask queryDataTask = new QueryDataTask();
queryDataTask.setDataApiDataSource(dataSource);
queryDataTask.setQuerySql(dataSql);
queryDataTask.setPageNumber(1L);
queryDataTask.setColumnHeads(ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig()));
queryDataTask.setJdbcParamValues(jdbcParamValues);
Future<?> future = debugQueryDataThreadPool.addExecuteTask(queryDataTask);
Result<List<List<Object>>> result = new Result<>();
result = (Result<List<List<Object>>>) future.get();
return result;
}
private List<Object> getJdbcParams(DataApiModelConfig dataApiModelConfig, DataSourceTypeEnum dataSourceType){
// 获取sql的填充参数 // 获取sql的填充参数
List<Object> jdbcParamValues = new ArrayList<>(); List<Object> jdbcParamValues = new ArrayList<>();
List<LineDataRangeFilter> lineDataRangeConfigs = dataApiModelConfig.getLineDataRangeConfig(); List<LineDataRangeFilter> lineDataRangeConfigs = dataApiModelConfig.getLineDataRangeConfig();
...@@ -554,8 +578,6 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -554,8 +578,6 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
} }
} }
} }
DataSourceTypeEnum dataSourceType = dataSource.getType();
// 封装查询数据任务 // 封装查询数据任务
Long limitFirstValue = 0L; Long limitFirstValue = 0L;
Long limitSecondValue = 10L; Long limitSecondValue = 10L;
...@@ -571,17 +593,7 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage ...@@ -571,17 +593,7 @@ public class DataApiServiceManagementServiceImpl implements DataApiServiceManage
} }
jdbcParamValues.add(limitFirstValue); jdbcParamValues.add(limitFirstValue);
jdbcParamValues.add(limitSecondValue); jdbcParamValues.add(limitSecondValue);
return jdbcParamValues;
QueryDataTask queryDataTask = new QueryDataTask();
queryDataTask.setDataApiDataSource(dataSource);
queryDataTask.setQuerySql(dataSql);
queryDataTask.setPageNumber(1L);
queryDataTask.setColumnHeads(ColumnDataRangeFilter.generateExcelHead(dataApiModelConfig.getColumnDataRangeConfig()));
queryDataTask.setJdbcParamValues(jdbcParamValues);
Future<?> future = debugQueryDataThreadPool.addExecuteTask(queryDataTask);
List<List<Object>> data = new ArrayList<>();
data = (List<List<Object>>) future.get();
return data;
} }
} }
package com.hisense.dataservice.service.impl;
import com.hisense.dataservice.es.data.EsField;
import com.hisense.dataservice.es.data.EsPage;
import com.hisense.dataservice.es.enums.FieldTypeEnum;
import com.hisense.dataservice.es.utils.ElasticsearchUtils;
import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink;
import com.hisense.dataservice.library.model.page.SortOrder;
import com.hisense.dataservice.service.DataServiceLogManagementService;
import com.hisense.dataservice.vo.CallLogVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* @author : scc
* @date : 2023/03/20
**/
@Service
@Slf4j
public class DataServiceLogManagementServiceImpl implements DataServiceLogManagementService {
@Autowired
private ElasticsearchUtils elasticsearchUtils;
@Override
public PageData<CallLogVo> getCallLog(String environment, Long systemId, PageLink pageLink) {
List<EsField> conditions = new ArrayList<>();
// 排序
EsField esField = new EsField();
esField.setField(pageLink.getSortOrder().getProperty());
esField.setFieldTypeEnum(pageLink.getSortOrder().getDirection().equals(SortOrder.Direction.ASC) ? FieldTypeEnum.ORDER_ASC :FieldTypeEnum.ORDER_ESC);
conditions.add(esField);
// 索引名称 : data_serv_call_log_+环境+订阅系统ID
String indexName = "data_serv_call_log_" + environment + "_" +systemId;
PageData result = new PageData();
result.setData(new ArrayList());
EsPage<CallLogVo> callLogVoEsPage;
try{
callLogVoEsPage = elasticsearchUtils.selectDataPage(indexName, pageLink.getPage(), pageLink.getPageSize(), conditions, CallLogVo.class);
result.setData(callLogVoEsPage.getList());
result.setCurrentPage(callLogVoEsPage.getPageNum());
result.setTotalPages(callLogVoEsPage.getPages());
result.setTotalElements(callLogVoEsPage.getTotal());
}catch (Exception e){
log.error("查询数据服务调用日志失败", e.getMessage());
}
return result;
}
}
...@@ -12,49 +12,9 @@ import java.sql.ResultSet; ...@@ -12,49 +12,9 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random;
@Slf4j @Slf4j
public class JdbcUtil { public class JdbcUtil {
// for test
public static Result<Long> testCountSql() {
Integer result = 1100;
String threadName = Thread.currentThread().getName();
log.info("thread: {}, start...", threadName);
try {
Thread.sleep(1000 * new Random().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("thread: {}, end...", threadName);
return new Result<>(result.longValue());
}
// for test
public static Result<List<List<Object>>> testExecuteSql(Long pageNumber) {
List<List<Object>> jsonObjects = new ArrayList<>();
String threadName = Thread.currentThread().getName();
log.info("thread: {}, start...", threadName);
try {
Thread.sleep(1000 * new Random().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10; i++) {
List<Object> jsonObject = new ArrayList<>();
jsonObject.add("number:" + pageNumber + "scc_" + i);
jsonObject.add(i + 10);
jsonObject.add(i + 11);
jsonObject.add(i + 12);
jsonObject.add(i + 13);
jsonObject.add(i + 14);
jsonObjects.add(jsonObject);
}
log.info("thread: {}, end...", threadName);
return new Result<>(jsonObjects);
}
public static Result<List<JSONObject>> executeSql(DataApiDataSource datasource, String sql, List<Object> jdbcParamValues, Integer batchSize) throws Exception { public static Result<List<JSONObject>> executeSql(DataApiDataSource datasource, String sql, List<Object> jdbcParamValues, Integer batchSize) throws Exception {
String threadName = Thread.currentThread().getName(); String threadName = Thread.currentThread().getName();
...@@ -102,13 +62,10 @@ public class JdbcUtil { ...@@ -102,13 +62,10 @@ public class JdbcUtil {
return new Result("200", "查询成功", result); return new Result("200", "查询成功", result);
} catch (SQLException sqlException) { } catch (SQLException sqlException) {
log.error("[{}] 数据库连接失败:", datasource.getSourceName(), sqlException); log.error("[{}] 数据库连接失败:", datasource.getSourceName(), sqlException);
// TODO 记录到ES return new Result("300", "数据源异常:" + sqlException.getMessage(), result);
throw new SQLException("数据库连接失败");
// return new Result("600", sqlException.getMessage(), result);
} catch (Exception e) { } catch (Exception e) {
log.error("thread: {}, sql: {}, params: {}, 分页查询数据异常:", threadName, sql, jdbcParamValues, e); log.error("thread: {}, sql: {}, params: {}, 分页查询数据异常:", threadName, sql, jdbcParamValues, e);
throw new SQLException("分页查询数据异常"); return new Result("400", "系统异常:" + e.getMessage(), result);
// return new Result("500", e.getMessage(), result);
} finally { } finally {
try { try {
if (connection != null) if (connection != null)
......
package com.hisense.dataservice.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* @author : scc
* @date : 2023/03/20
**/
@ApiModel("数据服务调用日志vo")
@Data
public class CallLogVo {
@ApiModelProperty(name = "订阅ID")
private Long subscribeSystemId;
@ApiModelProperty(name = "环境")
private String environment;
@ApiModelProperty(name = "数据服务名称")
private String serviceName;
@ApiModelProperty(name = "调用系统")
private String systemName;
@ApiModelProperty(name = "调用时间")
@JsonFormat(shape = JsonFormat.Shape.STRING,pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private String timestamp;
@ApiModelProperty(name = "调用状态")
private String status;
@ApiModelProperty(name = "原因")
private String reason;
@ApiModelProperty(name = "sql语句")
private String sql;
@ApiModelProperty(name = "sql过滤值")
private List<String> jdbcParams;
}
...@@ -39,7 +39,7 @@ public class DataApiModelDetailVo { ...@@ -39,7 +39,7 @@ public class DataApiModelDetailVo {
private String systemLeader; private String systemLeader;
@ApiModelProperty(name = "发布系统") @ApiModelProperty(name = "发布系统")
private String publishSystem; private String publishSystemName;
@ApiModelProperty(name = "发布时间") @ApiModelProperty(name = "发布时间")
private Date createdTime; private Date createdTime;
......
...@@ -13,9 +13,6 @@ import lombok.Data; ...@@ -13,9 +13,6 @@ import lombok.Data;
@Data @Data
public class DataApiModelItemVo { public class DataApiModelItemVo {
@ApiModelProperty(name = "数据源ID")
private Long dataSourceId;
@ApiModelProperty(name = "数据服务ID") @ApiModelProperty(name = "数据服务ID")
private Long id; private Long id;
...@@ -31,4 +28,10 @@ public class DataApiModelItemVo { ...@@ -31,4 +28,10 @@ public class DataApiModelItemVo {
@ApiModelProperty(name = "系统环境") @ApiModelProperty(name = "系统环境")
private SystemEnv systemEnv; private SystemEnv systemEnv;
@ApiModelProperty(name = "发布人")
private String creator;
@ApiModelProperty(name = "发布系统名")
private String systemName;
} }
...@@ -15,4 +15,10 @@ spring: ...@@ -15,4 +15,10 @@ spring:
server: server:
port=8080 # 应用服务 WEB 访问端口 port=8080 # 应用服务 WEB 访问端口
es:
hosts: ${ES_HOST:192.168.78.132:9200}
username: ${ES_USERNAME:elastic}
password: ${ES_PASSWORD:elastic}
connectTimeout: ${ES_CONNECT_TIMEOUT:30000}
socketTimeout: ${ES_SOCKET_TIMEOUT:30000}
connectionRequestTimeout: ${CONNECTION_REQUEST_TIMEOUT:30000}
...@@ -4,10 +4,16 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,10 +4,16 @@ import com.alibaba.fastjson.JSONObject;
import com.hisense.dataservice.DataServiceApplication; import com.hisense.dataservice.DataServiceApplication;
import com.hisense.dataservice.bo.*; import com.hisense.dataservice.bo.*;
import com.hisense.dataservice.dto.DataApiModelDto; import com.hisense.dataservice.dto.DataApiModelDto;
import com.hisense.dataservice.enums.ModelConfigType;
import com.hisense.dataservice.enums.NetworkEnv; import com.hisense.dataservice.enums.NetworkEnv;
import com.hisense.dataservice.enums.OperateEnum; import com.hisense.dataservice.enums.OperateEnum;
import com.hisense.dataservice.enums.SystemEnv; import com.hisense.dataservice.enums.SystemEnv;
import com.hisense.dataservice.es.utils.ElasticsearchUtils;
import com.hisense.dataservice.library.model.Result; import com.hisense.dataservice.library.model.Result;
import com.hisense.dataservice.library.model.page.PageData;
import com.hisense.dataservice.library.model.page.PageLink;
import com.hisense.dataservice.library.model.page.SortOrder;
import com.hisense.dataservice.vo.CallLogVo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -29,6 +35,10 @@ import java.util.List; ...@@ -29,6 +35,10 @@ import java.util.List;
public class DataApiServiceManagementServiceTest { public class DataApiServiceManagementServiceTest {
@Autowired @Autowired
private DataServiceLogManagementService dataServiceLogManagementService;
@Autowired
private ElasticsearchUtils elasticsearchUtils;
@Autowired
private DataApiServiceManagementService dataApiServiceManagementService; private DataApiServiceManagementService dataApiServiceManagementService;
@Test @Test
...@@ -52,7 +62,7 @@ public class DataApiServiceManagementServiceTest { ...@@ -52,7 +62,7 @@ public class DataApiServiceManagementServiceTest {
log.info("执行第四步结果,发布api:{}", result4); log.info("执行第四步结果,发布api:{}", result4);
Assert.assertEquals(false, result4.getData()); Assert.assertEquals(false, result4.getData());
Result<JSONObject> result5 = dataApiServiceManagementService.debugDataApiService("test", result3.getData()); Result<JSONObject> result5 = dataApiServiceManagementService.debugDataApiService("test", result3.getData(), ModelConfigType.PUBLISH);
log.info("执行第五步结果,调试api:{}", result5); log.info("执行第五步结果,调试api:{}", result5);
Assert.assertEquals(false, result5.getData()); Assert.assertEquals(false, result5.getData());
...@@ -85,7 +95,7 @@ public class DataApiServiceManagementServiceTest { ...@@ -85,7 +95,7 @@ public class DataApiServiceManagementServiceTest {
log.info("执行发布api;第四步结果,发布api:{}", result4); log.info("执行发布api;第四步结果,发布api:{}", result4);
Assert.assertEquals(false, result4.getData()); Assert.assertEquals(false, result4.getData());
Result<JSONObject> result5 = dataApiServiceManagementService.debugDataApiService("test", result3.getData()); Result<JSONObject> result5 = dataApiServiceManagementService.debugDataApiService("test", result3.getData(), ModelConfigType.PUBLISH);
log.info("执行调试;第五步结果,调试api:{}", result5); log.info("执行调试;第五步结果,调试api:{}", result5);
Assert.assertEquals(null, result5.getData()); Assert.assertEquals(null, result5.getData());
...@@ -93,7 +103,7 @@ public class DataApiServiceManagementServiceTest { ...@@ -93,7 +103,7 @@ public class DataApiServiceManagementServiceTest {
@Test @Test
public void test(){ public void test(){
Result<JSONObject> result5 = dataApiServiceManagementService.debugDataApiService("DEV", 541L); Result<JSONObject> result5 = dataApiServiceManagementService.debugDataApiService("DEV", 541L, ModelConfigType.PUBLISH);
log.info("执行调试;第五步结果,调试api:{}", result5); log.info("执行调试;第五步结果,调试api:{}", result5);
Assert.assertEquals(null, result5.getData()); Assert.assertEquals(null, result5.getData());
} }
...@@ -267,4 +277,31 @@ public class DataApiServiceManagementServiceTest { ...@@ -267,4 +277,31 @@ public class DataApiServiceManagementServiceTest {
filterCondition.setFilterValues(filterValues); filterCondition.setFilterValues(filterValues);
return filterCondition; return filterCondition;
} }
@Test
public void testEsLog(){
// 保存调用记录到ES中
JSONObject callSqlEsLogJson = new JSONObject();
callSqlEsLogJson.put("sql", "select * from A where 1=1 and namel = ?");
callSqlEsLogJson.put("jdbcParams", "['a']");
callSqlEsLogJson.put("serviceName", "测试服务名");
callSqlEsLogJson.put("systemName", "测试系统名");
callSqlEsLogJson.put("status", "失败");
callSqlEsLogJson.put("reason", "数据库连接失败");
callSqlEsLogJson.put("timestamp", System.currentTimeMillis());
callSqlEsLogJson.put("environment", "staging");
callSqlEsLogJson.put("subscribeSystemId", "1");
String indexName = "call_log_" + "staging_" + "2";
boolean addLogResult = elasticsearchUtils.addData(indexName, callSqlEsLogJson.toJSONString());
log.info("添加记录到es,结果:{}", addLogResult);
PageLink pageLink = new PageLink(10,1,"", new SortOrder("timestamp", SortOrder.Direction.DESC));
PageData<CallLogVo> pageData1 = dataServiceLogManagementService.getCallLog("staging", 2L, pageLink);
log.info("pageData1: {}", pageData1);
PageData<CallLogVo> pageData2 = dataServiceLogManagementService.getCallLog("staging", 1L, pageLink);
log.info("pageData2: {}", pageData2);
}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论