JAVA最全面试题大全(二)

Spring面试题

1、Spring IOC是什么

Spring 中的org.springframework.beans 包和 org.springframework.context 包构成了 Spring 框架 IoC 容器的基础。BeanFactory 接口提供了一个先进的配置机制,使得任何类型的对象的配置成为可能。ApplicationContex 接口对 BeanFactory(是一个子接口)进行了扩展,在 BeanFactory的基础上添加了其他功能,比如与 Spring 的 AOP 更容易集成,也提供了处理 message resource的机制(用于国际化)、事件传播以及应用层的特别配置,比如针对 Web 应用的WebApplicationContext。org.springframework.beans.factory.BeanFactory 是 Spring IoC 容器的具体实现,用来包装和管理前面提到的各种 bean。BeanFactory 接口是 Spring IoC 容器的核心接口。IOC:把对象的创建、初始化、销毁交给 spring 来管理,而不是由开发者控制,实现控制反转。

2、什么是依赖注入DI

依赖注入是在编译阶段尚未知所需的功能是来自哪个的类的情况下,将其他对象所依赖的功能对象实例化的模式。这就需要一种机制用来激活相应的组件以提供特定的功能,所以依赖注入是控制反转的基础。否则如果在组件不受框架控制的情况下,框架又怎么知道要创建哪个组件?

在 Java 中依赖注入有以下三种实现方式:

  • 构造器注入
  • Setter 方法注入
  • 接口注入

哪种依赖注入方式你建议使用,构造器注入,还是 Setter 方法注入?

构造器注入和 Setter 方法注入。最好的解决方案是用构造器参数实现强制依赖,setter 方法实现可选依赖。

3、BeanFactory 和 ApplicationContext 有什么区别

BeanFactory 可以理解为含有 bean 集合的工厂类。BeanFactory 包含了种 bean 的定义,以便在接收到客户端请求时将对应的 bean 实例化。

BeanFactory 还能在实例化对象的时生成协作类之间的关系。此举将 bean 自身与 bean 客户端的配置中解放出来。BeanFactory 还包含 了 bean 生命周期的控制,调用客户端的初始化方法(initialization methods)和销毁方法(destruction methods)。从表面上看,application context 如同 bean factory 一样具有 bean 定义、bean 关联关系的设置,根据请求分发 bean 的功能。但 applicationcontext 在此基础上还提供了其他的功能。

  • 提供了支持国际化的文本消息
  • 统一的资源文件读取方式
  • 已在监听器中注册的 bean 的事件

以下是几种较常见的 ApplicationContext 实现方式:

  • XmlWebApplicationContext:由 Web 应用的 XML 文件读取上下文
  • AnnotationConfigApplicationContext(基于 Java 配置启动容器)

FileSystemXmlApplicationContext :由文件系统中的 XML 配置文件读取上下文。

ApplicationContext context = new FileSystemXmlApplicationContext(“bean.xml”);

ClassPathXmlApplicationContext:从 classpath 的 XML 配置文件中读取上下文,并生成上下文定义。应用程序上下文从程序环境变量中

ApplicationContext context = new ClassPathXmlApplicationContext(“bean.xml”);

4、 Spring Bean 的生命周期

  • Spring 容器从 XML 文件中读取 bean 的定义,并实例化 bean。
  • Spring 根据 bean 的定义填充所有的属性。
  • 如果 bean 实现了 BeanNameAware 接口,Spring 传递 bean 的 ID 到 setBeanName 方法。
  • 如果 Bean 实现了 BeanFactoryAware 接口,Spring 传递 beanfactory 给 setBeanFactory 方法。
  • 如果有任何与 bean 相关联的 BeanPostProcessors,Spring 会在postProcesserBeforeInitialization()方法内调用它们。
  • 如果 bean 实现 IntializingBean 了,调用它的 afterPropertySet 方法,如果 bean 声明了初始化方法,调用此初始化方法。
  • 如果有 BeanPostProcessors 和 bean 关联,这些 bean 的postProcessAfterInitialization()方法将被调用。
  • 如果 bean 实现了 DisposableBean,它将调用 destroy()方法。
bean 的生命周期从调用 beanFactory 的 getBean 开始,到这个 bean 被销毁,可以总结为以下七个阶段:处理名称,检查缓存处理父子容器处理 dependsOn选择 scope 策略创建 bean类型转换处理销毁 bean

划分的阶段和名称并不重要,重要的是理解整个过程中做了哪些事情

1. 处理名称,检查缓存

  • 这一步会处理别名,将别名解析为实际名称
  • 对 FactoryBean 也会特殊处理,如果以 & 开头表示要获取 FactoryBean 本身,否则表示要获取其产品
  • 这里针对单例对象会检查一级、二级、三级缓存
    • singletonFactories 三级缓存,存放单例工厂对象
    • earlySingletonObjects 二级缓存,存放单例工厂的产品对象
      • 如果发生循环依赖,产品是代理;无循环依赖,产品是原始对象
    • singletonObjects 一级缓存,存放单例成品对象

2. 处理父子容器

  • 如果当前容器根据名字找不到这个 bean,此时若父容器存在,则执行父容器的 getBean 流程
  • 父子容器的 bean 名称可以重复

3. 处理 dependsOn

  • 如果当前 bean 有通过 dependsOn 指定了非显式依赖的 bean,这一步会提前创建这些 dependsOn 的 bean
  • 所谓非显式依赖,就是指两个 bean 之间不存在直接依赖关系,但需要控制它们的创建先后顺序

4. 选择 scope 策略

  • 对于 singleton scope,首先到单例池去获取 bean,如果有则直接返回,没有再进入创建流程
  • 对于 prototype scope,每次都会进入创建流程
  • 对于自定义 scope,例如 request,首先到 request 域获取 bean,如果有则直接返回,没有再进入创建流程

5.1 创建 bean - 创建 bean 实例

要点总结
有自定义 TargetSource 的情况由 AnnotationAwareAspectJAutoProxyCreator 创建代理返回
Supplier 方式创建 bean 实例为 Spring 5.0 新增功能,方便编程方式创建 bean 实例
FactoryMethod 方式 创建 bean 实例① 分成静态工厂与实例工厂;② 工厂方法若有参数,需要对工厂方法参数进行解析,利用 resolveDependency;③ 如果有多个工厂方法候选者,还要进一步按权重筛选
AutowiredAnnotationBeanPostProcessor① 优先选择带 @Autowired 注解的构造;② 若有唯一的带参构造,也会入选
mbd.getPreferredConstructors选择所有公共构造,这些构造之间按权重筛选
采用默认构造如果上面的后处理器和 BeanDefiniation 都没找到构造,采用默认构造,即使是私有的

5.2 创建 bean - 依赖注入

要点总结
AutowiredAnnotationBeanPostProcessor识别 @Autowired 及 @Value 标注的成员,封装为 InjectionMetadata 进行依赖注入
CommonAnnotationBeanPostProcessor识别 @Resource 标注的成员,封装为 InjectionMetadata 进行依赖注入
resolveDependency用来查找要装配的值,可以识别:① Optional;② ObjectFactory 及 ObjectProvider;③ @Lazy 注解;④ @Value 注解(${ }, #{ }, 类型转换);⑤ 集合类型(Collection,Map,数组等);⑥ 泛型和 @Qualifier(用来区分类型歧义);⑦ primary 及名字匹配(用来区分类型歧义)
AUTOWIRE_BY_NAME根据成员名字找 bean 对象,修改 mbd 的 propertyValues,不会考虑简单类型的成员
AUTOWIRE_BY_TYPE根据成员类型执行 resolveDependency 找到依赖注入的值,修改 mbd 的 propertyValues
applyPropertyValues根据 mbd 的 propertyValues 进行依赖注入(即xml中 `<property name ref

5.3 创建 bean - 初始化

要点总结
内置 Aware 接口的装配包括 BeanNameAware,BeanFactoryAware 等
扩展 Aware 接口的装配由 ApplicationContextAwareProcessor 解析,执行时机在 postProcessBeforeInitialization
@PostConstruct由 CommonAnnotationBeanPostProcessor 解析,执行时机在 postProcessBeforeInitialization
InitializingBean通过接口回调执行初始化
initMethod根据 BeanDefinition 得到的初始化方法执行初始化,即 <bean init-method> 或 @Bean(initMethod)
创建 aop 代理由 AnnotationAwareAspectJAutoProxyCreator 创建,执行时机在 postProcessAfterInitialization

5.4 创建 bean - 注册可销毁 bean

在这一步判断并登记可销毁 bean

  • 判断依据
    • 如果实现了 DisposableBean 或 AutoCloseable 接口,则为可销毁 bean
    • 如果自定义了 destroyMethod,则为可销毁 bean
    • 如果采用 @Bean 没有指定 destroyMethod,则采用自动推断方式获取销毁方法名(close,shutdown)
    • 如果有 @PreDestroy 标注的方法
  • 存储位置
    • singleton scope 的可销毁 bean 会存储于 beanFactory 的成员当中
    • 自定义 scope 的可销毁 bean 会存储于对应的域对象当中
    • prototype scope 不会存储,需要自己找到此对象销毁
  • 存储时都会封装为 DisposableBeanAdapter 类型对销毁方法的调用进行适配

6. 类型转换处理

  • 如果 getBean 的 requiredType 参数与实际得到的对象类型不同,会尝试进行类型转换

7. 销毁 bean

  • 销毁时机
    • singleton bean 的销毁在 ApplicationContext.close 时,此时会找到所有 DisposableBean 的名字,逐一销毁
    • 自定义 scope bean 的销毁在作用域对象生命周期结束时
    • prototype bean 的销毁可以通过自己手动调用 AutowireCapableBeanFactory.destroyBean 方法执行销毁
  • 同一 bean 中不同形式销毁方法的调用次序
    • 优先后处理器销毁,即 @PreDestroy
    • 其次 DisposableBean 接口销毁
    • 最后 destroyMethod 销毁(包括自定义名称,推断名称,AutoCloseable 接口 多选一)

5、哪些是重要的 bean 生命周期方法?你能重载它们吗?

有两个重要的 bean 生命周期方法,第一个是 setup,它是在容器加载 bean 的时候被调用。第二个方法是 teardown 它是在容器卸载类的时候被调用。 Thebean 标签有两个重要的属性(init-method 和 destroy-method)。用它们你可以自己定制初始化和注销方法。它们也有相应的注解(@PostConstruct 和@PreDestroy)。

6、Spring初始化bean的流程

Spring初始化Bean的流程如下:

  1. Spring通过BeanFactory或ApplicationContext加载配置文件,并且解析配置文件中的Bean定义。
  2. 对于每个Bean定义,Spring会根据Bean定义信息创建一个BeanDefinition对象。BeanDefinition对象包含了Bean的名称、类型、作用域、依赖关系等信息。
  3. Spring根据BeanDefinition对象中的信息,创建Bean实例。如果Bean的作用域是singleton,则只创建一个实例;如果Bean的作用域是prototype,则每次请求都会创建一个新的实例。
  4. 如果Bean实现了BeanNameAware接口,则调用BeanNameAware接口的setBeanName()方法,将Bean的名称传递给Bean。
  5. 如果Bean实现了BeanFactoryAware接口,则调用BeanFactoryAware接口的setBeanFactory()方法,将BeanFactory实例传递给Bean。
  6. 如果Bean实现了ApplicationContextAware接口,则调用ApplicationContextAware接口的setApplicationContext()方法,将ApplicationContext实例传递给Bean。
  7. 如果Bean实现了BeanPostProcessor接口,则调用BeanPostProcessor接口的postProcessBeforeInitialization()方法,对Bean进行预处理。
  8. 如果Bean实现了InitializingBean接口,则调用InitializingBean接口的afterPropertiesSet()方法,对Bean进行初始化。
  9. 如果Bean配置了init-method属性,则调用Bean的init-method方法,对Bean进行初始化。
  10. 如果Bean实现了BeanPostProcessor接口,则调用BeanPostProcessor接口的postProcessAfterInitialization()方法,对Bean进行后处理。
  11. 将初始化后的Bean实例添加到Spring容器中。

在整个初始化过程中,Spring使用了反射、依赖注入、AOP等技术,实现了Bean的自动装配和管理。这些技术使得Bean的创建和管理变得非常便捷和灵活,大大提高了应用程序的开发效率和质量。

7、Spring的循环依赖

循环依赖是指两个或多个Bean互相依赖,形成了一个环路的情况。在Spring中,如果发生循环依赖,会抛出BeanCurrentlyInCreationException异常,提示循环依赖错误。

Spring通过三级缓存来解决循环依赖问题,三级缓存分别是singletonObjects、earlySingletonObjects和singletonFactories。

  1. 当Spring需要创建一个Bean时,首先会检查singletonObjects缓存中是否存在该Bean的实例,如果存在则直接返回该实例。
  2. 如果singletonObjects缓存中不存在该Bean的实例,但是earlySingletonObjects缓存中存在该Bean的“早期”实例(即未完成依赖注入的实例),则将该“早期”实例返回,避免出现循环依赖。
  3. 如果singletonObjects和earlySingletonObjects缓存中都不存在该Bean的实例,则调用createBean()方法创建该Bean的实例,同时将该实例存储到singletonFactories缓存中。
  4. 在创建Bean实例的过程中,如果发现依赖的Bean还未创建,则先创建依赖的Bean实例,并将其存储到earlySingletonObjects缓存中。如果依赖的Bean已经存在,则直接从singletonObjects缓存中获取该Bean的实例。
  5. 当创建完所有的Bean实例后,Spring会执行依赖注入操作,将依赖的Bean注入到目标Bean中。
  6. 完成依赖注入后,Spring会将该Bean实例从singletonFactories缓存中移除,并将其存储到singletonObjects缓存中,以供后续使用。

需要注意的是,循环依赖可能会导致程序逻辑混乱,因此应该尽量避免出现循环依赖的情况。如果无法避免循环依赖,可以考虑使用构造函数注入代替属性注入,或者通过使用代理对象来解决循环依赖问题。

循环依赖的产生

  • 首先要明白,bean 的创建要遵循一定的步骤,必须是创建、注入、初始化三步,这些顺序不能乱

image-20210903085238916
  • set 方法(包括成员变量)的循环依赖如图所示
    • 可以在【a 创建】和【a set 注入 b】之间加入 b 的整个流程来解决
    • 【b set 注入 a】 时可以成功,因为之前 a 的实例已经创建完毕
    • a 的顺序,及 b 的顺序都能得到保障

image-20210903085454603
  • 构造方法的循环依赖如图所示,显然无法用前面的方法解决

image-20210903085906315

构造循环依赖的解决

  • 思路1
    • a 注入 b 的代理对象,这样能够保证 a 的流程走通
    • 后续需要用到 b 的真实对象时,可以通过代理间接访问

image-20210903091627659
  • 思路2
    • a 注入 b 的工厂对象,让 b 的实例创建被推迟,这样能够保证 a 的流程先走通
    • 后续需要用到 b 的真实对象时,再通过 ObjectFactory 工厂间接访问

image-20210903091743366
  • 示例1:用 @Lazy 为构造方法参数生成代理
public class App60_1 { static class A { private static final Logger log = LoggerFactory.getLogger("A"); private B b; public A(@Lazy B b) { log.debug("A(B b) {}", b.getClass()); this.b = b; } @PostConstruct public void init() { log.debug("init()"); } } static class B { private static final Logger log = LoggerFactory.getLogger("B"); private A a; public B(A a) { log.debug("B({})", a); this.a = a; } @PostConstruct public void init() { log.debug("init()"); } } public static void main(String[] args) { GenericApplicationContext context = new GenericApplicationContext(); context.registerBean("a", A.class); context.registerBean("b", B.class); AnnotationConfigUtils.registerAnnotationConfigProcessors(context.getDefaultListableBeanFactory()); context.refresh(); System.out.println(); } }
  • 示例2:用 ObjectProvider 延迟依赖对象的创建
public class App60_2 { static class A { private static final Logger log = LoggerFactory.getLogger("A"); private ObjectProvider<B> b; public A(ObjectProvider<B> b) { log.debug("A({})", b); this.b = b; } @PostConstruct public void init() { log.debug("init()"); } } static class B { private static final Logger log = LoggerFactory.getLogger("B"); private A a; public B(A a) { log.debug("B({})", a); this.a = a; } @PostConstruct public void init() { log.debug("init()"); } } public static void main(String[] args) { GenericApplicationContext context = new GenericApplicationContext(); context.registerBean("a", A.class); context.registerBean("b", B.class); AnnotationConfigUtils.registerAnnotationConfigProcessors(context.getDefaultListableBeanFactory()); context.refresh(); System.out.println(context.getBean(A.class).b.getObject()); System.out.println(context.getBean(B.class)); } }
  • 示例3:用 @Scope 产生代理
public class App60_3 { public static void main(String[] args) { GenericApplicationContext context = new GenericApplicationContext(); ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(context.getDefaultListableBeanFactory()); scanner.scan("com.itheima.app60.sub"); context.refresh(); System.out.println(); } }
@Component class A { private static final Logger log = LoggerFactory.getLogger("A"); private B b; public A(B b) { log.debug("A(B b) {}", b.getClass()); this.b = b; } @PostConstruct public void init() { log.debug("init()"); } }
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS) @Component class B { private static final Logger log = LoggerFactory.getLogger("B"); private A a; public B(A a) { log.debug("B({})", a); this.a = a; } @PostConstruct public void init() { log.debug("init()"); } }
  • 示例4:用 Provider 接口解决,原理上与 ObjectProvider 一样,Provider 接口是独立的 jar 包,需要加入依赖
<dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> <version>1</version> </dependency>
public class App60_4 { static class A { private static final Logger log = LoggerFactory.getLogger("A"); private Provider<B> b; public A(Provider<B> b) { log.debug("A({}})", b); this.b = b; } @PostConstruct public void init() { log.debug("init()"); } } static class B { private static final Logger log = LoggerFactory.getLogger("B"); private A a; public B(A a) { log.debug("B({}})", a); this.a = a; } @PostConstruct public void init() { log.debug("init()"); } } public static void main(String[] args) { GenericApplicationContext context = new GenericApplicationContext(); context.registerBean("a", A.class); context.registerBean("b", B.class); AnnotationConfigUtils.registerAnnotationConfigProcessors(context.getDefaultListableBeanFactory()); context.refresh(); System.out.println(context.getBean(A.class).b.get()); System.out.println(context.getBean(B.class)); } }

解决 set 循环依赖的原理

一级缓存

image-20210903100752165

作用是保证单例对象仅被创建一次

  • 第一次走 getBean("a") 流程后,最后会将成品 a 放入 singletonObjects 一级缓存
  • 后续再走 getBean("a") 流程时,先从一级缓存中找,这时已经有成品 a,就无需再次创建

一级缓存与循环依赖

image-20210903100914140

一级缓存无法解决循环依赖问题,分析如下

  • 无论是获取 bean a 还是获取 bean b,走的方法都是同一个 getBean 方法,假设先走 getBean("a")
  • 当 a 的实例对象创建,接下来执行 a.setB() 时,需要走 getBean("b") 流程,红色箭头 1
  • 当 b 的实例对象创建,接下来执行 b.setA() 时,又回到了 getBean("a") 的流程,红色箭头 2
  • 但此时 singletonObjects 一级缓存内没有成品的 a,陷入了死循环

二级缓存

image-20210903101849924

解决思路如下:

  • 再增加一个 singletonFactories 缓存
  • 在依赖注入前,即 a.setB() 以及 b.setA() 将 a 及 b 的半成品对象(未完成依赖注入和初始化)放入此缓存
  • 执行依赖注入时,先看看 singletonFactories 缓存中是否有半成品的对象,如果有拿来注入,顺利走完流程

对于上面的图

  • a = new A() 执行之后就会把这个半成品的 a 放入 singletonFactories 缓存,即 factories.put(a)
  • 接下来执行 a.setB(),走入 getBean("b") 流程,红色箭头 3
  • 这回再执行到 b.setA() 时,需要一个 a 对象,有没有呢?有!
  • factories.get() 在 singletonFactories 缓存中就可以找到,红色箭头 4 和 5
  • b 的流程能够顺利走完,将 b 成品放入 singletonObject 一级缓存,返回到 a 的依赖注入流程,红色箭头 6

二级缓存与创建代理

image-20210903103030877

二级缓存无法正确处理循环依赖并且包含有代理创建的场景,分析如下

  • spring 默认要求,在 a.init 完成之后才能创建代理 pa = proxy(a)
  • 由于 a 的代理创建时机靠后,在执行 factories.put(a) 向 singletonFactories 中放入的还是原始对象
  • 接下来箭头 3、4、5 这几步 b 对象拿到和注入的都是原始对象

三级缓存

image-20210903103628639

简单分析的话,只需要将代理的创建时机放在依赖注入之前即可,但 spring 仍然希望代理的创建时机在 init 之后,只有出现循环依赖时,才会将代理的创建时机提前。所以解决思路稍显复杂:

  • 图中 factories.put(fa) 放入的既不是原始对象,也不是代理对象而是工厂对象 fa
  • 当检查出发生循环依赖时,fa 的产品就是代理 pa,没有发生循环依赖,fa 的产品是原始对象 a
  • 假设出现了循环依赖,拿到了 singletonFactories 中的工厂对象,通过在依赖注入前获得了 pa,红色箭头 5
  • 这回 b.setA() 注入的就是代理对象,保证了正确性,红色箭头 7
  • 还需要把 pa 存入新加的 earlySingletonObjects 缓存,红色箭头 6
  • a.init 完成后,无需二次创建代理,从哪儿找到 pa 呢?earlySingletonObjects 已经缓存,蓝色箭头 9

当成品对象产生,放入 singletonObject 后,singletonFactories 和 earlySingletonObjects 就中的对象就没有用处,清除即可

8、Spring 支持的事务管理类型

Spring支持以下几种事务管理类型:

  1. 编程式事务管理:在代码中显式地开启、提交或回滚事务。
  2. 声明式事务管理:使用AOP技术,将事务管理代码与业务逻辑代码分离开来,通过配置文件声明事务增强器(TransactionInterceptor),自动将事务管理代码织入到业务逻辑代码中。
  3. 注解式事务管理:使用注解的方式声明事务增强器(@Transactional),自动将事务管理代码织入到被注解的方法中。
  4. JTA事务管理:使用Java Transaction API (JTA)实现分布式事务管理,支持多个数据源的事务管理。

其中,声明式事务管理和注解式事务管理是Spring事务管理的两种主要方式,它们都通过AOP技术实现了事务管理代码与业务逻辑代码的分离,使得业务逻辑代码更加简洁清晰。在实际开发中,通常使用声明式事务管理或注解式事务管理来管理事务,具体选择哪种方式取决于具体的业务需求和开发习惯。

9、Spring事务7种传播机制

Spring事务的传播机制描述了在一个事务方法调用另一个事务方法时,事务应该如何传播的规则。Spring支持以下7种事务传播机制:

  1. REQUIRED:如果当前没有事务,则创建一个新的事务;如果当前有事务,则加入该事务。
  2. SUPPORTS:如果当前有事务,则加入该事务;如果当前没有事务,则以非事务的方式继续执行。
  3. MANDATORY:如果当前有事务,则加入该事务;如果当前没有事务,则抛出异常。
  4. REQUIRES_NEW:创建一个新的事务,如果当前有事务,则挂起当前事务。
  5. NOT_SUPPORTED:以非事务的方式执行操作,如果当前有事务,则挂起当前事务。
  6. NEVER:以非事务的方式执行操作,如果当前有事务,则抛出异常。
  7. NESTED:如果当前有事务,则在该事务中嵌套一个新的事务;如果当前没有事务,则创建一个新的事务。

10、事务传播类型说明

  1. REQUIRED:如果当前没有事务,则创建一个新的事务;如果当前有事务,则加入该事务。这是最常用的事务传播类型,它保证了方法在一个事务中运行,如果当前存在事务,则加入该事务,否则创建一个新的事务。
  2. SUPPORTS:如果当前有事务,则加入该事务;如果当前没有事务,则以非事务的方式继续执行。该传播类型适用于只读操作,如果当前有事务则加入该事务,否则以非事务的方式执行。
  3. MANDATORY:如果当前有事务,则加入该事务;如果当前没有事务,则抛出异常。该传播类型要求当前必须存在一个事务,否则抛出异常。
  4. REQUIRES_NEW:创建一个新的事务,如果当前有事务,则挂起当前事务。该传播类型会创建一个新的事务,并且挂起当前事务,直到新的事务完成。
  5. NOT_SUPPORTED:以非事务的方式执行操作,如果当前有事务,则挂起当前事务。该传播类型会以非事务的方式执行,如果当前存在事务,则挂起当前事务,直到非事务执行完毕。
  6. NEVER:以非事务的方式执行操作,如果当前有事务,则抛出异常。该传播类型会以非事务的方式执行,如果当前存在事务,则抛出异常。
  7. NESTED:如果当前有事务,则在该事务中嵌套一个新的事务;如果当前没有事务,则创建一个新的事务。该传播类型会在当前事务中嵌套一个新的事务,如果当前没有事务,则创建一个新的事务。

需要注意的是,在选择事务传播类型时,应该根据具体的业务需求和数据一致性要求进行选择,以确保事务管理的正确性和一致性。同时,在使用嵌套事务时,应该特别注意事务的回滚和提交规则,以避免出现不一致的情况。

11、Spring事务为何失效

Spring事务失效的原因有多种可能,以下是一些常见的原因:

  1. 未开启事务管理:如果没有正确配置Spring事务管理器或没有在业务方法上添加@Transactional注解,就可能导致事务无法生效。
  2. 事务传播类型不正确:如果使用了错误的事务传播类型,可能会导致事务无法正确传播或嵌套,从而导致事务失效。
  3. 异常处理不当:如果事务方法中发生异常,但未正确处理或抛出异常,可能会导致事务无法正确回滚或提交,从而导致事务失效。
  4. 数据库引擎不支持事务:如果使用的数据库引擎不支持事务或事务隔离级别,则无法使用Spring事务管理器进行事务管理。
  5. 事务超时或锁等待:如果事务超时或锁等待时间过长,可能会导致事务失败或回滚,从而导致事务失效。
  6. 多线程并发操作:如果多个线程同时访问同一个事务,可能会导致事务无法正确提交或回滚,从而导致事务失效。
  7. 事务方法中包含非事务方法:如果事务方法中调用了非事务方法,可能会导致事务无法正确传播或嵌套,从而导致事务失效。

针对以上问题,可以通过正确配置事务管理器、选择合适的事务传播类型、正确处理异常、优化数据库性能、设置合适的事务超时时间和锁等待时间、使用线程安全的事务管理方式等方式来避免Spring事务失效。

1、抛出检查异常导致事务不能正确回滚

package com.xx.service; import com.baomidou.mybatisplus.extension.service.IService; import com.xx.common.Result; import com.xx.entity.Order; import java.io.FileNotFoundException; /** * @Author: xueqimiao * @Date: 2024/5/10 16:20 */ public interface OrderService extends IService<Order> { Result saveOrder(Order order) throws FileNotFoundException; }
@Override @Transactional public Result saveOrder(Order order) throws FileNotFoundException { String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U"+SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); if(order != null){ new FileInputStream("aaa"); } return Result.ok("操作成功"); }
package com.xx.controller; import com.xx.common.Result; import com.xx.entity.Order; import com.xx.service.OrderService; import jakarta.annotation.Resource; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.io.FileNotFoundException; /** * @Author: xueqimiao * @Date: 2024/5/10 16:31 */ @RestController public class OrderController { @Resource private OrderService orderService; @GetMapping("/saveOrder") public Result saveOrder() throws FileNotFoundException { return orderService.saveOrder(new Order()); } }
原因:Spring 默认只会回滚非检查异常默认情况下,@Transactional 注解只会回滚 RuntimeException 异常及其子类。如果方法中抛出的异常不是 RuntimeException 的子类,那么 Spring 就不会回滚事务,这可能导致数据不一致的问题。因此,为了确保在方法中抛出任何异常时都能够回滚事务,可以在 @Transactional 注解中使用 rollbackFor 属性来指定需要回滚的异常类型。例如,@Transactional(rollbackFor = Exception.class) 表示在方法中抛出任何异常时都会回滚事务。@Transactional(rollbackFor = Exception.class)
@Override @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order) throws FileNotFoundException { String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U"+SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); if(order != null){ new FileInputStream("aaa"); } return Result.ok("操作成功"); }
测试完还原代码

2、业务方法内自己 try-catch 异常导致事务不能正确回滚

@Override @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U"+SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); if(order != null){ try { new FileInputStream("aaa"); } catch (FileNotFoundException e) { e.printStackTrace(); } } return Result.ok("操作成功"); }
  • 原因:事务通知只有捉到了目标抛出的异常,才能进行后续的回滚处理,如果目标自己处理掉异常,事务通知无法知悉
  • 解法1:异常原样抛出
    • 在 catch 块添加 throw new RuntimeException(e);
  • 解法2:手动设置 TransactionStatus.setRollbackOnly()
    • 在 catch 块添加 TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
@Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U"+SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); if(order != null){ try { new FileInputStream("aaa"); } catch (FileNotFoundException e) { // throw new RuntimeException(e); TransactionInterceptor.currentTransactionStatus().setRollbackOnly(); } } return Result.ok("操作成功"); }

3、调用本类方法导致失效

void saveOrder2();
// 注意这里注释了 // @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ saveOrder2(); return Result.ok("操作成功"); } @Transactional(rollbackFor = Exception.class) public void saveOrder2() { Order order = new Order(); String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U" + SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); int n = 10 / 0; }
  • 原因:本类方法调用不经过代理,因此无法增强
1、解决方式一

saveOrder()也加事务注解

@Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ saveOrder2(); return Result.ok("操作成功"); } @Transactional(rollbackFor = Exception.class) public void saveOrder2() { Order order = new Order(); String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U" + SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); int n = 10 / 0; }
2、解决方式二
@Resource private OrderService orderService; @Override // 注意这里注释了 // @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ orderService.saveOrder2(); return Result.ok("操作成功"); } @Transactional(rollbackFor = Exception.class) public void saveOrder2() { Order order = new Order(); String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U" + SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); int n = 10 / 0; }

注意会出现循环依赖的问题:

*************************** APPLICATION FAILED TO START *************************** Description: The dependencies of some of the beans in the application context form a cycle: orderController ┌─────┐ | orderServiceImpl └─────┘ Action: Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.
spring: main: allow-circular-references: true
3、解决方案三
package com.xx.util; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @Author: xueqimiao * @Date: 2024/5/10 19:26 */ @Component public class SpringApplicationContextUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext param) throws BeansException { applicationContext = param; } public static ApplicationContext getApplicationContext() { return applicationContext; } public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } }
SpringApplicationContextUtil.getBean(OrderService.class).saveOrder2();
@Override // 注意这里注释了 // @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ // orderService.saveOrder2(); SpringApplicationContextUtil.getBean(OrderService.class).saveOrder2(); return Result.ok("操作成功"); } @Transactional(rollbackFor = Exception.class) public void saveOrder2() { Order order = new Order(); String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U" + SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); int n = 10 / 0; }
4、解决方案四

将 saveOrder2()写到另一个类中去

package com.xx.service; /** * @Author: xueqimiao * @Date: 2024/5/10 19:29 */ public interface OrderService2 { void saveOrder2(); }
package com.xx.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xx.entity.Order; import com.xx.mapper.OrderMapper; import com.xx.service.OrderService2; import com.xx.utils.SnowFlakeIdUtil; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.math.BigDecimal; /** * @Author: xueqimiao * @Date: 2024/5/10 17:09 */ @Service public class OrderService2Impl extends ServiceImpl<OrderMapper, Order> implements OrderService2 { @Transactional(rollbackFor = Exception.class) public void saveOrder2() { Order order = new Order(); String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U" + SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); int n = 10 / 0; } }
@Resource private OrderService2 orderService2; @Override // 注意这里注释了 // @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ orderService2.saveOrder2(); return Result.ok("操作成功"); }

4、非 public 方法导致的事务失效

1、SpringBoot2

SpringBoot2SpringBoot3有所区别,以下操作业务逻辑代码不变,以下列出SpringBoot2的依赖

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>xx-spring-tx-boot2</artifactId> <version>1.0-SNAPSHOT</version> <parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.6.4</version> <relativePath/> </parent> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <mysql.version>8.0.33</mysql.version> <mybatisplus.version>3.5.3.2</mybatisplus.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.xx</groupId> <artifactId>xx-common-core</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.3</version> </dependency> <!--mysql驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.27</version> </dependency> </dependencies> </project>

我们到OrderService2Impl加一个非public的方法

@Transactional(rollbackFor = Exception.class) void saveOrder3() { Order order = new Order(); String orderNo = "R" + SnowFlakeIdUtil.generateId(); order.setOrderNo(orderNo); order.setUserId("U" + SnowFlakeIdUtil.generateId()); order.setOrderPrice(new BigDecimal(1)); save(order); int n = 10 / 0; }

OrderServiceImpl修改如下

// 注意这里注释了 // @Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ orderService2Impl.saveOrder3(); return Result.ok("操作成功"); }
  • 原因:Spring 为方法创建代理、添加事务通知、前提条件都是该方法是 public 的
  • 解法1:改为 public 方法
  • 解法2:添加 bean 配置如下

同时启动类也要加@EnableTransactionManagement,需要自定义事务管理的行为,例如指定不同的事务管理器或配置其他事务相关的属性,那么你需要显式地添加 @EnableTransactionManagement 注解,并在配置类中进行自定义配置。

@Bean public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(false); }

yaml也要配置允许bean覆盖

spring: main: allow-bean-definition-overriding: true

image-20240514135543803
2、SpringBoot3

在SpringBoot3(也就是Spring6.0)中默认是可以代理非public方法的,所以这一点在SpringBoot3中是不会失效的。

image-20240514135638496

5、Aop 切面顺序导致导致事务不能正确回滚

@Transactional(rollbackFor = Exception.class) public Result saveOrder(Order order){ save(order); int n = 10 / 0; return Result.ok("操作成功"); }
package com.xx.aspect; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; /** * @Author: xueqimiao * @Date: 2024/5/14 14:04 */ @Aspect @Component public class MyAspect { @Around("execution(* saveOrder(..))") public Object around(ProceedingJoinPoint pjp) throws Throwable { try { return pjp.proceed(); } catch (Throwable e) { return null; } } }
  • 原因:事务切面优先级最低,但如果自定义的切面优先级和他一样,则还是自定义切面在内层,这时若自定义切面没有正确抛出异常…
  • 解法1:异常原样抛出
    • 在 catch 块添加 throw new RuntimeException(e);
  • 解法2:手动设置 TransactionStatus.setRollbackOnly()
    • 在 catch 块添加 TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
  • 解法3:调整切面顺序,在 MyAspect 上添加 @Order(Ordered.LOWEST_PRECEDENCE - 1) (不推荐)

12、AOP到底如何去支持事务

AOP是Spring事务模块实现事务管理的核心技术之一,它通过在目标方法周围添加事务增强(Transaction Advice)来实现事务管理。

具体来说,Spring事务管理器通过AOP将事务增强织入到带有@Transactional注解的业务方法中,从而实现事务管理。当业务方法被调用时,AOP会先判断当前是否已经有事务在进行中,如果有,则使用当前事务,否则创建一个新的事务,并将其与当前线程绑定。

在业务方法执行完毕后,AOP会根据事务执行结果进行提交或回滚操作。如果业务方法正常执行完成,则事务增强会将事务状态设置为可提交状态,当方法返回时,事务管理器会自动提交事务。如果业务方法抛出异常,则事务增强会将事务状态设置为可回滚状态,当方法返回时,事务管理器会自动回滚事务。

需要注意的是,在使用AOP实现事务管理时,应该避免在事务方法中调用非事务方法,因为这样会导致事务无法正确传播或嵌套,从而导致事务失效。

总之,AOP是Spring事务模块实现事务管理的关键技术之一,它通过将事务增强织入到业务方法中,实现了事务的自动管理、提交和回滚等功能,从而大大简化了事务管理的复杂度。

13、ACID,数据库事务的四大基本特性

  1. 原子性(Atomicity):事务是一个原子操作,要么全部执行成功,要么全部回滚失败。当事务执行失败时,所有已经执行的操作都必须回滚到初始状态,保证数据的一致性和完整性。
  2. 一致性(Consistency):事务执行前后,数据库必须保持一致性状态。即,事务执行后,数据库中的数据必须符合所有的约束和规则,不破坏数据完整性和正确性。
  3. 隔离性(Isolation):事务之间是相互隔离的,一个事务执行过程中所做的修改在提交之前对其他事务是不可见的,保证了并发事务的正确性和一致性。
  4. 持久性(Durability):事务提交后,对数据库的修改必须永久保存,即使发生系统故障或重启也不能丢失。

这四个特性是数据库事务的基本特性,保证了数据库操作的正确性、一致性和可靠性。如果数据库不支持ACID特性,则可能会导致数据不一致、丢失或损坏等问题,因此ACID是数据库事务的基本保障和标准。

14、Spring怎么保证线程安全

Spring框架本身并没有直接提供线程安全的机制,但是通过一些最佳实践和设计模式可以在Spring应用程序中实现线程安全。以下是一些保证Spring应用程序线程安全的方法:

  1. 无状态Bean:Spring容器中的Bean默认是单例的,因此要尽量避免在Bean中存储状态或共享状态,而应该使Bean无状态,这样可以避免线程安全问题。
  2. Synchronized关键字:在多线程环境下,可以使用Java的Synchronized关键字来同步共享资源的访问,从而保证线程安全。
  3. ThreadLocal变量:ThreadLocal变量是一种线程本地变量,每个线程都有自己的变量副本,因此可以避免多线程访问的竞争和冲突。
  4. 使用线程安全的集合和对象:Spring框架提供了一些线程安全的集合和对象,如ConcurrentHashMap、AtomicInteger等,可以在多线程环境下安全地使用。
  5. 使用AOP切面:可以使用Spring AOP切面来实现线程安全,通过在切面中添加事务管理、锁定共享资源等操作,可以保证多线程环境下的操作安全性。
  6. 使用注解:Spring提供了一些注解,如@Scope("prototype")、@Async等,可以用来控制Bean的作用域和异步执行,从而实现线程安全。

总之,在Spring应用程序中保证线程安全需要综合考虑多种因素,在程序设计和实现中遵循最佳实践和设计模式,并且在必要的时候使用Java的同步机制、线程安全的集合和对象、AOP切面等技术来保证线程安全。

15、Spring框架中的单例Beans是线程安全的么?

在Spring框架中,单例Beans默认是线程安全的,因为Spring容器在创建单例Bean时只会创建一个Bean实例,并且在每个请求中返回这个实例的引用。这意味着多个线程可以共享同一个Bean实例,并且对Bean实例的修改会在所有线程中反映出来。因此,Spring容器会确保单例Beans的线程安全性。

具体来说,Spring容器会在创建单例Beans时,使用synchronized关键字来确保Bean实例的创建是线程安全的。一旦Bean实例创建完成,Spring容器就不再使用synchronized关键字,因此对于线程安全的Bean实例,多个线程可以同时访问它们,而不会有线程安全问题。

需要注意的是,如果单例Beans中存储了共享状态或可变状态,那么就需要考虑线程安全的问题。此时,可以使用线程安全的集合和对象来确保线程安全,或者使用Spring AOP切面来实现多线程环境下的安全访问。

总之,Spring框架中的单例Beans默认是线程安全的,但是如果单例Beans中存储了共享状态或可变状态,则需要考虑线程安全的问题。

16、Spring Bean前置处理器与后置处理器

Spring Bean前置处理器和后置处理器是Spring框架的两个扩展点,可以在Bean实例化、依赖注入和销毁等生命周期阶段进行自定义处理。下面分别介绍一下Bean前置处理器和后置处理器的作用和用法。

  1. Bean前置处理器

Bean前置处理器是在Bean实例化之前调用的扩展点,它可以在Bean实例化之前对Bean进行自定义处理,例如修改Bean属性、验证Bean状态等。Bean前置处理器是通过实现org.springframework.beans.factory.config.BeanPostProcessor接口来实现的。

Bean前置处理器的使用步骤如下:

(1)创建一个类并实现org.springframework.beans.factory.config.BeanPostProcessor接口。

(2)在实现的方法中进行自定义处理,例如在postProcessBeforeInitialization方法中修改Bean属性或验证状态。

(3)将自定义的Bean前置处理器注册到Spring容器中。

  1. Bean后置处理器

Bean后置处理器是在Bean实例化之后、依赖注入之前和Bean销毁之前调用的扩展点,它可以在Bean实例化后对Bean进行自定义处理,例如修改Bean属性、初始化Bean状态等。Bean后置处理器是通过实现org.springframework.beans.factory.config.BeanPostProcessor接口来实现的。

Bean后置处理器的使用步骤如下:

(1)创建一个类并实现org.springframework.beans.factory.config.BeanPostProcessor接口。

(2)在实现的方法中进行自定义处理,例如在postProcessAfterInitialization方法中修改Bean属性或初始化状态。

(3)将自定义的Bean后置处理器注册到Spring容器中。

需要注意的是,Bean前置处理器和后置处理器都是通过实现BeanPostProcessor接口来实现的,因此如果有多个Bean前置处理器或后置处理器,它们的调用顺序是不确定的。如果需要保证处理器的执行顺序,则可以使用@Order注解或实现Ordered接口来指定处理器的执行顺序。

总之,Spring Bean前置处理器和后置处理器是Spring框架提供的扩展点,可以在Bean实例化、依赖注入和销毁等生命周期阶段进行自定义处理,是Spring框架的重要特性之一。

17、Bean的创建为什么要采用三级缓存

在Spring框架中,Bean的创建过程是一个非常复杂的过程,其中包括Bean的实例化、属性注入、初始化等多个步骤。为了提高Bean的创建效率和减少内存开销,Spring框架采用了三级缓存机制来管理Bean的创建过程。

具体来说,Spring框架中的三级缓存机制包括:

  1. singletonObjects缓存:用于缓存已经创建完成的单例Bean实例。
  2. earlySingletonObjects缓存:用于缓存正在创建中的单例Bean实例。
  3. singletonFactories缓存:用于缓存创建单例Bean实例的工厂方法。

这种缓存机制的设计是为了在Bean的创建过程中,尽可能地复用已经创建完成或正在创建中的Bean实例,从而提高创建效率和减少内存开销。具体来说,当获取一个Bean实例时,Spring框架会首先从singletonObjects缓存中查找是否已经创建完成该Bean实例,如果没有,则会从earlySingletonObjects缓存中查找是否正在创建该Bean实例,如果也没有,则会从singletonFactories缓存中查找创建该Bean实例的工厂方法,并将工厂方法执行结果放入earlySingletonObjects缓存中。当Bean实例创建完成后,会将Bean实例放入singletonObjects缓存中,并从earlySingletonObjects缓存中移除。

需要注意的是,虽然三级缓存机制可以提高Bean的创建效率和减少内存开销,但是也会带来一些问题。例如,当一个Bean实例依赖另一个正在创建的Bean实例时,就可能会出现循环依赖的问题。为了解决这个问题,Spring框架采用了AOP技术和提前暴露对象的方式来处理循环依赖问题。

总之,Spring框架采用三级缓存机制来管理Bean的创建过程,可以提高Bean的创建效率和减少内存开销,但是也会带来一些问题,需要注意处理。

18、Spring 面向切面编程(AOP)

面向切面的编程,或 AOP,是一种编程技术,允许程序模块化横向切割关注点,或横切典型的责任划分,如日志和事务管理。

1、Aspect 切面

​ AOP 核心就是切面,它将多个类的通用行为封装成可重用的模块,该模块含有一组 API 提供横切功能。比如,一个日志模块可以被称作日志的 AOP 切面。根据需求的不同,一个应用程序可以有若干切面。在 SpringAOP 中,切面通过带有@Aspect 注解的类实现。

2、在 SpringAOP 中,关注点和横切关注的区别是什么?

​ 关注点是应用中一个模块的行为,一个关注点可能会被定义成一个我们想实现的一个功能。横切关注点是一个关注点,此关注点是整个应用都会使用的功能,并影响整个应用,比如日志,安全和数据传输,几乎应用的每个模块都需要的功能。因此这些都属于横切关注点。

3、连接点

连接点代表一个应用程序的某个位置,在这个位置我们可以插入一个 AOP 切面,它实际上是个应用程序执行 SpringAOP 的位置。

4、通知

通知是个在方法执行前或执行后要做的动作,实际上是程序执行时要通过 SpringAOP 框架触发的代码段。 Spring 切面可以应用五种类型的通知:

  • before:前置通知,在一个方法执行前被调用。
  • after:在方法执行之后调用的通知,无论方法执行是否成功。
  • after-returning:仅当方法成功完成后执行的通知。
  • after-throwing:在方法抛出异常退出时执行的通知。
  • around:在方法执行之前和之后调用的通知。

5、切点

切入点是一个或一组连接点,通知将在这些位置执行。可以通过表达式或匹配的方式指明切入点。

6、什么是引入?

引入允许我们在已存在的类中增加新的方法和属性。

7、什么是目标对象?

被一个或者多个切面所通知的对象。它通常是一个代理对象。也指被通知(advised)对象。

8、什么是代理?

代理是通知目标对象后创建的对象。从客户端的角度看,代理对象和目标对象是一样的。

9、有几种不同类型的自动代理?

BeanNameAutoProxyCreator DefaultAdvisorAutoProxyCreator Metadataautoproxying

10、什么是织入。什么是织入应用的不同点?

织入是将切面和到其他应用类型或对象连接或创建一个被通知对象的过程。 织入可以在编译时,加载时,或运行时完成。

19、Spring Bean 的作用域之间有什么区别?

  1. singleton:这种 bean 范围是默认的,这种范围确保不管接受到多少个请求,每个容器中只有一个bean 的实例,单例的模式由 bean factory 自身来维护。
  2. prototype:原形范围与单例范围相反,为每一个 bean 请求提供一个实例。
  3. request:在请求 bean 范围内会每一个来自客户端的网络请求创建一个实例,在请求完成以后,bean 会失效并被垃圾回收器回收。
  4. Session:与请求范围类似,确保每个 session 中有一个 bean 的实例,在 session 过期后,bean会随之失效。
  5. global- session:global-session 和 Portlet 应用相关。当你的应用部署在 Portlet 容器中工作时,它包含很多 portlet。如果 你想要声明让所有的 portlet 共用全局的存储变量的话,那么这全局变量需要存储在 global-session 中。全局作用域与 Servlet 中的 session 作用域效果相同

20、Spring 中的设计模式

1. Spring 中的 Singleton

请大家区分 singleton pattern 与 Spring 中的 singleton bean

  • 根据单例模式的目的 Ensure a class only has one instance, and provide a global point of access to it
  • 显然 Spring 中的 singleton bean 并非实现了单例模式,singleton bean 只能保证每个容器内,相同 id 的 bean 单实例
  • 当然 Spring 中也用到了单例模式,例如
    • org.springframework.transaction.TransactionDefinition#withDefaults
    • org.springframework.aop.TruePointcut#INSTANCE
    • org.springframework.aop.interceptor.ExposeInvocationInterceptor#ADVISOR
    • org.springframework.core.annotation.AnnotationAwareOrderComparator#INSTANCE
    • org.springframework.core.OrderComparator#INSTANCE

2. Spring 中的 Builder

定义 Separate the construction of a complex object from its representation so that the same construction process can create different representations

它的主要亮点有三处:

  1. 较为灵活的构建产品对象
  2. 在不执行最后 build 方法前,产品对象都不可用
  3. 构建过程采用链式调用,看起来比较爽

Spring 中体现 Builder 模式的地方:

  • org.springframework.beans.factory.support.BeanDefinitionBuilder
  • org.springframework.web.util.UriComponentsBuilder
  • org.springframework.http.ResponseEntity.HeadersBuilder
  • org.springframework.http.ResponseEntity.BodyBuilder

3. Spring 中的 Factory Method

定义 Define an interface for creating an object, but let subclasses decide which class to instantiate. Factory Method lets a class defer instantiation to subclasses

根据上面的定义,Spring 中的 ApplicationContext 与 BeanFactory 中的 getBean 都可以视为工厂方法,它隐藏了 bean (产品)的创建过程和具体实现

Spring 中其它工厂:

  • org.springframework.beans.factory.FactoryBean
  • @Bean 标注的静态方法及实例方法
  • ObjectFactory 及 ObjectProvider

前两种工厂主要封装第三方的 bean 的创建过程,后两种工厂可以推迟 bean 创建,解决循环依赖及单例注入多例等问题

4. Spring 中的 Adapter

定义 Convert the interface of a class into another interface clients expect. Adapter lets classes work together that couldn't otherwise because of incompatible interfaces

典型的实现有两处:

  • org.springframework.web.servlet.HandlerAdapter – 因为控制器实现有各种各样,比如有
    • 大家熟悉的 @RequestMapping 标注的控制器实现
    • 传统的基于 Controller 接口(不是 @Controller注解啊)的实现
    • 较新的基于 RouterFunction 接口的实现
    • 它们的处理方法都不一样,为了统一调用,必须适配为 HandlerAdapter 接口
  • org.springframework.beans.factory.support.DisposableBeanAdapter – 因为销毁方法多种多样,因此都要适配为 DisposableBean 来统一调用销毁方法

5. Spring 中的 Composite

定义 Compose objects into tree structures to represent part-whole hierarchies. Composite lets clients treat individual objects and compositions of objects uniformly

典型实现有:

  • org.springframework.web.method.support.HandlerMethodArgumentResolverComposite
  • org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite
  • org.springframework.web.servlet.handler.HandlerExceptionResolverComposite
  • org.springframework.web.servlet.view.ViewResolverComposite

composite 对象的作用是,将分散的调用集中起来,统一调用入口,它的特征是,与具体干活的实现实现同一个接口,当调用 composite 对象的接口方法时,其实是委托具体干活的实现来完成

6. Spring 中的 Decorator

定义 Attach additional responsibilities to an object dynamically. Decorators provide a flexible alternative to subclassing for extending functionality

典型实现:

  • org.springframework.web.util.ContentCachingRequestWrapper

7. Spring 中的 Proxy

定义 Provide a surrogate or placeholder for another object to control access to it

装饰器模式注重的是功能增强,避免子类继承方式进行功能扩展,而代理模式更注重控制目标的访问

典型实现:

  • org.springframework.aop.framework.JdkDynamicAopProxy
  • org.springframework.aop.framework.ObjenesisCglibAopProxy

8. Spring 中的 Chain of Responsibility

定义 Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects and pass the request along the chain until an object handles it

典型实现:

  • org.springframework.web.servlet.HandlerInterceptor

9. Spring 中的 Observer

定义 Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically

典型实现:

  • org.springframework.context.ApplicationListener
  • org.springframework.context.event.ApplicationEventMulticaster
  • org.springframework.context.ApplicationEvent

10. Spring 中的 Strategy

定义 Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it

典型实现:

  • org.springframework.beans.factory.support.InstantiationStrategy
  • org.springframework.core.annotation.MergedAnnotations.SearchStrategy
  • org.springframework.boot.autoconfigure.condition.SearchStrategy

11. Spring 中的 Template Method

定义 Define the skeleton of an algorithm in an operation, deferring some steps to subclasses. Template Method lets subclasses redefine certain steps of an algorithm without changing the algorithm's structure

典型实现:

  • 大部分以 Template 命名的类,如 JdbcTemplate,TransactionTemplate
  • 很多以 Abstract 命名的类,如 AbstractApplicationContext

21、Spring refresh 流程

Spring refresh 概述

refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作。它的内部主要会调用 12 个方法,我们把它们称为 refresh 的 12 个步骤:

  1. prepareRefresh
  2. obtainFreshBeanFactory
  3. prepareBeanFactory
  4. postProcessBeanFactory
  5. invokeBeanFactoryPostProcessors
  6. registerBeanPostProcessors
  7. initMessageSource
  8. initApplicationEventMulticaster
  9. onRefresh
  10. registerListeners
  11. finishBeanFactoryInitialization
  12. finishRefresh
功能分类1 为准备环境2 3 4 5 6 为准备 BeanFactory7 8 9 10 12 为准备 ApplicationContext11 为初始化 BeanFactory 中非延迟单例 bean

1. prepareRefresh

  • 这一步创建和准备了 Environment 对象,它作为 ApplicationContext 的一个成员变量
  • Environment 对象的作用之一是为后续 @Value,值注入时提供键值
  • Environment 分成三个主要部分
    • systemProperties - 保存 java 环境键值
    • systemEnvironment - 保存系统环境键值
    • 自定义 PropertySource - 保存自定义键值,例如来自于 *.properties 文件的键值

image-20210902181639048

2. obtainFreshBeanFactory

  • 这一步获取(或创建) BeanFactory,它也是作为 ApplicationContext 的一个成员变量
  • BeanFactory 的作用是负责 bean 的创建、依赖注入和初始化,bean 的各项特征由 BeanDefinition 定义
    • BeanDefinition 作为 bean 的设计蓝图,规定了 bean 的特征,如单例多例、依赖关系、初始销毁方法等
    • BeanDefinition 的来源有多种多样,可以是通过 xml 获得、配置类获得、组件扫描获得,也可以是编程添加
  • 所有的 BeanDefinition 会存入 BeanFactory 中的 beanDefinitionMap 集合

image-20210902182004819

3. prepareBeanFactory

  • 这一步会进一步完善 BeanFactory,为它的各项成员变量赋值
  • beanExpressionResolver 用来解析 SpEL,常见实现为 StandardBeanExpressionResolver
  • propertyEditorRegistrars 会注册类型转换器
    • 它在这里使用了 ResourceEditorRegistrar 实现类
    • 并应用 ApplicationContext 提供的 Environment 完成 ${ } 解析
  • registerResolvableDependency 来注册 beanFactory 以及 ApplicationContext,让它们也能用于依赖注入
  • beanPostProcessors 是 bean 后处理器集合,会工作在 bean 的生命周期各个阶段,此处会添加两个:
    • ApplicationContextAwareProcessor 用来解析 Aware 接口
    • ApplicationListenerDetector 用来识别容器中 ApplicationListener 类型的 bean

image-20210902182541925

4. postProcessBeanFactory

  • 这一步是空实现,留给子类扩展。
    • 一般 Web 环境的 ApplicationContext 都要利用它注册新的 Scope,完善 Web 下的 BeanFactory
  • 这里体现的是模板方法设计模式

5. invokeBeanFactoryPostProcessors

  • 这一步会调用 beanFactory 后处理器
  • beanFactory 后处理器,充当 beanFactory 的扩展点,可以用来补充或修改 BeanDefinition
  • 常见的 beanFactory 后处理器有
    • ConfigurationClassPostProcessor – 解析 @Configuration、@Bean、@Import、@PropertySource 等
    • PropertySourcesPlaceHolderConfigurer – 替换 BeanDefinition 中的 $
    • MapperScannerConfigurer – 补充 Mapper 接口对应的 BeanDefinition

image-20210902183232114

6. registerBeanPostProcessors

  • 这一步是继续从 beanFactory 中找出 bean 后处理器,添加至 beanPostProcessors 集合中
  • bean 后处理器,充当 bean 的扩展点,可以工作在 bean 的实例化、依赖注入、初始化阶段,常见的有:
    • AutowiredAnnotationBeanPostProcessor 功能有:解析 @Autowired,@Value 注解
    • CommonAnnotationBeanPostProcessor 功能有:解析 @Resource,@PostConstruct,@PreDestroy
    • AnnotationAwareAspectJAutoProxyCreator 功能有:为符合切点的目标 bean 自动创建代理

image-20210902183520307

7. initMessageSource

  • 这一步是为 ApplicationContext 添加 messageSource 成员,实现国际化功能
  • 去 beanFactory 内找名为 messageSource 的 bean,如果没有,则提供空的 MessageSource 实现

image-20210902183819984

8. initApplicationContextEventMulticaster

  • 这一步为 ApplicationContext 添加事件广播器成员,即 applicationContextEventMulticaster
  • 它的作用是发布事件给监听器
  • 去 beanFactory 找名为 applicationEventMulticaster 的 bean 作为事件广播器,若没有,会创建默认的事件广播器
  • 之后就可以调用 ApplicationContext.publishEvent(事件对象) 来发布事件

image-20210902183943469

9. onRefresh

  • 这一步是空实现,留给子类扩展
    • SpringBoot 中的子类在这里准备了 WebServer,即内嵌 web 容器
  • 体现的是模板方法设计模式

10. registerListeners

  • 这一步会从多种途径找到事件监听器,并添加至 applicationEventMulticaster
  • 事件监听器顾名思义,用来接收事件广播器发布的事件,有如下来源
    • 事先编程添加的
    • 来自容器中的 bean
    • 来自于 @EventListener 的解析
  • 要实现事件监听器,只需要实现 ApplicationListener 接口,重写其中 onApplicationEvent(E e) 方法即可

image-20210902184343872

11. finishBeanFactoryInitialization

  • 这一步会将 beanFactory 的成员补充完毕,并初始化所有非延迟单例 bean
  • conversionService 也是一套转换机制,作为对 PropertyEditor 的补充
  • embeddedValueResolvers 即内嵌值解析器,用来解析 @Value 中的 ${ },借用的是 Environment 的功能
  • singletonObjects 即单例池,缓存所有单例对象
    • 对象的创建都分三个阶段,每一阶段都有不同的 bean 后处理器参与进来,扩展功能

image-20210902184641623

12. finishRefresh

  • 这一步会为 ApplicationContext 添加 lifecycleProcessor 成员,用来控制容器内需要生命周期管理的 bean
  • 如果容器中有名称为 lifecycleProcessor 的 bean 就用它,否则创建默认的生命周期管理器
  • 准备好生命周期管理器,就可以实现
    • 调用 context 的 start,即可触发所有实现 LifeCycle 接口 bean 的 start
    • 调用 context 的 stop,即可触发所有实现 LifeCycle 接口 bean 的 stop
  • 发布 ContextRefreshed 事件,整个 refresh 执行完成

image-20210902185052433

22、Spring声明式事务使用步骤

1、注解

@Transactional注解实现事务要加rollbackFor

@Transactional(rollbackFor = Exception.class)
默认情况下,@Transactional 注解只会回滚 RuntimeException 异常及其子类。如果方法中抛出的异常不是 RuntimeException 的子类,那么 Spring 就不会回滚事务,这可能导致数据不一致的问题。

因此,为了确保在方法中抛出任何异常时都能够回滚事务,可以在 @Transactional 注解中使用 rollbackFor 属性来指定需要回滚的异常类型。例如,@Transactional(rollbackFor = Exception.class) 表示在方法中抛出任何异常时都会回滚事务。

2、全局事务

配置全局事务后就不需要每个方法都添加@Transactional注解了,在开发过程中更注重于业务开发。
package com.xx.config; import org.springframework.aop.Advisor; import org.springframework.aop.aspectj.AspectJExpressionPointcut; import org.springframework.aop.support.DefaultPointcutAdvisor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionManager; import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource; import org.springframework.transaction.interceptor.RollbackRuleAttribute; import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute; import org.springframework.transaction.interceptor.TransactionInterceptor; import java.util.ArrayList; import java.util.List; /** * @Author: xueqimiao * @Date: 2024/1/17 09:45 */ @Configuration public class TransactionConfiguration { /** * 配置全局事务的切点为service层的所有方法 * 设置service层所在位置 */ private static final String AOP_POINTCUT_EXPRESSION = "execution (* com.xx.service..*.*(..))"; /** * 注入事务管理器 */ @Autowired private TransactionManager transactionManager; /** * 配置事务拦截器 */ @Bean public TransactionInterceptor txAdvice() { RuleBasedTransactionAttribute txAttrRequired = new RuleBasedTransactionAttribute(); txAttrRequired.setName("REQUIRED事务"); //设置事务传播机制,默认是PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务 txAttrRequired.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); //设置异常回滚为Exception 默认是RuntimeException List<RollbackRuleAttribute> rollbackRuleAttributes = new ArrayList<>(); rollbackRuleAttributes.add(new RollbackRuleAttribute(Exception.class)); txAttrRequired.setRollbackRules(rollbackRuleAttributes); RuleBasedTransactionAttribute txAttrRequiredReadOnly = new RuleBasedTransactionAttribute(); txAttrRequiredReadOnly.setName("SUPPORTS事务"); //设置事务传播机制,PROPAGATION_SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行 txAttrRequiredReadOnly.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); //设置异常回滚为Exception 默认是RuntimeException txAttrRequiredReadOnly.setRollbackRules(rollbackRuleAttributes); txAttrRequiredReadOnly.setReadOnly(true); /*事务管理规则,声明具备事务管理的方法名*/ NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource(); //方法名规则限制,必须以下列开头才会加入事务管理当中 source.addTransactionalMethod("add*", txAttrRequired); source.addTransactionalMethod("save*", txAttrRequired); source.addTransactionalMethod("create*", txAttrRequired); source.addTransactionalMethod("insert*", txAttrRequired); source.addTransactionalMethod("submit*", txAttrRequired); source.addTransactionalMethod("del*", txAttrRequired); source.addTransactionalMethod("remove*", txAttrRequired); source.addTransactionalMethod("update*", txAttrRequired); source.addTransactionalMethod("exec*", txAttrRequired); source.addTransactionalMethod("set*", txAttrRequired); //对于查询方法,根据实际情况添加事务管理 可能存在查询多个数据时,已查询出来的数据刚好被改变的情况 source.addTransactionalMethod("get*", txAttrRequiredReadOnly); source.addTransactionalMethod("select*", txAttrRequiredReadOnly); source.addTransactionalMethod("query*", txAttrRequiredReadOnly); source.addTransactionalMethod("find*", txAttrRequiredReadOnly); source.addTransactionalMethod("list*", txAttrRequiredReadOnly); source.addTransactionalMethod("count*", txAttrRequiredReadOnly); source.addTransactionalMethod("is*", txAttrRequiredReadOnly); return new TransactionInterceptor(transactionManager, source); } /** * 设置切面 */ @Bean public Advisor txAdviceAdvisor() { AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut(); pointcut.setExpression(AOP_POINTCUT_EXPRESSION); return new DefaultPointcutAdvisor(pointcut, txAdvice()); } }

23、Spring常用注解

事务注解

  • @EnableTransactionManagement,会额外加载 4 个 bean
    • BeanFactoryTransactionAttributeSourceAdvisor 事务切面类
    • TransactionAttributeSource 用来解析事务属性
    • TransactionInterceptor 事务拦截器
    • TransactionalEventListenerFactory 事务监听器工厂
  • @Transactional

核心

  • @Order

切面

  • @EnableAspectJAutoProxy
    • 会加载 AnnotationAwareAspectJAutoProxyCreator,它是一个 bean 后处理器,用来创建代理
    • 如果没有配置 @EnableAspectJAutoProxy,又需要用到代理(如事务)则会使用 InfrastructureAdvisorAutoProxyCreator 这个 bean 后处理器

组件扫描与配置类

  • @Component
  • @Controller
  • @Service
  • @Repository
  • @ComponentScan
  • @Conditional
  • @Configuration
    • 配置类其实相当于一个工厂, 标注 @Bean 注解的方法相当于工厂方法
    • @Bean 不支持方法重载, 如果有多个重载方法, 仅有一个能入选为工厂方法
    • @Configuration 默认会为标注的类生成代理, 其目的是保证 @Bean 方法相互调用时, 仍然能保证其单例特性
    • @Configuration 中如果含有 BeanFactory 后处理器, 则实例工厂方法会导致 MyConfig 提前创建, 造成其依赖注入失败,解决方法是改用静态工厂方法或直接为 @Bean 的方法参数依赖注入, 针对 Mapper 扫描可以改用注解方式
  • @Bean
  • @Import
    • 四种用法① 引入单个 bean② 引入一个配置类③ 通过 Selector 引入多个类④ 通过 beanDefinition 注册器
    • 解析规则
      • 同一配置类中, @Import 先解析 @Bean 后解析
      • 同名定义, 默认后面解析的会覆盖前面解析的
      • 不允许覆盖的情况下, 如何能够让 MyConfig(主配置类) 的配置优先? (虽然覆盖方式能解决)
      • 采用 DeferredImportSelector,因为它最后工作, 可以简单认为先解析 @Bean, 再 Import
  • @Lazy
    • 加在类上,表示此类延迟实例化、初始化
    • 加在方法参数上,此参数会以代理方式注入
  • @PropertySource

依赖注入

  • @Autowired
  • @Qualifier
  • @Value

mvc mapping

  • @RequestMapping,可以派生多个注解如 @GetMapping 等

mvc rest

  • @RequestBody
  • @ResponseBody,组合 @Controller => @RestController
  • @ResponseStatus

mvc 统一处理

  • @ControllerAdvice,组合 @ResponseBody => @RestControllerAdvice
  • @ExceptionHandler

mvc 参数

  • @PathVariable

mvc ajax

  • @CrossOrigin

boot auto

  • @SpringBootApplication
  • @EnableAutoConfiguration
  • @SpringBootConfiguration

boot condition

  • @ConditionalOnClass,classpath 下存在某个 class 时,条件才成立
  • @ConditionalOnMissingBean,beanFactory 内不存在某个 bean 时,条件才成立
  • @ConditionalOnProperty,配置文件中存在某个 property(键、值)时,条件才成立

boot properties

  • @ConfigurationProperties,会将当前 bean 的属性与配置文件中的键值进行绑定
  • @EnableConfigurationProperties,会添加两个较为重要的 bean
    • ConfigurationPropertiesBindingPostProcessor,bean 后处理器,在 bean 初始化前调用下面的 binder
    • ConfigurationPropertiesBinder,真正执行绑定操作

Spring MVC 面试题

1、SpringMVC 工作原理?

  • 客户端发送请求到 DispatcherServlet
  • DispatcherServlet 查询 handlerMapping 找到处理请求的 Controller
  • Controller 调用业务逻辑后,返回 ModelAndView
  • DispatcherServlet 查询 ModelAndView,找到指定视图
  • 视图将结果返回到客户端

2、SpringMVC 流程?

  1. 用户发送请求至前端控制器 DispatcherServlet。
  2. DispatcherServlet 收到请求调用 HandlerMapping 处理器映射器。
  3. 处理器映射器找到具体的处理器(可以根据 xml 配置、注解进行查找),生成处理器对象及处理器拦截器(如果有则生成)一并返回给 DispatcherServlet。
  4. DispatcherServlet 调用 HandlerAdapter 处理器适配器。
  5. HandlerAdapter 经过适配调用具体的处理器(Controller,也叫后端控制器)。
  6. Controller 执行完成返回 ModelAndView。
  7. HandlerAdapter 将 controller 执行结果 ModelAndView 返回给 DispatcherServlet。
  8. DispatcherServlet 将 ModelAndView 传给 ViewReslover 视图解析器。
  9. ViewReslover 解析后返回具体 View。
  10. DispatcherServlet 根据 View 进行渲染视图(即将模型数据填充至视图中)。
  11. DispatcherServlet 响应用户。

3、 Spring MVC 执行流程详述

概要

我把整个流程分成三个阶段

  • 准备阶段
  • 匹配阶段
  • 执行阶段

准备阶段

  1. 在 Web 容器第一次用到 DispatcherServlet 的时候,会创建其对象并执行 init 方法
  2. init 方法内会创建 Spring Web 容器,并调用容器 refresh 方法
  3. refresh 过程中会创建并初始化 SpringMVC 中的重要组件, 例如 MultipartResolver,HandlerMapping,HandlerAdapter,HandlerExceptionResolver、ViewResolver 等
  4. 容器初始化后,会将上一步初始化好的重要组件,赋值给 DispatcherServlet 的成员变量,留待后用

image-20210903140657163

匹配阶段

  1. 用户发送的请求统一到达前端控制器 DispatcherServlet
  2. DispatcherServlet 遍历所有 HandlerMapping ,找到与路径匹配的处理器① HandlerMapping 有多个,每个 HandlerMapping 会返回不同的处理器对象,谁先匹配,返回谁的处理器。其中能识别 @RequestMapping 的优先级最高② 对应 @RequestMapping 的处理器是 HandlerMethod,它包含了控制器对象和控制器方法信息③ 其中路径与处理器的映射关系在 HandlerMapping 初始化时就会建立好

image-20210903141017502
  1. 将 HandlerMethod 连同匹配到的拦截器,生成调用链对象 HandlerExecutionChain 返回

image-20210903141124911
  1. 遍历HandlerAdapter 处理器适配器,找到能处理 HandlerMethod 的适配器对象,开始调用

image-20210903141204799

调用阶段

  1. 执行拦截器 preHandle

image-20210903141445870
  1. 由 HandlerAdapter 调用 HandlerMethod① 调用前处理不同类型的参数② 调用后处理不同类型的返回值

image-20210903141658199
  1. 第 2 步没有异常① 返回 ModelAndView② 执行拦截器 postHandle 方法③ 解析视图,得到 View 对象,进行视图渲染

image-20210903141749830
  1. 第 2 步有异常,进入 HandlerExceptionResolver 异常处理流程

image-20230324150707422
  1. 最后都会执行拦截器的 afterCompletion 方法
  2. 如果控制器方法标注了 @ResponseBody 注解,则在第 2 步,就会生成 json 结果,并标记 ModelAndView 已处理,这样就不会执行第 3 步的视图渲染

Spring Boot面试题

1、Spring Boot 中的监视器是什么?

Spring boot actuator 是 spring 启动框架中的重要功能之一。Spring boot 监视器可帮助您访问生产环境中正在运行的应用程序的当前状态。有几个指标必须在生产环境中进行检查和监控。即使一些外部应用程序可能正在使用这些服务来向相关人员触发警报消息。监视器模块公开了一组可直接作为 HTTP URL 访问的 REST 端点来检查状态。

2、SpringBoot 自动装配原理

SpringBoot的自动装配是SpringFramework的一个扩展,它可以根据应用程序的配置和classpath中的类来自动配置Spring应用程序上下文。SpringBoot的自动装配是通过条件化配置和Spring的组件扫描机制来实现的。

具体来说,SpringBoot的自动装配原理包括以下几个方面:

  1. 条件化配置

SpringBoot的自动装配是基于条件化配置的,即通过在类路径下的META-INF/spring.factories文件中定义条件化的自动配置类,根据条件来决定是否应用该自动配置类。例如,如果应用程序中配置了某个Bean,则不会应用自动配置类中的相应Bean。

  1. Spring组件扫描

SpringBoot的自动装配也依赖于Spring的组件扫描机制,即通过在应用程序中定义@ComponentScan注解来扫描应用程序中的组件,并将它们自动注入到Spring应用程序上下文中。通过自动扫描机制,SpringBoot可以自动装配许多常见的组件,例如控制器、服务、存储库等。

  1. 自动配置类

SpringBoot的自动装配还依赖于自动配置类,即在类路径下的META-INF/spring.factories文件中定义的条件化自动配置类。这些自动配置类可以自动配置Spring应用程序上下文中的各种组件,例如数据源、事务管理器、WebMvcConfigurer等。

  1. SpringBoot Starter

SpringBoot Starter是SpringBoot的一个重要特性,它可以将相关的依赖项打包成一个单独的模块,并提供默认的自动配置类和配置文件,从而简化应用程序的配置过程。例如,SpringBoot提供了spring-boot-starter-web模块,它包含了Web应用程序所需的依赖项和默认的自动配置类,可以快速地搭建一个Web应用程序。

总之,SpringBoot的自动装配是通过条件化配置和Spring的组件扫描机制来实现的,它可以大大简化应用程序的配置过程,并提高应用程序的开发效率。

@SpringBootConfiguration 是一个组合注解,由 @ComponentScan、@EnableAutoConfiguration 和 @SpringBootConfiguration 组成@SpringBootConfiguration 与普通 @Configuration 相比,唯一区别是前者要求整个 app 中只出现一次@ComponentScanexcludeFilters - 用来在组件扫描时进行排除,也会排除自动配置类@EnableAutoConfiguration 也是一个组合注解,由下面注解组成@AutoConfigurationPackage – 用来记住扫描的起始包@Import(AutoConfigurationImportSelector.class) 用来加载 META-INF/spring.factories 中的自动配置类

为什么不使用 @Import 直接引入自动配置类

有两个原因:

  1. 让主配置类和自动配置类变成了强耦合,主配置类不应该知道有哪些从属配置
  2. 直接用 @Import(自动配置类.class),引入的配置解析优先级较高,自动配置类的解析应该在主配置没提供时作为默认配置

因此,采用了 @Import(AutoConfigurationImportSelector.class)

  • 由 AutoConfigurationImportSelector.class 去读取 META-INF/spring.factories 中的自动配置类,实现了弱耦合。
  • 另外 AutoConfigurationImportSelector.class 实现了 DeferredImportSelector 接口,让自动配置的解析晚于主配置的解析

3、SpringBoot 拦截器、监听器、过滤器

SpringBoot中的拦截器、监听器和过滤器都是用于处理Web请求的中间件,它们可以对请求进行预处理、后处理、日志记录等操作,从而实现对Web应用程序的控制和管理。下面分别介绍它们的作用和区别:

  1. 拦截器(Interceptor)

拦截器是SpringMVC框架提供的一种机制,它可以在请求处理前、处理后和视图渲染前对请求进行拦截和处理。拦截器常用于对用户的请求进行身份验证、日志记录、权限控制等操作。在SpringBoot中,可以通过实现HandlerInterceptor接口来创建拦截器,在配置文件中配置拦截器的生效路径。

  1. 监听器(Listener)

监听器是用于监听应用程序中事件的一种机制,它可以在事件发生前、发生后或发生异常时进行处理。SpringBoot中的监听器可以监听Web应用程序的启动和关闭、Session的创建和销毁、上下文的初始化和销毁等事件。在SpringBoot中,可以通过实现ApplicationListener接口来创建监听器,在配置文件中配置监听器的生效方式。

  1. 过滤器(Filter)

过滤器是Web应用程序中最常用的一种中间件,它可以在请求被处理前和响应被发送前对请求和响应进行过滤和处理。过滤器常用于对请求进行编码、解码、验证、日志记录等操作。在SpringBoot中,可以通过实现Filter接口来创建过滤器,在配置文件中配置过滤器的生效路径和顺序。

总之,拦截器、监听器和过滤器都是用于处理Web请求的中间件,它们的作用和使用方式略有不同,但都可以实现对Web应用程序的控制和管理。在开发Web应用程序时,可以根据具体需求选择合适的中间件来处理请求。

Spring Cloud面试题

1、使用 Spring Cloud 有什么优势?

使用 Spring Boot 开发分布式微服务时,我们面临以下问题

  • 与分布式系统相关的复杂性-这种开销包括网络问题,延迟开销,带宽问题,安全问题。
  • 服务发现-服务发现工具管理群集中的流程和服务如何查找和互相交谈。它涉及一个服务目录,在该目录中注册服务,然后能够查找并连接到该目录中的服务。
  • 冗余-分布式系统中的冗余问题。
  • 负载平衡 --负载平衡改善跨多个计算资源的工作负荷,诸如计算机,计算机集群,网络链路,中央处理单元,或磁盘驱动器的分布。
  • 性能-问题 由于各种运营开销导致的性能问题。
  • 部署复杂性-Devops 技能的要求。

2、什么是 Hystrix?它如何实现容错?

Hystrix是Netflix开源的一个用于实现容错的库,它主要用于处理分布式系统中的延迟和故障。Hystrix可以帮助开发人员实现服务的容错和降级,从而提高系统的可用性和稳定性。

Hystrix的容错机制主要包括以下几个方面:

  1. 服务降级

当服务不可用或响应时间超过阈值时,Hystrix会自动将请求转发到服务的备用实现或缓存中,从而避免服务的故障影响到整个系统的性能和可用性。

  1. 服务熔断

Hystrix可以根据一定的规则来监控服务的健康状态,当服务出现故障或响应时间超过阈值时,Hystrix会自动断开服务的调用,避免服务的故障对整个系统的性能和可用性造成影响。

  1. 服务限流

Hystrix可以根据一定的规则来限制服务的请求流量,当服务的请求流量超过一定阈值时,Hystrix会自动拒绝部分请求或延迟部分请求的执行,从而保护服务的可用性和稳定性。

  1. 监控和报警

Hystrix可以实时监控服务的健康状态和性能指标,包括响应时间、失败率、请求量等,同时可以通过报警机制来及时发现和处理服务的异常情况,从而保障系统的可用性和稳定性。

总之,Hystrix是一个用于实现容错的库,它通过服务降级、服务熔断、服务限流和监控报警等机制来保障系统的可用性和稳定性。在分布式系统的开发中,Hystrix是一个非常重要的工具,可以帮助开发人员实现高可用、高性能的分布式系统。

3、Feign是什么?

Feign集成了Ribbon、RestTemplate实现了负载均衡的执行Http调用,只不过对原有的方式(Ribbon+RestTemplate)进行了封装,开发者不必手动使用RestTemplate调服务,而是定义一个接口,在这个接口中标注一个注解即可完成服务调用,这样更加符合面向接口编程的宗旨,简化了开发。

4、openFeign是什么?

OpenFeign是springcloud在Feign的基础上支持了SpringMVC的注解,如@RequestMapping等等。OpenFeign的@FeignClient可以解析SpringMVC的@RequestMapping注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务。

5、Feign和openFeign有什么区别?

FeignopenFiegn
Feign是SpringCloud组件中一个轻量级RESTful的HTTP服务客户端,Feign内置了Ribbon,用来做客户端负载均衡,去调用服务注册中心的服务。Feign的使用方式是:使用Feign的注解定义接口,调用这个接口,就可以调用服务注册中心的服务OpenFeign 是SpringCloud在Feign的基础上支持了SpringMVC的注解,如@RequestMapping等。OpenFeign 的@FeignClient可以解析SpringMVC的@RequestMapping注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务。

6、openFeign如何传参?

1、传递JSON数据

这个也是接口开发中常用的传参规则,在Spring Boot 中通过@RequestBody标识入参。

/** * 参数默认是@RequestBody标注的,这里的@RequestBody可以不填 * 方法名称任意 */ @PostMapping("/openfeign/provider/order2") Order createOrder2(@RequestBody Order order);

注意:openFeign默认的传参方式就是JSON传参(@RequestBody),因此定义接口的时候可以不用@RequestBody注解标注,不过为了规范,一般都填上。

2、POJO表单传参

/** * 参数默认是@RequestBody标注的,如果通过POJO表单传参的,使用@SpringQueryMap标注 */ @PostMapping("/openfeign/provider/order1") Order createOrder1(@SpringQueryMap Order order);

图片

openFeign提供了一个注解@SpringQueryMap完美解决POJO表单传参。

3、URL中携带参数

@GetMapping("/openfeign/provider/test/{id}") String get(@PathVariable("id")Integer id);

使用注解@PathVariable接收url中的占位符,这种方式很好理解。

4、另类传参

/** * 必须要@RequestParam注解标注,且value属性必须填上参数名 * 方法参数名可以任意,但是@RequestParam注解中的value属性必须和provider中的参数名相同 */ @PostMapping("/openfeign/provider/test2") String test(@RequestParam("id") String arg1,@RequestParam("name") String arg2);

7、超时如何处理?

@PostMapping("/test2") public String test2(String id,String name) throws InterruptedException { // 休眠3秒 Thread.sleep(3000); return MessageFormat.format("accept on msg id={0},name={1}",id,name); }

openFeign其实是有默认的超时时间的,默认分别是连接超时时间10秒、读超时时间60秒,源码在feign.Request.Options#Options()这个方法中,如下图:

图片

那么问题来了:为什么我只设置了睡眠3秒就报超时呢?

其实openFeign集成了Ribbon,Ribbon的默认超时连接时间、读超时时间都是是1秒,源码在org.springframework.cloud.openfeign.ribbon.FeignLoadBalancer#execute()方法中,如下图:

图片

源码大致意思:如果openFeign没有设置对应得超时时间,那么将会采用Ribbon的默认超时时间。

理解了超时设置的原理,由之产生两种方案也是很明了了,如下:

  • 设置openFeign的超时时间
  • 设置Ribbon的超时时间

1、设置Ribbon的超时时间(不推荐)

设置很简单,在配置文件中添加如下设置:

ribbon: # 值的是建立链接所用的时间,适用于网络状况正常的情况下, 两端链接所用的时间 ReadTimeout: 5000 # 指的是建立链接后从服务器读取可用资源所用的时间 ConectTimeout: 5000

2、设置openFeign的超时时间(推荐)

openFeign设置超时时间非常简单,只需要在配置文件中配置,如下:

feign: client: config: # default 设置的全局超时时间,指定服务名称可以设置单个服务的超时时间 default: connectTimeout: 5000 readTimeout: 5000
default设置的是全局超时时间,对所有的openFeign接口服务都生效

8、如何开启日志增强?

openFeign虽然提供了日志增强功能,但是默认是不显示任何日志的,不过开发者在调试阶段可以自己配置日志的级别。

openFeign的日志级别如下:

  • NONE:默认的,不显示任何日志;
  • BASIC:仅记录请求方法、URL、响应状态码及执行时间;
  • HEADERS:除了BASIC中定义的信息之外,还有请求和响应的头信息;
  • FULL:除了HEADERS中定义的信息之外,还有请求和响应的正文及元数据。

1、配置类中配置日志级别

图片
注意:这里的logger是feign包里的。

2、yaml文件中设置接口日志级别

只需要在配置文件中调整指定包或者openFeign的接口日志级别,如下:

logging: level: cn.xx.service: debug

这里的cn.xx.service是openFeign接口所在的包名,当然你也可以配置一个特定的openFeign接口。

上述步骤将日志设置成了FULL,此时发出请求,日志效果如下图:

图片

日志中详细的打印出了请求头、请求体的内容。

9、如何替换默认的httpclient?

Feign在默认情况下使用的是JDK原生的URLConnection发送HTTP请求,没有连接池,但是对每个地址会保持一个长连接,即利用HTTP的persistence connection。

在生产环境中,通常不使用默认的http client,通常有如下两种选择:

  • 使用ApacheHttpClient
  • 使用OkHttp

至于哪个更好,其实各有千秋,我比较倾向于ApacheHttpClient,毕竟老牌子了,稳定性不在话下。

那么如何替换掉呢?其实很简单,下面演示使用ApacheHttpClient替换。

1、添加ApacheHttpClient依赖

在openFeign接口服务的pom文件添加如下依赖:

<!-- 使用Apache HttpClient替换Feign原生httpclient--> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </dependency> <dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-httpclient</artifactId> </dependency>

为什么要添加上面的依赖呢?从源码中不难看出,请看org.springframework.cloud.openfeign.FeignAutoConfiguration.HttpClientFeignConfiguration这个类,代码如下:

图片

上述红色框中的生成条件,其中的@ConditionalOnClass(ApacheHttpClient.class),必须要有ApacheHttpClient这个类才会生效,并且feign.httpclient.enabled这个配置要设置为true

2、配置文件中开启

在配置文件中要配置开启,代码如下:

feign: client: httpclient: # 开启 Http Client enabled: true

3、如何验证已经替换成功?

其实很简单,在feign.SynchronousMethodHandler#executeAndDecode()这个方法中可以清楚的看出调用哪个client,如下图:

图片

上图中可以看到最终调用的是ApacheHttpClient

4、总结

上述步骤仅仅演示一种替换方案,剩下的一种不再演示了,原理相同。

10、如何通讯优化?

在讲如何优化之前先来看一下GZIP 压缩算法,概念如下:

gzip是一种数据格式,采用用deflate算法压缩数据;gzip是一种流行的数据压缩算法,应用十分广泛,尤其是在Linux平台。

当GZIP压缩到一个纯文本数据时,效果是非常明显的,大约可以减少70%以上的数据大小。

网络数据经过压缩后实际上降低了网络传输的字节数,最明显的好处就是可以加快网页加载的速度。网页加载速度加快的好处不言而喻,除了节省流量,改善用户的浏览体验外,另一个潜在的好处是GZIP与搜索引擎的抓取工具有着更好的关系。例如 Google就可以通过直接读取GZIP文件来比普通手工抓取更快地检索网页。

GZIP压缩传输的原理如下图:

图片

按照上图拆解出的步骤如下:

  • 客户端向服务器请求头中带有:Accept-Encoding:gzip,deflate 字段,向服务器表示,客户端支持的压缩格式(gzip或者deflate),如果不发送该消息头,服务器是不会压缩的。
  • 服务端在收到请求之后,如果发现请求头中含有Accept-Encoding字段,并且支持该类型的压缩,就对响应报文压缩之后返回给客户端,并且携带Content-Encoding:gzip消息头,表示响应报文是根据该格式压缩过的。
  • 客户端接收到响应之后,先判断是否有Content-Encoding消息头,如果有,按该格式解压报文。否则按正常报文处理。

openFeign支持请求/响应开启GZIP压缩,整体的流程如下图:

图片

上图中涉及到GZIP传输的只有两块,分别是Application client -> Application Service、 Application Service->Application client

注意:openFeign支持的GZIP仅仅是在openFeign接口的请求和响应,即是openFeign消费者调用服务提供者的接口。

openFeign开启GZIP步骤也是很简单,只需要在配置文件中开启如下配置:

feign: # 开启压缩 compression: request: enabled: true # 开启压缩的阈值,单位字节,默认2048,即是2k,这里为了演示效果设置成10字节 min-request-size: 10 mime-types: text/xml,application/xml,application/json response: enabled: true

上述配置完成之后,发出请求,可以清楚看到请求头中已经携带了GZIP压缩,如下图:

图片

11、如何熔断降级

常见的熔断降级框架有HystrixSentinel,openFeign默认支持的就是Hystrix,这个在官方文档上就有体现,毕竟是一奶同胞嘛,哈哈...........

但是阿里的Sentinel无论是功能特性、简单易上手等各方面都完全秒杀Hystrix,因此此章节就使用openFeign+Sentinel进行整合实现服务降级。

1、添加Sentinel依赖

openFeign-consumer9006消费者的pom文件添加sentinel依赖(由于使用了聚合模块,不指定版本号),如下:

<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>

2、配置文件中开启sentinel熔断降级

要想openFeign使用sentinel的降级功能,还需要在配置文件中开启,添加如下配置:

feign: sentinel: enabled: true

3、添加降级回调类

这个类一定要和openFeign接口实现同一个类,如下图:

图片

OpenFeignFallbackService这个是降级回调的类,一旦OpenFeignService中对应得接口出现了异常则会调用这个类中对应得方法进行降级处理。

4、添加fallback属性

@FeignClient中添加fallback属性,属性值是降级回调的类,如下:

@FeignClient(value = "openFeign-provider",fallback = OpenFeignFallbackService.class) public interface OpenFeignService {}

5、演示

经过如上4个步骤,openFeign的熔断降级已经设置完成了,此时演示下效果。

通过postman调用http://localhost:9006/openfeign/order3这个接口,正常逻辑返回如下图:

图片

现在手动造个异常,在服务提供的接口中抛出异常,如下图:

图片

此时重新调用http://localhost:9006/openfeign/order3,返回如下图:

图片

12、Sentinel如何做分布式限流

Sentinel 是一种分布式系统的流量控制组件,可以实现流量控制、服务降级、熔断降级等功能。在 Sentinel 中,可以通过配置流控规则来控制每个服务的请求流量,防止服务被过度请求而导致服务宕机或者响应变慢。

Sentinel 中的分布式限流是通过令牌桶算法和滑动窗口算法实现的。具体来说,Sentinel 会在每个服务的入口处配置流控规则,根据规则来限制每个服务的请求流量。当某个服务的请求量超过流控规则中设置的阈值时,Sentinel 会根据算法来拒绝或者延迟请求,从而达到限流的效果。

在令牌桶算法中,Sentinel 会为每个服务配置一个令牌桶,令牌桶中存储了一定数量的令牌,每次请求时需要从令牌桶中获取令牌,如果令牌不足则请求被拒绝或者延迟。在滑动窗口算法中,Sentinel 会将每个服务的请求量按照时间窗口进行统计,如果请求量超过了设置的阈值,请求会被拒绝或者延迟。

总的来说,Sentinel 的分布式限流是通过令牌桶算法和滑动窗口算法实现的,可以根据业务需求进行配置和调整,从而实现对服务的流量控制和限制。

13、Sentinel限流策略

Sentinel 提供了多种限流策略,可以根据实际情况选择合适的限流策略。以下是 Sentinel 支持的一些限流策略:

  1. 直接拒绝:当请求超出阈值时,直接拒绝请求,返回错误信息。
  2. 预热/冷启动:在系统启动时,逐渐增加流量,以避免系统在刚启动时被大量请求压垮。
  3. 排队等待:当请求超出阈值时,将请求放入队列中等待处理,避免请求被拒绝后直接影响用户体验。
  4. 慢启动:在系统启动时,逐渐增加流量,避免系统在刚启动时被大量请求压垮。
  5. 平滑降级:当系统负载过高时,逐渐降低系统的处理能力,避免系统崩溃。
  6. 热点参数限流:根据请求参数进行限流,避免热点请求对系统造成过大的压力。
  7. 熔断降级:当系统负载过高或者出现异常时,将请求拦截或者延迟处理,避免系统崩溃或者响应时间过长。

以上是 Sentinel 支持的一些限流策略,根据业务需求和实际情况选择合适的策略进行配置和应用。在配置限流规则时,需要综合考虑多个方面,例如系统负载、业务需求、用户体验等因素,从而达到合理的限流效果。

14、Sentinel线程池隔离和信号量隔离

Sentinel 提供了两种隔离策略,分别是线程池隔离和信号量隔离。

线程池隔离是将每个被保护的资源分配到一个线程池中进行处理,每个资源都有自己的线程池,不同资源之间相互独立。在线程池隔离的模式下,当一个资源被限流或者熔断时,只会影响当前资源的请求,不会影响到其他资源的请求。线程池隔离的优点是隔离性好,可以有效地限制每个资源的请求量,避免资源互相影响,但是线程池的创建和销毁会占用系统资源。

信号量隔离是将每个被保护的资源分配到一个信号量中进行处理,每个资源都有自己的信号量,不同资源之间相互独立。在信号量隔离的模式下,当一个资源被限流或者熔断时,会影响到同一信号量下的所有资源的请求。信号量隔离的优点是资源消耗少,但是隔离性相对较差,容易造成资源互相影响。

在选择线程池隔离和信号量隔离时,需要根据实际情况进行选择。如果需要更好的隔离性和更精细的流量控制,可以选择线程池隔离;如果需要更高的性能和更少的资源消耗,可以选择信号量隔离。另外,在配置隔离策略时,需要根据业务需求和实际情况选择合适的配置参数,例如线程池大小、并发度等,以达到更好的效果。

15、Nacos是怎么实现配置动态更新的

Nacos 通过监听配置数据变更的方式实现配置动态更新。具体来说,当 Nacos 中的配置数据发生变化时,Nacos 会通过监听器机制将变化的信息通知给客户端,客户端在接收到通知后会重新获取最新的配置数据,并更新本地的配置信息。

Nacos 的配置监听机制包括两个部分:客户端监听和服务端监听。客户端监听是指客户端通过订阅 Nacos 的配置服务,当配置数据发生变化时,Nacos 会主动通知客户端,客户端在接收到通知后会重新获取最新的配置数据。服务端监听是指 Nacos 在配置数据发生变化时,会通知所有订阅该配置的客户端,以便客户端能够及时更新配置信息。

Nacos 中的配置监听机制是基于长连接实现的,客户端和服务端之间通过长连接来实现实时通信。当客户端与 Nacos 建立长连接后,Nacos 会将客户端所订阅的配置信息发送给客户端,并保持连接状态。当配置数据发生变化时,Nacos 会主动向客户端发送通知,客户端在接收到通知后会重新获取最新的配置数据,并更新本地的配置信息。

总之,Nacos 通过监听配置数据变化的方式实现配置动态更新,利用长连接实现实时通信,保证配置信息的及时更新和同步。这种机制能够满足配置实时更新的需求,提高了系统的可靠性和可维护性。

分布式事务

1、什么是分布式事务

分布式事务表示事务的参与者,事务所在的服务器,以及涉及的资源服务器,事务管理器等分别位于不同的服务或者不同的数据库节点上。

更简单的理解,事务表示一组操作,那么这一组操作中,包含多个操作 那么这多个操作可能存在不同的服务器上 或者多个操作根本就不属于同一个应用,分布式事务就表示,可能不在同一台服务器,不是同一个项目中的事务,整体成功或者整体失败。
1、事务

由一组操作构成的可靠、独立的工作单元,事务具备 ACID 的特性,即原子性、一致性、隔离性和持久性。

2、本地事务

本地事务由本地资源管理器(通常指数据库管理系统 DBMS,例如 MySQL、Oracle 等)管理,严格地支持 ACID 特性,高效可靠。本地事务不具备分布式事务的处理能力,隔离的最小单位受限于资源管理器,即本地事务只能对自己数据库的操作进行控制,对于其他数据库的操作则无能为力。

3、全局事务

全局事务指的是一次性操作多个资源管理器完成的事务,由一组分支事务组成。

4、分支事务

在分布式事务中,就是一个个受全局事务管辖和协调的本地事务。

我们可以将分布式事务理解成 一个包含了若干个分支事务的全局事务

全局事务的职责是协调其管辖的各个分支事务达成一致,要么一起成功提交,要么一起失败回滚。

此外,通常分支事务本身就是一个满足 ACID 特性的本地事务。

2、CAP理论

CAP 理论(有时也被称为布鲁尔定理(Brewer's Theorem))是分布式系统设计中的一个重要理论,它由计算机科学家 Eric Brewer 在 2000 年提出,用来描述分布式系统中的三个基本特性:一致性(Consistency)可用性(Availability)分区容错性(Partition Tolerance)

  1. 一致性(Consistency):
    • 一致性要求在分布式系统中的所有节点上,对数据的访问都呈现一致的视图。即,无论客户端访问哪个节点,都能够获得相同的数据。在一致性的情况下,系统的所有节点在同一时间点的数据状态是相同的。
    • 比如: mysql的主从复制读写分离时,当对主库添加数据成功时,从库必须也添加成功 并且从从库中读取数据时读取的应该是最新的数据,不能出现主库添加数据后从从库中读取的不是最新的数据,比如在主从复制过程中读取从库,此时数据还没有复制过来,此时读取的可能就不是最新的数据,此时就不满足一致性了所以想要满足一致性,当主库添加数据时,需要把数据同步到从库 并且同步的时候需要锁定从库,不能让从库参与读取,等到同步完成之后再取消锁定,此时再读就是最新的数据了,此时就满足了一致性。
  2. 可用性(Availability):
    • 可用性要求系统在任何时候都能够对外提供服务,即使系统中的一些节点出现故障。即,对于客户端的请求,系统能够在合理的时间内返回结果,而不是出现无响应或错误。
    • 比如:mysql的读写分离,当对主机添加数据时,从机会复制这条数据,当客户端读取从机时 不能超时报错必须得有响应。此时允许读取的数据不是最新的。从机还不能被锁定。
  3. 分区容错性(Partition Tolerance):
    • 分区容错性是指系统能够在节点之间发生通信故障或分区的情况下继续工作。
    • 比如:mysql的读写分离,给主机添加一条数据时,主机应该异步把数据复制给从机, 不能同步复制给从机,如果同步复制给从机,导致主机线程阻塞,影响用户二次插入(影响了主机对外提供添加数据服务),并且从机不能只有一个要有多个,如果只有一个从机,从机挂了之后影响了读操作(影响了从机对外提供读取数据服务),如果有多个从机,其中一个从机挂了 其他的从机还会对外提供服务保证系统了正常运行。
CAP定理证明 在分布式系统中要么满足CP ,要么满足AP,无法满足CA,更无法满足CAP无法满足CA的原因因为一致性要求数据必须一致,要想一致得锁定,锁定后则无法被访问,可用性又强调不能超时,还快速响应非错误结果,数据可以不一致,很显然矛盾的。可以满足CP的原因C表示一致性,P表示分区容错性,如果考虑网络分区的情况下,当给主库添加数据时,主库把数据同步给从库此时主库线程等待,直到把数据全部同步给网络分区中的所有的从机,这就不满足有快速响应结果的可用性了所以不能有A。可以满足AP的原因A表示可用性,访问需要快速响应,P表示分区容错性,挂机不影响对外提供服务,就要保证请求时快速从网络分,区节点中响应数据,此时就有可能从不同的节点中读取的数据不同,不满足一致性所以不能有C。实际中的考虑一般的系统中会优先使用AP,不强调强一致性,允许数据在某一个时间点不一致,但是数据经过一段时间间隔最终会一致。对一致性要求比较高的系统可能会选择CP,强调强一致性,比如银行,银行可能要求,不论什么时候查询,都要查询到最新的数据。

3、分布式事务解决方案

  • 强一致性 CP
    • XA协议 数据库级别的规范,需要数据库的支持
  • 弱一致性 AP
    • TCC
    • AT
    • Saga
    • 可靠消息最终一致性
    • 最大努力通知 xxl-job + MQ 定时轮询

XA协议:

  • 说明: XA是一种分布式事务协议,涉及多个数据库。它要求每个涉及的数据库都要支持事务,并且由一个事务管理器协调和控制事务的提交和回滚。
  • 优点: 提供了强一致性,所有参与的数据库要么都提交,要么都回滚。
  • 缺点: 实现复杂,性能开销较大。

TCC(Try-Confirm-Cancel):

  • 说明: TCC是一种基于补偿机制的分布式事务解决方案。每个操作被分解成三个阶段:尝试执行、确认执行和取消执行。通过事务的回滚和补偿操作来实现最终一致性。
  • 优点: 灵活性高,可以自定义补偿逻辑。
  • 缺点: 业务逻辑的改造较大,需要开发者定义补偿逻辑。

AT(Two-Phase Commit):

  • 说明: AT是一种两阶段提交协议,通过协调者向参与者发送两个消息(准备阶段和提交/回滚阶段)来实现事务的一致性。
  • 优点: 相对XA来说实现相对简单。
  • 缺点: 存在单点故障,可能导致协调者故障时系统无法正常运作。

Saga:

  • 说明: Saga是一种流程型的分布式事务解决方案,将大事务拆分成多个小事务,每个小事务有自己的补偿操作。通过事务的步骤序列和补偿来实现最终一致性。
  • 优点: 可以实现较为复杂的业务流程。
  • 缺点: 需要开发者定义补偿逻辑,系统复杂度相对较高。

可靠消息最终一致性:

  • 说明: 通过消息队列,将事务操作转换为消息发送,保证消息的可靠性投递,接收方异步处理消息。消息队列通常提供至少一次投递保证,确保消息最终被消费。
  • 优点: 简单易实现,可靠性较高。
  • 缺点: 无法处理所有可能的一致性问题,依赖于消息队列的可靠性。

最大努力通知:

  • 说明: 在分布式系统中,发送通知的一方尽最大努力发送通知,但不能保证通知的可靠性。接收通知的一方需要实现幂等性来处理可能的重复通知。
  • 优点: 简单,适用于不强调强一致性的场景。
  • 缺点: 不能保证通知的可靠性,需要接收方处理幂等性问题。

4、两阶段提交模型

二阶段提交表示的是XA规范下的事务提交 分为2个阶段

第一个阶段: 资源服务器执行xa prepare。

1、事务管理器通知资源管理器,让资源管理器为提交事务做准备

2、资源管理器收到消息后 执行sql 执行本地事务执行完毕之后不会提交事务,向事务管理器说我执行sql没有出现问题,已经准备好了提交(注意当前资源管理器当前线程阻塞,等待提交)

第二个阶段: 1、如果各个资源管理器都执行成功 则向各个资源管理器发送提交事务的请求

2、各个资源管理器收到请求之后 执行本地事务提交 然后释放资源

异常情况:

1、如果有资源管理器返回的是失败 则向各个资源管理器发送回滚事务的请求

2、各个资源管理器收到请求之后 执行事务回滚 然后释放资源

image-20240119095023476

image-20240119095036538
可能出现数据不一致 当所有参与者操作执行完成,并且向协调者发回同意的消息之后,协调者就会向所有的参与者发出提交的请求,假设者提交请求发到了参与者1,参与者2,但当发送给参与者3时,发生了网络异常,提交请求并没有到达参与者3,就会出现参与者1,参与者2都提交了,参与者3却没有提交,这个时候,整个分布式系统中就会出现数据不一致的情况。

5、三阶段提交模型

因为二阶段提交出现了问题,所以在二阶段的基础上,提出了三阶段提交。

三阶段提交时二阶段提交的改进版本,三个阶段如下:

预备阶段、准备阶段、提交阶段

预备阶段、准备阶段是从二阶段准备阶段拆分出来的

在三阶段中,也是一个协调者和多个参与者 参与者1,2,3

具体逻辑实现

1.预备阶段

协调者开启事务,向参与者发出询问的请求,询问参与者是否可以执行分布式事务的请求,可以执行就会返回一个同意的消息,如果不可以执行就会返回一个中止的消息,在协调者收到所有参与者的询问相应之后,分布式事务就进入了第二个阶段——准备阶段。

2.准备阶段

假设所有的参与者返回的是同意消息,那么分布式事务开始执行,协调者会向所有参与者发出执行分布式事务的请求,在所有参与者收到请求之后,会将所有的分布式事务记录成日志,然后逐条执行分布式事务操作,执行完之后,先不提交,如果执行成功,就返回一个同意的消息,如果执行本地事务的过程中,出现了异常,返回中止消息,在所有参与者返回本地执行结果之后,分布式事务进入第三个阶段,提交阶段。

3.提交阶段

协调者会根据所有参与者返回的结果来确定下一步的操作

假设所有参与者返回的都是同意的消息,那么协调者就知道所有的参与者准备好了,也就会开始执行提交请求,在参与者收到提交请求之后,就会执行本地事务的提交,并且释放相应的资源,执行完成之后,再返回一个完成的消息,在协调者收到所有的完成消息之后,分布式事务也就执行成功了。这是执行正常的情况。

异常处理

1.在预备阶段,协调者询问参与者是否进行分布式事务时,参与者并没有准备好,参与者会返回一个中止消息。

2.在准备阶段,参与者在执行本地事务时,执行异常,那么返回的不是同意,而是中止消息。

无论是第一种情况,还是第二种情况,一旦协调者收到参与者中止消息,就会中止分布式事务,向所有的参与者发出回滚的请求,参与者在收到回滚请求后,会根据日志,将所有的请求撤销,回到分布式事务开启之前的一致状态,这样,整个分布式事务就执行失败。

三阶段提交相对于二阶段提交的改进:

在三阶段提交增加了预备阶段,一旦进入预备阶段,表示所有参与者已经准备好分布式事务,即所有参与者的本地事务是可以执行成功的,这个时候,如果参与者记录完日志,并且执行完本地事务操作之后,就差提交了,这个时候不管发生协调者异常还是网络异常,协调者提交之后,请求没有到达参与者,参与者在等待超时之后,都会默认提交分布式事务,相对于二阶段提交,若二阶段异常,是没办法继续执行的,所以三阶段提交的改进是在二阶段提交没办法继续执行的情况下,可默认将事务提交。

缺点:

当参与者1执行本地事务异常时,给协调者回复中止信号,恰好协调者也异常,这个时候协调者无法中止整个分布式事务,参与者2,参与者3的事务会默认提交,但是参与者1又是执行异常的情况,整个分布式系统也会不一致,所以三阶段提交虽然改变了二阶段提交的问题,但并没有完美实现分布式事务,不过,理论上如此,在具体的实现中,我们会根据业务,对数据进行补偿和修正,以保证业务的数据最终一致。

Redis面试题

1、Redis是单线程还是多线程

这个问题不能简单地用“是”或“否”来回答。Redis的“单线程”标签主要指的是其网络请求处理和命令执行模型,但随着版本的演进,Redis逐步引入了多线程机制来处理特定任务。下面按版本进行说明:

🔹 Redis 2.6(2012年10月)

  • 引入了Lua脚本(EVAL命令),增强了原子性操作。
  • 处理命令仍是典型的单线程

🔹 Redis 3.x(2015年4月)

  • 引入了Redis Cluster,实现分布式集群。
  • 每个节点仍然是单线程处理客户端命令,多个节点并行带来了“逻辑多线程”。

🔹 Redis 4.x(2017年7月)

  • 开始引入异步删除键值的多线程机制(如UNLINK命令),解决大键阻塞问题。
  • 虽然命令处理仍是单线程,但开始使用多线程处理后台任务(如key删除、AOF重写等)。
  • 从这个版本起,Redis 不再是严格意义上的单线程

🔹 Redis 5.x(2018年10月)

  • 进行了内部代码的结构性重构,为多线程打下更坚实的基础。
  • 添加了Streams数据结构,但处理模式仍是“主线程处理命令 + 后台线程做辅助”。

🔹 Redis 6.x(2020年5月)

  • 关键转折点。引入了多线程IO机制
    • 主线程仍处理命令解析与执行,
    • 网络IO读写(accept、read、write)可以由多个线程并行完成(需手动开启)。
  • 实现了**“半多线程模型”**,大大提升了多核CPU下的性能。

✅ 总结

  • Redis 核心命令执行仍然由主线程串行处理,这是为了保持操作的原子性和简单性。
  • 但从 4.x 开始 Redis 就引入了多线程辅助功能,6.x 更是实质性引入了IO多线程,提高并发处理能力。

因此,Redis不是纯粹的单线程系统,而是逐步引入多线程特性的高性能键值数据库。

2、Redis3.x单线程时代但性能依旧很快的主要原因

基于内存操作:Redis 的所有数据都存在内存中,因此所有的运算都是内存级别的,所以他的性能比较高; 数据结构简单:Redis 的数据结构是专门设计的,而这些简单的数据结构的查找和操作的时间大部分复杂度都是 O(1),因此性能比较高; 多路复用和非阻塞 I/O:Redis使用 I/O多路复用功能来监听多个 socket连接客户端,这样就可以使用一个线程连接来处理多个请求,减少线程切换带来的开销,同时也避免了 I/O 阻塞操作 避免上下文切换:因为是单线程模型,因此就避免了不必要的上下文切换和多线程竞争,这就省去了多线程切换带来的时间和性能上的消耗,而且单线程不会导致死锁问题的发生 作者原话大致解析:Redis 是基于内存操作的,因此他的瓶颈可能是机器的内存或者网络带宽而并非 CPU,既然 CPU 不是瓶颈,那么自然就采用单线程的解决方案了,况且使用多线程比较麻烦

3、为什么逐渐加入了多线程特性?

正常情况下使用 del 指令可以很快的删除数据,而当被删除的 key 是一个非常大的对象时,例如时包含了成千上万个元素的 hash 集合时,那么 del 指令就会造成 Redis 主线程卡顿。 这就是redis3.x单线程时代最经典的故障,大key删除的头疼问题, 由于redis是单线程的,del bigKey ..... 等待很久这个线程才会释放,类似加了一个synchronized锁,你可以想象高并发下,程序堵成什么样子?

4、Redis6.0默认是否开启了多线程?

Redis将所有数据放在内存中,内存的响应时长大约为100纳秒,对于小数据包,Redis服务器可以处理8W到10W的QPS,这也是Redis处理的极限了,对于80%的公司来说,单线程的Redis已经足够使用了。

在Redis6.0中,多线程机制默认是关闭的,如果需要使用多线程功能,需要在redis.conf中完成两个设置 1.设置io-thread-do-reads配置项为yes,表示启动多线程 2.设置线程个数。关于线程数的设置,官方的建议是如果为 4 核的 CPU,建议线程数设置为 2 或 3,如果为 8 核 CPU 建议线程数设置为 6,线程数一定要小于机器核数,线程数并不是越大越好。

5、Redis的八种数据类型,分别是什么应用场景

string hash list set zset bitmap hyperloglog geo

1、string

文章浏览量,只要点击了,直接可以使用incr key 命令增加一个数字1,完成记录数字

2、hash

购物车

新增商品 → hset shopcar:uid1024 334488 1 新增商品 → hset shopcar:uid1024 334477 1 增加商品数量 → hincrby shopcar:uid1024 334477 1 商品总数 → hlen shopcar:uid1024 全部选择 → hgetall shopcar:uid102

3、list

微信公众号订阅的消息

用户ID,立即参与按钮 sadd key 用户ID 显示已经有多少人参与了,上图23208人参加 SCARD key 抽奖(从set中任意选取N个中奖人) SRANDMEMBER key 2 随机抽奖2个人,元素不删除 SPOP key 3 随机抽奖3个人,元素会删除 微信朋友圈: 新增点赞 sadd pub:msgID 点赞用户ID1 点赞用户ID2 取消点赞 srem pub:msgID 点赞用户ID 展现所有点赞过的用户 SMEMBERS pub:msgID 点赞用户数统计,就是常见的点赞红色数字 scard pub:msgID 判断某个朋友是否对楼主点赞过 SISMEMBER pub:msgID 用户ID 微博好友关注社交 - 共同关注的人 关注了他们两个,只要他们发布了新文章,就会安装进我的List lpush likearticle:id 11 22 查看自己的号订阅的全部文章 lrange likearticle:id 0 9

4、set

抽奖小程序

用户ID,立即参与按钮 sadd key 用户ID 显示已经有多少人参与了,上图23208人参加 SCARD key 抽奖(从set中任意选取N个中奖人) SRANDMEMBER key 2 随机抽奖2个人,元素不删除 SPOP key 3 随机抽奖3个人,元素会删除

5、zset

根据商品销售对商品进行排序显示

思路:定义商品销售排行榜(sorted set集合),key为goods:sellsort,分数为商品销售数量。 商品编号1001的销量是9,商品编号1002的销量是15 zadd goods:sellsort 9 1001 15 1002 有一个客户又买了2件商品1001,商品编号1001销量加2 zincrby goods:sellsort 2 1001 求商品销量前10名 ZRANGE goods:sellsort 0 10 withscores 抖音热搜 点击视频 ZINCRBY hotvcr:20200919 1 八佰 ZINCRBY hotvcr:20200919 15 八佰 2 花木兰 展示当日排行前10条 ZREVRANGE hotvcr:20200919 0 9 withscores

6、bitmap

京东签到领取京豆

签到日历仅展示当月签到数据 签到日历需展示最近连续签到天数 假设当前日期是20210618,且20210616未签到 若20210617已签到且0618未签到,则连续签到天数为1 若20210617已签到且0618已签到,则连续签到天数为2

7、hyperloglog

天猫网站首页亿级UV的Redis统计方案 统计日活

8、geo

美团地图位置附近的酒店推送

6、布隆过滤器怎么用?

介绍

  • 由一个初值都为零的bit数组和多个哈希函数构成,用来快速判断某个数据是否存在

image-20210628143128875
  • 本质就是判断具体数据存不存在一个大的集合中
  • 布隆过滤器是一种类似set的数据结构,只是统计结果不太准确

特点

  • 高效地插入和查询,占用空间少,返回的结果是不确定性的。
  • 一个元素如果判断结果为存在的时候元素不一定存在,但是判断结果为不存在的时候则一定不存在。
  • 布隆过滤器可以添加元素,但是不能删除元素。因为删掉元素会导致误判率增加。
  • 误判只会发生在过滤器没有添加过的元素,对于添加过的元素不会发生误判。

备注:

有,是可能有

无,是肯定无

可以保证的是,如果布隆过滤器判断一个元素不在一个集合中,那这个元素一定不会在集合中

使用场景

  • 解决缓存穿透的问题
缓存穿透是什么

一般情况下,先查询缓存redis是否有该条数据,缓存中没有时,再查询数据库。

当数据库也不存在该条数据时,每次查询都要访问数据库,这就是缓存穿透。

缓存透带来的问题是,当有大量请求查询数据库不存在的数据时,就会给数据库带来压力,甚至会拖垮数据库。

可以使用布隆过滤器解决缓存穿透的问题

把已存在数据的key存在布隆过滤器中,相当于redis前面挡着一个布隆过滤器。

当有新的请求时,先到布隆过滤器中查询是否存在:

如果布隆过滤器中不存在该条数据则直接返回;

如果布隆过滤器中已存在,才去查询缓存redis,如果redis里没查询到则穿透到Mysql数据库
  • 黑名单校验
发现存在黑名单中的,就执行特定操作。比如:识别垃圾邮件,只要是邮箱在黑名单中的邮件,就识别为垃圾邮件。

假设黑名单的数量是数以亿计的,存放起来就是非常耗费存储空间的,布隆过滤器则是一个较好的解决方案。

把所有黑名单都放在布隆过滤器中,在收到邮件时,判断邮件地址是否在布隆过滤器中即可。

原理

  • Java中传统hash
哈希函数的概念是:将任意大小的输入数据转换成特定大小的输出数据的函数,转换后的数据称为哈希值或哈希编码,也叫散列值

image-20210628143221991
如果两个散列值是不相同的(根据同一函数)那么这两个散列值的原始输入也是不相同的。

这个特性是散列函数具有确定性的结果,具有这种性质的散列函数称为单向散列函数。

散列函数的输入和输出不是唯一对应关系的,如果两个散列值相同,两个输入值很可能是相同的,但也可能不同,

这种情况称为“散列碰撞(collision)”。

用 hash表存储大数据量时,空间效率还是很低,当只有一个 hash 函数时,还很容易发生哈希碰撞。
  • Java中hash冲突java案例
public class HashCodeConflictDemo { public static void main(String[] args) { Set<Integer> hashCodeSet = new HashSet<>(); for (int i = 0; i <200000; i++) { int hashCode = new Object().hashCode(); if(hashCodeSet.contains(hashCode)) { System.out.println("出现了重复的hashcode: "+hashCode+"\t 运行到"+i); break; } hashCodeSet.add(hashCode); } System.out.println("Aa".hashCode()); System.out.println("BB".hashCode()); System.out.println("柳柴".hashCode()); System.out.println("柴柕".hashCode()); } }
1、布隆过滤器实现原理和数据结构
布隆过滤器原理

布隆过滤器(Bloom Filter) 是一种专门用来解决去重问题的高级数据结构。

实质就是一个大型位数组和几个不同的无偏hash函数(无偏表示分布均匀)。由一个初值都为零的bit数组和多个个哈希函数构成,用来快速判断某个数据是否存在。但是跟 HyperLogLog 一样,它也一样有那么一点点不精确,也存在一定的误判概率

image-20210628143243175
添加key时

使用多个hash函数对key进行hash运算得到一个整数索引值,对位数组长度进行取模运算得到一个位置,

每个hash函数都会得到一个不同的位置,将这几个位置都置1就完成了add操作。

查询key时

只要有其中一位是零就表示这个key不存在,但如果都是1,则不一定存在对应的key。

结论:

有,是可能有

无,是肯定无

当有变量被加入集合时,通过N个映射函数将这个变量映射成位图中的N个点,

把它们置为 1(假定有两个变量都通过 3 个映射函数)。

image-20210628143253575
查询某个变量的时候我们只要看看这些点是不是都是 1, 就可以大概率知道集合中有没有它了

如果这些点,有任何一个为零则被查询变量一定不在,

如果都是 1,则被查询变量很可能存在,

为什么说是可能存在,而不是一定存在呢?那是因为映射函数本身就是散列函数,散列函数是会有碰撞的。

image.png
2、3步骤

image-20210628143313048

image-20210628143326989
向布隆过滤器查询某个key是否存在时,先把这个 key 通过相同的多个 hash 函数进行运算,查看对应的位置是否都为 1,

只要有一个位为 0,那么说明布隆过滤器中这个 key 不存在;
如果这几个位置全都是 1,那么说明极有可能存在;
因为这些位置的 1 可能是因为其他的 key 存在导致的,也就是前面说过的hash冲突。。。。。
就比如我们在 add 了字符串wmyskxz数据之后,很明显下面1/3/5 这几个位置的 1 是因为第一次添加的 wmyskxz 而导致的;
此时我们查询一个没添加过的不存在的字符串inexistent-key,它有可能计算后坑位也是1/3/5 ,这就是误判了......

image-20210628143350360

image-20210628143402808

image-20210628143221991

image-20210628143243175

image-20210628143253575

image-20210628143313048

image-20210628143326989

image-20210628143350360

image-20210628143402808
3、布隆过滤器误判率,为什么不要删除
布隆过滤器的误判是指多个输入经过哈希之后在相同的bit位置1了,这样就无法判断究竟是哪个输入产生的,
因此误判的根源在于相同的 bit 位被多次映射且置 1。
这种情况也造成了布隆过滤器的删除问题,因为布隆过滤器的每一个 bit 并不是独占的,很有可能多个元素共享了某一位。
如果我们直接删除这一位的话,会影响其他的元素
特性
一个元素判断结果为没有时则一定没有,
如果判断结果为存在的时候元素不一定存在。
布隆过滤器可以添加元素,但是不能删除元素。因为删掉元素会导致误判率增加。
4、总结
  • 是否存在
    • 有,是很可能有
    • 无,是肯定无
    • 可以保证的是,如果布隆过滤器判断一个元素不在一个集合中,那这个元素一定不会在集合中
  • 使用时最好不要让实际元素数量远大于初始化数量
  • 当实际元素数量超过初始化数量时,应该对布隆过滤器进行重建,重新分配一个 size 更大的过滤器,再将所有的历史元素批量 add 进行
6、优缺点
  • 优点:
    • 高效地插入和查询,占用空间少
  • 缺点
    • 不能删除元素。因为删掉元素会导致误判率增加,因为hash冲突同一个位置可能存的东西是多个共有的,你删除一个元素的同时可能也把其它的删除了。
    • 存在误判不同的数据可能出来相同的hash值
7、布谷鸟过滤器
为了解决布隆过滤器不能删除元素的问题,布谷鸟过滤器横空出世。论文《Cuckoo Filter:Better Than Bloom》

作者将布谷鸟过滤器和布隆过滤器进行了深入的对比。相比布谷鸟过滤器而言布隆过滤器有以下不足:

查询性能弱、空间利用效率低、不支持反向操作(删除)以及不支持计数

7、Redis除了拿来做缓存,你还见过基于Redis的什么用法?

分布式锁

8、Redis 做分布式锁的时候有需要注意的问题?

1、独占性

OnlyOne,任何时刻只能有且仅有一个线程持有

2、高可用

若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况

3、防死锁

杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案

4、不乱抢

防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放。

5、重入性

同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。

9、那你简单的介绍一下 Redlock 吧?你简历上写redisson,你谈谈

1、使用场景

多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)

Redis分布式锁比较正确的姿势是采用redisson这个客户端工具

2、RedLock Java的实现 Redisson

image-20210628144659220
英文:https://redis.io/topics/distlock

中文:http://redis.cn/topics/distlock.html

Redisson是java的redis客户端之一,提供了一些api方便操作redis

redisson之官网:https://redisson.org/

redisson之Github:https://github.com/redisson/redisson/wiki/1.-Overview

redisson之解决分布式锁:https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

3、单机案例

1、三个重要元素
  • 加锁
    • 加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间
  • 解锁
    • 将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉,只能自己删除自己的锁
    • Lua
为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。 if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
  • 超时
    • 锁key要注意过期时间,不能长期占用
加锁关键逻辑:

public static boolean tryLock(String key, String uniqueId, int seconds) {

return "OK".equals(jedis.set(key, uniqueId, "NX", "EX", seconds));

}

解锁关键逻辑:

public static boolean releaseLock(String key, String uniqueId) {

String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +

​ "return redis.call('del', KEYS[1]) else return 0 end";

return jedis.eval(

​ luaScript,

​ Collections.singletonList(key),

​ Collections.singletonList(uniqueId)

).equals(1L);

}

上面一般中小公司,不是高并发场景,是可以使用的。单机redis小业务也撑得住

4、多机案例

1、基于setnx的分布式锁有什么缺点?

image-20210628144726706
线程 1 首先获取锁成功,将键值对写入 redis 的 master 节点;

在 redis 将该键值对同步到 slave 节点之前,master 发生了故障;

redis 触发故障转移,其中一个 slave 升级为新的 master;

此时新的 master 并不包含线程 1 写入的键值对,因此线程 2 尝试获取锁也可以成功拿到锁;

此时相当于有两个线程获取到了锁,可能会导致各种预期之外的情况发生,例如最常见的脏数据。

我们加的是排它独占锁,同一时间只能有一个建redis锁成功并持有锁,严禁出现2个以上的请求线程拿到锁。危险的

2、redis之父提出了Redlock算法解决这个问题
Redis也提供了Redlock算法,用来实现基于多个实例的分布式锁。

锁变量由多个实例维护,即使有实例发生了故障,锁变量仍然是存在的,客户端还是可以完成锁操作。Redlock算法是实现高可靠分布式锁的一种有效解决方案,可以在实际开发中使用。
3、Redis、Zookeeper、Eureka集群比较
1、Zookeeper集群 -- CP

image-20210628144742249

image-20210628144751829
2、Eureka集群 -- AP

image-20210628144801409
3、Redis集群 -- AP

redis异步复制造成的锁丢失,

比如:主节点没来的及把刚刚set进来这条数据给从节点,就挂了。

5、设计理念

该方案也是基于(set 加锁、Lua 脚本解锁)进行改良的,所以redis之父antirez 只描述了差异的地方,大致方案如下。

假设我们有N个Redis主节点,例如 N = 5这些节点是完全独立的,我们不使用复制或任何其他隐式协调系统,为了取到锁客户端执行以下操作:

1获取当前时间,以毫秒为单位;
2依次尝试从5个实例,使用相同的 key 和随机值(例如 UUID)获取锁。当向Redis 请求获取锁时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以防止客户端在试图与一个宕机的 Redis 节点对话时长时间处于阻塞状态。如果一个实例不可用,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁;
3客户端通过当前时间减去步骤 1 记录的时间来计算获取锁使用的时间。当且仅当从大多数(N/2+1,这里是 3 个节点)的 Redis 节点都取到锁,并且获取锁使用的时间小于锁失效时间时,锁才算获取成功;
4如果取到了锁,其真正有效时间等于初始有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
5如果由于某些原因未能获得锁(无法在至少 N/2 + 1 个 Redis 实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

该方案为了解决数据不一致的问题,直接舍弃了异步复制只使用 master 节点,同时由于舍弃了 slave,为了保证可用性,引入了 N 个节点,官方建议是 5。本次用3台实例来做说明。

客户端只有在满足下面的这两个条件时,才能认为是加锁成功。

条件1:客户端从超过半数(大于等于N/2+1)的Redis实例上成功获取到了锁;

条件2:客户端获取锁的总耗时没有超过锁的有效时间。

image-20210628144811476

为什么是奇数? N = 2X + 1 (N是最终部署机器数,X是容错机器数)

1 先知道什么是容错

失败了多少个机器实例后我还是可以容忍的,所谓的容忍就是数据一致性还是可以Ok的,CP数据一致性还是可以满足

加入在集群环境中,redis失败1台,可接受。2X+1 = 2 * 1+1 =3,部署3台,死了1个剩下2个可以正常工作,那就部署3台。

加入在集群环境中,redis失败2台,可接受。2X+1 = 2 * 2+1 =5,部署5台,死了2个剩下3个可以正常工作,那就部署5台。

2 为什么是奇数?

最少的机器,最多的产出效果

加入在集群环境中,redis失败1台,可接受。2N+2= 2 * 1+2 =4,部署4台

加入在集群环境中,redis失败2台,可接受。2N+2 = 2 * 2+2 =6,部署6台
1、案例
  • docker走起3台redis的master机器,本次设置3台master各自独立无从属关系
docker run -p 6381:6379 --name redis-master-1 -d redis:6.0.7

docker run -p 6382:6379 --name redis-master-2 -d redis:6.0.7

docker run -p 6383:6379 --name redis-master-3 -d redis:6.0.7
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <!--<version>3.12.0</version>--> <version>3.13.4</version> </dependency> spring.application.name=spring-boot-redis server.port=9090 spring.swagger2.enabled=true spring.redis.database=0 spring.redis.password= spring.redis.timeout=3000 #sentinel/cluster/single spring.redis.mode=single spring.redis.pool.conn-timeout=3000 spring.redis.pool.so-timeout=3000 spring.redis.pool.size=10 spring.redis.single.address1=192.168.111.147:6381 spring.redis.single.address2=192.168.111.147:6382 spring.redis.single.address3=192.168.111.147:6383 @Configuration @EnableConfigurationProperties(RedisProperties.class) public class CacheConfiguration { @Autowired RedisProperties redisProperties; @Bean RedissonClient redissonClient1() { Config config = new Config(); String node = redisProperties.getSingle().getAddress1(); node = node.startsWith("redis://") ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(redisProperties.getPool().getConnTimeout()) .setConnectionPoolSize(redisProperties.getPool().getSize()) .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } @Bean RedissonClient redissonClient2() { Config config = new Config(); String node = redisProperties.getSingle().getAddress2(); node = node.startsWith("redis://") ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(redisProperties.getPool().getConnTimeout()) .setConnectionPoolSize(redisProperties.getPool().getSize()) .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } @Bean RedissonClient redissonClient3() { Config config = new Config(); String node = redisProperties.getSingle().getAddress3(); node = node.startsWith("redis://") ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(redisProperties.getPool().getConnTimeout()) .setConnectionPoolSize(redisProperties.getPool().getSize()) .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } /** * 单机 * @return */ /*@Bean public Redisson redisson() { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.111.147:6379").setDatabase(0); return (Redisson) Redisson.create(config); }*/ @Data public class RedisPoolProperties { private int maxIdle; private int minIdle; private int maxActive; private int maxWait; private int connTimeout; private int soTimeout; /** * 池大小 */ private int size; } @Data public class RedisPoolProperties { private int maxIdle; private int minIdle; private int maxActive; private int maxWait; private int connTimeout; private int soTimeout; /** * 池大小 */ private int size; } @Data public class RedisSingleProperties { private String address1; private String address2; private String address3; } @RestController @Slf4j public class RedLockController { public static final String CACHE_KEY_REDLOCK = "xx_REDLOCK"; @Autowired RedissonClient redissonClient1; @Autowired RedissonClient redissonClient2; @Autowired RedissonClient redissonClient3; @GetMapping(value = "/redlock") public void getlock() { //CACHE_KEY_REDLOCK为redis 分布式锁的key RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK); RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK); RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK); RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); boolean isLock; try { //waitTime 锁的等待时间处理,正常情况下 等5s //leaseTime就是redis key的过期时间,正常情况下等5分钟。 isLock = redLock.tryLock(5, 300, TimeUnit.SECONDS); log.info("线程{},是否拿到锁:{} ",Thread.currentThread().getName(),isLock); if (isLock) { //TODO if get lock success, do something; //暂停20秒钟线程 try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { log.error("redlock exception ",e); } finally { // 无论如何, 最后都要解锁 redLock.unlock(); System.out.println(Thread.currentThread().getName()+"\t"+"redLock.unlock()"); } } }

10、你觉得 Redlock 有什么问题呢?

Redlock是Redis官方提出的一种分布式锁算法,它的设计初衷是为了解决在分布式系统中实现可靠的互斥锁的问题。虽然Redlock在理论上看起来非常可靠,但是在实际使用中存在以下问题:

  1. 非确定性:Redlock算法的可靠性依赖于多个Redis节点之间的同步性和时钟同步性,如果其中一个Redis节点的时钟偏移过大,就可能导致锁的非确定性,这会给系统带来不可预测的风险。
  2. 高延迟:Redlock算法需要对多个Redis节点进行访问和同步,这会导致锁的获取和释放的延迟较高,尤其是在网络延迟较大的情况下,会影响系统的响应速度和性能。
  3. 可重入性问题:Redlock算法没有考虑锁的可重入性,如果同一个线程多次请求同一个锁,就有可能发生死锁或者锁失效的情况。
  4. 不支持阻塞式锁:Redlock算法只支持非阻塞式锁,如果一个线程无法获取锁,就需要不断重试,这会导致锁的争用和系统的性能下降。

综上所述,虽然Redlock算法在理论上看起来非常可靠,但是在实际使用中存在一些问题,需要根据具体情况进行评估和选择,或者采用其他更加可靠的分布式锁算法来替代Redlock算法。

11、Redis如何做分布式锁

Redis 可以使用 setnx 命令来实现分布式锁。setnx 命令原子地设置一个键的值,如果该键不存在,则设置成功并返回 1,否则设置失败并返回 0。利用这个机制,可以将 Redis 的一个字符串类型的键作为锁,并使用 setnx 命令尝试获取锁。

基本的实现流程如下:

  1. 尝试获取锁:客户端向 Redis 发送 setnx 命令,将一个字符串键设置为锁,在命令执行成功时获得锁。
  2. 执行业务逻辑:获取锁后,客户端可以执行一些需要互斥的业务逻辑。
  3. 释放锁:当客户端完成业务逻辑后,使用 del 命令将锁删除,从而释放锁。

需要注意以下几点:

  1. 在获取锁时,需要设置一个过期时间,以防止锁被永久保留(例如出现异常错误)。可以使用 expire 命令或 set 命令的 EX 参数来设置过期时间。
  2. 建议使用随机数或进程 ID 等信息作为锁的值,从而确保每个客户端的锁都是唯一的。
  3. 为了避免死锁和其他竞态条件问题,建议在释放锁之前先检查锁是否属于当前客户端。可以使用 get 命令获取锁的值并比较。
  4. 在高并发情况下,可能会出现多个客户端同时请求锁的情况,可以使用 Lua 脚本等机制来确保原子性操作。

12、使用 setnx 做分布式锁会出现什么问题

在使用 setnx 做分布式锁时,可能会出现以下问题:

死锁:如果一个线程获得了锁但没有释放它,那么其他线程将无法获取该锁,从而导致死锁。

竞态条件:多个线程同时请求锁,可能会导致多个线程都尝试获取锁并成功,从而破坏了锁的互斥性。

并发问题:当多个线程尝试获得锁时,可能会导致竞争和阻塞,从而降低系统性能。

过期时间: 过期时间不定,不好设置,会出现锁提前释放

可重入性问题:如果一个线程已经获取了锁并再次尝试获取锁,这可能会导致死锁或其他不可预测的行为。

13、使用 redisson 做分布式锁

Redisson 是一个基于 Redis 的分布式 Java 对象和服务框架,它提供了一些强大的分布式锁功能,可以帮助我们更安全、高效地实现分布式锁。

使用 Redisson 实现分布式锁的步骤如下:

  1. 引入 Redisson 依赖:在项目中引入 Redisson 的相应依赖,并配置 Redisson 的连接参数。
  2. 创建 RedissonClient:通过 Redisson 的 Config 类配置 RedissonClient,包括 Redis 的 host、port、password 等信息。
  3. 获取锁:使用 RedissonClient 的 getLock 方法获取锁对象。可以选择使用公平锁或非公平锁,也可以设置锁的过期时间。
  4. 执行业务逻辑:获取锁后,客户端可以执行一些需要互斥的业务逻辑。
  5. 释放锁:当客户端完成业务逻辑后,使用 unlock 方法释放锁。如果不手动释放锁,则 Redisson 会自动在锁超时后释放锁。
// 创建 Redisson 客户端 Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password"); RedissonClient redisson = Redisson.create(config); // 获取分布式锁 RLock lock = redisson.getLock("mylock"); try { // 尝试获取锁,最多等待 10 秒钟,锁的过期时间为 30 秒钟 boolean res = lock.tryLock(10, 30, TimeUnit.SECONDS); if (res) { // 成功获取锁,执行业务逻辑 // ... } } finally { // 释放锁 lock.unlock(); }

需要注意以下几点:

  1. Redisson 支持多种类型的锁,包括公平锁、非公平锁、可重入锁等。在选择锁类型时,需要根据具体需求和场景进行选择。
  2. 在高并发场景中,可能会出现死锁或锁争用问题。Redisson 提供了一些机制来避免这些问题,如重试次数、看门狗等。

14、Redis与zookpeer做分布式锁有什么区别

Redis和Zookeeper都可以用于实现分布式锁,但它们有以下几点区别:

  1. 数据存储方式不同:Redis是内存型数据库,数据存储在内存中,支持持久化,读写速度快;而Zookeeper是一个文件系统,数据存储在磁盘上,读写速度较慢。
  2. 锁的实现方式不同:Redis通过setnx命令和Lua脚本实现分布式锁,具有一定的性能优势;而Zookeeper通过临时节点和Watch机制实现分布式锁,可靠性更高。
  3. 依赖的第三方库不同:Redis需要使用Java客户端或者其它语言的客户端来连接Redis服务端,而Zookeeper则使用Curator库进行连接和操作。
  4. 锁的可重入性和释放机制不同: Redis支持可重入锁和自动释放锁;Zookeeper不支持可重入锁,在锁释放时需要手动删除节点。

总之,Redis和Zookeeper都有各自的优缺点和适用场景。当需要考虑到分布式锁的可靠性和一致性时,Zookeeper可以作为更好的选择;而如果需要高并发下的性能和简单易用性,则Redis可能更适合。

15、Redis分布式锁如何续期?看门狗知道吗?

1、code

public class WatchDogDemo { public static final String LOCKKEY = "AAA"; private static Config config; private static Redisson redisson; static { config = new Config(); config.useSingleServer().setAddress("redis://"+"192.168.111.147"+":6379").setDatabase(0); redisson = (Redisson)Redisson.create(config); } public static void main(String[] args) { RLock redissonLock = redisson.getLock(LOCKKEY); redissonLock.lock(); try { System.out.println("1111"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(25); } catch (InterruptedException e) { e.printStackTrace(); } }catch (Exception e){ e.printStackTrace(); }finally { redissonLock.unlock(); } System.out.println(Thread.currentThread().getName() + " main ------ ends."); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } redisson.shutdown(); } }

2、缓存续命

Redis 分布式锁过期了,但是业务逻辑还没处理完怎么办

  • 守护线程“续命”
额外起一个线程,定期检查线程是否还持有锁,如果有则延长过期时间。

Redisson 里面就实现了这个方案,使用“看门狗”定期检查(每1/3的锁时间检查1次),如果线程还持有锁,则刷新过期时间;
分布式难以避免的,系统时钟影响

如果线程 1 从 3 个实例获取到了锁。但是这 3 个实例中的某个实例的系统时间走的稍微快一点,则它持有的锁会提前过期被释放,当他释放后,此时又有 3 个实例是空闲的,则线程2也可以获取到锁,则可能出现两个线程同时持有锁了。

3、watchdog

在获取锁成功后,给锁加一个 watchdog,watchdog 会起一个定时任务,在锁没有被释放且快要过期的时候会续期

image-20210628144855062

image-20210628144912598

4、源码解析

1、分析1 - 通过redisson新建出来的锁key,默认是30秒

image-20210628144933314
2、分析2

image-20210628144943694
3、分析3

image-20210628144956071

这里面初始化了一个定时器,dely 的时间是 internalLockLeaseTime/3。

在 Redisson 中,internalLockLeaseTime 是 30s,也就是每隔 10s 续期一次,每次 30s。

image-20210628145006568
4、分析4 - watch dog自动延期机制

客户端A加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端A还持有锁key,那么就会不断的延长锁key的生存时间,默认每次续命又从30秒新开始

image-20210628145019802
5、分析5
KEYS[1]代表的是你加锁的那个keyRLock redissonLock = redisson.getLock("lockxx");这里你自己设置了加锁的那个锁key
ARGV[2]代表的是加锁的客户端的ID

202207111522676

ARGV[1]就是锁key的默认生存时间默认30秒
如何加锁你要加锁的那个锁key不存在的话,你就进行加锁hincrby 7bcf6a9f-e7f7-49b0-9727-141df3b88038:117 1接着会执行 pexpire lockxx 30000

image-20210628145048697
6、流程解释
  • 通过exists判断,如果锁不存在,则设置值和过期时间,加锁成功
  • 通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功
  • 如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间(代表了lockxx这个锁key的剩余生存时间),加锁失败
7、加锁查看
package com.xx.redis.test; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; public class WatchDogDemo { public static final String LOCKKEY = "AAA"; private static Config config; private static Redisson redisson; static { config = new Config(); config.useSingleServer().setAddress("redis://"+"192.168.111.147"+":6379").setDatabase(0); redisson = (Redisson)Redisson.create(config); } public static void main(String[] args) { RLock redissonLock = redisson.getLock(LOCKKEY); redissonLock.lock(); try { System.out.println("1111"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(25); } catch (InterruptedException e) { e.printStackTrace(); } }catch (Exception e){ e.printStackTrace(); }finally { if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) { redissonLock.unlock(); } } System.out.println(Thread.currentThread().getName() + " main ------ ends."); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } redisson.shutdown(); } } 加锁成功后,在redis的内存数据中,就有一条hash结构的数据。 Key为锁的名称;field为随机字符串+线程ID;值为1。见下 如果同一线程多次调用lock方法,值递增1。----------可重入锁见后

image-20210628145110022
8、可重入锁查看

image-20210628145121353

加大业务逻辑处理时间,看超过10秒钟后,redisson的续命加时

9、解锁

image-20210628145138091

16、生产上你们你们的redis内存设置多少?

一般推荐Redis设置内存为最大物理内存的四分之三

Redis的内存设置应该根据具体的应用场景和需求来进行调整。一般来说,需要考虑以下几个方面:

  1. 数据量:根据需要存储的数据量来设置Redis的内存大小,确保足够存储所有数据。
  2. 并发访问量:如果有大量并发访问,需要设置足够的内存来支持并发读写操作。
  3. 数据类型:不同的数据类型在Redis中占据的内存大小不同,需要根据实际情况来进行调整。
  4. 操作频率:如果某些数据经常被访问,需要分配更多的内存来加速访问。
  5. 可用性和可靠性:在设置Redis的内存大小时,需要考虑到系统的可用性和可靠性,确保系统在处理大量数据时不会出现宕机或数据丢失的情况。

因此,对于生产环境的Redis内存设置,需要根据具体的业务需求和系统规模进行调整,并进行充分的测试和评估,以确保Redis能够稳定运行并满足业务需求。

17、如何配置、修改redis的内存大小

1、查看Redis最大占用内存

image-20210628145303624

打开redis配置文件,设置maxmemory参数,maxmemory是bytes字节类型,注意转换。

2、redis默认内存多少可以用?

image-20210628145313236

3、一般生产上你如何配置?

一般推荐Redis设置内存为最大物理内存的四分之三

因为Redis内存的设置需要根据具体的业务需求和系统规模来调整,不同的应用场景和业务负载下,Redis内存的设置可能会有所不同。

一般来说,Redis的内存设置应该考虑以下几个因素:

  1. 数据量大小:根据业务数据量大小来确定Redis的内存大小,通常建议将Redis的内存大小设置为业务数据量的1.5倍以上,以确保系统的高效和稳定。
  2. 并发请求量:根据系统的并发请求量来调整Redis的内存大小,通常建议将Redis的内存大小设置为能够支持最高并发请求量的2倍以上,以确保系统的高性能和可靠性。
  3. 数据类型和访问频率:根据不同的数据类型和访问频率,对Redis的内存大小进行合理的分配,将访问频率较高的数据存储在内存中,访问频率较低的数据存储在磁盘中,以最大程度地减少内存的占用和提高系统的性能。

需要注意的是,Redis内存的设置需要综合考虑多方面的因素,包括系统规模、业务负载、数据类型和访问频率等因素,同时需要进行实时监控和调整,以保证系统的高效和稳定。

4、如何修改redis内存设置

  • 通过修改文件配置

image-20210628145328630
  • 通过命令修改

image-20210628145337988

18、Redis清理内存的方式?定期删除和惰性删除了解过吗

1、三种不同的删除策略

如果一个键是过期的,那它到了过期时间之后是不是马上就从内存中被被删除呢??

NO
1、立即删除
Redis不可能时时刻刻遍历所有被设置了生存时间的key,来检测数据是否已经到达过期时间,然后对它进行删除。

立即删除能保证内存中数据的最大新鲜度,因为它保证过期键值会在过期后马上被删除,其所占用的内存也会随之释放。但是立即删除对cpu是最不友好的。因为删除操作会占用cpu的时间,如果刚好碰上了cpu很忙的时候,比如正在做交集或排序等计算的时候,就会给cpu造成额外的压力,让CPU心累,时时需要删除,忙死。。。。。。。这会产生大量的性能消耗,同时也会影响数据的读取操作。

总结:对CPU不友好,用处理器性能换取存储空间 (拿时间换空间)
2、惰性删除
数据到达过期时间,不做处理。等下次访问该数据时,

如果未过期,返回数据 ;

发现已过期,删除,返回不存在。

惰性删除策略的缺点是,它对内存是最不友好的

如果一个键已经过期,而这个键又仍然保留在redis中,那么只要这个过期键不被删除,它所占用的内存就不会释放。

在使用惰性删除策略时,如果数据库中有非常多的过期键,而这些过期键又恰好没有被访问到的话,那么它们也许永远也不会被删除(除非用户手动执行FLUSHDB),我们甚至可以将这种情况看作是一种内存泄漏–无用的垃圾数据占用了大量的内存,而服务器却不会自己去释放它们,这对于运行状态非常依赖于内存的Redis服务器来说,肯定不是一个好消息

总结:对memory不友好,用存储空间换取处理器性能(拿空间换时间)
3、定期删除
定期删除策略是前两种策略的折中:

定期删除策略每隔一段时间执行一次删除过期键操作,并通过限制删除操作执行的时长和频率来减少删除操作对CPU时间的影响。

周期性轮询redis库中的时效性数据,采用随机抽取的策略,利用过期数据占比的方式控制删除频度

特点1:CPU性能占用设置有峰值,检测频度可自定义设置

特点2:内存压力不是很大,长期占用内存的冷数据会被持续清理

总结:周期性抽查存储空间 (随机抽查,重点抽查)

redis默认每个100ms检查,是否有过期的key,有过期key则删除。注意:redis不是每隔100ms将所有的key检查一次而是随机抽取进行检查(如果每隔100ms,全部key进行检查,redis直接进去ICU)。因此,如果只采用定期删除策略,会导致很多key到时间没有删除。

定期删除策略的难点是确定删除操作执行的时长和频率:如果删除操作执行得太频繁,或者执行的时间太长,定期删除策略就会退化成立即删除策略,以至于将CPU时间过多地消耗在删除过期键上面。如果删除操作执行得太少,或者执行的时间太短,定期删除策略又会和惰性删除束略一样,出现浪费内存的情况。因此,如果采用定期删除策略的话,服务器必须根据情况,合理地设置删除操作的执行时长和执行频率。

总结:定期抽样key,判断是否过期 会有漏网之鱼

1 定期删除时,从来没有被抽查到

2 惰性删除时,也从来没有被点中使用过

上述2步骤======> 大量过期的key堆积在内存中,导致redis内存空间紧张或者很快耗尽

19、Redis缓存淘汰策略

1、noeviction: 不会驱逐任何key

2、allkeys-lru: 对所有key使用LRU算法进行删除

3、volatile-lru: 对所有设置了过期时间的key使用LRU算法进行删除

4、allkeys-random: 对所有key随机删除

5、volatile-random: 对所有设置了过期时间的key随机删除

6、volatile-ttl: 删除马上要过期的key

7、allkeys-lfu: 对所有key使用LFU算法进行删除

8、volatile-lfu: 对所有设置了过期时间的key使用LFU算法进行删除

9、总结

  • 2 * 4 得8
  • 2个维度
    • 过期键中筛选
    • 所有键中筛选
  • 4个方面
    • LRU
    • LFU
    • random
    • ttl

10、生产用哪个

allkeys-lru

20、Redis的LRU了解过吗

LRU(Least Recently Used,最近最少使用)是Redis中一种常用的缓存淘汰策略。LRU策略会优先淘汰最近最少使用的缓存对象,从而保留最近使用频率较高的缓存对象,以提高缓存的命中率和性能。

在Redis中,LRU策略的实现是通过一个双向链表和一个哈希表来完成的。哈希表中存储了缓存对象的键和指向双向链表中对应节点的指针,双向链表中存储了缓存对象的值和上一次访问时间等信息。

当缓存对象需要淘汰时,Redis会从双向链表的尾部开始查找最近最少使用的缓存对象,并将其从哈希表和双向链表中同时删除。当有新的缓存对象加入时,Redis会将其插入双向链表的头部,并在哈希表中添加对应的键和指针。

需要注意的是,Redis中的LRU策略并不是完全精确的,因为它只能根据上一次访问时间来判断缓存对象的使用频率,在某些情况下可能会导致一些较为频繁访问但访问时间较久远的缓存对象被误判为“冷数据”,因此在实际使用中,需要根据具体的业务场景来选择合适的缓存淘汰策略。

21、你只要用缓存,就可能会涉及到缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性的问题,那么你如何解决一致性问题?

1、canal

1、简介
canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;

https://github.com/alibaba/canal

Canal是基于MySQL变更日志增量订阅和消费的组件
  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理
下载canal: https://github.com/alibaba/canal/wiki/QuickStart

java案例:https://github.com/alibaba/canal/wiki/ClientExample
2、工作原理
1、传统MySQL主从复制工作原理

image-20210628150140044
MySQL的主从复制将经过如下步骤:

1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;

2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;

3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;

4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;

5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;

6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;
2、canal工作原理
canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

image-20210628150153951

分布式系统只有最终一致性,很难做到强一致性

2、mysql-canal-redis双写一致性Coding

1、mysql版本5.7.28
1、脚本
CREATE TABLE `t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `userName` varchar(100) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;
2、当前的主机二进制日志
show master status;
3、查看SHOW VARIABLES LIKE 'log_bin';

image-20210628150208265
4、开启 MySQL的binlog写入功能
mysql安装目录

D:\devSoft\mysql\mysql5.7.28目录下打开 my.ini

最好备份

log-bin=mysql-bin #开启 binlog

binlog-format=ROW #选择 ROW 模式

server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。

STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;

MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

window my.ini

linux my.cnf

image-20210628150220333
重启mysql
5、授权canal连接MySQL账号
mysql默认的用户在mysql库的user表里





默认没有canal账户,此处新建+授权

DROP USER 'canal'@'%';

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT ALL PRIVILEGES ON . TO 'canal'@'%' IDENTIFIED BY 'canal';

FLUSH PRIVILEGES;

SELECT * FROM mysql.user;



2、canal服务端
1、下载
https://github.com/alibaba/canal/releases
2、解压后整体放入/mycanal路径下

image-20210628150314283
3、配置修改
  • /mycanal/canal.deployer-1.1.5/conf/example路径下

image-20210628150324228
  • instance.properties
  • 换成自己的mysql的IP地址
  • 换成自己的在mysql新建的canal账户
  • /mycanal/canal.deployer-1.1.5/bin路径下执行
  • ./startup.sh
image-20210628150347647
image-20210628150335132
4、查看 server 日志

image-20210628150358368
5、查看 instance 的日志

image-20210628150407089

3、canal客户端(Java编写业务程序)

1、canal_demo
2、pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.xx.study</groupId> <artifactId>canal_demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>canal_demo</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <junit.version>4.12</junit.version> <log4j.version>1.2.17</log4j.version> <lombok.version>1.16.18</lombok.version> <mysql.version>5.1.47</mysql.version> <druid.version>1.1.16</druid.version> <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version> </properties> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> <!--guava--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> <!--web+actuator--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--SpringBoot与Redis整合依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <!-- jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <!-- springboot-aop 技术--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!-- redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.4</version> </dependency> <!--Mysql数据库驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <!--集成druid连接池--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <!--mybatis和springboot整合--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis.spring.boot.version}</version> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--通用基础配置--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
3、yml
server.port=5555
4、RedisUtils
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisUtils { public static JedisPool jedisPool; static { JedisPoolConfig jedisPoolConfig=new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(20); jedisPoolConfig.setMaxIdle(10); jedisPool=new JedisPool(jedisPoolConfig,"192.168.111.147",6379); } public static Jedis getJedis() throws Exception { if(null!=jedisPool){ return jedisPool.getResource(); } throw new Exception("Jedispool is not ok"); } /*public static void main(String[] args) throws Exception { try(Jedis jedis = RedisUtils.getJedis()) { System.out.println(jedis); jedis.set("k1","xxx2"); String result = jedis.get("k1"); System.out.println("-----result: "+result); System.out.println(RedisUtils.jedisPool.getNumActive());//1 }catch (Exception e){ e.printStackTrace(); } }*/ }
5、RedisCanalClientExample
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.xx.study.util.RedisUtils; import org.springframework.beans.factory.annotation.Autowired; import redis.clients.jedis.Jedis; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeUnit; /** * @auther xx * @create 2020-11-11 17:13 */ public class RedisCanalClientExample { public static final Integer _60SECONDS = 60; public static void main(String args[]) { // 创建链接canal服务端 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.111.147", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); //connector.subscribe(".*\\..*"); connector.subscribe("db2020.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR # parser of eromanga-event has an error,data:" + entry.toString(),e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else {//EventType.UPDATE redisUpdate(rowData.getAfterColumnsList()); } } } } private static void redisInsert(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(),jsonObject.toJSONString()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisDelete(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0).getValue()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisUpdate(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(),jsonObject.toJSONString()); System.out.println("---------update after: "+jedis.get(columns.get(0).getValue())); }catch (Exception e){ e.printStackTrace(); } } } }

22、双写一致性,你先动缓存redis还是数据库mysql哪一个?why?

1、如果redis中有数据
需要和数据库中的值相同
2、如果redis中无数据
数据库中的值要是最新值

2、缓存按照操作来分,有细分2种

1、只读缓存
2、读写缓存
  • 同步直写策略:写缓存时也同步写数据库,缓存和数据库中的数据⼀致;
  • 对于读写缓存来说,要想保证缓存和数据库中的数据⼀致,就要采⽤同步直写策略

3、数据库和缓存一致性的几种更新策略

单线程,这样重量级的数据操作最好不要多线程

总之,我们要达到最终一致性!

我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,切记以mysql的数据库写入库为准。
1、3种更新策略
1、先更新数据库,再更新缓存
1 先更新mysql的某商品的库存,当前商品的库存是100,更新为99个。

2 先更新mysql修改为99成功,然后更新redis。3 此时假设异常出现,更新redis失败了,这导致mysql里面的库存是99而redis里面的还是100 。4 上述发生,会让数据库里面和缓存redis里面数据不一致,读到脏数据
2、先删除缓存,再更新数据库
问题
这里写20秒,是自己故意乱写的,表示更新数据库可能失败,实际中不可能...O(∩_∩)O哈哈~

1 A线程先成功删除了redis里面的数据,然后去更新mysql,此时mysql正在更新中,还没有结束。(比如网络延时)

B突然出现要来读取缓存数据。





2 此时redis里面的数据是空的,B线程来读取,先去读redis里数据(已经被A线程delete掉了),此处出来2个问题:

2.1 B从mysql获得了旧值

​ B线程发现redis里没有(缓存缺失)马上去mysql里面读取,从数据库里面读取来的是旧值。

2.2 B会把获得的旧值写回redis

获得旧值数据后返回前台并回写进redis(刚被A线程删除的旧数据有极大可能又被写回了)。





3 A线程更新完mysql,发现redis里面的缓存是脏数据,A线程直接懵逼了,o(╥﹏╥)o

两个并发操作,一个是更新操作,另一个是查询操作,A更新操作删除缓存后,B查询操作没有命中缓存,B先把老数据读出来后放到缓存中,然后A更新操作更新了数据库。

于是,在缓存中的数据还是老的数据,导致缓存中的数据是脏的,而且还一直这样脏下去了。

4 总结流程:

(1)请求A进行写操作,删除缓存后,工作正在进行中......A还么有彻底更新完

(2)请求B开工,查询redis发现缓存不存在

(3)请求B继续,去数据库查询得到了myslq中的旧值

(4)请求B将旧值写入redis缓存

(5)请求A将新值写入mysql数据库

上述情况就会导致不一致的情形出现。



解决方案

多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。

其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。

后面的线程进来发现已经有缓存了,就直接走缓存。



采用延时双删策略

image-20210628150706131

image-20210628150715573

双删方案面试题

这个删除该休眠多久呢

线程Asleep的时间,就需要大于线程B读取数据再写入缓存的时间。

这个时间怎么确定呢?

在业务程序运行的时候,统计下线程读数据和写缓存的操作时间,自行评估自己的项目的读数据业务逻辑的耗时,

以此为基础来进行估算。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上加百毫秒即可。

这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

当前演示的效果是mysql单机,如果mysql主从读写分离架构如何?

(1)请求A进行写操作,删除缓存

(2)请求A将数据写入数据库了,

(3)请求B查询缓存发现,缓存没有值

(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值

(5)请求B将旧值写入缓存

(6)数据库完成主从同步,从库变为新值 上述情形,就是数据不一致的原因。还是使用双删延时策略。
只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms

这种同步淘汰策略,吞吐量降低怎么办?

image-20210628150744199
3、先更新数据库,再删除缓存

image-20210628150754726

业务指导思想

解决方案

image-20210628150807012
1 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中(例如使用Kafka/RabbitMQ等)。
2 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重新读取这些值,然后再次进行删除或更新。
3 如果能够成功地删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,我们也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试
4 如果重试超过的一定次数后还是没有成功,我们就需要向业务层发送报错信息了,通知运维人员。
4、方案2和方案3用那个?利弊如何
在大多数业务场景下,我们会把Redis作为只读缓存使用。假如定位是只读缓存来说,
理论上我们既可以先删除缓存值再更新数据库,也可以先更新数据库再删除缓存,但是没有完美方案,两害相衡趋其轻的原则
个人建议是,优先使用先更新数据库,再删除缓存的方案。理由如下:
1 先删除缓存值再更新数据库,有可能导致请求因缓存缺失而访问数据库,给数据库带来压力,严重导致打满mysql。
2 如果业务应用中读取数据库和写缓存的时间不好估算,那么,延迟双删中的等待时间就不好设置。

多补充一句:如果使用先更新数据库,再删除缓存的方案

如果业务层要求必须读取一致性的数据,那么我们就需要在更新数据库时,先在Redis缓存客户端暂存并发读请求,等数据库更新完、缓存值删除后,再读取数据,从而保证数据一致性。

image-20210628150828581

23、Redis存储对象要么用string要么用hash,什么时候用string?什么时候用hash

在Redis中,可以使用string和hash来存储对象,它们各有优缺点,应根据具体场景进行选择。

当对象比较简单,只包含一个键值对时,可以使用string类型来存储对象。string类型的数据结构非常简单,存储效率高,适合存储简单的字符串、数字等数据类型。例如,可以使用string类型来存储用户的ID、用户名、年龄等信息。

当对象比较复杂,包含多个属性时,可以使用hash类型来存储对象。hash类型可以看作是一张哈希表,其中包含多个键值对,每个键值对表示对象的一个属性。使用hash类型可以更方便地存储和操作对象的各个属性,同时也可以节省存储空间。例如,可以使用hash类型来存储用户的详细信息,包括姓名、地址、电话等多个属性。

需要注意的是,当使用hash类型存储对象时,需要考虑到哈希表的扩容和缩容问题,因为哈希表的大小是固定的,当存储的键值对数量超过哈希表的负载因子时,Redis会自动扩容哈希表,从而导致一定的性能损耗和内存浪费。因此,在使用hash类型存储对象时,需要合理设置哈希表的初始大小和负载因子,以避免频繁扩容和浪费内存的情况发生。

24、你在项目中对Redis的使用,请不要说分布式锁

  1. 缓存:Redis可以作为缓存来提高系统的性能和响应速度。例如,可以将经常访问的数据存储在Redis中,避免每次请求都要从数据库中查询数据。
  2. 计数器:Redis可以用来实现计数器功能,例如统计网站的访问量、用户的登录次数等。通过INCR等命令可以实现原子性自增或自减操作。
  3. 消息队列:Redis可以用来实现简单的消息队列功能,例如将需要异步处理的任务放入Redis队列中,然后由后台线程或其他进程来消费队列中的任务。
  4. 分布式限流:Redis可以用来实现分布式限流功能,例如限制API的访问频率、防止爬虫等恶意行为。
  5. 排行榜:Redis可以用来实现排行榜功能,例如统计用户的积分、评论数等信息,然后根据这些信息来生成排行榜。
  6. 地理位置:Redis可以用来存储地理位置信息,例如根据经纬度查询附近的商家、朋友等。

以上只是一些简单的使用场景,Redis还有更多的用途,例如发布/订阅、持久化、事务等,可以根据项目需求来选择合适的使用方式。

25、美团面试官,直接打开App,请问我们美团附近的酒店你如何设计并落地,谈谈你的想法

  1. 地理位置服务:首先需要使用地理位置服务来获取用户的当前位置,可以使用GPS、WiFi、基站等多种方式来实现。获取到用户的位置后,可以使用地图API来显示周边的酒店信息。
  2. 数据存储:需要将酒店的信息存储到数据库中,例如酒店的名称、地址、评分、价格等信息。可以使用MySQL、Redis等数据库来存储数据,其中Redis可以作为缓存来提高查询效率。
  3. 搜索功能:可以使用全文检索引擎来实现搜索功能,例如Elasticsearch、Solr等。用户可以输入关键词来搜索酒店名称、地址等信息,搜索结果可以按照距离、价格、评分等多种方式进行排序。也可以使用Redis的GEO实现。
  4. 推荐功能:可以使用推荐算法来实现个性化推荐功能,例如根据用户的历史订单、浏览记录、评价等信息来推荐适合的酒店。可以使用协同过滤、基于内容的推荐、深度学习等算法来实现推荐功能。
  5. 营销功能:可以使用优惠券、积分、返现等营销手段来吸引用户下单。可以根据用户的购买习惯、地理位置等信息来精准投放优惠券等活动。
  6. 用户评论:用户可以对酒店进行评价和评论,可以使用机器学习等算法来自动分类、提取评论中的情感信息,从而帮助用户更好地选择酒店。
  7. 交易系统:最后需要建立完善的交易系统,包括订单生成、支付、退款、售后等功能。可以使用支付宝、微信支付等第三方支付平台来实现支付功能,可以使用客服系统来处理用户的售后问题。

以上是我对美团附近酒店的设计思路,针对不同的场景和需求,可以做出不同的调整和优化。

26、Redis的IO多路复用如何理解,为什么单线程还可以抗那么高的qps

IO多路复用简单说明

I/O 的读和写本身是堵塞的,比如当 socket 中有数据时,Redis 会通过调用先将数据从内核态空间拷贝到用户态空间,再交给 Redis 调用,而这个拷贝的过程就是阻塞的,当数据量越大时拷贝所需要的时间就越多,而这些操作都是基于单线程完成的。

image-20210628140934483

​ 在 Redis 6.0 中新增了多线程的功能来提高 I/O 的读写性能,他的主要实现思路是将主线程的 IO 读写任务拆分给一组独立的线程去执行,这样就可以使多个 socket 的读写可以并行化了,采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),将最耗时的Socket的读取、请求解析、写入单独外包出去,剩下的命令执行仍然由主线程串行执行并和内存的数据交互。

image-20210628140943429

结合上图可知,网络IO操作就变成多线程化了,其他核心部分仍然是线程安全的,是个不错的折中办法。

Redis 6.0 将网络数据读写、请求协议解析通过多个IO线程的来处理 ,对于真正的命令执行来说,仍然使用主线程操作,一举两得.

image-20210628140956180

27、Redis的zset底层实现,说了压缩列表和跳表,问这样设计的优缺点

Redis中的zset是一种有序集合,它的底层实现采用了压缩列表和跳表的结构。

压缩列表是一种紧凑的线性结构,可以节省内存空间。在zset中,如果集合中的元素数量比较少,可以使用压缩列表来存储数据,因为压缩列表的结构比较简单,只需要存储元素的score和value即可。压缩列表的优点是占用内存小,但缺点是查询效率较低,因为需要遍历整个列表才能找到指定的元素。

跳表是一种高效的有序集合结构,可以快速地进行查找、插入和删除操作。在zset中,如果集合中的元素数量比较多,可以使用跳表来存储数据,因为跳表的查询效率比较高,可以在O(log n)的时间复杂度内查找元素。跳表的优点是查询效率高,但缺点是占用内存空间比较大。

综合来看,Redis中的zset底层实现采用了压缩列表和跳表的结构,可以在不同的场景下选择合适的数据结构来存储数据,从而达到节省内存空间和提高查询效率的目的。但是,这样的设计也存在一些缺点,例如在插入或删除元素时需要同时更新压缩列表和跳表,因此会增加一定的复杂度和开销。此外,压缩列表和跳表的结构比较复杂,需要占用一定的计算资源来维护和更新,因此可能会影响系统的性能和稳定性。

28、Redis的跳表说一下,解决了什么问题,时间复杂度和空间复杂度如何

Redis中的跳表(Skip List)是一种基于链表的数据结构,可以快速地进行查找、插入和删除操作,常用于实现有序集合和索引等功能。跳表的设计思想是通过增加多级索引来加速查找操作,从而达到快速查询的目的。

跳表的主要优势是通过平衡时间和空间复杂度来实现高效的查找操作,避免了传统的平衡二叉树需要维护平衡的复杂度和开销。跳表在插入、删除元素时,只需要更新相邻节点的索引,而不需要重新平衡整棵树,从而提高了操作的效率。因此,跳表的时间复杂度为O(log n),空间复杂度为O(n)。

跳表中包含多级索引,每一级索引都是一个有序的链表,索引节点的值比下一级索引节点的值大。跳表的最底层索引包含了所有的数据节点,因此可以通过最底层的索引进行查找。

在插入或删除元素时,需要在跳表中找到相应的位置,并更新相邻节点的索引。因为每一级索引的节点数是按照一定概率随机生成的,因此跳表的高度是随机的,但平均高度为O(log n)。

总之,跳表是一种高效的数据结构,可以在查找、插入和删除元素时达到O(log n)的时间复杂度,同时空间复杂度为O(n),适合用于实现有序集合和索引等功能。

29、IO多路复用select和epllo了解?

IO多路复用是指在单个线程内同时监视多个文件描述符,从而实现对I/O操作的异步处理。其中,select和epoll是两种常用的IO多路复用机制。

select是Unix系统中最古老的IO多路复用机制,它可以同时监视多个文件描述符,一旦某个文件描述符就绪,就会通知应用程序进行处理。select的缺点是它采用轮询的方式来监视文件描述符,当文件描述符的数量较多时,会导致系统开销较大,因此在高并发场景下效率不高。

epoll是Linux系统中较新的IO多路复用机制,它采用事件驱动的方式来监视文件描述符,并且可以支持较大的并发连接数,因此在高并发场景下效率更高。epoll的优点是它支持边缘触发和水平触发两种模式,可以更加精细地控制事件通知,从而提高了系统的性能和稳定性。

总之,select和epoll都是常用的IO多路复用机制,它们能够实现对多个文件描述符的异步处理和高效监视,但是epoll在高并发场景下的效率更高,因此在实际开发中应该根据具体情况选择合适的IO多路复用机制。

31、一个字符串类型的值能存储最大容量是多少?

Redis一个字符串类型的值最大能存储512MB的数据。这是由于Redis使用了一个名为sds(simple dynamic string)的数据结构来存储字符串,其最大长度为2^32-1字节,即4GB,但是Redis规定单个字符串对象的最大长度为512MB,这是为了确保Redis的性能和稳定性,在数据存储和传输过程中不会出现过度消耗内存、影响系统性能的情况。

32、Redis 的持久化机制是什么?各自的优缺点?

Redis 提供了不同级别的持久化方式:

  • RDB持久化方式能够在指定的时间间隔能对你的数据进行快照存储.
  • AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次写的操作到文件末尾.Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大.
  • 如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式.
  • 你也可以同时开启两种持久化方式, 在这种情况下, 当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整.

RDB

优点
  • RDB是一个非常紧凑的文件,它保存了某个时间点得数据集,非常适用于数据集的备份,比如你可以在每个小时报保存一下过去24小时内的数据,同时每天保存过去30天的数据,这样即使出了问题你也可以根据需求恢复到不同版本的数据集.
  • RDB是一个紧凑的单一文件,很方便传送到另一个远端数据中心或者亚马逊的S3(可能加密),非常适用于灾难恢复.
  • RDB在保存RDB文件时父进程唯一需要做的就是fork出一个子进程,接下来的工作全部由子进程来做,父进程不需要再做其他IO操作,所以RDB持久化方式可以最大化redis的性能.
  • 与AOF相比,在恢复大的数据集的时候,RDB方式会更快一些.
缺点
  • 如果你希望在redis意外停止工作(例如电源中断)的情况下丢失的数据最少的话,那么RDB不适合你.虽然你可以配置不同的save时间点(例如每隔5分钟并且对数据集有100个写的操作),是Redis要完整的保存整个数据集是一个比较繁重的工作,你通常会每隔5分钟或者更久做一次完整的保存,万一在Redis意外宕机,你可能会丢失几分钟的数据.
  • RDB 需要经常fork子进程来保存数据集到硬盘上,当数据集比较大的时候,fork的过程是非常耗时的,可能会导致Redis在一些毫秒级内不能响应客户端的请求.如果数据集巨大并且CPU性能不是很好的情况下,这种情况会持续1秒,AOF也需要fork,但是你可以调节重写日志文件的频率来提高数据集的耐久度.

AOF

优点
  • 使用AOF 会让你的Redis更加耐久: 你可以使用不同的fsync策略:无fsync,每秒fsync,每次写的时候fsync.使用默认的每秒fsync策略,Redis的性能依然很好(fsync是由后台线程进行处理的,主线程会尽力处理客户端请求),一旦出现故障,你最多丢失1秒的数据.
  • AOF文件是一个只进行追加的日志文件,所以不需要写入seek,即使由于某些原因(磁盘空间已满,写的过程中宕机等等)未执行完整的写入命令,你也也可使用redis-check-aof工具修复这些问题.
  • Redis 可以在 AOF 文件体积变得过大时,自动地在后台对 AOF 进行重写: 重写后的新 AOF 文件包含了恢复当前数据集所需的最小命令集合。 整个重写操作是绝对安全的,因为 Redis 在创建新 AOF 文件的过程中,会继续将命令追加到现有的 AOF 文件里面,即使重写过程中发生停机,现有的 AOF 文件也不会丢失。 而一旦新 AOF 文件创建完毕,Redis 就会从旧 AOF 文件切换到新 AOF 文件,并开始对新 AOF 文件进行追加操作。
  • AOF 文件有序地保存了对数据库执行的所有写入操作, 这些写入操作以 Redis 协议的格式保存, 因此 AOF 文件的内容非常容易被人读懂, 对文件进行分析(parse)也很轻松。 导出(export) AOF 文件也非常简单: 举个例子, 如果你不小心执行了 FLUSHALL 命令, 但只要 AOF 文件未被重写, 那么只要停止服务器, 移除 AOF 文件末尾的 FLUSHALL 命令, 并重启 Redis , 就可以将数据集恢复到 FLUSHALL 执行之前的状态。
缺点
  • 对于相同的数据集来说,AOF 文件的体积通常要大于 RDB 文件的体积。
  • 根据所使用的 fsync 策略,AOF 的速度可能会慢于 RDB 。 在一般情况下, 每秒 fsync 的性能依然非常高, 而关闭 fsync 可以让 AOF 的速度和 RDB 一样快, 即使在高负荷之下也是如此。 不过在处理巨大的写入载入时,RDB 可以提供更有保证的最大延迟时间(latency)。

32、Redis 集群最大节点个数是多少?为什么redis集群的最大槽数是16384个?

Redis集群并没有使用一致性hash而是引入了哈希槽的概念。Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽,集群的每个节点负责一部分hash槽。但为什么哈希槽的数量是16384(2^14)个呢?

CRC16算法产生的hash值有16bit,该算法可以产生2^16=65536个值。

换句话说值是分布在0~65535之间。那作者在做mod运算的时候,为什么不mod65536,而选择mod16384?

https://github.com/redis/redis/issues/2576

image-20210628142941264

image-20210628142953503

正常的心跳数据包带有节点的完整配置,可以用幂等方式用旧的节点替换旧节点,以便更新旧的配置。

这意味着它们包含原始节点的插槽配置,该节点使用2k的空间和16k的插槽,但是会使用8k的空间(使用65k的插槽)。

同时,由于其他设计折衷,Redis集群不太可能扩展到1000个以上的主节点。

因此16k处于正确的范围内,以确保每个主机具有足够的插槽,最多可容纳1000个矩阵,但数量足够少,可以轻松地将插槽配置作为原始位图传播。请注意,在小型群集中,位图将难以压缩,因为当N较小时,位图将设置的slot / N位占设置位的很大百分比。

image-20210628143047089

(1)如果槽位为65536,发送心跳信息的消息头达8k,发送的心跳包过于庞大。

在消息头中最占空间的是myslots[CLUSTER_SLOTS/8]。 当槽位为65536时,这块的大小是: 65536÷8÷1024=8kb

因为每秒钟,redis节点需要发送一定数量的ping消息作为心跳包,如果槽位为65536,这个ping消息的消息头太大了,浪费带宽。

(2)redis的集群主节点数量基本不可能超过1000个。

集群节点越多,心跳包的消息体内携带的数据越多。如果节点过1000个,也会导致网络拥堵。因此redis作者不建议redis cluster节点数量超过1000个。 那么,对于节点数在1000以内的redis cluster集群,16384个槽位够用了。没有必要拓展到65536个。

(3)槽位越小,节点少的情况下,压缩比高,容易传输

Redis主节点的配置信息中它所负责的哈希槽是通过一张bitmap的形式来保存的,在传输过程中会对bitmap进行压缩,但是如果bitmap的填充率slots / N很高的话(N表示节点数),bitmap的压缩率就很低。 如果节点数很少,而哈希槽数量很多的话,bitmap的压缩率就很低。

33、MySQL 里有 2000w 数据,redis 中只存 20w 的数据,如何保证 redis 中的数据都是热点数据?

要保证Redis中存储的数据都是热点数据,可以考虑以下几种方式:

  1. 通过访问日志或监控系统等手段,对MySQL中的数据进行分析,找出访问频率较高的数据,然后将这部分数据同步到Redis中,保证Redis中的数据都是热点数据。
  2. 利用MySQL的binlog或者增量备份等功能,实现MySQL数据变化的实时同步到Redis中,这样就可以保证Redis中的数据都是最新的热点数据。
  3. 利用Redis的LRU算法,将访问频率较高的数据放在内存中,访问频率较低的数据缓存到磁盘中,这样可以保证Redis中存储的数据都是热点数据。同时,可以通过设置maxmemory和maxmemory-policy等参数来控制Redis中的缓存大小和数据淘汰策略,从而确保Redis中的数据都是热点数据。

综上所述,保证Redis中存储的数据都是热点数据,需要结合实际情况采取不同的策略,同时需要根据访问频率和数据量等因素来控制Redis中的缓存大小和数据淘汰策略,从而实现高效的数据存储和访问。

33、缓存雪崩、缓存击穿、缓存穿透

1、缓存雪崩

1、什么情况会发生雪崩
  • redis主机挂了,Redis 全盘崩溃
  • 比如缓存中有大量数据同时过期
2、雪崩解决方案
  • 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)
  • redis缓存集群实现高可用 (主从+哨兵,Redis Cluster)
  • ehcache本地缓存 + Hystrix或者阿里sentinel限流&降级
  • 开启Redis持久化机制aof/rdb,尽快恢复缓存集群
  • 使用锁或队列
    • 用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
  • 将缓存失效时间分散开
    • 比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

image-20210628144058333

2、缓存穿透

1、是什么

​ 请求去查询一条记录,先redis后mysql发现都查询不到该条记录,但是请求每次都会打到数据库上面去,导致后台数据库压力暴增,这种现象我们称为缓存穿透,这个redis变成了一个摆设。。。。。。

​ key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

简单说就是本来无一物,既不在Redis缓存中,也不在数据库中

2、解决方案
方案1:空对象缓存或者缺省值

image.png
    • 黑客或者恶意攻击
黑客会对你的系统进行攻击,拿一个不存在的id 去查询数据,会产生大量的请求到数据库去查询。可能会导致你的数据库由于压力过大而宕掉

id相同打你系统:第一次打到mysql,空对象缓存后第二次就返回null了,避免mysql被攻击,不用再到数据库中去走一圈了

id不同打你系统:由于存在空对象缓存和缓存回写(看自己业务不限死),redis中的无关紧要的key也会越写越多(记得设置redis过期时间)
方案2:Google布隆过滤器Guava解决缓存穿透

Guava 中布隆过滤器的实现算是比较权威的,所以实际项目中我们不需要手动实现一个布隆过滤器

Guava’s BloomFilter 源码剖析 https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/BloomFilter.java

@Test public void bloomFilter() { // 创建布隆过滤器对象 BloomFilter<Integer> filter = BloomFilter.create(Funnels.integerFunnel(), 100); // 判断指定元素是否存在 System.out.println(filter.mightContain(1)); System.out.println(filter.mightContain(2)); // 将元素添加进布隆过滤器 filter.put(1); filter.put(2); System.out.println(filter.mightContain(1)); System.out.println(filter.mightContain(2)); } public static final int _1W = 10000; //布隆过滤器里预计要插入多少数据 public static int size = 100 * _1W; //误判率,它越小误判的个数也就越少(思考,是不是可以设置的无限小,没有误判岂不更好) public static double fpp = 0.03; // 构建布隆过滤器 private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size,fpp); public static void main(String[] args) { //1 先往布隆过滤器里面插入100万的样本数据 for (int i = 0; i < size; i++) { bloomFilter.put(i); } //故意取10万个不在过滤器里的值,看看有多少个会被认为在过滤器里 List<Integer> list = new ArrayList<>(10 * _1W); for (int i = size+1; i < size + 100000; i++) { if (bloomFilter.mightContain(i)) { System.out.println(i+"\t"+"被误判了."); list.add(i); } } System.out.println("误判的数量:" + list.size()); }

现在总共有10万数据是不存在的,误判了3033次,

原始样本:100W

不存在数据:101W---110W

image-20210628144126509

image-20210628144140797
方案3:Redis布隆过滤器解决缓存穿透

Guava缺点说明:

Guava 提供的布隆过滤器的实现还是很不错的 (想要详细了解的可以看一下它的源码实现),但是它有一个重大的缺陷就是只能单机使用 ,而现在互联网一般都是分布式的场景。

为了解决这个问题,我们就需要用到 Redis 中的布隆过滤器了

案例:白名单过滤器

  • 白名单架构说明

image-20210628144155420
  • 误判问题,但是概率小可以接受,不能从布隆过滤器删除
  • 全部合法的key都需要放入过滤器+redis里面,不然数据就是返回null
 public static final int _1W = 10000; //布隆过滤器里预计要插入多少数据 public static int size = 100 * _1W; //误判率,它越小误判的个数也就越少 public static double fpp = 0.03; static RedissonClient redissonClient = null; static RBloomFilter rBloomFilter = null; static { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.111.147:6379").setDatabase(0); //构造redisson redissonClient = Redisson.create(config); //通过redisson构造rBloomFilter rBloomFilter = redissonClient.getBloomFilter("phoneListBloomFilter",new StringCodec()); rBloomFilter.tryInit(size,fpp); // 1测试 布隆过滤器有+redis有 rBloomFilter.add("10086"); redissonClient.getBucket("10086",new StringCodec()).set("chinamobile10086"); // 2测试 布隆过滤器有+redis无 //rBloomFilter.add("10087"); //3 测试 ,都没有 } public static void main(String[] args) { String phoneListById = getPhoneListById("10087"); System.out.println("------查询出来的结果: "+phoneListById); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } redissonClient.shutdown(); } private static String getPhoneListById(String IDNumber) { String result = null; if (IDNumber == null) { return null; } //1 先去布隆过滤器里面查询 if (rBloomFilter.contains(IDNumber)) { //2 布隆过滤器里有,再去redis里面查询 RBucket<String> rBucket = redissonClient.getBucket(IDNumber, new StringCodec()); result = rBucket.get(); if(result != null) { return "i come from redis: "+result; }else{ result = getPhoneListByMySQL(IDNumber); if (result == null) { return null; } // 重新将数据更新回redis redissonClient.getBucket(IDNumber,new StringCodec()).set(result); } return "i come from mysql: "+result; } return result; } private static String getPhoneListByMySQL(String IDNumber) { return "chinamobile"+IDNumber; }

image-20210628144223759

黑名单使用:

image-20210628144234748
3、安装
1、采用docker安装RedisBloom,推荐
Redis 在 4.0 之后有了插件功能(Module),可以使用外部的扩展功能, 可以使用 RedisBloom 作为 Redis 布隆过滤器插件。 docker run -p 6379:6379 --name=redis6379bloom -d redislabs/rebloom docker exec -it redis6379bloom /bin/bash redis-cli bf.reserve filter 0.01 100 bf.add filter v11 bf.exists filter v11 bf.exists filter v12

image-20210628144304368
2、编译安装
# 下载 编译 安装Rebloom插件 wget https://github.com/RedisLabsModules/rebloom/archive/v2.2.2.tar.gz # 解压 tar -zxvf v2.2.2.tar.gz cd RedisBloom-2.2.2 # 若是第一次使用 需要安装gcc++环境 make # redis服启动添加对应参数 这样写还是挺麻烦的 # rebloom_module="/usr/local/rebloom/rebloom.so" # daemon --user ${REDIS_USER-redis} "$exec $REDIS_CONFIG --loadmodule # $rebloom_module --daemonize yes --pidfile $pidfile" # 记录当前位置 pwd # 进入reids目录 配置在redis.conf中 更加方便 vim redis.conf # :/loadmodule redisbloom.so是刚才具体的pwd位置 cv一下 loadmodule /xxx/redis/redis-5.0.8/RedisBloom-2.2.2/redisbloom.so # 保存退出 wq # 重新启动redis-server 我是在redis中 操作的 若不在请写出具体位置 不然会报错 redis-server redis.conf # 连接容器中的 redis 服务 若是无密码 redis-cli即可 redis-cli -a 密码 # 进入可以使用BF.ADD命令算成功

3、缓存击穿

1、是什么
大量的请求同时查询一个 key 时,此时这个key正好失效了,就会导致大量的请求都打到数据库上面去

简单说就是热点key突然失效了,暴打mysql

危害:会造成某一时刻数据库请求量过大,压力剧增。

2、解决方案
  1. 缓存击穿 - 热点key失效 - 互斥更新、随机退避、差异失效时间
  2. 对于访问频繁的热点key,干脆就不设置过期时间
  3. 互斥独占锁防止击穿

image-20210628144326371

4、实际案例

高并发的淘宝聚划算案例落地

image.png

image-20210628144342058
@Autowired private RedisTemplate redisTemplate; @PostConstruct public void initJHS(){ log.info("启动定时器淘宝聚划算功能模拟.........."+DateUtil.now()); new Thread(() -> { //模拟定时器,定时把数据库的特价商品,刷新到redis中 while (true){ //模拟从数据库读取100件特价商品,用于加载到聚划算的页面中 List<Product> list=this.products(); //采用redis list数据结构的lpush来实现存储 this.redisTemplate.delete(Constants.JHS_KEY); //lpush命令 this.redisTemplate.opsForList().leftPushAll(Constants.JHS_KEY,list); //间隔一分钟 执行一遍 try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.info("runJhs定时刷新.............."); } },"t1").start(); } /** * 模拟从数据库读取100件特价商品,用于加载到聚划算的页面中 */ public List<Product> products() { List<Product> list=new ArrayList<>(); for (int i = 1; i <=20; i++) { Random rand = new Random(); int id= rand.nextInt(10000); Product obj=new Product((long) id,"product"+i,i,"detail"); list.add(obj); } return list; } @Autowired private RedisTemplate redisTemplate; /** * 分页查询:在高并发的情况下,只能走redis查询,走db的话必定会把db打垮 * http://localhost:5555/swagger-ui.html#/jhs-product-controller/findUsingGET */ @RequestMapping(value = "/pruduct/find",method = RequestMethod.GET) @ApiOperation("按照分页和每页显示容量,点击查看") public List<Product> find(int page, int size) { List<Product> list=null; long start = (page - 1) * size; long end = start + size - 1; try { //采用redis list数据结构的lrange命令实现分页查询 list = this.redisTemplate.opsForList().range(Constants.JHS_KEY, start, end); if (CollectionUtils.isEmpty(list)) { //TODO 走DB查询 } log.info("查询结果:{}", list); } catch (Exception ex) { //这里的异常,一般是redis瘫痪 ,或 redis网络timeout log.error("exception:", ex); //TODO 走DB查询 } return list; }

image-20210628144400350

image-20210628144408895

image-20210628144417822

image-20210628144427183

image-20210628144439950

34、redis的key存储多少数据可以称为大key

在Redis中,大key是指存储的数据量非常大的key,这会导致Redis在进行内存回收或数据持久化时,占用较多的内存或磁盘资源,从而影响Redis的性能和稳定性。虽然没有明确的标准来定义大key,但是一般认为,如果一个key存储的数据量超过10KB,就可以称之为大key。

需要注意的是,大key不仅会影响Redis的性能和稳定性,还会导致Redis集群中的数据分布不均,从而影响集群的负载均衡和可用性。因此,在使用Redis时,应该避免存储过大的数据量到单个key中,尽量采用拆分、分片等方式来分散数据,保证Redis的高效和稳定。

阿里的规范:https://developer.aliyun.com/article/531067

35、知道 redis 的持久化吗?底层如何实现的?有什么优点缺点?

​ RDB(Redis DataBase:在不同的时间点将 redis 的数据生成的快照同步到磁盘等介质上):内存到硬盘的快照,定期更新。缺点:耗时,耗性能(fork+io 操作),易丢失数据。

​ AOF(Append Only File:将 redis 所执行过的所有指令都记录下来,在下次 redis 重启时,只需要执行指令就可以了):写日志。缺点:体积大,恢复速度慢

​ bgsave 做镜像全量持久化,aof 做增量持久化。因为 bgsave 会消耗比较长的时间,不够实时,在停机的时候会导致大量的数据丢失,需要 aof 来配合,在 redis 实例重启时,优先使用 aof 来恢复内存的状态,如果没有 aof 日志,就会使用 rdb 文件来恢复。Redis 会定期做aof 重写,压缩 aof 文件日志大小。Redis4.0 之后有了混合持久化的功能,将 bgsave 的全量和 aof 的增量做了融合处理,这样既保证了恢复的效率又兼顾了数据的安全性。bgsave 的原理,fork 和 cow, fork 是指 redis 通过创建子进程来进行 bgsave 操作,cow 指的是 copy onwrite,子进程创建后,父子进程共享数据段,父进程继续提供读写服务,写脏的页面数据会逐渐和子进程分离开来。

36、大量数据写入redis,如何快速处理

Redis是一个内存数据库,写入速度非常快,可以支持每秒数十万次的写入操作。以下是一些在大量数据写入Redis时可以使用的技巧和优化:

1.使用pipeline批量写入:Redis支持通过pipeline方式进行批量写入,将多个命令打包到一起发送可以减少网络延迟和CPU消耗,从而提高写入性能。

2.使用Redis集群: Redis集群可以将数据分散到不同的节点上,从而提高写入性能和容量。同时,集群可以自动重定向请求到正确的分片上,提高可用性和负载均衡。

3.使用Redis事务:Redis事务可以将多个命令打包成原子性的操作,从而避免并发写入冲突和数据异常。

4.使用Redis持久化:Redis可以通过持久化方式,将内存中的数据异步或者同步地保存到磁盘上,保证数据的可靠性和持久性。

5.合理设置Redis参数:Redis有很多配置参数可以根据实际情况进行调整,例如最大内存限制、并发连接数、超时时间等等。

6.使用Redis缓存:如果写入操作不是那么紧急,可以考虑使用Redis做缓存,在缓存中进行写入操作,然后再批量更新到数据库中。

7.避免过期键:Redis中的过期键会占用内存,当过期键过多时可能会影响性能,应该根据实际情况合理设置过期时间或者使用惰性删除方式。

8.使用Redis Lua脚本:可以将多个操作打包到Lua脚本中运行,从而减少网络开销和命令调用次数。

总之,在处理大量数据写入Redis时,需要结合具体场景进行优化和调整,以提高性能和可靠性。

Read more

手写一个C++ TCP服务器实现自定义协议(顺便解决粘包问题)

手写一个C++ TCP服务器实现自定义协议(顺便解决粘包问题)

在之前的博客中,我们了解了关于UDP和TCP的网络编程,直观的感受了一下网络套接字是如何使用的,并且成功的完成了客户端与服务端的网络通信,但是其中还有一个小细节我们可能会忽略,就是UDP是基于数据报进行传输的,一下子就将所有我们要发送的信息传送给对方,但是我们的TCP可是基于字节流进行传输的,我们如何保证读取上来的数据,是一个完整的报文呢? 我们在进行TCP网络通信的时候,通过调用connec函数调用,使客户端可以和服务端保持链接之后,客户端将自己想要发送的数据通过write系统调用写进对应的socket函数调用给我们返回的文件描述符所对应的文件中。 现在有一个问题就是我们向文件中写入的时候,直接将其放入即可,但是想要往出拿的时候就有点困难了,想要往出拿的人如果不知道放的人是如何放的,就会造成一系列的错误,这就好比放数据时先放了一个整形,又放了一个浮点数,还放了一个字符串,然而拿的人按照字符串,整形,浮点数这样的方式进行获取,这就会导致数据不一致的现象,所以一旦我们要发送一些带有结构化的数据时,就必须再次制定——协议,这样才能满足我们想要返送一些结构化数据的需求。 TCP是传输控

By Ne0inhk
yolo26n-pose在lsp姿势估计数据集的训练预测流程(python/c++)

yolo26n-pose在lsp姿势估计数据集的训练预测流程(python/c++)

1.模型训练测试(python) 打开https://docs.ultralytics.com/zh/tasks/pose/#models,选择模型下载,这里选择yolo26n-pose.pt,下载后放到你创建的项目的model文件夹 前期python环境配置可参考《yolo11m端侧实验》 如果需要数据集,可以自行网上找找,或者可以通过网盘分享的文件:lsp.zip 链接: https://pan.baidu.com/s/1D_XUBfj0j_aXql0WE61G6g?pwd=cwkr 提取码: cwkr 若有兴趣知道用代码获取该数据集的方法,可以通过网盘分享的文件:LSPPoseCode.zip 链接: https://pan.baidu.com/s/1u0l_SADo_JrN-1oj11OUkQ?pwd=znph

By Ne0inhk
Rust赋能Android蓝牙协议栈:从C++到安全高效的重构之路

Rust赋能Android蓝牙协议栈:从C++到安全高效的重构之路

在移动设备生态中,蓝牙协议栈是连接物理世界与数字世界的关键桥梁,从无线耳机、智能手环到车载系统,其稳定性、安全性与效率直接决定用户体验。长期以来,Android蓝牙协议栈核心模块基于C++开发,凭借接近硬件的性能优势支撑了数十亿设备的运行。但随着物联网设备爆发式增长、蓝牙5.3/5.4等新协议落地,C++固有的内存安全缺陷与并发管理难题愈发凸显。2021年起,Google开始在Android蓝牙协议栈中引入Rust重构核心模块,这一技术选型并非偶然,而是工程实践中安全与效率平衡的必然结果。 目录 一、Android蓝牙协议栈的C++之困 1.1 内存安全漏洞:蓝牙模块的阿喀琉斯之踵 1.2 并发管理复杂:多设备连接下的稳定性难题 1.3 代码可维护性下降:遗产代码的演进瓶颈 二、Rust:破解困局的关键特性赋能 2.1 所有权模型 2.2 并发安全:无数据竞争的天生优势 2.3 零成本抽象与可维护性:

By Ne0inhk