Java线程相关方法之LockSupport篇

isen
isen
发布于 2023-09-25 / 38 阅读 / 0 评论 / 0 点赞

Java线程相关方法之LockSupport篇

参考:

一、简介

LockSupport 提供创建锁和其他同步类的基本线程阻塞原语,实际使用 UNSAFE 相关方法实现。

LockSupport 与使用它的每一个线程关联一个 permit。调用 park 方法,如果 permit 可用,则立马返回,同时消费掉 permit,否则可能阻塞。调用 unpark 方法,如果 permit 不可用,那么变为可用,可用的 permit 最多只有一个,多次调用 unpark 方法,也只有一个 可用 permit。

LockSupport 的方法 park 和 unpark 的分别用于阻塞线程和解除阻塞线程,并且没有导致 Thread.suspend 和 Thread.resume 被废弃的死锁问题。由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。pack 方法会因为 interrupt,或者超时,或者"no reason"而返回,所以通常是将 park 放在一个重复检查条件的循环中。从这个意义上讲,park 可以作为“忙碌等待”的优化,不会浪费太多时间旋转,但是必须与 unpark 配对使用,才有效。

函数中的 blocker 用于记录线程阻塞的原因,或者说是被谁阻塞,作为线程阻塞的额外信息,用于提示线程阻塞的原因。在调试获取线程信息时,可以看到 blocker 信息。

park/unpark 与 wait/notify 区别

  1. park/unpark 直接操作线程,而 wait/notify 通过对象锁操作线程。
  2. park/unpark 不需要 synchronized,即不需要获取对象锁,并且 park和unpark 的先后顺序没有绝对要求。
  3. 线程 park 后处于(timed) waiting 状态无法由 notify 唤醒。即 park/unpark 与 wait/notify 无法交叉使用。
  4. park 与 wait 都会使线程变为 (timed) waiting 状态。
  5. 线程 park 后处于(timed) waiting 状态,使用 interrupt 不会抛出 InterruptedException 异常(属于 interrupt 的第4种情况)。

二、示例

1、简单使用

public class LockSupportDemo {
	private static volatile boolean isOk = false;

	static class MyThread extends Thread{
		Thread unparkedThread;
		MyThread(Thread unparkedThread){
			this.unparkedThread = unparkedThread;
		}

		@Override
		public void run() {
			try {
				Thread.sleep(3000000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			isOk = true;
			System.out.println(Thread.currentThread().getName() + " unblock main thread, and set isOk=true");
			LockSupport.unpark(unparkedThread);
		}
	}

	public static void main(String[] args) {
		isOk = false;

		Thread thread = new MyThread(Thread.currentThread());
		thread.start();

		System.out.println(Thread.currentThread().getName() + " block");
		// 主线程阻塞
		while(!isOk){
			//循环是防止因为interrupt和不明原因醒来
			//如果是interrupt,不会抛InterruptedException,需要通过Thread.interrupted() 或者 isInterrupted()获取中断状态
			LockSupport.park(isOk);
			//不推荐使用
//			LockSupport.park();
		}

		System.out.println(Thread.currentThread().getName() + " unblock");
	}
}

通过 jstack 可以看出 main 线程阻塞信息,其中包括了"- parking to wait for <0x000000076b37c1f8> (a java.lang.Boolean)" 的 blocker 信息,0x000000076b37c1f8 是 blocker 对象的地址

2、先进先出非重入锁的类框架

class FIFOMutex {
	private final AtomicBoolean locked = new AtomicBoolean(false);
	private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

	public void lock() {
		boolean wasInterrupted = false;
		Thread current = Thread.currentThread();
		waiters.add(current);

		// Block while not first in queue or cannot acquire lock
		while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
			LockSupport.park(this);
			if (Thread.interrupted()) // ignore interrupts while waiting
				wasInterrupted = true;
		}

		waiters.remove();
		if (wasInterrupted)          // reassert interrupt status on exit
			current.interrupt();
	}

	public void unlock() {
		locked.set(false);
		LockSupport.unpark(waiters.peek());
	}
}

三、源码

public class LockSupport {

    private static final sun.misc.Unsafe UNSAFE;
    //Thread中的parkBlocker相对Thread对象的偏移量
    private static final long parkBlockerOffset;

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call returns
     * immediately; otherwise
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @since 1.6
     */
//带有blocker的park版本========================================================
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
    
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }
    
//没有带blocker的park版本=======================================================
    public static void park() {
        UNSAFE.park(false, 0L);
    }
    
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }
    
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }
    
    /**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    
    //设置 t 中的 parkBlocker 为 arg
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }
    
    /**
     * Returns the blocker object supplied to the most recent
     * invocation of a park method that has not yet unblocked, or null
     * if not blocked.  The value returned is just a momentary
     * snapshot -- the thread may have since unblocked or blocked on a
     * different blocker object.
     *
     * @param t the thread
     * @return the blocker
     * @throws NullPointerException if argument is null
     * @since 1.6
     */
    //获取 t 中的 parkBlocker
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }
    
    
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            //获取 parkBlockerOffset
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            //用于ThreadLocalRandom类进行随机数生成,它比Random性能要高的多,目前没有使用
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
}

public class Thread implements Runnable {
    /**
     * The argument supplied to the current call to
     * java.util.concurrent.locks.LockSupport.park.
     * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
     * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
     */
    volatile Object parkBlocker;
}