Java Concurrency: Threads

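The previous article summarized my understanding of intrinsic locks in Java concurrency. This one turns to threads, since concurrency and threads are hopelessly entangled. I assume the basic concepts of Java threads, the relationship between threads and processes, and how to create a thread are already familiar. I covered them in an earlier post; readers who are new to the topic or unsure about it can start there: Java 多线程 (Java Multithreading)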

II. Thread Framework

Three ways to create a thread

As we know, there are three ways to create a thread in Java:

Extend the Thread class:

    new Thread() {
        @Override
        public void run() {
            // task body goes here
        }
    }.start();


Implement the Runnable interface:

    new Thread(new Runnable() {
        @Override
        public void run() {
            // task body goes here
        }
    }).start();

Use Callable and Future:

    new Thread(new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return null;
        }
    })).start();

Reading the source

Start with the source of the Runnable interface:

    @FunctionalInterface
    public interface Runnable {
        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see java.lang.Thread#run()
         */
        public abstract void run();
    }

It is a functional interface with a single method, run().
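Because Runnable is a functional interface, a lambda can stand in for the anonymous class on Java 8 and later. The following is a minimal sketch of that idea, not code from the JDK or from the original post; the class name RunnableLambdaDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative assumptions.
public class RunnableLambdaDemo {

    // Runs the given task on a new thread and waits for it to finish.
    static void runAndWait(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up waiting.
            Thread.currentThread().interrupt();
        }
    }

    // Returns the counter value after a lambda-based Runnable increments it once.
    static int incrementOnce() {
        AtomicInteger counter = new AtomicInteger();
        // The lambda is converted to a Runnable; its return value is discarded.
        runAndWait(() -> counter.incrementAndGet());
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementOnce());
    }
}
```

The join() call is only there so that the caller can read the counter safely after the worker thread has finished.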
Next, the source of the Thread class shows that Thread itself implements Runnable:

    public class Thread implements Runnable {
        ...
        /* What will be run. */
        private Runnable target;
        ...
        /* Java thread status for tools,
         * initialized to indicate thread 'not yet started'
         */
        private volatile int threadStatus = 0;
        ...
        /**
         * Causes this thread to begin execution; the Java Virtual Machine
         * calls the <code>run</code> method of this thread.
         * <p>
         * The result is that two threads are running concurrently: the
         * current thread (which returns from the call to the
         * <code>start</code> method) and the other thread (which executes its
         * <code>run</code> method).
         * <p>
         * It is never legal to start a thread more than once.
         * In particular, a thread may not be restarted once it has completed
         * execution.
         *
         * @exception IllegalThreadStateException if the thread was already
         *            started.
         * @see #run()
         * @see #stop()
         */
        public synchronized void start() {
            /**
             * This method is not invoked for the main method thread or "system"
             * group threads created/set up by the VM. Any new functionality added
             * to this method in the future may have to also be added to the VM.
             *
             * A zero status value corresponds to state "NEW".
             */
            if (threadStatus != 0)
                throw new IllegalThreadStateException();

            /* Notify the group that this thread is about to be started
             * so that it can be added to the group's list of threads
             * and the group's unstarted count can be decremented. */
            group.add(this);

            boolean started = false;
            try {
                start0();
                started = true;
            } finally {
                try {
                    if (!started) {
                        group.threadStartFailed(this);
                    }
                } catch (Throwable ignore) {
                    /* do nothing. If start0 threw a Throwable then
                       it will be passed up the call stack */
                }
            }
        }

        @Override
        public void run() {
            if (target != null) {
                target.run();
            }
        }
        ...
    }

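As the source shows, Thread has a field named target of type Runnable, and Thread.run() simply delegates to target.run(). Starting the thread, however, requires calling its start() method. Thread also has a volatile field, threadStatus, that records the thread's state. Its initial value is 0, which corresponds to NEW. Once the thread has been started the value is no longer 0, so calling start() on the same thread again throws java.lang.IllegalThreadStateException.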
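Both points can be checked with a small experiment. The sketch below is only an illustration under my own naming (StartTwiceDemo and its methods are hypothetical): it shows that a second start() is rejected, and that calling run() directly executes the target on the calling thread rather than on a new one.

```java
// Hypothetical demo class; the names are illustrative assumptions.
public class StartTwiceDemo {

    // Returns true if a second start() on the same Thread throws
    // IllegalThreadStateException.
    static boolean secondStartThrows() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.start(); // the status is no longer NEW, so this is rejected
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    // Returns the name of the thread that executed the task when run()
    // is called directly instead of start().
    static String threadNameForDirectRun() {
        String[] name = new String[1];
        Thread t = new Thread(
                () -> name[0] = Thread.currentThread().getName(), "worker");
        t.run(); // plain method call: no new thread is started
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println(secondStartThrows());
        System.out.println(threadNameForDirectRun());
    }
}
```

The second method returns the caller's thread name, not "worker", because run() on its own is an ordinary method call that forwards to the target.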

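When we need the return value of the code that runs in a thread, we create the thread with FutureTask and Callable. The Thread source shows that a Thread constructor accepts a Runnable. In the third approach we pass a FutureTask instead, so we can infer that FutureTask must implement Runnable. The FutureTask constructor in turn takes a Callable, whose call method we override. Let's look at the source of the classes involved and sort out how they fit together.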


    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }


    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }


    @FunctionalInterface
    public interface Callable<V> {
        V call() throws Exception;
    }


    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }


    public class FutureTask<V> implements RunnableFuture<V> {
        ...
        /** The underlying callable; nulled out after running */
        private Callable<V> callable;
        ...
        /**
         * Creates a {@code FutureTask} that will, upon running, execute the
         * given {@code Callable}.
         *
         * @param callable the callable task
         * @throws NullPointerException if the callable is null
         */
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }

        /**
         * Creates a {@code FutureTask} that will, upon running, execute the
         * given {@code Runnable}, and arrange that {@code get} will return the
         * given result on successful completion.
         *
         * @param runnable the runnable task
         * @param result the result to return on successful completion. If
         * you don't need a particular result, consider using
         * constructions of the form:
         * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
         * @throws NullPointerException if the runnable is null
         */
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }

        public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

        /**
         * Executes the computation without setting its result, and then
         * resets this future to initial state, failing to do so if the
         * computation encounters an exception or is cancelled.  This is
         * designed for use with tasks that intrinsically execute more
         * than once.
         *
         * @return {@code true} if successfully run and reset
         */
        protected boolean runAndReset() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
            return ran && s == NEW;
        }
    }
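To tie the source back to usage, here is a small sketch that exercises the behaviour visible above: run() invokes call() and stores the result, get() hands that result back, a second run() returns immediately because the state is no longer NEW, and the Runnable-based constructor returns the preset result. The class FutureTaskDemo and its method names are my own illustrative assumptions, not part of the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative assumptions.
public class FutureTaskDemo {

    // Waits for the task and unwraps the checked exceptions of get().
    static <V> V getQuietly(FutureTask<V> task) {
        try {
            return task.get(); // blocks until run() has set the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Runs a Callable on a new thread via FutureTask and returns its result.
    static int computeOnThread() {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        new Thread(task).start(); // FutureTask is a Runnable, so Thread accepts it
        return getQuietly(task);
    }

    // Counts how often call() executes when run() is invoked twice.
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(calls::incrementAndGet);
        task.run();
        task.run(); // state is no longer NEW, so this returns immediately
        return calls.get();
    }

    // Uses the (Runnable, V) constructor: get() returns the preset result.
    static String resultForRunnable() {
        FutureTask<String> task = new FutureTask<>(() -> { }, "done");
        task.run();
        return getQuietly(task);
    }

    public static void main(String[] args) {
        System.out.println(computeOnThread());
        System.out.println(callsAfterTwoRuns());
        System.out.println(resultForRunnable());
    }
}
```

Calling task.run() directly, as the last two methods do, executes the task on the calling thread. That is enough to observe the state check, but it is not how a task would normally be scheduled.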

