## 并发控制 在多线程和分布式系统中,确保数据一致性是至关重要的。为了防止多个线程或进程同时修改同一份数据,可以采用不同的并发控制策略,主要包括乐观锁、悲观锁、Java的同步机制以及分布式锁等方式。 ### 一、synchronized synchronized是Java提供的一种同步机制,它可以保证同一时间只有一个线程可以访问某个对象或方法。当一个线程访问一个对象的synchronized方法或代码块时,其他线程无法访问该对象,直到该线程访问结束。 ```java public class Counter { private int count = 0; // 共享资源 // synchronized 方法,保证线程安全地递增计数 public synchronized void increment() { count++; // 对共享资源的操作 } // synchronized 方法,保证线程安全地递减计数 public synchronized void decrement() { count--; } // 获取当前计数,也应该是 synchronized,以保证线程安全 public synchronized int getCount() { return count; } // 测试 Counter 类 public static void main(String[] args) { Counter counter = new Counter(); // 创建线程来递增计数 for (int i = 0; i < 5; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { counter.increment(); } }).start(); } // 创建线程来递减计数 for (int i = 0; i < 5; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { counter.decrement(); } }).start(); } // 等待所有线程完成 while (Thread.activeCount() > 2) { Thread.yield(); } // 输出最终的计数 System.out.println("Final count is " + counter.getCount()); } } ``` ### 二、 Lock接口:JUC(Java.util.concurrent) Java 的 `java.util.concurrent.locks` 包提供了比 `synchronized` 关键字更复杂的同步机制。`Lock` 接口是这个包中的核心,它允许更细粒度的锁定操作,与 `synchronized` 相比,提供了一些额外的特性,比如尝试非阻塞获取锁、可中断的锁获取、超时以及能够实现多个条件对象等。 `Lock` 接口的基本使用方法: 1. **获取 Lock** - 调用 `lock()` 方法获取锁。 2. **释放 Lock** - 调用 `unlock()` 方法释放锁。 3. **尝试获取 Lock** - 调用 `tryLock()` 方法尝试获取锁,如果锁不可用,则可以选择性地等待或立即返回。 4. **尝试获取 Lock 并指定等待时间** - 调用 `tryLock(long time, TimeUnit unit)` 方法尝试获取锁,并指定最长的等待时间。 5. **可中断地获取 Lock** - 在尝试获取锁的过程中,线程的中断状态会被检查,如果线程被中断,则 `tryLock()` 方法会抛出 `InterruptedException`。 下面是一个简单的示例,演示了如何使用 `ReentrantLock` 类(实现了 `Lock` 接口)来同步访问共享资源: ```java import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Counter { private int count = 0; private final Lock lock = new ReentrantLock(); public void increment() { lock.lock(); // 获取锁 try { count++; // 对共享资源的安全访问 } finally { lock.unlock(); // 释放锁 } } public void decrement() { lock.lock(); try { count--; // 对共享资源的安全访问 } finally { lock.unlock(); } } public int getCount() { lock.lock(); try { return count; // 对共享资源的安全访问 } finally { lock.unlock(); } } public static void main(String[] args) { //测试, 和前面的示例类似 } } ``` 在这个 `Counter` 类中,我们使用 `ReentrantLock` 来代替 `synchronized` 方法。每次访问 `increment`、`decrement` 或 `getCount` 方法时,我们首先获取锁,然后在 `finally` 块中确保释放锁,即使在访问共享资源时发生异常,锁仍然会被释放。 使用 `Lock` 接口可以提供比 `synchronized` 更灵活的线程同步,但同时也需要编写更多的代码来确保锁的正确获取和释放。 ### 三、分布式锁 分布式锁是分布式系统中用于确保多个进程或多个节点在任何时候对共享资源的访问是互斥的机制。在分布式系统中,不同的进程可能在不同的服务器或不同的地理位置上运行,因此传统的锁机制(如单机数据库锁、文件锁等)不再适用。分布式锁提供了一种方法,使得这些分布式的进程能够协调对共享资源的访问。 **分布式锁的关键特性包括:** 1. **互斥性**:确保任何时刻只能有一个进程持有锁,防止并发操作导致的数据不一致。 2. **可靠性**:即使在分布式系统部分节点故障的情况下,锁的管理机制仍然能够正确工作。 3. **高性能**:尽量减小获取和释放锁的开销,不影响系统整体性能。 4. **公平性**:按照请求锁的顺序分配锁,避免饥饿现象。 5. **可重入性**:同一个进程可以在未释放锁的情况下再次获得该锁。 6. **锁超时**:防止因某些原因导致持有锁的进程未能及时释放锁,引发死锁。 **实现分布式锁的一些常见方法包括:** 1. **基于Redis的分布式锁**:利用Redis的原子操作(如`SETNX`、`GETSET`命令或Lua脚本)实现锁的获取和释放。 2. **ZooKeeper**:利用ZooKeeper的临时节点和节点监听机制来实现分布式锁。 3. **数据库乐观锁/悲观锁**:在分布式数据库中利用乐观锁版本号或悲观锁机制控制并发访问。 4. **分布式协调服务**:如Etcd,通过其键值存储和TTL特性实现分布式锁。 5. **基于消息队列的锁**:利用消息队列的FIFO特性间接实现锁机制。 6. **Redisson**等第三方库:提供封装好的分布式锁实现,简化分布式锁的使用。 **分布式锁的应用场景包括:** 1. **数据库并发控制**:在分布式数据库中,防止多节点同时修改同一条数据导致的数据不一致。 2. **缓存一致性**:确保在分布式缓存系统中,数据更新操作的原子性和一致性。 3. **任务调度**:防止分布式任务调度系统中,相同任务被多个节点同时执行。 4. **资源限制**:限制对有限资源的并发访问,如文件系统、数据库连接池等。 5. **微服务架构**:在微服务之间协调资源访问,避免服务之间的数据竞争和冲突。 6. **幂等操作控制**:确保重复的请求不会多次执行同一操作,如支付、订单创建等关键业务场景。 使用分布式锁时,需要仔细设计以避免常见的问题,如死锁、锁的争用、性能瓶颈等。正确实现的分布式锁对于维护分布式系统的稳定性和数据一致性至关重要。 ### 四、Redisson Redisson 是一个开源的Java库,它为Redis客户端提供了丰富的数据结构和分布式服务,包括分布式锁、信号量、计数器、消息队列、分布式集合等多种分布式解决方案。Redisson通过高级抽象简化了分布式编程的复杂性,使得开发者能够更加专注于业务逻辑,而不是复杂的并发控制和低级通信协议。 #### Redisson的特点: - **高性能**: Redisson利用Redis的特性实现了高效的并发控制和数据结构操作。 - **全面的分布式服务**: 提供了分布式锁、信号量、闭锁、原子计数器等多种并发控制工具。 - **易用性**: 提供了简单直观的API,使得开发人员能够快速上手并构建分布式应用。 - **容错性**: 支持Redis Sentinel和Redis Cluster,增强了系统的高可用性。 - **灵活的配置**: 支持多种Redis连接方式,包括单机、主从、哨兵、集群模式等。 #### 使用Redisson实现分布式锁的例子 在项目中引入Redisson的依赖: gradle中引入redisson的配置 ```groovy compile group: 'org.redisson', name: 'redisson', version: '3.13.6' ``` 使用Redisson实现分布式锁的简单示例: ```java import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.config.Config; public class RedissonDistributedLockExample { public static void main(String[] args) { // 配置Redisson客户端 Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6379"); // 单机模式配置,根据实际情况调整 // 创建Redisson客户端实例 RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("myLock"); try { // 尝试获取锁,如果无法立即获取则等待最多10秒钟 if (lock.tryLock(10, TimeUnit.SECONDS)) { System.out.println("Lock acquired, performing critical section..."); // 在这里执行临界区代码 Thread.sleep(2000); // 模拟处理耗时操作 } else { System.out.println("Unable to acquire lock within the given time."); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("Thread interrupted while waiting for lock."); } finally { // 释放锁 if (lock.isHeldByCurrentThread()) { lock.unlock(); System.out.println("Lock released."); } } // 关闭Redisson客户端 redisson.shutdown(); } } ``` 在这个例子中,创建了一个Redisson客户端,并通过它获取了一个名为`myLock`的分布式锁。尝试以非阻塞方式获取锁,如果10秒内无法获取到锁,则认为获取失败。在持有锁的期间执行临界区代码,完成后通过`unlock()`方法释放锁。Redisson自动处理了锁的过期和释放问题,提高了使用的便利性和安全性。 ### 五、乐观锁 乐观锁是一种并发控制的方法,用于在多线程或分布式系统中管理对共享资源的访问。它基于这样的假设:多个事务在同时进行时,它们之间的冲突是相对少见的。乐观锁通常适用于写冲突不频繁,读操作远多于写操作的场景。 #### 核心特征: 1. **非阻塞读取**:读取数据时不会阻塞其他线程,提升读操作的并发性能。 2. **并发冲突检测**:在数据更新阶段,通过比较数据版本号或时间戳判断是否有并发修改。 3. **冲突解决**:发现并发冲突时,可能通过重试、合并变更或放弃操作等方式处理。 4. **适用场景**:适用于并发写操作较少,读操作占主导的应用环境。 #### 实现方案: 许多数据库管理系统支持乐观锁。 ##### 数据库版本号(Version) 这是最常见的乐观锁实现方式。在数据库表中增加一个额外的列,如`version`,每当数据被修改时,该版本号就会递增。 **实现步骤**: - **表结构**:在表中加入一个版本字段,如`version INT DEFAULT 0`。 - **读取数据**:查询时,除了需要的数据外,也读取版本号。 - **更新数据**:更新时,除了设置新值,还需在`WHERE`子句中加入版本号的条件,如`UPDATE table SET ..., version = version + 1 WHERE id = ? AND version = ?`。如果更新影响的行数为0,说明版本号不匹配,即数据已被其他事务修改。 ##### 时间戳(Timestamp) 与版本号类似,使用时间戳作为乐观锁的依据。每次更新时,除了更新数据外,还会更新数据的最后修改时间戳。 **实现步骤**: - **表结构**:在表中加入一个时间戳字段,如`last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP`。 - **读取数据**:查询时,一并读取时间戳。 - **更新数据**:更新时,同样在`WHERE`子句中加入时间戳的条件,如`UPDATE table SET ..., last_modified = NOW() WHERE id = ? AND last_modified = ?`。如果时间戳不匹配,则更新失败。 ##### 自定义版本字段 在某些特定场景下,可以根据业务需求自定义一个逻辑上的版本字段,如订单状态版本、商品库存版本等,原理与上述两种方式相似。 ##### 应用层实现 乐观锁也可以完全在应用层实现,不依赖于数据库的特殊字段。通过在数据对象中维护一个版本字段,在更新前比对应用层缓存的版本与数据库中的版本是否一致。 **注意**:应用层实现需要注意并发控制,确保版本比较和数据更新操作的原子性。 ##### 分布式环境下 在分布式系统中,乐观锁的实现往往需要借助分布式锁或者分布式缓存服务(如Redis)来统一管理版本信息,确保不同节点间的数据一致性。 #### 应用场景: - **多用户编辑**:如文档协同编辑,每个编辑动作携带文档当前版本号,提交时检查版本是否过期。 - **库存管理**:电商系统中扣减库存时,先读取商品的当前库存及版本号,然后校验无误后更新库存。 - **金融交易**:处理转账等敏感操作,通过版本控制确保交易的序列化和一致性。 #### 乐观锁的优点: - **减少锁的开销**:由于乐观锁不需要在每个操作时都进行锁定,因此减少了锁的开销,提高了系统的吞吐量。 - **提高并发性**:允许多个用户同时读取数据,只有在数据修改时才进行冲突检查,从而提高了系统的并发性。 - **无死锁**:由于没有使用锁,因此不会出现死锁的情况。 #### 乐观锁的缺点: - **ABA问题**:如果一个数据项被读取,然后被更新并恢复到原始状态(A-B-A),乐观锁可能无法检测到这种变化,导致问题。 - **重试开销**:在高冲突环境中,如果更新经常被其他事务干扰,可能需要多次重试,增加了处理开销。 - **实现复杂性**:需要手动管理版本号和冲突检测逻辑,增加了实现的复杂性。 #### 使用数据库乐观锁的例子 乐观锁的一个典型实现方式是通过在数据库表中增加一个版本号字段(version),并在更新数据时检查版本号是否发生变化。以下是一个基于Java和Spring Data JPA的乐观锁示例,假设我们有一个简单的`Product`实体类,用于表示商品信息: ##### 实体类定义(Product.java) ```java @Entity public class Product { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String name; private Integer stock; // 版本号字段,用于乐观锁 @Version private Long version; // Getter and Setter省略... } ``` ##### Repository 接口定义(ProductRepository.java) ```java import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository public interface ProductRepository extends JpaRepository { } ``` ##### Service 层使用乐观锁更新商品库存(ProductService.java) ```java @Service public class ProductService { @Autowired private ProductRepository productRepository; /** * 使用乐观锁减少商品库存 * @param productId 商品ID * @param quantity 减少的数量 * @return 是否更新成功 */ @Transactional public boolean reduceStock(Long productId, Integer quantity) { Product product = productRepository.findById(productId).orElseThrow(() -> new RuntimeException("商品不存在")); // 检查库存是否足够 if (product.getStock() < quantity) { return false; } // 尝试更新库存,这里JPA会自动处理乐观锁逻辑 try { product.setStock(product.getStock() - quantity); productRepository.save(product); return true; } catch (Exception e) { // 如果更新失败,通常是由于版本冲突(即乐观锁检查失败) return false; } } } ``` 在这个示例中,当调用`reduceStock`方法尝试减少商品库存时,如果在读取商品信息后、更新库存前,有其他事务修改了商品信息(包括库存或版本号),那么在执行`save`操作时,因为版本号不匹配(即乐观锁检查失败),JPA会抛出异常,从而导致事务回滚。这种方法有效地避免了并发更新导致的数据不一致问题,而不需要在整个读取到更新的过程中持有锁,提高了系统的并发性能。 ### 六、悲观锁 悲观锁(Pessimistic Locking)是一种在并发控制中常用的锁机制,它通过阻止其他事务访问共享资源来防止并发冲突。 悲观锁机制下,事务在进行数据操作前便假设最坏的情况——即并发冲突必然会发生,因此它会在数据被读取或修改前就加锁,确保在整个操作过程中其他事务无法访问此数据,以此来防止并发冲突。 #### 悲观锁的工作原理: 1. **锁定资源**:在事务开始时,先对需要操作的资源进行加锁,通常是排他锁(Exclusive Lock),这意味着其他事务不能同时对同一资源进行修改。 2. **持有锁**:事务在持有锁的期间可以安全地读取和修改资源。 3. **释放锁**:事务完成后,无论是提交还是回滚,都需要释放持有的锁,以便其他事务可以访问该资源。 #### 悲观锁的优点: - **简单直观**:锁的机制容易理解,实现起来也相对简单。 - **避免冲突**:通过锁定资源,可以确保在事务期间数据的一致性和完整性。 #### 悲观锁的缺点: - **性能影响**:由于需要等待锁的释放,可能会造成其他事务的阻塞,影响系统的响应速度和吞吐量。 - **死锁风险**:如果多个事务相互等待对方持有的锁,可能会导致死锁。 - **资源利用率低**:在高并发环境下,锁的争用可能导致资源的利用率降低。 #### 悲观锁的应用场景: - **写操作频繁**:当系统中的写操作远多于读操作时,悲观锁可以有效地避免冲突。 - **数据一致性要求高**:对数据一致性要求极高的场景,悲观锁可以提供强一致性保证。 #### 悲观锁的实现方式: - **数据库层面**:通过SQL的SELECT ... FOR UPDATE语句或SELECT ... LOCK IN SHARE MODE(共享锁,适用于只读操作)来实现悲观锁。这种方式依赖于数据库的锁机制。 - **编程语言层面**:在Java中,可以使用synchronized关键字或java.util.concurrent.locks包下的锁类(如ReentrantLock),但这些通常用于单机多线程环境,要实现分布式环境下的悲观锁,则需要借助于分布式锁服务。 #### spring boot中使用悲观锁 以下是一个基于Java和Spring Data JPA的悲观锁示例,假设我们有一个简单的`Product`实体类,用于表示商品信息。在这个示例中,我们使用JPA的`@Lock`注解来实现悲观锁,具体锁定策略可以通过`LockModeType`枚举来指定。 ##### 实体类定义(Product.java) ```java @Entity public class Product { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String name; private Integer stock; // Getter and Setter省略... } ``` ##### Repository 接口定义(ProductRepository.java) ```java @Repository public interface ProductRepository extends JpaRepository { @Lock(LockModeType.PESSIMISTIC_WRITE) Product findById(Long id); } ``` @Lock(LockModeType.PESSIMISTIC_WRITE) 表示使用悲观锁的写锁(PESSIMISTIC_WRITE),即在事务开始时对资源进行加锁,直到事务结束才释放。 ##### Service 层使用悲观锁更新商品库存(ProductService.java) ```java @Service public class ProductService { @Autowired private ProductRepository productRepository; /** * 使用悲观锁减少商品库存 * @param productId 商品ID * @param quantity 减少的数量 */ @Transactional public void reduceStock(Long productId, Integer quantity) { Product product = productRepository.findOneById(productId); // 检查库存是否足够 if (product.getStock() < quantity) { throw new IllegalArgumentException("库存不足"); } // 更新库存 product.setStock(product.getStock() - quantity); productRepository.save(product); } } ``` 在这个示例中,我们在`ProductRepository`接口的`findById`方法上添加了`@Lock(LockModeType.PESSIMISTIC_WRITE)`注解。这意味着当调用这个方法时,JPA会立即在数据库中对指定的商品记录加写锁,阻止其他事务对同一商品记录的读写操作,直到当前事务结束。这样,在`reduceStock`方法内部,我们可以安全地检查和更新商品库存,而不必担心并发冲突导致的数据不一致问题。需要注意的是,悲观锁策略会显著增加锁的竞争,可能导致其他事务的阻塞和性能下降,因此在并发冲突不是特别频繁的场景下,可能更倾向于使用乐观锁策略。 总的来说,悲观锁是一种有效的并发控制手段,尤其适用于冲突可能性较高且对数据一致性要求较高的场景。然而,它也可能带来性能上的开销和死锁的风险,因此在实际应用中需要仔细权衡。