SpringCloud微服务技术栈(黑马)学习笔记DAY7
数据聚合
? ? ? ? 聚合可以实现对文档数据的统计、分析、运算。常见的聚合有三类:桶、度量、管道
? ? ?Bucket聚合
? ? ? ?对于字段的值做分组,以下是使用DSL实现Bucker聚合。
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 5
}
}
}
}
? ? ? ? ?默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:
? ? ?Metrics聚合?
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 5
},
"aggs": {
"scoreAgg": {
"stats" : {
"field": "score"
}
}
}
}
}
}
? ? ? ? 我们可以通过在第一个聚合里面通过order对第二个聚合进行排序
}
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 5,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": {
"scoreAgg": {
"stats" : {
"field": "score"
}
}
}
}
}
}
RestClient实现聚合?
? ? ? ? 我们以品牌聚合为例
@Test
void testAggregation() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source().size(0);
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(10));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
Terms brandTerms = aggregations.get("brandAgg");
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
System.out.println(key);
}
}
? ? ? 在IUserService中定义方法,实现对品牌、城市、星级的聚合?
@Override
public Map<String, List<String>> filters() {
try {
SearchRequest request = new SearchRequest("hotel");
request.source().size(0);
bulidAggregation(request);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Map<String,List<String>> result=new HashMap<>();
Aggregations aggregations = response.getAggregations();
List<String> brandList = getByAggName(aggregations,"brandAgg");
result.put("品牌",brandList);
List<String> cityList = getByAggName(aggregations,"cityAgg");
result.put("城市",cityList);
List<String> starList = getByAggName(aggregations,"starNameAgg");
result.put("星级",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static List<String> getByAggName(Aggregations aggregations,String name) {
Terms brandTerms = aggregations.get(name);
List<String> brandList=new ArrayList<>();
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
private static void bulidAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("starNameAgg")
.field("starName")
.size(100));
}
? ? ? ? 前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果
? ? ? ? 请求的方式是POST,URL如上图所示,前端向后端传递的参数和查询一样。
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
SearchRequest request = new SearchRequest("hotel");
buildBasicQuery(params, request);
request.source().size(0);
bulidAggregation(request);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Map<String,List<String>> result=new HashMap<>();
Aggregations aggregations = response.getAggregations();
List<String> brandList = getByAggName(aggregations,"brandAgg");
result.put("品牌",brandList);
List<String> cityList = getByAggName(aggregations,"cityAgg");
result.put("城市",cityList);
List<String> starList = getByAggName(aggregations,"starNameAgg");
result.put("星级",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
?自动补全
? ? ? ? 这边呢出现了一个问题,就是我在window下载好的pinyin插件结果复制到centos中出现报错。
后面呢,在centos下载就没有这个问题。
? ? 自定义分词器 ?
? ? ? ? 拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。(会出现搜索到同音字)
??????实现hotel库的自动补全?
? ? ? ? 步骤:1)修改hotel索引库结果,设置自定义拼音分词器
? ? ? ? ? ? ? ? ? ?2)修改索引库name、all字段,使用自定义分词器
? ? ? ? ? ? ? ? ? ?3)索引库添加一个新字段suggestion字段,内容包含brand、business
? ? ? ? ? ? ? ? ? ?4)给HotelDoc类添加suggestion字段,内容包含brand、business
? ? ? ? ? ? ? ? ? ?5)重新导入数据到hotel库
? ? ? ?RestAPI实现自动补全
@Test
void testSuggest(){
try {
SearchRequest request = new SearchRequest("hotel");
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("h")
.skipDuplicates(true)
.size(10)
));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
System.out.println(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestions = suggest.getSuggestion("suggestion");
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
for (CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
System.out.println(text);
}
搜索框自动补全
//请求方式get
//传入一个参数prefix
//返回数组
//controller层
@GetMapping("/suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix){
return hotelService.getSuggestions(prefix);
}
//service层
@Override
public List<String> getSuggestions(String prefix) {
try {
SearchRequest request = new SearchRequest("hotel");
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(prefix)
.skipDuplicates(true)
.size(10)
));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
List<String> list = new ArrayList<>(options.size());
for (CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
list.add(text);
}
return list;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
?数据同步
? ? ? ? 当数据库mysql里的数据发生变化的时候,我们要求elasticsearch里的数据也必须跟着改变。
? ? ? ? 方法一:同步调用(写入数据库后,需要等待索引库更新,这才是一个过程的全程)
? ? ? ? 方法二:异步通知
? ? ? ? ?方法三:监听binlog
?????酒店搜索项目中实现es和mysql的同步
?????????申明交换机和队列一般都是在消费者,因此我们在hotel-demo创建了mq的配置类里面创建相关队列,交换机的bean。
//还有定义了一个常量类
public class MqConstants {
public final static String HOTEL_EXCHANGE = "hotel.topic";
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
public final static String HOTEL_INSERT_KEY = "hotel.insert";
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
? ? ? ? 此外,我们需要添加对应的依赖和配置mq的配置信息。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
rabbitmq:
host: 192.168.239.137
port: 5672
username: root
password: 123
virtual-host: /
? ? ? ? 在生产者那边hotel-admin每队数据库进行增加修改删除都要发个消息给队列。
@Autowired
private RabbitTemplate rabbitTemplate;
public void saveHotel(@RequestBody Hotel hotel) {
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
public void updateById(@RequestBody Hotel hotel) {
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
? ? ? ? 我们之后在消费端又专门穿件了一个类mq用来处理队列中的消息
//mq类
@Component
public class HotelListener {
@Autowired
private IHotelService iHotelService;
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenInsertOrUpdate(Long id){
iHotelService.insertById(id);
}
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenDelete(Long id){
iHotelService.deleteById(id);
}
}
//service层
@Override
public void deleteById(Long id) {
try {
DeleteRequest request= new DeleteRequest("hotel",id.toString());
client.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
Hotel hotel = getById(id);
HotelDoc hotelDoc=new HotelDoc(hotel);
IndexRequest request=new IndexRequest("hotel").id(hotel.getId().toString());
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
client.index(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!