SpringCloud微服务技术栈(黑马)学习笔记DAY7

2024-01-10 06:07:14

数据聚合

? ? ? ? 聚合可以实现对文档数据的统计、分析、运算。常见的聚合有三类:桶、度量、管道

? ? ?Bucket聚合

? ? ? ?对于字段的值做分组,以下是使用DSL实现Bucker聚合。

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 5
      }
    }
  }
}

? ? ? ? ?默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:

? ? ?Metrics聚合?

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 5
      },
      "aggs": {
        "scoreAgg": {
          "stats" : {
            "field": "score"
          }
        }
      }
    }
    
  }
}

? ? ? ? 我们可以通过在第一个聚合里面通过order对第二个聚合进行排序

}
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 5,
        "order": {
          "scoreAgg.avg": "desc"
        }
      },
      "aggs": {
        "scoreAgg": {
          "stats" : {
            "field": "score"
          }
        }
      }
    }
    
  }
}

RestClient实现聚合?

? ? ? ? 我们以品牌聚合为例

    @Test
    void testAggregation() throws IOException {
        SearchRequest request = new SearchRequest("hotel");
        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(10));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Aggregations aggregations = response.getAggregations();
        Terms brandTerms = aggregations.get("brandAgg");
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            System.out.println(key);
        }
    }

? ? ? 在IUserService中定义方法,实现对品牌、城市、星级的聚合?

  @Override
    public Map<String, List<String>> filters() {
        try {
            SearchRequest request = new SearchRequest("hotel");
            request.source().size(0);
            bulidAggregation(request);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Map<String,List<String>> result=new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            List<String> brandList = getByAggName(aggregations,"brandAgg");
            result.put("品牌",brandList);
            List<String> cityList = getByAggName(aggregations,"cityAgg");
            result.put("城市",cityList);
            List<String> starList = getByAggName(aggregations,"starNameAgg");
            result.put("星级",starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static List<String> getByAggName(Aggregations aggregations,String name) {
        Terms brandTerms = aggregations.get(name);
        List<String> brandList=new ArrayList<>();
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    private static void bulidAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("starNameAgg")
                .field("starName")
                .size(100));
    }

? ? ? ? 前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果

? ? ? ? 请求的方式是POST,URL如上图所示,前端向后端传递的参数和查询一样。

   @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            SearchRequest request = new SearchRequest("hotel");
            buildBasicQuery(params, request);
            request.source().size(0);
            bulidAggregation(request);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Map<String,List<String>> result=new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            List<String> brandList = getByAggName(aggregations,"brandAgg");
            result.put("品牌",brandList);
            List<String> cityList = getByAggName(aggregations,"cityAgg");
            result.put("城市",cityList);
            List<String> starList = getByAggName(aggregations,"starNameAgg");
            result.put("星级",starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

?自动补全

? ? ? ? 这边呢出现了一个问题,就是我在window下载好的pinyin插件结果复制到centos中出现报错。

后面呢,在centos下载就没有这个问题。

? ? 自定义分词器 ?

? ? ? ? 拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。(会出现搜索到同音字)

??????实现hotel库的自动补全?

? ? ? ? 步骤:1)修改hotel索引库结果,设置自定义拼音分词器

? ? ? ? ? ? ? ? ? ?2)修改索引库name、all字段,使用自定义分词器

? ? ? ? ? ? ? ? ? ?3)索引库添加一个新字段suggestion字段,内容包含brand、business

? ? ? ? ? ? ? ? ? ?4)给HotelDoc类添加suggestion字段,内容包含brand、business

? ? ? ? ? ? ? ? ? ?5)重新导入数据到hotel库

? ? ? ?RestAPI实现自动补全

    @Test
    void testSuggest(){
        try {
            SearchRequest request = new SearchRequest("hotel");
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "suggestions",
                    SuggestBuilders.completionSuggestion("suggestion")
                            .prefix("h")
                            .skipDuplicates(true)
                            .size(10)
            ));
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            System.out.println(response);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Suggest suggest = response.getSuggest();
            CompletionSuggestion suggestions = suggest.getSuggestion("suggestion");
            List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
            for (CompletionSuggestion.Entry.Option option : options) {
                String text = option.getText().toString();
                System.out.println(text);
            }

搜索框自动补全

//请求方式get
//传入一个参数prefix
//返回数组

//controller层
    @GetMapping("/suggestion")
    public List<String> getSuggestions(@RequestParam("key") String prefix){
        return hotelService.getSuggestions(prefix);
    }
//service层
    @Override
    public List<String> getSuggestions(String prefix) {
        try {
            SearchRequest request = new SearchRequest("hotel");
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "suggestions",
                    SuggestBuilders.completionSuggestion("suggestion")
                            .prefix(prefix)
                            .skipDuplicates(true)
                            .size(10)
            ));
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Suggest suggest = response.getSuggest();
            CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
            List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
            List<String> list = new ArrayList<>(options.size());
            for (CompletionSuggestion.Entry.Option option : options) {
                String text = option.getText().toString();
                list.add(text);
            }
            return list;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }

?数据同步

? ? ? ? 当数据库mysql里的数据发生变化的时候,我们要求elasticsearch里的数据也必须跟着改变。

? ? ? ? 方法一:同步调用(写入数据库后,需要等待索引库更新,这才是一个过程的全程)

? ? ? ? 方法二:异步通知

? ? ? ? ?方法三:监听binlog

?????酒店搜索项目中实现es和mysql的同步

?????????申明交换机和队列一般都是在消费者,因此我们在hotel-demo创建了mq的配置类里面创建相关队列,交换机的bean。

//还有定义了一个常量类
public class MqConstants {
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}


@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
    }
    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
    }
    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
    }
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }
    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

? ? ? ? 此外,我们需要添加对应的依赖和配置mq的配置信息。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  rabbitmq:
    host: 192.168.239.137
    port: 5672
    username: root
    password: 123
    virtual-host: /

? ? ? ? 在生产者那边hotel-admin每队数据库进行增加修改删除都要发个消息给队列。

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void saveHotel(@RequestBody Hotel hotel) {
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }


    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
    }

? ? ? ? 我们之后在消费端又专门穿件了一个类mq用来处理队列中的消息

//mq类
@Component
public class HotelListener {
    @Autowired
    private  IHotelService iHotelService;
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenInsertOrUpdate(Long id){
        iHotelService.insertById(id);
    }
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenDelete(Long id){
        iHotelService.deleteById(id);
    }

}


//service层
    @Override
    public void deleteById(Long id) {
        try {
            DeleteRequest request= new DeleteRequest("hotel",id.toString());
            client.delete(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void insertById(Long id) {
        try {
            Hotel hotel = getById(id);
            HotelDoc hotelDoc=new HotelDoc(hotel);
            IndexRequest request=new IndexRequest("hotel").id(hotel.getId().toString());
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            client.index(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

文章来源:https://blog.csdn.net/wwwwz9/article/details/135232099
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。