并发编程 Code

并发编程 Code

一、CAS 无锁算法

public class ReentrantSpinLock {
    private AtomicReference<Thread> cas = new AtomicReference<Thread>();
    private int count;

    public void lock() {
        Thread current = Thread.currentThread();
        if (current == cas.get()) { // 如果当前线程已经获取到了锁,线程数增加一,然后返回
            count++;
            return;
        }
        // 如果没获取到锁,则通过CAS自旋
        while (!cas.compareAndSet(null, current)) {
            // DO nothing
        }
    }

    public void unlock() {
        Thread cur = Thread.currentThread();
        if (cur == cas.get()) {
            if (count > 0) {// 如果大于0,表示当前线程多次获取了该锁,释放锁通过count减一来模拟
                count--;
            } else {// 如果count==0,可以将锁释放,这样就能保证获取锁的次数与释放锁的次数是一致的了。
                cas.compareAndSet(cur, null);
            }
        }
    }
}

TicketLock 解决公平性的问题。

public class TicketLockV2 {
    /**
     * 服务号
     */
    private AtomicInteger serviceNum = new AtomicInteger();
    /**
     * 排队号
     */
    private AtomicInteger ticketNum = new AtomicInteger();
    /**
     * 新增一个ThreadLocal,用于存储每个线程的排队号
     */
    private ThreadLocal<Integer> ticketNumHolder = new ThreadLocal<Integer>();

    public void lock() {
        int currentTicketNum = ticketNum.getAndIncrement();
        ticketNumHolder.set(currentTicketNum);
        System.out.println("LOCK " + Thread.currentThread().getName() + " currentTicketNum:" + currentTicketNum + " serviceNum:" + serviceNum.get());
        while (currentTicketNum != serviceNum.get()) {
            Thread.onSpinWait(); // 添加自旋优化
        }
    }

    public void unlock() {
        Integer currentTickNum = ticketNumHolder.get();
        if (currentTickNum == null) {
            throw new IllegalMonitorStateException("应该先获取锁再释放锁");
        }
        if (!serviceNum.compareAndSet(currentTickNum, currentTickNum + 1)) {
            throw new IllegalMonitorStateException("当前线程并未持有锁");
        }
        System.out.println("UNLOCK " + Thread.currentThread().getName() + "- currentTicketNum:" + currentTickNum + "- serviceNum:" + serviceNum.get());
        ticketNumHolder.remove(); // 清理ThreadLocal,防止内存泄漏
    }
}

public class TicketLockV2Example {

    private static int counter = 0;
    private static final TicketLockV2 lock = new TicketLockV2();

    public static void main(String[] args) throws InterruptedException {
        // 创建多个线程进行测试
        int threadCount = 5;
        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();  // 获取锁
                counter++;    // 临界区操作
                lock.unlock(); // 确保在finally中释放锁
            }, "Thread-" + i);
        }

        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }

        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("最终计数器的值: " + counter);
    }
}

二、独占式锁 & 共享式锁

1、独占式锁(Exclusive Lock)

public class ExclusiveLockExample {
    private final ReentrantLock lock = new ReentrantLock();  // 独占式锁
    private int count = 0;
    
    public void write() {
        lock.lock();  // 只能被一个线程获取
        try {
            count++;
            // 模拟写操作
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 写入完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

2、共享式锁(Shared Lock)

public class SharedLockExample {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();    // 共享锁
    private final Lock writeLock = rwLock.writeLock();  // 独占锁
    private int data = 0;
    
    // 读操作:可以多个线程同时读
    public void read() {
        readLock.lock();  // 获取共享锁
        try {
            System.out.println(Thread.currentThread().getName() + " 读取数据: " + data);
            Thread.sleep(1000);  // 模拟读取操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }
    }
    
    // 写操作:只能一个线程写
    public void write(int newData) {
        writeLock.lock();  // 获取独占锁
        try {
            data = newData;
            System.out.println(Thread.currentThread().getName() + " 写入数据: " + newData);
            Thread.sleep(1000);  // 模拟写入操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }
    }
}

使用示例:

public class LockDemo {
    public static void main(String[] args) {
        SharedLockExample sharedLock = new SharedLockExample();
        
        // 创建多个读线程
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                while (true) {
                    sharedLock.read();
                }
            }, "Reader-" + i).start();
        }
        
        // 创建写线程
        new Thread(() -> {
            int value = 0;
            while (true) {
                sharedLock.write(value++);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Writer").start();
    }
}

独占式锁:

  • 同一时刻只能有一个线程获取锁
  • 适用于写操作频繁的场景
  • 典型实现:ReentrantLock, synchronized
  • 特点:
    • 互斥访问
    • 可重入
    • 高一致性
      共享式锁:
  • 同一时刻可以有多个线程获取锁
  • 适用于读多写少的场景
  • 典型实现:ReentrantReadWriteLock 的读锁
  • 特点:
    • 并发读
    • 读写互斥
    • 写写互斥

三、AQS

AQS (AbstractQueuedSynchronizer) 的核心概念。

3.1、AQS 的基本结构

public abstract class AbstractQueuedSynchronizer {
    // 同步状态
    private volatile int state;
    
    // 等待队列的头节点
    private transient volatile Node head;
    
    // 等待队列的尾节点
    private transient volatile Node tail;
    
    // 内部类 Node,表示等待队列中的节点
    static final class Node {
        volatile Node prev;       // 前驱节点
        volatile Node next;       // 后继节点
        volatile Thread thread;   // 当前线程
        volatile int waitStatus; // 等待状态
    }
}

3.2、基于 AQS 实现一个简单的互斥锁

public class SimpleLock extends AbstractQueuedSynchronizer {
    // 尝试获取锁
    protected boolean tryAcquire(int arg) {
        // 期望状态为0,表示锁未被占用
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    // 释放锁
    protected boolean tryRelease(int arg) {
        if (getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    
    // 对外提供的加锁方法
    public void lock() {
        acquire(1);
    }
    
    // 对外提供的解锁方法
    public void unlock() {
        release(1);
    }
}

3.3、AQS 的工作流程示例:

public class AQSExample {
    private static final SimpleLock lock = new SimpleLock();
    private static int count = 0;
    
    public static void main(String[] args) {
        // 创建多个线程竞争锁
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    // 临界区操作
                    count++;
                    System.out.println(Thread.currentThread().getName() + " 获得锁");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "Thread-" + i).start();
        }
    }
}

3.4、AQS 的核心机制(源码)

// AQS 的主要操作流程
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}

// 入队操作
private Node addWaiter(Node mode) {
    // 创建新节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // CAS设置尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 初始化队列
    enq(node);
    return node;
}

3.5、AQS 的主要特点

状态管理:

// 获取当前同步状态
protected final int getState() {
    return state;
}

// 设置同步状态
protected final void setState(int newState) {
    state = newState;
}

// CAS 修改同步状态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

队列管理:

// 等待队列示意
head -> Node1 -> Node2 -> Node3 -> tail
// 每个节点包含:
// - 线程引用
// - 等待状态
// - 前驱和后继指针

3.6、AQS 的实际应用

// ReentrantLock 的实现示例
public class ReentrantLock {
    private final Sync sync;
    
    // 公平锁
    public static class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 检查队列中是否有前驱节点
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 处理重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

AQS 的核心概念:
同步状态(State):

  • 使用 volatile 修饰的整数
  • 表示当前同步状态
  • 通过 CAS 操作进行修改

等待队列:

  • 双向链表结构
  • FIFO 顺序
  • 支持独占和共享两种模式

实现方式:

  • 独占模式:ReentrantLock
  • 共享模式:Semaphore、CountDownLatch
  • 组合模式:ReentrantReadWriteLock

主要优点:

  • 提供了标准化的同步器实现框架
  • 简化了锁的实现
  • 支持公平和非公平两种方式

使用 AQS 的最佳实践:
继承 AQS 实现自定义同步器时:

  • 重写 tryAcquire/tryRelease(独占模式)
  • 重写 tryAcquireShared/tryReleaseShared(共享模式)
  • 根据需要实现 isHeldExclusively

注意事项:

  • 正确维护同步状态
  • 考虑公平性需求
  • 处理中断和超时

四、 ConcurrentHashMap & HashMap

4.1、基本结构

// HashMap
public class HashMap<K,V> {
    // 数组 + 链表/红黑树
    Node<K,V>[] table;
    
    static class Node<K,V> {
        final int hash;
        final K key;
        V value;
        Node<K,V> next;
    }
}

// ConcurrentHashMap
public class ConcurrentHashMap<K,V> {
    // 数组 + 链表/红黑树 + 分段锁
    private transient volatile Node<K,V>[] table;
    
    static class Node<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile Node<K,V> next;
    }
}

4.2、并发操作对比

// HashMap 线程不安全的示例
public class HashMapThreadUnsafe {
    public static void main(String[] args) throws InterruptedException {
        Map<Integer, Integer> map = new HashMap<>();
        
        // 多线程并发写入
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                map.put(i, i);
            }
        });
        
        Thread t2 = new Thread(() -> {
            for (int i = 1000; i < 2000; i++) {
                map.put(i, i);
            }
        });
        
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        
        // 结果可能小于2000
        System.out.println("Size: " + map.size());
    }
}

// ConcurrentHashMap 线程安全的示例
public class ConcurrentHashMapThreadSafe {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        
        // 多线程并发写入
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                map.put(i, i);
            }
        });
        
        Thread t2 = new Thread(() -> {
            for (int i = 1000; i < 2000; i++) {
                map.put(i, i);
            }
        });
        
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        
        // 结果一定是2000
        System.out.println("Size: " + map.size());
    }
}

4.3、常见操作的实现差异

// 1. 插入操作
public class MapOperations {
    private HashMap<String, String> hashMap = new HashMap<>();
    private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
    
    // HashMap 插入
    public void putInHashMap(String key, String value) {
        hashMap.put(key, value);  // 非线程安全
    }
    
    // ConcurrentHashMap 插入
    public void putInConcurrentHashMap(String key, String value) {
        concurrentHashMap.put(key, value);  // 线程安全
    }
    
    // ConcurrentHashMap 原子操作
    public void atomicOperation() {
        // 原子性的 put-if-absent
        concurrentHashMap.putIfAbsent("key", "value");
        
        // 原子性的 replace
        concurrentHashMap.replace("key", "oldValue", "newValue");
        
        // 原子性的计算
        concurrentHashMap.compute("key", (k, v) -> v == null ? "default" : v + "updated");
    }
}

4.4、实际应用场景

// 场景1:缓存实现
public class CacheImplementation {
    private ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
    
    public Object getFromCache(String key, Supplier<Object> valueLoader) {
        return cache.computeIfAbsent(key, k -> valueLoader.get());
    }
}

// 场景2:计数器
public class ConcurrentCounter {
    private ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    
    public void increment(String key) {
        counters.computeIfAbsent(key, k -> new AtomicInteger(0)).incrementAndGet();
    }
}

// 场景3:会话管理
public class SessionManager {
    private ConcurrentHashMap<String, UserSession> sessions = new ConcurrentHashMap<>();
    
    public void addSession(String sessionId, UserSession session) {
        sessions.put(sessionId, session);
    }
    
    public UserSession getSession(String sessionId) {
        return sessions.get(sessionId);
    }
}

4.5、性能优化场景

public class PerformanceOptimization {
    private ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    
    // 批量操作
    public void batchOperation() {
        // 使用 forEach 进行并行处理
        map.forEach((k, v) -> {
            // 处理每个元素
            processEntry(k, v);
        });
        
        // 使用 search 查找元素
        String result = map.search(1, (k, v) -> {
            if (someCondition(k, v)) {
                return v;
            }
            return null;
        });
    }
}

4.6、总结

主要区别:

  • 线程安全性:
    • HashMap:非线程安全
    • ConcurrentHashMap:线程安全
  • 实现机制:
    • HashMap:数组 + 链表/红黑树
    • ConcurrentHashMap:数组 + 链表/红黑树 + CAS + synchronized
  • 性能特点:
    • HashMap:单线程性能好
    • ConcurrentHashMap:多线程性能好

使用场景:

  • HashMap 适用于:
    • 单线程环境
    • 读多写少
    • 对性能要求高
    • 数据量较小
  • ConcurrentHashMap 适用于:
    • 多线程环境
    • 并发读写
    • 需要线程安全
    • 缓存实现
    • 计数器
    • 会话管理

注意事项:

  • 容量规划:
    • 初始容量设置
    • 负载因子选择
    • 并发级别考虑
  • 性能优化:
    • 避免过度同步
    • 合理使用批量操作
    • 注意内存使用
    • 功能选择:
    • 根据实际需求选择
    • 考虑是否需要线程安全
    • 评估性能要求

评论

暂无

添加新评论