环境:SpringBoot2.7.12

本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。

1. 简介

Spring Integration是一个框架,用于构建事件驱动的应用程序。在 Spring Integration 中,LockRegistry是一个接口,用于管理分布式锁。分布式锁是一种同步机制,用于确保在分布式系统中的多个节点之间对共享资源的互斥访问。

LockRegistry及相关子接口(如:RenewableLockRegistry)接口的主要功能:

获取锁:当应用程序需要访问共享资源时,它可以通过LockRegistry获取一个锁。释放锁:当应用程序完成对共享资源的访问后,它应该释放锁,以便其他应用程序可以获取它(第一点中提到,并没有提供直接释放锁的操作,而是内部自动完成)。续期:提供续期机制,以便在需要时延长锁的持有时间。

常见的LockRegistry实现包括基于数据库、ZooKeeper 和 Redis 的实现。

公共依赖

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-integration</artifactId></dependency>

2.基于数据库分布式锁

引入依赖

<dependency>  <groupId>org.springframework.integration</groupId>  <artifactId>spring-integration-jdbc</artifactId></dependency><dependency>  <groupId>com.zaxxer</groupId>  <artifactId>HikariCP</artifactId>  <scope>compile</scope></dependency><dependency>  <groupId>mysql</groupId>  <artifactId>mysql-connector-java</artifactId>  <version>8.0.30</version></dependency>

配置

spring:  datasource:    driverClassName: com.mysql.cj.jdbc.Driver    url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false    username: root    password: xxxooo    type: com.zaxxer.hikari.HikariDataSource    hikari:      minimumIdle: 10      maximumPoolSize: 200---spring:  integration:    jdbc:      initialize-schema: always      # 基于数据库需要执行初始化脚本      schema: classpath:schema-mysql.sql

注册核心Bean对象

@Beanpublic DefaultLockRepository defaultLockRepository(DataSource dataSource) {  DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);  // 这里根据你的业务需要,配置表前缀,默认:IN_  lockRepository.setPrefix("T_") ;  return lockRepository ;}// 注册基于数据库的分布式锁@Beanpublic JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) {  return new JdbcLockRegistry(lockRepository) ;}

测试用例

@Testpublic void testLock() throws Exception  int len = 10 ;  CountDownLatch cdl = new CountDownLatch(len) ;  CountDownLatch waiter = new CountDownLatch(len) ;  Thread[] ts = new Thread[len] ;  for (int i = 0; i < len; i++) {    ts[i] = new Thread(() -> {      waiter.countDown() ;      System.out.println(Thread.currentThread().getName() + " - 准备获取锁") ;      try {        waiter.await() ;      } catch (InterruptedException e1) {        e1.printStackTrace();      }      // 获取锁      Lock lock = registry.obtain("drug_store_key_001") ;      lock.lock() ;      System.out.println(Thread.currentThread().getName() + " - 获取锁成功") ;      try {        try {          TimeUnit.SECONDS.sleep(2) ;        } catch (InterruptedException e) {          e.printStackTrace();        }      } finally {        // 释放锁        lock.unlock() ;        cdl.countDown() ;        System.out.println(Thread.currentThread().getName() + " - 锁释放成功") ;      }    }, "T - " + i) ;  }  for (int i = 0; i < len; i++) {    ts[i].start() ;   }  cdl.await() ;}

数据库

图片

锁的实现JdbcLock,该对象实现了java.util.concurrent.locks.Lock,所以该锁是支持重入等操作的。

配置锁获取失败后的重试间隔,默认值100ms

JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository);// 定义锁对象时设置当获取锁失败后重试间隔时间。jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ;

锁续期

jdbcLockRegistry.renewLock("drug_store_key_001");

3. 基于Redis分布式锁

引入依赖

<dependency>  <groupId>org.springframework.integration</groupId>  <artifactId>spring-integration-jdbc</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

配置

spring:  redis:    host: localhost    port: 6379    password: xxxooo    database: 8    lettuce:      pool:        maxActive: 8        maxIdle: 100        minIdle: 10        maxWait: -1

测试用例

测试代码与上面基于JDBC的一样,只需要修改调用加锁的代码即可

Lock lock = redisLockRegistry.obtain("001") ;

设置锁的有效期,默认是60s

// 第三个参数设置了key的有效期,这里改成10sRedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ;

注意:redis key的有效期设置为10s,如果你的业务执行超过了10s,那么程序将会报错。并没有redission watch dog机制。

Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.  at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450)  at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83)  at java.lang.Thread.run(Thread.java:748)

如果10s过期后key自动删除后,其它线程是否能立马获取到锁呢?如果是单节点中其它现在也不能获取锁,必须等上一个线程结束后才可以,这是因为在内部还维护了一个ReentrantLock锁,在获取分布式锁前要先获取本地的一个锁。

private abstract class RedisLock implements Lock {  private final ReentrantLock localLock = new ReentrantLock();  public final void lock() {      this.localLock.lock();      while (true) {        try {          if (tryRedisLock(-1L)) {            return;          }        } catch (InterruptedException e) {        } catch (Exception e) {          this.localLock.unlock();          rethrowAsLockException(e);        }      }    }}

注意:不管是基于数据库还是Redis都要先获取本地的锁

Spring Cloud Task就使用到了Spring Integration中的锁基于数据库的。

总结:Spring Integration 的分布式锁为开发者提供了一种在分布式系统中实现可靠同步的有效方法。通过合理选择和使用这些锁实现,可以确保对共享资源的访问在多个节点之间保持协调一致,从而提高系统的整体可靠性和性能。

完毕!!!