【中间件】ElasticSearch在Java中的调用尝试
2023-12-18 22:26:34
ElasticSearch
ElasticSearch的两个主要功能:
- 全文检索
- 形成ELK集群,进行日志检索(ElasticSearch、LogStash、Kibana)
日志采集(服务器、移动设备、IOT传感器) -> 数据传输汇聚(Kafka、LogStash) -> 数据存储索引(ElasticSearch集群) -> 异常定位与监控(全文检索、结构化查询、数据可视化)
ES是一个基于内存的数据检索与分析引擎,我们如果有十分多的数据需要保存的话,我们可以构建多台服务器,利用ES天然适配于分布式的属性对ES中存储的数据进行分片存储
业务:商品在上架后向ES中保存
涉及到ES的问题,尽量不要使用分步查询,一步到位,否则第二次请求要发送的数据可能会相当大,导致网络崩溃
创建索引:(注意点:对于只需要看,不需要搜索的信息,可以把index属性和doc_values属性设置为false,对于内部的另一个属性,要标记为nested,否则会产生问题)
一个数据模型的映射示例:
PUT product
{
"mappings":{
"properties": {
"skuId":{ "type": "long" },
"spuId":{ "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg" : { "type": "keyword" },
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {"type": "keyword"},
"brandImg":{
"type": "keyword",
"index": false,
"doc_values": false
},
"catalogName": {"type": "keyword" },
"attrs": {
"type": "nested",
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrValue": {"type": "keyword" }
}
}
}
}
}
一定要使用nested的原因:
ES在存储时不会按照对象存储,而是按照属性,把所有同一个字段的内容都聚合成一个数组(用来便于全文检索),而我们不希望我们的内嵌的对象被扁平化处理,希望能够精确查找,所以我们标注nested来让ES不对该属性进行扁平化处理
user.name=["aaa","bbb"]
user.addr=["ccc","ddd"]
这种存储方式,可能会发生如下错误:
错误检索到{aaa,ddd},这个组合是不存在的
操作,将数据在ES中保存
我们提前准备好所有要保存的信息,建立Vo类,使用这个VO类进行保存
在search包中创建一个controller:
/**
* 上架商品
*/
@PostMapping("/product")
public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
// 由于我们在网络可能出现问题的部分Throw 了 IOException,故我们需要使用 try-catch 进行捕获
try{
productSaveService.productStatusUp(skuEsModelList);
} catch (Exception e) {
return R.error(PRODUCT_UP_EXCEPTION.getCode(), PRODUCT_UP_EXCEPTION.getMsg());
};
return R.ok();
}
}
具体操作还得再Impl中做:
/**
* 上架商品
*
* @param skuEsModelList
* @return
*/
@Override
public boolean productStatusUp(List<SkuEsModel> skuEsModelList) throws IOException {
/**
* 向ES中保存数据.bulk(BulkRequest, RequestOptions)
* BulkRequest中存储我们要保存的数据配置(索引、id等)
* RequestOptions我们提前在Config中进行全局定义,用来配置我们发送请求的各种参数,我们也可以自己定义,但这里我们先使用全局的
* 之后我们再新建IndexRequest对象,在IndexRequest中标注索引、id、具体的数据
* 我们的id可以使用我们的skuId
*/
BulkRequest bulkRequest = new BulkRequest();
for (SkuEsModel model : skuEsModelList) {
IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX); // 传入索引
indexRequest.id(model.getSkuId().toString()); // 传入id
String s = JSON.toJSONString(model); // 将对象转换成JSON对象
indexRequest.source(s, XContentType.JSON); // 传入具体的信息,第一个形参是String字符串,第二个形参是它对应的数据类型(这里是JSON)
bulkRequest.add(indexRequest); // 将信息传入BulkRequest,用于RestHighLevelClient进行调用
}
// 进行调用,向ES中进行最后的存储
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, GulimailElasticSearchConfig.COMMON_OPTIONS);
// 若发生添加错误
boolean b = bulk.hasFailures();
// 这个数组直接转Stream流的方式可以学一下
List<String> collect = Arrays.stream(bulk.getItems()).map(item -> {
return item.getId();
}).collect(Collectors.toList());
log.error("商品上架错误{}", collect);
return b;
}
操作Kibana,给ES建立好索引:
PUT product
{
"mappings":{
"properties": {
"skuId":{ "type": "long" },
"spuId":{ "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg" : { "type": "keyword" },
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {"type": "keyword"},
"brandImg":{
"type": "keyword",
"index": false,
"doc_values": false
},
"catalogName": {"type": "keyword" },
"attrs": {
"type": "nested",
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrValue": {"type": "keyword" }
}
}
}
}
}
在product模块中创建SearchFeignService接口:
@FeignClient("gulimail-search")
public interface SearchFeignService {
/**
* @PostMapping("/product")
* public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
*/
@PostMapping
R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList);
}
注入并调用这个接口(商品上架的整体内容):
/**
* 商品上架
* 上架是一个SPU类型的商品数据,而不是某一个具体的SKU
* @param spuId
*/
@Override
public void up(Long spuId) {
// 先查出当前spuid对应的所有sku的信息,包括品牌的名字也要查出来(一个spu对应多个sku,颜色不同、内存不同,等等)
List<SkuInfoEntity> skuInfoEntities = skuInfoService.getSkusBySpuId(spuId);
List<ProductAttrValueEntity> baseAttrs = productAttrValueService.baseAttrListforspu(spuId);
List<Long> attrIds = baseAttrs.stream().map(item -> {
return item.getAttrId();
}).collect(Collectors.toList());
// 查到所有可被检索信息的id
List<Long> searchAttrIds = attrService.selectSearchAttrIds(attrIds);
HashSet<Long> searchAttrIdsSet = new HashSet<>(searchAttrIds);
List<SkuEsModel.Attrs> attrs = new ArrayList<>();
List<SkuEsModel.Attrs> attrsList = baseAttrs.stream().filter(item -> baseAttrs.contains(item)).map(item -> {
SkuEsModel.Attrs attrs1 = new SkuEsModel.Attrs();
BeanUtils.copyProperties(item, attrs1);
return attrs1;
}).collect(Collectors.toList());
//获取所有的id
List<Long> skuIds = skuInfoEntities.stream().map(skuInfoEntity -> skuInfoEntity.getSkuId()).collect(Collectors.toList());
Map<Long, Boolean> stockMap = null;
try {
R<List<SkuHasStockVo>> skusHasStock = wareFeignService.getSkusHasStock(skuIds);
//若item -> item.getId(),则可以转化为:Item::getId(注意前面是类名,不是对象名)
stockMap = skusHasStock.getData().stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));
} catch (Exception e) {
log.error("远程调用出现异常:{}", e);
}
Map<Long, Boolean> finalStockMap = stockMap;
List<SkuEsModel> upProducts = skuInfoEntities.stream().map(item -> {
// 组装需要的数据
SkuEsModel esModel = new SkuEsModel();
BeanUtils.copyProperties(item, esModel);
// 设置其他没有被复制的字段
esModel.setSkuPrice(item.getPrice());
esModel.setSkuImg(item.getSkuDefaultImg());
// 查询库存
// TODO 远程调用仓库查库存
// 利用toMap和R的泛型的方法进行的快速转换(好牛的技巧!!!)
// Lambda中一般不能操作外部变量,这里有必要操作,在编译器的支持下做修改即可(Lambda操作的外部变量必须是以final修饰的)
esModel.setHasStock(finalStockMap.get(item.getSkuId()));
// 热度评分先设置为0
esModel.setHotScore(0L);
BrandEntity brand = brandService.getById(esModel.getBrandId());
esModel.setBrandName(brand.getName());
esModel.setBrandImg(brand.getLogo());
CategoryEntity category = categoryService.getById(esModel.getCatalogId());
esModel.setCatalogName(category.getName());
// 设置里面那个数组
esModel.setAttrs(attrsList);
return esModel;
}).collect(Collectors.toList());
// 将数据发送给ES进行存储
/**
* @PostMapping("/product")
* public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
*/
R r = searchFeignService.productStatusUp(upProducts);
if (r.getCode() == 0) {
baseMapper.updateSpuStatus(spuId, ProductConstant.StatusEnum.SPU_UP.getCode());
} else {
log.error("商品上架失败");
}
}
Feign的调用流程:
- 构造请求数据,将对象转为JSON数据,RequestTemplate template = buildTemplateFromArgs.create(argv)
- 发送请求执行(执行成功会进行解码响应)executeAndDecode(template)
- 这里的请求执行是有重试机制的,这个重试机制默认是关闭的,其逻辑大概是:执行出错 - 重试(多次重试) - 若再次出错,抛出异常,若没有再次出错 - 解码响应
尝试:
PUT gulimail_product
{
"mappings":{
"properties": {
"skuId":{ "type": "long" },
"spuId":{ "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg" : { "type": "keyword" },
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {"type": "keyword"},
"brandImg":{
"type": "keyword",
"index": false,
},
"catalogName": {"type": "keyword" },
"attrs": {
"type": "nested",
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
},
"attrValue": {"type": "keyword" }
}
}
}
}
}
尝试成功,不加doc_values就可以进行聚合操作
文章来源:https://blog.csdn.net/weixin_41365204/article/details/135067745
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!