摘要:年月日上午阿里云消息服,隊列消息發送以及消費的并發測試解析配置文件二者等價線程數并發數程序入口準備工作發送消息線程池一個計數信號量。但是,不使用實際的許可對象,只對可用許可的號碼進行計數,并采取相應的行動。
package com.study.mq.aliyunmns; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import org.apache.commons.lang3.SystemUtils; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import com.aliyun.mns.client.CloudAccount; import com.aliyun.mns.client.CloudQueue; import com.aliyun.mns.client.MNSClient; import com.aliyun.mns.common.http.ClientConfiguration; import com.aliyun.mns.model.Message; /** * * @author wangkai * @2016年11月22日 上午11:27:14 * @desc:阿里云消息服(MNS),隊列消息發送以及消費的并發測試 * https://www.aliyun.com/product/mns?spm=5176.8142029 * .388261.80.fNnCkg */ public class MnsQueueAppV2 { private static Logger LOG = Logger.getLogger(MnsQueueAppV2.class.getName()); private static MNSClient client = null; // private static AtomicLong totalCount = new AtomicLong(0); private static String endpoint = null; private static String accessId = null; private static String accessKey = null; private static String queueName = "articlepricinglog"; private static int threadNum = 100; private static int clientNum = 10000; // private static int totalSeconds = 180; private static String log4jConfPath = "./log4j.properties"; static { PropertyConfigurator.configureAndWatch(log4jConfPath); } /** * 解析配置文件 * * @return */ @SuppressWarnings("unused") protected static boolean parseConf() { // URL resource = // MnsQueueAppV2.class.getClassLoader().getResource("name.properties"); String confFilePath = SystemUtils.getUserDir() + SystemUtils.FILE_SEPARATOR + "src/main/resources/mns.properties"; URL resource = MnsQueueAppV2.class.getResource("/mns.properties"); URL resource2 = MnsQueueAppV2.class.getClassLoader().getResource( "mns.properties");// 二者等價 BufferedInputStream bis = null; try { bis = new BufferedInputStream(new FileInputStream(confFilePath)); if (bis == null) { LOG.info("ConfFile not opened: " + confFilePath); return false; } } catch (FileNotFoundException e) { LOG.error("ConfFile not found: " + confFilePath, e); return false; } // load file Properties properties = new Properties(); try { properties.load(bis); } catch (IOException e) { LOG.error("Load ConfFile Failed: " + e.getMessage()); return false; } finally { try { bis.close(); } catch (Exception e) { // do nothing } } // init the member parameters endpoint = properties.getProperty("Endpoint"); LOG.info("Endpoint: " + endpoint); accessId = properties.getProperty("AccessId"); LOG.info("AccessId: " + accessId); accessKey = properties.getProperty("AccessKey"); queueName = properties.getProperty("QueueName", queueName); LOG.info("QueueName: " + queueName); threadNum = Integer.parseInt(properties.getProperty("ThreadNum", String.valueOf(threadNum))); LOG.info("ThreadNum: 線程數" + threadNum); clientNum = Integer.parseInt(properties.getProperty("ClientNum", String.valueOf(clientNum))); LOG.info("ClientNum: 并發數" + clientNum); // totalSeconds = // Integer.parseInt(properties.getProperty("TotalSeconds", // String.valueOf(totalSeconds))); // LOG.info("TotalSeconds: " + totalSeconds); return true; } /** * 程序入口 * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 準備工作 if (!parseConf()) { return; } ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setMaxConnections(threadNum); clientConfiguration.setMaxConnectionsPerRoute(threadNum); CloudAccount cloudAccount = new CloudAccount(accessId, accessKey, endpoint, clientConfiguration); client = cloudAccount.getMNSClient(); LOG.info("發送消息"); // 線程池 ExecutorService exec = Executors.newFixedThreadPool(500); /** * Semaphore 一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 * acquire(),然后再獲取該許可。每個 release() * 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數 * ,并采取相應的行動。拿到信號量的線程可以進入代碼,否則就等待。通過acquire()和release()獲取和釋放訪問許可。 */ final Semaphore semp = new Semaphore(threadNum);// ["sem?f??] final Semaphore semaphore = new Semaphore(10, true); // 拿到信號量的線程可以進入代碼,否則就等待 // Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源 // 輔助理解 :很多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈, // 比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈, // 可以開進這條馬路,后面的車會看到紅燈,不能駛入XX馬路,但是如果前一百輛中有五輛車已經離開了XX馬路, // 那么后面就允許有5輛車駛入馬路,這個例子里說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。 long startTime = System.currentTimeMillis(); // 開啟時間 /** * 原理: * 更進一步,信號量的特性如下:信號量是一個非負整數(車位數),所有通過它的線程(車輛)都會將該整數減一(通過它當然是為了使用資源), * 當該整數值為零時,所有試圖通過它的線程都將處于等待狀態。在信號量上我們定義兩種操作: Wait(等待) 和 Release(釋放)。 * 當一個線程調用Wait * (等待)操作時,它要么通過然后將信號量減一,要么一直等下去,直到信號量大于一或超時。Release(釋放)實際上是在信號量上執行加操作 * ,對應于車輛離開停車場,該操作之所以叫做“釋放”是因為加操作實際上是釋放了由信號量守護的資源。 */ // 開始 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp.acquire();// 獲取許可 try { // 獲取queue CloudQueue queue = client.getQueueRef(queueName); // 組裝消息 Message message = new Message(); message.setMessageBody("Test"); // 發送消息 queue.putMessage(message); } catch (Exception e) { e.printStackTrace(); } semp.release();// 歸還許可 } catch (Exception e) { e.printStackTrace(); } } }; exec.submit(task); } long endTime = System.currentTimeMillis(); // 開啟時間 exec.shutdown(); LOG.info(clientNum + " 的并發發送消息總耗時:>>>" + (endTime - startTime) + " ms"); LOG.info(clientNum + " 的并發發送消息 QPS為:>>>" + (clientNum * 1000) / (endTime - startTime) + " q/s"); LOG.info("接收消息"); Thread.sleep(3000); ExecutorService exec2 = Executors.newFixedThreadPool(500); final Semaphore semp2 = new Semaphore(threadNum); long startTime2 = System.currentTimeMillis(); // 開啟時間 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp2.acquire(); try { // 獲取queue CloudQueue queue = client.getQueueRef(queueName); // 獲取消息 Message message = queue.popMessage(); // 刪掉消息 if (message != null) queue.deleteMessage(message.getReceiptHandle()); } catch (Exception e) { e.printStackTrace(); } semp2.release(); } catch (Exception e) { e.printStackTrace(); } } }; exec2.submit(task); } long endTime2 = System.currentTimeMillis(); // 開啟時間 exec2.shutdown(); // 忽略線程切換的耗時 精確的做法? LOG.info(clientNum + " 的并發接收消息總耗時:>>>" + (endTime2 - startTime2)
+ " ms"); LOG.info(clientNum + " 的并發接收消息 QPS為:>>>" + (clientNum * 1000) / (endTime2 - startTime2) + " q/s"); }
}
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66396.html
摘要:多線程工具箱之前言這一篇談一下信號量。信息信息信息信息信息信息信息信息信息信息信息小結適用于多線程請求數量資源的場景,但無法解決單多個線程對同一資源訪問的競爭性訪問。在后面我們在我們的多線程工具箱里面陸續會提到。 Java多線程工具箱之Semaphore 前言 這一篇談一下Semaphore:信號量。 將Semaphore類比為為信號燈,被繼承Runable的線程類比為列車:理解信號量...
摘要:在每個線程獲取之前,必須先從信號量獲取許可。注意,因為同時可能發生取消,所以返回并不保證有其他線程等待獲取許可。該值僅是估計的數字,因為在此方法遍歷內部數據結構的同時,線程的數目可能動態地變化。 本人郵箱: 歡迎轉載,轉載請注明網址 http://blog.csdn.net/tianshi_kcogithub: https://github.com/kco1989/kco代碼已經全部托...
摘要:將屏障重置為其初始狀態。注意,在由于其他原因造成損壞之后,實行重置可能會變得很復雜此時需要使用其他方式重新同步線程,并選擇其中一個線程來執行重置。 安全共享對象策略 1.線程限制 : 一個被線程限制的對象,由線程獨占,并且只能被占有它的線程修改2.共享只讀 : 一個共享只讀的對象,在沒有額外同步的情況下,可以被多個線程并發訪問,但是任何線程都不能修改它3.線程安全對象 : 一個線程安全...
摘要:前言之前學多線程的時候沒有學習線程的同步工具類輔助類。而其它線程完成自己的操作后,調用使計數器減。信號量控制一組線程同時執行。 前言 之前學多線程的時候沒有學習線程的同步工具類(輔助類)。ps:當時覺得暫時用不上,認為是挺高深的知識點就沒去管了.. 在前幾天,朋友發了一篇比較好的Semaphore文章過來,然后在瀏覽博客的時候又發現面試還會考,那還是挺重要的知識點。于是花了點時間去了解...
摘要:所以得出結論需要分配較多的線程進行讀數據,較少的線程進行寫數據。注意多線程編程對實際環境和需求有很大的依賴,需要根據實際的需求情況對各個參數做調整。 背景 最近對于 Java 多線程做了一段時間的學習,筆者一直認為,學習東西就是要應用到實際的業務需求中的。否則要么無法深入理解,要么硬生生地套用技術只是達到炫技的效果。 不過筆者仍舊認為自己對于多線程掌握不夠熟練,不敢輕易應用到生產代碼中...
閱讀 1883·2021-11-22 09:34
閱讀 3010·2021-09-28 09:35
閱讀 13375·2021-09-09 11:34
閱讀 3594·2019-08-29 16:25
閱讀 2820·2019-08-29 15:23
閱讀 2035·2019-08-28 17:55
閱讀 2424·2019-08-26 17:04
閱讀 3044·2019-08-26 12:21