Java多线程实践

Posted by     "" on Thursday, June 4, 2020

Java 多线程实践:从并发基础到性能优化

引言

在 CPU 核心数从单核迈向几十核的今天,多线程编程早已从“高级技巧”变成了“必备技能”。无论是高并发的 Web 服务器、响应灵敏的桌面应用,还是数据密集型的批处理任务,多线程都是绕不开的话题。

然而,Java 多线程是一把双刃剑——用得好,能成倍提升程序性能;用得不好,死锁、竞态条件、性能下降等问题会让你陷入无尽的调试深渊。本文将带你从零开始,系统掌握 Java 多线程的核心知识,并通过大量实战代码,让你真正具备编写高并发程序的能力。

一、Java 内存模型与线程基础

1.1 线程与进程的区别

在深入代码之前,先理清两个基础概念:

特性 进程 线程
资源拥有 独立的地址空间、文件描述符等 共享进程的资源
创建开销 大(需要复制资源) 小(只需创建栈和寄存器)
通信方式 IPC(管道、消息队列、共享内存) 直接读写共享变量
切换成本 高(涉及地址空间切换) 低(只需切换栈和寄存器)
隔离性 强(一个进程崩溃不影响其他) 弱(一个线程崩溃可能导致整个进程退出)

简单说:进程是资源分配的最小单位,线程是 CPU 调度的最小单位

1.2 Java 内存模型(JMM)

Java 内存模型(JMM)定义了多线程环境下变量访问的规则,它解决了两个核心问题:可见性有序性

主内存与工作内存

JMM 规定:

  • 所有变量存储在主内存中
  • 每个线程有自己的工作内存(缓存副本)
  • 线程对变量的操作必须在工作内存中进行,不能直接读写主内存
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Thread 1  │    │   Thread 2  │    │   Thread 3  │
│  工作内存    │    │  工作内存    │    │  工作内存    │
│  变量副本    │    │  变量副本    │    │  变量副本    │
└──────┬──────┘    └──────┬──────┘    └──────┬──────┘
       │                  │                  │
       └──────────────────┼──────────────────┘
                          ▼
                 ┌───────────────┐
                 │   主内存       │
                 │   共享变量     │
                 └───────────────┘

问题示例:可见性

public class VisibilityDemo {
    private static boolean running = true;
    
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (running) {
                // 循环等待
            }
            System.out.println("线程退出");
        });
        worker.start();
        
        Thread.sleep(1000);
        running = false;  // 主线程修改 running
        System.out.println("已设置 running = false");
    }
}

你可能期望 worker 线程在 1 秒后退出,但事实上它可能永远不退出——因为 worker 线程一直从工作内存中读取 running 的旧值,没有看到主线程的修改。这就是可见性问题

解决方案:使用 volatile 关键字

private static volatile boolean running = true;  // 强制从主内存读取

1.3 happens-before 规则

JMM 通过 happens-before 规则来保证可见性和有序性。核心规则包括:

  • 程序顺序规则:同一个线程中,写在前面的操作 happens-before 后面的操作
  • volatile 规则:对 volatile 变量的写 happens-before 后续对这个变量的读
  • 锁规则:解锁 happens-before 后续的加锁
  • 传递性:A happens-before B,B happens-before C,则 A happens-before C

理解这些规则是编写正确并发程序的基础。

二、线程的创建与生命周期

2.1 三种创建方式

方式一:继承 Thread 类

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程执行中: " + Thread.currentThread().getName());
    }
    
    public static void main(String[] args) {
        MyThread t = new MyThread();
        t.start();  // 启动线程
    }
}

方式二:实现 Runnable 接口(推荐)

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("线程执行中: " + Thread.currentThread().getName());
    }
    
    public static void main(String[] args) {
        Thread t = new Thread(new MyRunnable());
        t.start();
    }
}

方式三:实现 Callable 接口(可返回结果)

public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "执行结果";
    }
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new MyCallable());
        String result = future.get();  // 阻塞等待结果
        System.out.println(result);
        executor.shutdown();
    }
}

为什么推荐 Runnable?

  • Java 是单继承,实现接口比继承类更灵活
  • Runnable 可以更好地共享资源
  • 符合组合优于继承的设计原则

2.2 线程的生命周期

Java 线程有六种状态,可以通过 Thread.getState() 获取:

┌─────────────────────────────────────────────────────────┐
│                         NEW                              │
│                  (线程创建,尚未启动)                      │
└────────────────────┬────────────────────────────────────┘
                     │ start()
                     ▼
┌─────────────────────────────────────────────────────────┐
│                      RUNNABLE                            │
│              (就绪 + 运行,等待 CPU 时间片)               │
└────┬───────────────────┬────────────────────┬───────────┘
     │                   │                    │
     │ synchronized      │ wait()             │ sleep()
     │ LockSupport.park  │                    │ join()
     ▼                   ▼                    ▼
┌──────────┐      ┌──────────────┐     ┌──────────────┐
│ BLOCKED  │      │   WAITING    │     │ TIMED_WAITING│
│(等待锁释放)│      │ (无限期等待) │     │  (限时等待)   │
└────┬─────┘      └──────┬───────┘     └──────┬───────┘
     │                   │                    │
     │ 获取锁             │ notify()           │ 超时或被唤醒
     │                   │ notifyAll()        │
     └───────────────────┴────────────────────┘
                          ▼
                  ┌─────────────┐
                  │  TERMINATED │
                  │   (已结束)   │
                  └─────────────┘

状态说明

状态 描述 触发条件
NEW 新建 new Thread() 后,未调用 start()
RUNNABLE 运行中 start() 后,正在 JVM 中执行
BLOCKED 阻塞 等待进入 synchronized 代码块/方法
WAITING 等待 wait()join()LockSupport.park()
TIMED_WAITING 限时等待 sleep()wait(timeout)join(timeout)
TERMINATED 终止 run() 方法执行完毕

2.3 线程常用方法

public class ThreadMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("工作中...");
                try {
                    Thread.sleep(500);  // 让出 CPU,不释放锁
                } catch (InterruptedException e) {
                    System.out.println("被中断了");
                    return;  // 响应中断
                }
            }
        });
        
        t.start();
        
        // 等待 t 执行完毕(阻塞当前线程)
        t.join();
        
        // 中断线程
        t.interrupt();
        
        // 检查是否被中断(会清除中断标志)
        Thread.interrupted();
        
        // 检查是否被中断(不清除标志)
        t.isInterrupted();
        
        // 让出 CPU 时间片(提示调度器)
        Thread.yield();
        
        // 守护线程:主线程结束,守护线程自动结束
        t.setDaemon(true);
    }
}

三、线程安全与锁机制

3.1 synchronized 关键字

synchronized 是 Java 内置的锁机制,可以修饰方法和代码块。

用法一:修饰实例方法

public class Counter {
    private int count = 0;
    
    // 锁的是当前实例对象
    public synchronized void increment() {
        count++;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

用法二:修饰静态方法

public class Counter {
    private static int count = 0;
    
    // 锁的是 Class 对象
    public static synchronized void increment() {
        count++;
    }
}

用法三:同步代码块(最灵活)

public class Counter {
    private int count = 0;
    private final Object lock = new Object();  // 专门的锁对象
    
    public void increment() {
        // 只锁必要的代码,减小锁粒度
        synchronized (lock) {
            count++;
        }
    }
}

synchronized 原理简析

synchronized 依赖于 JVM 的对象头实现。每个对象都有一个监视器锁(monitor),当线程进入同步块时,会尝试获取 monitor 的所有权。在 HotSpot JVM 中,synchronized 经过了多代优化:

无锁 → 偏向锁 → 轻量级锁 → 重量级锁
  • 偏向锁:无竞争时,锁会偏向于第一个获取它的线程
  • 轻量级锁:少量竞争时,通过 CAS 自旋获取
  • 重量级锁:竞争激烈时,升级为操作系统互斥锁

3.2 ReentrantLock

ReentrantLockjava.util.concurrent.locks 包提供的可重入锁,功能比 synchronized 更丰富:

public class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;
    
    public void increment() {
        lock.lock();  // 获取锁
        try {
            count++;
        } finally {
            lock.unlock();  // 必须在 finally 中释放
        }
    }
    
    // 尝试获取锁(不阻塞)
    public boolean tryIncrement() {
        if (lock.tryLock()) {  // 立即返回
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
    
    // 限时获取锁
    public boolean tryIncrementWithTimeout() throws InterruptedException {
        if (lock.tryLock(1, TimeUnit.SECONDS)) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
    
    // 可中断的锁获取
    public void incrementInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();  // 响应中断
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

synchronized vs ReentrantLock

特性 synchronized ReentrantLock
使用方式 自动获取/释放 需要手动 lock/unlock
可中断 不支持 支持 lockInterruptibly()
超时尝试 不支持 支持 tryLock(timeout)
公平锁 非公平 可选择公平/非公平
条件变量 wait/notify 多个 Condition
性能 经过优化后接近 略优于 synchronized
推荐程度 优先使用 需要高级特性时使用

3.3 公平锁与非公平锁

// 公平锁:线程按请求顺序获取锁(性能稍差)
ReentrantLock fairLock = new ReentrantLock(true);

// 非公平锁:允许插队(默认,性能更好)
ReentrantLock unfairLock = new ReentrantLock(false);

3.4 读写锁(ReadWriteLock)

读操作可以并发,写操作互斥,适合读多写少的场景:

public class ReadWriteLockCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    
    public V get(K key) {
        readLock.lock();
        try {
            return cache.get(key);
        } finally {
            readLock.unlock();
        }
    }
    
    public void put(K key, V value) {
        writeLock.lock();
        try {
            cache.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
    
    public void clear() {
        writeLock.lock();
        try {
            cache.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

3.5 StampedLock(Java 8+)

StampedLock 是读写锁的改进版,支持乐观读,性能更高:

public class StampedLockDemo {
    private double x, y;
    private final StampedLock sl = new StampedLock();
    
    // 写锁
    public void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }
    
    // 乐观读
    public double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 验证在读期间是否有写操作
        if (!sl.validate(stamp)) {
            // 有写操作,升级为悲观读锁
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

四、线程间通信

4.1 wait/notify 机制

wait()notify()notifyAll() 是 Object 类的方法,必须在 synchronized 代码块中使用:

public class WaitNotifyDemo {
    private static final Object lock = new Object();
    private static boolean condition = false;
    
    public static void main(String[] args) {
        // 等待线程
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!condition) {  // 必须用 while,不能是 if
                    try {
                        System.out.println("等待条件满足...");
                        lock.wait();  // 释放锁,进入等待
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                System.out.println("条件满足,继续执行");
            }
        });
        
        // 通知线程
        Thread notifier = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            synchronized (lock) {
                condition = true;
                lock.notifyAll();  // 唤醒所有等待线程
                System.out.println("已发送通知");
            }
        });
        
        waiter.start();
        notifier.start();
    }
}

生产者-消费者模式

public class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int MAX_SIZE = 10;
    private final Object lock = new Object();
    
    public void produce() throws InterruptedException {
        int value = 0;
        while (true) {
            synchronized (lock) {
                while (queue.size() == MAX_SIZE) {
                    lock.wait();  // 队列满,等待消费者消费
                }
                queue.offer(value);
                System.out.println("生产: " + value++);
                lock.notifyAll();  // 通知消费者
            }
            Thread.sleep(500);
        }
    }
    
    public void consume() throws InterruptedException {
        while (true) {
            synchronized (lock) {
                while (queue.isEmpty()) {
                    lock.wait();  // 队列空,等待生产者生产
                }
                int value = queue.poll();
                System.out.println("消费: " + value);
                lock.notifyAll();  // 通知生产者
            }
            Thread.sleep(1000);
        }
    }
}

4.2 Condition

ConditionReentrantLock 的配套通信机制,比 wait/notify 更灵活(支持多个等待队列):

public class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<Integer> queue = new LinkedList<>();
    private final int MAX_SIZE = 10;
    
    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == MAX_SIZE) {
                notFull.await();  // 等待队列不满
            }
            queue.offer(value);
            notEmpty.signal();    // 通知队列非空
        } finally {
            lock.unlock();
        }
    }
    
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();  // 等待队列非空
            }
            int value = queue.poll();
            notFull.signal();      // 通知队列不满
            return value;
        } finally {
            lock.unlock();
        }
    }
}

五、并发工具类

Java 提供了丰富的并发工具类,极大地简化了多线程编程。

5.1 ThreadLocal

ThreadLocal 为每个线程维护独立的变量副本,常用于传递上下文(如用户信息、数据库连接):

public class ThreadLocalDemo {
    // 每个线程有自己的 SimpleDateFormat 实例
    private static final ThreadLocal<SimpleDateFormat> dateFormat = 
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    
    // 用户上下文传递
    private static final ThreadLocal<String> currentUser = new ThreadLocal<>();
    
    public static void main(String[] args) {
        // 设置用户信息
        currentUser.set("张三");
        
        try {
            // 业务处理中可以随时获取当前用户
            String user = currentUser.get();
            System.out.println("当前用户: " + user);
        } finally {
            // 必须清理,防止内存泄漏(尤其是在线程池中)
            currentUser.remove();
        }
    }
}

内存泄漏警告:ThreadLocal 的 key 是弱引用,但 value 是强引用。如果不调用 remove(),在线程池场景下会导致内存泄漏。

5.2 原子类(Atomic)

原子类通过 CAS(Compare-And-Swap)实现无锁的线程安全操作:

public class AtomicDemo {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final AtomicLong total = new AtomicLong(0);
    private final AtomicBoolean flag = new AtomicBoolean(false);
    private final AtomicReference<String> ref = new AtomicReference<>("initial");
    
    public void increment() {
        // 原子自增
        counter.incrementAndGet();
        
        // 原子加指定值
        total.addAndGet(100);
        
        // CAS 操作
        counter.compareAndSet(expectedValue, newValue);
        
        // 原子更新引用
        ref.compareAndSet("initial", "updated");
    }
    
    // 使用 LongAdder 替代 AtomicLong(更高并发)
    private final LongAdder adder = new LongAdder();
    public void add() {
        adder.increment();  // 性能更好
        long sum = adder.sum();
    }
}

CAS 原理:CAS 是 CPU 原子指令,比较内存值是否等于期望值,相等则更新为新值。整个过程是原子的,避免了锁的开销。

5.3 CountDownLatch

CountDownLatch 让一个或多个线程等待其他线程完成操作:

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workerCount = 5;
        CountDownLatch latch = new CountDownLatch(workerCount);
        
        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            new Thread(() -> {
                System.out.println("工人 " + workerId + " 开始工作");
                try {
                    Thread.sleep(1000);  // 模拟工作
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("工人 " + workerId + " 完成工作");
                latch.countDown();  // 计数减1
            }).start();
        }
        
        System.out.println("等待所有工人完成...");
        latch.await();  // 等待计数归零
        System.out.println("所有工作完成,继续执行");
    }
}

5.4 CyclicBarrier

CyclicBarrier 让一组线程互相等待,都到达屏障点后再继续执行(可循环使用):

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int partyCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
            System.out.println("所有人都到齐了,开始下一阶段");
        });
        
        for (int i = 0; i < partyCount; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("玩家 " + id + " 到达集合点");
                    barrier.await();  // 等待其他线程
                    
                    System.out.println("玩家 " + id + " 开始任务");
                    Thread.sleep(500);
                    barrier.await();  // 可以重复使用
                    
                    System.out.println("玩家 " + id + " 完成");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

CountDownLatch vs CyclicBarrier

特性 CountDownLatch CyclicBarrier
用途 等待其他线程完成 多个线程互相等待
可重用 不可重用(计数归零后失效) 可重用(调用 reset())
计数方式 减计数 加计数
屏障动作 不支持 支持到达屏障时的回调

5.5 Semaphore

Semaphore 控制同时访问某个资源的线程数量:

public class SemaphoreDemo {
    // 限流:最多 3 个线程同时访问
    private final Semaphore semaphore = new Semaphore(3);
    
    public void accessResource() {
        try {
            semaphore.acquire();  // 获取许可(阻塞)
            // semaphore.acquire(2);  // 获取多个许可
            try {
                System.out.println(Thread.currentThread().getName() + " 正在访问资源");
                Thread.sleep(1000);
            } finally {
                semaphore.release();  // 释放许可
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 尝试获取许可(非阻塞)
    public boolean tryAccess() {
        return semaphore.tryAcquire();
    }
}

5.6 CompletableFuture(Java 8+)

CompletableFuture 是异步编程的利器,支持链式调用和组合操作:

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 1. 创建异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello";
        });
        
        // 2. 链式处理
        CompletableFuture<String> result = future
            .thenApply(s -> s + " World")      // 转换
            .thenApply(String::toUpperCase)
            .thenApply(s -> s + "!");
        
        System.out.println(result.get());  // HELLO WORLD!
        
        // 3. 异常处理
        CompletableFuture<Integer> withError = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("出错了");
                return 42;
            })
            .exceptionally(ex -> {
                System.out.println("异常: " + ex.getMessage());
                return 0;
            });
        
        // 4. 组合多个 Future
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World");
        
        CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + " " + b);
        
        // 5. 等待多个完成
        CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
        all.join();  // 等待全部完成
        
        // 6. 任意一个完成
        CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
        
        // 7. 异步回调
        CompletableFuture.supplyAsync(() -> "数据")
            .thenAcceptAsync(result -> {
                System.out.println("处理结果: " + result);
            });
    }
}

六、线程池

线程池的核心思想是复用线程,避免频繁创建和销毁线程的开销。

6.1 Executor 框架

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 1. 固定大小的线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(5);
        
        // 2. 单线程线程池
        ExecutorService singlePool = Executors.newSingleThreadExecutor();
        
        // 3. 缓存线程池(可无限扩展)
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        
        // 4. 定时任务线程池
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
        
        // 提交任务
        fixedPool.submit(() -> System.out.println("执行任务"));
        
        // 定时执行
        scheduledPool.schedule(() -> System.out.println("延迟1秒"), 1, TimeUnit.SECONDS);
        scheduledPool.scheduleAtFixedRate(() -> System.out.println("每秒执行"), 
            0, 1, TimeUnit.SECONDS);
        
        // 关闭线程池
        fixedPool.shutdown();        // 不再接收新任务,等待已有任务完成
        fixedPool.shutdownNow();     // 尝试停止所有正在执行的任务
    }
}

6.2 ThreadPoolExecutor 核心参数

Executors 工厂方法底层都是 ThreadPoolExecutor,理解其参数是配置线程池的关键:

public class CustomThreadPool {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                      // corePoolSize: 核心线程数
            5,                      // maximumPoolSize: 最大线程数
            60,                     // keepAliveTime: 空闲线程存活时间
            TimeUnit.SECONDS,       // unit: 时间单位
            new LinkedBlockingQueue<>(10),  // workQueue: 任务队列
            Executors.defaultThreadFactory(),  // threadFactory: 线程工厂
            new ThreadPoolExecutor.AbortPolicy()  // handler: 拒绝策略
        );
    }
}

参数详解

参数 说明
corePoolSize 核心线程数,即使空闲也会保留
maximumPoolSize 最大线程数,队列满时会创建新线程直到此值
workQueue 任务队列,当核心线程都在忙时,任务放入队列
keepAliveTime 非核心线程的空闲存活时间
threadFactory 创建线程的工厂,可以自定义线程名称
handler 拒绝策略(队列满且线程数达到最大时)

任务处理流程

        提交任务
           │
           ▼
    ┌──────────────┐
    │ 核心线程是否  │──是──▶ 核心线程执行
    │   已满?     │
    └──────┬───────┘
           │ 否
           ▼
    ┌──────────────┐
    │  任务队列是否 │──是──▶ 加入队列等待
    │   已满?     │
    └──────┬───────┘
           │ 否
           ▼
    ┌──────────────┐
    │ 总线程数是否 │──是──▶ 创建非核心线程执行
    │   达到最大?  │
    └──────┬───────┘
           │ 否
           ▼
    触发拒绝策略

拒绝策略

策略 行为
AbortPolicy 抛出 RejectedExecutionException(默认)
CallerRunsPolicy 由提交任务的线程执行(减轻线程池压力)
DiscardPolicy 静默丢弃任务
DiscardOldestPolicy 丢弃队列中最老的任务,然后重试

6.3 工作窃取线程池(ForkJoinPool)

ForkJoinPool 采用工作窃取算法,适合递归分解任务:

public class ForkJoinDemo {
    // 计算 1 到 1亿 的和
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10000;
        private final int start;
        private final int end;
        
        SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 任务足够小,直接计算
                long sum = 0;
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                // 分割任务
                int mid = (start + end) / 2;
                SumTask left = new SumTask(start, mid);
                SumTask right = new SumTask(mid + 1, end);
                
                // 并行执行子任务
                left.fork();
                long rightResult = right.compute();
                long leftResult = left.join();
                
                return leftResult + rightResult;
            }
        }
    }
    
    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        SumTask task = new SumTask(1, 100_000_000);
        long result = pool.invoke(task);
        System.out.println("结果: " + result);
    }
}

6.4 线程池最佳实践

1. 不要使用 Executors 默认的线程池

// ❌ 不推荐:队列无限,可能导致 OOM
ExecutorService fixed = Executors.newFixedThreadPool(10);
ExecutorService cached = Executors.newCachedThreadPool();

// ✅ 推荐:自定义参数,明确边界
ThreadPoolExecutor pool = new ThreadPoolExecutor(
    10, 20, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

2. 设置合理的线程数

// CPU 密集型任务
int cpuCoreCount = Runtime.getRuntime().availableProcessors();
int cpuIntensiveThreads = cpuCoreCount + 1;

// IO 密集型任务
int ioIntensiveThreads = cpuCoreCount * 2;  // 经验值
// 更精确的计算: 线程数 = CPU核心数 / (1 - 阻塞系数)
// 阻塞系数 0.8-0.9 时: 线程数 = 核心数 * (1 / 0.2) = 5倍

3. 优雅关闭线程池

public void gracefulShutdown(ExecutorService pool) {
    pool.shutdown();  // 不再接收新任务
    try {
        // 等待 60 秒让任务完成
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow();  // 超时则强制关闭
            // 再等 60 秒让被中断的任务响应
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("线程池未能完全关闭");
            }
        }
    } catch (InterruptedException e) {
        pool.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

4. 监控线程池

public void monitorPool(ThreadPoolExecutor pool) {
    ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
    monitor.scheduleAtFixedRate(() -> {
        System.out.println("=== 线程池状态 ===");
        System.out.println("核心线程数: " + pool.getCorePoolSize());
        System.out.println("活动线程数: " + pool.getActiveCount());
        System.out.println("最大线程数: " + pool.getMaximumPoolSize());
        System.out.println("队列大小: " + pool.getQueue().size());
        System.out.println("已完成任务数: " + pool.getCompletedTaskCount());
        System.out.println("总任务数: " + pool.getTaskCount());
    }, 0, 5, TimeUnit.SECONDS);
}

七、并发容器

7.1 ConcurrentHashMap

ConcurrentHashMap 是线程安全的 HashMap,采用分段锁(Java 7)或 CAS + synchronized(Java 8+)实现:

public class ConcurrentHashMapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // 原子操作
        map.putIfAbsent("key", 1);           // 不存在才放入
        map.computeIfAbsent("key", k -> 2);   // 不存在时计算
        map.compute("key", (k, v) -> v == null ? 1 : v + 1);
        map.merge("key", 1, Integer::sum);   // 合并
        
        // 遍历(弱一致性,不抛 ConcurrentModificationException)
        map.forEach((k, v) -> System.out.println(k + ": " + v));
        
        // 批量操作
        map.forEach(2, (k, v) -> System.out.println(k + ": " + v));
        map.search(2, (k, v) -> v > 10 ? k : null);
        map.reduce(2, (k, v) -> v, Integer::sum);
    }
}

7.2 CopyOnWriteArrayList

适用于读多写少的场景,写操作会复制整个数组:

public class CopyOnWriteArrayListDemo {
    // 适合读多写少的场景(如黑名单、配置列表)
    private final CopyOnWriteArrayList<String> blacklist = new CopyOnWriteArrayList<>();
    
    public void addBlacklist(String ip) {
        blacklist.add(ip);  // 复制数组,代价较高
    }
    
    public boolean isBlocked(String ip) {
        return blacklist.contains(ip);  // 无锁读取
    }
    
    // 遍历时不会抛 ConcurrentModificationException
    public void printAll() {
        for (String ip : blacklist) {  // 使用快照迭代
            System.out.println(ip);
        }
    }
}

7.3 BlockingQueue

阻塞队列是生产者-消费者模式的核心组件:

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 有界阻塞队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        
        // 无界阻塞队列(可能导致 OOM)
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        
        // 优先级阻塞队列
        BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
        
        // 延迟队列
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        
        // 同步队列(容量为 0,用于交接)
        SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
        
        // 常用方法
        queue.put(1);           // 阻塞直到有空间
        Integer val = queue.take();  // 阻塞直到有元素
        boolean offered = queue.offer(1, 1, TimeUnit.SECONDS);  // 限时阻塞
        Integer polled = queue.poll(1, TimeUnit.SECONDS);       // 限时阻塞
    }
    
    static class Task implements Comparable<Task> {
        int priority;
        @Override
        public int compareTo(Task o) {
            return Integer.compare(o.priority, this.priority);
        }
    }
}

八、死锁与问题排查

8.1 死锁示例

public class DeadlockDemo {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();
    
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lockA) {
                System.out.println("线程1 持有 lockA");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                System.out.println("线程1 等待 lockB");
                synchronized (lockB) {
                    System.out.println("线程1 获取 lockB");
                }
            }
        });
        
        Thread t2 = new Thread(() -> {
            synchronized (lockB) {
                System.out.println("线程2 持有 lockB");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                System.out.println("线程2 等待 lockA");
                synchronized (lockA) {
                    System.out.println("线程2 获取 lockA");
                }
            }
        });
        
        t1.start();
        t2.start();
    }
}

8.2 死锁的四个必要条件

  1. 互斥条件:资源一次只能被一个线程占用
  2. 持有并等待:线程持有资源的同时等待其他资源
  3. 不可剥夺:已分配的资源不能被强制剥夺
  4. 循环等待:多个线程形成循环等待链

8.3 避免死锁的方法

// 方法1:固定加锁顺序
public class OrderedLock {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();
    private static final int HASH_A = System.identityHashCode(lockA);
    private static final int HASH_B = System.identityHashCode(lockB);
    
    public void safeMethod() {
        // 总是按 hashCode 顺序加锁
        Object first = HASH_A < HASH_B ? lockA : lockB;
        Object second = HASH_A < HASH_B ? lockB : lockA;
        
        synchronized (first) {
            synchronized (second) {
                // 业务逻辑
            }
        }
    }
}

// 方法2:使用 tryLock 超时
public class TryLockSolution {
    private final ReentrantLock lockA = new ReentrantLock();
    private final ReentrantLock lockB = new ReentrantLock();
    
    public void safeMethod() throws InterruptedException {
        while (true) {
            if (lockA.tryLock(1, TimeUnit.SECONDS)) {
                try {
                    if (lockB.tryLock(1, TimeUnit.SECONDS)) {
                        try {
                            // 业务逻辑
                            return;
                        } finally {
                            lockB.unlock();
                        }
                    }
                } finally {
                    lockA.unlock();
                }
            }
            // 获取失败,稍后重试
            Thread.sleep(10);
        }
    }
}

8.4 问题排查工具

1. jstack 查看线程栈

# 获取 Java 进程 PID
jps -l

# 打印线程栈
jstack <pid>

# 检测死锁
jstack -l <pid> | grep "deadlock"

2. 代码层面检测死锁

public class DeadlockDetector {
    public static void detectDeadlock() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        long[] deadlockedThreads = threadBean.findDeadlockedThreads();
        
        if (deadlockedThreads != null) {
            System.out.println("发现死锁线程:");
            for (long id : deadlockedThreads) {
                ThreadInfo info = threadBean.getThreadInfo(id);
                System.out.println("线程: " + info.getThreadName());
                for (StackTraceElement element : info.getStackTrace()) {
                    System.out.println("  " + element);
                }
            }
        }
    }
}

九、性能优化实践

9.1 减少锁的粒度

// ❌ 粗粒度锁
public class CoarseLock {
    private final List<String> list1 = new ArrayList<>();
    private final List<String> list2 = new ArrayList<>();
    
    public synchronized void addToList1(String item) {
        list1.add(item);
    }
    
    public synchronized void addToList2(String item) {
        list2.add(item);
    }
}

// ✅ 细粒度锁
public class FineGrainedLock {
    private final List<String> list1 = new ArrayList<>();
    private final List<String> list2 = new ArrayList<>();
    private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    
    public void addToList1(String item) {
        synchronized (lock1) {
            list1.add(item);
        }
    }
    
    public void addToList2(String item) {
        synchronized (lock2) {
            list2.add(item);
        }
    }
}

9.2 锁分离

public class LockSeparation {
    // 读写分离
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    
    // 分段锁(如 ConcurrentHashMap 的实现思想)
    private final Object[] segments;
    
    public LockSeparation(int concurrencyLevel) {
        segments = new Object[concurrencyLevel];
        for (int i = 0; i < concurrencyLevel; i++) {
            segments[i] = new Object();
        }
    }
    
    private int getSegment(Object key) {
        return Math.abs(key.hashCode() % segments.length);
    }
    
    public void put(Object key, Object value) {
        synchronized (segments[getSegment(key)]) {
            // 只锁一个分段
        }
    }
}

9.3 使用无锁数据结构

public class LockFreeDemo {
    // 无锁栈
    private final AtomicReference<Node> top = new AtomicReference<>();
    
    static class Node {
        int value;
        Node next;
        Node(int value) { this.value = value; }
    }
    
    public void push(int value) {
        Node newNode = new Node(value);
        Node oldTop;
        do {
            oldTop = top.get();
            newNode.next = oldTop;
        } while (!top.compareAndSet(oldTop, newNode));
    }
    
    public int pop() {
        Node oldTop;
        Node newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) throw new IllegalStateException("Empty");
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop));
        return oldTop.value;
    }
}

十、完整实战:并发任务执行器

综合以上知识,构建一个支持超时、重试、并发控制的通用任务执行器:

public class ConcurrentTaskExecutor {
    private final ThreadPoolExecutor executor;
    private final int maxRetries;
    
    public ConcurrentTaskExecutor(int coreThreads, int maxThreads, int maxRetries) {
        this.maxRetries = maxRetries;
        this.executor = new ThreadPoolExecutor(
            coreThreads, maxThreads, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("task-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    public <T> CompletableFuture<T> submit(Callable<T> task, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = new CompletableFuture<>();
        
        executor.submit(() -> {
            try {
                T result = executeWithRetry(task, maxRetries);
                future.complete(result);
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        
        // 超时处理
        ScheduledExecutorService timeoutMonitor = Executors.newSingleThreadScheduledExecutor();
        timeoutMonitor.schedule(() -> {
            if (!future.isDone()) {
                future.completeExceptionally(new TimeoutException("任务超时"));
            }
            timeoutMonitor.shutdown();
        }, timeout, unit);
        
        return future;
    }
    
    private <T> T executeWithRetry(Callable<T> task, int retries) throws Exception {
        for (int i = 0; i <= retries; i++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (i == retries) {
                    throw e;
                }
                System.out.println("任务失败,第 " + (i + 1) + " 次重试");
                Thread.sleep((long) (Math.pow(2, i) * 100));  // 指数退避
            }
        }
        throw new RuntimeException("不可达");
    }
    
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    // 使用示例
    public static void main(String[] args) throws Exception {
        ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(5, 10, 3);
        
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            CompletableFuture<String> future = executor.submit(() -> {
                Thread.sleep(500);
                return "Task " + taskId + " completed";
            }, 5, TimeUnit.SECONDS);
            futures.add(future);
        }
        
        // 等待所有任务完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> System.out.println("所有任务完成"));
        
        // 获取结果
        for (CompletableFuture<String> future : futures) {
            System.out.println(future.get());
        }
        
        executor.shutdown();
    }
}

总结

场景 推荐方案
简单同步 synchronized
需要超时/中断 ReentrantLock
读多写少 ReadWriteLock / StampedLock
线程本地变量 ThreadLocal
原子操作 AtomicXXX / LongAdder
异步编程 CompletableFuture
线程池 ThreadPoolExecutor(自定义参数)
并发 Map ConcurrentHashMap
读多写少的 List CopyOnWriteArrayList
生产者-消费者 BlockingQueue
等待多个任务 CountDownLatch
线程互相等待 CyclicBarrier
限流 Semaphore
递归分解任务 ForkJoinPool

核心原则

  1. 优先使用 java.util.concurrent 包的工具类
  2. 能不用锁就不用锁,能用原子类就用原子类
  3. 必须用锁时,尽量减小锁粒度
  4. 始终注意可见性问题(volatile、synchronized)
  5. 正确管理线程池资源,优雅关闭
  6. 使用 ThreadLocal 后务必 remove()

多线程编程是一门实践性很强的技术,唯有通过大量的编码和调试,才能真正掌握其中的精髓。希望本文能为你打下坚实的基础!