一、CAS 无锁算法
public class ReentrantSpinLock {
private AtomicReference<Thread> cas = new AtomicReference<Thread>();
private int count;
public void lock() {
Thread current = Thread.currentThread();
if (current == cas.get()) { // 如果当前线程已经获取到了锁,线程数增加一,然后返回
count++;
return;
}
// 如果没获取到锁,则通过CAS自旋
while (!cas.compareAndSet(null, current)) {
// DO nothing
}
}
public void unlock() {
Thread cur = Thread.currentThread();
if (cur == cas.get()) {
if (count > 0) {// 如果大于0,表示当前线程多次获取了该锁,释放锁通过count减一来模拟
count--;
} else {// 如果count==0,可以将锁释放,这样就能保证获取锁的次数与释放锁的次数是一致的了。
cas.compareAndSet(cur, null);
}
}
}
}
TicketLock 解决公平性的问题。
public class TicketLockV2 {
/**
* 服务号
*/
private AtomicInteger serviceNum = new AtomicInteger();
/**
* 排队号
*/
private AtomicInteger ticketNum = new AtomicInteger();
/**
* 新增一个ThreadLocal,用于存储每个线程的排队号
*/
private ThreadLocal<Integer> ticketNumHolder = new ThreadLocal<Integer>();
public void lock() {
int currentTicketNum = ticketNum.getAndIncrement();
ticketNumHolder.set(currentTicketNum);
System.out.println("LOCK " + Thread.currentThread().getName() + " currentTicketNum:" + currentTicketNum + " serviceNum:" + serviceNum.get());
while (currentTicketNum != serviceNum.get()) {
Thread.onSpinWait(); // 添加自旋优化
}
}
public void unlock() {
Integer currentTickNum = ticketNumHolder.get();
if (currentTickNum == null) {
throw new IllegalMonitorStateException("应该先获取锁再释放锁");
}
if (!serviceNum.compareAndSet(currentTickNum, currentTickNum + 1)) {
throw new IllegalMonitorStateException("当前线程并未持有锁");
}
System.out.println("UNLOCK " + Thread.currentThread().getName() + "- currentTicketNum:" + currentTickNum + "- serviceNum:" + serviceNum.get());
ticketNumHolder.remove(); // 清理ThreadLocal,防止内存泄漏
}
}
public class TicketLockV2Example {
private static int counter = 0;
private static final TicketLockV2 lock = new TicketLockV2();
public static void main(String[] args) throws InterruptedException {
// 创建多个线程进行测试
int threadCount = 5;
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
lock.lock(); // 获取锁
counter++; // 临界区操作
lock.unlock(); // 确保在finally中释放锁
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终计数器的值: " + counter);
}
}
二、独占式锁 & 共享式锁
1、独占式锁(Exclusive Lock)
public class ExclusiveLockExample {
private final ReentrantLock lock = new ReentrantLock(); // 独占式锁
private int count = 0;
public void write() {
lock.lock(); // 只能被一个线程获取
try {
count++;
// 模拟写操作
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 写入完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2、共享式锁(Shared Lock)
public class SharedLockExample {
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock(); // 共享锁
private final Lock writeLock = rwLock.writeLock(); // 独占锁
private int data = 0;
// 读操作:可以多个线程同时读
public void read() {
readLock.lock(); // 获取共享锁
try {
System.out.println(Thread.currentThread().getName() + " 读取数据: " + data);
Thread.sleep(1000); // 模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
// 写操作:只能一个线程写
public void write(int newData) {
writeLock.lock(); // 获取独占锁
try {
data = newData;
System.out.println(Thread.currentThread().getName() + " 写入数据: " + newData);
Thread.sleep(1000); // 模拟写入操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
}
使用示例:
public class LockDemo {
public static void main(String[] args) {
SharedLockExample sharedLock = new SharedLockExample();
// 创建多个读线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
sharedLock.read();
}
}, "Reader-" + i).start();
}
// 创建写线程
new Thread(() -> {
int value = 0;
while (true) {
sharedLock.write(value++);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Writer").start();
}
}
独占式锁:
- 同一时刻只能有一个线程获取锁
- 适用于写操作频繁的场景
- 典型实现:ReentrantLock, synchronized
- 特点:
- 互斥访问
- 可重入
- 高一致性
共享式锁:
- 同一时刻可以有多个线程获取锁
- 适用于读多写少的场景
- 典型实现:ReentrantReadWriteLock 的读锁
- 特点:
- 并发读
- 读写互斥
- 写写互斥
三、AQS
AQS (AbstractQueuedSynchronizer) 的核心概念。
3.1、AQS 的基本结构
public abstract class AbstractQueuedSynchronizer {
// 同步状态
private volatile int state;
// 等待队列的头节点
private transient volatile Node head;
// 等待队列的尾节点
private transient volatile Node tail;
// 内部类 Node,表示等待队列中的节点
static final class Node {
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 当前线程
volatile int waitStatus; // 等待状态
}
}
3.2、基于 AQS 实现一个简单的互斥锁
public class SimpleLock extends AbstractQueuedSynchronizer {
// 尝试获取锁
protected boolean tryAcquire(int arg) {
// 期望状态为0,表示锁未被占用
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 对外提供的加锁方法
public void lock() {
acquire(1);
}
// 对外提供的解锁方法
public void unlock() {
release(1);
}
}
3.3、AQS 的工作流程示例:
public class AQSExample {
private static final SimpleLock lock = new SimpleLock();
private static int count = 0;
public static void main(String[] args) {
// 创建多个线程竞争锁
for (int i = 0; i < 3; i++) {
new Thread(() -> {
lock.lock();
try {
// 临界区操作
count++;
System.out.println(Thread.currentThread().getName() + " 获得锁");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread-" + i).start();
}
}
}
3.4、AQS 的核心机制(源码)
// AQS 的主要操作流程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// 入队操作
private Node addWaiter(Node mode) {
// 创建新节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 初始化队列
enq(node);
return node;
}
3.5、AQS 的主要特点
状态管理:
// 获取当前同步状态
protected final int getState() {
return state;
}
// 设置同步状态
protected final void setState(int newState) {
state = newState;
}
// CAS 修改同步状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
队列管理:
// 等待队列示意
head -> Node1 -> Node2 -> Node3 -> tail
// 每个节点包含:
// - 线程引用
// - 等待状态
// - 前驱和后继指针
3.6、AQS 的实际应用
// ReentrantLock 的实现示例
public class ReentrantLock {
private final Sync sync;
// 公平锁
public static class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 检查队列中是否有前驱节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 处理重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
setState(nextc);
return true;
}
return false;
}
}
}
AQS 的核心概念:
同步状态(State):
- 使用 volatile 修饰的整数
- 表示当前同步状态
- 通过 CAS 操作进行修改
等待队列:
- 双向链表结构
- FIFO 顺序
- 支持独占和共享两种模式
实现方式:
- 独占模式:ReentrantLock
- 共享模式:Semaphore、CountDownLatch
- 组合模式:ReentrantReadWriteLock
主要优点:
- 提供了标准化的同步器实现框架
- 简化了锁的实现
- 支持公平和非公平两种方式
使用 AQS 的最佳实践:
继承 AQS 实现自定义同步器时:
- 重写 tryAcquire/tryRelease(独占模式)
- 重写 tryAcquireShared/tryReleaseShared(共享模式)
- 根据需要实现 isHeldExclusively
注意事项:
- 正确维护同步状态
- 考虑公平性需求
- 处理中断和超时
四、 ConcurrentHashMap & HashMap
4.1、基本结构
// HashMap
public class HashMap<K,V> {
// 数组 + 链表/红黑树
Node<K,V>[] table;
static class Node<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;
}
}
// ConcurrentHashMap
public class ConcurrentHashMap<K,V> {
// 数组 + 链表/红黑树 + 分段锁
private transient volatile Node<K,V>[] table;
static class Node<K,V> {
final int hash;
final K key;
volatile V value;
volatile Node<K,V> next;
}
}
4.2、并发操作对比
// HashMap 线程不安全的示例
public class HashMapThreadUnsafe {
public static void main(String[] args) throws InterruptedException {
Map<Integer, Integer> map = new HashMap<>();
// 多线程并发写入
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put(i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put(i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
// 结果可能小于2000
System.out.println("Size: " + map.size());
}
}
// ConcurrentHashMap 线程安全的示例
public class ConcurrentHashMapThreadSafe {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
// 多线程并发写入
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put(i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put(i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
// 结果一定是2000
System.out.println("Size: " + map.size());
}
}
4.3、常见操作的实现差异
// 1. 插入操作
public class MapOperations {
private HashMap<String, String> hashMap = new HashMap<>();
private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
// HashMap 插入
public void putInHashMap(String key, String value) {
hashMap.put(key, value); // 非线程安全
}
// ConcurrentHashMap 插入
public void putInConcurrentHashMap(String key, String value) {
concurrentHashMap.put(key, value); // 线程安全
}
// ConcurrentHashMap 原子操作
public void atomicOperation() {
// 原子性的 put-if-absent
concurrentHashMap.putIfAbsent("key", "value");
// 原子性的 replace
concurrentHashMap.replace("key", "oldValue", "newValue");
// 原子性的计算
concurrentHashMap.compute("key", (k, v) -> v == null ? "default" : v + "updated");
}
}
4.4、实际应用场景
// 场景1:缓存实现
public class CacheImplementation {
private ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
public Object getFromCache(String key, Supplier<Object> valueLoader) {
return cache.computeIfAbsent(key, k -> valueLoader.get());
}
}
// 场景2:计数器
public class ConcurrentCounter {
private ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
public void increment(String key) {
counters.computeIfAbsent(key, k -> new AtomicInteger(0)).incrementAndGet();
}
}
// 场景3:会话管理
public class SessionManager {
private ConcurrentHashMap<String, UserSession> sessions = new ConcurrentHashMap<>();
public void addSession(String sessionId, UserSession session) {
sessions.put(sessionId, session);
}
public UserSession getSession(String sessionId) {
return sessions.get(sessionId);
}
}
4.5、性能优化场景
public class PerformanceOptimization {
private ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
// 批量操作
public void batchOperation() {
// 使用 forEach 进行并行处理
map.forEach((k, v) -> {
// 处理每个元素
processEntry(k, v);
});
// 使用 search 查找元素
String result = map.search(1, (k, v) -> {
if (someCondition(k, v)) {
return v;
}
return null;
});
}
}
4.6、总结
主要区别:
- 线程安全性:
- HashMap:非线程安全
- ConcurrentHashMap:线程安全
- 实现机制:
- HashMap:数组 + 链表/红黑树
- ConcurrentHashMap:数组 + 链表/红黑树 + CAS + synchronized
- 性能特点:
- HashMap:单线程性能好
- ConcurrentHashMap:多线程性能好
使用场景:
- HashMap 适用于:
- 单线程环境
- 读多写少
- 对性能要求高
- 数据量较小
- ConcurrentHashMap 适用于:
- 多线程环境
- 并发读写
- 需要线程安全
- 缓存实现
- 计数器
- 会话管理
注意事项:
- 容量规划:
- 初始容量设置
- 负载因子选择
- 并发级别考虑
- 性能优化:
- 避免过度同步
- 合理使用批量操作
- 注意内存使用
- 功能选择:
- 根据实际需求选择
- 考虑是否需要线程安全
- 评估性能要求
评论