摘要:最開(kāi)始是使用的正常的普通方式去寫(xiě)入,但是量太大了,所以就嘗試使用多線程來(lái)寫(xiě)入。下面我們就來(lái)介紹一下怎么使用多線程進(jìn)行導(dǎo)入。配置線程池我們需要?jiǎng)?chuàng)建一個(gè)類(lèi)來(lái)設(shè)置線程池的各種配置。它可以使主線程一直等到所有的子線程執(zhí)行完之后再執(zhí)行。
前言:
最近在工作中需要將一大批數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫(kù)中,因?yàn)榉N種原因這些數(shù)據(jù)不能使用同步數(shù)據(jù)的方式來(lái)進(jìn)行復(fù)制,而是提供了一批文本,文本里面有很多行url地址,需要的字段都包含在這些url中。最開(kāi)始是使用的正常的普通方式去寫(xiě)入,但是量太大了,所以就嘗試使用多線程來(lái)寫(xiě)入。下面我們就來(lái)介紹一下怎么使用多線程進(jìn)行導(dǎo)入。
1.文本格式格式就是類(lèi)似于這種格式的url,當(dāng)然這里只是舉個(gè)例子,大概有300多個(gè)文本,每個(gè)文本里面有大概25000條url,而每條url要插入兩個(gè)表,這個(gè)量還是有點(diǎn)大的,單線程跑的非常慢。
https://www.test.com/?type=1&code=123456&goodsId=321
2.springboot配置線程池我們需要?jiǎng)?chuàng)建一個(gè)ExecutorConfig類(lèi)來(lái)設(shè)置線程池的各種配置。
@Configuration @EnableAsync public class ExecutorConfig { private static Logger logger = LogManager.getLogger(ExecutorConfig.class.getName()); @Bean public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(5); //配置最大線程數(shù) executor.setMaxPoolSize(10); //配置隊(duì)列大小 executor.setQueueCapacity(400); //配置線程池中的線程的名稱(chēng)前綴 executor.setThreadNamePrefix("thread-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }3.創(chuàng)建異步任務(wù)接口
我們需要?jiǎng)?chuàng)建一個(gè)接口,再這個(gè)接口里面聲明了我們需要調(diào)用的異步方法
public interface AsyncService { /** * 執(zhí)行異步任務(wù) */ void writeTxt(); }4.創(chuàng)建異步實(shí)現(xiàn)類(lèi)
再創(chuàng)建一個(gè)異步類(lèi)實(shí)現(xiàn)上面的異步接口,重寫(xiě)接口里面的方法,最重要的是我們需要在方法上加@Async("asyncServiceExecutor")注解,它是剛剛我們?cè)诰€程池配置類(lèi)的里的那個(gè)配制方法的名字,加上這個(gè)后每次執(zhí)行這個(gè)方法都會(huì)開(kāi)啟一個(gè)線程放入線程池中。我下面這個(gè)方法是開(kāi)啟多線程遍歷文件夾中的文件然后為每個(gè)文件都復(fù)制一個(gè)副本出來(lái)。
@Service public class AsyncServiceImpl implements AsyncService { private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName()); @Async("asyncServiceExecutor") public void writeTxt(String fileName){ logger.info("線程-" + Thread.currentThread().getId() + "在執(zhí)行寫(xiě)入"); try { File file = new File(fileName); Listlines = FileUtils.readLines(file); File copyFile = new File(fileName + "_copy.txt"); lines.stream().forEach(string->{ try { FileUtils.writeStringToFile(copyFile,string,"utf8",true); FileUtils.writeStringToFile(copyFile," ","utf8",true); } catch (IOException e) { logger.info(e.getMessage()); } }); }catch (Exception e) { logger.info(e.getMessage()); } } }
@RunWith(SpringRunner.class) @SpringBootTest public class BootApplicationTests { @Autowired private AsyncService asyncService; @Test public void write() { File file = new File("F://ac_code_1//test.txt"); try { FileUtils.writeStringToFile(file, "ceshi", "utf8"); FileUtils.writeStringToFile(file, " ", "utf8"); FileUtils.writeStringToFile(file, "ceshi2", "utf8"); } catch (IOException e) { e.printStackTrace(); } }5.修改為阻塞式
上面的步驟已經(jīng)基本實(shí)現(xiàn)了多線程的操作,但是當(dāng)我真的開(kāi)始導(dǎo)入數(shù)據(jù)的時(shí)候又發(fā)現(xiàn)一個(gè)問(wèn)題,就是每次運(yùn)行后才剛開(kāi)始導(dǎo)入就自動(dòng)停止了,原因是我在Junit中運(yùn)行了代碼后它雖然開(kāi)始導(dǎo)入了,但是因?yàn)閿?shù)據(jù)很多時(shí)間很長(zhǎng),而Juint跑完主線程的邏輯后就把整個(gè)JVM都關(guān)掉了,所以導(dǎo)入了一點(diǎn)點(diǎn)就停止了,上面的測(cè)試方法之所以沒(méi)問(wèn)題是因?yàn)閹讉€(gè)文件的復(fù)制速度很快,在主線程跑完之前就跑完了,所以看上去沒(méi)問(wèn)題。最開(kāi)始我用了一個(gè)最笨的方法,直接在主線程最后調(diào)用Thread.sleep()方法,雖然有效果但是這也太low了,而且你也沒(méi)法判斷到底數(shù)據(jù)導(dǎo)完沒(méi)有。所以我又換了一個(gè)方式。
6.使用countDownLatch阻塞主線程CountDownLatch是一個(gè)同步工具類(lèi),它允許一個(gè)或多個(gè)線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。它可以使主線程一直等到所有的子線程執(zhí)行完之后再執(zhí)行。我們修改下代碼,創(chuàng)建一個(gè)CountDownLatch實(shí)例,大小是所有運(yùn)行線程的數(shù)量,然后在異步類(lèi)的方法中的finally里面對(duì)它進(jìn)行減1,在主線程最后調(diào)用await()方法,這樣就能確保所有的子線程運(yùn)行完后主線程才會(huì)繼續(xù)執(zhí)行。
@RunWith(SpringRunner.class) @SpringBootTest public class BootApplicationTests { private final CountDownLatch countDownLatch = new CountDownLatch(10); @Autowired private AsyncService asyncService; @Test public void mainWait() { try { for (int i = 0; i < 10; i++) { asyncService.mainWait(countDownLatch); } countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
@Service public class AsyncServiceImpl implements AsyncService { private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName()); @Override @Async("asyncServiceExecutor") public void mainWait(CountDownLatch countDownLatch) { try { System.out.println("線程" + Thread.currentThread().getId() + "開(kāi)始執(zhí)行"); for (int i=1;i<1000000000;i++){ Integer integer = new Integer(i); int l = integer.intValue(); for (int x=1;x<10;x++){ Integer integerx = new Integer(x); int j = integerx.intValue(); } } System.out.println("線程" + Thread.currentThread().getId() + "執(zhí)行結(jié)束"); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } }7.導(dǎo)入代碼
雖然上面的多線程是重點(diǎn),不過(guò)還是把導(dǎo)入數(shù)據(jù)的代碼展示出來(lái)給大家參考一下,當(dāng)然這是簡(jiǎn)化版,真實(shí)的要比這個(gè)多了很多判斷,不過(guò)那都是基于業(yè)務(wù)需求做的判斷。
@RunWith(value = SpringRunner.class) @SpringBootTest public class ApplicationTests { private static Log logger = LogFactory.getLog(ApplicationTests.class); private final CountDownLatch countDownLatch; @Autowired AsyncService asyncService; @Test public void writeCode() { try { File file = new File("F:ac_code_1"); File[] files = file.listFiles(); //計(jì)數(shù)器數(shù)量就等于文件數(shù)量,因?yàn)槊總€(gè)文件會(huì)開(kāi)一個(gè)線程 countDownLatch = new CountDownLatch(files.length); Arrays.stream(files).forEach(file1 -> { File child = new File(file1.getAbsolutePath()); String fileName = child.getAbsolutePath(); logger.info(asyncService.writeCode(fileName,countDownLatch)); }); countDownLatch.await(); catch (InterruptedException e) { e.printStackTrace(); } } }
@Service public class AsyncServiceImpl implements AsyncService { private static Log logger = LogFactory.getLog(AsyncServiceImpl.class); @Autowired IExampleService exampleService; @Override @Async("asyncServiceExecutor") public String writeCode(String fileName,CountDownLatch countDownLatch) { logger.info("線程-" + Thread.currentThread().getId() + "在導(dǎo)入-" + fileName); try { File file = new File(fileName); List總結(jié):list = FileUtils.readLines(file); for (String string : list) { String[] parmas = string.split(","); ExampleVo vo = new ExampleVo(); vo.setParam1(parmas[0]); vo.setParam1(parmas[1]); vo.setParam1(parmas[2]); exampleService.save(vo); } return "導(dǎo)入完成-" + fileName; }catch (Exception e){ e.printStackTrace(); return null; }finally { //導(dǎo)入完后減1 countDownLatch.countDown(); } } }
到這里就已經(jīng)講完了多線程插入數(shù)據(jù)的方法,目前這個(gè)方法還很簡(jiǎn)陋。因?yàn)槭敲總€(gè)文件都開(kāi)一個(gè)線程性能消耗比較大,而且如果線程線程池的線程配置太多了,頻繁切換反而會(huì)變得很慢,大家如果有更好的辦法都可以留言討論。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77066.html
摘要:對(duì)多線程的支持詳解這兩天看阿里的開(kāi)發(fā)手冊(cè),到多線程的時(shí)候說(shuō)永遠(yuǎn)不要用這種方式來(lái)使用多線程。在使用線程池的大多數(shù)情況下都是異步非阻塞的。二配置類(lèi)配置類(lèi)代碼如下下午解讀利用來(lái)開(kāi)啟對(duì)于異步任務(wù)的支持配置類(lèi)實(shí)現(xiàn)接口,返回一個(gè)線程池對(duì)象。 Springboot對(duì)多線程的支持詳解 這兩天看阿里的JAVA開(kāi)發(fā)手冊(cè),到多線程的時(shí)候說(shuō)永遠(yuǎn)不要用 new Thread()這種方式來(lái)使用多線程。確實(shí)是這樣的...
摘要:也是自帶的一個(gè)基于線程池設(shè)計(jì)的定時(shí)任務(wù)類(lèi)。其每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉(zhuǎn)載,請(qǐng)注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責(zé)任?。?! 一、在JAVA開(kāi)發(fā)領(lǐng)域,目前可以通過(guò)以下幾種方式進(jìn)行定時(shí)任務(wù) 1、單機(jī)部署模式 Timer:jdk中...
摘要:馬蜂窩推薦系統(tǒng)對(duì)于請(qǐng)求的平均處理時(shí)延要求在級(jí)別,時(shí)延的線保持在以?xún)?nèi)。任務(wù)隊(duì)列與異步寫(xiě)入這里我們使用了中的線程池來(lái)實(shí)現(xiàn)。三優(yōu)化方向基于和,我們?cè)诂F(xiàn)有的推薦系統(tǒng)中增加了一個(gè)本地容災(zāi)緩存系統(tǒng),當(dāng)依賴(lài)服務(wù)或者應(yīng)用本身突發(fā)異常時(shí)可以返回緩存的數(shù)據(jù)。 數(shù)據(jù)庫(kù)突然斷開(kāi)連接、第三方接口遲遲不返回結(jié)果、高峰期網(wǎng)絡(luò)發(fā)生抖動(dòng)...... 當(dāng)程序突發(fā)異常時(shí),我們的應(yīng)用可以告訴調(diào)用方或者用戶(hù)「對(duì)不起,服務(wù)器出了...
摘要:馬蜂窩推薦系統(tǒng)對(duì)于請(qǐng)求的平均處理時(shí)延要求在級(jí)別,時(shí)延的線保持在以?xún)?nèi)。任務(wù)隊(duì)列與異步寫(xiě)入這里我們使用了中的線程池來(lái)實(shí)現(xiàn)。三優(yōu)化方向基于和,我們?cè)诂F(xiàn)有的推薦系統(tǒng)中增加了一個(gè)本地容災(zāi)緩存系統(tǒng),當(dāng)依賴(lài)服務(wù)或者應(yīng)用本身突發(fā)異常時(shí)可以返回緩存的數(shù)據(jù)。數(shù)據(jù)庫(kù)突然斷開(kāi)連接、第三方接口遲遲不返回結(jié)果、高峰期網(wǎng)絡(luò)發(fā)生抖動(dòng)...... 當(dāng)程序突發(fā)異常時(shí),我們的應(yīng)用可以告訴調(diào)用方或者用戶(hù)「對(duì)不起,服務(wù)器出了點(diǎn)問(wèn)題」...
閱讀 3250·2023-04-25 22:47
閱讀 3765·2021-10-11 10:59
閱讀 2300·2021-09-07 10:12
閱讀 4243·2021-08-11 11:15
閱讀 3432·2019-08-30 13:15
閱讀 1750·2019-08-30 13:00
閱讀 968·2019-08-29 14:02
閱讀 1680·2019-08-26 13:57