一、非阻塞&阻塞算法
非阻塞算法允许线程访问共享状态(或以其他方式协作或通信)而不阻塞线程的算法。 或者说,如果一个线程的暂停不导致算法中涉及的其他线程的暂停,则称算法是非阻塞的。
阻塞并发算法:
- 线程执行,或者
- 线程阻塞,直到可以安全地执行
非阻塞并发算法:
- 线程执行,或者
- 通知要执行的线程,无法执行
阻塞并发算法对应的数据结构被称为为阻塞并发数据结构,例如BlockingQueue。而非阻塞并发算法对应的数据结构被称为为非阻塞的并发数据结构,例如AtomicBoolean, AtomicInteger, AtomicLong等。
非阻塞算法优点
1、选择性
当线程无法成功执行时,可以选择进行其他操作,也可以选择自我阻塞,释放cpu,而不是只能阻塞。
2、没有死锁
当线程无法成功执行时,进行自我阻塞,不会造成其他线程的阻塞。当无法成功执行时,两个线程不会相互阻塞,所以非阻塞算法不会造成死锁。
3、没有线程挂起
线程挂起和恢复是十分昂贵的。在非阻塞算法中,线程不会因为无法成功执行而挂起。
4、更低的线程延迟
此处的线程延迟是指,提交请求和线程执行请求处理的经过的时间。因为非阻塞算法通常是通过忙等待来降低延迟。不过忙等待会消耗cpu。
二、非阻塞并发数据结构
1、Volatile 变量
Volatile变量是非阻塞的,Volatile变量的读写均直接从主内存进行读写。
如果只有一个且同一个线程对Volatile变量进行更新的话,就可以不需要加同步控制。
public class SingleWriterCounter {
private volatile long count = 0;
/**
* Only one thread may ever call this method,
* or it will lead to race conditions.
*/
public void inc() {
this.count++;
}
/**
* Many reading threads may call this method
* @return
*/
public long count() {
return this.count;
}
}
public class DoubleWriterCounter {
private volatile long countA = 0;
private volatile long countB = 0;
/**
* Only one (and the same from thereon) thread may ever call this method,
* or it will lead to race conditions.
*/
public void incA() { this.countA++; }
/**
* Only one (and the same from thereon) thread may ever call this method,
* or it will lead to race conditions.
*/
public void incB() { this.countB++; }
/**
* Many reading threads may call this method
*/
public long countA() { return this.countA; }
/**
* Many reading threads may call this method
*/
public long countB() { return this.countB; }
}
countA和countB可以用于分别表示产生的任务数和消费的任务数,DoubleWriterCounter可以用于生产者和消费者两个线程通信。
2、乐观锁
传统的锁,包括synchronized同步块和lock均是悲观锁,悲观锁会导致线程阻塞。
乐观锁假设没有其他线程对资源进行更新,如果假设是真的,那么就就可以成功更新资源,否则失败。
乐观锁会创建一份共享资源的拷贝,然后依据拷贝的资源(作为期待的资源),去更新原资源,如果更新失败,则需要重新获取一份原资源的拷贝,然后继续先前操作。更新原资源是通过compare-and-swap原子操作进行更新,只有原资源等于期待的资源,才能够更新成功。
乐观锁适用于低竞争的场景,不然会浪费大量的cpu时间去拷贝、修改以及失败的更新操作上。
当然,如果竞争过高,需要考虑一下重构代码,以降低竞争,不然只能用悲观锁。
import java.util.concurrent.atomic.AtomicLong;
public class AtomicCounter {
private AtomicLong count = new AtomicLong(0);
public void inc() {
boolean updated = false;
while(!updated){
long prevCount = this.count.get();
updated = this.count.compareAndSet(prevCount, prevCount + 1);
}
}
public long count() {
return this.count.get();
}
}
如果共享资源不能直接进行CAS(compare-and-swap)的时候,例如当共享的资源是一个集合,可以通过创建一个更改的资源对象,用该对象进行CAS。示意图如下:
以下模板不一定正确
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;
public class NonblockingTemplate {
public static class IntendedModification {
public AtomicBoolean completed =
new AtomicBoolean(false);
}
private AtomicStampedReference<IntendedModification>
ongoingMod =
new AtomicStampedReference<IntendedModification>(null, 0);
//declare the state of the data structure here.
public void modify() {
while(!attemptModifyASR());
}
public boolean attemptModifyASR(){
boolean modified = false;
IntendedModification currentlyOngoingMod =
ongoingMod.getReference();
int stamp = ongoingMod.getStamp();
if(currentlyOngoingMod == null){
//copy data structure state - for use
//in intended modification
//prepare intended modification
IntendedModification newMod =
new IntendedModification();
boolean modSubmitted =
ongoingMod.compareAndSet(null, newMod, stamp, stamp + 1);
if(modSubmitted){
//complete modification via a series of compare-and-swap operations.
//note: other threads may assist in completing the compare-and-swap
// operations, so some CAS may fail
modified = true;
}
} else {
//attempt to complete ongoing modification, so the data structure is freed up
//to allow access from this thread.
modified = false;
}
return modified;
}
}