SpringCloud-全家桶

微服务架构的概念以及优势

什么是微服务
  • 官方定义: 微服务就是由一系列围绕自己业务开发的微小服务构成,他们独立部署运行在自己的进程里,基于分布式的管理
  • 通俗定义: 微服务是一种架构,这种架构是将单个的整体应用程序分割成更小的项目关联的独立的服务。一个服务通常实现一组独立的特性或功能,包含自己的业务逻辑和适配器。各个微服务之间的关联通过暴露api 来实现。这些独立的微服务不需要部署在同一个虚拟机,同一个系统和同一个应用服务器中
单体应用架构优缺点
  • 优点: 单一架构模式在项目初期很小的时候开发方便,测试方便,部署方便,运行良好
  • 缺点: 应用随着时间的推进,加入的功能越来越多,最终会变得巨大,一个项目中很有可能数百万行的代码,互相之间繁琐的jar 包。久而久之,开发的效率低,代码维护困难。还有一个如果想整体应用采用新的技术,新的框架或者语言,那是不可能的,任意模块的漏洞或者错误都会影响整个应用,降低系统的可靠性
微服务加入的优缺点
  • 优点: 将服务拆分成多个单一职责的小的服务,进行单独部署,服务之间通过网络进行通信,每个服务应该有自己单独的管理团队,高度自治,服务各自有各自单独的职责,服务之间松耦合,避免因一个模块的问题导致服务崩溃
  • 缺点
    1. 开发人员需要处理分布式的复杂性
    2. 多服务运维难度,随着服务的增加,运维的压力也在增大
    3. 服务治理和服务监控是关键

SpringCloud

什么是 SpringCloud
  • 官网

    1
    2
    # https://spring.io/projects/spring-cloud
    Spring Cloud provides tools for developers to quickly build some of the common patterns in distributed systems (e.g. configuration management, service discovery, circuit breakers, intelligent routing, micro-proxy, control bus, one-time tokens, global locks, leadership election, distributed sessions, cluster state). Coordination of distributed systems leads to boiler plate patterns, and using Spring Cloud developers can quickly stand up services and applications that implement those patterns. They will work well in any distributed environment, including the developer’s own laptop, bare metal data centres, and managed platforms such as Cloud Foundry.

    springcloud 为开发人员提供了分布式系统中快速构建一些通用模式的工具(例如: 配置管理、服务发现、断路器、微代理、控制总线)。分布式系统的协调导致了锅炉版模式。使用springcloud 开发人员可以快速地建立实现这些模式的服务和应用程序

  • 通俗理解

    springcloud 是一个涵盖多个子项目的开发工具集,集合了众多的开源框架,他利用了Spring Boot 开发的便利性实现了很多功能,如服务注册,服务注册发现,负载均衡等。SpringCloud 在整合过程中主要是针对Neflilx 开源组件的封装,SpringCloud 的出现真正的简化了分布式架构的开发。Netflix 是美国的一个在线视频网站,微服务业的翘楚,它是公认的大规模生产微服务的杰出实践者,Netflix 的开源组件已经在它的大规模分布环境中经过多年的生产实践验证,因此SpringCloud 中很多组件都是基于Netflix

版本选择

IDEA-模拟集群实现

  • 集群配置

  • 配置修改

    -Dserver.port=8082

SpringCloud-环境搭建

环境搭建
  • java 环境

    jdk:1.8
  • 创建父项目(基于springboot 初始化器构建)

    父工程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    <!-- 父项目的 pom.xml -->
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <modules>
    <module>eureka-server</module>
    </modules>

    <!-- lookup parent from repository -->
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.6</version>
    <relativePath/>
    </parent>

    <!-- 修改打包方式 pom -->
    <packaging>pom</packaging>

    <groupId>com.example</groupId>
    <artifactId>blr-cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
    <java.version>1.8</java.version>
    <spring.cloud-version>2021.0.3</spring.cloud-version>
    </properties>

    <dependencyManagement>
    <dependencies>
    <!-- spring cloud -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>${spring.cloud-version}</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>
    </dependencies>
    </dependencyManagement>

    </project>

  • dependencyManagement dependencies

    • maven 使用dependencyManagement 元素来提供了一种管理依赖版本号的方式,通常会在一个组织者的最顶层的父pom 中看到dependencyManagement 元素

    • 使用pom.xml 中的dependencyManagement 元素能让所有在子项目中引用一个依赖而不需显示的列出版本号,maven 会沿着父子层次向上走,直到找到一个拥有dependencyManagement 元素的项目,然后他就会使用这个dependencyManagement 元素中指定的版本号

    • 这样做的好处就是: 如果有多个子项目都引用同一样的依赖,则可以避免在每个使用的子项目里都声明一个版本号,这样当想升级或切换到另一个版本时,只需在顶层父容器里更新,而不需要一个一个子项目的修改,另外如果某个子项目需要另外的一个版本,只需要声明version 就可以了

    • dependencyManagement 只是声明依赖,并不实现引入,因此子项目需要显示的声明需要使用的依赖,如果子项目中指定了版本号,那么就会使用子项目中指定的jar 版本

服务远程调用
  • 创建父项目

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.demo</groupId>
    <artifactId>cloud-demo</artifactId>
    <version>1.0</version>

    <modules>
    <module>user-service</module>
    <module>order-service</module>
    </modules>

    <packaging>pom</packaging>

    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
    </parent>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR10</spring-cloud.version>
    <mysql.version>5.1.47</mysql.version>
    <mybatis.version>2.1.1</mybatis.version>
    </properties>

    <dependencyManagement>
    <dependencies>
    <!-- springCloud -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>${spring-cloud.version}</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>

    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
    </dependency>
    <!--mybatis-->
    <dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>${mybatis.version}</version>
    </dependency>
    </dependencies>
    </dependencyManagement>
    <dependencies>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    </dependency>
    </dependencies>
    </project>
  • 创建order-service 子模块

    • 修改xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      <dependencies>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      </dependency>
      <!--mybatis-->
      <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      </dependency>
      </dependencies>
    • 创建实体类

      1
      2
      3
      4
      5
      6
      7
      8
      9
      @Data
      public class Order {
      private Long id;
      private Long price;
      private String name;
      private Integer num;
      private Long userId;
      private User user;
      }
      1
      2
      3
      4
      5
      6
      @Data
      public class User {
      private Long id;
      private String username;
      private String address;
      }
    • 创建mapper

      1
      2
      3
      4
      5
      6
      7

      public interface OrderMapper {

      @Select("select * from tb_order where id = #{id}")
      Order findById(Long id);
      }

    • 创建service

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      @Service
      public class OrderService {

      @Autowired
      private OrderMapper orderMapper;

      public Order queryOrderById(Long orderId) {
      // 1.查询订单
      Order order = orderMapper.findById(orderId);
      // 4.返回
      return order;
      }
      }

    • 创建控制器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      @RestController
      @RequestMapping("order")
      public class OrderController {

      @Autowired
      private OrderService orderService;

      @GetMapping("{orderId}")
      public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
      // 根据id查询订单并返回
      return orderService.queryOrderById(orderId);
      }
      }

    • 配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      # application.yml
      server:
      port: 8080
      spring:
      datasource:
      url: jdbc:mysql://localhost:3306/cloud_order?useSSL=false
      username: root
      password: root
      driver-class-name: com.mysql.jdbc.Driver
      mybatis:
      type-aliases-package: com.example.user.pojo
      configuration:
      map-underscore-to-camel-case: true
      logging:
      level:
      com.example: debug
      pattern:
      dateformat: MM-dd HH:mm:ss:SSS
    • sql 脚本

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      -- 创建数据库
      create database cloud_order;
      -- 创建表与测试数据
      CREATE TABLE `tb_order` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单id',
      `user_id` bigint NOT NULL COMMENT '用户id',
      `name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '商品名称',
      `price` bigint NOT NULL COMMENT '商品价格',
      `num` int DEFAULT '0' COMMENT '商品数量',
      PRIMARY KEY (`id`) USING BTREE,
      UNIQUE KEY `username` (`name`) USING BTREE
      ) ENGINE=InnoDB AUTO_INCREMENT=109 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT

      INSERT INTO tb_order VALUES (101, 1, 'Apple 苹果 iPhone 12 ', 699900, 1);
      INSERT INTO tb_order VALUES (102, 2, '雅迪 yadea 新国标电动车', 209900, 1);
      INSERT INTO tb_order VALUES (103, 3, '骆驼(CAMEL)休闲运动鞋女', 43900, 1);
      INSERT INTO tb_order VALUES (104, 4, '小米10 双模5G 骁龙865', 359900, 1);
      INSERT INTO tb_order VALUES (105, 5, 'OPPO Reno3 Pro 双模5G 视频双防抖', 299900, 1);
      INSERT INTO tb_order VALUES (106, 6, '美的(Midea) 新能效 冷静星II ', 544900, 1);
      INSERT INTO tb_order VALUES (107, 2, '西昊/SIHOO 人体工学电脑椅子', 79900, 1);
      INSERT INTO tb_order VALUES (108, 3, '梵班(FAMDBANN)休闲男鞋', 31900, 1);

  • 创建user-service 子模块

    • 修改pom.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      <dependencies>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      </dependency>
      <!--mybatis-->
      <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      </dependency>
      </dependencies>
    • 创建实体类

      1
      2
      3
      4
      5
      6
      @Data
      public class User {
      private Long id;
      private String username;
      private String address;
      }
    • 创建mapper

      1
      2
      3
      4
      public interface UserMapper {  
      @Select("select * from tb_user where id = #{id}")
      User findById(@Param("id") Long id);
      }
    • 创建service

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      @Service
      public class UserService {

      @Autowired
      private UserMapper userMapper;

      public User queryById(Long id) {
      return userMapper.findById(id);
      }
      }
    • 创建控制器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21

      @Slf4j
      @RestController
      @RequestMapping("/user")
      public class UserController {

      @Autowired
      private UserService userService;

      /**
      * 路径: /user/110
      *
      * @param id 用户id
      * @return 用户
      */
      @GetMapping("/{id}")
      public User queryById(@PathVariable("id") Long id) {
      return userService.queryById(id);
      }
      }

    • 创建配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      server:
      port: 8081
      spring:
      datasource:
      url: jdbc:mysql://localhost:3306/cloud_user?useSSL=false
      username: root
      password: root
      driver-class-name: com.mysql.jdbc.Driver
      mybatis:
      type-aliases-package: com.example.user.pojo
      configuration:
      map-underscore-to-camel-case: true
      logging:
      level:
      com.example: debug
      pattern:
      dateformat: MM-dd HH:mm:ss:SSS
    • 创建sql 脚本

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      -- 创建数据库
      create database cloud_user;

      -- 创建表与测试数据
      CREATE TABLE `tb_user`
      (
      `id` bigint NOT NULL AUTO_INCREMENT,
      `username` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '收件人',
      `address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '地址',
      PRIMARY KEY (`id`) USING BTREE,
      UNIQUE KEY `username` (`username`) USING BTREE
      ) ENGINE = InnoDB
      AUTO_INCREMENT = 109
      DEFAULT CHARSET = utf8mb3
      ROW_FORMAT = COMPACT

      INSERT INTO tb_user VALUES (1, '柳岩', '湖南省衡阳市');
      INSERT INTO tb_user VALUES (2, '文二狗', '陕西省西安市');
      INSERT INTO tb_user VALUES (3, '华沉鱼', '湖北省十堰市');
      INSERT INTO tb_user VALUES (4, '张必沉', '天津市');
      INSERT INTO tb_user VALUES (5, '郑爽爽', '辽宁省沈阳市大东区');
      INSERT INTO tb_user VALUES (6, '范兵兵', '山东省青岛市');

  • 启动访问测试

    order user
    • 需求: 根据订单id 查询订单功能的同时,把订单所属的用户信息一起返回

      • 修改一: 在order-service 中注册RestTemplate

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        // order-service 启动类下注册 
        @MapperScan("com.example.order.mapper")
        @SpringBootApplication
        public class OrderApplication {

        public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
        }

        /**
        * 创建 RestTemplate 并注入 spring 容器
        * @return
        */
        @Bean
        public RestTemplate restTemplate() {
        return new RestTemplate();
        }
        }
      • 修改二: 修改order-service 的控制器

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24

        @Service
        public class OrderService {

        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private RestTemplate restTemplate;

        public Order queryOrderById(Long orderId) {
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        /******************************************/
        String url = "http://localhost:8081/user/" + order.getUserId();
        // 发起远程调用
        User user = restTemplate.getForObject(url, User.class);
        // 封装 user 到 order
        order.setUser(user);
        /******************************************/
        // 4.返回
        return order;
        }
        }

        实现服务远程调用
服务注册中心介绍
  • 远程调用的问题

    问题原因

    在远程调用过程中,如果书写硬编码则在使用时是很不方便的,如果服务出现集群,那么如何抉择地址呢?在此种问题下,服务注册中心出现,解决上述的问题

  • 什么是服务注册中心

    所谓服务注册中心就是在整个的微服务架构中单独提出一个服务,这个服务不完成系统的任何业务功能,仅仅用来完成对整个服务系统的服务注册和服务发现,以及对服务健康状态的监控和管理功能

    无服务注册中心 服务注册中心
  • 服务注册中心

    1. 可以对所有的微服务的信息进行存储,如服务的名称、IP、端口等
    2. 可以在进行服务调用时通过服务发现查询可用的微服务列表及网络地址进行服务调用
    3. 可以对所有的微服务进行心跳检测,如发现某实例长时间无法访问,就会从服务注册表移除该实例
服务注册中心组件
Eureka
  • 作用

    作用介绍

    不论是服务消费者还是服务提供者,统称client,首先作为客户端(client),client 会注册服务信息到Eureka-Server(注册中心),通过服务拉取,获取信息,在通过远程调用,选择一个进行调用

  • 创建Eureka-Server 模块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- 继承父项目 -->
    <parent>
    <artifactId>blr-cloud</artifactId>
    <groupId>com.example</groupId>
    <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>eureka-server</artifactId>

    <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
    <!-- spring-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- eureka-server -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
    </dependencies>
    </project>
    • 配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      # Eureka-Server 的 application.yml
      server:
      port: 8761 # 执行服务端口号

      spring:
      application:
      name: EUREKA-SERVER # 指定服务名称 唯一标识

      eureka:
      instance:
      hostname: localhost # eureka 服务端的实例名称
      client:
      register-with-eureka: false # false表示不在注册中心注册自己
      fetch-registry: false # false 表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
      service-url:
      # 设置与 Eureka server 交互的地址查询服务和注册服务都需要依赖这个地址
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

    • 启动类添加注解

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      package com.example.eureka.server;

      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

      /**
      * @author coder-itl
      */
      @SpringBootApplication
      @EnableEurekaServer // 开启当前应用是一个服务注册中心
      public class EurekaServer {
      public static void main(String[] args) {
      SpringApplication.run(EurekaServer.class, args);
      }
      }

    • 启动后在浏览器访问:http://localhost:8761/

      需要的关注信息
  • 创建Eureka-Client 模块,这个模块是一个功能应用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
    <artifactId>blr-cloud</artifactId>
    <groupId>com.example</groupId>
    <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>eureka-client</artifactId>

    <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    </dependencies>
    </project>
    • 配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      server:
      port: 8762 # 自定义启动端口

      spring:
      application:
      name: EUREKACLIENT

      eureka:
      client:
      register-with-eureka: true # 表示是否将自己注册进 EurekaServer 默认为 true
      fetch-registry: true # 是否从 EurekaServer 抓取已有的注册信息,默认为 true,单节点无所谓,集群必须设置为 true 才能配合 ribbon 使用负载均衡
      service-url:
      # 设置与 Eureka server 交互的地址查询服务和注册服务都需要依赖这个地址
      defaultZone: http://localhost:8761/eureka/

    • 启动类添加注解

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      package com.example.client;

      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

      @SpringBootApplication
      @EnableEurekaClient
      public class EurekaClientApplication {
      public static void main(String[] args) {
      SpringApplication.run(EurekaClientApplication.class, args);
      }
      }

    • 启动顺序为:server->client

    • 启动

      在注册中心查看到已成功注册
  • 自我保护机制

    EurekaClient 会通过心跳的方式去和EurekaServer 进行连接(默认 30s EurekaClient 会发送一次心跳请求,如果超过了90s 还没有发送心跳信息的话,EurekaServer 就认为你宕机了,将当前EurekaClient 从注册表中移除)

    1
    2
    3
    4
    5
    # EurekaServer
    eureka:
    instance:
    lease-renewal-interval-in-seconds: 30 # 心跳的间隔
    lease-expiration-duration-in-seconds: 90 # 多久没发送,就认为你宕机了
    1
    2
    3
    4
    # EurekaClient
    eureka:
    client:
    registry-fetch-interval-seconds: 30 # 默认 每隔多久去更新一下本地的注册表缓存信息
    • Eureka 的自我保护机制,统计15 分钟内,如果一个服务的心跳发送比例低于85%,EurekaServer 就会开启自我保护机制

      • 不会从EurekaServer 中去除长时间没有收到的心跳的服务

      • EurekaServer 还是可以正常提供服务的

      • 网络比较稳定时,EurekaServer 才会开始将自己的信息被其他节点同步过去

        1
        2
        3
        4
        # EurekaServer
        eureka:
        server:
        enable-self-preservation: true # 默认开启自我保护机制
    • CAP 定理

      • C 一致性
      • A 可用性
      • P 分区容错性

      这三个特性在分布式情况环境下,只能满足2,而且分区容错性在分布式环境下,是必须要满足的,只能在AC 之间进行权衡

      • 如果先择CP: 保证了一致性,可能会造成你系统在一定时间内是不可用的,如果你同步数据的时间比较长,造成的损失大
      • Eureka 就是一个AP 的效果,高可用的集群,Eureka 集群是无中心的(无master)Eureka 即便宕机几个也不会影响系统的使用,不需要重新的去推举一个master,也会导致一定时间内数据不一致
  • 集群搭建

    分析
    • EurekaServer 集群实现步骤

      • 创建多个EurekaServer 项目

        idea 通过修改端口模拟集群
      • 引入eureka server 依赖

      • 配置文件

        • node1

          1
          2
          3
          server:
          port: 8761
          http://localhost:8762/eureka, http://localhost:8763/eureka
        • node2

          1
          2
          3
          server:
          port: 8762
          http://localhost:8761/eureka, http://localhost:8763/eureka
        • node3

          1
          2
          3
          server:
          port: 8763
          http://localhost:8761/eureka, http://localhost:8762/eureka
          在同一个EurekaServer 配置文件中,启动时修改如下
      • 在每个启动类添加注解@EnableEurekaServer

  • user-service order-service 作为客户端加入

    • 添加客户端依赖

    • 添加客户端配置

    • 启动类添加客户端注解

    • 修改order-service 的远程调用

      1
      2
      // String url = "http://localhost:8081/user/" + order.getUserId();
      String url = "http://USERSERVICE/user/" + order.getUserId();
      1
      2
      3
      4
      5
      @Bean
      @LoadBalanced // 负载均衡配置
      public RestTemplate restTemplate() {
      return new RestTemplate();
      }
      列表
      负载均衡流程
Zookeeper
  • 创建名为cloud-provider-payment8004 的子模块

  • 修改pom.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
    <artifactId>spring-cloud-parent</artifactId>
    <groupId>com.coderitl.springcloud</groupId>
    <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-provider-payment8004</artifactId>

    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 整合 zookeeper 客户端 -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    <!-- 先排除自带的 zookeeper 3.5.3 -->
    <exclusions>
    <exclusion>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <!-- 在添加新版本 zookeeper -->
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
    <groupId>com.coderitl.springcloud</groupId>
    <artifactId>cloud-api-commons</artifactId>
    <version>${project.version}</version>
    </dependency>
    </dependencies>

    </project>
  • 创建启动类

    1
    2
    3
    4
    5
    6
    7
    @SpringBootApplication
    @EnableDiscoveryClient // 该注解用于向 consul 或这 zookeeper作为注册中心时注册服务
    public class PaymentMain8004 {
    public static void main(String[] args) {
    SpringApplication.run(PaymentMain8004.class, args);
    }
    }
  • 创建配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 8004 代表注册到 zookeeper 服务器的支付服务提供者的端口号
    server:
    port: 8004

    # 服务别名-注册 zookeeper 到注册中心名称
    spring:
    application:
    name: cloud-provider-payment
    cloud:
    zookeeper:
    connect-string: 192.168.247.130:2181 # 连接zookeeper服务器地址

  • 创建控制器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package com.coderitl.springcloud.controller;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    import java.util.UUID;

    @RestController
    @Slf4j
    public class PaymentController {
    @Value("${server.port}")
    private String serverPort;

    @GetMapping("/paymeny/zk")
    public String paymentzk() {
    return "springCloud with zookeeper: " + serverPort + "\t" + UUID.randomUUID();
    }
    }

  • 启动测试

    • 版本依赖问题

      • 错误问题

        错误问题
        错误问题
      • 解决方案

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        <!--   整合 zookeeper 客户端 -->
        <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
        <!-- 先排除自带的 zookeeper 3.5.3 -->
        <exclusions>
        <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        </exclusion>
        </exclusions>
        </dependency>
        <!-- 在添加新版本 zookeeper -->
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
        </dependency>
    • 服务成功注册进Zookeeper

      查看服务是否注册成功 访问
      查看服务是否注册成功 访问
    • 查看zookeeper 内数据信息

      相信信息
      相信信息
  • 服务启动后生成的节点是临时节点,zookeeper 服务器再一定时间内心跳没有回应,会清理该节点

Consul
  • 特性

    特性 Eureka Nacos Consul Zookeeper
    CAP AP CP+AP CP CP
    健康检查 Client Beat TCP/HTTP/MYSQL/Client Beat TCP/HTTP/gPRC/CMD Keep Alive
    雪崩保护
    自动注销实例 支持 支持 不支持 支持
    访问协议 HTTP HTTP/DNS TTP/DNS TCP
    监听支持 支持 支持 支持 支持
    多数据中心 支持 支持 支持 不支持
    跨注册中心同步 不支持 支持 支持 不支持
    SpringCloud集成 支持 支持 支持 支持
  • Consul 角色

    • client:客户端,无状态,HTTP DNS 接口请求转发给局域网内的服务端集群
    • server:服务端,保存配置信息,高可用集群,每个数据中心的server 数量推荐为3 个或者5
  • 工作原理

    工作原理
    工作原理
  • 官网

    官网: https://www.consul.io/

    中文社区:https://www.springcloud.cc/spring-cloud-consul.html

  • 下载

    下载

  • 两个下载的内容都是consul.exe

    选择amd64
    选择amd64
  • 单节点

    cd 到对应目录consul.exe,使用cmd 启动Consul

    1
    2
    # 启动参数说明: -dev 表示开发模式运行 -server表示服务器模式运行
    consul agent -dev -client=0.0.0.0
    当前目录下cmd 启动consul
    当前目录下cmd 启动consul

    为了方便启动,也可以在consul.exe 同级目录下创建一个脚本来启动

    1
    2
    consul agent -dev -client=0.0.0.0
    pause
    创建启动脚本文件
    创建启动脚本文件
  • 访问

    http://localhost:8500/

    访问测试是否启动成功
    访问测试
  • 创建子model=> cloud-providerconsul-payment8006

  • 创建配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    server:
    port: 8006


    spring:
    application:
    name: consul-provider-payment
    cloud:
    consul:
    host: localhost
    port: 8500
    discovery:
    service-name: ${spring.application.name}
  • 修改pom.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-consul-discovery -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-consul-discovery</artifactId>
    </dependency>
    <dependency>
    <groupId>com.coderitl.springcloud</groupId>
    <artifactId>cloud-api-commons</artifactId>
    <version>${project.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    </dependencies>
  • 主启动

    1
    2
    3
    4
    5
    6
    7
    8
    @SpringBootApplication
    @EnableDiscoveryClient
    public class PaymentMain8006 {
    public static void main(String[] args) {
    SpringApplication.run(PaymentMain8006.class, args);
    }
    }

  • 控制器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RestController
    @Slf4j
    public class PaymentController {

    @Value("${server.port}")
    private String serverPort;

    @GetMapping("/payment/consul")
    public String paymentConsul() {
    return "SpringCloud with consul: " + serverPort + "\t" + UUID.randomUUID();
    }
    }

  • 结果页面

    成功入驻
    成功入驻
  • 访问localhost:8006/payment/consul

    访问测试
    访问测试
  • 创建子model=> cloud-consumerconsul-order80

  • 修改pom.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-consul-discovery -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-consul-discovery</artifactId>
    </dependency>

    <dependency>
    <groupId>com.coderitl.springcloud</groupId>
    <artifactId>cloud-api-commons</artifactId>
    <version>1.0-SNAPSHOT</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>


    </dependencies>

  • 创建配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    server:
    port: 80


    spring:
    application:
    name: consul-consumer-order
    cloud:
    consul:
    host: localhost
    port: 8500
    discovery:
    service-name: ${spring.application.name}

  • 主启动

    1
    2
    3
    4
    5
    6
    7
    @SpringBootApplication
    @EnableDiscoveryClient
    public class OrderConsulMain80 {
    public static void main(String[] args) {
    SpringApplication.run(OrderConsulMain80.class, args);
    }
    }
  • 控制器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @RestController
    @Slf4j
    public class OrderConsulController {
    // 服务中心的服务名称
    public static final String INVOKE_URL = "http://consul-provider-payment";

    @Resource
    private RestTemplate restTemplate;

    @GetMapping("/payment/consul")
    public String paymentInfo() {
    String result = restTemplate.getForObject(INVOKE_URL + "/payment/consul", String.class);
    return result;
    }
    }
  • 结果页面

    服务消费者注册进Consul 访问80
    服务消费者注册进Consul 访问80

Nacos
  • 服务注册到Nacos

    • 父工程中的dependencyManagement 下添加spring-cloud-alibaba 的管理依赖

      1
      2
      3
      4
      5
      6
      7
      8
      <!--  nacos 的管理依赖  -->
      <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-alibaba-dependencies</artifactId>
      <version>2021.0.1.0</version>
      <type>pom</type>
      <scope>import</scope>
      </dependency>
    • 注释掉服务中原有的eureka 依赖,添加nacos 配置,移除eureka 的配置

      1
      2
      3
      4
      spring:
      cloud:
      nacos:
      server-addr: localhost:8848 # TODO: 添加关于 Nacos 的配置 => nacos服务地址
    • 添加nacos 的客户端依赖

      1
      2
      3
      4
      5
      6
      7
      8
      9
      <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
      </dependency>
      <!-- 解决负载均衡 -->
      <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-loadbalancer</artifactId>
      </dependency>
    • 结果页面

      结果页面:服务名称建议小写, 在order-service 配置Nacos作为注册中心
      结果页面
    • nacos 注册中心细节问题

      • 负载均衡依赖

        1
        2
        3
        4
        5
        <!--  2. TODO: 添加负载均衡依赖 -->
        <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
        该依赖解决如下错误
        该依赖解决如下错误
      • 远程调用的地址使用小写(主要是通过可视化页面获取)

        1
        2
        // TODO: 使用服务发现,完成服务拉取 修改访问地址(与Eureka名称区别: 小写)
        private static final String URL = "http://user-service8081/user/";
      • 修改/user/system32/drivers/etc/hosts

        1
        2
        3
        # 添加对应的服务名称 与 127.0.0.1 绑定
        # Eg:
        127.0.0.1 user-service8081
        可解决此错误
        可解决此错误
  • 集群属性

    1
    2
    3
    4
    5
    6
    7
    spring:
    cloud:
    nacos:
    server-addr: localhost:8848
    # 实现集群配置
    discovery:
    cluster-name: BJ # 集群名称(自定义名称,也就是机房位置) 例如: 北京 BJ (同区域集群使用同一名称)
  • 调整负载均衡配置

    1
    2
    3
    4
    # user-service8081 名称严格为nacos服务列表中的服务名
    user-service8081:
    ribbon:
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则
  • Nacos 负载均衡策略

    • 优先选择同集群服务实例列表
    • 本地集群找不到提供者,才去其他集群寻找,并且会报警告
    • 确定了可用实例列表后,再采用随机负载均衡挑选实例
  • 权重配置

    服务器设备性能有差异,部分实例所在机器性能较好,另一些比较差,我们希望性能好的机器承担更多的用户请求:

    Nacos 提供了权重配置来控制访问频率, 权重越大则访问频率越高

    配置权重(数字越小,被访问的概率将会降低)
    配置权重
    • 实例权重控制
      • nacos 控制台可以设置实例的权重值,0-1 之间
      • 同集群内的多个实例,权重越高被访问的频率越高
      • 权重设置为0 则完全不会被访问
  • 环境隔离: namespace

    Nacos 中服务存储和数据存储的最外层都是一个名为namespace 的东西,用来做最外层隔离

    namespace
    namespace
    • 新建命名空间

      新建命名空间 新建命名空间
      新建命名空间 新建命名空间
    • ID 很重要

      重要ID
      重要
    • 添加namespace

      1
      2
      3
      4
      5
      6
      7
      # order-service application.yml
      spring:
      cloud:
      nacos:
      discovery:
      namespace: 9527 # 命名空间填 创建命名空间时的 id【默认可以通过 UUID 自动生成】

      配置完成命名空间
      配置完成命名空间
    • 总结Nacos 环境隔离

      • namespace 用来做环境隔离
      • 每个namespace 都有唯一id
      • 不同的namespace 下的服务不可见
  • 临时实例

    临时实例
    临时实例
  • 临时实例采用心跳检测

  • 非临时实例

    非临时实例(Nacos会主动询问是否健康)
    非临时实例
  • 临时实例和非临时实例配置

    1
    2
    3
    4
    5
    spring:
    cloud:
    nacos:
    discovery:
    ephemeral: false # 设置为非临时实例
  • Eureka Nacos 区别

    • 共同点
      • 都支持服务注册和服务拉取
      • 都支持服务提供者心跳方式做健康检测
    • 区别
      • Nacos 支持服务端主动检测提供者状态: 临时实例采用心跳模式,非临时实例采用主动检测模式
      • 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
      • Nacos 支持服务列表变更的消息推送模式,服务列表更新更及时
      • Nacos 集群默认采用AP 方式,当集群中存在非临时实例时,采用CP 模式,Eureka 采用AP 模式
Ribbon
  • 简介

    SpringCloudRibbon 是基于netflix Ribbon 实现的一套客户端负载均衡的工具

    简单地说,Ribbon Netflix 发布的开源项目,主要的功能是提供客户端的软件负载均衡算法和服务调用,Ribbon 客户端组件提供的一系列完善的配置项如何连接超时的,重试等,简单地说,就是在配制文件中列出Load Balancer(简称LB) 后面所有的机器,Ribbon 会自动地帮助你基于某种规则(如简单轮询,随机连接等)去连接这些机器,我们很容易使用Ribbon 实现自定义的负载均衡算法

  • LB 负载均衡(Load Balance)是什么

    简单地说就是将用户的请求平摊的分配到多个服务上,从而达到系统的HA(高可用)

    常见的负载均衡有软件Nginx,LVS,硬件 F5

  • Ribbon 本地负载均衡客户端VS Nginx 服务端负载均衡的区别

    Nginx 是服务器负载均衡,客户端所有请求都会交给nginx,然后由nginx 实现转发请求,负载均衡是由服务端实现的

    Ribbon 本地负载均衡,在调用微服务接口的时候,会在注册中心上获取注册信息服务列表之后缓存到JVM 本地,从而实现RPC 远程服务调用技术

  • 使用

    • 依赖问题

      新版Eureka 已经引入Ribbon
      新版Eureka已经引入Ribbon
    • Ribbion 的负载均衡和Rest 调用

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      // 返回对象响应体中数据转换成的对象,基本可以理解为 JSON (restTemplate.getForObject)
      @GetMapping("/get/{id}")
      public CommonResult<Payment> getPayment(@PathVariable("id") Long id) {
      log.info("OrderController id: {}", id);
      // 写操作: postForObject
      return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
      }


      // 返回对象为 ResponseEntity对象,包含响应中的一些重要信息,必须响应头,响应状态码 响应体等
      @GetMapping("/getForEntity/{id}")
      public CommonResult<Payment> getPayment2(@PathVariable("id") Long id) {
      log.info("OrderController id: {}", id);
      ResponseEntity<CommonResult> entity = restTemplate.getForEntity(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
      if (entity.getStatusCode().is2xxSuccessful()) {
      return entity.getBody();
      } else {
      return new CommonResult(444, "操作失败");
      }
      }

      // restTemplate.postForEntity(PAYMENT_URL + "/payment/get/" + id, CommonResult.class).getBody();

    • Ribbon 核心组件IRule

      IRule:根据特定算法中从服务列表中选取一个要访问的服务

      1
      2
      3
      4
      5
      6
      7
      8
      9
      // IRule 是一个接口 作用范围: 全体服务
      public interface IRule {
      Server choose(Object var1);

      void setLoadBalancer(ILoadBalancer var1);

      ILoadBalancer getLoadBalancer();
      }

      关系图
  • 7种负载均衡方式

    • com.netflix.loadbalancer.RoundRobinRule 轮询
    • com.netflix.loadbalancer.RandomRule 随机
    • com.netflix.loadbalancer.RetryRule 先按照轮询的策略获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用的服务
    • WeightedResponseTimeRule 轮询的扩展,响应速度越快的实例选择权重越大,越容易被选择
    • BestAvailableRule 会先过滤掉由于多次访问故障而处于断路跳闸状态的服务,然后选择一个并发最下的服务
    • AvailabiltiyFilteringgRule 先过滤掉故障实例,在选择并发较小的实例
    • ZoneAvoidanceRule:默认规则,复合判断server 所在区域的性能和server 的可用性选择服务器
  • 负载均衡流程

    负载均衡流程
    负载均衡流程
  • 负载均衡原理

    负载均衡算法: rest 接口第几次请求数 % 服务器集群总数量 = 实际调用服务器位置的下标,每次服务器重启后rest 接口计数从1 开始

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    List<ServiceInstance> instances = discoryClient.getInstances("CLOUD-PAYMENT-SERVICE");

    如:
    List[0] instances = 127.0.0.1:8001
    List[1] instances = 127.0.0.1:8002

    8001 + 8002 => 组合为集群,他们共计 2 台机器,集群总数为 2 按照轮询算法原理:
    当总请求数为 1 时: 1 % 2 = 1 对应下标位置为 1,则获取服务器地址为: 127.0.0.1:8002
    当总请求数为 2 时: 2 % 2 = 0 对应下标位置为 0,则获取服务器地址为: 127.0.0.1:8001
    当总请求数为 3 时: 3 % 2 = 1 对应下标位置为 1,则获取服务器地址为: 127.0.0.1:8002
    当总请求数为 4 时: 4 % 2 = 0 对应下标位置为 0,则获取服务器地址为: 127.0.0.1:8001
    ....

  • 如何替换(全局替换)

    • 配置细节

      官方文档明确给出警告:

      ​ 这个自定义配置类不能放在@ComponentScan 所扫描的当前包下以及子包下,否则我们自定义的这个配置类就会被所有的Ribbon 客户端所共享,达不到特殊定制化的目的了,也就是在启动类(@SpringBootApplication 复合注解,内部有ComponenScan)的父包外

    • 新建包package com.coderitl.myrule;

      1
      2
      3
      4
      5
      6
      7
      8
      @Configuration
      public class MySelfRule {
      @Bean
      public IRule myRule() {
      // 定义为随机(其他的同理 new XX) 作用范围: 局部服务生效
      return new RandomRule();
      }
      }
    • 主启动添加注解

      1
      2
      3
      4
      5
      6
      7
      8
      9
      @SpringBootApplication
      @EnableEurekaClient
      // 新添加
      @RibbonClient(name = "CLOUD-PAYMENT-SERVICE", configuration = MySelfRule.class)
      public class OrderMain80 {
      public static void main(String[] args) {
      SpringApplication.run(OrderMain80.class, args);
      }
      }
    • 访问http://localhost/consumer/payment/get/1 端口就变为随机的

      随机验证
      随机验证
  • 基于某个配置文件替换

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 单独定格书写
    userservice: # 是一个名为 userservice 的服务 ->服务名称
    ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则

    # 日志配置
    logging:
    level:
    com.coderitl: debug # 指定包 debug 级别
    pattern:
    dataformat: MM-dd HH:mm:ss:SSS
  • 两种方式的区别

    • 代码方式: 配置灵活,但修改时需要重新打包发布
    • 配置方式: 直观、方便、无需重新打包发布,但是无法做全局配置
  • 饥饿加载

    Ribbon 默认是采用懒加载,即第一次访问时才会创建LoadBalanceClient,请求时间会很长,而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,配置如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ribbon:
    eager-load:
    enable: true # 开启饥饿加载
    clients: userservice # 指定对 userservice 这个服务饥饿加载

    # 多个服务配置方式
    clients:
    - a
    - b

配置管理
  • Nacos 配置管理

    Nacos 配置管理
    Nacos配置管理
  • 新增配置文件

    新增配置文件
    新增配置文件
    创建流程
    创建流程
  • 配置文件加载流程

    未加入nacos 读取nacos 配置文件
    未加入nacos前 读取nacos配置文件
  • 配置统一管理步骤

    • 添加依赖

      1
      2
      3
      4
      5
      <!-- nacos 配置管理依赖 -->
      <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
      </dependency>
    • userservice 中的resource 目录下创建一个bootstrap.yml 文件,这个文件是引导文件,优先级高于application.yml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # TODO: 2. 新建 bootstrap.yml 文件
      spring:
      application:
      name: userservice # 服务名称【大小写与服务名一致】
      profiles:
      active: dev # 开发环境 这里是 dev
      cloud:
      nacos:
      server-addr: localhost:8848 # Nacos 地址
      config:
      file-extension: yaml # 文件后缀名
      文件分析
      文件分析
      • 删除主配置文件中重复配置

        删除主配置文件中重复配置
        删除主配置文件中重复配置
    • 读取userservice-dev.yaml 配置文件中的属性

      1
      2
      3
      4
      5
      6
      <!-- 补充一个依赖 -->
      <!-- TODO: 新版本 nacos 用于加载 bootstrap.yml -->
      <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-bootstrap</artifactId>
      </dependency>
      无此依赖出现如下问题(新版nacos) 添加后
      无此依赖出现如下问题 添加后
      1
      2
      3
      # userservice-dev.yaml
      pattern:
      dataformat: yyyy-MM-dd HH:mm:ss
      1
      2
      3
      4
      5
      // TODO: 5. 添加访问配置文件的控制器
      @GetMapping("/now")
      public String getDataformat() {
      return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dataformat));
      }
      读取userservice.yml
      读取userservice.yml
  • 方式一:在@Value 注入的变量所在类添加注解@RefreshScope

    实现热更新=>dataformat 更改为 dateformat
    实现热更新
  • 方式二: 使用@ConfigurationProperties 注解(推荐)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //  配置类
    @Data
    @Component
    @ConfigurationProperties(prefix = "pattern") // 前缀 + dateformat => 组成完整字段
    public class PatternProperties {
    // TODO: 注意匹配 nacos 定于的属性名,切勿写错
    private String dateformat;
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // TODO: 7. 热加载方式二: 使用配置类
    @Autowired
    private PatternProperties patternProperties;

    // TODO: 5. 添加访问配置文件的控制器
    @GetMapping("/now")
    public String getDataformat() {
    return LocalDateTime.now().format(DateTimeFormatter.ofPattern(patternProperties.getDateformat()));
    }
  • 多环境共享

    不是所有的配置都适合放到配置中心,维护起来比较麻烦;建议将一些关键参数,需要运行时调整的参数放到 nacos 配置中心,一般都是自定义配置

    • 微服务启动会从nacos 读取多个配置文件

      • [spring.application.name]-[spring.profiles.active].yaml => userservice-dev.yaml
      • [spring.application.name].yaml => userservice.yaml

      无论profile 如何变化,[spring.application.name].yaml 这个文件按一定会被加载,因此对环境共享配置可以写入这个文件

    • 创建

      创建spring.application=> userservice
      创建
    • 创建多环境共享环境IDEA => 端口模拟实现

    • java

      1
      2
      3
      // 在 配置类中添加
      // TODO: 用于读取多环境配置数据
      private String envSharedValue;
      1
      2
      3
      4
      5
      6
      7
      8
      @Autowired
      private PatternProperties patternProperties;

      // TODO: 8. 配置多环境共享配置读取测试
      @GetMapping("/prop")
      public PatternProperties patternProperties() {
      return patternProperties;
      }
    • 访问测试

      成功读取多环境共享数据
      成功读取多环境共享数据
  • 多种配置的优先级(相同数据读取时显示内容遵循如下规则)

    服务名-profile.yaml(userservice-dev.yaml) > 服务名称.yaml(userservice.yaml) > 本地配置(application.yaml)

  • 集群

    集群
    集群
  • 集群步骤

    • 搭建数据库,初始化数据库表结构
    • 下载nacos 安装包
    • 配置nacos
    • 启动nacos 集群
    • nginx 反向代理
  • 起步配置

    1. 进入nacos conf 目录,修改配置文件cluster.conf.example,重命名为cluster.conf

      配置如下信息
      配置如下信息
    2. 修改\nacos\conf\application.properties 文件,添加数据库配置

      1
      2
      3
      4
      5
      6
      # 以下内容被注释,取消注释修改局部信息即可
      spring.datasource.platform=mysql
      db.num=1
      db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC # 除了 nacos_config 数据库名称修改外,其他参数请勿修改
      db.user.0=root
      db.password.0=root
    3. 在连接的数据库上执行sql 脚本

    4. nacos 复制3,再修改各自对应端口号

      1
      2
      3
      nacos1 =>8845
      nacos2 =>8846
      nacos3 =>8847
      修改各自端口号Eg: nacos1\conf\application.properties=> server.port=8845
      修改各自端口号
    5. 然后分别启动三个节点

      然后分别启动三个节点
      然后分别启动三个节点
    6. Nginx 反向代理配置(其他内容为注释,可以选择全部替换,或者只替换http节点)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      #user  nobody;
      worker_processes 1;

      events {
      worker_connections 1024;
      }

      http {
      include mime.types;
      default_type application/octet-stream;
      sendfile on;
      keepalive_timeout 65;

      upstream nacos-cluster{
      server 127.0.0.1:8845;
      server 127.0.0.1:8846;
      server 127.0.0.1:8847;
      }
      server {
      listen 80;
      server_name localhost;
      # 配置 nacos
      location /nacos {
      proxy_pass http://nacos-cluster;
      }
      error_page 500 502 503 504 /50x.html;
      location = /50x.html {
      root html;
      }
      }
      }

    7. 访问localhost/nacos => 用户名密码: nacos

      访问测试
      访问测试
    8. nacos 配置可以存储在数据库中

      数据库查看配置
      数据库查看配置
Feign
  • 是什么

    Feign 是一个声明式WebService 客户端,使用Feign 能让编写Web Service 客户端更加简单.

    他的使用方法是定义一个服务接口然后再上面添加注解,Feign 也支持可插拔式的编码器和解码器,SpringCloud Feign 进行了封装,使其支持SpringMVC 标准注解和HttpMessageConvertersFegin 可以与Eureka Ribbon 组合使用以支持负载均衡

  • Feign 能干什么

    Feign 旨在使编写Java Http 客户端变得更容易。

    前面使用Ribbon + RestTemplate 时,利用RestTemplate http 请求的封装处理,形成了一套模板化的调用方法,但是在实际开发中,由于对微服务依赖的调用可能不止一处, 往往一个接口会被多处调用,所以通常会针对每个微服务自行封装一些客户端类来包装这些以来服务的调用 ,所以,Fegin 在此基础上做了进一步封装,由他来帮助我们定义和实现依赖服务接口的定义,Feign 的实现下, 我们只需要创建一个接口并使用注解的方式来配置它(以前是 Dao接口上面标注 Mapper注解,现在是一个微服务接口上面标注一个,Feign注解即可) ,即可完成对微服务提供方的接口绑定,简化了使用SpringCloud Ribbon,自动封装服务调用客户端的开发量

  • Feign 集成了Ribbon

    利用Ribbon 维护了Payment 的服务列表信息,并且通过轮询实现类客户端的负载均衡,而与Ribbon 不同的是,通过feign 只需要定义服务绑定接口且以声明式的方法,优雅而简单的实现了服务调用

  • Feign openFeign 的区别

    Feign openFeign
    Feign SpringCloud 组件中的一个轻量级RestFul HTTP 服务客户端Feign 内置了Riboon,用来做客户端负载均衡,去调用服务注册中心的服务,Feign 的使用方式是: 使用Feign 的注解定义接口,调用这个接口,就可以调用服务注册中心的服务 openfeign SpringCloud Feign 的基础上支持了SpringMVC 的注解,@RequesMapping 等,openfeign @FeignClient 可以解析SpringMVC @RequestMapping 注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务
    <dependency>
    org.springframework.cloud
    spring-cloud-starter-feign
    <dependency>
    org.springframework.cloud
    spring-cloud-starter-openfeign
  • Feign 的使用步骤

    • 引入依赖

      1
      2
      3
      4
      5
      <!-- TODO: 1. 添加 openFeign 依赖 -->
      <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-openfeign</artifactId>
      </dependency>
    • 主启动添加@EnableFeignClients 注解

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      @MapperScan("com.coderitl.springcloud.mapper")
      @SpringBootApplication
      @EnableFeignClients
      public class OrderApplication {
      public static void main(String[] args) {
      SpringApplication.run(OrderApplication.class, args);
      }
      @Bean
      @LoadBalanced
      public RestTemplate getRestTemplate() {
      return new RestTemplate();
      }
      }
    • 编写FeignClient 接口

      1
      2
      3
      4
      5
      6
      7
      8
      // TODO: 3. 创建远程调用接口 user-service8081=> 服务名称(必须的)
      @FeignClient("userservice")
      public interface UserClient {
      // private static final String URL = "http://userservice/user/"+order.getUserId();
      @GetMapping("/user/{id}")
      User findById(@PathVariable("id") Long id);
      }

    • 使用FeignClient 中定义的方法代理RestTemplate

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      @RestController
      @RequestMapping("/order")
      public class OrderController {

      @Autowired
      private OrderService orderService;
      @Autowired
      private UserClient userClient;

      // TODO: 4. 删除 RestTemplate
      @GetMapping("/{orderId}")
      public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
      // 根据id查询订单并返回
      Order order = orderService.queryOrderById(orderId);
      // TODO: 5. 使用 feign 远程调用
      User user = userClient.findById(order.getUserId());
      order.setUser(user);
      return order;
      }
      }
    • 访问测试

      feign 实现远程调用
      feign实现远程调用
    • feign 实现了负载均衡内部集成了 Ribbon

  • Feign 运行自定义配置来覆盖默认配置,可以修改的配置如下

    类型 作用 说明
    feign。Logger.Level 修改日志级别 包含四种不同的级别NONE BASIC HEADERS FULL
    feign.codec.Decoder 响应结果的解析器 HTTP 远程调用的结果做解析,例如解析json 字符串为java 对象
    feign.codec.Encoder 请求参数编码 将请求参数编码,便于通过http 请求发送
    feign.Contract 支持的注解格式 默认是SpringMVC 的注解
    feign.Retryer 失败的重试机制 请求失败的重试机制,默认是没有,不过会使用Ribbon 的重试

    一般只需要配置日志级别

  • 配置日志有两种方式

    • 方式一: 配置文件

      • 全局生效

        1
        2
        3
        4
        5
        6
        # TODO: 实现日志配置的第一种方式
        feign:
        client:
        config:
        default: # 这里是用 default 就是全局配置,如果是写服务名称,则是针对某个微服务的配置
        loggerLevel: FULL # 日志级别
      • 局部生效

        1
        2
        3
        4
        5
        6
        # TODO: 实现日志配置的第一种方式
        feign:
        client:
        config:
        userservice: # 这里是用 default 就是全局配置,如果是写服务名称,则是针对某个微服务的配置
        loggerLevel: FULL # 日志级别
    • 配置方式二: Java 代码方式,需要先声明一个Bean

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      import feign.Logger;
      import org.springframework.context.annotation.Bean;

      // TODO: 创建 Bean 用于配置 feign 的日志
      public class DefaultFeignConfiguration {
      @Bean
      public Logger.Level level() {
      return Logger.Level.BASIC;
      }
      }

      • 而后如果是全局配置,则把它放到@EnableFeignClients 这个注解中(主启动)

        1
        2
        3
        4
        5
        6
        7
        8
        ...
        // defaultConfiguration = DefaultFeignConfiguration.class
        @EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration.class)
        public class OrderApplication {
        public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
        }
        }
      • 如果是局部配置,则把它放在@FeignClient 这个注解中(定义远程调用处)

        1
        2
        3
        4
        5
        6
        7
        8
        // TODO: configuration = DefaultFeignConfiguration.class
        @FeignClient(value = "userservice",configuration = DefaultFeignConfiguration.class)
        public interface UserClient {
        // private static final String URL = "http://user-service8081/user/";
        @GetMapping("/user/{id}")
        User findById(@PathVariable("id") Long id);
        }

  • Feign 底层的客户端实现

    • URLConnection: 默认实现,不支持连接池
    • Apache HttpClient 支持连接池
    • OKHttp 支持连接池
  • 优化性能主要包括

    • 使用连接池代替默认的URLConnection
    • 日志级别,最好用basic | none
  • 实现

    • 添加依赖

      1
      2
      3
      4
      5
      <!--  TODO: feign性能优化=>httpclient -->
      <dependency>
      <groupId>io.github.openfeign</groupId>
      <artifactId>feign-httpclient</artifactId>
      </dependency>
    • 配置

      1
      2
      3
      4
      5
      6
      # TODO: 配置连接池
      feign:
      httpclient:
      enabled: true # 开启 feign 对 httpClient 的支持
      max-connections: 200 # 最大连接数
      max-connections-per-route: 50 # 每个路径的最大连接数
  • 方式一:

    通过继承的方式,给消费者的FeignClient 和提供者的Controller 定义统一的父接口作为标准

    继承
    继承
  • 方式二(抽取)

    FeignClient 抽取为独立模块,并且把接口有关的POJO(实体类),默认的Feign 配置都放到这个模块中,提供给所有消费者使用

    抽取
    抽取
  • 实现方式二的步骤

    • 首先创建一个model,命名为feign-api(自定义,无要求名称),然后引入feign starter 依赖
    • order-service 中编写的UserClient,User,DefaultFeignConfiguraton 都复制到feign-api 项目中
    • order-service 中引入feign-api 依赖
    • 修改order-service 中的所有与上述组件有关的import 部分,改成导入feign-api 中的包
    • 重启测试
  • 当定义的FeignClient 不在SpringBootApplication 的扫描包范围时,这些FeignClient 无法使用,两种解决方案

    • 方式一: 指定FeignClient 所在包

      1
      2
      3
      // order-service: com.coderitl.springcloud 下的主启动类
      // feign-api: com.coderitl.feign 不在 orderservice的主启动扫描包之下
      @EnableFeignClients(basePackages="com.coderitl.feign.clients")
    • 指定FeignClient 字节码

      1
      2
      // 推荐 => 用哪个加载那个
      @EnableFeignClients(clients={UserClient.class})
Gateway
  • 引入

    网关
    网关
  • 网关功能

    • 身份认证和权限校验
    • 服务路由、负载均衡
    • 请求限流
  • SpringCloud 中网关的实现有两种

    • gateway
    • zuul

    Zuul 是基于Servlet 的实现,属于阻塞式编程,SpringCloudGateway 则是基于Spring5 中提供的WebFlux,属于响应式编程的实现,具备更好的性能

  • 创建新的model,引入SpringCloudGateway 的依赖和nacos 的服务发现依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <!-- TODO: 1. 添加网关所需依赖 -->
    <dependencies>

    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-loadbalancer</artifactId>
    </dependency>
    </dependencies>
  • 编写路由配置以及nacos 地址

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # TODO: 编写路由以及 nacos 地址(网关也需要注册到注册中心)
    server:
    port: 10010

    spring:
    application:
    name: gateway # 服务名称
    cloud:
    nacos:
    discovery:
    server-addr: localhost:8848 # nacos 地址
    gateway:
    routes: # 网关路由配置
    - id: order-service # 路由 id,自定义,只要唯一即可
    uri: lb://order-service8088 # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
    predicates:
    - Path=/order/** # 这个是按照路径匹配 只要一 order 开头就符合
    - id: user-service # 路由 id,自定义,只要唯一即可
    # uri: http://127.0.0.1:8081 路由的目标地址 http 就是固定地址
    uri: lb://userservice # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
    predicates:
    - Path=/user/** # 这个是按照路径匹配 只要一 user 开头就符合
  • 访问测试

    通过网关成功访问服务
    通过网关成功访问服务
  • 流程

    业务成功访问经过流程
    业务成功访问经过流程
  • 网关路由可以配置的内容包括

    • 路由id: 路由唯一标识
    • uri: 路由目的地,支持lb 和 http 两种
    • predicatest:路由断言,判断请求是否符合要求,符合则转发到路由目的地
    • filters:路由过滤器,处理请求或响应
  • 路由断言工厂

    我们在配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory 读取并处理,转变为路由判断的条件

    • 例如: Path=/user/** 是按照路径匹配,这个规则是由org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory 类来处理的

    • 像这样的断言工厂在SpringCloudGateway 还有11

      https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/#gateway-request-predicates-factories

      名称 说明 实例
      After 是某个时间点之后的请求 - After=2017-01-20T17:42:47.789-07:00[America/Denver]
      Before 是某个时间点之前的请求 - Before=2017-01-20T17:42:47.789-07:00[America/Denver]
      Between 是某两个时间点之前的请求 - Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
      Cookie 请求必须包含某些cookie - Cookie=chocolate, ch.p
      Header 请求必须包含某些header - Header=X-Request-Id, \d+
      Host 请求必须是访问某个host(域名) - Host=**.somehost.org,**.anotherhost.org
      Method 请求方式必须是指定方式 - Method=GET,POST
      Path 请求路径必须符合指定规则 - Path=/red/{segment},/blue/{segment}
      Query 请求参数必须包含指定参数 - Query=green
      RemoteAddr 请求者的ip 必须是指定范围 - RemoteAddr=192.168.1.1/24
      Weight 权重处理 - Weight=group1, 2
    • 总结

      • PredicateFactory 的作用是什么

        读取用户定义的断言条件,对请求做出判断

      • Path=/user/** 是什么含义

        路径是以/user 开头的就认为是符合的

  • 过滤器

    https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/#the-addrequestheader-gatewayfilter-factory

    GatewayFilter 是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理

    过滤器
    过滤器
  • 过滤器工厂

    Spring 提供了31 种不同的路由过滤器工厂

    名称 说明
    AddRequestHeader 给当前请求添加一个请求头
    RemoveRequestHeader 移除请求种的一个请求头
    AddResponseHeader 给响应结果中添加一个响应头
    RemoveResponseHeader 从响应结果中移除一个响应头
    RequestRateLimiter 限制请求的流量
    ...
  • 局部过滤器配置

    1
    2
    filters:
    - AddRequestHeader=X-Request-red, blue
  • 默认配置

    1
    2
    default-filters: # 默认过滤器 会对所有的路由请求都生效
    - AddRequestHeader=X-Request-red, blue
  • 全局过滤器

    全局过滤器的作用也是处理一切进入网关的请求和微服务响应,GatewayFilter 的作用一样
    区别在于 GatewayFilter 通过配置定义,处理逻辑是固定的,GlobalFilter 的逻辑需要自己写代码实现
    定义方式是实现 GlobalFilter 接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 处理当前请求,有必要的话通过{@link GatewayFilterChain} 将请求交给下一个过滤器处理
    * @param exchange 请求上下文,里面可以获取 Request Response 等信息
    * @param chain 用来把请求委托给下一个过滤器
    * @return {@code Mono}返回表示当前过滤器业务结束
    */
    public interface GlobalFilter {
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
    }
  • 案例实现

    需求: 定义全局过滤器,拦截请求,判断请求的参数是否满足下面条件

    • 参数中是否有authorization

    • authorization 参数值是否为admin

      如果同时满足则放行,否则拦截

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      // 加载顺序由两种实现: 注解 | 实现接口 Ordered (两者选其一) 一定要由顺序
      @Order(-1)
      @Component
      public class AuthorizeFilter implements GlobalFilter, Ordered {
      @Override
      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      // 1. 获取请求参数
      ServerHttpRequest request = exchange.getRequest();
      MultiValueMap<String, String> params = request.getQueryParams();
      // 2. 获取参数中的 authorization 参数
      String auth = params.getFirst("authorization");
      // 3. 判断参数值是否等于 admin
      if ("admin".equals(auth)) {
      // 4. 是 放行
      return chain.filter(exchange);
      }
      // 5. 否 拦截
      // 优化体验:
      // 5.1 设置状态码
      exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
      // 5.2 拦截请求
      return exchange.getResponse().setComplete();
      }

      // Ordered 方法重写
      @Override
      public int getOrder() {
      return -1;
      }
      }

      过滤器
      过滤器
  • 过滤器的执行顺序

    请求进入网关后会碰到三类过滤器: 当前路由的过滤器,DefaultFilter GlobalFilter

    请求路由后,会将当前路由过滤器和DefaultFilter GlobalFilter 合并到一个过滤器链(集合)中,排序后依次执行每个过滤器

    过滤器的执行顺序
    过滤器的执行顺序
    • 执行顺序
      • 每一个过滤器都必须指定一个int 类型的order 值,order 值越小,优先级越高,执行顺序越靠前
      • GlobalFilter 通过实现Ordered 接口,或者添加@Order 注解来指定order
      • 路由过滤器和defaultFiltert order Spring 指定,默认是按照声明顺序从1 递增(声明也就是书写出现的顺序)
      • 当过滤器的order 值一样时,会按照defaultFilter > 路由过滤器链 > GlobalFilter的顺序执行
  • 跨域问题处理

    跨域: 域名不一致就是跨域,主要包括

    • 域名不同: www.taobap.com 和 www.taobao.org 等
    • 域名相同,端口不同: localhost:8080 | localhost:8081

    跨域问题: 浏览器禁止请求的发起者与服务端发生跨域ajax 请求,请求被浏览器拦截的问题

  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    # TODO: 编写路由以及 nacos 地址(网关也需要注册到注册中心)
    server:
    port: 10010

    spring:
    application:
    name: gateway # 服务名称
    cloud:
    nacos:
    discovery:
    server-addr: localhost:8848 # nacos 地址
    gateway:
    routes: # 网关路由配置
    - id: order-service # 路由 id,自定义,只要唯一即可
    uri: lb://order-service8088 # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
    predicates:
    - Path=/order/** # 这个是按照路径匹配 只要一 order 开头就符合
    - id: user-service # 路由 id,自定义,只要唯一即可
    # uri: http://127.0.0.1:8081 路由的目标地址 http 就是固定地址
    uri: lb://userservice # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
    predicates:
    - Path=/user/** # 这个是按照路径匹配 只要一 user 开头就符合
    filters:
    - AddRequestHeader=Truth,coder-itl is Student....
    default-filters:
    - AddRequestHeader=Truth,coder-itl is Teacher....
    globalcors: # 全局跨域处理
    add-to-simple-url-handler-mapping: true # 解决 options 请求被拦截问题
    cors-configurations:
    '[/**]':
    allowedOrigins: # 允许那些网站跨域请求
    - "http://localhost:8089"
    - "http://baidu.com"
    allowedMethods:
    - "GET"
    - "POST"
    - "DELETE"
    - "PUT"
    - "OPTIONS"
    allowedHeaders:
    - "*" # 允许请求中携带的头信息
    allowCredentials: true # 是否允许携带 cookie
    maxAge: 360000 # 这次跨域检测的有效期,在有效期内直接放行,减轻服务器压力

Docker
  • Docker 和虚拟机的差异
    • docker 是一个系统进程,虚拟机是在操作系统中的操作系统
    • docker 体积小,启动速度快,性能好,虚拟机体积大,启动速度慢,性能一般
  • 镜像和容器
    • 镜像(Image): Docker 将应用程序及器所需的依赖、函数库、环境、配置文件打包在一起,称为镜像
    • 容器(Container): 镜像中的应用程序运行后形成的进程就是容器,只是Docker 会给容器做隔离,对外不可见
  • 基于Centos7 操作

  • 卸载(如果之前安装过旧版本的Docker)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    yum remove -y docker \
    docker-client \
    docker-client-latest \
    docker-common \
    docker-latest \
    docker-latest-logrotate \
    docker-logrotate \
    docker-selinux \
    docker-engine-selinux \
    docker-engine \
    docker-ce
  • 安装docker

    • 安装yum 工具

      1
      2
      3
      yum install -y yum-utils \
      device-mapper-persistent-data \
      lvm2 --skip-broken
    • 更新本地镜像源

      1
      2
      3
      4
      5
      6
      7
      yum-config-manager \
      --add-repo \
      https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

      sed -i 's/download.docker.com/mirrors.aliyun.com\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo

      yum makecache fast
    • 安装docker

      1
      2
      # docker-ce 为社区免费版
      yum install -y docker-ce
    • 关闭防火墙

      1
      2
      3
      4
      # 关闭
      systemctl stop firewalld
      # 禁止开启启动防火墙
      systemctl disable firewalld
    • 启动docker

      1
      systemctl start docker
    • 检查是否启动

      1
      2
      3
      systemctl status docker

      docker -v
      启动成功
      启动成功
    • docker 相关命令

      1
      2
      3
      4
      5
      6
      # 启动 docker 服务
      systemctl start docker
      # 停止 docker 服务
      systemctl stop docker
      # 重启 docker 服务
      systemctl restart docker
    • 配置镜像加速

      https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors

      1
      2
      3
      4
      5
      6
      7
      8
      sudo mkdir -p /etc/docker
      sudo tee /etc/docker/daemon.json <<-'EOF'
      {
      "registry-mirrors": ["https://eyy45bvx.mirror.aliyuncs.com"]
      }
      EOF
      sudo systemctl daemon-reload
      sudo systemctl restart docker
  • Docker操作

    • 基本操作

      基本操作
      基本操作
    • 利用docker save nginx 镜像导出磁盘,然后再通过load 加载回来

      1
      2
      # 导出
      docker save -o nginx.tar nginx:latest
      实现
      实现
    • 删除镜像

      1
      docker rmi nginx:latest
    • 加载镜像

      1
      docker load  -i nginx.tar
      加载镜像
      加载镜像
  • 容器相关操作

    容器操作
    容器操作
    • 创建运行一个Nginx 容器

      1
      2
      3
      4
      5
      6
      7
      docker run --name containerName -p 80:80 -d nginx

      docker run: 创建并运行让其
      --name: 给容器起一个名字
      -p: 将宿主机端口与容器端口映射,冒号左侧是宿主机端口,右侧是容器端口
      -d: 后台运行
      nginx: 镜像名称
  • 数据卷

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    docker volume [COMMAND]

    [COMMAND]:

    create: 创建一个 volume
    inspect: 显示一个或多个 volume 的信息
    ls: 列出所有的 volume
    prune: 删除未使用的 volume
    rm: 删除一个或多个指定的 volume

    数据卷操作
    数据卷操作
  • 什么是DockerFile

    Dockerfile 就是一个文本文件,其中包含一个个的指令,用指令来说明要执行什么操作来构建镜像,每一个指令都会形成一层Layer

指令 说明 实例
FROM 指定基础镜像 FROM centos:7
ENV 设置环境变量,可在后面指令使用 ENV key value
COPY 拷贝本地文件到镜像的指定目录 COPY ./mysql-5.7.rpm /tmp
RUN 执行Linux shell 命令,一般是安装过程的命令 RUN yum install gcc
EXPOSE 指定容器运行时监听的端口,是给镜像使用者看的 EXPOSE 8080
ENTRYPOINT 镜像中应用的启动命令,容器运行时调用 ENTRYPOINT java -jar xx.jar
  • DockerCompose 是什么

    DockerCompose 可以基于Componse 文件帮助我们快速的部署分布式应用,而无需手动一个个创建和运行容器

    Compose 文件是一个文本文件,通过指令定义集群中的每个容器如何运行

  • 下载

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # (github)下载 更换其他版本只需要更换  v2.2.2
    sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

    # 高速安装
    curl -L https://get.daocloud.io/docker/compose/releases/download/v2.4.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
    # 修改权限
    sudo chmod +x /usr/local/bin/docker-compose

    # 创建软连接
    sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

    # 检测是否安装
    docker-compose --version

    # 对于 alpine,需要以下依赖包: py-pip,python-dev,libffi-dev,openssl-dev,gcc,libc-dev,和 make。

  • Base 自动补全命令

    1
    2
    curl -L https://raw.githubusercontent.com/docker/compose/1.29.1/contrib/comption/bash/docker-compose > /etc/bash_completion.d/docker-compose

    • 出错

      1
      echo "199.232.68.133 raw.githubusercontent.com" >> /etc/hosts
RabbitMQ
  • 详细使用

    • 同步调用存在的问题

      • 耦合度高
      • 性能下降
      • 资源浪费
      • 级联失败
    • 异步调用

      异步调用常见实现就是事件驱动

      事件代理Broker
      事件代理Broker
      • 异步通信的优点
        • 耦合度低
        • 吞吐量提升
        • 故障隔离
        • 流量削峰
      • 异步通信的缺点
        • 依赖于Broker 的可靠性,安全性、吞吐能力
        • 架构复杂了,业务没有明显的流程线,不好追踪管理
    • 什么是MQ

      MQ(MessageQueue),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进 行通信。

      RabbitMQ ActiveMQ RocketMQ Kafka
      公司/社区 Rabbit Apache 阿里 Apache
      开发语言 Erlang Java Scala & Java
      协议支持 AMQP,XMPP,SMTP,STOMP OpenWrie,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
      可用性
      单机吞吐量 一般 非常高
      消息延迟 微秒级 毫秒级 毫秒级 毫秒级以内
      消息可靠性 一般 一般
    • 使用条件

    • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明 明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

    • 容许短暂的不一致性。

    • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

    • 离线安装

      rabbitmq:3-managementhttps://pan.baidu.com/s/1YUkVIyE7URpGfu-ZR2PpDQ?pwd=1234

      1
      2
      # 后缀名为 tar 的 docker 文件请勿解压
      docker load -i xxx.tar
    • 拉取

      1
      2
      # 拉取 rabbitmq:3-management[tag]
      docker pull rabbitmq:3-management
      搜索rabbitmq 选择带有management,management 的带有web 管理页面
      搜索rabbitmq 选择带有manager
    • 安装

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      docker run \
      -e RABBITMQ_DEFAULT_USER=root \
      -e RABBITMQ_DEFAULT_PASS=root \
      --name mq \
      --hostname mq1 \
      -p 15672:15672 \
      -p 5672:5672 \
      -d \
      rabbitmq:3-management

      # 参数解释
      -p 15672:15672 Web界面
      -p 5672:5672 消息通信
      • 通过端口访问

        通过端口访问
        通过端口访问
      • 概念

        • channel:操作MQ 的工具
        • exchange:路由消息到队列中
        • queue: 缓存消息
        • virtual host: 虚拟主机,是对queue,exchange 等资源的逻辑分组
    • AMQP

      Advanced Message Queving Protocol 是用于再应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

    • Spring AMQP

      Spring AMQP 是基于AMQP 协议定义的一套API 规范,提供了模块来发送和接受消息,包含两部分,其中spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现

      • 特征
        • 侦听器容器,用于异步处理入站消息
        • 用于发送和接受消息的RabbitTemplate
        • RabbitAdmin 用于自动声明队列,交换和绑定
    • 添加用户

      添加用户
      添加用户
    • 添加虚拟环境

      添加虚拟环境
      添加虚拟环境
    • 创建一个队列,名称为simple.queue(*)

      创建一个队列,名称为simple.queue(*) 删除一个队列
      创建一个队列 删除一个队列
    • 测试

      • 创建聚合工程(父工程: SpringBoot 项目)

        1
        2
        3
        4
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
      • 创建两个模块(层级代表在谁下创建)

        • publisher 发布消息

          1
          2
          3
          4
          5
          6
          7
          spring:
          rabbitmq:
          host: 192.168.247.130 # 主机名
          port: 5672 # 端口
          virtual-host: / # 虚拟主机
          username: root
          password: root
          • 创建测试(启动类依然需要创建,由于使用到Configuration)

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            package com.example.test;

            import org.junit.Test;
            import org.junit.runner.RunWith;
            import org.springframework.amqp.rabbit.core.RabbitTemplate;
            import org.springframework.beans.factory.annotation.Autowired;
            import org.springframework.boot.test.context.SpringBootTest;
            import org.springframework.test.context.junit4.SpringRunner;

            @RunWith(SpringRunner.class)
            @SpringBootTest
            public class SpringAmqpTest {
            @Autowired
            private RabbitTemplate rabbitTemplate;

            @Test
            public void testSimpleQueue1() {
            // 这个simple必须已存在才可用
            String queueName = "simple.queue";
            String message = "hello,spring amqp";
            rabbitTemplate.convertAndSend(queueName, message);
            }
            }

            队列不存在时出现声明错误 显示查看
          • 查看发送的message

            查看发送的message
            查看发送的message
        • consumer

          1
          # 和 publish 配置一样 都需要知道 MQ 信息
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          // 只是一个类
          @Component
          public class SpringRabbitListerner {
          @RabbitListener(queues = "simple.queue")
          // 方法参数就是消息
          public void listenSimpleQueueMessage(String msg) {
          System.out.println("spring 消费者收到的消息是 :[" + msg + "]");
          }
          }
          // 启动主启动类即可查阅到消息信息
          // 消息一旦被消费就会从队列删除,RabbitMQ没有消息回溯功能
          查阅
          查阅
    • consumer yml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      spring:
      rabbitmq:
      host: 192.168.247.129 # MQ 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机
      username: root
      password: root
      # 消费预取限制
      listener:
      simple:
      prefetch: 1 # 设置 prefetch 这个值,可以控制预取消息的上限 1:每次只能获取一条,处理完才能获取下一个消息

    • Work Queue

      Work Queue 工作队列,可以提高消息处理速度,避免队列消息堆积

      Work Queue
      Work Queue
    • 模拟Work Queue 实现一个队列绑定多个消费者

      • 基本思路

        • publisher 服务中定义测试方法,每秒产生50 条消息,发送到simple.queue

          1
          2
          3
          4
          5
          6
          7
          8
          9
          @Test
          public void testWorkQueue() throws InterruptedException {
          String queueName = "simple.queue";
          String message = "hello,spring amqp";
          for (int i = 1; i <= 50; i++) {
          rabbitTemplate.convertAndSend(queueName, message + i);
          Thread.sleep(20);
          }
          }
        • consumer 服务中定义两个消息监听者,都监听simple.queue

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          @RabbitListener(queues = "simple.queue")
          public void listenWorkQueueMessage1(String msg) throws InterruptedException {
          System.out.println("spring 消费者1收到的消息是 :[" + msg + "]" + LocalTime.now());
          Thread.sleep(20);
          }

          @RabbitListener(queues = "simple.queue")
          public void listenWorkQueueMessage2(String msg) throws InterruptedException {
          System.err.println("spring 消费者2收到的消息是.......... :[" + msg + "]" + LocalTime.now());
          Thread.sleep(200);
          }
        • 消费者1 每秒处理50 条消息,消费者2 每秒处理10 条消息

          处理结果
          处理结果
    • 发布订阅

      发布订阅模式之间与之前案例的区别就是允许将同一消息发送给多个消费者,实现方式就是加入了exchange(交换机)

      模型
      模型
      • FanoutExchange

        • 利用SpringAMQP 演示FanoutExchange 的使用

          交换机的使用
          交换机的使用
          • 实现思路

            1. consumer 服务中,利用代码声明队列,交换机,并将两者绑定

              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              24
              25
              26
              27
              28
              29
              30
              31
              32
              33
              34
              35
              36
              37
              38
              39
              40
              41
              42
              43
              // 在 consumer 服务创建一个类,添加 @Configuration 注解,并声明 FanoutExchange、Queue和绑定关系对象 Binding

              package com.example.consumer.config;

              import org.springframework.amqp.core.Binding;
              import org.springframework.amqp.core.BindingBuilder;
              import org.springframework.amqp.core.FanoutExchange;
              import org.springframework.amqp.core.Queue;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;


              @Configuration
              public class FanoutConfig {
              // 声明 FanoutExchange 交换机
              @Bean
              public FanoutExchange fanoutExchange() {
              return new FanoutExchange("coderitl.fanout");
              }

              // 声明第一个队列
              @Bean
              public Queue fanoutQueue1() {
              return new Queue("fanout.queue1");
              }
              // 声明第二个队列
              @Bean
              public Queue fanoutQueue2() {
              return new Queue("fanout.queue2");
              }

              // 绑定队列 1 和交换机
              @Bean
              public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
              }
              // 绑定队列 2 和交换机
              @Bean
              public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
              }
              }

            2. consumer 服务中,编写两个消费者,分别监听fanout.queue1 和 fanout.queue2

              1
              2
              3
              4
              5
              6
              7
              8
              9
              // 消费者
              @RabbitListener(queues = "fanout.queue1")
              public void listenFanoutQueue1(String msg) {
              System.out.println("spring 消费者收到fanout.queue1消息是 :[" + msg + "]");
              }
              @RabbitListener(queues = "fanout.queue2")
              public void listenFanoutQueue2(String msg) {
              System.out.println("spring 消费者收到fanout.queue2消息是 :[" + msg + "]");
              }
            3. publisher 中编写测试方法,coderitl.fanout 发送消息

              1
              2
              3
              4
              5
              6
              7
              8
              9
              @Test
              public void testSendFanoutExchange(){
              // 交换机名称
              String exchangeName = "coderitl.fanout";
              // 消息
              String message = "hello every one!";
              // 发送消息
              rabbitTemplate.convertAndSend(exchangeName,"",message);
              }
          • SpringAMQP 提供了声明交换机、队列、绑定关系的API

            绑定关系
            绑定关系
          • 总结

            • 交换机的作用是什么
              1. 接受publisher 发送的消息
              2. 将消息按照规则路由到与之绑定的队列
              3. 不能缓存消息,路由失败,消息丢失
              4. FanoutExchange 会将消息路由到每个绑定的队列1
            • 声明队列、交换机、绑定关系的Bean 是什么
              • Queue
              • FanoutExchange
              • Binding
    • 模型

      • 每一个Queue 斗鱼接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)
      • 发布者发送消息时,指定消息的RoutingKey
      • Exchange 将消息路由到BindingKey 与消息RoutingKey
      • Exchange 将消息路由到BindingKey 与消息RoutingKey 一致的队列
      模型
      模型
    • 利用SpringAMQP 演示DireExchange 的使用

      • 实现思路

        • 利用@RabbitListener 声明Exchange,Queue,RoutingKey

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          package com.example.consumer.listen;

          ...

          @Component
          public class SpringRabbitListerner {
          // 消费者
          @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "direct.queue1"),
          exchange = @Exchange(name = "coderitl.direct",type= ExchangeTypes.DIRECT),
          key = {"red", "blue"}
          ))
          public void listenDirectQueue1(String msg) {
          System.out.println("spring 消费者收到direct.queue1消息是 :[" + msg + "]");
          }
          // 消费者
          @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "direct.queue2"),
          exchange = @Exchange(name = "coderitl.direct",type= ExchangeTypes.DIRECT),
          key = {"yellow", "blue"}
          ))
          public void listenDirectQueue2(String msg) {
          System.out.println("spring 消费者收到direct.queue2消息是 :[" + msg + "]");
          }

          }

        • Consumer 服务中,编写两个消费者方法,分别监听direct.queue1 direct.queue2

          1
          2
          3
          4
          5
          public void listenDirectQueue1(String msg) {
          System.out.println("spring 消费者收到direct.queue1消息是 :[" + msg + "]");
          }

          ...
        • publisher 中编写测试方法,coderitl.direct 发送消息

          1
          2
          3
          4
          5
          6
          7
          8
          9
          @Test
          public void testSendDirectExchange(){
          // 交换机名称
          String exchangeName = "coderitl.direct";
          // 消息
          String message = "hello blue!";
          // 发送消息
          rabbitTemplate.convertAndSend(exchangeName,"blue",message);
          }
          查阅,如果结果未出现,清理缓存,重新构建,再次运行
          查阅
    • 总结

      • 描述Direct 交换机与Fanout 交换机的差异
        • Fanout 交换机将消息路由给每一个与之绑定的队列
        • Direct 交换机根据RoutingKey 判断路由给那个队列
        • 如果多个队列具有相同的RoutingKey,则与Fanout 功能类似
      • 基于@RabbitListener 注解声明队列和交换机有那些常见注解
        • @Queue
        • @Exchange
    • Queue Exchange 指定BindingKey 时可以使用通配符

      • #: 代指0 个或分多个单词
      • *:代指一个单词
    • 模型

    • 案例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      // 消费者
      @RabbitListener(bindings = @QueueBinding(
      value = @Queue(name = "topic.queue1"),
      exchange = @Exchange(name = "coderitl.topic", type = ExchangeTypes.TOPIC),
      key = "china.#"
      ))
      public void listenTopicQueue1(String msg) {
      System.out.println("spring 消费者收到topic.queue1消息是 :[" + msg + "]");
      }

      // 消费者
      @RabbitListener(bindings = @QueueBinding(
      value = @Queue(name = "topic.queue2"),
      exchange = @Exchange(name = "coderitl.topic", type = ExchangeTypes.TOPIC),
      key = "#.news"
      ))
      public void listenTopicQueue2(String msg) {
      System.out.println("spring 消费者收到topic.queue2消息是 :[" + msg + "]");
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      @Test
      public void testSendTopicExchange() throws InterruptedException {
      // 交换机名称 coderitl.topic
      String exchangeName = "coderitl.topic";
      // 消息
      String message = "Hello coderitl topic!";
      // 发送消息
      rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
      }
      测试
      测试
    • 消息转换器

      Spring 对消息对象的处理方式是由org.springframework.amqp.support.converter.MessageConverter 来处理的,而默认实现是SimpleMessageConverter,基于JDK ObjectOutputStream 完成序列化

    • 修改MessageConverter,推荐JSON 方式序列化

      • 实现步骤

        • publisher 服务引入依赖

          1
          2
          3
          4
          5
          <!-- 如果引入了 web 的依赖则不需要引入下方依赖 -->
          <dependency>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-xml</artifactId>
          </dependency>
        • publisher 服务声明MessageConverter

          1
          2
          3
          4
          @Bean
          public MessageConverter messageConverter() {
          return new Jackson2JsonMessageConverter();
          }
        • 发送

          1
          2
          3
          4
          5
          6
          7
          8
          9
          @Test
          public void testSendObjectMessage() throws InterruptedException {
          // 消息
          Map<String, Object> message = new HashMap<>();
          message.put("name", "coder-itl");
          message.put("age", "18");
          // 发送消息
          rabbitTemplate.convertAndSend( "object.queue", message);
          }
        • 接受

          1
          2
          3
          4
          5
          6
          // 创建队列
          @Bean
          public Queue objectQueue() {
          return new Queue("object.queue");
          }

          1
          2
          3
          4
          5
          // 需要配置 转换类型 
          @RabbitListener(queues = "object.queue")
          public void listenObjectQueue(Map<String, Object> msg){
          System.out.println("msg = " + msg);
          }
          JSON 序列化
          JSON序列化
    • 消息可靠性

      • 消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
        • 发送时丢失
          • 生产者发送的消息未送达exchange
          • 消息到达exchange 后未到达queue
        • MQ 宕机,queue 将消息丢失
        • consumer 接收到消息后未消费就宕机
    • 生产者确认机制

      RabbitMQ 提供了publisher confirm 机制来避免发送到MQ 过程中丢失。消息发送到MQ 以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求

      • publisher-confirm,发送者确认

        • 消息成功投递到交换机,返回ack
        • 消息未投递到交换机,返回nack
      • publisher-return,发送者回执

        • 消息投递到交换机了,但是没有路由到队列。返回ACK,以及路由失败原因
        生产者确认机制

        注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack 冲突

    • SpringAMQP 消息可靠性的具体实现步骤

      1. 生产者这个微服务的application.yml 中添加配置

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        # 生产者端
        spring:
        rabbitmq:
        host: 192.168.2.3 # 主机名
        port: 5672 # 端口
        virtual-host: /amqp # 虚拟主机
        username: root
        password: root
        publisher-confirm-type: correlated
        publisher-returns: true
        template:
        mandatory: true
        • 配置说明
          • publisher-confirm-type: 开启publisher-confirm,这里支持两种类型
            • simple: 同步等待confirm 结果,指导超时
            • correlated【英 [ˈkɒrəleɪtɪd] 】: 异步回调,定义ConfirmCallbackMQ 返回结果时会回调这个ConfirmCallback
          • publisher-returns: 开启publisher-return 功能,同样是基于callback 机制,不过定义ReturnCallback
          • template.mandatory【英 [ˈmændətəri] 】: 定义消息路由失败时的策略。true,则调用ReturnCallback,false: 则直接丢弃消息
      2. 每个RabbitTemplate 只能配置一个ReturnCallback,因此需要在项目启动过程中配置

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        // 生产者处
        @Slf4j
        @Configuration
        public class CommonConfig implements ApplicationContextAware {
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取 RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // TODO:设置 ReturnCallback? ReturnsCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        log.info("消息发送失败,应答码: {},原因:{},交换机:{},路由键:{},消息:{}", replyCode, replyText, exchange, routingKey, message.toString());
        });
        }
        }
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        // 等同于上述
        @Slf4j
        @Configuration
        public class CommonConfig implements ApplicationContextAware {
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取 RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // TODO:设置 ReturnCallback(旧)? ReturnsCallback(新)
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Overripackage com.example.publisher.config;
        // 等同于上述

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.core.ReturnedMessage;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.beans.BeansException;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.ApplicationContextAware;
        import org.springframework.context.annotation.Configuration;

        @Slf4j
        @Configuration
        public class CommonConfig implements ApplicationContextAware {
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取 RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // TODO:设置 ReturnCallback(旧)? ReturnsCallback(新)
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
        /**
        * public class ReturnedMessage {
        * private final Message message;
        * private final int replyCode;
        * private final String replyText;
        * private final String exchange;
        * private final String routingKey;
        * }
        */
        // 判断是否是延迟消息
        if (returnedMessage.getMessage().getMessageProperties().getReceivedDelay() > 0) {
        // 是一个延迟消息,忽略这个错误提示
        return;
        }
        // 记录日志
        log.info("消息发送失败,应答码: {},原因:{},交换机:{},路由键:{},消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString());
        // 如果有需要,可以重发消息
        }
        });
        }
        }de
        public void returnedMessage(ReturnedMessage returnedMessage) {
        /**
        * public class ReturnedMessage {
        * private final Message message;
        * private final int replyCode;
        * private final String replyText;
        * private final String exchange;
        * private final String routingKey;
        * }
        */
        log.info("消息发送失败,应答码: {},原因:{},交换机:{},路由键:{},消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString());
        }
        });
        }
        }
      3. 发送消息,指定消息ID,消息ConfirmCallback

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        @Slf4j
        @SpringBootTest
        class PublisherApplicationTests {
        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Test
        void name() {
        // 消息体
        String message = "hello,spring amqp!";
        // 消息 id,需要封装到 CorrelationData 中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 添加 callback
        correlationData.getFuture().addCallback(result -> {
        if (result.isAck()) {
        // ack 消息成功
        log.debug("消息发送成功,ID:{}", correlationData.getId());
        } else {
        // nack 消息失败
        log.error("消息发送失败,ID:{},原因:{}", correlationData.getId(), result.getReason());
        }
        }, ex -> {
        log.error("消息发送异常,ID:{},原因:{}", correlationData.getId(), ex.getMessage());
        });
        // 发送消息
        rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
        }

        }
    • 消息持久化

      MQ 默认是内存存储消息,开启持久化功能可以确保缓存在MQ 中的消息不丢失

      1. 交换机持久化

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        // 交换机的持久化: 消费者配置中声明
        @Bean
        public DirectExchange directExchange() {
        /**
        * name: 交换机名称
        * durable: 是否持久化,true: 开启持久化
        * autoDelete: false 是否自动删除,一般是 false
        */
        return new DirectExchange("direct.exchange", true, false);
        }
      2. 队列持久化

        1
        2
        3
        4
        5
        6
        // 队列的持久化: 消费者配置中声明
        @Bean
        public Queue directQueue() {
        // 使用 QueueBuilder构建队列,durable 就是持久化的
        return QueueBuilder.durable("direct.queue").build();
        }
      3. 消息持久化,SpringAMQP 中的消息默认是持久化的,可以通过MessageProperties 中的DeliverMode 来指定

        1
        2
        3
        4
        5
        6
        7
        @Test
        void contextLoads() {
        String message = "数据内容....";
        Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
        .build();
        }
    • 消费者消息确认

      RabbitMQ 支持消费者确认机制,即: 消费者处理消息后可以向MQ 发送ack 回执,MQ 收到ack 回执后才会删除该消息,SpringAMQP 则允许配置三种确认模式

      1. manual: 手动ack,需要在业务代码结束后,调用api 发送ack
      2. auto: 自动ack,spring 监测listener 代码是否出现异常,没有异常则返回ack,抛出异常则返回nack
      3. none: 关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除
      • 实现步骤

        1. 修改消费者的配置文件,添加如下配置

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          logging:
          pattern:
          dateformat: HH:mm:ss:SSS
          level:
          com.example: debug
          spring:
          rabbitmq:
          host: 192.168.2.3
          port: 5672
          virtual-host: /amqp
          username: root
          password: root
          listener:
          simple:
          prefetch:
          direct:
          acknowledge-mode: manual
        2. 失败重试机制

          当消费者出现异常后,消息会不断的requeue【重新入队】到队列,在重新发送给消费者,然后再次异常,再次requeue,无限循环,导致MQ 的消息处理飙升,带来不必要的压力

          我们可以利用Spring retry 机制,在消费者出现异常时利用本地重试,而不是无限制的requeue mq 队列

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          # 消费者端
          spring:
          rabbitmq:
          host: 192.168.2.3
          port: 5672
          virtual-host: /amqp
          username: root
          password: root
          listener:
          simple:
          prefetch: 1
          retry:
          enabled: true # 开启消费失败重试
          initial-interval: 1000 # 初始的失败等待时长为 1s
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长= multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # TODO: true: 无状态,false: 有状态,如果业务中包含事务,这里改为 false
          direct:
          acknowledge-mode: auto
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          # https://www.toyaml.com/index.html 在线转换 yml to Properties
          logging.pattern.dateformat=HH:mm:ss:SSS
          logging.level.com.example=debug
          spring.rabbitmq.host=192.168.2.3
          spring.rabbitmq.port=5672
          spring.rabbitmq.virtual-host=/amqp
          spring.rabbitmq.username=root
          spring.rabbitmq.password=root
          spring.rabbitmq.listener.simple.prefetch=1
          spring.rabbitmq.listener.simple.retry.enabled=true
          spring.rabbitmq.listener.simple.retry.initial-interval=1000
          spring.rabbitmq.listener.simple.retry.multiplier=1
          spring.rabbitmq.listener.simple.retry.max-attempts=3
          spring.rabbitmq.listener.simple.retry.stateless=true
          spring.rabbitmq.listener.direct.acknowledge-mode=auto

          层级目录
    • 消费者失败消息处理策略

      在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有messageRecoverer 接口处理,它包含三种不同的实现

      • RejectAndDontRequeueRecoverer: 重试耗尽后,直接reject,丢弃消息。默认就是这种方式

      • immediateRequeueMessageRecoverer: 重试耗尽后,返回nack,消息重新入队

      • RepublishMessageRecoverer【推荐】: 重试耗尽后,将失败的消息投递到指定的交换机

        模型
        • 实现

          1. 首先,定义接受失败消息的交换机、队列以及绑定关系

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            25
            26
            27
            28
            29
            30
            31
            32
            33
            34
            35
            36
            37
            38
            39
            40
            41
            42
            43
            44
            45
            46
            47
            48
            49
            50
            51
            52
            53
            54
            55
            56
            57
            58
            // 消费者处
            package com.example.consumer.config;

            import org.springframework.amqp.core.*;
            import org.springframework.context.annotation.Bean;
            import org.springframework.context.annotation.Configuration;

            @Configuration
            public class CommonConfig {
            /**************************** 成功的交换机与队列以及绑定关系 *************************/
            // 交换机的持久化
            @Bean
            public DirectExchange directExchange() {
            /**
            * name: 交换机名称
            * durable: 是否持久化,true: 开启持久化
            * autoDelete: false 是否自动删除,一般是 false
            */
            return new DirectExchange("direct.exchange", true, false);
            }

            // 队列的持久化
            @Bean
            public Queue directQueue() {
            // 使用 QueueBuilder构建队列,durable 就是持久化的
            return QueueBuilder.durable("direct.queue").build();
            }

            @Bean
            public Binding queueToExchange(Queue directQueue, DirectExchange directExchange) {
            return BindingBuilder.bind(directQueue).to(directExchange).with("simple.test");
            }


            /******************** 失败的交换机与队列以及绑定关 ******************************/
            @Bean
            public DirectExchange errorDirectExchange() {
            /**
            * name: 交换机名称
            * durable: 是否持久化,true: 开启持久化
            * autoDelete: false 是否自动删除,一般是 false
            */
            return new DirectExchange("error.direct.exchange", true, false);
            }

            @Bean
            public Queue errorDirectQueue() {
            // 使用 QueueBuilder构建队列,durable 就是持久化的
            return QueueBuilder.durable("error.direct.queue").build();
            }

            @Bean
            public Binding queueToExchangeError(Queue errorDirectQueue, DirectExchange errorDirectExchange) {
            return BindingBuilder.bind(errorDirectQueue).to(errorDirectExchange).with("error");
            }

            }

          2. 定义RepublishMessageRecoverer

            1
            2
            3
            4
            5
            6
            7
            8
            9
            @Bean
            public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
            /**
            * rabbitTemplate: 使用 rabbitTemplate 重发
            * errorExchange: 发送的交换机
            * errorRoutingKey: 以键发送
            */
            return new RepublishMessageRecoverer(rabbitTemplate,"error.direct.exchange","error");
            }
            查看输出
    • 总结

      如可确保RabbitMQ 消息的可靠性?

      • 开启生产者确认机制,确保生产者的消息能到达队列
      • 开启持久化功能,确保消息未消费前在队列中不会丢失
      • 开启消费者确认机制为auto,由spring 确认消息处理成功后完成ack
      • 开启消费者失败重试机制,并设置MeessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
    • 初始死信交换机

      当一个队列中的消息满足下列情况之一时,可以称之为死信【dead letter】

      1. 消费者使用basic.reject basic.nack 声明消费失败,并且消息的requeue 参数设置为false
      2. 消息是一个过期消息,超时无人消费
      3. 要投递的队列消息堆积满了,最早的消息可能成为死信

      如果该队列配置了dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机【Dead Letter Exchange简称 DLX】

      死信模型
    • TTL

      TTL,也就是Time-To-Live,如果一个队列中的消息TTL 结束仍未消费,则会变为死信,TTL 超时分为两种情况

      • 消息所在的队列设置了存活时间
      • 消息本身设置了存活时间
      原理
      • 实现

        正常队列绑定死信交换机以及死信队列实现
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        @Slf4j
        @Component
        public class SpringRabbitListener {
        // 延迟消息 基于注解方式
        @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dl.queue", durable = "true"),
        exchange = @Exchange(name = "dl.direct"),
        key = "dl"
        ))
        public void listenDlQueue(String msg) {
        log.info("接收到 dl.queue的延迟消息: {}", msg);
        }

        }

        蓝色框部分实现
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        @Configuration
        public class CommonConfig {
        /************************** 死信 *********************************/
        @Bean
        public DirectExchange ttlExchange() {
        return new DirectExchange("ttl.direct");
        }

        @Bean
        public Queue ttlQueue() {
        return QueueBuilder.durable("ttl.queue") // 指定队列名称并持久化
        .ttl(10000) // 设置队列的超时时间为 10s
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .deadLetterRoutingKey("dl") // 指定死信 RoutingKey
        .build();
        }
        @Bean
        public Binding simpleBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
        }
        }

    • 发送消息时,给消息本身设置超时时间

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      @Test
      void ttl() {
      String message = "ttl 5s or 10s...................!";
      Message build = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
      .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
      // 消息设置过期时间 TTL=5000ms=5s【最终 5s】 < 队列设置的 TTL=10000ms=10s
      .setExpiration("5000")
      .build();
      // 发送消息
      for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("ttl.direct", "ttl", build);
      }
      log.info("消息发送完毕....................");
      }

      如果消息和队列同时设置了超时时间,则以两者中最小的为准

      超时时间
    • 延迟队列

      利用TTL 结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列【Delay Queue】模式

      • 重新安装rabbitmq

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        docker run \
        -e RABBITMQ_DEFAULT_USER=root \
        -e RABBITMQ_DEFAULT_PASS=root \
        -v mq-plugins:/plugins \
        --name mq \
        --hostname mq1 \
        -p 15672:15672 \
        -p 5672:5672 \
        -d \
        rabbitmq:3-management
        当前版本信息
      • 容器内部执行虚拟主机添加命令

        1
        2
        3
        4
        5
        6
        7
        8
        9
        # 添加虚拟机
        rabbitmqctl add_vhost /amqp
        # 添加用户 root 密码 root
        rabbitmqctl add_user root root

        # 分配角色
        rabbitmqctl set_user_tags root administrator
        # 设置虚拟主机 /amqp 的权限:
        rabbitmqctl set_permissions -p /amqp root ".*" ".*" ".*"
      • 延迟队列的使用场景

        • 延迟发送短信
        • 用户下单,如果用户在15 分钟内未支付,则自动取消
        • 预约工作会议,20 分钟后自动通知所有参会人员
      • 延迟队列插件安装

      • SpringAMQP 使用延迟队列插件

        DelayExchange 的本质还是官方的三种交换机,只是添加了延迟队列.因此使用时只需要声明一个交换机,交换机的类型可以是任意的,然后设定delayed 属性为true 即可

        • 基于注解声明

          1
          2
          3
          4
          5
          6
          7
          8
          9
          // 消费者处: 基于注解声明 【推荐使用注解声明延迟队列】
          @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "delay.queue", durable = "true"),
          exchange = @Exchange(name = "delay.direct", delayed = "true"),
          key = "delay"
          ))
          public void listenDelayedQueue(String msg) {
          log.info("接收到了 delay.queue 的延迟消息: {}", msg);
          }
        • 基于Bean

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          // 消费者处
          @Configuration
          public class CommonConfig {
          @Bean
          public DirectExchange delayedExchange() {
          return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称
          .delayed() // 设置 delay 属性为 true
          .durable(true) // 持久化
          .build();
          }

          @Bean
          public Queue delayedQueue() {
          return new Queue("delay.queue");
          }

          @Bean
          public Binding delayedBinding() {
          return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
          }
          }

        • 然后我们向这个delay true 的交换机中发送消息,一定要给消息添加一个header:x-delay 值为延迟的时间,单位为ms

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          // 生产者发送消息
          @Test
          void ttl_plugins() {
          String message = "ttl plugins...................!";
          Message build = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
          .setHeader("x-delay", 1000)
          .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
          .build();
          // 消息 id,需要封装到 CorrelationData 中
          CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
          // 添加 callback
          correlationData.getFuture().addCallback(result -> {
          if (result.isAck()) {
          // ack 消息成功
          log.debug("消息发送成功,ID: {}", correlationData.getId());
          } else {
          // nack 消息失败
          log.error("消息发送失败,ID: {},原因: {}", correlationData.getId(), result.getReason());
          }
          }, ex -> {
          log.error("消息发送异常,ID: {},原因: {}", correlationData.getId(), ex.getMessage());
          });
          // 发送消息
          for (int i = 0; i < 10; i++) {
          rabbitTemplate.convertAndSend("delay.direct", "delay", build, correlationData);
          }
          log.debug("消息发送完毕....................");
          }
        • 修改配置

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          package com.example.publisher.config;
          // 等同于上述

          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.core.ReturnedMessage;
          import org.springframework.amqp.rabbit.core.RabbitTemplate;
          import org.springframework.beans.BeansException;
          import org.springframework.context.ApplicationContext;
          import org.springframework.context.ApplicationContextAware;
          import org.springframework.context.annotation.Configuration;

          @Slf4j
          @Configuration
          public class CommonConfig implements ApplicationContextAware {
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
          // 获取 RabbitTemplate
          RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
          // TODO:设置 ReturnCallback(旧)? ReturnsCallback(新)
          rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
          @Override
          public void returnedMessage(ReturnedMessage returnedMessage) {
          /**
          * public class ReturnedMessage {
          * private final Message message;
          * private final int replyCode;
          * private final String replyText;
          * private final String exchange;
          * private final String routingKey;
          * }
          */
          // TODO: 判断是否是延迟消息
          if (returnedMessage.getMessage().getMessageProperties().getReceivedDelay() > 0) {
          // 是一个延迟消息,忽略这个错误提示
          return;
          }
          // 记录日志
          log.info("消息发送失败,应答码: {},原因:{},交换机:{},路由键:{},消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString());
          // 如果有需要,可以重发消息
          }
          });
          }
          }
    • 消息堆积问题

      当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题

    • 解决消息堆积有三种思路

      • 增加更多的消费者,提高消费速度
      • 在消费者内开启线程池加快消息处理速度
      • 扩大队列容积,提高堆积上限
    • 惰性队列的特征如下

      • 接收到消息后直接存入磁盘而非内存
      • 消费者要消费消息时才会从磁盘中读取并加载到内存
      • 支持数百万条的消息存储
    • 启用惰性队列

      • 设置一个队列为惰性队列,只需要在声明时,指定x-queue-mode 属性为lazy 即可

        1
        2
        # 可以将一个运行中的队列设置为惰性队列
        rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
      • SpringAMQP 声明惰性队列分两种方式

        • 基于@Bean

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          // 推荐使用
          @Configuration
          public class LazyModeConfig {
          @Bean
          public Queue lazyQueue() {
          return QueueBuilder.durable("lazy.queue")
          .lazy() // 开启 x-queue-mode 为 lazy
          .build();
          }
          }

        • 基于注解

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          @Slf4j
          @Component
          public class SpringRabbitListener {
          @RabbitListener(
          queuesToDeclare = @Queue(
          name = "lazy.queue",
          durable = "true",
          // 声明 lazy
          arguments = @Argument(name = "x-queue-mode", value = "lazy")
          )
          )
          public void listenLazyQueue(String msg) {
          log.debug("接收到 lazy.queue的消息: {}", msg);
          }
          }

    • 优缺点

      • 优点
        • 基于磁盘存储,消息上限高
        • 没有间歇性的page-out,性能比较稳定
      • 缺点
        • 基于磁盘存储,消息时效性会降低
        • 性能受限于磁盘的IO
    • 集群分类

      • 普通集群

        是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力

      • 镜像集群

        是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性

        镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ 3.8 版本以后,推出了新的功能: 仲裁队列 来代替镜像集群,底层采用Raft 协议确保主从的数据一致性

    • 普通集群

      或者叫标准集群(classic cluster),具备下列特征

      1. 会在集群的各个节点间共享部分数据,包括: 交换机、队列元信息。不包含队列中的消息

      2. 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回

      3. 队列所在节点宕机,队列中的消息就会丢失

        传递 宕机消息丢失
        • 普通集群部署计划

          计划分配

          集群中的节点默认都是:rabbit@[hostname],因此以上三个节点的名称分别为

          • rabbit@mq1
          • rabbit@mq2
          • rabbit@mq3
        • 获取cookie

          RabbitMQ 底层依赖于Erlang,而Erlang 虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

          要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookiecookie 只是一串最多255 个字符的字母数字字符。

          每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

          • 我们先在之前启动的mq 容器中获取一个cookie 值,作为集群的cookie。执行下面的命令

            1
            2
            3
            4
            # rabbitmq:3-management[当前版本]|rabbitmq:3.8-management【可更换的版本】
            docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
            # 非固定值
            QZCXBOTUGFDXGZHUHZBX
            获取cookie
          • 接下来,停止并删除当前的mq 容器,我们重新搭建集群

            1
            docker rm -f mq
          • 准备集群配置

            • /tmp 目录新建一个配置文件 rabbitmq.conf

              1
              2
              3
              cd /tmp
              # 创建文件
              vim rabbitmq.conf
              1
              2
              3
              4
              5
              6
              7
              # 写入内容如下
              loopback_users.guest = false
              listeners.tcp.default = 5672
              cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
              cluster_formation.classic_config.nodes.1 = rabbit@mq1
              cluster_formation.classic_config.nodes.2 = rabbit@mq2
              cluster_formation.classic_config.nodes.3 = rabbit@mq3
            • 再创建一个文件,记录cookie

              1
              2
              3
              4
              5
              cd /tmp
              # 写入cookie
              echo "QZCXBOTUGFDXGZHUHZBX" > .erlang.cookie
              # 修改cookie文件的权限
              chmod 600 .erlang.cookie
            • 准备三个目录mq1、mq2、mq3

              1
              2
              3
              cd /tmp
              # 创建目录
              mkdir mq1 mq2 mq3
            • 然后拷贝rabbitmq.conf、cookie 文件到mq1、mq2、mq3

              1
              2
              3
              4
              5
              6
              7
              8
              9
              # 进入/tmp
              cd /tmp
              # 拷贝
              cp rabbitmq.conf mq1
              cp rabbitmq.conf mq2
              cp rabbitmq.conf mq3
              cp .erlang.cookie mq1
              cp .erlang.cookie mq2
              cp .erlang.cookie mq3
            • 启动集群

              • 创建网络

                1
                docker network create mq-net
              • node1

                1
                2
                3
                4
                5
                6
                7
                8
                9
                10
                11
                docker run -d --net mq-net \
                -v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
                -v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
                -e RABBITMQ_DEFAULT_USER=root \
                -e RABBITMQ_DEFAULT_PASS=root \
                --name mq1 \
                --hostname mq1 \
                -p 8071:5672 \
                -p 8081:15672 \
                rabbitmq:3-management
                # rabbitmq:3.8-management
              • node2

                1
                2
                3
                4
                5
                6
                7
                8
                9
                10
                11
                docker run -d --net mq-net \
                -v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
                -v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
                -e RABBITMQ_DEFAULT_USER=root \
                -e RABBITMQ_DEFAULT_PASS=root \
                --name mq2 \
                --hostname mq2 \
                -p 8072:5672 \
                -p 8082:15672 \
                rabbitmq:3-management
                # rabbitmq:3.8-management
              • node3

                1
                2
                3
                4
                5
                6
                7
                8
                9
                10
                11
                12
                docker run -d --net mq-net \
                -v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
                -v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
                -e RABBITMQ_DEFAULT_USER=root \
                -e RABBITMQ_DEFAULT_PASS=root \
                --name mq3 \
                --hostname mq3 \
                -p 8073:5672 \
                -p 8083:15672 \
                rabbitmq:3-management

                # rabbitmq:3.8-management
              • 测试

                • mq1 这个节点上添加一个队列

                  其他节点同时查看到

            在刚刚的案例中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案。

    • 镜像集群:本质是主从模式,具备以下特征

      • 交换机、队列、队列中的消息会在各个mq 的镜像节点之间同步备份
      • 创建队列的节点被称为该队列的主节点,备份到的其他节点叫做该队列的镜像节点
      • 一个队列的主节点可能是从另一个队列的镜像节点
      • 所有操作都是主节点完成,然后同步给镜像节点
      互相备份【推荐使用】
      • 镜像模式的特征

        默认情况下,队列只保存在创建该队列的节点上。而镜像模式下,创建队列的节点被称为该队列的主节点,队列还会拷贝到集群中的其它节点,也叫做该队列的镜像节点。

        但是,不同队列可以在集群中的任意节点上创建,因此不同队列的主节点可以不同。甚至,一个队列的主节点可能是另一个队列的镜像节点

        用户发送给队列的一切请求,例如发送消息、消息回执默认都会在主节点完成,如果是从节点接收到请求,也会路由到主节点去完成。镜像节点仅仅起到备份数据作用

        当主节点接收到消费者的ACK时,所有镜像都会删除节点中的数据。

        总结如下:

        • 镜像队列结构是一主多从(从就是镜像)
        • 所有操作都是主节点完成,然后同步给镜像节点
        • 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
        • 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)
      • 镜像模式的配置

        镜像模式的配置有3种模式

        ha-mode ha-params 效果
        准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
        all【不推荐使用】 (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
        nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。
      • exactly 模式配置

        1
        rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
        • rabbitmqctl set_policy:固定写法
        • ha-two:策略名称,自定义
        • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two. 开头的队列名称
        • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
          • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
          • "ha-params":2:策略参数,这里是2,就是副本数量为2,11镜像
          • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销
      • all 模式配置

        1
        rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
        • ha-all:策略名称,自定义
        • "^all\.":匹配所有以all. 开头的队列名
        • '{"ha-mode":"all"}':策略内容
          • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点
      • nodes 模式配置

        1
        rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
        • rabbitmqctl set_policy:固定写法
        • ha-nodes:策略名称,自定义
        • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes. 开头的队列名称
        • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
          • "ha-mode":"nodes":策略模式,此处是nodes模式
          • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称
    • 配置精确模式exactly

      • 是在之前的普通模式上配置的

      • 进入mq1【任意启动的】容器内部执行

        1
        docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
        添加一个two.test.queue 队列 出现镜像节点
    • 仲裁队列

      仲裁队列是3.8 版本以后才有的新功能,用来代替镜像队列,具备以下特征

      • 与镜像队列一样,都是主从模式,支持主从数据同步

      • 使用非常简单,没有复杂的配置

      • 主从同步基于Raft 协议,强一致

      • 搭建

        • 在任意控制台添加一个队列,一定要选择队列类型为Quorum 类型。

          添加队列

          可以看到,仲裁队列的 + 2 字样。代表这个队列有2 个镜像节点。

          因为仲裁队列默认的镜像数为5。如果你的集群有7 个节点,那么镜像数肯定是5;而我们集群只有3 个节点,因此镜像数量就是3

          仲裁集群
        • SpringAMQP 创建仲裁队列

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          @Configuration
          public class QuorunQueueConfig {
          @Bean
          public Queue quorumQueue(){
          return QueueBuilder
          .durable("quorum.queue") // 持久化
          .quorum() // 仲裁队列
          .build();
          }
          }

        • yaml 配置

          1
          2
          3
          4
          5
          6
          spring:
          rabbitmq:
          addresses: 192.168.2.3:8071,192.168.2.3:8072,192.168.2.3:8073
          virtual-host: /
          username: root
          password: root
    • 扩容

      • 启动一个新的MQ容器

        1
        2
        3
        4
        5
        6
        7
        8
        9
        docker run -d --net mq-net \
        -v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
        -e RABBITMQ_DEFAULT_USER=itcast \
        -e RABBITMQ_DEFAULT_PASS=123321 \
        --name mq4 \
        --hostname mq5 \
        -p 8074:15672 \
        -p 8084:15672 \
        rabbitmq:3-management
      • 进入容器控制台

        1
        docker exec -it mq4 bash
      • 停止mq 进程

        1
        rabbitmqctl stop_app
      • 重置RabbitMQ 中的数据

        1
        rabbitmqctl reset
      • 加入mq1

        1
        rabbitmqctl join_cluster rabbit@mq1
      • 再次启动mq 进程

        1
        rabbitmqctl start_app
    • 增加仲裁队列副本

      • 我们先查看下quorum.queue 这个队列目前的副本情况,进入mq1 容器

        1
        2
        3
        docker exec -it mq1 bash

        rabbitmq-queues quorum_status "quorum.queue"
        • 现在,我们让mq4 也加入进来

          1
          rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"
Elasticsearch
  • 详细使用

    • 倒排索引

      • 文档: 每条数据就是一个文档
      • 词条: 文档按照语义分成的词语
      词条不能重复
      词条不能重复
      • 搜索过程

        搜索过程
        搜索过程
    • ES 的一些概念

      • 什么是正向索引

        基于文档id 创建索引,查询词条时必须先找到文档,而后判断是否是包含词条

      • 什么是倒排索引

        对文档内容分词,对词条创建索引,并记录所在文档的信息,查询时现根据词条查询到文档id,而后获取到文档

    • 索引

      相同类型的文档的集合

      索引的映射(mapping):索引章文档的字段约束信息,类似表的结构约束

      索引
      索引
    • 概念对比

      MYSQL Elasticsearch 说明
      Table Index 索引Index,就是文档集合,类似数据库的表
      Row Document 文档,就是一条条数据,类似数据库中的行,文档都是JSON 格式
      Column Field 字段,就是JSON 文档中的字段,类似数据库中的列
      Schema Mapping Mapping(映射) 是索引中文档的约束,例如字段类型约束,类似数据库的表结构
      SQL DSL DSL elasticsearch 提供的JSON 风格的请求语句,用来操作elasticsearch 实现CRUD
    • 架构

      MYSQL:擅长事务类型操作,可以确保数据的安全和一致性

      Elasticsearch:擅长海量数据的搜索、分析、计算

      MYSQL Elasticsearch 互补
      MYSQL与Elasticsearch互补
    • 获取文件

      https://pan.baidu.com/s/1gLwmrZFi3U5gX32eZaGaHw?pwd=1234

    • 离线安装文件说明

      离线安装文件说明ik需要放在数据卷插件目录 _data/ 中
      离线安装文件说明
    • 安装流程

      • 部署单点ES

        因为我们还需要部署kibana 容器,因此需要让es kibana 容器关联,这里先创建一个网络

        1
        docker network create es-net
      • 如果上传到虚拟机中,之后运行命令加载即可(kibana 同样操作)

        1
        docker load -i es.tar
      • 拉取镜像

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        # -v 也可以指定自己的绝对路径
        docker run -d \
        --name es \
        -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
        -e "discovery.type=single-node" \
        -v es-data:/usr/share/elasticsearch/data \
        -v es-plugins:/usr/share/elasticsearch/plugins \
        --privileged=true \
        --network es-net \
        -p 9200:9200 \
        -p 9300:9300 \
        --restart=always \
        elasticsearch:7.12.1

        # 9200 提供 http 使用
        # 9300 内部调用
        # 高配置
        docker run -d \
        --name es \
        -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
        -e "discovery.type=single-node" \
        -v es-data:/usr/share/elasticsearch/data \
        -v es-plugins:/usr/share/elasticsearch/plugins \
        --privileged=true \
        --network es-net \
        -p 9200:9200 \
        -p 9300:9300 \
        --restart=always \
        elasticsearch:7.12.1

        • 查看数据卷位置

          docker volume inspect es-plugins
          查看数据卷位置
        • 访问测试

          访问测试,出现如下json 则为正确安装
          访问测试
    • kibana 安装

      • 安装启动

        1
        2
        3
        4
        5
        6
        7
        8
        docker run -d \
        --name kibana \
        -e ELASTICSEARCH_HOSTS=http://es:9200 \
        --network=es-net \
        --privileged=true \
        -p 5601:5601 \
        --restart=always \
        kibana:7.12.1
    • ik 离线安装

      • 上传文件到es-plugins数据卷的_data 目录下

        下载: https://ghproxy.com/https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        # 下载
        wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

        # 解压 zip 文件使用
        yum install -y unzip
        # 解压到指定目录 -d 指定目录
        unzip elasticsearch-analysis-ik-7.12.1.zip -d ik
        # 删除压缩包
        rm -rf elasticsearch-analysis-ik-7.12.1.zip
        # 重启 es
        docker restart es

        # 查看日志
        docker logs -f es

        下载 测试ik
      • IK 分词器包含两种模式

        • ik_smart:最少切分
        • ik_max_word:最细切分
      • IK 分词器-扩展词库

        要扩展ik 分词器的词库,只需要修改一个ik 分词器目录中的config 目录中的IKAnalyzer.cfg.xml 文件

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        <?xml version="1.0" encoding="UTF-8"?>
        <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
        <properties>
        <comment>IK Analyzer 扩展配置</comment>
        <!--用户可以在这里配置自己的扩展字典 -->
        <entry key="ext_dict"></entry>
        <!--用户可以在这里配置自己的扩展停止词字典-->
        <entry key="ext_stopwords"></entry>
        <!--用户可以在这里配置远程扩展字典 -->
        <!-- <entry key="remote_ext_dict">words_location</entry> -->
        <!--用户可以在这里配置远程扩展停止词字典-->
        <!-- <entry key="remote_ext_stopwords">words_location</entry> -->
        </properties>

        <!-- 修改如下 -->

        <entry key="ext_dict">ext.dic</entry>
        <!--用户可以在这里配置自己的扩展停止词字典-->
        <entry key="ext_stopwords">stopword.dic</entry>

        <!-- 修改后需要重启 es -->

        使用
    • 总结

      • 分词器的作用是什么
        • 创建倒排索引时对文档分词
        • 用户搜索时,对输入的内容分词
    • Mapping 是对索引库中文档的约束,常见的mapping 属性包括

      • type 字段数据类型,常见的简单类型有

        • 字符串text(可分词文本),keyword(精确值,例如:ip地址)

        • 数值long,Integer,short,byte,double,float

        • 布尔:boolean

        • 日期date

        • 对象object

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          {
          "age": 18,
          "weight": 56.55,
          "isMarried": false,
          "email": "zy@qq.com",
          "score": [1,2,3],
          "name": {
          "firstName": "coder",
          "lastName": "itl"
          }
          }
      • index:是否创建索引,默认为true

      • analyzer:使用哪种分词器

      • properties:该字段的子字段

    • 创建索引库

      ES 中通过Restful 请求操作索引库,文档,请求内容用DSL 语句来表示,创建索引库和mapping DSL 语句如下

      实例
      实例
    • 查看索引库

      1
      2
      3
      GET /索引库名
      # 示例
      GET /coderitl
    • 删除索引库

      1
      2
      3
      DELETE /索引库名
      # 示例
      DELETE /coderitl
    • 修改索引库

      禁止修改

      索引库和mapping 一旦创建无法修改,但是可以添加新的字段,

      1
      2
      3
      4
      5
      6
      7
      8
      PUT /索引库名/_mapping
      {
      "properties":{
      "新字段名":{
      "type": "integer"
      }
      }
      }
    • 语法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      POST /索引库名/_doc/文档id
      {
      "字段1": "值1",
      "字段2": "值2",
      "字段3": {
      "子属性1": "值3",
      "子属性2": "值4"
      }
      }
    • 新增文档

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      # 新增文档 POST /coderitl/_doc/(固定格式)      1(id)
      POST /coderitl/_doc/1
      {
      "info":"程序员",
      "email":"lx@qq.com",
      "name":{
      "firstName":"信",
      "lastName":"李"
      }
      }
      新增文档 数组类型(非数组,是多值添加)
    • 查询文档

      1
      GET /coderitl/_doc/1
    • 删除文档

      1
      DELETE /coderitl/_doc/1
    • 修改文档

      • 全量修改: 会删除旧文档,添加新文档

        1
        2
        3
        4
        5
        6
        7
        // 修改指的是: 全量修改(所有字段)
        PUT /索引库名/_doc/文档id
        {
        "字段1": "值1",
        "字段2": "值2",
        // ...
        }
        全量修改时是所有的字段需要全部出现,不修改的只需要不改动全量: 会删除旧文档,添加新文档
      • 局部修改

        局部修改
        局部修改
    • hotel

      hotel
      • 对应mapping

        mapping 要考虑的问题: 字段名,数据类型,是否参数搜索,是否分词,如果分词,分词器是什么

      • ES 中支持两种地理坐标数据类型

        • geo_point:由纬度latitude 和经度longitude 确定的一个点32.64832,475.48432432
        • geo_shape:有多个geo_point 组成的复杂几何图形,例如一条直线
      • 字段拷贝

        字段拷贝可以使用copy_to 属性将当前字段拷贝到指定字段

        1
        2
        3
        4
        5
        6
        7
        8
        "all":{
        "type": "text",
        "analyzer":"ik_max_word"
        },
        "brand":{
        "type": "keyword",
        "copy_to":"all"
        }
      • DSL 创建索引库

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        PUT /hotel
        {
        "mappings": {
        "properties": {
        "id": {
        "type": "keyword"
        },
        "name": {
        "type": "text",
        "analyzer": "ik_max_word",
        "copy_to": "all"
        },
        "address": {
        "type": "keyword",
        "index": false
        },
        "price": {
        "type": "integer"
        },
        "score": {
        "type": "integer"
        },
        "brand": {
        "type": "keyword",
        "copy_to": "all"
        },
        "city": {
        "type": "keyword"
        },
        "startName": {
        "type": "keyword"
        },
        "business": {
        "type": "keyword",
        "copy_to": "all"
        },
        "location": {
        "type": "geo_point"
        },
        "pic": {
        "type": "keyword",
        "index": false
        },
        "all": {
        "type": "text",
        "analyzer": "ik_max_word"
        }
        }
        }
        }
    • 批量查询

      1
      GET /hotel/_search
    • 什么是RestClient

      ES 官方提供了各种不同语言的客户端,用来操作ES,这些客户端的本质就是组装DSL 语句,通过http 请求发送给ES

    • RestClient 操作索引库

      1. 引入es RestHighLevelClient 依赖

        1
        2
        3
        4
        5
        <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.12.1</version>
        </dependency>
      2. 因为SpringBoot 默认的ES 版本是7.6.2,所以需要覆盖默认的ES 版本与安装的客户端一致

        1
        2
        3
        <properties>
        <elasticsearch.version>7.12.1</elasticsearch.version>
        </properties>
      3. 在测试类中初始化RestHighClient

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        @SpringBootTest
        class HotelIndexTest {

        private RestHighLevelClient client;

        @Test
        void testCreateIndex() throws IOException {
        // 1.准备Request PUT /hotel
        CreateIndexRequest request = new CreateIndexRequest("hotel");
        // 2.准备请求参数
        request.source(MAPPING_TEMPLATE, XContentType.JSON);
        // 3.发送请求
        client.indices().create(request, RequestOptions.DEFAULT);
        }

        @Test
        void testExistsIndex() throws IOException {
        // 1.准备Request
        GetIndexRequest request = new GetIndexRequest("hotel");
        // 3.发送请求
        boolean isExists = client.indices().exists(request, RequestOptions.DEFAULT);

        System.out.println(isExists ? "存在" : "不存在");
        }
        @Test
        void testDeleteIndex() throws IOException {
        // 1.准备Request
        DeleteIndexRequest request = new DeleteIndexRequest("hotel");
        // 3.发送请求
        client.indices().delete(request, RequestOptions.DEFAULT);
        }


        @BeforeEach
        void setUp() {
        client = new RestHighLevelClient(RestClient.builder(
        HttpHost.create("http://192.168.150.101:9200")
        ));
        }

        @AfterEach
        void tearDown() throws IOException {
        client.close();
        }



        }
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        // MAPPING_TEMPLATE
        public class HotelIndexConstants {
        public static final String MAPPING_TEMPLATE = "{\n" +
        " \"mappings\": {\n" +
        " \"properties\": {\n" +
        " \"id\": {\n" +
        " \"type\": \"keyword\"\n" +
        " },\n" +
        " \"name\": {\n" +
        " \"type\": \"text\",\n" +
        " \"analyzer\": \"ik_max_word\",\n" +
        " \"copy_to\": \"all\"\n" +
        " },\n" +
        " \"address\": {\n" +
        " \"type\": \"keyword\",\n" +
        " \"index\": false\n" +
        " },\n" +
        " \"price\": {\n" +
        " \"type\": \"integer\"\n" +
        " },\n" +
        " \"score\": {\n" +
        " \"type\": \"integer\"\n" +
        " },\n" +
        " \"brand\": {\n" +
        " \"type\": \"keyword\",\n" +
        " \"copy_to\": \"all\"\n" +
        " },\n" +
        " \"city\": {\n" +
        " \"type\": \"keyword\"\n" +
        " },\n" +
        " \"starName\": {\n" +
        " \"type\": \"keyword\"\n" +
        " },\n" +
        " \"business\": {\n" +
        " \"type\": \"keyword\",\n" +
        " \"copy_to\": \"all\"\n" +
        " },\n" +
        " \"pic\": {\n" +
        " \"type\": \"keyword\",\n" +
        " \"index\": false\n" +
        " },\n" +
        " \"location\": {\n" +
        " \"type\": \"geo_point\"\n" +
        " },\n" +
        " \"all\": {\n" +
        " \"type\": \"text\",\n" +
        " \"analyzer\": \"ik_max_word\"\n" +
        " }\n" +
        " }\n" +
        " }\n" +
        "}";
        }

      4. 操作索引库

        • 创建索引库

          创建索引库
          分析
        • 删除索引库

          1
          2
          3
          4
          5
          6
          // 删除索引库
          @Test
          public void deleteIndex() throws IOException {
          DeleteIndexRequest request = new DeleteIndexRequest("hotel");
          client.indices().delete(request, RequestOptions.DEFAULT);
          }
        • 判断索引库是否存在

          1
          2
          3
          4
          5
          6
          7
          8
          // 判断索引库是否存在
          @Test
          public void isExistsIndex() throws IOException {
          GetIndexRequest request = new GetIndexRequest("hotel");
          boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
          System.out.println("exists = " + exists);
          }

    • 新增文档

      新增文档
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      package com.coderitl.es.hotel;

      @SpringBootTest
      public class TestDocument {
      private RestHighLevelClient client;
      @Autowired
      private HotelService hotelService;

      @Test
      public void testCreateDocument() throws IOException {
      // 根据 id 查询酒店数据
      Hotel hotel = hotelService.getById(61083L);
      // 转换为文档类型
      HotelDoc hotelDoc = new HotelDoc(hotel);

      // 1. request
      IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
      // 2. 准备 json(JackSon 序列化)
      request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
      // 3. 发送请求
      client.index(request, RequestOptions.DEFAULT);
      }


      @BeforeEach
      public void setUp() {
      this.client = new RestHighLevelClient(RestClient.builder(
      // 集群使用逗号分隔 => HttpHost.create
      HttpHost.create("http://192.168.247.129:9200")
      ));
      }

      @AfterEach
      public void close() throws IOException {
      this.client.close();
      }

      }

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      // 根据文档创建的实体类
      @Data
      @NoArgsConstructor
      public class HotelDoc {
      private Long id;
      private String name;
      private String address;
      private Integer price;
      private Integer score;
      private String brand;
      private String city;
      private String starName;
      private String business;
      private String location;
      private String pic;


      public HotelDoc(Hotel hotel) {
      this.id = hotel.getId();
      this.name = hotel.getName();
      this.address = hotel.getAddress();
      this.price = hotel.getPrice();
      this.score = hotel.getScore();
      this.brand = hotel.getBrand();
      this.city = hotel.getCity();
      this.starName = hotel.getStarName();
      this.business = hotel.getBusiness();
      // 转换为文档类型
      this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
      this.pic = hotel.getPic();
      }
      }

    • 查询文档

      查询文档
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      // 查询文档库
      @Test
      public void testSelectDocument() throws IOException {
      // 数据库对象
      Hotel hotel = hotelService.getById(ID_INFO);
      // 将数据库对象转换为 hotelDoc
      HotelDoc hotelDoc = new HotelDoc(hotel);
      // 1. 创建 request 对象
      GetRequest request = new GetRequest("hotel", ID_INFO.toString());
      // 2. 发送请求 得到结果
      GetResponse response = client.get(request, RequestOptions.DEFAULT);
      // 3. 解析结果
      String json = response.getSourceAsString();
      // JSON 反序列化
      HotelDoc jsonHotelDoc = JSON.parseObject(json, HotelDoc.class);
      System.out.println(jsonHotelDoc);
      }

    • 更新文档

      更新文档
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      // 更新文档库: 局部更新
      @Test
      public void testUpdateLocalDocument() throws IOException {
      // 1. 准备 request 对象
      UpdateRequest request = new UpdateRequest("hotel", ID_INFO.toString());
      // 2. 准备请求参数(更新的字段)
      request.doc(
      "price", "100000", // 第一个键值对
      "starName", "六钻" // 第二个键值对 以逗号分割键值对
      );
      // 3. 发送请求
      client.update(request, RequestOptions.DEFAULT);
      }

    • 删除文档库

      1
      2
      3
      4
      5
      @Test
      public void testDeleteDocument() throws IOException {
      DeleteRequest request = new DeleteRequest("hotel", ID_INFO.toString());
      client.delete(request, RequestOptions.DEFAULT);
      }
    • 利用JavaRestClient 批量导入酒店数据到ES

      • 需求: 批量查询酒店数据,然后导入到索引库

      • 思路

        1. 利用mybatis-plus 查询酒店数据

        2. 将查询到的数据转换为文档类型数据

        3. 利用JavaRestClient 中的Bulk 批处理,实现批量新增文档

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          // 批量加入索引库
          @Test
          public void testBulkDocument() throws IOException {
          // 批量查询
          List<Hotel> hotelList = hotelService.list();
          // 创建 request
          BulkRequest request = new BulkRequest();
          // 准备参数,添加多个新增的 Request
          for (Hotel hotel : hotelList) {
          // 转换为 hotelDoc
          HotelDoc hotelDoc = new HotelDoc(hotel);
          // 创建新增文档的 Request对象
          request.add(new IndexRequest("hotel")
          .id(hotelDoc.getId().toString())
          .source(JSON.toJSONString(hotelDoc), XContentType.JSON)
          );
          }
          client.bulk(request, RequestOptions.DEFAULT);
          }

    • DSL Query 分类

      • 查询所有: 查询出所有数据,一般测试用,Eg: match_all

      • 全文检索(full text)查询: 利用分词器对用户输入内容分词,然后去倒排索引库中匹配

        • match_query

        • multi_match_query

          match_all
          1
          2
          3
          4
          5
          6
          7
          # 查询所有
          GET /hotel/_search
          {
          "query": {
          "match_all": {}
          }
          }
          • 全文检索

            • match 查询: 全文检索查询的一种,会对用户输入内容分词,然后去倒排索引库检索

              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              # 语法
              GET /hotel/_search
              {
              "query": {
              "match": {
              # 查询字段类型为 text 类型的
              "FIELD": "TEXT"
              }
              }
              }

              # 示例

              GET /hotel/_search
              {
              "query": {
              "match": {
              "all": "如家"
              }
              }
              }

            • mullti_match:match 查询类似,只不过允许同时查询多个字段

              1
              2
              3
              4
              5
              6
              7
              8
              9
              GET /hotel/_search
              {
              "query": {
              "multi_match": {
              "query": "外滩",
              "fields": ["brand","name"]
              }
              }
              }
      • 精确查询: 根据精确词条值查询数据,一般是查找keyword、数值、日期、boolean 等类型字段, 不会 对所有条件分词

        • ids: 根据id 精确匹配

        • range:根据值的范围查询

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          GET /hotel/_search
          {
          "query": {
          "range": {
          "price": {
          "gte": 2000,
          "lte": 10000
          }
          }
          }
          }
        • term 根据词条精确值查询

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          GET /hotel/_search
          {
          "query": {
          "term": {
          "city": {
          "value": "上海" # 精确内容
          }

          }
          }
          }
      • 地理(geo)查询:根据经纬度查询

        • geo_distance:查询到指定中心点小于某个距离值的所有文档(附近功能)

          1
          2
          3
          4
          5
          6
          7
          8
          9
          GET /hotel/_search
          {
          "query": {
          "geo_distance": {
          "distance":"2km",
          "location":"31.21,121.5"
          }
          }
          }
          geo_distance
        • geo_bounding_box:查询geo_point 值落在某个矩形范围的所有文档

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          GET /hotel/_search
          {
          "query": {
          "geo_bounding_box": {
          "location": {
          "top_left": {
          "lat":"31.1",
          "lon":"121.5"
          },
          "bottom_right": {
          "lat":"30.9",
          "lon":"121.7"
          }
          }

          }
          }
          }

          矩形内的点就是结果
      • 复合(compound)查询:复合查询可以将上述查询条件组合起来,合并查询条件

        • bool

          • must:必须匹配每个子查询,类似
          • should:选择性匹配子查询,类似
          • must_not:必须不匹配,不参与算分,类似
          • filter: 必须匹配 ,不参与算分
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          # bool 基本使用
          GET /hotel/_search
          {
          "query": {
          "bool": {
          "must": [
          {
          "term": {
          "city": {
          "value": "上海"
          }
          }
          }
          ],
          "should": [
          {
          "term": {
          "brand": {
          "value": "华美达"
          }
          }
          }
          ],
          "must_not": [
          {
          "range": {
          "price": {
          "lte": 500
          }
          }
          }
          ],
          "filter": [
          {
          "range": {
          "score": {
          "gte": 45
          }
          }
          }
          ]
          }
          }
          }
        • function_score:算分函数查询,可以控制文档相关性算分,控制文档排名,例如百度竞价

      • 相关性打分算法

        • TF-IDF:elasticsearch5.0 之前,会随着词频增加而越来越大

        • BM25:elasticsearch5.0 之后,会随着词频增加而增大,但增长曲线会趋于水平

          打分算法 BM25 平滑(默认使用)
          打分算法 BM25
          影响打分算法
        • 案例

          如家这个品牌的酒店排名靠前一些

          • function_score 需要的三要素

            1. 那些文档需要算分加权=> 品牌为如家的酒店

            2. 算分函数是什么 => weight就可以

            3. 加权模式是什么=>求和

              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              13
              14
              15
              16
              17
              18
              19
              20
              21
              22
              23
              GET /hotel/_search
              {
              "query": {
              "function_score": {
              "query": {
              "match": {
              "all": "外滩"
              }
              },
              "functions": [
              {
              "filter": {
              "term": {
              "brand": "如家"
              }
              },
              "weight": 10
              }
              ],
              "boost_mode": "sum"
              }
              }
              }
    • 需求:搜索名字包含如家,价格不高于400,在坐标31.21,121,5 周围10km 范围内的酒店

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      GET /hotel/_search
      {
      "query": {
      "bool": {
      "must": [
      {
      "match": {
      "name": "如家"
      }
      }
      ],
      "must_not": [
      {
      "range": {
      "price": {
      "gte": 400
      }
      }
      }
      ],
      "filter": [
      {
      "geo_distance": {
      "distance": "10km",
      "location": {
      "lat": 31.21,
      "lon": 121.5
      }
      }
      }
      ]
      }
      }
      }

    elasticsearch 支持对搜索结果排序,默认是根据相关度算分_score 来排序,可以排序的字段类型有keyword、数值、地理坐标、日期类型等

    • 语法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      GET /hotel/_search
      {
      "query": {
      "match_all": {}
      },
      "sort": [
      {
      "FIELD": {
      "order": "desc" // 排序字段和排序方式: ASC,DESC
      }
      }
      ]
      }

      // 地理坐标
      GET /hotel/_search
      {
      "query": {
      "match_all": {}
      },
      "sort": [
      {
      "_geo_distance": {
      "FIELD": "纬度,经度",
      "order": "asc",
      "unit": "km"
      }
      }
      ]
      }
    • 案例

      对酒店数据按照用户评价降序排序,评价相同的按照价格升序排序

      评价是score 字段,价格是price 字段,按照顺序添加两个排序规则

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      GET /hotel/_search
      {
      "query": {
      "match_all": {}
      },
      "sort": [
      {
      "score": {
      "order": "desc"
      },
      "price": "asc" // 简写形式
      }
      ]
      }
    • 案例

      实现对酒店数据按照到你的位置坐标的距离升序排序

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      GET /hotel/_search
      {
      "query": {
      "match_all": {}
      },
      "sort": [
      {
      "_geo_distance": {
      "location": "21,13", // 简写形式
      "order": "asc",
      "unit": "km"
      }
      }
      ]
      }
    • 结果分页

      elasticsearch 默认情况下只返回top10 的数据,而如果要查询更多的数据就需要修改分页参数了

      elasticsearch 中通过修改from、size 参数来控制要返回的分页结果

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      GET /hotel/_search
      {
      "query": {
      "match_all": {}
      },
      "from": 100, // 分页开始的位置,默认为 0
      "size": 1, // 期望获取的文档总数
      "sort": [
      {
      "price": {
      "order": "asc"
      }
      }
      ]
      }
      (深度分页)这种分页方式不利于集群,(因为获取了 from:100,zise:1 =>总共获取了101条数据,截取了 1 条数据 )
    • 深度分页带来的问题

      1. 首先在每个数据分片上都排序并查询前1000 条文档

      2. 然后将所有节点的结果聚合,在内存中重新排序选出前1000 条文档

      3. 最后从这1000 条中,选取从990 开始的10 条文档

        如果搜索页过深,或者结果集from+size 越大,对内存和CPU 的消耗也越高,因此ES 设定结果集查询的上限是10000

    • 深度分页解决方案

      • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据官方推荐
      • scroll:原理将排序数据形成的快照,保存在内存官方已经不推荐
    • 总结

      • from+size
        • 优点: 支持随机翻页
        • 缺点: 深度分页问题,默认查询上限是10000
        • 场景: 百度、京东、谷歌、淘宝这样的随机翻页搜索
      • after search
        • 优点: 没有查询上限(单次查询的size 不超过10000)
        • 缺点: 只能向后逐页查询,不支持随机翻页
        • 场景: 没有随即翻页需求的搜索,例如手机向下滚动翻页
      • scroll
        • 优点: 没有查询上限(单次查询的size 不超过10000)
        • 缺点: 会有额外内存消耗,并且搜索结果是非实时的
        • 场景: 海量数据的获取和转义,ES7.1 开始不推荐使用,建议使用after search 方案
    • 结果高亮

      高亮:就是在搜索结果中把搜索关键字突出显示

      • 原理

        • 将搜索结果中的关键字用标签标记出来
        • 在页面中给标签添加css 样式
      • 语法

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        GET /hotel/_search
        {
        "query": {
        "match": {
        "FIELD": "TEXT"
        }
        },
        "highlight": {
        "fields": { // 指定要高亮的字段
        "FIELD"{
        "pre_tags":"<em>", // 用来标记高亮字段的前置标签
        "post_tags":"</em>", // 用来标记高亮字段的后置标签,
        "require_field_match": "false" // 默认情况下,ES的搜索字段必须与高亮字段一致,false为可以不一致
        }
        }
        }
        }
      • 搜索结果处理整体语法

        搜索结果处理整体语法
    • match_all

      match_all
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      @Test
      void testMatchAll() throws IOException {
      // 1. 准备 request 对象
      SearchRequest request = new SearchRequest("hotel");
      // 准备 DSL
      request.source().query(QueryBuilders.matchAllQuery());
      // 结果对象
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      handleResponse(response);
      }

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      // 结果解析抽取
      public static void handleResponse(SearchResponse response) {
      // 4. 解析响应
      SearchHits searchHits = response.getHits();
      // 4.1 获取总条数
      long total = searchHits.getTotalHits().value;
      System.out.println("共搜索到: " + total);
      // 4.2 文档数组
      SearchHit[] hits = searchHits.getHits();
      // 4.3 遍历
      for (SearchHit hit : hits) {
      // 获取文档 source
      String json = hit.getSourceAsString();
      // 反序列化
      HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
      System.out.println("hotelDoc = " + hotelDoc);
      }
      }
      • 解析response

        解析response 一 一获取对应的值
    • match

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      @Test
      void testMatch() throws IOException {
      // 1. 准备 request 对象
      SearchRequest request = new SearchRequest("hotel");
      // 准备 DSL
      request.source().query(QueryBuilders.matchQuery("all","如家"));
      // 结果对象
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);

      handleResponse(response);
      }
    • 精确查询

      • term

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        @Test
        void testTerm() throws IOException {
        // 1. 准备 request 对象
        SearchRequest request = new SearchRequest("hotel");
        // 准备 DSL
        request.source().query(QueryBuilders.termQuery("city", "上海"));
        // 结果对象
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        handleResponse(response);
        }
      • range

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        @Test
        void testRange() throws IOException {
        // 1. 准备 request 对象
        SearchRequest request = new SearchRequest("hotel");
        // 准备 DSL
        request.source().query(QueryBuilders.rangeQuery("price").gte(100).lte(150));
        // 结果对象
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        handleResponse(response);
        }
    • 复合查询

      复合查询
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      @Test
      void testBool() throws IOException {
      // 1. 准备 request 对象
      SearchRequest request = new SearchRequest("hotel");
      // 准备 DSL
      // 准备 BoolQuery
      BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
      // 添加 term
      boolQuery.must(QueryBuilders.termQuery("city","上海"));
      request.source().query(boolQuery);
      // 结果对象
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      handleResponse(response);
      }

    • 排序和分页

      搜索结果的排序和分页是与query 同级的参数
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      @Test
      void testPageAndSort() throws IOException {
      int page = 1;
      int size = 5;
      // 1. 准备 request 对象
      SearchRequest request = new SearchRequest("hotel");
      // 2. 准备 DSL
      // 2.1 准备 query
      request.source().query(QueryBuilders.matchAllQuery())
      // 可以链式调用 也可以通过 request.source().sort("price", SortOrder.ASC); 等获取
      .sort("price", SortOrder.ASC)
      .from((page - 1) * size)
      .size(5);
      // 结果对象
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      handleResponse(response);
      }
    • 高亮显示

      高亮显示
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      @Test
      void testHighLight() throws IOException {
      // 1. 准备 request 对象
      SearchRequest request = new SearchRequest("hotel");
      // 2. 准备 DSL
      // 2.1 准备 query
      request.source().query(QueryBuilders.matchQuery("all", "如家"));
      // 高亮
      request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
      // 结果对象
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      handleResponse(response);
      }
      高亮显示处理高亮: highlight 部分的内容
      高亮显示处理
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      // 完整高亮处理
      @Test
      void testHighLight() throws IOException {
      // 1. 准备 request 对象
      SearchRequest request = new SearchRequest("hotel");
      // 2. 准备 DSL
      // 2.1 准备 query
      request.source().query(QueryBuilders.matchQuery("all", "如家"));
      // 高亮
      request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
      // 结果对象
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      handleResponse(response);
      }
      // 结果解析抽取
      public static void handleResponse(SearchResponse response) {
      // 4. 解析响应
      SearchHits searchHits = response.getHits();
      // 4.1 获取总条数
      long total = searchHits.getTotalHits().value;
      System.out.println("共搜索到: " + total);
      // 4.2 文档数组
      SearchHit[] hits = searchHits.getHits();
      // 4.3 遍历
      for (SearchHit hit : hits) {
      // 获取文档 source
      String json = hit.getSourceAsString();
      // 反序列化
      HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
      // 获取高亮结果
      Map<String, HighlightField> highlightFields = hit.getHighlightFields();
      if (!CollectionUtils.isEmpty(highlightFields)) {
      // 获取高亮值
      HighlightField highlightField = highlightFields.get("name");
      if (highlightField != null) {
      String name = highlightField.getFragments()[0].string();
      // 覆盖高亮结果
      hotelDoc.setName(name);
      }
      }
      System.out.println("hotelDoc = " + hotelDoc);
      }
      }
      实现高亮
    • 步骤

      1. 定义实体类,接受前端请求

        将搜索、分页、排序方式等封装到实体类
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        @Data
        public class RequestParams {
        // 搜索框的关键字
        private String key;
        // 当前页
        private Integer page;
        // 偏移量
        private Integer size;
        // 排序方式
        private String sortBy;
        }

      2. 定义controller 接口,接受页面请求,调用search 方法

        • 请求方式POST

        • 请求路径/hotel/list

        • 请求参数: 对象,类型为RequestParams

        • 返回值: PageResult,包含两个属性

          1. 总条数Long total

          2. 酒店数据List<HotelDoc> hotels

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            @Data
            public class PageResult {
            Long total;
            List<HotelDoc> hotels;

            public PageResult() {
            }

            public PageResult(Long total, List<HotelDoc> hotels) {
            this.total = total;
            this.hotels = hotels;
            }
            }
      3. 定义接口中的search方法,利用match 查询实现根据关键字搜索酒店信息

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        @Service
        public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
        @Autowired
        private RestHighLevelClient client;

        @Override
        public PageResult search(RequestParams requestParams) {
        SearchResponse response = null;
        try {
        // 1. 准备 Resuest
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL
        /****************************/
        // 2.1 准备 query
        String key = requestParams.getKey();
        if (key == null || "".equals(key)) {
        // 前端未传递 key
        request.source().query(QueryBuilders.matchAllQuery());
        } else {
        request.source().query(QueryBuilders.matchQuery("all", key));
        }
        // 2.2 分页
        int page = requestParams.getPage();
        int size = requestParams.getSize();
        request.source().from((page - 1) * size).size(size);
        /****************************/
        // 3. 发送请求,得到响应
        response = client.search(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
        throw new RuntimeException(e);
        }
        // 4. 解析响应
        return handleResponse(response);
        }

        // 4. 解析响应
        private PageResult handleResponse(SearchResponse response) {
        // 解析响应
        SearchHits searchHits = response.getHits();
        // 获取总条数
        long total = searchHits.getTotalHits().value;
        // 文档数组
        SearchHit[] hits = searchHits.getHits();
        List<HotelDoc> hotels = new ArrayList<>();
        for (SearchHit hit : hits) {
        // 获取文档 source
        String json = hit.getSourceAsString();
        // 反序列化
        HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
        hotels.add(hotelDoc);
        }
        // 封装返回
        return new PageResult(total, hotels);
        }
        }

    • 距离排序

      距离排序
    • 广告置顶

      我们需要给置顶的酒店文档HotelDoc 加一个标记,然后利用function score 给带有标记的文档增加权重

      • 实现步骤分析

        1. HotelDoc 类添加一个isAD 字段,Boolean 类型

        2. 挑选几个酒店,给他的文档数据添加isAD 字段,值为true

          1
          2
          3
          4
          5
          6
          7
          # 更新 文档id=38665 添加字段 isAD=true
          POST /hotel/_doc/38665
          {
          "doc": {
          "isAD": true
          }
          }
        3. 修改search 方法,添加function score 功能,isAD 值为true 的酒店增加权重

        Function score 查询可以控制文档的相关性算分
    • 聚合的分类(对数据类型有限制:keyword、数值、日期、boolean)

      聚合: 可以实现对文档数据的统计、分析、运算

      • 常见聚合有三类:
        1. (*Bucket)聚合:用来对文档做分组
          • TermAggregation:按照文档字段值分组
          • Date Histogram:按照日期阶梯分组,例如一周一组,或者一月一组等等
        2. 度量(*Metric)聚合:用以计算一些值,比如: 最大值,最小值,平均值等
          • AVG
          • MAX
          • MIN
          • Stats 同时求max,min,avg,sum
        3. 管道(pipeline)聚合:其他聚合的结果为基础做聚合
    • DSL 实现Bucket 聚合

      统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合,类型为term

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      GET /hotel/_search
      {
      "size": 0, // 设置 size 为 0,结果中不包含文档,只包含聚合结果
      "aggs": { // 定义聚合
      "brandArr": { // 给聚合起个名称
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择 term
      "field": "brand", // 参与聚合的字段
      "size": 20 // 希望获取的聚合结果数量
      }
      }
      }
      }

      默认情况下,Bucket 聚合会统计Bucket 内的文档数量,记为_count 并且按照_count 降序排序,我们可以修正结果排序方式

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      GET /hotel/_search
      {
      "size": 0, // 设置 size 为 0,结果中不包含文档,只包含聚合结果
      "aggs": { // 定义聚合
      "brandArr": { // 给聚合起个名称
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择 term
      "field": "brand", // 参与聚合的字段
      "order":{
      "_count": "asc" // 按照_count升序排序
      },
      "size": 20 // 希望获取的聚合结果数量
      }
      }
      }
      }

      默认情况下,Bucket 聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query 条件即可

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      GET /hotel/_search
      {
      // 添加聚合条件
      "query": {
      "range": {
      "price": {
      "gte": 150,
      "lte": 200
      }
      }
      },
      "size": 0, // 设置 size 为 0,结果中不包含文档,只包含聚合结果
      "aggs": { // 定义聚合
      "brandArr": { // 给聚合起个名称
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择 term
      "field": "brand", // 参与聚合的字段
      "order":{
      "_count": "asc" // 按照_count升序排序
      },
      "size": 20 // 希望获取的聚合结果数量
      }
      }
      }
      }
    • DSL 实现Metrices 聚合

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      GET /hotel/_search
      {
      "query": {
      "range": {
      "price": {
      "gte": 150,
      "lte": 200
      }
      }
      },
      "size": 0,
      "aggs": {
      "brandArr": {
      "terms": {
      "field": "brand",
      "order": {
      "_count": "asc"
      },
      "size": 20
      },
      "aggs": { // 是 brands 聚合的子聚合,也就是分组后对每组分别计算
      "score_stats": { // 聚合名称
      "stats": { // 聚合类型,这里是 stats 可以计算 min,max,avg 等
      "field": "score" // 聚合字段,这里是 score
      }
      }
      }
      }
      }
      }
    • JavaRestAPI 实现聚合

      对比分析
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      // 测试聚合
      @Test
      void testBuckets() throws IOException {
      // 1. 准备 request
      SearchRequest request = new SearchRequest("hotel");
      // 2. 准备 DSL
      // 2.1 设置 size 只看聚合
      request.source().size(0);
      request.source().aggregation(
      AggregationBuilders
      // 聚合名称
      .terms("brandAgg")
      // 期望获取文档数量
      .size(10)
      // 聚合字段
      .field("brand")
      );
      // 3. 发出请求
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      // 4. 解析结果
      System.out.println("response = " + response);
      Aggregations aggregations = response.getAggregations();
      // 4.1 根据聚合名称获取聚合结果(自动生成的对象并非所要结果)
      Terms brandTerms = aggregations.get("brandAgg");
      // 4.2 获取 buckets
      List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
      // 4.3 遍历
      for (Terms.Bucket bucket : buckets) {
      // 4.4 获取 key
      String key = bucket.getKeyAsString();
      System.out.println(key);
      }
      }
      聚合结果获取分析
      聚合结果获取分析
    • 自动补全

      拼音分词器: https://github.com/medcl/elasticsearch-analysis-pinyin

      1
      2
      3
      4
      5
      6
      7
      8
      # 测试分词器
      POST /_analyze
      {
      "text": [
      "如家酒店还不错"
      ],
      "analyzer": "pinyin"
      }
      • 自定义分词器(elasticsearch 中分词器的组成包含如下三部分)

        1. character filters:tokenizer 之前对文本进行处理,例如删除字符、替换字符
        2. tokenizer:将文本按照一定的规则切割成词条,例如keyword 就是部分此,还有id_smart
        3. tokenizer filter tokenizer 输出的词条做进一步处理,例如大小写转换,同义词处理,拼音处理等
        分析
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        # 自定义配置(创建库) 未添加 搜索分词器
        PUT /test
        {
        "settings": {
        "analysis": {
        "analyzer": {
        "my_analyzer": {
        "tokenizer": "ik_max_word",
        "filter": "py"
        }
        },
        "filter": {
        "py": {
        "type": "pinyin",
        "keep_full_pinyin": false,
        "keep_joined_full_pinyin": true,
        "keep_original": true,
        "limit_first_letter_length": 16,
        "remove_duplicated_term": true,
        "none_chinese_pinyin_tokenize": false
        }
        }
        }
        },
        "mappings": {
        "properties": {
        "name": {
        "type": "text",
        "analyzer": "my_analyzer"
        }
        }
        }
        }

        # 使用


        POST /test/_analyze
        {
        "text": ["如家酒店真不错"],
        "analyzer": "my_analyzer"
        }

        测试使用
      • 搜索时出先问题

        同义词

        解决上述问题: 拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        # 创建文档时指定创建分词器和搜索分词器
        PUT /test
        {
        "settings": {
        "analysis": {
        "analyzer": {
        "my_analyzer": { // 自定义分词器名称
        "tokenizer": "ik_max_word", //
        "filter": "py"
        }
        },
        "filter": {
        // 规则: 在 github readme 文档有对应使用方法
        "py": {
        "type": "pinyin",
        "keep_full_pinyin": false,
        "keep_joined_full_pinyin": true,
        "keep_original": true,
        "limit_first_letter_length": 16,
        "remove_duplicated_term": true,
        "none_chinese_pinyin_tokenize": false
        }
        }
        }
        },
        "mappings": {
        "properties": {
        "name": {
        "type": "text",
        "analyzer": "my_analyzer", // 创建倒排索引使用的分词器
        "search_analyzer": "ik_smart" // 搜索时使用的分词器
        }
        }
        }
        }
    • 自动补全

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # 创建索引库
      PUT coderitl
      {
      "mappings": {
      "properties": {
      "title":{
      "type": "completion"
      }
      }
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      # 实例数据
      POST coderitl/_doc
      {
      "title":["Sony","WH=1000XM3","9001","0090010"]
      }

      POST coderitl/_doc
      {
      "title":["SK-IT","PITERA","BBCS","DDES"]
      }


      POST coderitl/_doc
      {
      "title":["Nintendo","switch","AAFG","YYDS"]
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      # 自动补全查询语法
      GET /coderitl/_search
      {
      "suggest": {
      "title_suggest": { // title_suggest 自定义名称
      "text": "so", // 查询关键字
      "completion": {
      "field": "title", // 补全查询的字段
      "skip_duplicates": true, // 跳过重复的
      "size": 10 // 获取前 10 条数据
      }
      }
      }
      }
      • 修改hotel

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        PUT /hotel
        {
        "settings": {
        "analysis": {
        "analyzer": {
        "text_analyzer": {
        "tokenizer": "ik_max_word",
        "filter": "py"
        },
        "completion_analyzer": {
        "tokenizer": "keyword",
        "filter": "py"
        }
        },
        "filter": {
        "py": {
        "type": "pinyin",
        "keep_full_pinyin": false,
        "keep_joined_full_pinyin": true,
        "keep_original": true,
        "limit_first_letter_length": 16,
        "remove_duplicated_term": true,
        "none_chinese_pinyin_tokenize": false
        }
        }
        }
        },
        "mappings": {
        "properties": {
        "id": {
        "type": "keyword"
        },
        "name": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
        },
        "address": {
        "type": "keyword",
        "index": false
        },
        "price": {
        "type": "integer"
        },
        "score": {
        "type": "integer"
        },
        "brand": {
        "type": "keyword",
        "copy_to": "all"
        },
        "city": {
        "type": "keyword"
        },
        "startName": {
        "type": "keyword"
        },
        "business": {
        "type": "keyword",
        "copy_to": "all"
        },
        "location": {
        "type": "geo_point"
        },
        "pic": {
        "type": "keyword",
        "index": false
        },
        "all": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart"
        },
        "suggestion": {
        "type": "completion",
        "analyzer": "completion_analyzer"
        }
        }
        }
        }

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        // 数据库对应字段 List<String> suggestion
        "suggestion": {
        "type": "completion",
        "analyzer": "completion_analyzer"
        }
        // 那些数据库字段做自动补全

        // 构造实现字段赋值
        this.suggestion = Arrays.asList(this.brand,this.business)

        对自动补全完善
    • JavaApi 实现自动补全

      JavaApi 实现自动补全
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      // 测试自动补全 API
      @Test
      public void testSuggest() throws IOException {
      // 1. 准备 request
      SearchRequest request = new SearchRequest("hotel");
      // 准备 DSL
      request
      .source()
      .suggest(
      new SuggestBuilder()
      // 自定义名称: suggestions
      .addSuggestion(
      "suggestions",
      // 参与自动补全的字段: suggestion
      SuggestBuilders.completionSuggestion("suggestion")
      // 搜索关键字
      .prefix("sd")
      // 跳过重复的
      .skipDuplicates(true)
      // 期望显示的数据量
      .size(10)));
      // 发送请求
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      // 4. 解析结果
      Suggest suggest = response.getSuggest();
      // 4.1 根据不全查询名称 获取补全结果 Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends
      // Suggest.Suggestion.Entry.Option>> == CompletionSuggestion
      CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
      // 4.2 获取 options
      List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
      // 4.3 遍历
      for (CompletionSuggestion.Entry.Option option : options) {
      // 获取 text
      String text = option.getText().toString();
      System.out.println(text);
      }
      }
      结果解析
    • 数据同步

      • 数据同步问题分析

        elasticsearch 中的酒店数据来自于mysql,因此mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是elasticsearch mysql 之间的数据同步

      • 数据同步的三种方式

        1. 同步调用

          同步调用
          • 优点: 实现简单,粗暴
          • 缺点: 业务耦合度高
        2. 异步通知(推荐)

          异步通知
          • 优点: 低耦合,实现难度一般
          • 缺点: 依赖mq 的可靠性
        3. 监听binlog

          监听binlog
        • 优点: 完全解除服务间耦合
        • 缺点: 开启binlog 增加数据库负担,实现复杂度高
    • 案例实现数据同步

      数据同步实现
    • 问题

      • 海量数据存储问题: 将索引库从逻辑上拆分为N 个分片,存储到多个节点

      • 单点故障问题: 将分片数据在不同节点备份

        实现
    • 搭建集群

      集群所需环境容量大小(运行内存4G+,否则过于卡顿,无法操作)
      集群所需环境容量大小
      1
      2
      # 创建网络
      docker network create elastic
      • es 运行需要修改一些linux 系统权限,修改/etc/sysctl.conf

        1
        2
        3
        4
        5
        6
        7
        # 编辑
        vim /etc/sysctl.conf
        # 添加如下
        vm.max_map_count=262144
        # 保存退出后使其生效
        sysctl -p

      • docker-compose.yml

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        93
        94
        95
        96
        97
        98
        99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        version: '3'
        services:
        es01:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
        container_name: es01
        environment:
        - node.name=es01
        - cluster.name=es-docker-cluster
        - discovery.seed_hosts=es02,es03
        - cluster.initial_master_nodes=es01,es02,es03
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        - TZ="Asia/Shanghai"
        - node.master=true
        - node.data=true
        - http.cors.enabled=true
        - http.cors.allow-origin=*
        ulimits:
        memlock:
        soft: -1
        hard: -1
        volumes:
        - /root/es/data/data01:/usr/share/elasticsearch/data # 数据文件
        - /root/es/logs/logs01:/usr/share/elasticsearch/logs # 日志文件
        ports:
        - 9200:9200
        networks:
        - elastic
        es02:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
        container_name: es02
        environment:
        - node.name=es02
        - cluster.name=es-docker-cluster
        - discovery.seed_hosts=es01,es03
        - cluster.initial_master_nodes=es01,es02,es03
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        - TZ="Asia/Shanghai"
        - node.master=true
        - node.data=true
        - http.cors.enabled=true
        - http.cors.allow-origin=*
        ulimits:
        memlock:
        soft: -1
        hard: -1
        volumes:
        - /root/es/data/data02:/usr/share/elasticsearch/data
        - /root/es/logs/logs02:/usr/share/elasticsearch/logs
        ports:
        - 9201:9200
        - 9301:9300
        networks:
        - elastic
        es03:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
        container_name: es03
        environment:
        - node.name=es03
        - cluster.name=es-docker-cluster
        - discovery.seed_hosts=es01,es02
        - cluster.initial_master_nodes=es01,es02,es03
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        - TZ="Asia/Shanghai"
        - node.master=true
        - node.data=true
        - http.cors.enabled=true
        - http.cors.allow-origin=*
        ulimits:
        memlock:
        soft: -1
        hard: -1
        volumes:
        - /root/es/data/data03:/usr/share/elasticsearch/data
        - /root/es/logs/logs03:/usr/share/elasticsearch/logs
        ports:
        - 9202:9200
        - 9302:9300
        networks:
        - elastic
        kibana:
        image: docker.elastic.co/kibana/kibana:7.13.2
        container_name: kibana
        environment:
        - I18N_LOCALE=zh-CN
        ports:
        - "5601:5601"
        links:
        - es01:vm01
        depends_on:
        - es01
        - es02
        - es03
        networks:
        - elastic

        cerebro:
        image: lmenezes/cerebro:0.9.2
        container_name: cerebro
        ports:
        - "19000:9000"
        links:
        - es01:vm01
        command:
        - -Dhosts.0.host=http://vm01:9200
        networks:
        - elastic

        volumes:
        data01:
        driver: local
        data02:
        driver: local
        data03:
        driver: local

        networks:
        elastic:
        driver: bridge
      • 构建

        1
        docker-compose up -d
        构建安装集群
    • 访问5601 出现 Kibana server is not ready yes

      1
      2
      3
      # 获取 es01 的容器 ip 
      docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' c6 # c6 是 es01 的容器id
      # 复制获取的的 ip
    • 进入kibana 容器内部

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      # 进入 kibana
      docker exec -it kibana bash
      # 进入 config
      cd config
      # 修改该配置文件,根据下图实现
      vim kibana.yml

      # 修改完毕后重启
      docker-compose restart kibana

      修改如下
    • 集群状态监控

      github 地址: https://github.com/lmenezes/cerebro

      下载链接: https://github.com/lmenezes/cerebro/releases/download/v0.9.4/cerebro-0.9.4.zip (windows可使用)

    • 集群访问测试

      集群访问测试
    • 创建索引库

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      PUT /coderitl
      {
      "settings":{
      "number_of_shards": 3, // 分片数量
      "number_of_replicas":1 // 副本数量
      },
      "mappings":{
      "properties":{
      // mapping 映射定义
      }
      }
      }
      DSL 创建 cerebro 创建索引库
    • 集群节点职责划分

      节点类型 配置参数 默认值 节点职责
      master eligible node.master true 备选主节点: 主节点可以管理和记录集群状态,决定分片在那个节点,处理创建和删除索引库的请求
      data node.data true 数据节点:存储数据、搜索、聚合、CRUD
      ingest node.ingest true 数据存储之前的预处理
      coordonating 上面3 个参数都为false,则为coordonating 节点 路由请求到其他节点合并其他节点处理的结果,返回给用户
    • ES 集群的脑裂

      默认情况下,每个节点都是master eligible 节点,因此一旦master 节点宕机,其他候选节点会选举一个成为主节点,当主节点与其他节点网络故障时,可能发生脑裂问题

      脑裂产生的原因

      为了避免脑裂,需要要求候选票超过(eligible节点数量 + 1) / 2 才能当选为主,因此eligible 节点数量最好是奇数,对应配置项是discovery.zen.minmum_master_nodes es7.0 以后,已经成为默认配置,因此一般不会发生脑裂,

    • ES 集群的分布式存储

      当新增文档时,应该保存到不同的分片,保证数据均衡,主要通过hash 算法来计算文档应该存储到那个分片

      • 添加数据

        添加数据(9200 获取数据(9201)
        1
        2
        3
        4
        5
        6
        {
        "explain":true, // 获取来自那个分片信息
        "query": {
        "match_all": {}
        }
        }
        数据成功的存储在不同分片上
        1
        2
        # hash 算法
        shard = hash(_routing) % number_of_shards
        • _routing: 默认是文档的id

        • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改

        • 新增文档流程

          新增文档流程
      • 分布式查询

        • scatter phase:分散阶段,coordinating node 会把请求分发到每一个分片

        • gather phase:聚集阶段,coordinating node 汇总data node 的搜索结果,并处理为最终结果集返回给用户

          先分发,后汇总,在返回给用户
    • 问题引出

      停止一个主机
    • 故障转移

      集群的master 节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据转移到其他节点,确保数据安全,这个叫做故障转移

      分片备份自动转移
Sentinel
  • 详细使用

    • 雪崩问题

      微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪崩

      雪崩问题
      • 解决雪崩问题的常见方式有四种

        • 超时处理: 设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待(只能缓解)

        • 船壁模式: 限定每个业务能使用的线程数,避免耗尽整个tomcat 的资源,因此也叫线程隔离

          船壁模式
          在这里插入图片描述
        • 熔断降级: 由熔断器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求

          熔断降级
        • 流量控制: 限制业务访问的QPS,避免服务因流量的突增而故障(可以实现预防,其他三个是避免(已经发生雪崩,阻止传递)

          QPS:每秒钟处理的请求数量

    • 服务保护技术对比

      ** Sentinel Hystrix(已停止维护)
      隔离级别 信号量隔离 线程池隔离/信号量隔离
      熔断降级策略 基于慢调用比例或异常比例 基于失败比率
      实时指标实现 滑动窗口 滑动窗口(基于RxJava)
      规则配置 支持多种数据源 支持多种数据源
      扩展性 多个扩展性 插件的形式
      基于注解的支持 支持 支持
      限流 基于QPS,支持基于调用关系的限流 优先支持
      流量整形 支持慢启动,匀速排队模式 不支持
      系统自适应保护 支持 不支持
      控制台 开箱即用,可规则配置,查看秒级监控,机器发现等 不完善
      常见框架的适配 Servlet、Spring Cloud、Dubbo、gPRC Servlet、Spring Cloud、Netflix
    • order-service 中整合sentinel 并且连接sentinel 的控制台

      1. 引入sentinel 依赖

        1
        2
        3
        4
        5
        <!--   TODO: 整合 sentinel -->
        <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
      2. 配置

        1
        2
        3
        4
        5
        spring:
        cloud:
        sentinel:
        transport:
        dashboard: localhost:8080
      3. 访问微服务的任意端点(接口 controller),触发sentinel 监控

        查看
    • 簇点链路

      簇点链路:就是项目内的调用链路,链路中被监控的每个接口就是一个资源,默认情况下sentinel 会监控SpringMVC 的每一个端点(接口),因此SpringMVC 的每一个端点就是调用链路中的一个资源

      簇点链路
    • 流控

      单机阈值: 其含义局势限制/order/{orderId} 这个资源的单机QPS 100(通过压力测试获取),超出的请求会被拦截报错

      • 流控模式

        • 直连: 统计当前资源的请求,触发阈值时对当前资源直接限流,也就是默认的模式

        • 关联: 统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流

          使用场景: 比如用户支付时需要修改订单状态,同时用户要查询订单,查询和修改操作会争抢数据库锁,产生竞争,业务需求时有限支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对订单业务限流

          配置过程
          • 配置流控规则:/order/update 资源被访问的QPS 超过5 时,/order/query 请求限流

            对谁限流加给谁 query 限流
            shishi
          • 满足如下条件使用关联模式

            • 两个有竞争关系的资源
            • 一个优先级较高,一个优先级较低
        • 链路: 统计从指定链路访问到本资源的请求时,对指定链路限流

          • 两条请求链路

            • /test1 => /common
            • /test2 => /common
          • 案例

            需求: 有查询订单和创建订单业务,两者都需要查询到商品,针对从查询订单进入到查询商品的请求统计,并设置限流

          • 步骤

            1. OrderService 中添加一个queryGoods 方法,不用实现业务

            2. OrderController 中,改造/order/query 端点,调用OrderService 中的queryGoods

            3. OrderController 中添加一个/orrder/save 的端点,调用OrderService queryGoods 方法

            4. queryGoods 设置限流规则,/order/query 进入queryGoods 的方法限制QPS 必须小于2

              配置案例
          • 配置链路

            • sentinel 默认只标记Controller 中的方法为资源,如果要标记其他方法

              1
              2
              3
              4
              @SentinelResource("goods")
              public String queryGoods() {
              return "queryGoods被调用.............";
              }
            • Sentinel 默认会将Controller 中的方法做context 整合,导致链路模式的流控失效

              1
              2
              3
              4
              spring:
              cloud:
              sentinel:
              web-context-unify: false # 关闭 context 整合
    • 流控效果

      流控效果是指请求达到流控阈值时应该采取的措施

      • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException 异常,是默认的处理方式

      • warn up:预热模式,对超出阈值的请求同样是拒绝并抛出异常.但这种模式阈值会动态变化,从一个较小的值逐渐增加到最大阈值

        是应对服务冷启动的一种方案,请求阈值初始值是threshold /(除号) coldFactory,持续指定时长后,逐渐提高到threshold 值,而coldFactory 的默认值是3

        Eg: 设置QPS的 threshold 为 10,预热时间5s,那么初始阈值就是10/3,也就是3,然后再5s 后逐渐增长到10

        增长趋势
        配置 实时监控
      • 排队等待: 让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长

        当请求超过QPS 阈值时,快速失败和warn up 会拒绝新的请求并抛出异常,而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔内依次执行,后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝

        Eg: QPS=5,意味着每200ms 处理一个队列中的请求,timeout=2000 意味着预期等待超过2000ms 的请求会被拒绝并抛出异常

        过程
    • 热点参数限流

      之前的限流是统计访问某个资源的所有请求,判断是否超过QPS 阈值,而热点参数限流是分别统计参数值相同的请求,判断是否超过QPS 阈值

      热点参数限流
      • 配置位置

        热点参数配置位置
      • 重要参数解析

        参数解析
      • 高级选项参数

        高级选项参数
        在这里插入图片描述
        • 如果参数值是100,则每1s 允许的QPS 10
        • 如果参数值是101,则每1s 允许的QPS 15

        热点参数限流对默认的SpringMVC 资源无效,需要通过@SentinelResource 注解

    • 授权规则

      授权规则 可以对调用方的来源总控制,有白名单和黑名单两种方式

      • 白名单: 来源(origin)在白名单内的调用者允许访问
      • 黑名单: 来源(origin)在黑名单内的调用者不允许访问
      配置
    • 授权规则配置步骤

      配置授权规则
      • gateway 服务中,利用网关的过了不起添加名为gateway origin

        1
        2
        3
        4
        5
        spring:
        cloud:
        gateway:
        default-filters:
        - AddRequestHeader=origin,gateway # 添加名为 origin 的请求头,值为: gateway
      • request 获取一个名为origin 的请求头,作为origin 的值

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        package com.coderitl.springcloud.sentinel;

        @Component
        public class HeaderOriginParset implements RequestOriginParser {
        @Override
        public String parseOrigin(HttpServletRequest request) {
        // 1. 获取请求头
        String origin = request.getHeader("origin");
        // 2. 非空判断
        if (StringUtils.isEmpty(origin)) {
        origin = "blank";
        }
        return origin;
        }
        }
        添加配置 限制访问来源
      • 自定义异常结果

        异常 说明
        FlowException 限流异常
        ParamFlowException 热点参数限流异常
        DegradeException 降级异常
        AuthorityException 授权规则异常
        SystemBlockException 系统规则异常
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        package com.coderitl.springcloud.handler;

        @Component
        public class SentinelBlockHandler implements BlockExceptionHandler {
        @Override
        public void handle(
        HttpServletRequest httpServletRequest,
        HttpServletResponse httpServletResponse,
        BlockException e)
        throws Exception {
        String msg = "未知异常";
        // 限流状态码都是 429
        int status = 429;
        if (e instanceof FlowException) {
        msg = "请求被限流了.........";
        } else if (e instanceof DegradeException) {
        msg = "请求被降级了.........";
        } else if (e instanceof ParamFlowException) {
        msg = "热点参数限流.........";
        } else if (e instanceof AuthorityException) {
        msg = "请求没有权限!";
        // 权限 401
        status = 401;
        }
        httpServletResponse.setContentType("application/json;charset=utf-8");
        httpServletResponse.setStatus(status);
        httpServletResponse
        .getWriter()
        .println("{\"message\":\"" + msg + "\",\"status\":" + status + "}");
        }
        }

        优化显示
    • 规则持久化模式(3)

      • 规则管理模式-pull 模式

        pull 模式:控制台将配置的规则推送到Sentinel 客户端,而客户端会将配置规则保存在本地文件或数据库中,以后会定时去本地文件或数据库中查询,更新本地规则

        缺点:存在时效性
      • 规则管理模式-push 模式

        push 模式:控制台将配置规则推送到远程配置中心,例如NacosSentinel 客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新|

        推荐
      • 原始方式(重启失效)

    • Sentinel 规则持久化

      1. 引入依赖

        1
        2
        3
        4
        5
        <!-- 在 order-service 中引入 sentinel 监听 nacos 的依赖 -->
        <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-datasource-nacos</artifactId>
        </dependency>
      2. 配置nacos 地址

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        # 在 order-service 中的 application.yml 文件配置 nacos 地址以及监听的配置信息
        spring:
        cloud:
        cloud:
        sentinel:
        datasource:
        flow:
        nacos:
        server-addr: localhost:8848 # nacos 地址
        dataId: orderservice-flow-rules
        groupId: SENTINEL_GROUP
        ruleType: flow # 还可以是 degrade authority param-flow
Redis
  • 详细使用

    • 持久化

      • RDB

        全称是Redis Database Backup file(Redis数据备份文件),也被叫做Redis 数据快照。简单来说就是把内存中的所有数据都记录到磁盘中,Redis 实例故障重启后,从磁盘读取快照文件,恢复数据

        快照文件称为RDB 文件,默认是保存在当前运行目录

        1
        2
        3
        4
        5
        127.0.0.1:6379>  save # 由 Redis 主进程来执行 RDB,会阻塞所有命令
        127.0.0.1:6379> bgsave # 开启子进程执行 RDB,避免主进程收到影响

        # Redis 停机时会执行一次 RDB

        触发机制
      • bgsave

        bgsave 流程
        • 基本流程
          1. fork 主进程得到一个子进程,共享内存空间
          2. 子进程读取内存数据并写入新的RDB 文件
          3. 用新RDB 文件替换旧的RDB 文件
    • AOF 持久化

      AOF 全称为Append Oble File(追加文件),Redis 处理的每一个写命令都会记录在AOF 文件,可以看作是命令日志文件

      AOF(存储了命令,执行命令恢复到原始数据)

      AOF 默认是关闭的,需要修改redis.conf 配置文件来开启AOF

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      # 禁用 RDB
      save ""
      # 是否开启 AOF 功能 默认是 no
      appendonly yes
      # AOF 文件的名称
      appendfilename "appendonly.aof"

      # AOF 的命令记录的频率也可以通过 redis.conf 文件来配置
      # 表示没执行一次写命令 立即记录到 AOF 文件
      appendfsync always
      # 写命令执行完先放入 AOF 缓冲区,然后表示每隔 1s 将缓冲区数据写到 AOF 文件,是默认方案
      appendfsync everysec
      # 写命令执行完先放入 AOF 缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
      appendfsync no
      配置项 刷盘时机 优点 缺点
      always 同步刷盘 可靠性高,几乎不丢数据 性能影响大
      everysec 每秒刷盘 性能适中 最多丢失1s 数据
      no 操作系统控制 性能最好 可靠性差,可能丢失大量数据

      因为是记录命令,AOF 文件会比RDB 文件大得多,而且AOF 会记录对同一个key 的多次操作,但只有最后一次写操作才有意义,铜鼓执行bgrewriteaof 命令,可以让AOF 文件执行重写功能,用最少的命令达到相同效果

      执行重写bgrewriteaof(redis 命令行执行)

      Redis 也会在触发阈值时自动去重写AOF 文件,阈值也可以在redis.conf 中配置

      1
      2
      3
      4
      # AOF 文件比上次文件 增长超过多少百分比则触发重写
      auto-aof-rewrite-percentage 100
      # AOF 文件体积最小多大以上才可以触发重写
      auto-aof-rewrite-min-size 64mb
      • RDB AOF 对比

        RDB AOF
        持久化方式 定时对整个内存做快照 记录每一次执行的命令
        数据完整性 不完整,两次备份之间会丢失 相对完整,取决于刷盘策略
        文件大小 会有压缩,文件体积小 记录命令,文件体积很大
        宕机恢复速度 很快
        数据恢复优先级 低,因为数据完整性不如AOF 高,因为数据完整性更高
        系统资源占用 高,大量CPU 和内存消耗 低,主要是磁盘IO 资源但AOF 重写时会占用大量CPU 和内存资源
        使用场景 可以容忍数分钟的数据丢失,追求更快的启动速度 对数据安全性要求较高常见
    • 搭建集群

      一主二从
      • 配置文件

        1
        2
        3
        4
        5
        6
        7
        8
        # 进入 tmp 目录
        cd /tmp
        # 创建目录
        mkdir 7001 7002 7003

        # 拷贝配置文件(一键考培 参数 1 是数字 `一` )
        echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf

      • 修改每个实例的端口、工作目录

        修改每个文件夹内的配置文件,将端口分别修改为7001,7002,7003,将rdb 文件保存位置都修改自己所在目录

        1
        2
        3
        sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
        sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
        sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf
      • 修改每个实例的声明IP

        1
        2
        3
        # 虚拟机本身有多个 ip 为了避免将来混乱 我们需要在 redis.conf 文件中指定每一个实例的绑定 ip 信息
        # redis 实例的声明 IP
        replica-announce-ip
        1
        2
        3
        4
        5
        6
        7
        #  逐一修改 1a:数字 1 a:追加模式 (1a:第一行后追加)
        sed -i '1a replica-announce-ip 43.142.80.9' 7001/redis.conf
        sed -i '1a replica-announce-ip 43.142.80.9' 7002/redis.conf
        sed -i '1a replica-announce-ip 43.142.80.9' 7003/redis.conf

        # 一键修改
        printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 43.142.80.9' {}/redis.conf
      • 启动三个窗口实例方便查看日志

        同时启动服务: tmp/ redis-server 700x/redis.conf
    • 开启主从关系

      • 有临时和永久两种模式

        • 修改配置文件(永久生效)

          1
          2
          # 在 redis.conf 中添加一行配置 replicaof | slaveof(5.0以前)命令
          slaveof <masterip> <masterport>
        • 使用redis-cli 客户端连接到redis 服务,执行slaveof 命令(重启后生效)

          1
          slaveof  <masterip> <masterport>
          1
          2
          3
          4
          5
          6
          7
          8
          # 前提条件所有主机配置如下内容(否则集群失败)
          1. 注释 bind 127.0.0.0
          2. protected-mode no

          # 连接测试 登录 7002 | 7003
          redis-cli -p 7002
          # 执行 slaveof => 7002 | 7003 成为 7001 的 slaveof(从机)
          slaveof 43.142.80.9 7001
          成功搭建集群环境(命令: info replication 查看信息
    • 数据同步原理

      数据同步原理(第一次是全量同步) 后续同步使用replid
      • master 如何判断slave 是不是第一次来同步数据?

        • Replication Id:简称replid 是数据集的标记,id 一致则说明是同一数据集,每一个master 都有唯一的reolidslave 则会继承master 节点的replid
        • offset 偏移量,随着记录在repl_baklog 中的数据增多而逐渐增大,slave 完成同步时也会记录当前同步的offset,如果slave offset 小于master offset,说明slave 数据落后于master,需要更新

        因此slave 做数据同步,必须向master 声明自己的replication Id offsetmaster 才可以判断到底需要同步那些数据

      • 增量同步(slave 重启后同步)

        增量同步

        repl_baklog 大小有上限,写满后会覆盖最早的数据,如果slave 断开时间过久,导致尚未备份的数据被覆盖,则无法基于log 做增量同步,只能再次全量同步

      • 优化Redis 主从集群

        • master 中配置repl-diskless-sync yes 启用无磁盘复制,避免全量同步是的磁盘 IO

        • Redis 单节点上的内存占用不要太大,减少RDB 导致的过多磁盘IO

        • 适当提高repl_baklog 的大小,发现slave 宕机时尽快实现故障恢复,尽可能避免全量同步

        • 限制一个master 上的slave 节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master 压力

          主-从-从链式
      • 总结

        • 简述全量同步和增量同步的区别
          • 全量同步: master 将完整的内存数据生成RDB,发送RDB slave,后续命令则记录在repl_baklog,逐个发送给slave
          • 增量同步: slave 提交自己的offset mastermaster 获取repl_baklog 中从offset 之后的命令给slave
        • 什么时候执行全量同步
          • slave 节点第一次连接master 节点时
          • slave 节点断开时间太久,repl_baklog 中的offset 已经被覆盖时
        • 什么时候执行增量同步
          • slave 节点断开又恢复,并且在repl_baklog 中能找到offset
    • 哨兵作用

      Redis 提供了哨兵Sentinel 机制来实现主从集群的自动故障恢复

      • 监控:Sentinel 会不断检查你的master slave 是否按预期工作

      • 自动故障恢复:如果master 故障,Sentinel 会将一个slave 提升为master,当故障实例恢复后也已新的master 为主

      • 通知:Sentinel 充当Redis 客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis 的客户端

        哨兵
      • 服务状态监控

        Sentinel 基于心跳机制检测服务状态,每隔1s 向集群的每个实例发送ping 命令

        • 主观下线: 如果某sentinel 节点发现某实例未在规定时间响应,则认为该实例主观下线
        • 客观下线: 若超过指定数量(quorum)Sentinel 都认为该实例主观下线,则该实例客观下线,quorum 值最好超过sentinel 实例数量的一半
      • 选举新的master

        一旦发现master 故障,Sentinel 需要在slave 中选择一个作为新的master,选择依据如下:

        1. 首先会判断slave 节点与master 节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该slave 节点
        2. 然后判断slave 节点的slave-prioity 值,越小优先级越高,如果是0 则永不参与选举
        3. 如果slave-prority 一样,则判断slave 节点的offset 值,越大说明数据越新,优先级越高
        4. 最后是判断slave 节点的运行id 大小,越小优先级越高
      • 如何实现故障转移

        当选中了其中一个slave 为新的master

        1. Sentinel 给备选的slave 节点发送slaveof no one 命令,让该节点成为master

        2. Sentinel 给所有其他slave 发送slaveof ip poort 命令,让这些slave 成为新master 的从节点,开始新的master 上同步数据

        3. 最后,Sentinel 将故障节点标记为slave,当故障节点恢复后会自动成为新的master slave 节点

          故障转移
    • 哨兵集群搭建

      哨兵集群搭建图
      • 创建目录

        1
        2
        3
        4
        5
        6
        7
        8
        cd /tmp
        # 创建文件夹
        mkdir s1 s2 s3
        # 创建配置文件
        cd s1/

        vim sentinel.conf

        1
        2
        3
        4
        5
        6
        7
        8
        9
        # sentinel.conf 文件内容 3
        # 是当前 sentinel 实例的端口
        port 27001
        sentinel announce-ip 43.142.80.9
        # 指定主节点信息
        sentinel monitor mymaster 43.142.80.9 7001 2
        sentinel down-after-milliseconds mymaster 5000
        sentinel failover-timeout mymaster 60000
        dir "/tmp/s1"
        • 解读
          • mymaster:主节点名称,自定义
          • 43.142.80.9 7001:主节点的 ip 和端口
          • 2 选举master 时的quorum
        1
        2
        3
        4
        5
        # 方式一: 逐个拷贝
        cp s1/sentinel.conf s2
        cp s1/sentinel.conf s3
        # 方式二: 管道组合命令 一键拷贝
        echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf
        1
        2
        3
        # 修改 s2、s3 两个文件夹内的配置文件,将端口分别修改为 27002 、27003
        sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
        sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
        流程信息
      • 启动3个 redis 实例查看

        1
        2
        3
        4
        5
        6
        # 第 1 个
        redis-sentinel s1/sentinel.conf
        # 第 2 个
        redis-sentinel s2/sentinel.conf
        # 第 3 个
        redis-sentinel s3/sentinel.conf
        redis-sentinel 成功集群
    • RedisTemplate 的哨兵模式

      • 引入依赖

        1
        2
        3
        4
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
      • 修改配置

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        logging:
        level:
        io.lettuce.core: debug
        pattern:
        dateformat: MM-dd HH:mm:ss:SSS
        spring:
        redis:
        sentinel:
        master: mymaster # 指定 master 名称
        nodes: # 指定 redis-sentinel 集群信息
        - 43.142.80.9:27001
        - 43.142.80.9:27002
        - 43.142.80.9:27003
      • 控制器

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        @RestController
        public class HelloRedisController {
        @Autowired private StringRedisTemplate redisTemplate;

        @GetMapping("/get/{key}")
        public String hi(@PathVariable String key) {
        return redisTemplate.opsForValue().get(key);
        }

        @GetMapping("/set/{key}/{value}")
        public String hi(@PathVariable String key, @PathVariable String value) {
        redisTemplate.opsForValue().set(key, value);
        return "success add key";
        }
        }

      • 配置主从读写分离

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        package com.coderitl.springcloud.redis.conf;

        public class RedisBeanConf {
        @Bean
        public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer() {
        return clientConfigurationBuilder ->
        clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
        }
        }

        这里的ReadFrom 是配置Redis 的读取策略,是一个枚举,包括如下选择

        • MASTER: 从主节点读取
        • MASTER_PREFFERRED: 优先从master 节点读取,master 不可用才读取replica
        • REPLICA slave(replica)节点读取
        • REPLICA_PREFERRED 优先从slave(replica)节点读取,所有的slave 都不可用才读取master
        成功获取Redis 数据
        连接日志信息
    • 结构

      结构图
    • 分片集群结构

      主从和哨兵可以解决高可用,高并发读的问题,但是依然有两个问题没有解决

      • 海量数据存储问题
      • 高并发写问题

      使用分片集群可以解决上述问题,分片集群特征:

      • 集群中有多个master,每个master 保存不同数据
      • 每个master 都可以由多个slave 节点
      • master 之间通过ping 检测彼此健康状态
      • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点
    • 搭建分片集群

      • 结构图

        33
      • ip 分配

        IP PORT 角色
        43.142.80.9 7001 master
        43.142.80.9 7002 master
        43.142.80.9 7003 master
        43.142.80.9 8001 slave
        43.142.80.9 8002 slave
        43.142.80.9 8003 slave
      • 准备实例和配置

        删除之前的7001,7002,7003 的目录,重新创建7001,7002,7003,8001,8002,8003 目录

        1
        2
        3
        4
        5
        # tmp
        cd /tmp
        # 创建目录
        mkdir 7001 7002 7003 8001 8002 8003

      • /tmp 下准备一个新的redis.conf 文件

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        port 6379
        # 开启集群功能
        cluster-enabled yes
        # 集群的配置文件名称 不需要我们创建,由 redis 自己维护
        cluster-config-file /tmp/6379/nodes.conf
        # 节点心跳失败的超时时间
        cluster-node-timeout 5000
        # 持久化文件存放目录
        dir /tmp/6379
        # 绑定地址
        bind 0.0.0.0
        # 让 redis 后台运行
        daemonize yes
        # 注册的实例 ip
        replica-announce-ip 43.142.80.9
        # 保护模式
        protected-mode no
        # 数据库数量
        databases 1
        # 日志
        logfile /tmp/6379/run.log
      • 将这个redis.conf 文件拷贝到每隔目录下

        1
        2
        3
        4
        # 进入 /tmp 目录
        cd /tmp
        # 执行拷贝
        echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf
      • 修改每个目录下的redis.conf,将其中的6379 修改为与所在目录一致

        1
        2
        3
        cd /tmp
        # 修改配置文件
        printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf
      • 启动

        1
        2
        3
        cd  /tmp
        # 一键启动所有服务
        printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf
      • 查看

        集群启动查看
      • 如果需要关闭所有进程,可执行命令

        1
        2
        3
        4
        ps -ef | grep redis | awk '{print $2}' | xargs kill

        # 推荐关闭方式
        printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown
      • 创建集群

        1
        redis-cli --cluster create --cluster-replicas 1 43.142.80.9:7001 43.142.80.9:7002 43.142.80.9:7003 43.142.80.9:8001  43.142.80.9:8002 43.142.80.9:8003 
        • 参数说明
          • redis-cli --cluster | ./redis-trib.rb:代表集群操作命令
          • create: 代表是创建集群
          • --cluster-replicas 1(数字1): 指定集群中每个master 的副本个数为1,此时节点总数 ➗ (replicas + 1) 得到的就是master 数量,因此节点列表中的前n 个就是master,其他节点都是slave 节点,随机分配到不同master
        成功启动集群
      • 查看集群状态

        1
        redis-cli -p 7001 cluster nodes
        集群状态
    • 散列插槽

      Redis 会把每一个master 节点映射到0~16383 个插槽上,查看集群信息时就能看到

      数据key 不是与节点绑定,而是与插槽绑定,redis 会根据key 的有效部分计算插槽值,分两种情况

      • key 中包含{} {} 中至少包含一个字符,{} 中的部分是有效部分
      • key 中不包含{},整个key 都是有效部分

      例如: key username,那么根据计算,如果是{xxx}username 则根据xxx 计算,计算方式是利用CRC16 算法得到一个hash 值,然后对16384 取余,得到的结果就是slot

      slot(redis-cli -c -p 7001) 参数: -c 至关重要
    • 总结

      • Redis 如何判断某个key 应该在那个实例?

        • 16384 个插槽分配到不同的实例
        • 根据key 的有效部分计算哈希值,16384 取余
        • 余数作为插槽,寻找插槽所在实例即可
      • 如何将同一类数据固定的保存在同一个Redis 实例

        • 这一类数据使用相同的有效部分,例如key 都以{typeId}为前缀

          数据固定的保存在同一个Redis 实例
      • 集群伸缩

        向集群中添加一个新的master 节点,并向其中存储num = 10

        • 需求

          • 启动一个新的redis 实例,端口为7004

            1
            2
            3
            4
            mkdir 7004
            cp redis.conf 7004
            sed -i 's/6379/7004/g' 7004/redis.conf
            redis-server 7004/redis.conf
          • 添加7004 到之前的集群,并作为一个master 节点

            1
            redis-cli --cluster add-node 43.142.80.9:7004  43.142.80.9:7001
          • 7004 节点分配插槽,使得num 这个key 可以存储到7004 实例

            1
            2
            # 重新分片
            redis-cli --cluster reshard ip:port
            分配插槽
            在这里插入图片描述
    • 故障转移

      故障转移
    • 数据迁移

      数据迁移流程

      利用cluster failover 命令可以手动让集群中的某个master 宕机,切换到执行cluster failover 命令的这个slave 节点,实现无感知的数据迁移

      • 手动的Failover 支持三种不同模式
        • 缺省:默认的流程,如图1-6
        • force:省略了对offset 的一致性校验
        • takeover 直接执行第5 步,忽略数据一致性,忽略数据一致性,忽略master 状态和其他master 的意见
    • RedisTemplate 访问分片集群

      • 引入redisstarter 依赖
      • 配置分片集群地址
      • 配置读写分离

      分片集群配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      logging:
      level:
      io.lettuce.core: debug
      pattern:
      dateformat: MM-dd HH:mm:ss:SSS
      spring:
      redis:
      cluster:
      nodes: # 指定分片集群的每一个节点信息
      - 43.142.80.9:7001
      - 43.142.80.9:7002
      - 43.142.80.9:7003
      - 43.142.80.9:8001
      - 43.142.80.9:8002
      - 43.142.80.9:8003