SpringCloud
SpringCloud-全家桶
微服务架构的概念以及优势
什么是微服务
- 官方定义: 微服务就是由一系列围绕自己业务开发的微小服务构成,他们独立部署运行在自己的进程里,
基于分布式的管理 - 通俗定义: 微服务是一种架构,这种架构是将单个的整体应用程序分割成更小的项目关联的独立的服务。一个服务通常实现一组独立的特性或功能,
包含自己的业务逻辑和适配器。各个微服务之间的关联通过暴露 api
来实现。这些独立的微服务不需要部署在同一个虚拟机,同一个系统和同一个应用服务器中
单体应用架构优缺点
- 优点: 单一架构模式在项目初期很小的时候开发方便,
测试方便, 部署方便,运行良好 - 缺点: 应用随着时间的推进,加入的功能越来越多,最终会变得巨大,一个项目中很有可能数百万行的代码,互相之间繁琐的
jar
包。久而久之,开发的效率低,代码维护困难。还有一个如果想整体应用采用新的技术,新的框架或者语言,那是不可能的,任意模块的漏洞或者错误都会影响整个应用,降低系统的可靠性
微服务加入的优缺点
- 优点: 将服务拆分成多个单一职责的小的服务,进行单独部署,服务之间通过网络进行通信,每个服务应该有自己单独的管理团队,高度自治,服务各自有各自单独的职责,服务之间松耦合,避免因一个模块的问题导致服务崩溃
- 缺点
- 开发人员需要处理分布式的复杂性
- 多服务运维难度,随着服务的增加,运维的压力也在增大
- 服务治理和服务监控是关键
SpringCloud
什么是 SpringCloud
-
官网
1
2# https://spring.io/projects/spring-cloud
Spring Cloud provides tools for developers to quickly build some of the common patterns in distributed systems (e.g. configuration management, service discovery, circuit breakers, intelligent routing, micro-proxy, control bus, one-time tokens, global locks, leadership election, distributed sessions, cluster state). Coordination of distributed systems leads to boiler plate patterns, and using Spring Cloud developers can quickly stand up services and applications that implement those patterns. They will work well in any distributed environment, including the developer’s own laptop, bare metal data centres, and managed platforms such as Cloud Foundry.springcloud
为开发人员提供了分布式系统中快速构建一些通用模式的工具 (例如: 配置管理、服务发现、断路器、微代理、控制总线)。分布式系统的协调导致了锅炉版模式。使用 springcloud
开发人员可以快速地建立实现这些模式的服务和应用程序 -
通俗理解
springcloud
是一个涵盖多个子项目的开发工具集,集合了众多的开源框架,他利用了 Spring Boot
开发的便利性实现了很多功能,如服务注册,服务注册发现,负载均衡等。 SpringCloud
在整合过程中主要是针对 Neflilx
开源组件的封装, SpringCloud
的出现真正的简化了分布式架构的开发。 Netflix
是美国的一个在线视频网站, 微服务业的翘楚,它是公认的大规模生产微服务的杰出实践者, Netflix
的开源组件已经在它的大规模分布环境中经过多年的生产实践验证,因此 SpringCloud
中很多组件都是基于 Netflix
版本选择
IDEA-模拟集群实现
-
集群配置
-
配置修改
-Dserver.port=8082
SpringCloud-环境搭建
环境搭建
-
java
环境 jdk:1.8
-
创建父项目
(基于 springboot
初始化器构建) 父工程 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44<!-- 父项目的 pom.xml -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>eureka-server</module>
</modules>
<!-- lookup parent from repository -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.6</version>
<relativePath/>
</parent>
<!-- 修改打包方式 pom -->
<packaging>pom</packaging>
<groupId>com.example</groupId>
<artifactId>blr-cloud</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<spring.cloud-version>2021.0.3</spring.cloud-version>
</properties>
<dependencyManagement>
<dependencies>
<!-- spring cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project> -
dependencyManagement
和 dependencies
-
maven
使用 dependencyManagement
元素来提供了一种管理依赖版本号的方式, 通常会在一个组织者的最顶层的父 pom
中看到 dependencyManagement
元素 -
使用
pom.xml
中的 dependencyManagement
元素能让所有在子项目中引用一个依赖而不需 显示的列出版本号
,maven
会沿着父子层次向上走, 直到找到一个拥有 dependencyManagement
元素的项目, 然后他就会使用这个 dependencyManagement
元素中指定的版本号 -
这样做的好处就是: 如果有多个子项目都引用同一样的依赖,
则可以避免在每个使用的子项目里都声明一个版本号, 这样当想升级或切换到另一个版本时, 只需在顶层父容器里更新, 而不需要一个一个子项目的修改, 另外如果某个子项目需要另外的一个版本, 只需要声明 version
就可以了 -
dependencyManagement
里 只是声明依赖,
,因此子项目需要显示的声明需要使用的依赖,如果子项目中指定了版本号,并不实现引入 那么就会使用子项目中指定的 jar
版本
-
服务远程调用
-
创建父项目
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.demo</groupId>
<artifactId>cloud-demo</artifactId>
<version>1.0</version>
<modules>
<module>user-service</module>
<module>order-service</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR10</spring-cloud.version>
<mysql.version>5.1.47</mysql.version>
<mybatis.version>2.1.1</mybatis.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- springCloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!--mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project> -
创建
order-service
子模块 -
修改
xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
</dependencies> -
创建实体类
1
2
3
4
5
6
7
8
9
public class Order {
private Long id;
private Long price;
private String name;
private Integer num;
private Long userId;
private User user;
}1
2
3
4
5
6
public class User {
private Long id;
private String username;
private String address;
} -
创建
mapper
1
2
3
4
5
6
7
public interface OrderMapper {
Order findById(Long id);
} -
创建
service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class OrderService {
private OrderMapper orderMapper;
public Order queryOrderById(Long orderId) {
// 1.查询订单
Order order = orderMapper.findById(orderId);
// 4.返回
return order;
}
} -
创建控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class OrderController {
private OrderService orderService;
public Order queryOrderByUserId( { Long orderId)
// 根据id 查询订单并返回
return orderService.queryOrderById(orderId);
}
} -
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18# application.yml
server:
port: 8080
spring:
datasource:
url: jdbc:mysql://localhost:3306/cloud_order?useSSL=false
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
mybatis:
type-aliases-package: com.example.user.pojo
configuration:
map-underscore-to-camel-case: true
logging:
level:
com.example: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS -
sql
脚本 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22-- 创建数据库
create database cloud_order;
-- 创建表与测试数据
CREATE TABLE `tb_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单id',
`user_id` bigint NOT NULL COMMENT '用户id',
`name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '商品名称',
`price` bigint NOT NULL COMMENT '商品价格',
`num` int DEFAULT '0' COMMENT '商品数量',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `username` (`name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=109 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT
INSERT INTO tb_order VALUES (101, 1, 'Apple 苹果 iPhone 12 ', 699900, 1);
INSERT INTO tb_order VALUES (102, 2, '雅迪 yadea 新国标电动车', 209900, 1);
INSERT INTO tb_order VALUES (103, 3, '骆驼(CAMEL)休闲运动鞋女', 43900, 1);
INSERT INTO tb_order VALUES (104, 4, '小米10 双模 5G 骁龙 865', 359900, 1);
INSERT INTO tb_order VALUES (105, 5, 'OPPO Reno3 Pro 双模5G 视频双防抖', 299900, 1);
INSERT INTO tb_order VALUES (106, 6, '美的(Midea) 新能效 冷静星II ', 544900, 1);
INSERT INTO tb_order VALUES (107, 2, '西昊/SIHOO 人体工学电脑椅子', 79900, 1);
INSERT INTO tb_order VALUES (108, 3, '梵班(FAMDBANN)休闲男鞋', 31900, 1);
-
-
创建
user-service
子模块 -
修改
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
</dependencies> -
创建实体类
1
2
3
4
5
6
public class User {
private Long id;
private String username;
private String address;
} -
创建
mapper
1
2
3
4public interface UserMapper {
User findById(; Long id)
} -
创建
service
1
2
3
4
5
6
7
8
9
10
public class UserService {
private UserMapper userMapper;
public User queryById(Long id) {
return userMapper.findById(id);
}
} -
创建控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class UserController {
private UserService userService;
/**
* 路径: /user/110
*
* @param id 用户id
* @return 用户
*/
public User queryById( { Long id)
return userService.queryById(id);
}
} -
创建配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17server:
port: 8081
spring:
datasource:
url: jdbc:mysql://localhost:3306/cloud_user?useSSL=false
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
mybatis:
type-aliases-package: com.example.user.pojo
configuration:
map-underscore-to-camel-case: true
logging:
level:
com.example: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS -
创建
sql
脚本 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23-- 创建数据库
create database cloud_user;
-- 创建表与测试数据
CREATE TABLE `tb_user`
(
`id` bigint NOT NULL AUTO_INCREMENT,
`username` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '收件人',
`address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '地址',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `username` (`username`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 109
DEFAULT CHARSET = utf8mb3
ROW_FORMAT = COMPACT
INSERT INTO tb_user VALUES (1, '柳岩', '湖南省衡阳市');
INSERT INTO tb_user VALUES (2, '文二狗', '陕西省西安市');
INSERT INTO tb_user VALUES (3, '华沉鱼', '湖北省十堰市');
INSERT INTO tb_user VALUES (4, '张必沉', '天津市');
INSERT INTO tb_user VALUES (5, '郑爽爽', '辽宁省沈阳市大东区');
INSERT INTO tb_user VALUES (6, '范兵兵', '山东省青岛市');
-
-
启动访问测试
order
user -
需求: 根据订单
id
查询订单功能的同时, 把订单所属的用户信息一起返回 -
修改一: 在
order-service
中注册 RestTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// order-service 启动类下注册
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
/**
* 创建 RestTemplate 并注入 spring 容器
* @return
*/
public RestTemplate restTemplate() {
return new RestTemplate();
}
} -
修改二: 修改
order-service
的控制器 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class OrderService {
private OrderMapper orderMapper;
private RestTemplate restTemplate;
public Order queryOrderById(Long orderId) {
// 1.查询订单
Order order = orderMapper.findById(orderId);
/******************************************/
String url = "http://localhost:8081/user/" + order.getUserId();
// 发起远程调用
User user = restTemplate.getForObject(url, User.class);
// 封装 user 到 order
order.setUser(user);
/******************************************/
// 4.返回
return order;
}
}实现服务远程调用
-
-
服务注册中心介绍
-
远程调用的问题
问题原因 在远程调用过程中,
如果书写硬编码则在使用时是很不方便的,如果服务出现集群,那么如何抉择地址呢?在此种问题下,服务注册中心出现,解决上述的问题 -
什么是服务注册中心
所谓服务注册中心就是在整个的微服务架构中单独提出一个服务,这个服务不完成系统的任何业务功能,仅仅用来完成对整个服务系统的服务注册和服务发现,以及对服务健康状态的监控和管理功能
无服务注册中心 服务注册中心 -
服务注册中心
- 可以对所有的微服务的信息进行存储,如服务的名称、
IP
、端口等 - 可以在进行服务调用时通过服务发现查询可用的微服务列表及网络地址进行服务调用
- 可以对所有的微服务进行心跳检测,如发现某实例长时间无法访问,就会从服务注册表移除该实例
- 可以对所有的微服务的信息进行存储,如服务的名称、
服务注册中心组件
Eureka
-
作用
作用介绍 不论是服务消费者还是服务提供者,
统称 client
,首先作为客户端 ( client
),client
会注册服务信息到 Eureka-Server(注册中心)
,通过服务拉取, 获取信息, 在通过远程调用, 选择一个进行调用 -
创建
Eureka-Server
模块 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- 继承父项目 -->
<parent>
<artifactId>blr-cloud</artifactId>
<groupId>com.example</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>eureka-server</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- spring-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
</project>-
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18# Eureka-Server 的 application.yml
server:
port: 8761 # 执行服务端口号
spring:
application:
name: EUREKA-SERVER # 指定服务名称 唯一标识
eureka:
instance:
hostname: localhost # eureka 服务端的实例名称
client:
register-with-eureka: false # false表示不在注册中心注册自己
fetch-registry: false # false 表示自己端就是注册中心,我的职责就是维护服务实例, 并不需要去检索服务
service-url:
# 设置与 Eureka server 交互的地址查询服务和注册服务都需要依赖这个地址
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/ -
启动类添加注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.example.eureka.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
/**
* @author coder-itl
*/
// 开启当前应用是一个服务注册中心
public class EurekaServer {
public static void main(String[] args) {
SpringApplication.run(EurekaServer.class, args);
}
} -
启动后在浏览器访问:
http://localhost:8761/
需要的关注信息
-
-
创建
Eureka-Client
模块,这个模块是一个功能应用 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>blr-cloud</artifactId>
<groupId>com.example</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>eureka-client</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</project>-
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15server:
port: 8762 # 自定义启动端口
spring:
application:
name: EUREKACLIENT
eureka:
client:
register-with-eureka: true # 表示是否将自己注册进 EurekaServer 默认为 true
fetch-registry: true # 是否从 EurekaServer 抓取已有的注册信息,默认为 true, 单节点无所谓, 集群必须设置为 true 才能配合 ribbon 使用负载均衡
service-url:
# 设置与 Eureka server 交互的地址查询服务和注册服务都需要依赖这个地址
defaultZone: http://localhost:8761/eureka/ -
启动类添加注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14package com.example.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
} -
启动顺序为:
server->client
-
启动
在注册中心查看到已成功注册
-
-
自我保护机制
EurekaClient
会通过心跳的方式去和 EurekaServer
进行连接 ( 默认 30s EurekaClient
会发送一次心跳请求,如果超过了 90s
还没有发送心跳信息的话, EurekaServer
就认为你宕机了, 将当前 EurekaClient
从注册表中移除) 1
2
3
4
5# EurekaServer
eureka:
instance:
lease-renewal-interval-in-seconds: 30 # 心跳的间隔
lease-expiration-duration-in-seconds: 90 # 多久没发送,就认为你宕机了 1
2
3
4# EurekaClient
eureka:
client:
registry-fetch-interval-seconds: 30 # 默认 每隔多久去更新一下本地的注册表缓存信息-
Eureka
的自我保护机制, 统计 15
分钟内, 如果一个服务的心跳发送比例低于 85%
,EurekaServer
就会开启自我保护机制 -
不会从
EurekaServer
中去除长时间没有收到的心跳的服务 -
EurekaServer
还是可以正常提供服务的 -
网络比较稳定时,
EurekaServer
才会开始将自己的信息被其他节点同步过去 1
2
3
4# EurekaServer
eureka:
server:
enable-self-preservation: true # 默认开启自我保护机制
-
-
CAP
定理 C
一致性 A
可用性 P
分区容错性
这三个特性在分布式情况环境下,
只能满足 2
,个 而且分区容错性在分布式环境下, 是必须要满足的, 只能在 AC
之间进行权衡 - 如果先择
CP
: 保证了一致性,可能会造成你系统在一定时间内是不可用的, 如果你同步数据的时间比较长, 造成的损失大 Eureka
就是一个 AP
的效果, 高可用的集群, Eureka
集群是无中心的 (无 master
)Eureka
即便宕机几个也不会影响系统的使用, 不需要重新的去推举一个 master
,也会导致一定时间内数据不一致
-
-
集群搭建
分析 -
EurekaServer
集群实现步骤 -
创建多个
EurekaServer
项目 idea
通过修改端口模拟集群 -
引入
eureka server
依赖 -
配置文件
-
node1
1
2
3server:
port: 8761
http://localhost:8762/eureka, http://localhost:8763/eureka -
node2
1
2
3server:
port: 8762
http://localhost:8761/eureka, http://localhost:8763/eureka -
node3
1
2
3server:
port: 8763
http://localhost:8761/eureka, http://localhost:8762/eureka在同一个 EurekaServer
配置文件中,启动时修改如下
-
-
在每个启动类添加注解
@EnableEurekaServer
-
-
-
将
user-service
和 order-service
作为客户端加入 -
添加客户端依赖
-
添加客户端配置
-
启动类添加客户端注解
-
修改
order-service
的远程调用 1
2// String url = "http://localhost:8081/user/" + order.getUserId();
String url = "http://USERSERVICE/user/" + order.getUserId();1
2
3
4
5
// 负载均衡配置
public RestTemplate restTemplate() {
return new RestTemplate();
}列表 负载均衡流程
-
Zookeeper
-
前往
Zookper
,了解安装与使用 -
测试主机到虚拟机网络连通性
测试主机到虚拟机网络连通性
-
创建名为
cloud-provider-payment8004
的子模块 -
修改
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-parent</artifactId>
<groupId>com.coderitl.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-provider-payment8004</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 整合 zookeeper 客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<!-- 先排除自带的 zookeeper 3.5.3 -->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 在添加新版本 zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.coderitl.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> -
创建启动类
1
2
3
4
5
6
7
// 该注解用于向 consul 或这 zookeeper作为注册中心时注册服务
public class PaymentMain8004 {
public static void main(String[] args) {
SpringApplication.run(PaymentMain8004.class, args);
}
} -
创建
配置文件
1
2
3
4
5
6
7
8
9
10
11
12# 8004 代表注册到 zookeeper 服务器的支付服务提供者的端口号
server:
port: 8004
# 服务别名-注册 zookeeper 到注册中心名称
spring:
application:
name: cloud-provider-payment
cloud:
zookeeper:
connect-string: 192.168.247.130:2181 # 连接zookeeper 服务器地址
-
创建控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package com.coderitl.springcloud.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
public class PaymentController {
private String serverPort;
public String paymentzk() {
return "springCloud with zookeeper: " + serverPort + "\t" + UUID.randomUUID();
}
} -
启动测试
-
版本依赖问题
-
错误问题
错误问题 -
解决方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18<!-- 整合 zookeeper 客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<!-- 先排除自带的 zookeeper 3.5.3 -->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 在添加新版本 zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
-
-
服务成功注册进
Zookeeper
查看服务是否注册成功 访问 -
查看
zookeeper
内数据信息 相信信息
-
-
服务启动后生成的节点是临时节点,
zookeeper
服务器再一定时间内心跳没有回应, 会清理该节点
Consul
-
特性
特性 Eureka Nacos Consul Zookeeper CAP AP
CP+AP
CP
CP
健康检查 Client Beat
TCP/HTTP/MYSQL/Client Beat
TCP/HTTP/gPRC/CMD
Keep Alive
雪崩保护 有 有 无 无 自动注销实例 支持 支持 不支持 支持 访问协议 HTTP
HTTP/DNS
TTP/DNS
TCP
监听支持 支持 支持 支持 支持 多数据中心 支持 支持 支持 不支持 跨注册中心同步 不支持 支持 支持 不支持 SpringCloud 集成 支持 支持 支持 支持
-
Consul
角色client
:客户端,无状态, 将 HTTP
和 DNS
接口请求转发给局域网内的服务端集群 server
:服务端,保存配置信息, 高可用集群, 每个数据中心的 server
数量推荐为 3
个或者 5
个
-
工作原理
工作原理
-
官网
-
下载
-
两个下载的内容都是
consul.exe
选择 amd64
-
单节点
cd
到对应目录 consul.exe
,使用cmd
启动 Consul
1
2# 启动参数说明: -dev 表示开发模式运行 -server
表示服务器模式运行
consul agent -dev -client=0.0.0.0当前目录下 cmd
启动 consul
为了方便启动,
也可以在 consul.exe
同级目录下创建一个脚本来启动 1
2consul agent -dev -client=0.0.0.0
pause创建启动脚本文件 -
访问
访问测试是否启动成功
-
创建子
model=> cloud-providerconsul-payment8006
-
创建配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13server:
port: 8006
spring:
application:
name: consul-provider-payment
cloud:
consul:
host: localhost
port: 8500
discovery:
service-name: ${spring.application.name} -
修改
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-consul-discovery -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.coderitl.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> -
主启动
1
2
3
4
5
6
7
8
public class PaymentMain8006 {
public static void main(String[] args) {
SpringApplication.run(PaymentMain8006.class, args);
}
} -
控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
public class PaymentController {
private String serverPort;
public String paymentConsul() {
return "SpringCloud with consul: " + serverPort + "\t" + UUID.randomUUID();
}
} -
结果页面
成功入驻 -
访问
localhost:8006/payment/consul
访问测试
-
创建子
model=> cloud-consumerconsul-order80
-
修改
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-consul-discovery -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.coderitl.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> -
创建配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14server:
port: 80
spring:
application:
name: consul-consumer-order
cloud:
consul:
host: localhost
port: 8500
discovery:
service-name: ${spring.application.name} -
主启动
1
2
3
4
5
6
7
public class OrderConsulMain80 {
public static void main(String[] args) {
SpringApplication.run(OrderConsulMain80.class, args);
}
} -
控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class OrderConsulController {
// 服务中心的服务名称
public static final String INVOKE_URL = "http://consul-provider-payment";
private RestTemplate restTemplate;
public String paymentInfo() {
String result = restTemplate.getForObject(INVOKE_URL + "/payment/consul", String.class);
return result;
}
} -
结果页面
服务消费者注册进 Consul
访问 80
…
Nacos
-
服务注册到
Nacos
-
在
父工程
中的dependencyManagement
下添加 spring-cloud-alibaba
的管理依赖 1
2
3
4
5
6
7
8<!-- nacos 的管理依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2021.0.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency> -
注释掉
服务中
原有的eureka
依赖, 添加 nacos
配置,移除 eureka
的配置 1
2
3
4spring:
cloud:
nacos:
server-addr: localhost:8848 # TODO: 添加关于 Nacos 的配置 => nacos服务地址 -
添加
nacos
的客户端依赖 1
2
3
4
5
6
7
8
9<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 解决负载均衡 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency> -
结果页面
结果页面:服务名称建议小写, 在 order-service 配置
Nacos 作为注册中心 -
nacos
注册中心细节问题 -
负载均衡依赖
1
2
3
4
5<!-- 2. TODO: 添加负载均衡依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>该依赖解决如下错误 -
远程调用的地址
使用小写
(主要是通过可视化页面获取) 1
2// TODO: 使用服务发现,
完成服务拉取 修改访问地址 (与 Eureka 名称区别: 小写)
private static final String URL = "http://user-service8081/user/"; -
修改
/user/system32/drivers/etc/hosts
1
2
3# 添加对应的服务名称 与 127.0.0.1 绑定
# Eg:
127.0.0.1 user-service8081可解决此错误
-
-
-
集群属性
1
2
3
4
5
6
7spring:
cloud:
nacos:
server-addr: localhost:8848
# 实现集群配置
discovery:
cluster-name: BJ # 集群名称(自定义名称, 也就是机房位置) 例如: 北京 BJ (同区域集群使用同一名称) -
调整负载均衡配置
1
2
3
4# user-service8081 名称严格为
nacos 服务列表中的服务名
user-service8081:
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则 -
Nacos
负载均衡策略 - 优先选择同集群服务实例列表
- 本地集群找不到提供者,
才去其他集群寻找, 并且会报警告 - 确定了可用实例列表后,
再采用随机负载均衡挑选实例
-
权重配置
服务器设备性能有差异,
部分实例所在机器性能较好, 另一些比较差, 我们希望性能好的机器承担更多的用户请求: Nacos
提供了权重配置来控制访问频率, 权重越大则访问频率越高 配置权重(数字越小, 被访问的概率将会降低) - 实例权重控制
nacos
控制台可以设置实例的权重值, 0-1
之间 - 同集群内的多个实例,
权重越高被访问的频率越高 - 权重设置为
0
则完全不会被访问
- 实例权重控制
-
环境隔离:
namespace
Nacos
中服务存储和数据存储的最外层都是一个名为 namespace
的东西, 用来做最外层隔离 namespace
-
新建命名空间
新建命名空间 新建命名空间 -
ID
很重要 重要 ID
-
添加
namespace
1
2
3
4
5
6
7# order-service application.yml
spring:
cloud:
nacos:
discovery:
namespace: 9527 # 命名空间填 创建命名空间时的 id【默认可以通过 UUID 自动生成】
配置完成命名空间 -
总结
Nacos
环境隔离 namespace
用来做环境隔离 - 每个
namespace
都有唯一 id
- 不同的
namespace
下的服务不可见
-
-
临时实例
临时实例 -
临时实例
采用心跳检测
-
非临时实例
非临时实例 ( Nacos
)会主动询问是否健康 -
临时实例和非临时实例配置
1
2
3
4
5spring:
cloud:
nacos:
discovery:
ephemeral: false # 设置为非临时实例 -
Eureka
和 Nacos
区别 - 共同点
- 都支持服务注册和服务拉取
- 都支持服务提供者心跳方式做健康检测
- 区别
Nacos
支持服务端主动检测提供者状态: 临时实例采用心跳模式, 非临时实例采用主动检测模式 - 临时实例心跳不正常会被剔除,
非临时实例则不会被剔除 Nacos
支持服务列表变更的消息推送模式, 服务列表更新更及时 Nacos
集群默认采用 AP
方式, 当集群中存在非临时实例时, 采用 CP
模式, Eureka
采用 AP
模式
- 共同点
Ribbon
-
简介
SpringCloudRibbon
是基于 netflix Ribbon
实现的一套 客户端
负载均衡的工具简单地说,
Ribbon
是 Netflix
发布的开源项目, 主要的功能是提供 客户端的软件负载均衡算法和服务调用
,Ribbon
客户端组件提供的一系列完善的配置项如何连接超时的, 重试等,简单地说, 就是在配制文件中列出 Load Balancer(简称
LB) 后面所有的机器, Ribbon
会自动地帮助你基于某种规则 (如简单轮询, 随机连接等) 去连接这些机器, 我们很容易使用 Ribbon
实现自定义的负载均衡算法 -
LB
负载均衡 ( Load Balance
)是什么 简单地说就是将用户的请求平摊的分配到多个服务上,
从而达到系统的 HA
(高可用)常见的负载均衡有软件
Nginx,LVS,硬件 F5
等 -
Ribbon
本地负载均衡客户端 VS Nginx
服务端负载均衡的区别 Nginx
是服务器负载均衡, 客户端所有请求都会交给 nginx
,然后由nginx
实现转发请求, 即 负载均衡
是由服务端实现的Ribbon
本地负载均衡, 在调用微服务接口的时候, 会在注册中心上获取注册信息服务列表之后缓存到 JVM
本地, 从而实现 RPC
远程服务调用技术 -
使用
-
依赖问题
新版 Eureka
已经引入 Ribbon
-
Ribbion
的负载均衡和Rest
调用 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// 返回对象响应体中数据转换成的对象,
基本可以理解为 JSON (restTemplate.getForObject)
public CommonResult<Payment> getPayment( { Long id)
log.info("OrderController id: {}", id);
// 写操作: postForObject
return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
}
// 返回对象为 ResponseEntity对象, 包含响应中的一些重要信息, 必须响应头, 响应状态码 响应体等
public CommonResult<Payment> getPayment2( { Long id)
log.info("OrderController id: {}", id);
ResponseEntity<CommonResult> entity = restTemplate.getForEntity(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
if (entity.getStatusCode().is2xxSuccessful()) {
return entity.getBody();
} else {
return new CommonResult(444, "操作失败");
}
}
// restTemplate.postForEntity(PAYMENT_URL + "/payment/get/" + id, CommonResult.class).getBody(); -
Ribbon
核心组件 IRule
IRule
:根据特定算法中从服务列表中选取一个要访问的服务1
2
3
4
5
6
7
8
9// IRule 是一个接口 作用范围: 全体服务
public interface IRule {
Server choose(Object var1);
void setLoadBalancer(ILoadBalancer var1);
ILoadBalancer getLoadBalancer();
}关系图
-
-
7
种负载均衡方式 com.netflix.loadbalancer.RoundRobinRule
轮询 com.netflix.loadbalancer.RandomRule
随机 com.netflix.loadbalancer.RetryRule
先按照 轮询
的策略获取服务,如果获取服务失败则在指定时间内会进行重试, 获取可用的服务 WeightedResponseTimeRule
对 轮询
的扩展,响应速度越快的实例选择权重越大, 越容易被选择 BestAvailableRule
会先过滤掉由于多次访问故障而处于断路跳闸状态的服务, 然后选择一个并发最下的服务 AvailabiltiyFilteringgRule
先过滤掉故障实例, 在选择并发较小的实例 ZoneAvoidanceRule
:默认规则, 复合判断 server
所在区域的性能和 server
的可用性选择服务器
-
负载均衡流程
负载均衡流程 -
负载均衡原理
负载均衡算法:
rest 接口第几次请求数
%服务器集群总数量
=实际调用服务器位置的下标
,每次服务器重启后 rest
接口计数从 1
开始 1
2
3
4
5
6
7
8
9
10
11
12
13List<ServiceInstance> instances = discoryClient.getInstances("CLOUD-PAYMENT-SERVICE");
如:
List[0] instances = 127.0.0.1:8001
List[1] instances = 127.0.0.1:8002
8001 + 8002 => 组合为集群,他们共计 2 台机器, 集群总数为 2 按照轮询算法原理:
当总请求数为 1 时: 1 % 2 = 1 对应下标位置为 1,则获取服务器地址为: 127.0.0.1:8002
当总请求数为 2 时: 2 % 2 = 0 对应下标位置为 0,则获取服务器地址为: 127.0.0.1:8001
当总请求数为 3 时: 3 % 2 = 1 对应下标位置为 1,则获取服务器地址为: 127.0.0.1:8002
当总请求数为 4 时: 4 % 2 = 0 对应下标位置为 0,则获取服务器地址为: 127.0.0.1:8001
....
-
如何替换
( 全局替换
)-
配置细节
官方文档明确给出警告:
这个自定义配置类不能放在
@ComponentScan
所扫描的当前包下以及子包下,否则我们自定义的这个配置类就会被所有的 Ribbon
客户端所共享, 达不到特殊定制化的目的了,也就是在启动类 ( @SpringBootApplication
复合注解, 内部有 ComponenScan
)的父包外 -
新建包
package com.coderitl.myrule;
1
2
3
4
5
6
7
8
public class MySelfRule {
public IRule myRule() {
// 定义为随机(其他的同理 new XX) 作用范围: 局部服务生效
return new RandomRule();
}
} -
主启动添加注解
1
2
3
4
5
6
7
8
9
// 新添加
public class OrderMain80 {
public static void main(String[] args) {
SpringApplication.run(OrderMain80.class, args);
}
} -
访问
http://localhost/consumer/payment/get/1
端口就变为随机的 随机验证
-
-
基于某个配置文件替换
1
2
3
4
5
6
7
8
9
10
11# 单独定格书写
userservice: # 是一个名为 userservice 的服务 ->服务名称
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则
# 日志配置
logging:
level:
com.coderitl: debug # 指定包 debug 级别
pattern:
dataformat: MM-dd HH:mm:ss:SSS -
两种方式的区别
- 代码方式: 配置灵活,但修改时需要重新打包发布
- 配置方式: 直观、方便、无需重新打包发布,但是无法做全局配置
-
饥饿加载
Ribbon
默认是采用懒加载,即第一次访问时才会创建 LoadBalanceClient
,请求时间会很长,而饥饿加载则会在项目启动时创建, 降低第一次访问的耗时,配置如下: 1
2
3
4
5
6
7
8
9
10ribbon:
eager-load:
enable: true # 开启饥饿加载
clients: userservice # 指定对 userservice 这个服务饥饿加载
# 多个服务配置方式
clients:
- a
- b
配置管理
-
Nacos
配置管理 Nacos
配置管理 -
新增配置文件
新增配置文件 创建流程 -
配置文件加载流程
未加入 nacos
前 读取 nacos
配置文件 -
配置统一管理步骤
-
添加依赖
1
2
3
4
5<!-- nacos 配置管理依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency> -
在
userservice
中的 resource
目录下创建一个 bootstrap.yml
文件, 这个文件是引导文件, 优先级高于 application.yml
1
2
3
4
5
6
7
8
9
10
11# TODO: 2. 新建 bootstrap.yml 文件
spring:
application:
name: userservice # 服务名称【大小写与服务名一致】
profiles:
active: dev # 开发环境 这里是 dev
cloud:
nacos:
server-addr: localhost:8848 # Nacos 地址
config:
file-extension: yaml # 文件后缀名文件分析 -
删除主配置文件中重复配置
删除主配置文件中重复配置
-
-
读取
userservice-dev.yaml
配置文件中的属性 1
2
3
4
5
6<!-- 补充一个依赖 -->
<!-- TODO: 新版本 nacos 用于加载 bootstrap.yml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>无此依赖出现如下问题 (新版 nacos
)添加后 1
2
3# userservice-dev.yaml
pattern:
dataformat: yyyy-MM-dd HH:mm:ss1
2
3
4
5// TODO: 5. 添加访问配置文件的控制器
public String getDataformat() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dataformat));
}读取 userservice.yml
-
-
方式一:在
@Value
注入的变量所在类添加注解 @RefreshScope
实现热更新 => dataformat 更改为 dateformat
-
方式二: 使用
@ConfigurationProperties
注解 (推荐) 1
2
3
4
5
6
7
8
9// 配置类
// 前缀 + dateformat => 组成完整字段
public class PatternProperties {
// TODO: 注意匹配 nacos 定于的属性名,切勿写错
private String dateformat;
}1
2
3
4
5
6
7
8
9// TODO: 7. 热加载方式二: 使用配置类
private PatternProperties patternProperties;
// TODO: 5. 添加访问配置文件的控制器
public String getDataformat() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(patternProperties.getDateformat()));
}
-
多环境共享
不是所有的配置都适合放到配置中心,维护起来比较麻烦;建议将一些关键参数,需要
运行时调整的参数
放到nacos
配置中心,一般都是自定义配置
。-
微服务启动会从
nacos
读取多个配置文件 [spring.application.name]-[spring.profiles.active].yaml => userservice-dev.yaml
[spring.application.name].yaml => userservice.yaml
无论
profile
如何变化, [spring.application.name].yaml
这个文件按一定会被加载, 因此对环境共享配置可以写入这个文件 -
创建
创建 spring.application=> userservice
-
创建多环境共享环境
IDEA => 端口模拟实现
-
java
1
2
3// 在 配置类中添加
// TODO: 用于读取多环境配置数据
private String envSharedValue;1
2
3
4
5
6
7
8
private PatternProperties patternProperties;
// TODO: 8. 配置多环境共享配置读取测试
public PatternProperties patternProperties() {
return patternProperties;
} -
访问测试
成功读取多环境共享数据
-
-
多种配置的优先级
( 相同数据读取时显示内容遵循如下规则
)服务名-profile.yaml(userservice-dev.yaml) > 服务名称.yaml(userservice.yaml) > 本地配置
(application.yaml)
-
集群
集群 -
集群步骤
- 搭建数据库,
初始化数据库表结构 - 下载
nacos
安装包 - 配置
nacos
- 启动
nacos
集群 nginx
反向代理
- 搭建数据库,
-
起步配置
-
进入
nacos
的 conf
目录, 修改配置文件 cluster.conf.example
,重命名为cluster.conf
配置如下信息 -
修改
\nacos\conf\application.properties
文件, 添加数据库配置 1
2
3
4
5
6# 以下内容被注释,
取消注释修改局部信息即可
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC # 除了 nacos_config 数据库名称修改外,其他参数请勿修改
db.user.0=root
db.password.0=root -
在连接的数据库上执行
sql
脚本 -
将
nacos
复制 3
,份 再修改 各自对应端口号
1
2
3nacos1 =>8845
nacos2 =>8846
nacos3 =>8847修改各自端口号 Eg: nacos1\conf\application.properties=> server.port=8845
-
然后分别启动三个节点
然后分别启动三个节点 -
Nginx
反向代理配置 (其他内容为注释, 可以选择全部替换, 或者只替换 http
)节点 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32#user nobody;
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
upstream nacos-cluster{
server 127.0.0.1:8845;
server 127.0.0.1:8846;
server 127.0.0.1:8847;
}
server {
listen 80;
server_name localhost;
# 配置 nacos
location /nacos {
proxy_pass http://nacos-cluster;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
} -
访问
localhost/nacos => 用户名密码: nacos
访问测试 -
nacos
配置可以存储在数据库中 数据库查看配置
-
Feign
-
是什么
Feign
是一个声明式 WebService
客户端, 使用 Feign
能让编写 Web Service
客户端更加简单. 他的使用方法是
定义一个服务接口然后再上面添加注解
,Feign
也支持可插拔式的编码器和解码器, SpringCloud
对 Feign
进行了封装, 使其支持 SpringMVC
标准注解和 HttpMessageConverters
,Fegin
可以与 Eureka
和 Ribbon
组合使用以支持负载均衡 -
Feign
能干什么 Feign
旨在使编写 Java Http
客户端变得更容易。 前面使用
Ribbon + RestTemplate
时, 利用 RestTemplate
对 http
请求的封装处理, 形成了一套模板化的调用方法, 但是在实际开发中, 由于对微服务依赖的调用可能不止一处, 往往一个接口会被多处调用, 所以通常会针对每个微服务自行封装一些客户端类来包装这些以来服务的调用 所以, Fegin
在此基础上做了进一步封装, 由他来帮助我们定义和实现依赖服务接口的定义, 在 Feign
的实现下, 我们只需要创建一个接口并使用注解的方式来配置它 (以前是 Dao 接口上面标注 Mapper 注解, 现在是一个微服务接口上面标注一个,Feign 注解即可) 即可完成对微服务提供方的接口绑定, 简化了使用 SpringCloud Ribbon
,自动封装服务调用客户端的开发量 -
Feign
集成了 Ribbon
利用
Ribbon
维护了 Payment
的服务列表信息, 并且通过轮询实现类客户端的负载均衡, 而与 Ribbon
不同的是, 通过 feign
只需要定义服务绑定接口且以声明式的方法, 优雅而简单的实现了服务调用 -
Feign
与 openFeign
的区别 Feign
openFeign
Feign
是 SpringCloud
组件中的一个轻量级 RestFul
的 HTTP
服务客户端 Feign
内置了 Riboon
,用来做客户端负载均衡,去调用服务注册中心的服务, Feign
的使用方式是: 使用 Feign 的注解定义接口, 调用这个接口, 就可以调用服务注册中心的服务 openfeign
是 SpringCloud
在 Feign
的基础上支持了 SpringMVC
的注解, 如 @RequesMapping
等, openfeign
的 @FeignClient
可以解析 SpringMVC
的 @RequestMapping
注解下的接口, 并通过动态代理的方式产生实现类, 实现类中做负载均衡并调用其他服务 <dependency>
org.springframework.cloud
spring-cloud-starter-feign
<dependency>
org.springframework.cloud
spring-cloud-starter-openfeign
-
Feign
的使用步骤 -
引入依赖
1
2
3
4
5<!-- TODO: 1. 添加 openFeign 依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency> -
主启动添加
@EnableFeignClients
注解 1
2
3
4
5
6
7
8
9
10
11
12
13
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
} -
编写
FeignClient
接口 1
2
3
4
5
6
7
8// TODO: 3. 创建远程调用接口 user-service8081=> 服务名称
(必须的)
public interface UserClient {
// private static final String URL = "http://userservice/user/"+order.getUserId();
User findById(; Long id)
} -
使用
FeignClient
中定义的方法代理 RestTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class OrderController {
private OrderService orderService;
private UserClient userClient;
// TODO: 4. 删除 RestTemplate
public Order queryOrderByUserId( { Long orderId)
// 根据id 查询订单并返回
Order order = orderService.queryOrderById(orderId);
// TODO: 5. 使用 feign 远程调用
User user = userClient.findById(order.getUserId());
order.setUser(user);
return order;
}
} -
访问测试
feign
实现远程调用 -
feign
实现了负载均衡 内部集成了 Ribbon
-
-
Feign
运行自定义配置来覆盖默认配置, 可以修改的配置如下 类型 作用 说明 feign。Logger.Level
修改日志级别 包含四种不同的级别 NONE BASIC HEADERS FULL
feign.codec.Decoder 响应结果的解析器 HTTP
远程调用的结果做解析, 例如解析 json
字符串为 java
对象 feign.codec.Encoder 请求参数编码 将请求参数编码, 便于通过 http
请求发送 feign.Contract 支持的注解格式 默认是 SpringMVC
的注解 feign.Retryer 失败的重试机制 请求失败的重试机制, 默认是没有, 不过会使用 Ribbon
的重试 一般只需要配置日志级别
-
配置日志有两种方式
-
方式一: 配置文件
-
全局生效
1
2
3
4
5
6# TODO: 实现日志配置的第一种方式
feign:
client:
config:
default: # 这里是用 default 就是全局配置,如果是写服务名称, 则是针对某个微服务的配置
loggerLevel: FULL # 日志级别 -
局部生效
1
2
3
4
5
6# TODO: 实现日志配置的第一种方式
feign:
client:
config:
userservice: # 这里是用 default 就是全局配置,如果是写服务名称, 则是针对某个微服务的配置
loggerLevel: FULL # 日志级别
-
-
配置方式二:
Java
代码方式, 需要先声明一个 Bean
1
2
3
4
5
6
7
8
9
10
11import feign.Logger;
import org.springframework.context.annotation.Bean;
// TODO: 创建 Bean 用于配置 feign 的日志
public class DefaultFeignConfiguration {
public Logger.Level level() {
return Logger.Level.BASIC;
}
}-
而后如果是全局配置,
则把它放到 @EnableFeignClients
这个注解中 ( 主启动
)1
2
3
4
5
6
7
8...
// defaultConfiguration = DefaultFeignConfiguration.class
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
} -
如果是局部配置,
则把它放在 @FeignClient
这个注解中 ( 定义远程调用处
)1
2
3
4
5
6
7
8// TODO: configuration = DefaultFeignConfiguration.class
public interface UserClient {
// private static final String URL = "http://user-service8081/user/";
User findById(; Long id)
}
-
-
-
Feign
底层的客户端实现 URLConnection
: 默认实现,不支持连接池 Apache HttpClient
支持连接池 OKHttp
支持连接池
-
优化性能主要包括
- 使用连接池代替默认的
URLConnection
- 日志级别,
最好用 basic | none
- 使用连接池代替默认的
-
实现
-
添加依赖
1
2
3
4
5<!-- TODO: feign
性能优化 =>httpclient -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency> -
配置
1
2
3
4
5
6# TODO: 配置连接池
feign:
httpclient:
enabled: true # 开启 feign 对 httpClient 的支持
max-connections: 200 # 最大连接数
max-connections-per-route: 50 # 每个路径的最大连接数
-
-
方式一:
通过继承的方式,
给消费者的 FeignClient
和提供者的 Controller
定义统一的父接口作为标准 继承 -
方式二
(抽取) 将
FeignClient
抽取为独立模块, 并且把接口有关的 POJO(实体类)
,默认的 Feign
配置都放到这个模块中, 提供给所有消费者使用 抽取 -
实现方式二的步骤
- 首先创建一个
model
,命名为feign-api(自定义,
,无要求名称) 然后引入 feign
的 starter
依赖 - 将
order-service
中编写的 UserClient,User,DefaultFeignConfiguraton
都复制到 feign-api
项目中 - 在
order-service
中引入 feign-api
依赖 - 修改
order-service
中的所有与上述组件有关的 import
部分, 改成导入 feign-api
中的包 - 重启测试
- 首先创建一个
-
当定义的
FeignClient
不在 SpringBootApplication
的扫描包范围时, 这些 FeignClient
无法使用, 两种解决方案 -
方式一: 指定
FeignClient
所在包 1
2
3// order-service: com.coderitl.springcloud 下的主启动类
// feign-api: com.coderitl.feign 不在 orderservice的主启动扫描包之下 -
指定
FeignClient
字节码 1
2// 推荐 => 用哪个加载那个
-
Gateway
-
引入
网关 -
网关功能
- 身份认证和权限校验
- 服务路由、负载均衡
- 请求限流
-
SpringCloud
中网关的实现有两种 gateway
zuul
Zuul
是基于 Servlet
的实现, 属于阻塞式编程, 而 SpringCloudGateway
则是基于 Spring5
中提供的 WebFlux
,属于响应式编程的实现,具备更好的性能
-
创建新的
model
,引入SpringCloudGateway
的依赖和 nacos
的服务发现依赖 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17<!-- TODO: 1. 添加网关所需依赖 -->
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
</dependencies> -
编写路由配置以及
nacos
地址 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# TODO: 编写路由以及 nacos 地址
(网关也需要注册到注册中心)
server:
port: 10010
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
discovery:
server-addr: localhost:8848 # nacos 地址
gateway:
routes: # 网关路由配置
- id: order-service # 路由 id,自定义, 只要唯一即可
uri: lb://order-service8088 # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
predicates:
- Path=/order/** # 这个是按照路径匹配 只要一 order 开头就符合
- id: user-service # 路由 id,自定义, 只要唯一即可
# uri: http://127.0.0.1:8081 路由的目标地址 http 就是固定地址
uri: lb://userservice # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
predicates:
- Path=/user/** # 这个是按照路径匹配 只要一 user 开头就符合 -
访问测试
通过网关成功访问服务 -
流程
业务成功访问经过流程
-
网关路由可以配置的内容包括
- 路由
id
: 路由唯一标识 uri
: 路由目的地,支持 lb 和 http
两种 predicatest
:路由断言,判断请求是否符合要求, 符合则转发到路由目的地 filters
:路由过滤器,处理请求或响应
- 路由
-
路由断言工厂
我们在配置文件中写的断言规则只是字符串,
这些字符串会被 Predicate Factory
读取并处理, 转变为路由判断的条件 -
例如:
Path=/user/**
是按照路径匹配,这个规则是由 org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory
类来处理的 -
像这样的断言工厂在
SpringCloudGateway
还有11
个 名称 说明 实例 After
是某个时间点之后的请求 - After=2017-01-20T17:42:47.789-07:00[America/Denver]
Before
是某个时间点之前的请求 - Before=2017-01-20T17:42:47.789-07:00[America/Denver]
Between
是某两个时间点之前的请求 - Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
Cookie
请求必须包含某些 cookie
- Cookie=chocolate, ch.p
Header
请求必须包含某些 header
- Header=X-Request-Id, \d+
Host
请求必须是访问某个 host(域名)
- Host=**.somehost.org,**.anotherhost.org
Method
请求方式必须是指定方式 - Method=GET,POST
Path
请求路径必须符合指定规则 - Path=/red/{segment},/blue/{segment}
Query
请求参数必须包含指定参数 - Query=green
RemoteAddr
请求者的 ip
必须是指定范围 - RemoteAddr=192.168.1.1/24
Weight
权重处理 - Weight=group1, 2
-
总结
-
PredicateFactory
的作用是什么 读取用户定义的断言条件,
对请求做出判断 -
Path=/user/**
是什么含义 路径是以
/user
开头的就认为是符合的
-
-
-
过滤器
GatewayFilter
是网关中提供的一种过滤器, 可以对进入网关的请求和微服务返回的响应做处理 过滤器 -
过滤器工厂
Spring
提供了 31
种不同的路由过滤器工厂 名称 说明 AddRequestHeader
给当前请求添加一个请求头 RemoveRequestHeader
移除请求种的一个请求头 AddResponseHeader
给响应结果中添加一个响应头 RemoveResponseHeader
从响应结果中移除一个响应头 RequestRateLimiter
限制请求的流量 ...
-
局部过滤器配置
1
2filters:
- AddRequestHeader=X-Request-red, blue -
默认配置
1
2default-filters: # 默认过滤器 会对所有的路由请求都生效
- AddRequestHeader=X-Request-red, blue -
全局过滤器
全局过滤器的作用也是处理一切进入网关的请求和微服务响应,
与 GatewayFilter
的作用一样
区别在于GatewayFilter
通过配置定义,处理逻辑是固定的, 而 GlobalFilter
的逻辑需要自己写代码实现
定义方式是实现GlobalFilter
接口1
2
3
4
5
6
7
8
9/**
* 处理当前请求,有必要的话通过 {@link GatewayFilterChain} 将请求交给下一个过滤器处理
* @param exchange 请求上下文,里面可以获取 Request Response 等信息
* @param chain 用来把请求委托给下一个过滤器
* @return {@code Mono} 返回表示当前过滤器业务结束
*/
public interface GlobalFilter {
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
} -
案例实现
需求: 定义全局过滤器,
拦截请求, 判断请求的参数是否满足下面条件 -
参数中是否有
authorization
-
authorization
参数值是否为 admin
如果同时满足则放行,
否则拦截 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// 加载顺序由两种实现: 注解 | 实现接口 Ordered (两者选其一) 一定要由顺序
public class AuthorizeFilter implements GlobalFilter, Ordered {
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1. 获取请求参数
ServerHttpRequest request = exchange.getRequest();
MultiValueMap<String, String> params = request.getQueryParams();
// 2. 获取参数中的 authorization 参数
String auth = params.getFirst("authorization");
// 3. 判断参数值是否等于 admin
if ("admin".equals(auth)) {
// 4. 是 放行
return chain.filter(exchange);
}
// 5. 否 拦截
// 优化体验:
// 5.1 设置状态码
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
// 5.2 拦截请求
return exchange.getResponse().setComplete();
}
// Ordered 方法重写
public int getOrder() {
return -1;
}
}过滤器
-
-
过滤器的执行顺序
请求进入网关后会碰到三类过滤器: 当前路由的过滤器,
DefaultFilter GlobalFilter
请求路由后,
会将当前路由过滤器和 DefaultFilter GlobalFilter
合并到一个过滤器链 (集合) 中, 排序后依次执行每个过滤器 过滤器的执行顺序 - 执行顺序
- 每一个过滤器都必须指定一个
int
类型的 order
值, order
值越小, 优先级越高, 执行顺序越靠前 GlobalFilter
通过实现 Ordered
接口, 或者添加 @Order
注解来指定 order
值 - 路由过滤器和
defaultFiltert
的 order
由 Spring
指定, 默认是按照声明顺序从 1
递增 (声明也就是书写出现的顺序) - 当过滤器的
order
值一样时, 会按照 defaultFilter > 路由过滤器链 > GlobalFilter
的顺序执行
- 每一个过滤器都必须指定一个
- 执行顺序
-
跨域问题处理
跨域: 域名不一致就是跨域,
主要包括 - 域名不同:
www.taobap.com 和 www.taobao.org 等
- 域名相同,
端口不同: localhost:8080 | localhost:8081
跨域问题: 浏览器禁止请求的发起者与服务端发生跨域
ajax
请求, 请求被浏览器拦截的问题 - 域名不同:
-
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44# TODO: 编写路由以及 nacos 地址
(网关也需要注册到注册中心)
server:
port: 10010
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
discovery:
server-addr: localhost:8848 # nacos 地址
gateway:
routes: # 网关路由配置
- id: order-service # 路由 id,自定义, 只要唯一即可
uri: lb://order-service8088 # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
predicates:
- Path=/order/** # 这个是按照路径匹配 只要一 order 开头就符合
- id: user-service # 路由 id,自定义, 只要唯一即可
# uri: http://127.0.0.1:8081 路由的目标地址 http 就是固定地址
uri: lb://userservice # 路由的目标地址 lb:LoadBalanced(负载均衡) 后面跟服务名称
predicates:
- Path=/user/** # 这个是按照路径匹配 只要一 user 开头就符合
filters:
- AddRequestHeader=Truth,coder-itl is Student....
default-filters:
- AddRequestHeader=Truth,coder-itl is Teacher....
globalcors: # 全局跨域处理
add-to-simple-url-handler-mapping: true # 解决 options 请求被拦截问题
cors-configurations:
'[/**]':
allowedOrigins: # 允许那些网站跨域请求
- "http://localhost:8089"
- "http://baidu.com"
allowedMethods:
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders:
- "*" # 允许请求中携带的头信息
allowCredentials: true # 是否允许携带 cookie
maxAge: 360000 # 这次跨域检测的有效期,在有效期内直接放行,减轻服务器压力
Docker
Docker
和虚拟机的差异 docker
是一个系统进程, 虚拟机是在操作系统中的操作系统 docker
体积小, 启动速度快, 性能好, 虚拟机体积大, 启动速度慢, 性能一般
- 镜像和容器
- 镜像
( Image
):Docker
将应用程序及器所需的依赖、函数库、环境、配置文件打包在一起, 称为镜像 - 容器
( Container
): 镜像中的应用程序运行后形成的进程就是容器,只是 Docker
会给容器做隔离, 对外不可见
- 镜像
-
基于
Centos7
操作 -
卸载
(如果之前安装过旧版本的 Docker
)1
2
3
4
5
6
7
8
9
10
11yum remove -y docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine \
docker-ce -
安装
docker
-
安装
yum
工具 1
2
3yum install -y yum-utils \
device-mapper-persistent-data \
lvm2 --skip-broken -
更新本地镜像源
1
2
3
4
5
6
7yum-config-manager \
--add-repo \
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
sed -i 's/download.docker.com/mirrors.aliyun.com\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo
yum makecache fast -
安装
docker
1
2# docker-ce 为社区免费版
yum install -y docker-ce -
关闭防火墙
1
2
3
4# 关闭
systemctl stop firewalld
# 禁止开启启动防火墙
systemctl disable firewalld -
启动
docker
1
systemctl start docker
-
检查是否启动
1
2
3systemctl status docker
docker -v启动成功 -
docker
相关命令 1
2
3
4
5
6# 启动 docker 服务
systemctl start docker
# 停止 docker 服务
systemctl stop docker
# 重启 docker 服务
systemctl restart docker -
配置镜像加速
1
2
3
4
5
6
7
8sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://eyy45bvx.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
-
-
Docker
操作 -
基本操作
基本操作 -
利用
docker save
将 nginx
镜像导出磁盘, 然后再通过 load
加载回来 1
2# 导出
docker save -o nginx.tar nginx:latest实现 -
删除镜像
1
docker rmi nginx:latest
-
加载镜像
1
docker load -i nginx.tar
加载镜像
-
-
容器相关操作
容器操作 -
创建运行一个
Nginx
容器 1
2
3
4
5
6
7docker run --name containerName -p 80:80 -d nginx
docker run: 创建并运行让其
--name: 给容器起一个名字
-p: 将宿主机端口与容器端口映射,冒号左侧是宿主机端口, 右侧是容器端口
-d: 后台运行
nginx: 镜像名称
-
-
数据卷
1
2
3
4
5
6
7
8
9
10docker volume [COMMAND]
[COMMAND]:
create: 创建一个 volume
inspect: 显示一个或多个 volume 的信息
ls: 列出所有的 volume
prune: 删除未使用的 volume
rm: 删除一个或多个指定的 volume数据卷操作
-
什么是
DockerFile
Dockerfile
就是一个文本文件, 其中包含一个个的指令, 用指令来说明要执行什么操作来构建镜像, 每一个指令都会形成一层 Layer
指令 | 说明 | 实例 |
---|---|---|
FROM |
指定基础镜像 | FROM centos:7 |
ENV |
设置环境变量,可在后面指令使用 | ENV key value |
COPY |
拷贝本地文件到镜像的指定目录 | COPY ./mysql-5.7.rpm /tmp |
RUN |
执行Linux
shell
|
RUN yum install gcc |
EXPOSE |
指定容器运行时监听的端口, |
EXPOSE 8080 |
ENTRYPOINT |
镜像中应用的启动命令, |
ENTRYPOINT java -jar xx.jar |
-
DockerCompose
是什么DockerCompose
可以基于Componse
文件帮助我们快速的部署分布式应用, 而无需手动一个个创建和运行容器 Compose
文件是一个文本文件,通过指令定义集群中的每个容器如何运行 -
下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# (github)
下载 更换其他版本只需要更换 v2.2.2
sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 高速安装
curl -L https://get.daocloud.io/docker/compose/releases/download/v2.4.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
# 修改权限
sudo chmod +x /usr/local/bin/docker-compose
# 创建软连接
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
# 检测是否安装
docker-compose --version
# 对于 alpine,需要以下依赖包: py-pip,python-dev,libffi-dev,openssl-dev,gcc,libc-dev,和 make。
-
Base
自动补全命令 1
2curl -L https://raw.githubusercontent.com/docker/compose/1.29.1/contrib/comption/bash/docker-compose > /etc/bash_completion.d/docker-compose
-
出错
1
echo "199.232.68.133 raw.githubusercontent.com" >> /etc/hosts
-
RabbitMQ
-
详细使用
-
同步调用存在的问题
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
-
异步调用
异步调用常见实现就是
事件驱动
事件代理 Broker
- 异步通信的优点
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
- 异步通信的缺点
- 依赖于
Broker
的可靠性, 安全性、吞吐能力 - 架构复杂了,
业务没有明显的流程线, 不好追踪管理
- 依赖于
- 异步通信的优点
-
什么是
MQ
MQ(MessageQueue)
,是在消息的传输过程中保存消息的容器。多用于分布式系统之间进 行通信。RabbitMQ
ActiveMQ
RocketMQ
Kafka
公司 / 社区 Rabbit
Apache
阿里 Apache
开发语言 Erlang
Java
高 Scala & Java
协议支持 AMQP,XMPP,SMTP,STOMP
OpenWrie,STOMP,REST,XMPP,AMQP
自定义协议 自定义协议 可用性 高 高 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒级以内 消息可靠性 高 一般 高 一般 -
使用条件
-
生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明 明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
-
容许短暂的不一致性。
-
确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入
MQ,管理 MQ 这些成本。
-
离线安装
rabbitmq:3-management
:https://pan.baidu.com/s/1YUkVIyE7URpGfu-ZR2PpDQ?pwd=12341
2# 后缀名为 tar 的 docker 文件请勿解压
docker load -i xxx.tar -
拉取
1
2# 拉取 rabbitmq:3-management[tag]
docker pull rabbitmq:3-management搜索 rabbitmq
选择带有 management,management
的带有 web
管理页面 -
安装
1
2
3
4
5
6
7
8
9
10
11
12
13docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
# 参数解释
-p 15672:15672 Web界面
-p 5672:5672 消息通信-
通过端口访问
通过端口访问 -
概念
channel
:操作MQ
的工具 exchange
:路由消息到队列中 queue
: 缓存消息virtual host
: 虚拟主机,是对 queue,exchange
等资源的逻辑分组
-
-
AMQP
Advanced Message Queving Protocol
是用于再应用程序或之间传递业务消息的开放标准, 该协议与语言和平台无关, 更符合微服务中独立性的要求 -
Spring AMQP
Spring AMQP
是基于 AMQP
协议定义的一套 API
规范, 提供了模块来发送和接受消息, 包含两部分, 其中 spring-amqp
是基础抽象, spring-rabbit
是底层的默认实现 - 特征
- 侦听器容器,
用于异步处理入站消息 - 用于发送和接受消息的
RabbitTemplate
RabbitAdmin
用于自动声明队列, 交换和绑定
- 侦听器容器,
- 特征
-
添加用户
添加用户 -
添加虚拟环境
添加虚拟环境 -
创建一个队列,
名称为 simple.queue(*)
创建一个队列, 名称为 simple.queue(*)
删除一个队列 -
测试
-
创建聚合工程
( 父工程: SpringBoot 项目
)1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency> -
创建两个模块(层级代表在谁下创建)
-
publisher 发布消息
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.247.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root
password: root-
创建测试
( 启动类依然需要创建,
)由于使用到 Configuration 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24package com.example.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
public class SpringAmqpTest {
private RabbitTemplate rabbitTemplate;
public void testSimpleQueue1() {
// 这个simple 必须已存在才可用
String queueName = "simple.queue";
String message = "hello,spring amqp";
rabbitTemplate.convertAndSend(queueName, message);
}
}队列不存在时出现声明错误 显示查看 -
查看发送的
message
查看发送的 message
-
-
consumer
1
# 和 publish 配置一样 都需要知道 MQ 信息
1
2
3
4
5
6
7
8
9
10
11// 只是一个类
public class SpringRabbitListerner {
// 方法参数就是消息
public void listenSimpleQueueMessage(String msg) {
System.out.println("spring 消费者收到的消息是 :[" + msg + "]");
}
}
// 启动主启动类即可查阅到消息信息
// 消息一旦被消费就会从队列删除,RabbitMQ没有消息回溯功能 查阅
-
-
-
consumer
的 yml
1
2
3
4
5
6
7
8
9
10
11
12spring:
rabbitmq:
host: 192.168.247.129 # MQ 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root
password: root
# 消费预取限制
listener:
simple:
prefetch: 1 # 设置 prefetch 这个值,可以控制预取消息的上限 1: 每次只能获取一条, 处理完才能获取下一个消息 -
Work Queue
Work Queue
工作队列, 可以提高消息处理速度, 避免队列消息堆积 Work Queue
-
模拟
Work Queue
实现一个队列绑定多个消费者 -
基本思路
-
在
publisher
服务中定义测试方法, 每秒产生 50
条消息, 发送到 simple.queue
1
2
3
4
5
6
7
8
9
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello,spring amqp";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
} -
在
consumer
服务中定义两个消息监听者, 都监听 simple.queue
1
2
3
4
5
6
7
8
9
10
11
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("spring 消费者1 收到的消息是 :[" + msg + "]" + LocalTime.now());
Thread.sleep(20);
}
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2 收到的消息是.......... :[" + msg + "]" + LocalTime.now());
Thread.sleep(200);
} -
消费者
1
每秒处理50
条消息, 消费者 2
每秒处理 10
条消息 处理结果
-
-
-
发布订阅
发布订阅模式之间与之前案例的区别就是允许将同一消息发送给多个消费者,
实现方式就是加入了 exchange(交换机)
模型 -
FanoutExchange
-
利用
SpringAMQP
演示 FanoutExchange
的使用 交换机的使用 -
实现思路
-
在
consumer
服务中, 利用代码声明队列, 交换机, 并将两者绑定 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43// 在 consumer 服务创建一个类,
添加 @Configuration 注解, 并声明 FanoutExchange、Queue 和绑定关系对象 Binding
package com.example.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class FanoutConfig {
// 声明 FanoutExchange 交换机
public FanoutExchange fanoutExchange() {
return new FanoutExchange("coderitl.fanout");
}
// 声明第一个队列
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 声明第二个队列
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
// 绑定队列 1 和交换机
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 绑定队列 2 和交换机
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
} -
在
consumer
服务中, 编写两个消费者, 分别监听 fanout.queue1 和 fanout.queue2
1
2
3
4
5
6
7
8
9// 消费者
public void listenFanoutQueue1(String msg) {
System.out.println("spring 消费者收到fanout.queue1 消息是 :[" + msg + "]");
}
public void listenFanoutQueue2(String msg) {
System.out.println("spring 消费者收到fanout.queue2 消息是 :[" + msg + "]");
} -
在
publisher
中编写测试方法, 向 coderitl.fanout
发送消息 1
2
3
4
5
6
7
8
9
public void testSendFanoutExchange(){
// 交换机名称
String exchangeName = "coderitl.fanout";
// 消息
String message = "hello every one!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
-
-
SpringAMQP
提供了声明交换机、队列、绑定关系的 API
绑定关系 -
总结
- 交换机的作用是什么
- 接受
publisher
发送的消息 - 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,
路由失败, 消息丢失 FanoutExchange
会将消息路由到每个绑定的队列 1
- 接受
- 声明队列、交换机、绑定关系的
Bean
是什么 Queue
FanoutExchange
Binding
- 交换机的作用是什么
-
-
-
-
模型
- 每一个
Queue
斗鱼接收到的消息根据规则路由到指定的 Queue
,因此称为路由模式( routes
) - 发布者发送消息时,
指定消息的 RoutingKey
Exchange
将消息路由到 BindingKey
与消息 RoutingKey
Exchange
将消息路由到 BindingKey
与消息 RoutingKey
一致的队列
模型 - 每一个
-
利用
SpringAMQP
演示 DireExchange
的使用 -
实现思路
-
利用
@RabbitListener
声明 Exchange,Queue,RoutingKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package com.example.consumer.listen;
...
public class SpringRabbitListerner {
// 消费者
public void listenDirectQueue1(String msg) {
System.out.println("spring 消费者收到direct.queue1 消息是 :[" + msg + "]");
}
// 消费者
public void listenDirectQueue2(String msg) {
System.out.println("spring 消费者收到direct.queue2 消息是 :[" + msg + "]");
}
} -
在
Consumer
服务中, 编写两个消费者方法, 分别监听 direct.queue1
和 direct.queue2
1
2
3
4
5public void listenDirectQueue1(String msg) {
System.out.println("spring 消费者收到direct.queue1 消息是 :[" + msg + "]");
}
... -
在
publisher
中编写测试方法, 向 coderitl.direct
发送消息 1
2
3
4
5
6
7
8
9
public void testSendDirectExchange(){
// 交换机名称
String exchangeName = "coderitl.direct";
// 消息
String message = "hello blue!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}查阅, 如果结果未出现, 清理缓存, 重新构建,再次运行
-
-
-
总结
- 描述
Direct
交换机与 Fanout
交换机的差异 Fanout
交换机将消息路由给每一个与之绑定的队列 Direct
交换机根据 RoutingKey
判断路由给那个队列 - 如果多个队列具有相同的
RoutingKey
,则与 Fanout
功能类似
- 基于
@RabbitListener
注解声明队列和交换机有那些常见注解 @Queue
@Exchange
- 描述
-
Queue
与 Exchange
指定 BindingKey
时可以使用通配符 #
: 代指0
个或分多个单词 *
:代指一个单词
-
模型
-
案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// 消费者
public void listenTopicQueue1(String msg) {
System.out.println("spring 消费者收到topic.queue1 消息是 :[" + msg + "]");
}
// 消费者
public void listenTopicQueue2(String msg) {
System.out.println("spring 消费者收到topic.queue2 消息是 :[" + msg + "]");
}1
2
3
4
5
6
7
8
9
public void testSendTopicExchange() throws InterruptedException {
// 交换机名称 coderitl.topic
String exchangeName = "coderitl.topic";
// 消息
String message = "Hello coderitl topic!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}测试
-
消息转换器
Spring
对消息对象的处理方式是由 org.springframework.amqp.support.converter.MessageConverter
来处理的, 而默认实现是 SimpleMessageConverter
,基于JDK
的 ObjectOutputStream
完成序列化 -
修改
MessageConverter
,推荐 JSON
方式序列化 -
实现步骤
-
在
publisher
服务引入依赖 1
2
3
4
5<!-- 如果引入了 web 的依赖则不需要引入下方依赖 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency> -
在
publisher
服务声明 MessageConverter
1
2
3
4
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
} -
发送
1
2
3
4
5
6
7
8
9
public void testSendObjectMessage() throws InterruptedException {
// 消息
Map<String, Object> message = new HashMap<>();
message.put("name", "coder-itl");
message.put("age", "18");
// 发送消息
rabbitTemplate.convertAndSend( "object.queue", message);
} -
接受
1
2
3
4
5
6// 创建队列
public Queue objectQueue() {
return new Queue("object.queue");
}1
2
3
4
5// 需要配置 转换类型
public void listenObjectQueue(Map<String, Object> msg){
System.out.println("msg = " + msg);
}JSON
序列化
-
-
-
消息可靠性
- 消息从生产者发送到
exchange
,再到queue
,再到消费者,有哪些导致消息丢失的可能性? - 发送时丢失
- 生产者发送的消息未送达
exchange
- 消息到达
exchange
后未到达 queue
- 生产者发送的消息未送达
MQ
宕机, queue
将消息丢失 consumer
接收到消息后未消费就宕机
- 发送时丢失
- 消息从生产者发送到
-
生产者确认机制
RabbitMQ 提供了
publisher confirm
机制来避免发送到 MQ
过程中丢失。消息发送到 MQ
以后, 会返回一个结果给发送者, 表示消息是否处理成功。结果有两种请求 -
publisher-confirm
,发送者确认- 消息成功投递到交换机,
返回 ack
- 消息未投递到交换机,
返回 nack
- 消息成功投递到交换机,
-
publisher-return
,发送者回执- 消息投递到交换机了,
但是没有路由到队列。返回 ACK
,以及路由失败原因
生产者确认机制 注意:确认机制发送消息时,
需要给每个消息设置一个全局唯一 id
,以区分不同消息,避免 ack
冲突 - 消息投递到交换机了,
-
-
SpringAMQP
消息可靠性的具体实现步骤 -
在
生产者
这个微服务的application.yml
中添加配置 1
2
3
4
5
6
7
8
9
10
11
12# 生产者端
spring:
rabbitmq:
host: 192.168.2.3 # 主机名
port: 5672 # 端口
virtual-host: /amqp # 虚拟主机
username: root
password: root
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true- 配置说明
publisher-confirm-type
: 开启publisher-confirm
,这里支持两种类型simple
: 同步等待confirm
结果, 指导超时 correlated【英 [ˈkɒrəleɪtɪd] 】
: 异步回调,定义 ConfirmCallback
,MQ
返回结果时会回调这个 ConfirmCallback
publisher-returns
: 开启publisher-return
功能, 同样是基于 callback
机制, 不过定义 ReturnCallback
template.mandatory【英 [ˈmændətəri] 】
: 定义消息路由失败时的策略。true
,则调用ReturnCallback
,false
: 则直接丢弃消息
- 配置说明
-
每个
RabbitTemplate
只能配置一个 ReturnCallback
,因此需要在项目启动过程中配置 1
2
3
4
5
6
7
8
9
10
11
12
13
14// 生产者处
public class CommonConfig implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// TODO:设置 ReturnCallback? ReturnsCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发送失败,应答码: {}, 原因:{}, 交换机:{}, 路由键:{}, 消息:{}", replyCode, replyText, exchange, routingKey, message.toString());
});
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68// 等同于上述
public class CommonConfig implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// TODO:设置 ReturnCallback(旧)? ReturnsCallback(新)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
com.example.publisher.config;
// 等同于上述
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
public class CommonConfig implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// TODO:设置 ReturnCallback(旧)? ReturnsCallback(新)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
public void returnedMessage(ReturnedMessage returnedMessage) {
/**
* public class ReturnedMessage {
* private final Message message;
* private final int replyCode;
* private final String replyText;
* private final String exchange;
* private final String routingKey;
* }
*/
// 判断是否是延迟消息
if (returnedMessage.getMessage().getMessageProperties().getReceivedDelay() > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.info("消息发送失败,应答码: {}, 原因:{}, 交换机:{}, 路由键:{}, 消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString());
// 如果有需要,可以重发消息
}
});
}
}de
public void returnedMessage(ReturnedMessage returnedMessage) {
/**
* public class ReturnedMessage {
* private final Message message;
* private final int replyCode;
* private final String replyText;
* private final String exchange;
* private final String routingKey;
* }
*/
log.info("消息发送失败,应答码: {}, 原因:{}, 交换机:{}, 路由键:{}, 消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString());
}
});
}
} -
发送消息,
指定消息 ID
,消息ConfirmCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class PublisherApplicationTests {
private RabbitTemplate rabbitTemplate;
void name() {
// 消息体
String message = "hello,spring amqp!";
// 消息 id,需要封装到 CorrelationData 中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 添加 callback
correlationData.getFuture().addCallback(result -> {
if (result.isAck()) {
// ack 消息成功
log.debug("消息发送成功,ID:{}", correlationData.getId());
} else {
// nack 消息失败
log.error("消息发送失败,ID:{},原因:{}", correlationData.getId(), result.getReason());
}
}, ex -> {
log.error("消息发送异常,ID:{},原因:{}", correlationData.getId(), ex.getMessage());
});
// 发送消息
rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}
}
-
-
消息持久化
MQ
默认是内存存储消息, 开启持久化功能可以确保缓存在 MQ
中的消息不丢失 -
交换机持久化
1
2
3
4
5
6
7
8
9
10// 交换机的持久化: 消费者配置中声明
public DirectExchange directExchange() {
/**
* name: 交换机名称
* durable: 是否持久化,true: 开启持久化
* autoDelete: false 是否自动删除,一般是 false
*/
return new DirectExchange("direct.exchange", true, false);
} -
队列持久化
1
2
3
4
5
6// 队列的持久化: 消费者配置中声明
public Queue directQueue() {
// 使用 QueueBuilder构建队列,durable 就是持久化的
return QueueBuilder.durable("direct.queue").build();
} -
消息持久化,
SpringAMQP
中的消息默认是持久化的, 可以通过 MessageProperties
中的 DeliverMode
来指定 1
2
3
4
5
6
7
void contextLoads() {
String message = "数据内容....";
Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
}
-
-
消费者消息确认
RabbitMQ
支持消费者确认机制, 即: 消费者处理消息后可以向 MQ
发送 ack
回执, MQ
收到 ack
回执后才会删除该消息, 而 SpringAMQP
则允许配置三种确认模式 manual
: 手动ack
,需要在业务代码结束后,调用 api
发送 ack
auto
: 自动ack
,由 spring
监测 listener
代码是否出现异常, 没有异常则返回 ack
,抛出异常则返回 nack
none
: 关闭ack
,MQ
假定消费者获取消息后会成功处理, 因此消息投递后 立即被删除
-
实现步骤
-
修改消费者的配置文件,
添加如下配置 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
com.example: debug
spring:
rabbitmq:
host: 192.168.2.3
port: 5672
virtual-host: /amqp
username: root
password: root
listener:
simple:
prefetch:
direct:
acknowledge-mode: manual -
失败重试机制
当消费者出现异常后,
消息会不断的 requeue【重新入队】
到队列,在重新发送给消费者, 然后再次异常, 再次 requeue
,无限循环,导致 MQ
的消息处理飙升, 带来不必要的压力 我们可以利用
Spring
的 retry
机制, 在消费者出现异常时利用本地重试, 而不是无限制的 requeue
到 mq
队列 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# 消费者端
spring:
rabbitmq:
host: 192.168.2.3
port: 5672
virtual-host: /amqp
username: root
password: root
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费失败重试
initial-interval: 1000 # 初始的失败等待时长为 1s
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # TODO: true: 无状态,false: 有状态,如果业务中包含事务, 这里改为 false
direct:
acknowledge-mode: auto1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# https://www.toyaml.com/index.html 在线转换 yml to Properties
logging.pattern.dateformat=HH:mm:ss:SSS
logging.level.com.example=debug
spring.rabbitmq.host=192.168.2.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/amqp
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.multiplier=1
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.stateless=true
spring.rabbitmq.listener.direct.acknowledge-mode=auto层级目录
-
-
消费者失败消息处理策略
在开启重试模式后,
重试次数耗尽, 如果消息依然失败, 则需要有 messageRecoverer
接口处理,它包含三种不同的实现 -
RejectAndDontRequeueRecoverer
: 重试耗尽后,直接 reject
,丢弃消息。默认就是这种方式 -
immediateRequeueMessageRecoverer
: 重试耗尽后,返回 nack
,消息重新入队 -
RepublishMessageRecoverer【推荐】
: 重试耗尽后,将失败的消息投递到指定的交换机 模型 -
实现
-
首先,定义接受失败消息的交换机、队列以及绑定关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58// 消费者处
package com.example.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class CommonConfig {
/**************************** 成功的交换机与队列以及绑定关系 *************************/
// 交换机的持久化
public DirectExchange directExchange() {
/**
* name: 交换机名称
* durable: 是否持久化,true: 开启持久化
* autoDelete: false 是否自动删除,一般是 false
*/
return new DirectExchange("direct.exchange", true, false);
}
// 队列的持久化
public Queue directQueue() {
// 使用 QueueBuilder构建队列,durable 就是持久化的
return QueueBuilder.durable("direct.queue").build();
}
public Binding queueToExchange(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("simple.test");
}
/******************** 失败的交换机与队列以及绑定关 ******************************/
public DirectExchange errorDirectExchange() {
/**
* name: 交换机名称
* durable: 是否持久化,true: 开启持久化
* autoDelete: false 是否自动删除,一般是 false
*/
return new DirectExchange("error.direct.exchange", true, false);
}
public Queue errorDirectQueue() {
// 使用 QueueBuilder构建队列,durable 就是持久化的
return QueueBuilder.durable("error.direct.queue").build();
}
public Binding queueToExchangeError(Queue errorDirectQueue, DirectExchange errorDirectExchange) {
return BindingBuilder.bind(errorDirectQueue).to(errorDirectExchange).with("error");
}
} -
定义
RepublishMessageRecoverer
1
2
3
4
5
6
7
8
9
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
/**
* rabbitTemplate: 使用 rabbitTemplate 重发
* errorExchange: 发送的交换机
* errorRoutingKey: 以键发送
*/
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct.exchange","error");
}查看输出
-
-
-
-
总结
如可确保
RabbitMQ
消息的可靠性? - 开启生产者确认机制,
确保生产者的消息能到达队列 - 开启持久化功能,
确保消息未消费前在队列中不会丢失 - 开启消费者确认机制为
auto
,由spring
确认消息处理成功后完成 ack
- 开启消费者失败重试机制,
并设置 MeessageRecoverer
,多次重试失败后将消息投递到异常交换机, 交由人工处理
- 开启生产者确认机制,
-
初始死信交换机
当一个队列中的消息满足下列情况之一时,
可以称之为 死信【dead letter】
- 消费者使用
basic.reject
或 basic.nack
声明消费失败, 并且消息的 requeue
参数设置为 false
- 消息是一个过期消息,
超时无人消费 - 要投递的队列消息堆积满了,
最早的消息可能成为死信
如果该队列配置了
dead-letter-exchange
属性,指定了一个交换机, 那么队列中的死信就会投递到这个交换机中, 而这个交换机称为 死信交换机【Dead Letter Exchange
简称 DLX】 死信模型 - 消费者使用
-
TTL
TTL
,也就是Time-To-Live
,如果一个队列中的消息TTL
结束仍未消费, 则会变为死信, TTL
超时分为两种情况 - 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
原理 -
实现
正常队列绑定死信交换机以及死信队列实现 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SpringRabbitListener {
// 延迟消息 基于注解方式
public void listenDlQueue(String msg) {
log.info("接收到 dl.queue的延迟消息: {}", msg);
}
}蓝色框部分实现 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CommonConfig {
/************************** 死信 *********************************/
public DirectExchange ttlExchange() {
return new DirectExchange("ttl.direct");
}
public Queue ttlQueue() {
return QueueBuilder.durable("ttl.queue") // 指定队列名称并持久化
.ttl(10000) // 设置队列的超时时间为 10s
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") // 指定死信 RoutingKey
.build();
}
public Binding simpleBinding() {
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
-
发送消息时,
给消息本身设置超时时间 1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ttl() {
String message = "ttl 5s or 10s...................!";
Message build = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
// 消息设置过期时间 TTL=5000ms=5s【最终 5s】 < 队列设置的 TTL=10000ms=10s
.setExpiration("5000")
.build();
// 发送消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("ttl.direct", "ttl", build);
}
log.info("消息发送完毕....................");
}如果消息和队列同时设置了
超时时间
,则以两者中最小的为准 超时时间 -
延迟队列
利用
TTL
结合死信交换机, 我们实现了消息发出后, 消费者延迟收到消息的效果。这种消息模式就称为 延迟队列【Delay Queue】
模式-
重新安装
rabbitmq
1
2
3
4
5
6
7
8
9
10docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management当前版本信息 -
容器内部执行虚拟主机添加命令
1
2
3
4
5
6
7
8
9# 添加虚拟机
rabbitmqctl add_vhost /amqp
# 添加用户 root 密码 root
rabbitmqctl add_user root root
# 分配角色
rabbitmqctl set_user_tags root administrator
# 设置虚拟主机 /amqp 的权限:
rabbitmqctl set_permissions -p /amqp root ".*" ".*" ".*" -
延迟队列的使用场景
- 延迟发送短信
- 用户下单,
如果用户在 15
分钟内未支付, 则自动取消 - 预约工作会议,
20
分钟后自动通知所有参会人员
-
延迟队列插件安装
-
基于
Docker
-
RabbitMQ
有一个官方的插件社区,地址为: -
github
下载: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.9.0, 选择后缀为
ez
的文件, This release has no changes except for its metadata that lists [RabbitMQ 3.9.0](https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.9.0) (and future
3.9.xreleases) as supported.
-
上传插件到数据卷插件目录
1
2# 查看数据卷位置
docker volume inspect mq-plugins上传 -
安装插件,进入容器内部
1
2
3
4# 进入 mq 容器内部
docker exec -it mq bash
# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange结果如下 -
页面使用插件
DelayExchange
插件的原理是对官方原生的 Exchange
做了功能升级 -
将
DelayExchange
接收到的消息暂存在内存中 ( 官方的 Exchange
是无法存储消息的) -
在
DelayExchange
中计时, 超时后才投递消息到队列中 -
使用
在
RabbitMQ
的管理平台创建一个 DelayExchange
创建 DelayExchange
消息的延迟时间需要在发送消息的时候指定
指定
-
-
-
SpringAMQP
使用延迟队列插件 DelayExchange
的本质还是官方的三种交换机, 只是添加了延迟队列. 因此使用时只需要声明一个交换机, 交换机的类型可以是任意的, 然后设定 delayed
属性为 true
即可 -
基于注解声明
1
2
3
4
5
6
7
8
9// 消费者处: 基于注解声明 【推荐使用注解声明延迟队列】
public void listenDelayedQueue(String msg) {
log.info("接收到了 delay.queue 的延迟消息: {}", msg);
} -
基于
Bean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// 消费者处
public class CommonConfig {
public DirectExchange delayedExchange() {
return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置 delay 属性为 true
.durable(true) // 持久化
.build();
}
public Queue delayedQueue() {
return new Queue("delay.queue");
}
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
}
} -
然后我们向这个
delay
为 true
的交换机中发送消息, 一定要给消息添加一个 header:x-delay
值为延迟的时间, 单位为 ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// 生产者发送消息
void ttl_plugins() {
String message = "ttl plugins...................!";
Message build = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 1000)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
// 消息 id,需要封装到 CorrelationData 中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 添加 callback
correlationData.getFuture().addCallback(result -> {
if (result.isAck()) {
// ack 消息成功
log.debug("消息发送成功,ID: {}", correlationData.getId());
} else {
// nack 消息失败
log.error("消息发送失败,ID: {},原因: {}", correlationData.getId(), result.getReason());
}
}, ex -> {
log.error("消息发送异常,ID: {},原因: {}", correlationData.getId(), ex.getMessage());
});
// 发送消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("delay.direct", "delay", build, correlationData);
}
log.debug("消息发送完毕....................");
} -
修改配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.example.publisher.config;
// 等同于上述
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
public class CommonConfig implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// TODO:设置 ReturnCallback(旧)? ReturnsCallback(新)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
public void returnedMessage(ReturnedMessage returnedMessage) {
/**
* public class ReturnedMessage {
* private final Message message;
* private final int replyCode;
* private final String replyText;
* private final String exchange;
* private final String routingKey;
* }
*/
// TODO: 判断是否是延迟消息
if (returnedMessage.getMessage().getMessageProperties().getReceivedDelay() > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.info("消息发送失败,应答码: {}, 原因:{}, 交换机:{}, 路由键:{}, 消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString());
// 如果有需要,可以重发消息
}
});
}
}
-
-
-
消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,
就会导致队列中的消息堆积, 直到队列存储消息达到上限。最早接收到的消息, 可能就会成为死信, 会被丢弃, 这就是 消息堆积
问题 -
解决消息堆积有三种思路
- 增加更多的消费者,
提高消费速度 - 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
- 增加更多的消费者,
-
惰性队列的特征如下
- 接收到消息后
直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
- 接收到消息后
-
启用惰性队列
-
设置一个队列为惰性队列,只需要在声明时,
指定 x-queue-mode
属性为 lazy
即可 1
2# 可以将一个运行中的队列设置为惰性队列
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues -
用
SpringAMQP
声明惰性队列分两种方式 -
基于
@Bean
1
2
3
4
5
6
7
8
9
10
11// 推荐使用
public class LazyModeConfig {
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 开启 x-queue-mode 为 lazy
.build();
}
} -
基于
注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SpringRabbitListener {
public void listenLazyQueue(String msg) {
log.debug("接收到 lazy.queue的消息: {}", msg);
}
}
-
-
-
优缺点
- 优点
- 基于磁盘存储,
消息上限高 - 没有间歇性的
page-out
,性能比较稳定
- 基于磁盘存储,
- 缺点
- 基于磁盘存储,
消息时效性会降低 - 性能受限于磁盘的
IO
- 基于磁盘存储,
- 优点
-
集群分类
-
普通集群
是一种分布式集群,
将队列分散到集群的各个节点, 从而提高整个集群的并发能力 -
镜像集群
是一种主从集群,
普通集群的基础上, 添加了主从备份功能, 提高集群的数据可用性 镜像集群虽然支持主从,但主从同步并不是强一致的,
某些情况下可能有数据丢失的风险。因此在 RabbitMQ
的 3.8
版本以后, 推出了新的功能: 仲裁队列 底层采用 Raft
协议确保主从的数据一致性
-
-
普通集群
或者叫标准集群
( classic cluster
),具备下列特征 -
会在集群的各个节点间共享部分数据,
包括: 交换机、队列元信息。不包含队列中的消息 -
当访问集群某节点时,
如果队列不在该节点, 会从数据所在节点传递到当前节点并返回 -
队列所在节点宕机,
队列中的消息就会丢失 传递 宕机消息丢失 -
普通集群部署计划
计划分配 集群中的节点默认都是:
rabbit@[hostname]
,因此以上三个节点的名称分别为rabbit@mq1
rabbit@mq2
rabbit@mq3
-
获取
cookie
RabbitMQ
底层依赖于 Erlang
,而Erlang
虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个 RabbitMQ
节点使用cookie
来确定它们是否被允许相互通信。要使两个节点能够通信,它们必须具有相同的共享秘密,称为
Erlang cookie。 cookie
只是一串最多255
个字符的字母数字字符。 每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。
-
我们先在之前启动的
mq
容器中获取一个 cookie
值,作为集群的 cookie
。执行下面的命令1
2
3
4# rabbitmq:3-management[当前版本]|rabbitmq:3.8-management【可更换的版本】
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
# 非固定值
QZCXBOTUGFDXGZHUHZBX获取 cookie
-
接下来,停止并删除当前的
mq
容器,我们重新搭建集群 1
docker rm -f mq
-
准备集群配置
-
在
/tmp
目录新建一个配置文件 rabbitmq.conf
1
2
3cd /tmp
# 创建文件
vim rabbitmq.conf1
2
3
4
5
6
7# 写入内容如下
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3 -
再创建一个文件,记录
cookie
1
2
3
4
5cd /tmp
# 写入cookie
echo "QZCXBOTUGFDXGZHUHZBX" > .erlang.cookie
# 修改cookie 文件的权限
chmod 600 .erlang.cookie -
准备三个目录
mq1、mq2、mq3
1
2
3cd /tmp
# 创建目录
mkdir mq1 mq2 mq3 -
然后拷贝
rabbitmq.conf、cookie
文件到 mq1、mq2、mq3
1
2
3
4
5
6
7
8
9# 进入
/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3 -
启动集群
-
创建网络
1
docker network create mq-net
-
node1
1
2
3
4
5
6
7
8
9
10
11docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3-management
# rabbitmq:3.8-management -
node2
1
2
3
4
5
6
7
8
9
10
11docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3-management
# rabbitmq:3.8-management -
node3
1
2
3
4
5
6
7
8
9
10
11
12docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3-management
# rabbitmq:3.8-management -
测试
-
在
mq1
这个节点上添加一个队列 其他节点同时查看到
-
-
在刚刚的案例中,一旦创建队列的
主机宕机
,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案。 -
-
-
-
-
镜像集群:本质是主从模式,
具备以下特征 - 交换机、队列、队列中的消息会在各个
mq
的镜像节点之间同步备份 - 创建队列的节点被称为该队列的
主节点
,备份到的其他节点叫做该队列的镜像节点 - 一个队列的主节点可能是从另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
互相备份【推荐使用】 -
镜像模式的特征
默认情况下,队列只保存在创建该队列的节点上。而镜像模式下,创建队列的节点被称为该队列的主节点,队列还会拷贝到集群中的其它节点,也叫做该队列的镜像节点。
但是,不同队列可以在集群中的任意节点上创建,因此不同队列的主节点可以不同。甚至,一个队列的主节点可能是另一个队列的镜像节点。
用户发送给队列的一切请求,例如发送消息、消息回执默认都会在主节点完成,如果是从节点接收到请求,也会路由到主节点去完成。镜像节点仅仅起到备份数据作用。
当主节点接收到消费者的
ACK 时,所有镜像都会删除节点中的数据。 总结如下:
- 镜像队列结构是一主多从(从就是镜像)
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
- 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)
-
镜像模式的配置
镜像模式的配置有
3 种模式 ha-mode ha-params 效果 准确模式 exactly 队列的副本量 count 集群中队列副本(主服务器和镜像服务器之和)的数量。count 如果为 1 意味着单个副本:即队列主节点。count 值为 2 表示 2 个副本:1 个队列主和 1 个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于 count,则该队列将镜像到所有节点。如果有集群总数大于 count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。 all【不推荐使用】 (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络 I / O,磁盘 I / O 和磁盘空间使用情况。推荐使用 exactly,设置副本数为(N / 2 +1)。 nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。 -
exactly
模式配置 1
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
rabbitmqctl set_policy
:固定写法ha-two
:策略名称,自定义"^two\."
:匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.
开头的队列名称 '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
: 策略内容"ha-mode":"exactly"
:策略模式,此处是exactly 模式,指定副本数量 "ha-params":2
:策略参数,这里是2,就是副本数量为 2,1 主 1 镜像 "ha-sync-mode":"automatic"
:同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为 automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销
-
all
模式配置 1
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
ha-all
:策略名称,自定义"^all\."
:匹配所有以all.
开头的队列名 '{"ha-mode":"all"}'
:策略内容"ha-mode":"all"
:策略模式,此处是all 模式,即所有节点都会称为镜像节点
-
nodes
模式配置 1
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
rabbitmqctl set_policy
:固定写法ha-nodes
:策略名称,自定义"^nodes\."
:匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.
开头的队列名称 '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
: 策略内容"ha-mode":"nodes"
:策略模式,此处是nodes 模式 "ha-params":["rabbit@mq1", "rabbit@mq2"]
:策略参数,这里指定副本所在节点名称
- 交换机、队列、队列中的消息会在各个
-
配置精确模式
exactly
-
是在之前的普通模式上配置的
-
进入
mq1【任意启动的】
容器内部执行1
docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
添加一个 two.test.queue
队列 出现镜像节点
-
-
仲裁队列
仲裁队列是
3.8
版本以后才有的新功能, 用来代替镜像队列, 具备以下特征 -
与镜像队列一样,都是主从模式,
支持主从数据同步 -
使用非常简单,
没有复杂的配置 -
主从同步基于
Raft
协议, 强一致 -
搭建
-
在任意控制台添加一个队列,一定要选择队列类型为
Quorum
类型。 添加队列 可以看到,仲裁队列的
+ 2
字样。代表这个队列有 2
个镜像节点。 因为仲裁队列
默认的镜像数为
。如果你的集群有5 7
个节点,那么镜像数肯定是 5
;而我们集群只有3
个节点,因此镜像数量就是 3
仲裁集群 -
SpringAMQP
创建仲裁队列 1
2
3
4
5
6
7
8
9
10
11
public class QuorunQueueConfig {
public Queue quorumQueue(){
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}
} -
yaml
配置 1
2
3
4
5
6spring:
rabbitmq:
addresses: 192.168.2.3:8071,192.168.2.3:8072,192.168.2.3:8073
virtual-host: /
username: root
password: root
-
-
-
扩容
-
启动一个新的
MQ 容器 1
2
3
4
5
6
7
8
9docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3-management -
进入容器控制台
1
docker exec -it mq4 bash
-
停止
mq
进程 1
rabbitmqctl stop_app
-
重置
RabbitMQ
中的数据 1
rabbitmqctl reset
-
加入
mq1
1
rabbitmqctl join_cluster rabbit@mq1
-
再次启动
mq
进程 1
rabbitmqctl start_app
-
-
增加仲裁队列副本
-
我们先查看下
quorum.queue
这个队列目前的副本情况,进入 mq1
容器 1
2
3docker exec -it mq1 bash
rabbitmq-queues quorum_status "quorum.queue"-
现在,我们让
mq4
也加入进来 1
rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"
-
-
-
Elasticsearch
-
详细使用
-
倒排索引
- 文档: 每条数据就是一个文档
- 词条: 文档按照语义分成的词语
词条不能重复 -
搜索过程
搜索过程
-
ES
的一些概念 -
什么是正向索引
基于文档
id
创建索引, 查询词条时必须先找到文档, 而后判断是否是包含词条 -
什么是倒排索引
对文档内容分词,
对词条创建索引, 并记录所在文档的信息, 查询时现根据词条查询到文档 id
,而后获取到文档
-
-
索引
相同类型的文档的集合
索引的映射
( mapping
):索引章文档的字段约束信息, 类似表的结构约束 索引 -
概念对比
MYSQL
Elasticsearch
说明 Table
Index
索引 Index
,就是文档集合, 类似数据库的表 Row
Document
文档, 就是一条条数据,类似数据库中的行, 文档都是 JSON
格式 Column
Field
字段, 就是 JSON
文档中的字段, 类似数据库中的列 Schema
Mapping
Mapping(映射)
是索引中文档的约束, 例如字段类型约束,类似数据库的表结构 SQL
DSL
DSL
是 elasticsearch
提供的 JSON
风格的请求语句, 用来操作 elasticsearch
实现 CRUD
-
架构
MYSQL
:擅长事务类型操作, 可以确保数据的安全和一致性 Elasticsearch
:擅长海量数据的搜索、分析、计算MYSQL
与 Elasticsearch
互补
-
获取文件
-
离线安装文件说明
离线安装文件说明 ik
需要放在数据卷插件目录 _data/ 中 -
安装流程
-
部署单点
ES
因为我们还需要部署
kibana
容器,因此需要让 es
和 kibana
容器关联, 这里先创建一个网络 1
docker network create es-net
-
如果上传到虚拟机中,
之后运行命令加载即可 ( kibana
同样操作) 1
docker load -i es.tar
-
拉取镜像
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30# -v 也可以指定自己的绝对路径
docker run -d \
--name es \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-e "discovery.type=single-node" \
-v es-data:/usr/share/elasticsearch/data \
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged=true \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
--restart=always \
elasticsearch:7.12.1
# 9200 提供 http 使用
# 9300 内部调用
# 高配置
docker run -d \
--name es \
-e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-e "discovery.type=single-node" \
-v es-data:/usr/share/elasticsearch/data \
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged=true \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
--restart=always \
elasticsearch:7.12.1
-
查看数据卷位置
docker volume inspect es-plugins
-
访问测试
访问测试,出现如下 json
则为正确安装
-
-
-
kibana
安装 -
安装启动
1
2
3
4
5
6
7
8docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \
--network=es-net \
--privileged=true \
-p 5601:5601 \
--restart=always \
kibana:7.12.1-
访问测试
访问测试: http://192.168.247.129:5601/app/dev_tools#/console, 点击 Explore on my own
Dev Tools
-
Dev Tools
这个界面中可以编写
DSL
来操作 elasticsearch
,并且对 DSL
语句有自动补全功能 Dev Tools
-
-
-
ik
离线安装 -
上传文件到
es-plugins
数据卷的 _data 目录下 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# 下载
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip
# 解压 zip 文件使用
yum install -y unzip
# 解压到指定目录 -d 指定目录
unzip elasticsearch-analysis-ik-7.12.1.zip -d ik
# 删除压缩包
rm -rf elasticsearch-analysis-ik-7.12.1.zip
# 重启 es
docker restart es
# 查看日志
docker logs -f es下载 测试
ik -
IK
分词器包含两种模式 ik_smart
:最少切分 ik_max_word
:最细切分
-
IK
分词器-扩展词库 要扩展
ik
分词器的词库, 只需要修改一个 ik
分词器目录中的 config
目录中的 IKAnalyzer.cfg.xml
文件 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!-- <entry key="remote_ext_dict">words_location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>
<!-- 修改如下 -->
<entry key="ext_dict">ext.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords">stopword.dic</entry>
<!-- 修改后需要重启 es -->使用
-
-
总结
- 分词器的作用是什么
- 创建倒排索引时对文档分词
- 用户搜索时,
对输入的内容分词
- 分词器的作用是什么
-
Mapping
是对索引库中文档的约束, 常见的 mapping
属性包括 -
type
字段数据类型, 常见的简单类型有 -
字符串
text(可分词文本),keyword(精确值,例如:ip
地址) -
数值
long,Integer,short,byte,double,float
-
布尔:
boolean
-
日期
date
-
对象
object
1
2
3
4
5
6
7
8
9
10
11{
"age": 18,
"weight": 56.55,
"isMarried": false,
"email": "zy@qq.com",
"score": [1,2,3],
"name": {
"firstName": "coder",
"lastName": "itl"
}
}
-
-
index
:是否创建索引, 默认为 true
-
analyzer
:使用哪种分词器 -
properties
:该字段的子字段
-
-
创建索引库
ES
中通过 Restful
请求操作索引库, 文档, 请求内容用 DSL
语句来表示, 创建索引库和 mapping
的 DSL
语句如下 实例 -
查看索引库
1
2
3GET /
索引库名
# 示例
GET /coderitl -
删除索引库
1
2
3DELETE /
索引库名
# 示例
DELETE /coderitl -
修改索引库
禁止修改
索引库和
mapping
一旦创建无法修改, 但是可以添加新的字段, 1
2
3
4
5
6
7
8PUT /
索引库名 /_mapping
{
"properties":{
"新字段名":{
"type": "integer"
}
}
}
-
语法
1
2
3
4
5
6
7
8
9POST /
索引库名 /_doc/ 文档 id
{
"字段1": "值 1",
"字段2": "值 2",
"字段3": {
"子属性1": "值 3",
"子属性2": "值 4"
}
} -
新增文档
1
2
3
4
5
6
7
8
9
10# 新增文档 POST /coderitl/_doc/(固定格式) 1(id)
POST /coderitl/_doc/1
{
"info":"程序员",
"email":"lx@qq.com",
"name":{
"firstName":"信",
"lastName":"李"
}
}新增文档 数组类型 (非数组, 是多值添加) -
查询文档
1
GET /coderitl/_doc/1
-
删除文档
1
DELETE /coderitl/_doc/1
-
修改文档
-
全量修改:
会删除旧文档, 添加新文档 1
2
3
4
5
6
7// 修改指的是: 全量修改
(所有字段)
PUT /索引库名 /_doc/ 文档 id
{
"字段1": "值 1",
"字段2": "值 2",
// ...
}全量修改时是所有的字段需要全部出现, 不修改的只需要不改动 全量: 会删除旧文档,
添加新文档 -
局部修改
局部修改
-
-
hotel
hotel
-
对应
mapping
mapping
要考虑的问题: 字段名,
数据类型, 是否参数搜索, 是否分词, 如果分词, 分词器是什么 -
ES
中支持两种地理坐标数据类型 geo_point
:由纬度 latitude
和经度 longitude
确定的一个点 32.64832,475.48432432
geo_shape
:有多个 geo_point
组成的复杂几何图形, 例如一条直线
-
字段拷贝
字段拷贝可以使用
copy_to
属性将当前字段拷贝到指定字段 1
2
3
4
5
6
7
8"all":{
"type": "text",
"analyzer":"ik_max_word"
},
"brand":{
"type": "keyword",
"copy_to":"all"
} -
DSL
创建索引库 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50PUT /hotel
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"copy_to": "all"
},
"address": {
"type": "keyword",
"index": false
},
"price": {
"type": "integer"
},
"score": {
"type": "integer"
},
"brand": {
"type": "keyword",
"copy_to": "all"
},
"city": {
"type": "keyword"
},
"startName": {
"type": "keyword"
},
"business": {
"type": "keyword",
"copy_to": "all"
},
"location": {
"type": "geo_point"
},
"pic": {
"type": "keyword",
"index": false
},
"all": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
-
-
批量查询
1
GET /hotel/_search
-
什么是
RestClient
ES
官方提供了各种不同语言的客户端, 用来操作 ES
,这些客户端的本质就是组装DSL
语句, 通过 http
请求发送给 ES
-
RestClient
操作索引库 -
引入
es
的 RestHighLevelClient
依赖 1
2
3
4
5<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency> -
因为
SpringBoot
默认的 ES
版本是 7.6.2
,所以需要覆盖默认的ES
版本与安装的客户端一致 1
2
3<properties>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties> -
在测试类中初始化
RestHighClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class HotelIndexTest {
private RestHighLevelClient client;
void testCreateIndex() throws IOException {
// 1.准备 Request PUT /hotel
CreateIndexRequest request = new CreateIndexRequest("hotel");
// 2.准备请求参数
request.source(MAPPING_TEMPLATE, XContentType.JSON);
// 3.发送请求
client.indices().create(request, RequestOptions.DEFAULT);
}
void testExistsIndex() throws IOException {
// 1.准备 Request
GetIndexRequest request = new GetIndexRequest("hotel");
// 3.发送请求
boolean isExists = client.indices().exists(request, RequestOptions.DEFAULT);
System.out.println(isExists ? "存在" : "不存在");
}
void testDeleteIndex() throws IOException {
// 1.准备 Request
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
// 3.发送请求
client.indices().delete(request, RequestOptions.DEFAULT);
}
void setUp() {
client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.150.101:9200")
));
}
void tearDown() throws IOException {
client.close();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53// MAPPING_TEMPLATE
public class HotelIndexConstants {
public static final String MAPPING_TEMPLATE = "{\n" +
" \"mappings\": {\n" +
" \"properties\": {\n" +
" \"id\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"address\": {\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"price\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"score\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"brand\": {\n" +
" \"type\": \"keyword\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"city\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"starName\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"business\": {\n" +
" \"type\": \"keyword\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"pic\": {\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"location\": {\n" +
" \"type\": \"geo_point\"\n" +
" },\n" +
" \"all\": {\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
} -
操作索引库
-
创建索引库
创建索引库 -
删除索引库
1
2
3
4
5
6// 删除索引库
public void deleteIndex() throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
client.indices().delete(request, RequestOptions.DEFAULT);
} -
判断索引库是否存在
1
2
3
4
5
6
7
8// 判断索引库是否存在
public void isExistsIndex() throws IOException {
GetIndexRequest request = new GetIndexRequest("hotel");
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
System.out.println("exists = " + exists);
}
-
-
-
新增文档
新增文档 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package com.coderitl.es.hotel;
public class TestDocument {
private RestHighLevelClient client;
private HotelService hotelService;
public void testCreateDocument() throws IOException {
// 根据 id 查询酒店数据
Hotel hotel = hotelService.getById(61083L);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1. request
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2. 准备 json(JackSon 序列化)
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3. 发送请求
client.index(request, RequestOptions.DEFAULT);
}
public void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
// 集群使用逗号分隔 => HttpHost.create
HttpHost.create("http://192.168.247.129:9200")
));
}
public void close() throws IOException {
this.client.close();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33// 根据文档创建的实体类
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
// 转换为文档类型
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
}
} -
查询文档
查询文档 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// 查询文档库
public void testSelectDocument() throws IOException {
// 数据库对象
Hotel hotel = hotelService.getById(ID_INFO);
// 将数据库对象转换为 hotelDoc
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1. 创建 request 对象
GetRequest request = new GetRequest("hotel", ID_INFO.toString());
// 2. 发送请求 得到结果
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 3. 解析结果
String json = response.getSourceAsString();
// JSON 反序列化
HotelDoc jsonHotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(jsonHotelDoc);
} -
更新文档
更新文档 1
2
3
4
5
6
7
8
9
10
11
12
13
14// 更新文档库: 局部更新
public void testUpdateLocalDocument() throws IOException {
// 1. 准备 request 对象
UpdateRequest request = new UpdateRequest("hotel", ID_INFO.toString());
// 2. 准备请求参数(更新的字段)
request.doc(
"price", "100000", // 第一个键值对
"starName", "六钻" // 第二个键值对 以逗号分割键值对
);
// 3. 发送请求
client.update(request, RequestOptions.DEFAULT);
} -
删除文档库
1
2
3
4
5
public void testDeleteDocument() throws IOException {
DeleteRequest request = new DeleteRequest("hotel", ID_INFO.toString());
client.delete(request, RequestOptions.DEFAULT);
} -
利用
JavaRestClient
批量导入酒店数据到 ES
-
需求: 批量查询酒店数据,
然后导入到索引库 -
思路
-
利用
mybatis-plus
查询酒店数据 -
将查询到的数据转换为文档类型数据
-
利用
JavaRestClient
中的 Bulk
批处理, 实现批量新增文档 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// 批量加入索引库
public void testBulkDocument() throws IOException {
// 批量查询
List<Hotel> hotelList = hotelService.list();
// 创建 request
BulkRequest request = new BulkRequest();
// 准备参数,添加多个新增的 Request
for (Hotel hotel : hotelList) {
// 转换为 hotelDoc
HotelDoc hotelDoc = new HotelDoc(hotel);
// 创建新增文档的 Request对象
request.add(new IndexRequest("hotel")
.id(hotelDoc.getId().toString())
.source(JSON.toJSONString(hotelDoc), XContentType.JSON)
);
}
client.bulk(request, RequestOptions.DEFAULT);
}
-
-
-
DSL Query
分类 -
查询所有: 查询出所有数据,
一般测试用, Eg: match_all
-
全文检索
( full text
)查询: 利用分词器对用户输入内容分词, 然后去倒排索引库中匹配 -
match_query
-
multi_match_query
match_all
1
2
3
4
5
6
7# 查询所有
GET /hotel/_search
{
"query": {
"match_all": {}
}
}-
全文检索
-
match
查询: 全文检索查询的一种, 会对用户输入内容分词, 然后去倒排索引库检索 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 语法
GET /hotel/_search
{
"query": {
"match": {
# 查询字段类型为 text 类型的
"FIELD": "TEXT"
}
}
}
# 示例
GET /hotel/_search
{
"query": {
"match": {
"all": "如家"
}
}
} -
mullti_match
:与 match
查询类似, 只不过允许同时查询多个字段 1
2
3
4
5
6
7
8
9GET /hotel/_search
{
"query": {
"multi_match": {
"query": "外滩",
"fields": ["brand","name"]
}
}
}
-
-
-
-
精确查询: 根据精确词条值查询数据,
一般是查找 keyword、数值、日期、boolean
等类型字段, 不会 -
ids
: 根据id
精确匹配 -
range
:根据值的范围查询 1
2
3
4
5
6
7
8
9
10
11GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 2000,
"lte": 10000
}
}
}
} -
term
根据词条 精确值
查询1
2
3
4
5
6
7
8
9
10
11GET /hotel/_search
{
"query": {
"term": {
"city": {
"value": "上海" # 精确内容
}
}
}
}
-
-
地理
( geo
)查询: 根据经纬度查询 -
geo_distance
:查询到指定中心点小于某个距离值的所有文档( 附近功能
)1
2
3
4
5
6
7
8
9GET /hotel/_search
{
"query": {
"geo_distance": {
"distance":"2km",
"location":"31.21,121.5"
}
}
}geo_distance
-
geo_bounding_box
:查询 geo_point
值落在某个矩形范围的所有文档 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19GET /hotel/_search
{
"query": {
"geo_bounding_box": {
"location": {
"top_left": {
"lat":"31.1",
"lon":"121.5"
},
"bottom_right": {
"lat":"30.9",
"lon":"121.7"
}
}
}
}
}矩形内的点就是结果
-
-
复合
( compound
)查询: 复合查询可以将上述查询条件组合起来, 合并查询条件 -
bool
must
:必须匹配每个子查询, 类似 与
should
:选择性匹配子查询, 类似 或
must_not
:必须不匹配,不参与算分, 类似 非
filter
:必须匹配 不参与算分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44# bool 基本使用
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"city": {
"value": "上海"
}
}
}
],
"should": [
{
"term": {
"brand": {
"value": "华美达"
}
}
}
],
"must_not": [
{
"range": {
"price": {
"lte": 500
}
}
}
],
"filter": [
{
"range": {
"score": {
"gte": 45
}
}
}
]
}
}
} -
function_score
:算分函数查询, 可以控制文档相关性算分, 控制文档排名, 例如 百度竞价
-
-
相关性打分算法
-
TF-IDF
:在 elasticsearch5.0
之前, 会随着词频增加而越来越大 -
BM25
:在 elasticsearch5.0
之后, 会随着词频增加而增大, 但增长曲线会趋于水平 打分算法 BM25
平滑(默认使用) 影响打分算法 -
案例
给
如家
这个品牌的酒店排名靠前一些-
function_score
需要的三要素 -
那些文档需要算分加权
=> 品牌为如家的酒店
-
算分函数是什么
=> weight
就可以 -
加权模式是什么
=>
求和 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23GET /hotel/_search
{
"query": {
"function_score": {
"query": {
"match": {
"all": "外滩"
}
},
"functions": [
{
"filter": {
"term": {
"brand": "如家"
}
},
"weight": 10
}
],
"boost_mode": "sum"
}
}
}
-
-
-
-
-
需求:搜索名字包含
如家
,价格不高于 400
,在坐标31.21,121,5
周围 10km
范围内的酒店 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"name": "如家"
}
}
],
"must_not": [
{
"range": {
"price": {
"gte": 400
}
}
}
],
"filter": [
{
"geo_distance": {
"distance": "10km",
"location": {
"lat": 31.21,
"lon": 121.5
}
}
}
]
}
}
}
elasticsearch
支持对搜索 结果排序
,默认是根据相关度算分 _score
来排序, 可以排序的字段类型有 keyword、数值、地理坐标、日期类型等
-
语法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"FIELD": {
"order": "desc" // 排序字段和排序方式: ASC,DESC
}
}
]
}
// 地理坐标
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"FIELD": "纬度,经度",
"order": "asc",
"unit": "km"
}
}
]
} -
案例
对酒店数据按照用户评价降序排序,
评价相同的按照价格升序排序 评价是
score
字段, 价格是 price
字段, 按照顺序添加两个排序规则 1
2
3
4
5
6
7
8
9
10
11
12
13
14GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"score": {
"order": "desc"
},
"price": "asc" // 简写形式
}
]
} -
案例
实现对酒店数据按照到你的位置坐标的距离升序排序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"location": "21,13", // 简写形式
"order": "asc",
"unit": "km"
}
}
]
} -
结果分页
elasticsearch
默认情况下只返回 top10
的数据, 而如果要查询更多的数据就需要修改分页参数了 elasticsearch
中通过修改 from、size
参数来控制要返回的分页结果 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15GET /hotel/_search
{
"query": {
"match_all": {}
},
"from": 100, // 分页开始的位置,默认为 0
"size": 1, // 期望获取的文档总数
"sort": [
{
"price": {
"order": "asc"
}
}
]
}(深度分页) 这种分页方式不利于集群,(因为获取了 from:100,zise:1 =>
)总共获取了 101 条数据, 截取了 1 条数据 -
深度分页带来的问题
-
首先在每个数据分片上都排序并查询前
1000
条文档 -
然后将所有节点的结果聚合,
在内存中重新排序选出前 1000
条文档 -
最后从这
1000
条中, 选取从 990
开始的 10
条文档 如果搜索页过深,
或者结果集 from+size
越大, 对内存和 CPU
的消耗也越高, 因此 ES
设定结果集查询的上限是 10000
-
-
深度分页解决方案
search after
:分页时需要排序, 原理是从上一次的排序值开始, 查询下一页数据 官方推荐
scroll
:原理将排序数据形成的快照,保存在内存 官方已经不推荐
-
总结
from+size
- 优点: 支持随机翻页
- 缺点: 深度分页问题,
默认查询上限是 10000
- 场景: 百度、京东、谷歌、淘宝这样的随机翻页搜索
after search
- 优点: 没有查询上限
(单次查询的 size
不超过 10000
) - 缺点: 只能向后逐页查询,
不支持随机翻页 - 场景: 没有随即翻页需求的搜索,例如手机向下滚动翻页
- 优点: 没有查询上限
scroll
- 优点: 没有查询上限
(单次查询的 size
不超过 10000
) - 缺点: 会有额外内存消耗,
并且搜索结果是 非实时的
- 场景: 海量数据的获取和转义,
从 ES7.1
开始不推荐使用, 建议使用 after search
方案
- 优点: 没有查询上限
-
结果高亮
高亮:
就是在搜索结果中把搜索关键字突出显示 -
原理
- 将搜索结果中的关键字用标签标记出来
- 在页面中给标签添加
css
样式
-
语法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17GET /hotel/_search
{
"query": {
"match": {
"FIELD": "TEXT"
}
},
"highlight": {
"fields": { // 指定要高亮的字段
"FIELD"{
"pre_tags":"<em>", // 用来标记高亮字段的前置标签
"post_tags":"</em>", // 用来标记高亮字段的后置标签,
"require_field_match": "false" // 默认情况下,ES的搜索字段必须与高亮字段一致,false 为可以不一致
}
}
}
} -
搜索结果处理整体语法
搜索结果处理整体语法
-
-
match_all
match_all
1
2
3
4
5
6
7
8
9
10
11
void testMatchAll() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
request.source().query(QueryBuilders.matchAllQuery());
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// 结果解析抽取
public static void handleResponse(SearchResponse response) {
// 4. 解析响应
SearchHits searchHits = response.getHits();
// 4.1 获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("共搜索到: " + total);
// 4.2 文档数组
SearchHit[] hits = searchHits.getHits();
// 4.3 遍历
for (SearchHit hit : hits) {
// 获取文档 source
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println("hotelDoc = " + hotelDoc);
}
}-
解析
response
解析 response
一 一获取对应的值
-
-
match
1
2
3
4
5
6
7
8
9
10
11
void testMatch() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
request.source().query(QueryBuilders.matchQuery("all","如家"));
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
} -
精确查询
-
term
1
2
3
4
5
6
7
8
9
10
void testTerm() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
request.source().query(QueryBuilders.termQuery("city", "上海"));
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
} -
range
1
2
3
4
5
6
7
8
9
10
void testRange() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
request.source().query(QueryBuilders.rangeQuery("price").gte(100).lte(150));
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
-
-
复合查询
复合查询 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void testBool() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
// 准备 BoolQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 添加 term
boolQuery.must(QueryBuilders.termQuery("city","上海"));
request.source().query(boolQuery);
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
-
排序和分页
搜索结果的排序和分页是与 query
同级的参数 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void testPageAndSort() throws IOException {
int page = 1;
int size = 5;
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
// 2.1 准备 query
request.source().query(QueryBuilders.matchAllQuery())
// 可以链式调用 也可以通过 request.source().sort("price", SortOrder.ASC); 等获取
.sort("price", SortOrder.ASC)
.from((page - 1) * size)
.size(5);
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
} -
高亮显示
高亮显示 1
2
3
4
5
6
7
8
9
10
11
12
13
void testHighLight() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
// 2.1 准备 query
request.source().query(QueryBuilders.matchQuery("all", "如家"));
// 高亮
request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}高亮显示处理 高亮: highlight
部分的内容 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43// 完整高亮处理
void testHighLight() throws IOException {
// 1. 准备 request 对象
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
// 2.1 准备 query
request.source().query(QueryBuilders.matchQuery("all", "如家"));
// 高亮
request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
// 结果对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
// 结果解析抽取
public static void handleResponse(SearchResponse response) {
// 4. 解析响应
SearchHits searchHits = response.getHits();
// 4.1 获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("共搜索到: " + total);
// 4.2 文档数组
SearchHit[] hits = searchHits.getHits();
// 4.3 遍历
for (SearchHit hit : hits) {
// 获取文档 source
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 获取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
if (!CollectionUtils.isEmpty(highlightFields)) {
// 获取高亮值
HighlightField highlightField = highlightFields.get("name");
if (highlightField != null) {
String name = highlightField.getFragments()[0].string();
// 覆盖高亮结果
hotelDoc.setName(name);
}
}
System.out.println("hotelDoc = " + hotelDoc);
}
}实现高亮
-
步骤
-
定义实体类,
接受前端请求 将搜索、分页、排序方式等封装到实体类 1
2
3
4
5
6
7
8
9
10
11
12
public class RequestParams {
// 搜索框的关键字
private String key;
// 当前页
private Integer page;
// 偏移量
private Integer size;
// 排序方式
private String sortBy;
} -
定义
controller
接口, 接受页面请求, 调用 search
方法 -
请求方式
POST
-
请求路径
/hotel/list
-
请求参数: 对象,类型为
RequestParams
-
返回值:
PageResult
,包含两个属性-
总条数
Long total
-
酒店数据
List<HotelDoc> hotels
1
2
3
4
5
6
7
8
9
10
11
12
13
public class PageResult {
Long total;
List<HotelDoc> hotels;
public PageResult() {
}
public PageResult(Long total, List<HotelDoc> hotels) {
this.total = total;
this.hotels = hotels;
}
}
-
-
-
定义
接口中的
,search 方法 利用 match
查询实现根据关键字搜索酒店信息 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
private RestHighLevelClient client;
public PageResult search(RequestParams requestParams) {
SearchResponse response = null;
try {
// 1. 准备 Resuest
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
/****************************/
// 2.1 准备 query
String key = requestParams.getKey();
if (key == null || "".equals(key)) {
// 前端未传递 key
request.source().query(QueryBuilders.matchAllQuery());
} else {
request.source().query(QueryBuilders.matchQuery("all", key));
}
// 2.2 分页
int page = requestParams.getPage();
int size = requestParams.getSize();
request.source().from((page - 1) * size).size(size);
/****************************/
// 3. 发送请求,得到响应
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
// 4. 解析响应
return handleResponse(response);
}
// 4. 解析响应
private PageResult handleResponse(SearchResponse response) {
// 解析响应
SearchHits searchHits = response.getHits();
// 获取总条数
long total = searchHits.getTotalHits().value;
// 文档数组
SearchHit[] hits = searchHits.getHits();
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
// 获取文档 source
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
hotels.add(hotelDoc);
}
// 封装返回
return new PageResult(total, hotels);
}
}
-
-
距离排序
距离排序 -
广告置顶
我们需要给置顶的酒店文档
HotelDoc
加一个标记, 然后利用 function score
给带有标记的文档增加权重 -
实现步骤分析
-
给
HotelDoc
类添加一个 isAD
字段, Boolean
类型 -
挑选几个酒店,给他的文档数据添加
isAD
字段, 值为 true
1
2
3
4
5
6
7# 更新 文档
id=38665 添加字段 isAD=true
POST /hotel/_doc/38665
{
"doc": {
"isAD": true
}
} -
修改
search
方法, 添加 function score
功能, 给 isAD
值为 true
的酒店增加权重
Function score
查询可以控制文档的相关性算分 -
-
-
聚合的分类
(对数据类型有限制: keyword、数值、日期、boolean
)等 聚合: 可以实现对文档数据的统计、分析、运算
- 常见聚合有三类:
- 桶
( *Bucket
)聚合:用来对文档做分组 TermAggregation
:按照文档字段值分组Date Histogram
:按照日期阶梯分组, 例如一周一组, 或者一月一组等等
- 度量
( *Metric
)聚合:用以计算一些值, 比如: 最大值, 最小值,平均值等 AVG
MAX
MIN
Stats
同时求 max,min,avg,sum
等
- 管道
( pipeline
)聚合: 其他聚合的结果为基础做聚合
- 桶
- 常见聚合有三类:
-
DSL
实现 Bucket
聚合 统计所有数据中的酒店品牌有几种,
此时可以根据酒店品牌的名称做聚合, 类型为 term
1
2
3
4
5
6
7
8
9
10
11
12GET /hotel/_search
{
"size": 0, // 设置 size 为 0,结果中不包含文档, 只包含聚合结果
"aggs": { // 定义聚合
"brandArr": { // 给聚合起个名称
"terms": { // 聚合的类型,按照品牌值聚合, 所以选择 term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量
}
}
}
}默认情况下,
Bucket
聚合会统计 Bucket
内的文档数量, 记为 _count
并且按照 _count
降序排序,我们可以修正结果排序方式 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15GET /hotel/_search
{
"size": 0, // 设置 size 为 0,结果中不包含文档, 只包含聚合结果
"aggs": { // 定义聚合
"brandArr": { // 给聚合起个名称
"terms": { // 聚合的类型,按照品牌值聚合, 所以选择 term
"field": "brand", // 参与聚合的字段
"order":{
"_count": "asc" // 按照_count 升序排序
},
"size": 20 // 希望获取的聚合结果数量
}
}
}
}默认情况下,
Bucket
聚合是对索引库的所有文档做聚合, 我们可以限定要聚合的文档范围, 只要添加 query
条件即可 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24GET /hotel/_search
{
// 添加聚合条件
"query": {
"range": {
"price": {
"gte": 150,
"lte": 200
}
}
},
"size": 0, // 设置 size 为 0,结果中不包含文档, 只包含聚合结果
"aggs": { // 定义聚合
"brandArr": { // 给聚合起个名称
"terms": { // 聚合的类型,按照品牌值聚合, 所以选择 term
"field": "brand", // 参与聚合的字段
"order":{
"_count": "asc" // 按照_count 升序排序
},
"size": 20 // 希望获取的聚合结果数量
}
}
}
} -
DSL
实现 Metrices
聚合 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 150,
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandArr": {
"terms": {
"field": "brand",
"order": {
"_count": "asc"
},
"size": 20
},
"aggs": { // 是 brands 聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里是 stats 可以计算 min,max,avg 等
"field": "score" // 聚合字段,这里是 score
}
}
}
}
}
} -
JavaRestAPI
实现聚合 对比分析 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33// 测试聚合
void testBuckets() throws IOException {
// 1. 准备 request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
// 2.1 设置 size 只看聚合
request.source().size(0);
request.source().aggregation(
AggregationBuilders
// 聚合名称
.terms("brandAgg")
// 期望获取文档数量
.size(10)
// 聚合字段
.field("brand")
);
// 3. 发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
System.out.println("response = " + response);
Aggregations aggregations = response.getAggregations();
// 4.1 根据聚合名称获取聚合结果(自动生成的对象并非所要结果)
Terms brandTerms = aggregations.get("brandAgg");
// 4.2 获取 buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3 遍历
for (Terms.Bucket bucket : buckets) {
// 4.4 获取 key
String key = bucket.getKeyAsString();
System.out.println(key);
}
}聚合结果获取分析 -
自动补全
拼音分词器: https://github.com/medcl/elasticsearch-analysis-pinyin
1
2
3
4
5
6
7
8# 测试分词器
POST /_analyze
{
"text": [
"如家酒店还不错"
],
"analyzer": "pinyin"
}-
自定义分词器
( elasticsearch
中分词器的组成包含如下三部分) character filters
:在 tokenizer
之前对文本进行处理, 例如删除字符、替换字符 tokenizer
:将文本按照一定的规则切割成词条, 例如 keyword
就是部分此,还有 id_smart
tokenizer filter
将 tokenizer
输出的词条做进一步处理, 例如大小写转换, 同义词处理, 拼音处理等
分析 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43# 自定义配置
(创建库) 未添加 搜索分词器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer"
}
}
}
}
# 使用
POST /test/_analyze
{
"text": ["如家酒店真不错"],
"analyzer": "my_analyzer"
}测试使用 -
搜索时出先问题
同义词 解决上述问题: 拼音分词器适合在创建倒排索引的时候使用,
但不能在搜索的时候使用 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35# 创建文档时指定创建分词器和搜索分词器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": { // 自定义分词器名称
"tokenizer": "ik_max_word", //
"filter": "py"
}
},
"filter": {
// 规则: 在 github readme 文档有对应使用方法
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer", // 创建倒排索引使用的分词器
"search_analyzer": "ik_smart" // 搜索时使用的分词器
}
}
}
}
-
-
自动补全
1
2
3
4
5
6
7
8
9
10
11# 创建索引库
PUT coderitl
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# 实例数据
POST coderitl/_doc
{
"title":["Sony","WH=1000XM3","9001","0090010"]
}
POST coderitl/_doc
{
"title":["SK-IT","PITERA","BBCS","DDES"]
}
POST coderitl/_doc
{
"title":["Nintendo","switch","AAFG","YYDS"]
}1
2
3
4
5
6
7
8
9
10
11
12
13
14# 自动补全查询语法
GET /coderitl/_search
{
"suggest": {
"title_suggest": { // title_suggest 自定义名称
"text": "so", // 查询关键字
"completion": {
"field": "title", // 补全查询的字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前 10 条数据
}
}
}
}-
修改
hotel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "text_analyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address": {
"type": "keyword",
"index": false
},
"price": {
"type": "integer"
},
"score": {
"type": "integer"
},
"brand": {
"type": "keyword",
"copy_to": "all"
},
"city": {
"type": "keyword"
},
"startName": {
"type": "keyword"
},
"business": {
"type": "keyword",
"copy_to": "all"
},
"location": {
"type": "geo_point"
},
"pic": {
"type": "keyword",
"index": false
},
"all": {
"type": "text",
"analyzer": "text_analyzer",
"search_analyzer": "ik_smart"
},
"suggestion": {
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}1
2
3
4
5
6
7
8
9
10// 数据库对应字段 List<String> suggestion
"suggestion": {
"type": "completion",
"analyzer": "completion_analyzer"
}
// 那些数据库字段做自动补全
// 构造实现字段赋值
this.suggestion = Arrays.asList(this.brand,this.business)对自动补全完善
-
-
JavaApi
实现自动补全 JavaApi
实现自动补全 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37// 测试自动补全 API
public void testSuggest() throws IOException {
// 1. 准备 request
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
request
.source()
.suggest(
new SuggestBuilder()
// 自定义名称: suggestions
.addSuggestion(
"suggestions",
// 参与自动补全的字段: suggestion
SuggestBuilders.completionSuggestion("suggestion")
// 搜索关键字
.prefix("sd")
// 跳过重复的
.skipDuplicates(true)
// 期望显示的数据量
.size(10)));
// 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
Suggest suggest = response.getSuggest();
// 4.1 根据不全查询名称 获取补全结果 Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends
// Suggest.Suggestion.Entry.Option>> == CompletionSuggestion
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
// 4.2 获取 options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
// 4.3 遍历
for (CompletionSuggestion.Entry.Option option : options) {
// 获取 text
String text = option.getText().toString();
System.out.println(text);
}
}结果解析 -
数据同步
-
数据同步问题分析
elasticsearch
中的酒店数据来自于 mysql
,因此 mysql
数据发生改变时, elasticsearch
也必须跟着改变, 这个就是 elasticsearch
与 mysql
之间的 数据同步
-
数据同步的三种方式
-
同步调用
同步调用 - 优点: 实现简单,
粗暴 - 缺点: 业务耦合度高
- 优点: 实现简单,
-
异步通知
( 推荐
)异步通知 - 优点: 低耦合,
实现难度一般 - 缺点: 依赖
mq
的可靠性
- 优点: 低耦合,
-
监听
binlog
监听 binlog
- 优点: 完全解除服务间耦合
- 缺点: 开启
binlog
增加数据库负担, 实现复杂度高
-
-
-
案例实现数据同步
数据同步实现
-
问题
-
海量数据存储问题: 将索引库从逻辑上拆分为
N
个分片, 存储到多个节点 -
单点故障问题: 将分片数据在不同节点备份
实现
-
-
搭建集群
集群所需环境容量大小 (运行内存 4G+
,否则过于卡顿,无法操作) 1
2# 创建网络
docker network create elastic-
es
运行需要修改一些 linux
系统权限, 修改 /etc/sysctl.conf
1
2
3
4
5
6
7# 编辑
vim /etc/sysctl.conf
# 添加如下
vm.max_map_count=262144
# 保存退出后使其生效
sysctl -p -
docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121version: '3'
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- TZ="Asia/Shanghai"
- node.master=true
- node.data=true
- http.cors.enabled=true
- http.cors.allow-origin=*
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- /root/es/data/data01:/usr/share/elasticsearch/data # 数据文件
- /root/es/logs/logs01:/usr/share/elasticsearch/logs # 日志文件
ports:
- 9200:9200
networks:
- elastic
es02:
image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- TZ="Asia/Shanghai"
- node.master=true
- node.data=true
- http.cors.enabled=true
- http.cors.allow-origin=*
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- /root/es/data/data02:/usr/share/elasticsearch/data
- /root/es/logs/logs02:/usr/share/elasticsearch/logs
ports:
- 9201:9200
- 9301:9300
networks:
- elastic
es03:
image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- TZ="Asia/Shanghai"
- node.master=true
- node.data=true
- http.cors.enabled=true
- http.cors.allow-origin=*
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- /root/es/data/data03:/usr/share/elasticsearch/data
- /root/es/logs/logs03:/usr/share/elasticsearch/logs
ports:
- 9202:9200
- 9302:9300
networks:
- elastic
kibana:
image: docker.elastic.co/kibana/kibana:7.13.2
container_name: kibana
environment:
- I18N_LOCALE=zh-CN
ports:
- "5601:5601"
links:
- es01:vm01
depends_on:
- es01
- es02
- es03
networks:
- elastic
cerebro:
image: lmenezes/cerebro:0.9.2
container_name: cerebro
ports:
- "19000:9000"
links:
- es01:vm01
command:
- -Dhosts.0.host=http://vm01:9200
networks:
- elastic
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
networks:
elastic:
driver: bridge -
构建
1
docker-compose up -d
构建安装集群
-
-
访问
5601 出现 Kibana server is not ready yes
1
2
3# 获取 es01 的容器 ip
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' c6 # c6 是 es01 的容器id
# 复制获取的的 ip -
进入
kibana
容器内部 1
2
3
4
5
6
7
8
9
10# 进入 kibana
docker exec -it kibana bash
# 进入 config
cd config
# 修改该配置文件,根据下图实现
vim kibana.yml
# 修改完毕后重启
docker-compose restart kibana修改如下 -
集群状态监控
github
地址: https://github.com/lmenezes/cerebro 下载链接: https://github.com/lmenezes/cerebro/releases/download/v0.9.4/cerebro-0.9.4.zip (
windows
)可使用 -
集群访问测试
集群访问测试 -
创建索引库
1
2
3
4
5
6
7
8
9
10
11
12PUT /coderitl
{
"settings":{
"number_of_shards": 3, // 分片数量
"number_of_replicas":1 // 副本数量
},
"mappings":{
"properties":{
// mapping 映射定义
}
}
}DSL
创建 cerebro
创建索引库 -
集群节点职责划分
节点类型 配置参数 默认值 节点职责 master eligible
node.master
true
备选主节点: 主节点可以管理和记录集群状态, 决定分片在那个节点, 处理创建和删除索引库的请求 data
node.data
true
数据节点: 存储数据、搜索、聚合、 CRUD
ingest
node.ingest
true
数据存储之前的预处理 coordonating
上面 3
个参数都为 false
,则为coordonating
节点 无 路由请求到其他节点合并其他节点处理的结果, 返回给用户 -
ES
集群的脑裂 默认情况下,
每个节点都是 master eligible
节点, 因此一旦 master
节点宕机, 其他候选节点会选举一个成为主节点, 当主节点与其他节点网络故障时, 可能发生脑裂问题 脑裂产生的原因 为了避免脑裂,
需要要求候选票超过 (eligible
才能当选为主,节点数量 + 1) / 2 因此 eligible
节点数量最好是奇数, 对应配置项是 discovery.zen.minmum_master_nodes
在 es7.0
以后, 已经成为默认配置, 因此一般不会发生脑裂, -
ES
集群的分布式存储 当新增文档时,
应该保存到不同的分片, 保证数据均衡, 主要通过 hash
算法来计算文档应该存储到那个分片 -
添加数据
添加数据( 9200
)获取数据 ( 9201
)1
2
3
4
5
6{
"explain":true, // 获取来自那个分片信息
"query": {
"match_all": {}
}
}数据成功的存储在不同分片上 1
2# hash 算法
shard = hash(_routing) % number_of_shards-
_routing
: 默认是文档的id
-
算法与分片数量有关,
因此索引库一旦创建, 分片数量不能修改 -
新增文档流程
新增文档流程
-
-
分布式查询
-
scatter phase
:分散阶段, coordinating node
会把请求分发到每一个分片 -
gather phase
:聚集阶段,coordinating node
汇总 data node
的搜索结果, 并处理为最终结果集返回给用户 先分发, 后汇总,在返回给用户
-
-
-
问题引出
停止一个主机 -
故障转移
集群的
master
节点会监控集群中的节点状态, 如果发现有节点宕机, 会立即将宕机节点的分片数据转移到其他节点,确保数据安全, 这个叫做 故障转移
分片备份自动转移
-
Sentinel
-
详细使用
-
雪崩问题
微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,
这就是雪崩 雪崩问题 -
解决雪崩问题的常见方式有四种
-
超时处理: 设定超时时间,
请求超过一定时间没有响应就返回错误信息, 不会无休止等待(只能缓解) -
船壁模式: 限定每个业务能使用的线程数,
避免耗尽整个 tomcat
的资源, 因此也叫线程隔离 船壁模式 -
熔断降级: 由熔断器统计业务执行的异常比例,
如果超出阈值则会 熔断
该业务,拦截访问该业务的一切请求 熔断降级 -
流量控制: 限制业务访问的
QPS
,避免服务因流量的突增而故障(可以实现预防,其他三个是避免
)(已经发生雪崩, 阻止传递) QPS
:每秒钟处理的请求数量
-
-
-
服务保护技术对比
**
Sentinel
Hystrix(已停止维护)
隔离级别
信号量隔离 线程池隔离 / 信号量隔离 熔断降级策略
基于慢调用比例或异常比例 基于失败比率 实时指标实现 滑动窗口 滑动窗口 (基于 RxJava
)规则配置 支持多种数据源 支持多种数据源 扩展性 多个扩展性 插件的形式 基于注解的支持 支持 支持 限流
基于 QPS
,支持基于调用关系的限流优先支持 流量整形
支持慢启动, 匀速排队模式 不支持 系统自适应保护 支持 不支持 控制台
开箱即用, 可规则配置, 查看秒级监控, 机器发现等 不完善 常见框架的适配 Servlet、Spring Cloud、Dubbo、gPRC
等 Servlet、Spring Cloud、Netflix
-
安装
文档: https://sentinelguard.io/zh-cn/
github
: https://github.com/alibaba/Sentineljar
下载链接: https://github.com/alibaba/Sentinel/releases/download/1.8.4/sentinel-dashboard-1.8.4.jar -
下载
jar
文件 1
2# 在文件位置执行
java -jar sentinel-dashboard-1.8.4.jar执行 -
访问
用户名和密码:
sentinel
-
修改配置
配置项 默认值 说明 server.port
8080
服务端口 sentinel.dashboard.auth.username
sentinel
默认用户名 sentinel.dashboard.auth.password
sentinel
默认密码 1
2# 修改
java -jar sentinel-dashboard-1.8.4.jar -Dserver.port=10010
-
在
order-service
中整合 sentinel
并且连接 sentinel
的控制台 -
引入
sentinel
依赖 1
2
3
4
5<!-- TODO: 整合 sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency> -
配置
1
2
3
4
5spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080 -
访问微服务的任意端点
( 接口 controller
),触发 sentinel
监控 查看
-
-
簇点链路
簇点链路
:就是项目内的调用链路,链路中 被监控
的每个接口就是一个资源,默认情况下sentinel
会监控 SpringMVC
的每一个端点 ( 接口
),因此 SpringMVC
的每一个端点就是调用链路中的一个资源 簇点链路 -
流控
单机阈值: 其含义局势限制
/order/{orderId}
这个资源的单机 QPS
为 100(通过压力测试获取)
,超出的请求会被拦截报错-
流控模式
-
直连: 统计当前资源的请求,
触发阈值时对当前资源直接限流, 也就是默认的模式 -
关联: 统计与当前资源相关的另一个资源,
触发阈值时, 对当前资源限流 使用场景: 比如用户支付时需要修改订单状态,
同时用户要查询订单, 查询和修改操作会争抢数据库锁, 产生竞争, 业务需求时有限支付和更新订单的业务, 因此当修改订单业务触发阈值时, 需要对订单业务限流 配置过程 -
配置流控规则:
当 /order/update
资源被访问的 QPS
超过 5
时, 对 /order/query
请求限流 对谁限流加给谁 对 query
限流 shishi
-
满足如下条件使用关联模式
- 两个有竞争关系的资源
- 一个优先级较高,一个优先级较低
-
-
链路: 统计从指定链路访问到本资源的请求时,
对指定链路限流 -
两条请求链路
/test1 => /common
/test2 => /common
-
案例
需求: 有查询订单和创建订单业务,
两者都需要查询到商品, 针对从查询订单进入到查询商品的请求统计, 并设置限流 -
步骤
-
在
OrderService
中添加一个 queryGoods
方法, 不用实现业务 -
在
OrderController
中, 改造 /order/query
端点, 调用 OrderService
中的 queryGoods
-
在
OrderController
中添加一个 /orrder/save
的端点, 调用 OrderService
的 queryGoods
方法 -
给
queryGoods
设置限流规则, 从 /order/query
进入 queryGoods
的方法限制 QPS
必须小于 2
配置案例
-
-
配置链路
-
sentinel
默认只标记 Controller
中的方法为资源, 如果要标记其他方法 1
2
3
4
public String queryGoods() {
return "queryGoods被调用.............";
} -
Sentinel
默认会将 Controller
中的方法做 context
整合, 导致链路模式的流控失效 1
2
3
4spring:
cloud:
sentinel:
web-context-unify: false # 关闭 context 整合
-
-
-
-
-
流控效果
流控效果是指请求达到流控阈值时应该采取的措施
-
快速失败:达到阈值后,
新的请求会被立即拒绝并抛出 FlowException
异常,是默认的处理方式 -
warn up
:预热模式,对超出阈值的请求同样是拒绝并抛出异常. 但这种模式阈值会动态变化, 从一个较小的值逐渐增加到最大阈值 是应对服务冷启动的一种方案,
请求阈值初始值是 threshold /(除号) coldFactory
,持续指定时长后,逐渐提高到 threshold
值,而 coldFactory
的默认值是 3
Eg: 设置
,QPS 的 threshold 为 10 预热时间 5s
,那么初始阈值就是 10/3
,也就是3
,然后再 5s
后逐渐增长到 10
增长趋势 配置 实时监控 -
排队等待: 让所有的请求按照先后次序排队执行,
两个请求的间隔不能小于指定时长 当请求超过
QPS
阈值时, 快速失败和 warn up
会拒绝新的请求并抛出异常, 而排队等待则是让所有请求进入一个队列中, 然后按照阈值允许的时间间隔内依次执行, 后来的请求必须等待前面执行完成, 如果请求预期的等待时间超出最大时长, 则会被拒绝 Eg: QPS=5
,意味着每200ms
处理一个队列中的请求, timeout=2000
意味着预期等待超过 2000ms
的请求会被拒绝并抛出异常 过程
-
-
热点参数限流
之前的限流是统计访问某个资源的所有请求,
判断是否超过 QPS
阈值, 而热点参数限流是分别统计 参数值相同
的请求,判断是否超过QPS
阈值 热点参数限流 -
配置位置
热点参数配置位置 -
重要参数解析
参数解析 -
高级选项参数
高级选项参数 - 如果参数值是
100
,则每1s
允许的 QPS
为 10
- 如果参数值是
101
,则每1s
允许的 QPS
为 15
热点参数限流对默认的
SpringMVC
资源无效, 需要通过 @SentinelResource
注解 - 如果参数值是
-
-
授权规则
授权规则 可以对调用方的来源总控制,
有白名单和黑名单两种方式 - 白名单: 来源
( origin
)在白名单内的调用者允许访问 - 黑名单: 来源
( origin
)在黑名单内的调用者不允许访问
配置 - 白名单: 来源
-
授权规则配置步骤
配置授权规则 -
在
gateway
服务中, 利用网关的过了不起添加名为 gateway
的 origin
头 1
2
3
4
5spring:
cloud:
gateway:
default-filters:
- AddRequestHeader=origin,gateway # 添加名为 origin 的请求头,值为: gateway -
从
request
获取一个名为 origin
的请求头, 作为 origin
的值 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.coderitl.springcloud.sentinel;
public class HeaderOriginParset implements RequestOriginParser {
public String parseOrigin(HttpServletRequest request) {
// 1. 获取请求头
String origin = request.getHeader("origin");
// 2. 非空判断
if (StringUtils.isEmpty(origin)) {
origin = "blank";
}
return origin;
}
}添加配置 限制访问来源 -
自定义异常结果
异常 说明 FlowException
限流异常 ParamFlowException
热点参数限流异常 DegradeException
降级异常 AuthorityException
授权规则异常 SystemBlockException
系统规则异常 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32package com.coderitl.springcloud.handler;
public class SentinelBlockHandler implements BlockExceptionHandler {
public void handle(
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse,
BlockException e)
throws Exception {
String msg = "未知异常";
// 限流状态码都是 429
int status = 429;
if (e instanceof FlowException) {
msg = "请求被限流了.........";
} else if (e instanceof DegradeException) {
msg = "请求被降级了.........";
} else if (e instanceof ParamFlowException) {
msg = "热点参数限流.........";
} else if (e instanceof AuthorityException) {
msg = "请求没有权限!";
// 权限 401
status = 401;
}
httpServletResponse.setContentType("application/json;charset=utf-8");
httpServletResponse.setStatus(status);
httpServletResponse
.getWriter()
.println("{\"message\":\"" + msg + "\",\"status\":" + status + "}");
}
}优化显示
-
-
规则持久化模式
( 3
)种 -
规则管理模式-
pull
模式 pull
模式: 控制台将配置的规则推送到 Sentinel
客户端,而客户端会将配置规则保存在本地文件或数据库中,以后会定时去本地文件或数据库中查询, 更新本地规则 缺点: 存在时效性 -
规则管理模式-
push
模式 push
模式: 控制台将配置规则推送到远程配置中心, 例如 Nacos
,Sentinel
客户端监听 Nacos
,获取配置变更的推送消息,完成本地配置更新| 推荐 -
原始方式
(重启失效)
-
-
Sentinel
规则持久化 -
引入依赖
1
2
3
4
5<!-- 在 order-service 中引入 sentinel 监听 nacos 的依赖 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency> -
配置
nacos
地址 1
2
3
4
5
6
7
8
9
10
11
12# 在 order-service 中的 application.yml 文件配置 nacos 地址以及监听的配置信息
spring:
cloud:
cloud:
sentinel:
datasource:
flow:
nacos:
server-addr: localhost:8848 # nacos 地址
dataId: orderservice-flow-rules
groupId: SENTINEL_GROUP
ruleType: flow # 还可以是 degrade authority param-flow
-
-
Redis
-
详细使用
-
持久化
-
RDB
全称是
Redis Database Backup file(Redis
,数据备份文件) 也被叫做 Redis
数据快照。简单来说就是把内存中的所有数据都记录到磁盘中, 当 Redis
实例故障重启后, 从磁盘读取快照文件, 恢复数据 快照文件称为
RDB
文件, 默认是保存在当前运行目录 1
2
3
4
5127.0.0.1:6379> save # 由 Redis 主进程来执行 RDB,
会阻塞所有命令
127.0.0.1:6379> bgsave # 开启子进程执行 RDB,避免主进程收到影响
# Redis 停机时会执行一次 RDB触发机制 -
bgsave
bgsave
流程 - 基本流程
fork
主进程得到一个子进程, 共享内存空间 - 子进程读取内存数据并写入新的
RDB
文件 - 用新
RDB
文件替换旧的 RDB
文件
- 基本流程
-
-
AOF
持久化 AOF
全称为 Append Oble File(追加文件)
,Redis
处理的每一个写命令都会记录在 AOF
文件, 可以看作是命令日志文件 AOF
(存储了命令,执行命令恢复到原始数据) AOF
默认是关闭的, 需要修改 redis.conf
配置文件来开启 AOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14# 禁用 RDB
save ""
# 是否开启 AOF 功能 默认是 no
appendonly yes
# AOF 文件的名称
appendfilename "appendonly.aof"
# AOF 的命令记录的频率也可以通过 redis.conf 文件来配置
# 表示没执行一次写命令 立即记录到 AOF 文件
appendfsync always
# 写命令执行完先放入 AOF 缓冲区,然后表示每隔 1s 将缓冲区数据写到 AOF 文件, 是默认方案
appendfsync everysec
# 写命令执行完先放入 AOF 缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no配置项 刷盘时机 优点 缺点 always
同步刷盘 可靠性高,几乎不丢数据 性能影响大 everysec
每秒刷盘 性能适中 最多丢失 1s
数据 no
操作系统控制 性能最好 可靠性差, 可能丢失大量数据 因为是记录命令,
AOF
文件会比 RDB
文件大得多, 而且 AOF
会记录对同一个 key
的多次操作, 但只有最后一次写操作才有意义, 铜鼓执行 bgrewriteaof
命令, 可以让 AOF
文件执行重写功能, 用最少的命令达到相同效果 执行重写 bgrewriteaof(redis 命令行执行)
Redis
也会在触发阈值时自动去重写 AOF
文件, 阈值也可以在 redis.conf
中配置 1
2
3
4# AOF 文件比上次文件 增长超过多少百分比则触发重写
auto-aof-rewrite-percentage 100
# AOF 文件体积最小多大以上才可以触发重写
auto-aof-rewrite-min-size 64mb-
RDB
与 AOF
对比 RDB
AOF
持久化方式 定时对整个内存做快照 记录每一次执行的命令 数据完整性 不完整,两次备份之间会丢失 相对完整, 取决于刷盘策略 文件大小 会有压缩, 文件体积小 记录命令,文件体积很大 宕机恢复速度 很快 慢 数据恢复优先级 低,因为数据完整性不如 AOF
高, 因为数据完整性更高 系统资源占用 高, 大量 CPU
和内存消耗 低,主要是磁盘 IO
资源但 AOF
重写时会占用大量 CPU
和内存资源 使用场景 可以容忍数分钟的数据丢失, 追求更快的启动速度 对数据安全性要求较高常见
-
-
搭建集群
一主二从 -
配置文件
1
2
3
4
5
6
7
8# 进入 tmp 目录
cd /tmp
# 创建目录
mkdir 7001 7002 7003
# 拷贝配置文件(一键考培 参数 1 是数字 `一` )
echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf -
修改每个实例的端口、工作目录
修改每个文件夹内的配置文件,
将端口分别修改为 7001,7002,7003
,将rdb
文件保存位置都修改自己所在目录 1
2
3sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf -
修改每个实例的声明
IP
1
2
3# 虚拟机本身有多个 ip 为了避免将来混乱 我们需要在 redis.conf 文件中指定每一个实例的绑定 ip 信息
# redis 实例的声明 IP
replica-announce-ip1
2
3
4
5
6
7# 逐一修改 1a:
数字 1 a: 追加模式 (1a:第一行后追加)
sed -i '1a replica-announce-ip 43.142.80.9' 7001/redis.conf
sed -i '1a replica-announce-ip 43.142.80.9' 7002/redis.conf
sed -i '1a replica-announce-ip 43.142.80.9' 7003/redis.conf
# 一键修改
printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 43.142.80.9' {}/redis.conf -
启动三个窗口实例方便查看日志
同时启动服务: tmp/ redis-server 700x/redis.conf
-
-
开启主从关系
-
有临时和永久两种模式
-
修改配置文件
( 永久生效
)1
2# 在 redis.conf 中添加一行配置 replicaof | slaveof(5.0
以前) 命令
slaveof <masterip> <masterport> -
使用
redis-cli
客户端连接到 redis
服务, 执行 slaveof
命令 ( 重启后生效
)1
slaveof <masterip> <masterport>
1
2
3
4
5
6
7
8# 前提条件所有主机配置如下内容
(否则集群失败)
1. 注释 bind 127.0.0.0
2. protected-mode no
# 连接测试 登录 7002 | 7003
redis-cli -p 7002
# 执行 slaveof => 7002 | 7003 成为 7001 的 slaveof(从机)
slaveof 43.142.80.9 7001成功搭建集群环境( 命令: info replication 查看信息
)
-
-
-
数据同步原理
数据同步原理(第一次是全量同步) 后续同步使用 replid
-
master
如何判断 slave
是不是第一次来同步数据? Replication Id
:简称 replid
是数据集的标记, id
一致则说明是同一数据集,每一个 master
都有唯一的 reolid
,slave
则会继承 master
节点的 replid
offset
偏移量, 随着记录在 repl_baklog
中的数据增多而逐渐增大, slave
完成同步时也会记录当前同步的 offset
,如果slave
的 offset
小于 master
的 offset
,说明slave
数据落后于 master
,需要更新
因此
slave
做数据同步, 必须向 master
声明自己的 replication Id
和 offset
,master
才可以判断到底需要同步那些数据 -
增量同步
( slave
重启后同步) 增量同步 repl_baklog
大小有上限, 写满后会覆盖最早的数据, 如果 slave
断开时间过久, 导致尚未备份的数据被覆盖, 则无法基于 log
做增量同步, 只能再次全量同步 -
优化
Redis
主从集群 -
在
master
中配置 repl-diskless-sync yes
启用无磁盘复制, 避免全量同步是的 磁盘 IO
-
Redis
单节点上的内存占用不要太大, 减少 RDB
导致的过多磁盘 IO
-
适当提高
repl_baklog
的大小, 发现 slave
宕机时尽快实现故障恢复, 尽可能避免全量同步 -
限制一个
master
上的 slave
节点数量, 如果实在是太多 slave
,则可以采用 主-从-从
链式结构,减少 master
压力 主-从-从
链式
-
-
总结
- 简述全量同步和增量同步的区别
- 全量同步:
master
将完整的内存数据生成 RDB
,发送RDB
到 slave
,后续命令则记录在repl_baklog
,逐个发送给slave
- 增量同步:
slave
提交自己的 offset
到 master
,master
获取 repl_baklog
中从 offset
之后的命令给 slave
- 全量同步:
- 什么时候执行全量同步
slave
节点第一次连接 master
节点时 slave
节点断开时间太久, repl_baklog
中的 offset
已经被覆盖时
- 什么时候执行增量同步
slave
节点断开又恢复, 并且在 repl_baklog
中能找到 offset
时
- 简述全量同步和增量同步的区别
-
-
哨兵作用
Redis
提供了哨兵 Sentinel
机制来实现主从集群的自动故障恢复 -
监控:
Sentinel
会不断检查你的 master
和 slave
是否按预期工作 -
自动故障恢复:如果
master
故障, Sentinel
会将一个 slave
提升为 master
,当故障实例恢复后也已新的master
为主 -
通知:
Sentinel
充当 Redis
客户端的服务发现来源, 当集群发生故障转移时, 会将最新信息推送给 Redis
的客户端 哨兵 -
服务状态监控
Sentinel
基于心跳机制检测服务状态, 每隔 1s
向集群的每个实例发送 ping
命令 - 主观下线: 如果某
sentinel
节点发现某实例未在规定时间响应, 则认为该实例 主观下线
- 客观下线: 若超过指定数量
( quorum
)的 Sentinel
都认为该实例主观下线, 则该实例客观下线, quorum
值最好超过 sentinel
实例数量的一半
- 主观下线: 如果某
-
选举新的
master
一旦发现
master
故障, Sentinel
需要在 slave
中选择一个作为新的 master
,选择依据如下:- 首先会判断
slave
节点与 master
节点断开时间长短, 如果超过指定值 ( down-after-milliseconds * 10
)则会排除该 slave
节点 - 然后判断
slave
节点的 slave-prioity
值, 越小优先级越高, 如果是 0
则永不参与选举 - 如果
slave-prority
一样, 则判断 slave
节点的 offset
值, 越大说明数据越新, 优先级越高 - 最后是判断
slave
节点的运行 id
大小, 越小优先级越高
- 首先会判断
-
如何实现故障转移
当选中了其中一个
slave
为新的 master
后 -
Sentinel
给备选的 slave
节点发送 slaveof no one
命令, 让该节点成为 master
-
Sentinel
给所有其他 slave
发送 slaveof ip poort
命令, 让这些 slave
成为新 master
的从节点, 开始新的 master
上同步数据 -
最后,
Sentinel
将故障节点标记为 slave
,当故障节点恢复后会自动成为新的master
的 slave
节点 故障转移
-
-
-
哨兵集群搭建
哨兵集群搭建图 -
创建目录
1
2
3
4
5
6
7
8cd /tmp
# 创建文件夹
mkdir s1 s2 s3
# 创建配置文件
cd s1/
vim sentinel.conf1
2
3
4
5
6
7
8
9# sentinel.conf 文件内容 3
份
# 是当前 sentinel 实例的端口
port 27001
sentinel announce-ip 43.142.80.9
# 指定主节点信息
sentinel monitor mymaster 43.142.80.9 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"- 解读
mymaster
:主节点名称, 自定义 43.142.80.9 7001
:主节点的 ip
和端口 2
选举 master
时的 quorum
值
1
2
3
4
5# 方式一: 逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二: 管道组合命令 一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf1
2
3# 修改 s2、s3 两个文件夹内的配置文件,
将端口分别修改为 27002 、27003
sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf流程信息 - 解读
-
启动
3
个 redis 实例查看 1
2
3
4
5
6# 第 1 个
redis-sentinel s1/sentinel.conf
# 第 2 个
redis-sentinel s2/sentinel.conf
# 第 3 个
redis-sentinel s3/sentinel.confredis-sentinel
成功集群
-
-
RedisTemplate
的哨兵模式 -
引入依赖
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency> -
修改配置
1
2
3
4
5
6
7
8
9
10
11
12
13logging:
level:
io.lettuce.core: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
redis:
sentinel:
master: mymaster # 指定 master 名称
nodes: # 指定 redis-sentinel 集群信息
- 43.142.80.9:27001
- 43.142.80.9:27002
- 43.142.80.9:27003 -
控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HelloRedisController {
private StringRedisTemplate redisTemplate;
public String hi( { String key)
return redisTemplate.opsForValue().get(key);
}
public String hi( { String key, String value)
redisTemplate.opsForValue().set(key, value);
return "success add key";
}
} -
配置主从读写分离
1
2
3
4
5
6
7
8
9
10package com.coderitl.springcloud.redis.conf;
public class RedisBeanConf {
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer() {
return clientConfigurationBuilder ->
clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
}这里的
ReadFrom
是配置 Redis
的读取策略, 是一个 枚举
,包括如下选择 MASTER
: 从主节点读取MASTER_PREFFERRED
: 优先从master
节点读取, master
不可用才读取 replica
REPLICA
从 slave(replica)节点读取
REPLICA_PREFERRED
优先从 slave(replica)
节点读取,所有的 slave
都不可用才读取 master
成功获取 Redis
数据 连接日志信息
-
-
结构
结构图 -
分片集群结构
主从和哨兵可以解决高可用,
高并发读的问题, 但是依然有两个问题没有解决 - 海量数据存储问题
- 高并发写问题
使用分片集群可以解决上述问题,
分片集群特征: - 集群中有多个
master
,每个master
保存不同数据 - 每个
master
都可以由多个 slave
节点 master
之间通过 ping
检测彼此健康状态 - 客户端请求可以访问集群任意节点,
最终都会被转发到正确节点
-
搭建分片集群
-
结构图
3
主 3 从 -
ip
分配 IP
PORT
角色 43.142.80.9
7001
master
43.142.80.9
7002
master
43.142.80.9
7003
master
43.142.80.9
8001
slave
43.142.80.9
8002
slave
43.142.80.9
8003
slave
-
准备实例和配置
删除之前的
7001,7002,7003
的目录, 重新创建 7001,7002,7003,8001,8002,8003
目录 1
2
3
4
5# tmp
cd /tmp
# 创建目录
mkdir 7001 7002 7003 8001 8002 8003 -
在
/tmp
下准备一个新的 redis.conf
文件 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21port 6379
# 开启集群功能
cluster-enabled yes
# 集群的配置文件名称 不需要我们创建,由 redis 自己维护
cluster-config-file /tmp/6379/nodes.conf
# 节点心跳失败的超时时间
cluster-node-timeout 5000
# 持久化文件存放目录
dir /tmp/6379
# 绑定地址
bind 0.0.0.0
# 让 redis 后台运行
daemonize yes
# 注册的实例 ip
replica-announce-ip 43.142.80.9
# 保护模式
protected-mode no
# 数据库数量
databases 1
# 日志
logfile /tmp/6379/run.log -
将这个
redis.conf
文件拷贝到每隔目录下 1
2
3
4# 进入 /tmp 目录
cd /tmp
# 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf -
修改每个目录下的
redis.conf
,将其中的6379
修改为与所在目录一致 1
2
3cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf -
启动
1
2
3cd /tmp
# 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf -
查看
集群启动查看 -
如果需要关闭所有进程,
可执行命令 1
2
3
4ps -ef | grep redis | awk '{print $2}' | xargs kill
# 推荐关闭方式
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown -
创建集群
1
redis-cli --cluster create --cluster-replicas 1 43.142.80.9:7001 43.142.80.9:7002 43.142.80.9:7003 43.142.80.9:8001 43.142.80.9:8002 43.142.80.9:8003
- 参数说明
redis-cli --cluster | ./redis-trib.rb
:代表集群操作命令 create
: 代表是创建集群--cluster-replicas 1(数字
: 指定集群中每个1) master
的副本个数为 1
,此时 节点总数 ➗ (replicas + 1)
得到的就是 master
数量, 因此节点列表中的前 n
个就是 master
,其他节点都是slave
节点, 随机分配到不同 master
成功启动集群 - 参数说明
-
查看集群状态
1
redis-cli -p 7001 cluster nodes
集群状态
-
-
散列插槽
Redis
会把每一个 master
节点映射到 0~16383
个插槽上, 查看集群信息时就能看到 数据
key
不是与节点绑定, 而是与插槽绑定, redis
会根据 key
的有效部分计算插槽值,分两种情况 key
中包含 {}
且 {}
中至少包含一个字符, {}
中的部分是有效部分 key
中不包含 {}
,整个 key
都是有效部分
例如:
key
是 username
,那么根据计算, 如果是 {xxx}username
则根据 xxx
计算, 计算方式是利用 CRC16
算法得到一个 hash
值, 然后对 16384
取余, 得到的结果就是 slot
值 slot(redis-cli -c -p 7001) 参数: -c 至关重要
-
总结
-
Redis
如何判断某个 key
应该在那个实例? - 将
16384
个插槽分配到不同的实例 - 根据
key
的有效部分计算哈希值, 对 16384
取余 - 余数作为插槽,
寻找插槽所在实例即可
- 将
-
如何将同一类数据固定的保存在同一个
Redis
实例 -
这一类数据使用相同的有效部分,
例如 key
都以 {typeId}
为前缀 数据固定的保存在同一个 Redis
实例
-
-
集群伸缩
向集群中添加一个新的
master
节点, 并向其中存储 num = 10
-
需求
-
启动一个新的
redis
实例, 端口为 7004
1
2
3
4mkdir 7004
cp redis.conf 7004
sed -i 's/6379/7004/g' 7004/redis.conf
redis-server 7004/redis.conf -
添加
7004
到之前的集群, 并作为一个 master
节点 1
redis-cli --cluster add-node 43.142.80.9:7004 43.142.80.9:7001
-
给
7004
节点分配插槽, 使得 num
这个 key
可以存储到 7004
实例 1
2# 重新分片
redis-cli --cluster reshard ip:port分配插槽
-
-
-
-
故障转移
故障转移 -
数据迁移
数据迁移流程 利用
cluster failover
命令可以手动让集群中的某个 master
宕机, 切换到执行 cluster failover
命令的这个 slave
节点, 实现无感知的数据迁移 - 手动的
Failover
支持三种不同模式 - 缺省:默认的流程,
如图 1-6
步 force
:省略了对 offset
的一致性校验 takeover
直接执行第 5
步, 忽略数据一致性, 忽略数据一致性, 忽略 master
状态和其他 master
的意见
- 缺省:默认的流程,
- 手动的
-
RedisTemplate
访问分片集群 - 引入
redis
的 starter 依赖 - 配置分片集群地址
- 配置读写分离
分片集群配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15logging:
level:
io.lettuce.core: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
redis:
cluster:
nodes: # 指定分片集群的每一个节点信息
- 43.142.80.9:7001
- 43.142.80.9:7002
- 43.142.80.9:7003
- 43.142.80.9:8001
- 43.142.80.9:8002
- 43.142.80.9:8003 - 引入
-