国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Java SDK 并發(fā)包全面總結(jié)

luckyyulin / 2029人閱讀

摘要:一和并發(fā)包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當(dāng)于的使用。寫鎖與讀鎖之間互斥,一個(gè)線程在寫時(shí),不允許讀操作。的注意事項(xiàng)不支持重入,即不可反復(fù)獲取同一把鎖。沒有返回值,也就是說無法獲取執(zhí)行結(jié)果。

一、Lock 和 Condition

Java 并發(fā)包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當(dāng)于 synchronized、wait()、notify() 的使用。

1. Lock 的優(yōu)勢

比起傳統(tǒng)的 synchronized 關(guān)鍵字,Lock 最大的不同(或者說優(yōu)勢)在于:

阻塞的線程能夠響應(yīng)中斷,這樣能夠有機(jī)會(huì)釋放自己持有的鎖,避免死鎖

支持超時(shí),如果線程在一定時(shí)間內(nèi)未獲取到鎖,不是進(jìn)入阻塞狀態(tài),而是拋出異常

非阻塞的獲取鎖,如果未獲取到鎖,不進(jìn)入阻塞狀態(tài),而是直接返回

三種情況分別對(duì)應(yīng) Lock 的三個(gè)方法:void lockInterruptibly()boolean tryLock(long time, TimeUnit unit)boolean tryLock()

Lock 最常用的一個(gè)實(shí)現(xiàn)類是 ReentrantLock,代表可重入鎖,意思是可以反復(fù)獲取同一把鎖。
除此之外,Lock 的構(gòu)造方法可以傳入一個(gè) boolean 值,表示是否是公平鎖。

2. Lock 和 Condition 的使用

前面實(shí)現(xiàn)的簡單的阻塞隊(duì)列就是使用 Lock 和 Condition ,現(xiàn)在其含義已經(jīng)非常明確了:

public class BlockingQueue {
    private int capacity;
    private int size;

    //定義鎖和條件
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * 入隊(duì)列
     */
    public void enqueue(T data){
        lock.lock();
        try {
            //如果隊(duì)列滿了,需要等待,直到隊(duì)列不滿
            while (size >= capacity){
                notFull.await();
            }
            //入隊(duì)代碼,省略
            //入隊(duì)之后,通知隊(duì)列已經(jīng)不為空了
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //在finally塊中釋放鎖,避免死鎖
            lock.unlock();
        }
    }

    /**
     * 出隊(duì)列
     */
    public T dequeue(){
        lock.lock();
        try {
            //如果隊(duì)列為空,需要等待,直到隊(duì)列不為空
            while (size <= 0){
                notEmpty.await();
            }
            //出隊(duì)代碼,省略
            //出隊(duì)列之后,通知隊(duì)列已經(jīng)不滿了
            notFull.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        //實(shí)際應(yīng)該返回出隊(duì)數(shù)據(jù)
        return null;
    }
}

可以看到,Lock 需要手動(dòng)的加鎖和解鎖,并且解鎖操作是放在 finally 塊中的,這是一種編程范式,盡量遵守。

二、ReadWriteLock

ReadWriteLock 表示讀寫鎖,適用于讀多寫少的情況,讀寫鎖一般有幾個(gè)特征:

讀鎖與讀鎖之間不互斥,即允許多個(gè)線程同時(shí)讀變量。

寫鎖與讀鎖之間互斥,一個(gè)線程在寫時(shí),不允許讀操作。

寫鎖與寫鎖之間互斥,只允許 一個(gè)線程寫操作。

讀寫鎖減小了鎖的粒度,在讀多寫少的場景下,對(duì)性能的提升較為明顯。ReadWriteLock 的簡單使用示例如下:

public class ReadWriteLockTest {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock =lock.readLock();
    private final Lock writeLock =lock.writeLock();

    private int value;

    //加寫鎖
    private void addValue(){
        writeLock.lock();
        try {
            value += 1;
        }
        finally {
            writeLock.unlock();
        }
    }

    //加讀鎖
    private int getValue(){
        readLock.lock();
        try {
            return value;
        }
        finally {
            readLock.unlock();
        }
    }
}

讀寫鎖的升級(jí)與降級(jí)

Java 中不允許鎖的升級(jí),即加寫鎖時(shí)必須釋放讀鎖。

但是允許鎖的降級(jí),即加讀鎖時(shí),可以不釋放寫鎖,最后讀鎖和寫鎖一起釋放。

三、StampedLock 1. StampedLock 的使用及特點(diǎn)

StampedLock 是 Java 1.8 版本中提供的鎖,主要支持三種鎖模式:寫鎖、悲觀讀鎖、樂觀讀。

其中寫鎖和悲觀讀鎖跟 ReadWriteLock 中的寫鎖和讀鎖的概念類似。StampedLock 在使用的時(shí)候不一樣,加鎖的時(shí)候會(huì)返回一個(gè)參數(shù),解鎖的時(shí)候需要傳入這個(gè)參數(shù),示例如下:

public class StampedLockTest {

    private final StampedLock lock = new StampedLock();
    private int value;

    private void addValue(){
        long stamp = lock.writeLock();
        try {
            value += 1;
        }
        finally {
            lock.unlockWrite(stamp);
        }
    }
}

StampedLock 最主要的特點(diǎn)是支持“樂觀讀”,即當(dāng)進(jìn)行讀操作的時(shí)候,并不是所有的寫操作都被阻塞,允許一個(gè)線程獲取寫鎖。樂觀讀的使用示例如下:

public class StampedLockTest {

    private final StampedLock lock = new StampedLock();
    private int value;

    private void getValue(){
        //樂觀讀,讀入變量
        long stamp = lock.tryOptimisticRead();
        int a = value;
        //如果驗(yàn)證失敗
        if (!lock.validate(stamp)){
            //升級(jí)為悲觀讀鎖,繼續(xù)讀入變量
            stamp = lock.readLock();
            try {
                a = value;
            }
            finally {
                lock.unlockRead(stamp);
            }
        }
    }
}

需要注意的是,這里使用 validate() 方法進(jìn)行驗(yàn)證,如果樂觀讀失敗,則升級(jí)為悲觀讀鎖,繼續(xù)獲取變量。

2. StampedLock 的注意事項(xiàng)

StampedLock 不支持重入,即不可反復(fù)獲取同一把鎖。

在使用 StampedLock 的時(shí)候,不要調(diào)用中斷操作。如果需要支持中斷,可以調(diào)用 readLockInterruptibly 和 writeLockInterruptibly 方法。

四、Semaphore

Semaphore 表示信號(hào)量,初始化對(duì)象的時(shí)候,需要傳一個(gè)參數(shù),表示信號(hào)量的計(jì)數(shù)器值。acquire() 方法將計(jì)數(shù)器加 1,release() 方法減 1,這兩個(gè)方法都能夠保證原子性。

信號(hào)量的簡單示例:

public class SemaphoreTest {

    private final Semaphore semaphore = new Semaphore(1);
    private int value;

    public void addValue() {
        try {
            semaphore.acquire();
            value += 1;
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            semaphore.release();
        }
    }

程序中使用信號(hào)量實(shí)現(xiàn)了一個(gè)線程安全的方法,初始值設(shè)為了 1,當(dāng)多個(gè)方法訪問 addValue 方法的時(shí)候,由于 acquire 方法保證原子性,所以只能有一個(gè)線程將計(jì)數(shù)器減 1 并進(jìn)入臨界區(qū),另一個(gè)線程等待。

一個(gè)線程執(zhí)行完后,調(diào)用 release 方法,計(jì)數(shù)器加 1,另一個(gè)等待的線程被喚醒。

Semaphore 與 Lock 的一個(gè)不同點(diǎn)便是信號(hào)量允許多個(gè)線程同時(shí)進(jìn)入臨界區(qū),例如將初始值設(shè)置的更大一些。例如下面這個(gè)例子:

public class SemaphoreTest {

    //初始值 2,表示 2 個(gè)線程可同時(shí)進(jìn)入臨界區(qū)
    private final Semaphore semaphore = new Semaphore(2);

    public void test() {
        try {
            semaphore.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + " 進(jìn)入臨界區(qū) : " + System.currentTimeMillis());
            Thread.sleep(1000);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            semaphore.release();
        }
    }
}
五、CountDownLatch

CountDownLatch 是一個(gè)線程同步的工具,主要實(shí)現(xiàn)一個(gè)線程等待多個(gè)線程的功能。在原始的 Thread 中,可以調(diào)用 join() 方法來等待線程執(zhí)行完畢,而 CountDownLatch 則可以用在線程池中的線程等待。

下面是 CountDownLatch 的使用示例:

public class CountDownLatchTest {

    //實(shí)際生產(chǎn)中不推薦使用這種創(chuàng)建線程的方式
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        threadPool.execute(() -> {
            System.out.println("線程1執(zhí)行完畢");
            latch.countDown();
        });
        threadPool.execute(() -> {
            System.out.println("線程2執(zhí)行完畢");
            latch.countDown();
        });
        latch.await();
        System.out.println("兩個(gè)線程都執(zhí)行完畢");

        threadPool.shutdown();
    }
}

CountDownLatch 的初始值為 2,線程執(zhí)行完畢則調(diào)用 countDown 方法,計(jì)數(shù)器減 1。減到 0 的時(shí)候,會(huì)喚醒主線程繼續(xù)執(zhí)行。

六、CyclicBarrier

CyclicBarrier 也是一個(gè)線程同步工具類,主要實(shí)現(xiàn)多個(gè)線程之間的互相等待。

CyclicBarrier 有兩個(gè)構(gòu)造函數(shù),可以傳一個(gè)計(jì)數(shù)器的初始值,還可以加上一個(gè) Runnable,表示計(jì)數(shù)器執(zhí)行減到 0 的時(shí)候,需要執(zhí)行的回調(diào)方法。

public class CyclicBarrierTest {

    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final CyclicBarrier barrier = new CyclicBarrier(2, this::note);

    public void print(){
        threadPool.execute(() -> {
            System.out.println("線程1執(zhí)行完畢");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        threadPool.execute(() -> {
            System.out.println("線程2執(zhí)行完畢");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        threadPool.shutdown();
    }
    public void note(){
        System.out.println("兩個(gè)線程執(zhí)行完畢");
    }
}

示例中設(shè)置 CyclicBarrier 的初始值為 2,線程執(zhí)行完畢調(diào)用 await 方法,計(jì)數(shù)器減 1。print() 方法中的兩個(gè)線程執(zhí)行完后,計(jì)數(shù)器減到 0,就會(huì)調(diào)用 note 方法。

七、ThreadPoolExecutor 1. 線程池的工作原理

由于線程是一種重量級(jí)對(duì)象,頻繁的創(chuàng)建和銷毀比較消耗系統(tǒng)資源,因此線程池的優(yōu)勢就顯現(xiàn)出來了。線程池可有降低資源消耗,因?yàn)椴挥妙l繁創(chuàng)建和銷毀線程;提高響應(yīng)速度,需要執(zhí)行任務(wù)時(shí),可直接使用線程池中的線程資源;還能夠有效的管理、監(jiān)控線程池中的線程。

Java 中的線程池的實(shí)現(xiàn)是一種很典型的生產(chǎn)者-消費(fèi)者模式,使用線程的一方是生產(chǎn)者,主要提供需要執(zhí)行的任務(wù),線程池是消費(fèi)者,消費(fèi)生產(chǎn)者提供的任務(wù)。

下面這段代碼能夠幫助理解線程池的實(shí)現(xiàn)原理(僅用于幫助理解,實(shí)際執(zhí)行結(jié)果有出入):

public class ThreadPool {
    //保存任務(wù)的阻塞隊(duì)列
    private BlockingQueue workQueue;
    //保存工作線程的列表
    private List threadList = new ArrayList<>();

    //構(gòu)造方法
    public ThreadPool(int poolSize, BlockingQueue workQueue) {
        this.workQueue = workQueue;
        //根據(jù)poolSize的數(shù)量創(chuàng)建工作線程,并執(zhí)行線程
        for (int i = 0; i < poolSize; i++) {
            WorkThread thread = new WorkThread();
            thread.start();
            threadList.add(thread);
        }
    }

    //執(zhí)行任務(wù)的方法,主要是將任務(wù)添加到隊(duì)列中
    public void execute(Runnable task) {
        try {
            workQueue.put(task);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //工作線程
    class WorkThread extends Thread{
        @Override
        public void run() {
            //循環(huán)取出任務(wù)執(zhí)行
            while (!workQueue.isEmpty()) {
                try {
                    Runnable task = workQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上面的代碼注釋很詳細(xì)了,主要是使用了一個(gè)阻塞隊(duì)列,用來存儲(chǔ)生產(chǎn)者的任務(wù)。然后在構(gòu)造器中創(chuàng)建線程,并循環(huán)從隊(duì)列中取出任務(wù)執(zhí)行。

2. Java 中的線程池

Java 中提供了 Executors 這個(gè)類來快速創(chuàng)建線程池,簡單使用示例如下:

Executors.newSingleThreadExecutor();//創(chuàng)建一個(gè)線程的線程池
Executors.newFixedThreadPool(5);//創(chuàng)建固定數(shù)量線程
Executors.newCachedThreadPool();//創(chuàng)建可調(diào)整數(shù)量的線程
Executors.newScheduledThreadPool(5);//創(chuàng)建定時(shí)任務(wù)線程池

但是在《阿里巴巴Java開發(fā)手冊(cè)》中,明確禁止使用 Executors 創(chuàng)建線程池(甚至也不建議使用 Thread 顯式創(chuàng)建線程),主要原因是 Executors 的默認(rèn)方法都是使用的無界隊(duì)列,在高負(fù)載的情況下,很容易導(dǎo)致 OOM(Out Of Memory)。

所以在 Java 中創(chuàng)建線程池的正確姿勢是使用 ThreadPoolExecutor ,其構(gòu)造函數(shù)有七個(gè):

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,//可選
                          RejectedExecutionHandler handler//可選
                          ) { ...

corePoolSize:線程池中最少的線程數(shù)

maximumPoolSize:線程池中創(chuàng)建的最大的線程數(shù)

keepAliveTime:表示線程池中線程的活躍時(shí)間,如果線程在這個(gè)活躍時(shí)間內(nèi)沒有執(zhí)行任務(wù),并且線程數(shù)量超過了 corePoolSize,那么線程池就會(huì)回收多余的線程。

TimeUnit:上一個(gè)參數(shù)的時(shí)間單位

workQueue:保存任務(wù)的隊(duì)列,為了避免 OOM,建議使用有界隊(duì)列

threadFactory:可選參數(shù),不傳的話就是默認(rèn)值。也可以自己傳一個(gè)實(shí)現(xiàn)了 ThreadFactory 接口的類,表示自定義線程,例如給線程指定名字,線程組等。

handler:可選參數(shù)。定義任務(wù)的拒絕策略,表示無空閑線程時(shí),并且隊(duì)列中的任務(wù)滿了的,怎么拒絕新的任務(wù)。目前的拒絕策略有四種:

AbortPolicy:默認(rèn)的拒絕策略,拋出 RejectedExecutionException 異常

CallerRunsPolicy:讓提交任務(wù)的線程自己去執(zhí)行這個(gè)任務(wù)

DiscardOldestPolicy:丟棄最老的任務(wù),及最先加入隊(duì)列中的任務(wù),并添加新的任務(wù)

DiscardPolicy:直接丟棄任務(wù),并且不會(huì)拋出任何異常

調(diào)用 ThreadPoolExecutor

線程池創(chuàng)建好了之后,就需要執(zhí)行任務(wù),ThreadPoolExecutor 提供了兩個(gè)方法,一是 execute,二是 submit。execute 沒有返回值,也就是說無法獲取執(zhí)行結(jié)果。使用示例如下:

public static void main(String[] args) {
    BlockingQueue queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    threadPool.execute(() -> {
        System.out.println("In this world");
    });

    threadPool.shutdown();
}

而 submit 方法有一個(gè) Future 接口的返回值,F(xiàn)uture 接口有五個(gè)方法:

cancle:取消任務(wù)

isCancled:任務(wù)是否已取消

isDone:任務(wù)是否已執(zhí)行完

get:獲取任務(wù)執(zhí)行結(jié)果

get(long timeout, TimeUnit unit):支持超時(shí)獲取任務(wù)執(zhí)行結(jié)果

下面代碼展示了取消任務(wù)的方法:

public static void main(String[] args) {
    
    BlockingQueue queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);

    Future future = threadPool.submit(() -> {
        System.out.println("I am roseduan");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    future.cancel(false);
    threadPool.shutdown();
}

程序的本意是打印語句然后休眠 5 秒,但由于調(diào)用了 cancle 方法 ,因此程序直接結(jié)束,不會(huì)有任何輸出。

八、FutureTask

FutureTask 也是一個(gè)支持獲取任務(wù)執(zhí)行結(jié)果的工具類,F(xiàn)utureTask 實(shí)現(xiàn)了 Runnable 和 Future 接口。

所以可以將 FutureTask 作為任務(wù)提交給 ThreadPoolExecutor 或者 Thread 執(zhí)行,并且可以獲取執(zhí)行結(jié)果。簡單的使用如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //創(chuàng)建任務(wù)
    FutureTask task = new FutureTask<>(() -> "Java and " + "Python");

    BlockingQueue queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    
    threadPool.execute(task);
    //獲取執(zhí)行結(jié)果
    System.out.println(task.get());

    threadPool.shutdown();
}

傳給 Thread 作為參數(shù)的使用示例如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask task = new FutureTask<>(() -> 1 + 2);
        Thread thread = new Thread(task);
        thread.start();
        System.out.println(task.get());//輸出3
    }
九、CompletableFuture

CompletableFuture 是一個(gè)異步編程的工具類,異步化能夠最大化并行程序的執(zhí)行,是多線程性能優(yōu)化的基礎(chǔ)。

1. 創(chuàng)建 CompletableFuture 對(duì)象

Completable 有四個(gè)靜態(tài)方法,可以用來創(chuàng)建對(duì)象:

runAsync(Runnable runnable);//無返回值
runAsync(Runnable runnable, Executor executor);//無返回值,可指定線程池

supplyAsync(Supplier supplier);//有返回值
supplyAsync(Supplier supplier, Executor executor);//有返回值,可指定線程池

可以看到,四個(gè)方法分為了是否有返回值,和是否自定義線程池。如果不自定義線程池,那么 CompletableFuture 會(huì)使用公共的線程池,默認(rèn)創(chuàng)建 CPU 核數(shù)的數(shù)量的線程池,當(dāng)有多個(gè)任務(wù)的時(shí)候,還是建議根據(jù)每個(gè)任務(wù)自定義線程池。

一個(gè)簡單的使用示例如下,其中 task3 會(huì)等待兩個(gè)任務(wù)都執(zhí)行完畢:

public static void main(String[] args) {
    CompletableFuture task1 = CompletableFuture.runAsync(() -> {
        System.out.println("任務(wù)1執(zhí)行完畢");
    });
    CompletableFuture task2 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任務(wù)2執(zhí)行完畢");
    });
    CompletableFuture task3 = task1.thenCombine(task2, (__, res) -> "兩個(gè)任務(wù)執(zhí)行完畢");

    System.out.println(task3.join());
}

CompletableFuture 實(shí)現(xiàn)了 Future 接口,因此可以查看任務(wù)執(zhí)行的情況,并且可以獲取返回值。

2. CompletionStage 接口中的方法

CompletableFuture 還實(shí)現(xiàn)了 CompletionStage 接口。這個(gè)接口描述了任務(wù)之間的時(shí)序關(guān)系,分別有串行、并行、聚合三種關(guān)系。需要注意的是,并行本就是其所具有的特性,所以不再探討了,并且聚合關(guān)系又分為了 AND 聚合關(guān)系和 OR 聚合關(guān)系。下面依次介紹串行、AND 聚合、OR 聚合這三種關(guān)系。

首先是串行關(guān)系,串行很簡單,一個(gè)任務(wù)執(zhí)行完后再執(zhí)行另一個(gè)任務(wù),例如下圖:

描述串行關(guān)系的幾個(gè)方法是:thenApply、thenAccept、thenRun、thenCompose。

thenApply 既支持接收參數(shù),又能夠支持返回值。

thenAccept 支持接收參數(shù),但是不支持返回值。

thenRun 既不能接收參數(shù),也不能有返回值。

CompletionStage 中的大部分方法都有帶有 Async 后綴的方法,表示可能會(huì)使用其他的線程來執(zhí)行主體中的內(nèi)容,后面介紹的方法都類似這樣,不再贅述。

簡單的使用示例如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任務(wù)1執(zhí)行完畢");
        return "Task1";
    }).thenApply((s) -> "接收到的參數(shù) : " + s);;

    System.out.println(future.get());
}

其次是 AND 匯聚關(guān)系,典型的場景便是一個(gè)線程等待兩個(gè)線程都執(zhí)行完后再執(zhí)行,例如下圖:

描述 AND 聚合關(guān)系的有三個(gè)方法:thenCombine、thenAcceptBoth、runAfterBoth,其是否接收參數(shù)和支持返回值,和上面的三個(gè)方法對(duì)應(yīng)。一個(gè)簡單的使用示例如下:

public static void main(String[] args) {
    CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任務(wù)1執(zhí)行完畢");
        return "task1";
    });
    CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任務(wù)2執(zhí)行完畢");
        return "task2";
    });

    CompletableFuture task3 = task1.thenCombine(task2, (r,s) -> r + " " + s);
    System.out.println(task3.join());
}

任務(wù) 1 休眠了 2 秒,任務(wù) 3 會(huì)等待前面兩個(gè)任務(wù)執(zhí)行完成之后再執(zhí)行。

最后是 OR 聚合關(guān)系,表示線程等待其中一個(gè)線程滿足條件之后,就可以繼續(xù)執(zhí)行了,不用等待全部的線程。

描述 OR 聚合關(guān)系的是 applyToEither、acceptEither、runAfterEither。使用示例和上面的類似,只需要將方法改一下就是了,這里不再贅述了。

3. 處理異常

在異步編程中,CompletionStage 接口還提供了幾個(gè)可以處理異常的方法,和 try() catch() finally() 類似。

這幾個(gè)方法分別是 :

exceptionally:相當(dāng)于 catch

whenComplete:相當(dāng)于 finally

handle:相當(dāng)于 finally ,支持返回值

使用示例如下:

public static void main(String[] args) {
    CompletableFuture task = CompletableFuture.supplyAsync(() -> {
        String str = null;
        return str.length();
        //相當(dāng)于catch
    }).exceptionally((e) -> {
        System.out.println("發(fā)生異常");
        return 0;
    });

    //相當(dāng)于 finally
    task.whenComplete((s, r) -> {
        System.out.println("執(zhí)行結(jié)束");
    });
    System.out.println(task.join());
}
十、CompletionService

CompletionService 是一個(gè)批量執(zhí)行異步任務(wù)的工具類,先來看一個(gè)例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));

    Future task1 = threadPool.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    Future task2 = threadPool.submit(() -> "Task2");
    Future task3 = threadPool.submit(() -> "Task3");
    
    sb.append(task1.get());
    sb.append(task2.get());
    sb.append(task3.get());
}

程序的意思是,依次執(zhí)行三個(gè)任務(wù),并將其結(jié)果存儲(chǔ)到 StringBuffer 中,由于 task1 休眠了 2 秒,所以 sb 會(huì)在這里阻塞。

由于這三個(gè)任務(wù)之間沒有關(guān)聯(lián),所以等待的消耗完全是沒必要的,解決的辦法便是利用一個(gè)阻塞隊(duì)列,先執(zhí)行完的任務(wù)將結(jié)果保存在隊(duì)列中,sb 從隊(duì)列中取出就行了。

CompletionService 實(shí)際上就是將線程池和阻塞隊(duì)列的功能整合了起來,解決了類似上面的問題。CompletionService 的實(shí)現(xiàn)類是 ExecutorCompletionService,這個(gè)類有兩個(gè)構(gòu)造方法:

public ExecutorCompletionService(Executor executor) {}
public ExecutorCompletionService(Executor executor,
    BlockingQueue> completionQueue) {}

如果不傳一個(gè)阻塞隊(duì)列,則會(huì)使用默認(rèn)的無界隊(duì)列。

CompletionService 主要有這幾個(gè)方法:

submit() 提交任務(wù)、take() 從阻塞隊(duì)列中獲取執(zhí)行結(jié)果(如果隊(duì)列為空,線程阻塞)、poll() 也是從隊(duì)列中獲取執(zhí)行結(jié)果(如果隊(duì)列為空,則返回 null),另外 poll 還支持超時(shí)獲取。

使用 CompletionService 改造后的程序示例如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));

    CompletionService service = new ExecutorCompletionService<>(threadPool);
    service.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    service.submit(() -> "Task2");
    service.submit(() -> "Task3");

    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
}
十一、Fork/Join 1. Fork/Join 使用

Fork/Join 是一個(gè)處理分治任務(wù)的計(jì)算框架,所謂分治,即分而治之,將一個(gè)任務(wù)分解成子任務(wù),求解子任務(wù),然后將子任務(wù)的結(jié)果合并,就得到了最后的結(jié)果。分治思想的應(yīng)用十分的廣泛,例如常見的快速排序、歸并排序,還有流行的大數(shù)據(jù)計(jì)算框架 MapReduce,都應(yīng)用了分治思想。

Java 中,F(xiàn)ork 對(duì)應(yīng)的是 任務(wù)分解,Join 則表示 子任務(wù)的結(jié)果合并。

Fork/Join 主要包含兩個(gè)主要的實(shí)現(xiàn)類:

一是線程池 ForkJoinPool,默認(rèn)會(huì)創(chuàng)建 CPU核數(shù)數(shù)量的線程

二是 ForkJoinTask,這是一個(gè)抽象類,主要的方法有 fork() 和 join(),前者表示執(zhí)行子任務(wù),后者表示阻塞等待子任務(wù)的執(zhí)行結(jié)果。ForkJoinTask 還有兩個(gè)子類:

RecursiveTask

RecursiveAction

這兩個(gè)類也是抽象的,我們需要自定義并繼承這個(gè)類,并覆蓋其 compute 方法。其中 RecursiveTask 有返回值,而 RecursiveAction 沒有返回值。

下面是一個(gè)使用 ForkJoin 的示例,實(shí)現(xiàn)了 n 的階乘,注釋寫得比較詳細(xì)。

public class ForkJoinTest {
    public static void main(String[] args) {
        //創(chuàng)建線程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        //創(chuàng)建任務(wù)
        Factorial task = new Factorial(6);
        //invoke 方法執(zhí)行任務(wù)(還可以使用 execute、submit),得到執(zhí)行的結(jié)果
        Integer res = forkJoinPool.invoke(task);
        System.out.println(res);
    }

    static class Factorial extends RecursiveTask {
        private final int n;
        Factorial(int n) {
            this.n = n;
        }
        @Override
        protected Integer compute() {
            if (n == 0){
                return 1;
            }
            Factorial f = new Factorial(n - 1);
            //執(zhí)行子任務(wù)
            f.fork();
            //等待子任務(wù)結(jié)果
            return n * factorial.join();
        }
    }
}
2. ForkJoinPool 原理

和普通的線程池類似,F(xiàn)orkJoinPool 是一個(gè)特殊的線程池,并且也采用的是生產(chǎn)者 - 消費(fèi)者模式。跟普通線程池共享一個(gè)隊(duì)列不同,F(xiàn)orkJoinPool 其中維護(hù)了多個(gè)雙端隊(duì)列,當(dāng)一個(gè)線程對(duì)應(yīng)的任務(wù)隊(duì)列為空的時(shí)候,線程并不會(huì)空閑,而是“竊取”其他隊(duì)列的任務(wù)執(zhí)行。

由于是雙端隊(duì)列,正常執(zhí)行任務(wù)和“竊取任務(wù)”可以從兩端進(jìn)行出隊(duì),這樣避免了數(shù)據(jù)競爭。

采用“任務(wù)竊取”這種模式,也是 ForkJoinPool 比普通線程池更加智能的體現(xiàn)。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/74698.html

相關(guān)文章

  • 跳槽季如何快速全面復(fù)習(xí)面試題

    摘要:排序算法和集合工具類排序算法和集合工具類。面試官總是問排序算法也不是在難為你,而是在考察你的編程功底。你首先要理解多線程不僅僅是和那么簡單,整個(gè)并發(fā)包下面的工具都是在為多線程服務(wù)。 去年的這個(gè)時(shí)候樓主通過兩個(gè)月的復(fù)習(xí)拿到了阿里巴巴的 offer,有一些運(yùn)氣,也有一些心得,借著跳槽季來臨特此分享出來。簡單梳理一下我的復(fù)習(xí)思路,同時(shí)也希望和大家一起交流討論,一起學(xué)習(xí),如果不對(duì)之處歡迎指正一...

    keke 評(píng)論0 收藏0
  • [Java發(fā)-6]“管程”-java管程初探

    摘要:語言在之前,提供的唯一的并發(fā)原語就是管程,而且之后提供的并發(fā)包,也是以管程技術(shù)為基礎(chǔ)的。但是管程更容易使用,所以選擇了管程。線程進(jìn)入條件變量的等待隊(duì)列后,是允許其他線程進(jìn)入管程的。并發(fā)編程里兩大核心問題互斥和同步,都可以由管程來幫你解決。 并發(fā)編程這個(gè)技術(shù)領(lǐng)域已經(jīng)發(fā)展了半個(gè)世紀(jì)了。有沒有一種核心技術(shù)可以很方便地解決我們的并發(fā)問題呢?這個(gè)問題, 我會(huì)選擇 Monitor(管程)技術(shù)。Ja...

    Steve_Wang_ 評(píng)論0 收藏0
  • Java開發(fā)區(qū)塊鏈的三大sdk

    摘要:是企業(yè)與區(qū)塊鏈相遇的地方。的框架旨在成為開發(fā)區(qū)塊鏈解決方案的支柱。以太坊,主要是針對(duì)工程師使用進(jìn)行區(qū)塊鏈以太坊開發(fā)的詳解。 如果你想將區(qū)塊鏈合并到一個(gè)Java項(xiàng)目中,現(xiàn)在我們來看看就是這個(gè)細(xì)分領(lǐng)域中三個(gè)最大的OSS玩家。 好的伙計(jì)們,我們都聽說過比特幣,以太坊或其他加密貨幣,其中有一些時(shí)髦的名字圍繞著我們常見的新聞,但我們作為Java開發(fā)人員知道如何輕松地與這些區(qū)塊鏈技術(shù)進(jìn)行交互嗎?以...

    iKcamp 評(píng)論0 收藏0
  • [Java發(fā)-11] 發(fā)容器的使用

    摘要:同步容器及其注意事項(xiàng)中的容器主要可以分為四個(gè)大類,分別是和,但并不是所有的容器都是線程安全的。并發(fā)容器及其注意事項(xiàng)在版本之前所謂的線程安全的容器,主要指的就是同步容器,當(dāng)然因?yàn)樗蟹椒ǘ加脕肀WC互斥,串行度太高了,性能太差了。 Java 并發(fā)包有很大一部分內(nèi)容都是關(guān)于并發(fā)容器的,因此學(xué)習(xí)和搞懂這部分的內(nèi)容很有必要。 Java 1.5 之前提供的同步容器雖然也能保證線程安全,但是性能很差...

    legendaryedu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<