Java非阻塞的并发算法

isen
isen
发布于 2023-09-26 / 85 阅读 / 0 评论 / 0 点赞

Java非阻塞的并发算法

Non-blocking Algorithms

一、非阻塞&阻塞算法

非阻塞算法允许线程访问共享状态(或以其他方式协作或通信)而不阻塞线程的算法。 或者说,如果一个线程的暂停不导致算法中涉及的其他线程的暂停,则称算法是非阻塞的。

阻塞并发算法:

  • 线程执行,或者
  • 线程阻塞,直到可以安全地执行

image-nicm.png

非阻塞并发算法:

  • 线程执行,或者
  • 通知要执行的线程,无法执行

image-vrpu.png

阻塞并发算法对应的数据结构被称为为阻塞并发数据结构,例如BlockingQueue。而非阻塞并发算法对应的数据结构被称为为非阻塞的并发数据结构,例如AtomicBoolean, AtomicInteger, AtomicLong等。

非阻塞算法优点

1、选择性

当线程无法成功执行时,可以选择进行其他操作,也可以选择自我阻塞,释放cpu,而不是只能阻塞。

2、没有死锁

当线程无法成功执行时,进行自我阻塞,不会造成其他线程的阻塞。当无法成功执行时,两个线程不会相互阻塞,所以非阻塞算法不会造成死锁。

3、没有线程挂起

线程挂起和恢复是十分昂贵的。在非阻塞算法中,线程不会因为无法成功执行而挂起。

4、更低的线程延迟

此处的线程延迟是指,提交请求和线程执行请求处理的经过的时间。因为非阻塞算法通常是通过忙等待来降低延迟。不过忙等待会消耗cpu。

二、非阻塞并发数据结构

1、Volatile 变量

Volatile变量是非阻塞的,Volatile变量的读写均直接从主内存进行读写。

如果只有一个且同一个线程对Volatile变量进行更新的话,就可以不需要加同步控制。

public class SingleWriterCounter {

    private volatile long count = 0;

    /**
     * Only one thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void inc() {
        this.count++;
    }


    /**
     * Many reading threads may call this method
     * @return
     */
    public long count() {
        return this.count;
    }
}
public class DoubleWriterCounter {

    private volatile long countA = 0;
    private volatile long countB = 0;

    /**
     * Only one (and the same from thereon) thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void incA() { this.countA++;  }


    /**
     * Only one (and the same from thereon) thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void incB() { this.countB++;  }


    /**
     * Many reading threads may call this method
     */
    public long countA() { return this.countA; }


    /**
     * Many reading threads may call this method
     */
    public long countB() { return this.countB; }
}

countA和countB可以用于分别表示产生的任务数和消费的任务数,DoubleWriterCounter可以用于生产者和消费者两个线程通信。

2、乐观锁

传统的锁,包括synchronized同步块和lock均是悲观锁,悲观锁会导致线程阻塞。

乐观锁假设没有其他线程对资源进行更新,如果假设是真的,那么就就可以成功更新资源,否则失败。

乐观锁会创建一份共享资源的拷贝,然后依据拷贝的资源(作为期待的资源),去更新原资源,如果更新失败,则需要重新获取一份原资源的拷贝,然后继续先前操作。更新原资源是通过compare-and-swap原子操作进行更新,只有原资源等于期待的资源,才能够更新成功。

乐观锁适用于低竞争的场景,不然会浪费大量的cpu时间去拷贝、修改以及失败的更新操作上。
当然,如果竞争过高,需要考虑一下重构代码,以降低竞争,不然只能用悲观锁。

import java.util.concurrent.atomic.AtomicLong;

public class AtomicCounter {
    private AtomicLong count = new AtomicLong(0);

    public void inc() {
        boolean updated = false;
        while(!updated){
            long prevCount = this.count.get();
            updated = this.count.compareAndSet(prevCount, prevCount + 1);
        }
    }

    public long count() {
        return this.count.get();
    }
}

如果共享资源不能直接进行CAS(compare-and-swap)的时候,例如当共享的资源是一个集合,可以通过创建一个更改的资源对象,用该对象进行CAS。示意图如下:

image-vpcm.png

以下模板不一定正确

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;

public class NonblockingTemplate {

    public static class IntendedModification {
        public AtomicBoolean completed =
                new AtomicBoolean(false);
    }

    private AtomicStampedReference<IntendedModification>
        ongoingMod =
            new AtomicStampedReference<IntendedModification>(null, 0);

    //declare the state of the data structure here.


    public void modify() {
        while(!attemptModifyASR());
    }

    public boolean attemptModifyASR(){

        boolean modified = false;
    
        IntendedModification currentlyOngoingMod =
        ongoingMod.getReference();
        int stamp = ongoingMod.getStamp();
    
        if(currentlyOngoingMod == null){
            //copy data structure state - for use
            //in intended modification
        
            //prepare intended modification
            IntendedModification newMod =
            new IntendedModification();
        
            boolean modSubmitted = 
                ongoingMod.compareAndSet(null, newMod, stamp, stamp + 1);
        
            if(modSubmitted){
            
                //complete modification via a series of compare-and-swap operations.
                //note: other threads may assist in completing the compare-and-swap
                // operations, so some CAS may fail
            
                modified = true;
            }
    
        } else {
            //attempt to complete ongoing modification, so the data structure is freed up
            //to allow access from this thread.
        
            modified = false;
        }
    
        return modified;
    }
}