摘要:只要線程池未關閉該策略直接在調用者線程中運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務但是任務提交線程的性能極有可能會急劇下降。任務并嘗試再次提交當前任務。
1. 同步控制
synchronized的擴展:重入鎖
同步控制不僅有synchronized配合object.wait()以及object.notify(),也有增強版的reentrantLock(重入鎖)
public class ReenterLock implements Runnable{ public static ReentrantLock lock=new ReentrantLock(); public static int i=0; @Override public void run() { for(int j=0;j<10000000;j++){ lock.lock(); lock.lock(); //此處演示重入性 try{ i++; }finally{ lock.unlock(); //退出臨界區必須解鎖 lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl=new ReenterLock(); Thread t1=new Thread(tl); Thread t2=new Thread(tl); t1.start();t2.start(); t1.join();t2.join(); System.out.println(i); //計算結果為 20000000 } }
我們來看下reentrantlock相比synchronized鎖有何優點:
中斷響應
面對死鎖,似乎synchronized沒有任何主動解決策略,而reentrantlock則可以輕松解決
public class IntLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; /** * 控制加鎖順序,方便構造死鎖 * @param lock */ public IntLock(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); //可中斷的加鎖 try{ Thread.sleep(500); }catch(InterruptedException e){} lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); try{ Thread.sleep(500); }catch(InterruptedException e){} lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println(Thread.currentThread().getName()+":線程被中斷"); } finally { if (lock1.isHeldByCurrentThread()) lock1.unlock(); if (lock2.isHeldByCurrentThread()) lock2.unlock(); System.out.println(Thread.currentThread().getName()+":線程退出"); } } public static void main(String[] args) throws InterruptedException { IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread t1 = new Thread(r1,"線程1"); Thread t2 = new Thread(r2,"線程2"); t1.start();t2.start(); Thread.sleep(1000); //中斷其中一個線程 t2.interrupt(); } } // 輸出結果: // java.lang.InterruptedException // at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898) // at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) // at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) // at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31) // at java.lang.Thread.run(Thread.java:745) // 線程2:線程被中斷 // 線程2:線程退出 // 線程1:線程退出
由上可知,當t1,t2形成死鎖時,可以主動利用中斷來解開,但完成任務的只有t1,t2被中斷. 而如果換成synchronized則將無法進行中斷
1鎖申請等待時限
lock1.tryLock(); //嘗試獲取鎖,獲得立即返回true,未獲得立即返回false lock1.tryLock(5, TimeUnit.SECONDS); //嘗試獲取鎖,5秒內未獲得則返回false,獲得返回true
public class TryLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public TryLock(int lock) { this.lock = lock; } @Override public void run() { if (lock == 1) { while (true) { if (lock1.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if (lock2.tryLock()) { try { System.out.println(Thread.currentThread() .getId() + ":My Job done"); return; } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } } } else { while (true) { if (lock2.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if (lock1.tryLock()) { try { System.out.println(Thread.currentThread() .getId() + ":My Job done"); return; } finally { lock1.unlock(); } } } finally { lock2.unlock(); } } } } } public static void main(String[] args) throws InterruptedException { TryLock r1 = new TryLock(1); TryLock r2 = new TryLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } } // 15:My Job done // 14:My Job done
使用trylock可以有效地避免產生死鎖
公平鎖
synchronized鎖為非公平鎖,而reentrantLock既可以是公平鎖也可以是非公平鎖
非公平鎖容易產生饑餓,公平鎖先進先出,但效率不敵非公平鎖
public ReentrantLock(boolean fair)
ffffd
重入鎖的搭檔Condition
Condition和object.wait(),object.notify()方法類似
condition的基本方法如下:
void await() throws InterruptedException; //使當前線程等待,釋放鎖,能響應signal和signalAll方法,響應中斷 void awaitUninterruptibly(); //類似 await,但不響應中斷 long awaitNanos(long nanosTimeout)throws InterruptedException; //等待一段時間 boolean await (long time,TimeUnit unit)throws InterruptedException; boolean awaitUntil(Date deadline)throws InterruptedException; void signal(); //喚醒一個等待中的線程 void signalAll(); //喚醒所有等待中的線程
JDK內部就有很多對于ReentrantLock的使用,如ArrayBlockingQueue
//在 ArrayBlockingQueue中的一些定義 boolean fair = true; private final ReentrantLock lock = new ReentrantLock(fair); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); //put(方法的實現 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //put方法做同步 try { try { while (count == items.length) //隊列已滿 notFull.await(); //等待隊列有足夠的空間 } catch (InterruptedException ie) { notFull.signal(); throw ie; } insert(e); //notFull被通知時,說明有足夠的空間 } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notFull.signal(); //通知take方法的線程,隊列已有數據 } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //對take()方法做同步 try { try { while (count == 0) //如果隊列為空 notEmpty.await(); //則消費者隊列要等待一個非空的信號 } catch (InterruptedException ie) { notEmpty.signal(); throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); //通知put線程隊列已有空閑空間 return x; }
多線程同時訪問:信號量(semaphore)
同步鎖只能允許一個線程進行訪問,信號量可以指定多個線程同時訪問同一個資源.
//構造方法 public Semaphore(int permits) //傳入int表示能同時訪問的線程數 public Semaphore(int permits, boolean fair) //線程數,是否公平鎖 //實例方法 public void acquire() throws InterruptedException //獲取一個訪問權限,會阻塞線程,會被打斷 public void acquireUninterruptibly() //獲取一個訪問權限,會阻塞線程,不會被打斷 public boolean tryAcquire() //獲取一個訪問權限,立即返回 public boolean tryAcquire(long timeout, TimeUnit unit) //獲取一個訪問權限,嘗試一段時間 public void release() //釋放一個訪問權限
public class SemapDemo implements Runnable { final Semaphore semp = new Semaphore(5); @Override public void run() { try { semp.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + ":done!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semp.release(); //使用完后要釋放,否則會引起信號量泄漏 } } public static void main(String[] args) { ExecutorService exec = Executors.newFixedThreadPool(20); final SemapDemo demo = new SemapDemo(); for (int i = 0; i < 20; i++) { exec.submit(demo); } } } //輸出結果 //每次輸出5個結果,對應信號量的5個許可
讀寫鎖ReadWriteLock
讀寫鎖適用于讀多寫少的場景,讀讀之間為并行,讀寫之間為串行,寫寫之間也為串行
public class ReadWriteLockDemo { private static Lock lock=new ReentrantLock(); private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); //獲取讀寫鎖 private static Lock readLock = readWriteLock.readLock(); //讀鎖 private static Lock writeLock = readWriteLock.writeLock(); //寫鎖 private int value; public Object handleRead(Lock lock) throws InterruptedException{ try{ lock.lock(); //模擬讀操作 Thread.sleep(1000); //讀操作的耗時越多,讀寫鎖的優勢就越明顯 return value; }finally{ lock.unlock(); } } public void handleWrite(Lock lock,int index) throws InterruptedException{ try{ lock.lock(); //模擬寫操作 Thread.sleep(1000); value=index; }finally{ lock.unlock(); } } public static void main(String[] args) { final ReadWriteLockDemo demo=new ReadWriteLockDemo(); Runnable readRunnale=new Runnable() { @Override public void run() { try { demo.handleRead(readLock); // demo.handleRead(lock); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable writeRunnale=new Runnable() { @Override public void run() { try { demo.handleWrite(writeLock,new Random().nextInt()); // demo.handleWrite(lock,new Random().nextInt()); } catch (InterruptedException e) { e.printStackTrace(); } } }; for(int i=0;i<18;i++){ new Thread(readRunnale).start(); } for(int i=18;i<20;i++){ new Thread(writeRunnale).start(); } } } //結果: //讀寫鎖明顯要比單純的鎖要更快結束,說明讀寫鎖確實提升不少效率
倒計數器CountDownLatch
讓一個線程等待,知道倒計時結束
public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); //構造倒計時器,倒計數為10 static final CountDownLatchDemo demo=new CountDownLatchDemo(); @Override public void run() { try { //模擬檢查任務 Thread.sleep(new Random().nextInt(10)*1000); System.out.println("check complete"); end.countDown(); //倒計時器減1 } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for(int i=0;i<10;i++){ exec.submit(demo); } //等待檢查 end.await(); //主線程阻塞,待其他線程全部完成后再喚醒主線程 //發射火箭 System.out.println("Fire!"); exec.shutdown(); } }
循環柵欄CyclicBarrier
循環柵欄類似于倒計時器,但是計數器可以反復使用,cyclicBarrier比CountDownLatch稍微強大些,可以傳入一個barrierAction,barrierAction指每次完成計數便出發一次
public CyclicBarrier(int parties,Runnable barrierAction) //構造方法
public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldierName) { this.cyclic = cyclic; this.soldier = soldierName; } public void run() { try { //等待所有士兵到齊 cyclic.await(); //觸發一次循環柵欄,達到計數器后才會進行下一步工作 doWork(); //等待所有士兵完成工作 cyclic.await(); //再次觸發循環柵欄,達到計數器后才會進行下一步工作 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); //模擬工作 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ":任務完成"); } } public static class BarrierRun implements Runnable { //用于傳入CyclicBarrier的構造方法,作為達到計數器數值后的觸發任務, 可以被多次調用 boolean flag; int N; public BarrierRun(boolean flag, int N) { this.flag = flag; this.N = N; } public void run() { if (flag) { System.out.println("司令:[士兵" + N + "個,任務完成!]"); } else { System.out.println("司令:[士兵" + N + "個,集合完畢!]"); flag = true; } } } public static void main(String args[]) throws InterruptedException { final int N = 10; Thread[] allSoldier=new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); //設置屏障點,主要是為了執行這個方法 System.out.println("集合隊伍!"); for (int i = 0; i < N; ++i) { System.out.println("士兵 "+i+" 報道!"); allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i)); allSoldier[i].start(); } } }
注意: 一旦其中一個被interrupt后,很可能會拋出一個interruptExpection和9個BrokenBarrierException,表示該循環柵欄已破損,防止其他線程進行無所謂的長久等待
線程阻塞工具LockSupport
LockSupport是一個非常實用的線程阻塞工具,不需要獲取某個對象的鎖(如wait),也不會拋出interruptedException異常
public static void park() //掛起當前線程, public static void park(Object blocker) //掛起當前線程,顯示阻塞對象,parking to wait for <地址值>
public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(this); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); //即使unpark發生在park前,也可以使程序正常結束 t1.join(); t2.join(); } }
LockSupport使用了類似信號量的機制,它為每個線程準備一個許可,如果許可可用,park立即返回,并且消費這個許可(轉為不可用),如果許可不可用,就會阻塞,而unpark方法就是使一個許可變為可用 locksupport.park()可以相應中斷,但是不會拋出interruptedException,我們可以用Thread.interrupted等方法中獲取中斷標記.
public class LockSupportIntDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(); if(Thread.interrupted()){ //檢測到中斷位,并清除中斷狀態 System.out.println(getName()+" 被中斷了"); } if (Thread.currentThread().isInterrupted()){ //中斷狀態已被清除,無法檢測到 System.out.println(1); } } System.out.println(getName()+"執行結束"); } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); t1.interrupt(); LockSupport.unpark(t2); } } //輸出: //in t1 //t1 被中斷了 //t1執行結束 //in t2 //t2執行結束
Guava和Limiter限流
限流算法一般有兩種:漏桶算法和令牌桶算法
漏桶算法: 利用緩存區,所有請求進入系統,都在緩存區中保存,然后以固定的流速流出緩存區進行處理.
令牌桶算法: 桶中存放令牌,每個請求拿到令牌后才能進行處理,如果沒有令牌,請求要么等待,要么丟棄.RateLimiter就是采用這種算法
public class RateLimiterDemo { static RateLimiter limiter = RateLimiter.create(2); //每秒處理2個請求 public static class Task implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis()); } } public static void main(String args[]) throws InterruptedException { for (int i = 0; i < 50; i++) { limiter.acquire(); //過剩流量會等待 new Thread(new Task()).start(); } } } // 某些場景傾向于丟棄過剩流量,tryAcquire則是立即返回,不會阻塞 // for (int i = 0; i < 50; i++) { // if(!limiter.tryAcquire()) { // continue; // } // new Thread(new Task()).start(); // }2. 線程池
Executors框架
Executor框架提供了各種類型的線程池,主要有以下工廠方法:
//固定線程數量,當有新任務提交時,若池中有空閑線程則立即執行,若沒有空閑線程,任務會被暫存在一個任務隊列中,直到有空閑線程 public static ExecutorService newFixedThreadPool(int nThreads) //返回只有一個線程的線程池,多余任務被保存到一個任務隊列中,線程空閑時,按先入先出的順序執行隊列中的任務 public static ExecutorService newSingleThreadPoolExecutor() //線程數量不固定,優先使用空閑線程,多余任務會創建新線程 public static ExecutorService newCachedThreadPool() //線程數量為1,給定時間執行某任務,或周期性執行任務 public static ScheduledExecutorService newSingleThreadScheduledExecutor() //線程數量可以指定,定時或周期性執行任務 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
計劃任務:newScheduledThreadPool主要方法
//給定時間,對任務進行一次調度 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); //周期調度,以任務完成后間隔固定時間調度下一個任務,(兩者相加) public ScheduledFuture> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit); //周期調度,兩個任務開始的時間差為固定間隔,如果任務時間大于間隔時間則以任務時間為準(兩者取其大者) public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit);
注意: 任務異常時后續所有任務都將停止調度,因此必須保證所有任務異常均被正常處理.
核心線程池 ThreadPoolExecutor
ThreadPoolExecutor構造函數:
public ThreadPoolExecutor(int corePoolSize, //核心線程池大小 int maximumPoolSize, //最大線程池大小 long keepAliveTime, //線程池中超過corePoolSize數目的空閑線程最大存活時間;可以allowCoreThreadTimeOut(true)使得核心線程有效時間 TimeUnit unit, //keepAliveTime時間單位 BlockingQueueworkQueue, //阻塞任務隊列 ThreadFactory threadFactory, //新建線程工廠 RejectedExecutionHandler handler ) //當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理
workQueue指被提交但是未執行的任務隊列,是BlockingQueue接口的對象
1.直接提交隊列:SynchronousQueue,該隊列沒有容量,每個插入操作對應一個刪除操作,即提交的任務總是會交給線程執行,如果沒有空閑進程,則創建新線程,數量達最大則執行拒絕策略,一般需要設置很大的maximumPoolSize
2.有界任務隊列:ArrayBlockingQueue,有新任務時,若線程池的實際線程數小于corePoolSize,優先創建新線程,若大于corePoolSize,加入到等待隊列,若隊列已滿,不大于maximumPoolSize前提下,創建新線程執行;當且僅當等待隊列滿時才會創建新線程,否則數量一直維持在corePoolSize
3.無界任務隊列:LinkedBlockingQueue,小于corePoolSize時創建線程,達到corePoolSize則加入隊列直到資源消耗殆盡
4.優先任務隊列:PriorityBlockingQueue,特殊無界隊列,總是保證高優先級的任務先執行.
Executors分析
newFixedThreadPool: corePoolSize=maximumPoolSize,線程不會超過corePoolSize,使用LinkedBlockingQueue
newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1
newCachedThreadPool: corePoolSize=0,maximumPoolSize為無窮大,空閑線程60秒回收,使用SynchronousQueue隊列
ThreadPoolExecutor的execute()方法執行邏輯
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //檢查是否小于corePoolSize if (addWorker(command, true)) //添加線程,執行任務 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //添加進隊列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //雙重校驗 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //提交線程池失敗 reject(command); //拒絕執行 }
拒絕策略
AbortPolicy:該策略會直接拋出異常,阻止系統正常工作。
CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交線程的性能極有可 能會急劇下降。 任務,并嘗試再次提交當前任務。
DiscardOldestPolicy:該策略將丟棄最老的一個請求,也就是即將被執行的一個
DiscardPolicy:該策略默默地丟棄無法處理的任務,不予任何處理。如果允許任務丟失,我覺得這可能是最好的一種方案了吧!
自定義ThreadFactory
public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { //自定義創建線程的方法 Thread t= new Thread(r); t.setDaemon(true); System.out.println("create "+t); return t; } } ); for (int i = 0; i < 5; i++) { es.submit(task); } Thread.sleep(2000); }
擴展線程池
public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行:" + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執行完成:" + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("線程池退出"); } }; for (int i = 0; i < 5; i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); //等待所有任務執行完畢后再關閉 } }
異常堆棧消息
線程池中的異常堆棧可能不會拋出,需要我們自己去包裝
public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable task) { super.execute(wrap(task, clientTrace(), Thread.currentThread() .getName())); } @Override public Future> submit(Runnable task) { return super.submit(wrap(task, clientTrace(), Thread.currentThread() .getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) { return new Runnable() { @Override public void run() { try { task.run(); //外層包裹trycatch,即可打印出異常 } catch (Exception e) { clientStack.printStackTrace(); throw e; } } }; } }
Fork/Join框架
類似于mapreduce,用于大數據量,fork()創造子線程,join表示等待,
public class CountTask extends RecursiveTask{ private static final int THRESHOLD = 10000; //任務分解規模 private long start; private long end; public CountTask(long start,long end){ this.start=start; this.end=end; } @Override public Long compute(){ long sum=0; boolean canCompute = (end-start) subTasks=new ArrayList (); long pos=start; for(int i=0;i<100;i++){ long lastOne=pos+step; if(lastOne>end)lastOne=end; //最后一個任務可能小于step,故需要此步 CountTask subTask=new CountTask(pos,lastOne); //子任務 pos+=step+1; //調整下一個任務 subTasks.add(subTask); subTask.fork(); //fork子任務 } for(CountTask t:subTasks){ sum+=t.join(); //聚合任務 } } return sum; } public static void main(String[]args){ ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0,200000000000L); ForkJoinTask result = forkJoinPool.submit(task); try{ long res = result.get(); System.out.println("sum="+res); }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); } } }
注意: 如果任務的劃分層次很多,一直得不到返回,可能有兩種原因: 1.系統內線程數量越積越多,導致性能嚴重下降 2.函數調用層次變多,導致棧溢出
Guava對線程池的拓展
1.特殊的DirectExecutor線程池
Executor executor=MoreExecutors.directExecutor(); // 僅在當前線程運行,用于抽象
2.Daemon線程池
提供將普通線程轉換為Daemon線程.很多情況下,我們不希望后臺線程池阻止程序的退出
public class MoreExecutorsDemo2 { public static void main(String[] args) { ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2); MoreExecutors.getExitingExecutorService(exceutor); exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName())); } }
3.future模式擴展
待續....
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75977.html
摘要:的并發容器并發集合這是一個高效的并發你可以把它理解為一個線程安全的。可以看作一個線程安全的這是一個接口,內部通過鏈表數組等方式實現了這個接口。 3. JDK的并發容器 并發集合 ConcurrentHashMap:這是一個高效的并發HashMap.你可以把它理解為一個線程安全的HashMap。 CopyOnWriteArrayList:這是一個List,從名字看就知道它和Ar...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術專家我看過哪些技術類書籍。 大家好,我是...
摘要:有時候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會有線程安全的需求。它可以讓你在不改動或者極少改動原有代碼的基礎上,讓普通的變量也享受操作帶來的線程安全性,這樣你可以修改極少的代碼,來獲得線程安全的保證。 有時候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會有線程安全的需求。如果改動不大,我們可以簡單地修改程序中每一個使用或者讀取這個變量的地方。但顯然,這...
摘要:實戰高并發程序設計推薦豆瓣評分書的質量沒的說,推薦大家好好看一下。推薦,豆瓣評分,人評價本書介紹了在編程中條極具實用價值的經驗規則,這些經驗規則涵蓋了大多數開發人員每天所面臨的問題的解決方案。 很早就想把JavaGuide的書單更新一下了,昨晚加今天早上花了幾個時間對之前的書單進行了分類和補充完善。雖是終極版,但一定還有很多不錯的 Java 書籍我沒有添加進去,會繼續完善下去。希望這篇...
閱讀 3115·2023-04-25 15:02
閱讀 2806·2021-11-23 09:51
閱讀 2030·2021-09-27 13:47
閱讀 1984·2021-09-13 10:33
閱讀 957·2019-08-30 15:54
閱讀 2640·2019-08-30 15:53
閱讀 2853·2019-08-29 13:58
閱讀 881·2019-08-29 13:54