Java 多线程实践:从并发基础到性能优化
引言
在 CPU 核心数从单核迈向几十核的今天,多线程编程早已从“高级技巧”变成了“必备技能”。无论是高并发的 Web 服务器、响应灵敏的桌面应用,还是数据密集型的批处理任务,多线程都是绕不开的话题。
然而,Java 多线程是一把双刃剑——用得好,能成倍提升程序性能;用得不好,死锁、竞态条件、性能下降等问题会让你陷入无尽的调试深渊。本文将带你从零开始,系统掌握 Java 多线程的核心知识,并通过大量实战代码,让你真正具备编写高并发程序的能力。
一、Java 内存模型与线程基础
1.1 线程与进程的区别
在深入代码之前,先理清两个基础概念:
| 特性 | 进程 | 线程 |
|---|---|---|
| 资源拥有 | 独立的地址空间、文件描述符等 | 共享进程的资源 |
| 创建开销 | 大(需要复制资源) | 小(只需创建栈和寄存器) |
| 通信方式 | IPC(管道、消息队列、共享内存) | 直接读写共享变量 |
| 切换成本 | 高(涉及地址空间切换) | 低(只需切换栈和寄存器) |
| 隔离性 | 强(一个进程崩溃不影响其他) | 弱(一个线程崩溃可能导致整个进程退出) |
简单说:进程是资源分配的最小单位,线程是 CPU 调度的最小单位。
1.2 Java 内存模型(JMM)
Java 内存模型(JMM)定义了多线程环境下变量访问的规则,它解决了两个核心问题:可见性和有序性。
主内存与工作内存
JMM 规定:
- 所有变量存储在主内存中
- 每个线程有自己的工作内存(缓存副本)
- 线程对变量的操作必须在工作内存中进行,不能直接读写主内存
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Thread 1 │ │ Thread 2 │ │ Thread 3 │
│ 工作内存 │ │ 工作内存 │ │ 工作内存 │
│ 变量副本 │ │ 变量副本 │ │ 变量副本 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└──────────────────┼──────────────────┘
▼
┌───────────────┐
│ 主内存 │
│ 共享变量 │
└───────────────┘
问题示例:可见性
public class VisibilityDemo {
private static boolean running = true;
public static void main(String[] args) throws InterruptedException {
Thread worker = new Thread(() -> {
while (running) {
// 循环等待
}
System.out.println("线程退出");
});
worker.start();
Thread.sleep(1000);
running = false; // 主线程修改 running
System.out.println("已设置 running = false");
}
}
你可能期望 worker 线程在 1 秒后退出,但事实上它可能永远不退出——因为 worker 线程一直从工作内存中读取 running 的旧值,没有看到主线程的修改。这就是可见性问题。
解决方案:使用 volatile 关键字
private static volatile boolean running = true; // 强制从主内存读取
1.3 happens-before 规则
JMM 通过 happens-before 规则来保证可见性和有序性。核心规则包括:
- 程序顺序规则:同一个线程中,写在前面的操作 happens-before 后面的操作
- volatile 规则:对 volatile 变量的写 happens-before 后续对这个变量的读
- 锁规则:解锁 happens-before 后续的加锁
- 传递性:A happens-before B,B happens-before C,则 A happens-before C
理解这些规则是编写正确并发程序的基础。
二、线程的创建与生命周期
2.1 三种创建方式
方式一:继承 Thread 类
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程执行中: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread t = new MyThread();
t.start(); // 启动线程
}
}
方式二:实现 Runnable 接口(推荐)
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("线程执行中: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread t = new Thread(new MyRunnable());
t.start();
}
}
方式三:实现 Callable 接口(可返回结果)
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "执行结果";
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new MyCallable());
String result = future.get(); // 阻塞等待结果
System.out.println(result);
executor.shutdown();
}
}
为什么推荐 Runnable?
- Java 是单继承,实现接口比继承类更灵活
- Runnable 可以更好地共享资源
- 符合组合优于继承的设计原则
2.2 线程的生命周期
Java 线程有六种状态,可以通过 Thread.getState() 获取:
┌─────────────────────────────────────────────────────────┐
│ NEW │
│ (线程创建,尚未启动) │
└────────────────────┬────────────────────────────────────┘
│ start()
▼
┌─────────────────────────────────────────────────────────┐
│ RUNNABLE │
│ (就绪 + 运行,等待 CPU 时间片) │
└────┬───────────────────┬────────────────────┬───────────┘
│ │ │
│ synchronized │ wait() │ sleep()
│ LockSupport.park │ │ join()
▼ ▼ ▼
┌──────────┐ ┌──────────────┐ ┌──────────────┐
│ BLOCKED │ │ WAITING │ │ TIMED_WAITING│
│(等待锁释放)│ │ (无限期等待) │ │ (限时等待) │
└────┬─────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ 获取锁 │ notify() │ 超时或被唤醒
│ │ notifyAll() │
└───────────────────┴────────────────────┘
▼
┌─────────────┐
│ TERMINATED │
│ (已结束) │
└─────────────┘
状态说明:
| 状态 | 描述 | 触发条件 |
|---|---|---|
| NEW | 新建 | new Thread() 后,未调用 start() |
| RUNNABLE | 运行中 | start() 后,正在 JVM 中执行 |
| BLOCKED | 阻塞 | 等待进入 synchronized 代码块/方法 |
| WAITING | 等待 | wait()、join()、LockSupport.park() |
| TIMED_WAITING | 限时等待 | sleep()、wait(timeout)、join(timeout) |
| TERMINATED | 终止 | run() 方法执行完毕 |
2.3 线程常用方法
public class ThreadMethodsDemo {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("工作中...");
try {
Thread.sleep(500); // 让出 CPU,不释放锁
} catch (InterruptedException e) {
System.out.println("被中断了");
return; // 响应中断
}
}
});
t.start();
// 等待 t 执行完毕(阻塞当前线程)
t.join();
// 中断线程
t.interrupt();
// 检查是否被中断(会清除中断标志)
Thread.interrupted();
// 检查是否被中断(不清除标志)
t.isInterrupted();
// 让出 CPU 时间片(提示调度器)
Thread.yield();
// 守护线程:主线程结束,守护线程自动结束
t.setDaemon(true);
}
}
三、线程安全与锁机制
3.1 synchronized 关键字
synchronized 是 Java 内置的锁机制,可以修饰方法和代码块。
用法一:修饰实例方法
public class Counter {
private int count = 0;
// 锁的是当前实例对象
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
用法二:修饰静态方法
public class Counter {
private static int count = 0;
// 锁的是 Class 对象
public static synchronized void increment() {
count++;
}
}
用法三:同步代码块(最灵活)
public class Counter {
private int count = 0;
private final Object lock = new Object(); // 专门的锁对象
public void increment() {
// 只锁必要的代码,减小锁粒度
synchronized (lock) {
count++;
}
}
}
synchronized 原理简析
synchronized 依赖于 JVM 的对象头实现。每个对象都有一个监视器锁(monitor),当线程进入同步块时,会尝试获取 monitor 的所有权。在 HotSpot JVM 中,synchronized 经过了多代优化:
无锁 → 偏向锁 → 轻量级锁 → 重量级锁
- 偏向锁:无竞争时,锁会偏向于第一个获取它的线程
- 轻量级锁:少量竞争时,通过 CAS 自旋获取
- 重量级锁:竞争激烈时,升级为操作系统互斥锁
3.2 ReentrantLock
ReentrantLock 是 java.util.concurrent.locks 包提供的可重入锁,功能比 synchronized 更丰富:
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 必须在 finally 中释放
}
}
// 尝试获取锁(不阻塞)
public boolean tryIncrement() {
if (lock.tryLock()) { // 立即返回
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
// 限时获取锁
public boolean tryIncrementWithTimeout() throws InterruptedException {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
// 可中断的锁获取
public void incrementInterruptibly() throws InterruptedException {
lock.lockInterruptibly(); // 响应中断
try {
count++;
} finally {
lock.unlock();
}
}
}
synchronized vs ReentrantLock
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 使用方式 | 自动获取/释放 | 需要手动 lock/unlock |
| 可中断 | 不支持 | 支持 lockInterruptibly() |
| 超时尝试 | 不支持 | 支持 tryLock(timeout) |
| 公平锁 | 非公平 | 可选择公平/非公平 |
| 条件变量 | wait/notify | 多个 Condition |
| 性能 | 经过优化后接近 | 略优于 synchronized |
| 推荐程度 | 优先使用 | 需要高级特性时使用 |
3.3 公平锁与非公平锁
// 公平锁:线程按请求顺序获取锁(性能稍差)
ReentrantLock fairLock = new ReentrantLock(true);
// 非公平锁:允许插队(默认,性能更好)
ReentrantLock unfairLock = new ReentrantLock(false);
3.4 读写锁(ReadWriteLock)
读操作可以并发,写操作互斥,适合读多写少的场景:
public class ReadWriteLockCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
public V get(K key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
public void put(K key, V value) {
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
public void clear() {
writeLock.lock();
try {
cache.clear();
} finally {
writeLock.unlock();
}
}
}
3.5 StampedLock(Java 8+)
StampedLock 是读写锁的改进版,支持乐观读,性能更高:
public class StampedLockDemo {
private double x, y;
private final StampedLock sl = new StampedLock();
// 写锁
public void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
// 乐观读
public double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
// 验证在读期间是否有写操作
if (!sl.validate(stamp)) {
// 有写操作,升级为悲观读锁
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
四、线程间通信
4.1 wait/notify 机制
wait()、notify()、notifyAll() 是 Object 类的方法,必须在 synchronized 代码块中使用:
public class WaitNotifyDemo {
private static final Object lock = new Object();
private static boolean condition = false;
public static void main(String[] args) {
// 等待线程
Thread waiter = new Thread(() -> {
synchronized (lock) {
while (!condition) { // 必须用 while,不能是 if
try {
System.out.println("等待条件满足...");
lock.wait(); // 释放锁,进入等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("条件满足,继续执行");
}
});
// 通知线程
Thread notifier = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock) {
condition = true;
lock.notifyAll(); // 唤醒所有等待线程
System.out.println("已发送通知");
}
});
waiter.start();
notifier.start();
}
}
生产者-消费者模式
public class ProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int MAX_SIZE = 10;
private final Object lock = new Object();
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (lock) {
while (queue.size() == MAX_SIZE) {
lock.wait(); // 队列满,等待消费者消费
}
queue.offer(value);
System.out.println("生产: " + value++);
lock.notifyAll(); // 通知消费者
}
Thread.sleep(500);
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (lock) {
while (queue.isEmpty()) {
lock.wait(); // 队列空,等待生产者生产
}
int value = queue.poll();
System.out.println("消费: " + value);
lock.notifyAll(); // 通知生产者
}
Thread.sleep(1000);
}
}
}
4.2 Condition
Condition 是 ReentrantLock 的配套通信机制,比 wait/notify 更灵活(支持多个等待队列):
public class ConditionDemo {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int MAX_SIZE = 10;
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == MAX_SIZE) {
notFull.await(); // 等待队列不满
}
queue.offer(value);
notEmpty.signal(); // 通知队列非空
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列非空
}
int value = queue.poll();
notFull.signal(); // 通知队列不满
return value;
} finally {
lock.unlock();
}
}
}
五、并发工具类
Java 提供了丰富的并发工具类,极大地简化了多线程编程。
5.1 ThreadLocal
ThreadLocal 为每个线程维护独立的变量副本,常用于传递上下文(如用户信息、数据库连接):
public class ThreadLocalDemo {
// 每个线程有自己的 SimpleDateFormat 实例
private static final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
// 用户上下文传递
private static final ThreadLocal<String> currentUser = new ThreadLocal<>();
public static void main(String[] args) {
// 设置用户信息
currentUser.set("张三");
try {
// 业务处理中可以随时获取当前用户
String user = currentUser.get();
System.out.println("当前用户: " + user);
} finally {
// 必须清理,防止内存泄漏(尤其是在线程池中)
currentUser.remove();
}
}
}
内存泄漏警告:ThreadLocal 的 key 是弱引用,但 value 是强引用。如果不调用 remove(),在线程池场景下会导致内存泄漏。
5.2 原子类(Atomic)
原子类通过 CAS(Compare-And-Swap)实现无锁的线程安全操作:
public class AtomicDemo {
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicLong total = new AtomicLong(0);
private final AtomicBoolean flag = new AtomicBoolean(false);
private final AtomicReference<String> ref = new AtomicReference<>("initial");
public void increment() {
// 原子自增
counter.incrementAndGet();
// 原子加指定值
total.addAndGet(100);
// CAS 操作
counter.compareAndSet(expectedValue, newValue);
// 原子更新引用
ref.compareAndSet("initial", "updated");
}
// 使用 LongAdder 替代 AtomicLong(更高并发)
private final LongAdder adder = new LongAdder();
public void add() {
adder.increment(); // 性能更好
long sum = adder.sum();
}
}
CAS 原理:CAS 是 CPU 原子指令,比较内存值是否等于期望值,相等则更新为新值。整个过程是原子的,避免了锁的开销。
5.3 CountDownLatch
CountDownLatch 让一个或多个线程等待其他线程完成操作:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
System.out.println("工人 " + workerId + " 开始工作");
try {
Thread.sleep(1000); // 模拟工作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("工人 " + workerId + " 完成工作");
latch.countDown(); // 计数减1
}).start();
}
System.out.println("等待所有工人完成...");
latch.await(); // 等待计数归零
System.out.println("所有工作完成,继续执行");
}
}
5.4 CyclicBarrier
CyclicBarrier 让一组线程互相等待,都到达屏障点后再继续执行(可循环使用):
public class CyclicBarrierDemo {
public static void main(String[] args) {
int partyCount = 3;
CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
System.out.println("所有人都到齐了,开始下一阶段");
});
for (int i = 0; i < partyCount; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("玩家 " + id + " 到达集合点");
barrier.await(); // 等待其他线程
System.out.println("玩家 " + id + " 开始任务");
Thread.sleep(500);
barrier.await(); // 可以重复使用
System.out.println("玩家 " + id + " 完成");
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 用途 | 等待其他线程完成 | 多个线程互相等待 |
| 可重用 | 不可重用(计数归零后失效) | 可重用(调用 reset()) |
| 计数方式 | 减计数 | 加计数 |
| 屏障动作 | 不支持 | 支持到达屏障时的回调 |
5.5 Semaphore
Semaphore 控制同时访问某个资源的线程数量:
public class SemaphoreDemo {
// 限流:最多 3 个线程同时访问
private final Semaphore semaphore = new Semaphore(3);
public void accessResource() {
try {
semaphore.acquire(); // 获取许可(阻塞)
// semaphore.acquire(2); // 获取多个许可
try {
System.out.println(Thread.currentThread().getName() + " 正在访问资源");
Thread.sleep(1000);
} finally {
semaphore.release(); // 释放许可
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 尝试获取许可(非阻塞)
public boolean tryAccess() {
return semaphore.tryAcquire();
}
}
5.6 CompletableFuture(Java 8+)
CompletableFuture 是异步编程的利器,支持链式调用和组合操作:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 1. 创建异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
// 2. 链式处理
CompletableFuture<String> result = future
.thenApply(s -> s + " World") // 转换
.thenApply(String::toUpperCase)
.thenApply(s -> s + "!");
System.out.println(result.get()); // HELLO WORLD!
// 3. 异常处理
CompletableFuture<Integer> withError = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return 42;
})
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return 0;
});
// 4. 组合多个 Future
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + " " + b);
// 5. 等待多个完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join(); // 等待全部完成
// 6. 任意一个完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
// 7. 异步回调
CompletableFuture.supplyAsync(() -> "数据")
.thenAcceptAsync(result -> {
System.out.println("处理结果: " + result);
});
}
}
六、线程池
线程池的核心思想是复用线程,避免频繁创建和销毁线程的开销。
6.1 Executor 框架
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 固定大小的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 2. 单线程线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 3. 缓存线程池(可无限扩展)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 4. 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
// 提交任务
fixedPool.submit(() -> System.out.println("执行任务"));
// 定时执行
scheduledPool.schedule(() -> System.out.println("延迟1秒"), 1, TimeUnit.SECONDS);
scheduledPool.scheduleAtFixedRate(() -> System.out.println("每秒执行"),
0, 1, TimeUnit.SECONDS);
// 关闭线程池
fixedPool.shutdown(); // 不再接收新任务,等待已有任务完成
fixedPool.shutdownNow(); // 尝试停止所有正在执行的任务
}
}
6.2 ThreadPoolExecutor 核心参数
Executors 工厂方法底层都是 ThreadPoolExecutor,理解其参数是配置线程池的关键:
public class CustomThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize: 核心线程数
5, // maximumPoolSize: 最大线程数
60, // keepAliveTime: 空闲线程存活时间
TimeUnit.SECONDS, // unit: 时间单位
new LinkedBlockingQueue<>(10), // workQueue: 任务队列
Executors.defaultThreadFactory(), // threadFactory: 线程工厂
new ThreadPoolExecutor.AbortPolicy() // handler: 拒绝策略
);
}
}
参数详解:
| 参数 | 说明 |
|---|---|
| corePoolSize | 核心线程数,即使空闲也会保留 |
| maximumPoolSize | 最大线程数,队列满时会创建新线程直到此值 |
| workQueue | 任务队列,当核心线程都在忙时,任务放入队列 |
| keepAliveTime | 非核心线程的空闲存活时间 |
| threadFactory | 创建线程的工厂,可以自定义线程名称 |
| handler | 拒绝策略(队列满且线程数达到最大时) |
任务处理流程:
提交任务
│
▼
┌──────────────┐
│ 核心线程是否 │──是──▶ 核心线程执行
│ 已满? │
└──────┬───────┘
│ 否
▼
┌──────────────┐
│ 任务队列是否 │──是──▶ 加入队列等待
│ 已满? │
└──────┬───────┘
│ 否
▼
┌──────────────┐
│ 总线程数是否 │──是──▶ 创建非核心线程执行
│ 达到最大? │
└──────┬───────┘
│ 否
▼
触发拒绝策略
拒绝策略:
| 策略 | 行为 |
|---|---|
| AbortPolicy | 抛出 RejectedExecutionException(默认) |
| CallerRunsPolicy | 由提交任务的线程执行(减轻线程池压力) |
| DiscardPolicy | 静默丢弃任务 |
| DiscardOldestPolicy | 丢弃队列中最老的任务,然后重试 |
6.3 工作窃取线程池(ForkJoinPool)
ForkJoinPool 采用工作窃取算法,适合递归分解任务:
public class ForkJoinDemo {
// 计算 1 到 1亿 的和
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private final int start;
private final int end;
SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 任务足够小,直接计算
long sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 分割任务
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
// 并行执行子任务
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(1, 100_000_000);
long result = pool.invoke(task);
System.out.println("结果: " + result);
}
}
6.4 线程池最佳实践
1. 不要使用 Executors 默认的线程池
// ❌ 不推荐:队列无限,可能导致 OOM
ExecutorService fixed = Executors.newFixedThreadPool(10);
ExecutorService cached = Executors.newCachedThreadPool();
// ✅ 推荐:自定义参数,明确边界
ThreadPoolExecutor pool = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
2. 设置合理的线程数
// CPU 密集型任务
int cpuCoreCount = Runtime.getRuntime().availableProcessors();
int cpuIntensiveThreads = cpuCoreCount + 1;
// IO 密集型任务
int ioIntensiveThreads = cpuCoreCount * 2; // 经验值
// 更精确的计算: 线程数 = CPU核心数 / (1 - 阻塞系数)
// 阻塞系数 0.8-0.9 时: 线程数 = 核心数 * (1 / 0.2) = 5倍
3. 优雅关闭线程池
public void gracefulShutdown(ExecutorService pool) {
pool.shutdown(); // 不再接收新任务
try {
// 等待 60 秒让任务完成
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 超时则强制关闭
// 再等 60 秒让被中断的任务响应
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能完全关闭");
}
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
4. 监控线程池
public void monitorPool(ThreadPoolExecutor pool) {
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== 线程池状态 ===");
System.out.println("核心线程数: " + pool.getCorePoolSize());
System.out.println("活动线程数: " + pool.getActiveCount());
System.out.println("最大线程数: " + pool.getMaximumPoolSize());
System.out.println("队列大小: " + pool.getQueue().size());
System.out.println("已完成任务数: " + pool.getCompletedTaskCount());
System.out.println("总任务数: " + pool.getTaskCount());
}, 0, 5, TimeUnit.SECONDS);
}
七、并发容器
7.1 ConcurrentHashMap
ConcurrentHashMap 是线程安全的 HashMap,采用分段锁(Java 7)或 CAS + synchronized(Java 8+)实现:
public class ConcurrentHashMapDemo {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 原子操作
map.putIfAbsent("key", 1); // 不存在才放入
map.computeIfAbsent("key", k -> 2); // 不存在时计算
map.compute("key", (k, v) -> v == null ? 1 : v + 1);
map.merge("key", 1, Integer::sum); // 合并
// 遍历(弱一致性,不抛 ConcurrentModificationException)
map.forEach((k, v) -> System.out.println(k + ": " + v));
// 批量操作
map.forEach(2, (k, v) -> System.out.println(k + ": " + v));
map.search(2, (k, v) -> v > 10 ? k : null);
map.reduce(2, (k, v) -> v, Integer::sum);
}
}
7.2 CopyOnWriteArrayList
适用于读多写少的场景,写操作会复制整个数组:
public class CopyOnWriteArrayListDemo {
// 适合读多写少的场景(如黑名单、配置列表)
private final CopyOnWriteArrayList<String> blacklist = new CopyOnWriteArrayList<>();
public void addBlacklist(String ip) {
blacklist.add(ip); // 复制数组,代价较高
}
public boolean isBlocked(String ip) {
return blacklist.contains(ip); // 无锁读取
}
// 遍历时不会抛 ConcurrentModificationException
public void printAll() {
for (String ip : blacklist) { // 使用快照迭代
System.out.println(ip);
}
}
}
7.3 BlockingQueue
阻塞队列是生产者-消费者模式的核心组件:
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 有界阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 无界阻塞队列(可能导致 OOM)
BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
// 优先级阻塞队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
// 延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 同步队列(容量为 0,用于交接)
SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
// 常用方法
queue.put(1); // 阻塞直到有空间
Integer val = queue.take(); // 阻塞直到有元素
boolean offered = queue.offer(1, 1, TimeUnit.SECONDS); // 限时阻塞
Integer polled = queue.poll(1, TimeUnit.SECONDS); // 限时阻塞
}
static class Task implements Comparable<Task> {
int priority;
@Override
public int compareTo(Task o) {
return Integer.compare(o.priority, this.priority);
}
}
}
八、死锁与问题排查
8.1 死锁示例
public class DeadlockDemo {
private static final Object lockA = new Object();
private static final Object lockB = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lockA) {
System.out.println("线程1 持有 lockA");
try { Thread.sleep(100); } catch (InterruptedException e) {}
System.out.println("线程1 等待 lockB");
synchronized (lockB) {
System.out.println("线程1 获取 lockB");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lockB) {
System.out.println("线程2 持有 lockB");
try { Thread.sleep(100); } catch (InterruptedException e) {}
System.out.println("线程2 等待 lockA");
synchronized (lockA) {
System.out.println("线程2 获取 lockA");
}
}
});
t1.start();
t2.start();
}
}
8.2 死锁的四个必要条件
- 互斥条件:资源一次只能被一个线程占用
- 持有并等待:线程持有资源的同时等待其他资源
- 不可剥夺:已分配的资源不能被强制剥夺
- 循环等待:多个线程形成循环等待链
8.3 避免死锁的方法
// 方法1:固定加锁顺序
public class OrderedLock {
private static final Object lockA = new Object();
private static final Object lockB = new Object();
private static final int HASH_A = System.identityHashCode(lockA);
private static final int HASH_B = System.identityHashCode(lockB);
public void safeMethod() {
// 总是按 hashCode 顺序加锁
Object first = HASH_A < HASH_B ? lockA : lockB;
Object second = HASH_A < HASH_B ? lockB : lockA;
synchronized (first) {
synchronized (second) {
// 业务逻辑
}
}
}
}
// 方法2:使用 tryLock 超时
public class TryLockSolution {
private final ReentrantLock lockA = new ReentrantLock();
private final ReentrantLock lockB = new ReentrantLock();
public void safeMethod() throws InterruptedException {
while (true) {
if (lockA.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lockB.tryLock(1, TimeUnit.SECONDS)) {
try {
// 业务逻辑
return;
} finally {
lockB.unlock();
}
}
} finally {
lockA.unlock();
}
}
// 获取失败,稍后重试
Thread.sleep(10);
}
}
}
8.4 问题排查工具
1. jstack 查看线程栈
# 获取 Java 进程 PID
jps -l
# 打印线程栈
jstack <pid>
# 检测死锁
jstack -l <pid> | grep "deadlock"
2. 代码层面检测死锁
public class DeadlockDetector {
public static void detectDeadlock() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
System.out.println("发现死锁线程:");
for (long id : deadlockedThreads) {
ThreadInfo info = threadBean.getThreadInfo(id);
System.out.println("线程: " + info.getThreadName());
for (StackTraceElement element : info.getStackTrace()) {
System.out.println(" " + element);
}
}
}
}
}
九、性能优化实践
9.1 减少锁的粒度
// ❌ 粗粒度锁
public class CoarseLock {
private final List<String> list1 = new ArrayList<>();
private final List<String> list2 = new ArrayList<>();
public synchronized void addToList1(String item) {
list1.add(item);
}
public synchronized void addToList2(String item) {
list2.add(item);
}
}
// ✅ 细粒度锁
public class FineGrainedLock {
private final List<String> list1 = new ArrayList<>();
private final List<String> list2 = new ArrayList<>();
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void addToList1(String item) {
synchronized (lock1) {
list1.add(item);
}
}
public void addToList2(String item) {
synchronized (lock2) {
list2.add(item);
}
}
}
9.2 锁分离
public class LockSeparation {
// 读写分离
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 分段锁(如 ConcurrentHashMap 的实现思想)
private final Object[] segments;
public LockSeparation(int concurrencyLevel) {
segments = new Object[concurrencyLevel];
for (int i = 0; i < concurrencyLevel; i++) {
segments[i] = new Object();
}
}
private int getSegment(Object key) {
return Math.abs(key.hashCode() % segments.length);
}
public void put(Object key, Object value) {
synchronized (segments[getSegment(key)]) {
// 只锁一个分段
}
}
}
9.3 使用无锁数据结构
public class LockFreeDemo {
// 无锁栈
private final AtomicReference<Node> top = new AtomicReference<>();
static class Node {
int value;
Node next;
Node(int value) { this.value = value; }
}
public void push(int value) {
Node newNode = new Node(value);
Node oldTop;
do {
oldTop = top.get();
newNode.next = oldTop;
} while (!top.compareAndSet(oldTop, newNode));
}
public int pop() {
Node oldTop;
Node newTop;
do {
oldTop = top.get();
if (oldTop == null) throw new IllegalStateException("Empty");
newTop = oldTop.next;
} while (!top.compareAndSet(oldTop, newTop));
return oldTop.value;
}
}
十、完整实战:并发任务执行器
综合以上知识,构建一个支持超时、重试、并发控制的通用任务执行器:
public class ConcurrentTaskExecutor {
private final ThreadPoolExecutor executor;
private final int maxRetries;
public ConcurrentTaskExecutor(int coreThreads, int maxThreads, int maxRetries) {
this.maxRetries = maxRetries;
this.executor = new ThreadPoolExecutor(
coreThreads, maxThreads, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("task-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
public <T> CompletableFuture<T> submit(Callable<T> task, long timeout, TimeUnit unit) {
CompletableFuture<T> future = new CompletableFuture<>();
executor.submit(() -> {
try {
T result = executeWithRetry(task, maxRetries);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
// 超时处理
ScheduledExecutorService timeoutMonitor = Executors.newSingleThreadScheduledExecutor();
timeoutMonitor.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("任务超时"));
}
timeoutMonitor.shutdown();
}, timeout, unit);
return future;
}
private <T> T executeWithRetry(Callable<T> task, int retries) throws Exception {
for (int i = 0; i <= retries; i++) {
try {
return task.call();
} catch (Exception e) {
if (i == retries) {
throw e;
}
System.out.println("任务失败,第 " + (i + 1) + " 次重试");
Thread.sleep((long) (Math.pow(2, i) * 100)); // 指数退避
}
}
throw new RuntimeException("不可达");
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
// 使用示例
public static void main(String[] args) throws Exception {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(5, 10, 3);
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int taskId = i;
CompletableFuture<String> future = executor.submit(() -> {
Thread.sleep(500);
return "Task " + taskId + " completed";
}, 5, TimeUnit.SECONDS);
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> System.out.println("所有任务完成"));
// 获取结果
for (CompletableFuture<String> future : futures) {
System.out.println(future.get());
}
executor.shutdown();
}
}
总结
| 场景 | 推荐方案 |
|---|---|
| 简单同步 | synchronized |
| 需要超时/中断 | ReentrantLock |
| 读多写少 | ReadWriteLock / StampedLock |
| 线程本地变量 | ThreadLocal |
| 原子操作 | AtomicXXX / LongAdder |
| 异步编程 | CompletableFuture |
| 线程池 | ThreadPoolExecutor(自定义参数) |
| 并发 Map | ConcurrentHashMap |
| 读多写少的 List | CopyOnWriteArrayList |
| 生产者-消费者 | BlockingQueue |
| 等待多个任务 | CountDownLatch |
| 线程互相等待 | CyclicBarrier |
| 限流 | Semaphore |
| 递归分解任务 | ForkJoinPool |
核心原则:
- 优先使用
java.util.concurrent包的工具类 - 能不用锁就不用锁,能用原子类就用原子类
- 必须用锁时,尽量减小锁粒度
- 始终注意可见性问题(volatile、synchronized)
- 正确管理线程池资源,优雅关闭
- 使用 ThreadLocal 后务必 remove()
多线程编程是一门实践性很强的技术,唯有通过大量的编码和调试,才能真正掌握其中的精髓。希望本文能为你打下坚实的基础!