摘要:三分布式鎖這部分是重要功能,在此基礎上實現諸如,分布式協調通知,負載均衡,選舉等復雜場景。針對此情況,改進后判斷讀寫順序為創建完臨時順序節點后,獲取下的所有子節點。
注:該文章用作回顧記錄
一、準備工作預先下載安裝 ZooKeeper ,簡單配置就能使用了。然后構建 Maven 項目,將下面的代碼粘貼到 pom.xml中:
org.apache.zookeeper zookeeper 3.4.5 com.101tec zkclient 0.5
zkclient 是開源的客戶端工具,其中封裝了很多功能,比如:刪除包含子節點的父節點,連接重試,異步回調,偏向 Java 寫法的注冊監聽等,極大地方便了用戶使用。
下面不過多介紹客戶端操作,只針對應用場景做介紹,該文章會隨著本人的學習持續補充。
二、數據發布/訂閱使用 ZooKeeper 節點監聽來實現該功能:
ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 連接集群 zkClient.createPersistent("/xxx/xxx"); // 創建持久節點 // 注冊子節點變更監聽,當子節點改變(比如創建了"/xxx/xxx/1")或當前節點刪除等,會觸發異步回調 zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() { @Override public void handleChildChange(String parentPath, ListcurrentChilds) throws Exception { } });
下面為部分源碼:
package org.I0Itec.zkclient; public class ZkClient implements Watcher { public ListwatchForChilds(final String path) { return retryUntilConnected(new Callable >() { @Override public List
call() throws Exception { exists(path, true); try { return getChildren(path, true); } catch (ZkNoNodeException e) { } return null; } }); } public T retryUntilConnected(Callable callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { final long operationStartTime = System.currentTimeMillis(); while (true) { if (_closed) { throw new IllegalStateException("ZkClient already closed!"); } try { return callable.call(); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } } }
基于 ZooKeeper 實現的數據發布/訂閱很簡單吧,快動手試試。
三、分布式鎖這部分是 ZooKeeper 重要功能,在此基礎上實現諸如,分布式協調/通知,負載均衡,Master選舉等復雜場景。
1、排它鎖排它鎖又稱為寫鎖或獨占鎖。比如事務 T1 對數據對象 O1 加了排它鎖,那么在整個加鎖期間,只允許 T1 對 O1 進行讀取或更新操作,其它事務都不能對 O1 操作。
1)獲取鎖
所有客戶端都創建臨時節點 zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 會保證在所有客戶端中,最終只有一個客戶端能創建成功,那么就認為該客戶端獲取了鎖。同時,所有沒獲取到鎖的客戶端需在/xxx/xxx 上注冊子節點變更監聽,以便實時監聽節點變化。如節點發生變化,則未獲取到鎖的客戶端再重新獲取鎖。
private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String lockParentPath = "/zk-book/exclusice_lock"; public static void main(String[] args) throws InterruptedException { try { zkClient.createEphemeral(lockParentPath + "/lock"); System.out.println("service3 獲取鎖成功"); } catch (Exception e) { System.out.println("service3獲取鎖失敗"); zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, ListcurrentChilds) throws Exception { System.out.println("service3再次獲取鎖"); main(null); } }); } Thread.sleep(Integer.MAX_VALUE); }
2)釋放鎖
當 "/xxx/xxx" 是臨時節點時,以下倆種情況都會釋放鎖。
當已獲取鎖的客戶機宕機,導致連接超時斷開,那么 ZooKeeper 會將臨時節點刪除。
正常執行完邏輯后,客戶端主動將臨時節點刪除。
2、共享鎖共享鎖又稱為讀鎖。如果事務 T1 對數據對象 O1 加了共享鎖,那么 T1 只能對 O1 進行讀取操作,其它事務只能對 O1 加共享鎖,直到 O1 上所有共享鎖都被釋放。
1)獲取鎖
所有客戶端都創建臨時順序節點 zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 會生成類似下面的節點,已保證節點的唯一性。
2)判斷讀寫順序
創建完臨時順序節點后,獲取 "/xxx" 下的所有子節點,并對該節點注冊子節點變更監聽。
確定創建完的臨時順序節點在所有節點中的順序。
對于讀節點:
沒有比自己序號小的節點,或比自己序號小的節點都是讀節點,則成功獲取到共享鎖。
如果比自己序號小的節點中存在寫節點,則需進入等待。
對于寫節點:
如果自己不是序號最小的節點,則需進入等待。
接受到子節點變更通知后,重復步驟1
以下為實現代碼:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分布式共享鎖 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLock { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static volatile boolean isExecuted = false; public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null); String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1); ListcurrentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); if (currentChilds.size() > 0) isExecuted = getLockAndExecute(currentChilds, node); zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List currentChilds) throws Exception { if (currentChilds.size() > 0) { currentChilds = sortNodes(currentChilds); isExecuted = getLockAndExecute(currentChilds, node); } } }); while (!isExecuted) { Thread.sleep(Integer.MAX_VALUE); } } /** * 排序節點 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List sortNodes(List nodes) { Collections.sort(nodes, new Comparator () { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號 } }); return nodes; } /** * 獲取節點位置 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @return */ private static Integer getNodePosition(List nodes, String node) { for (int i = 0, size = nodes.size(); i < size; i++) { if (nodes.get(i).equals(node)) return i; } return null; // 無此數據 } /** * 是否得到鎖 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @param nodePosition * @return */ private static boolean isGetLock(List nodes, String node, int nodePosition) { if (nodePosition == 0) // 沒有比此序號更小的節點 return true; if (node.indexOf("r-") > -1) { // 讀節點 for (int i = 0; i < nodePosition; i++) { // 遍歷小于次序號的節點 String nodeTemp = nodes.get(i); if (nodeTemp.indexOf("w-") > -1) // 存在寫節點,則進入等待鎖 return false; } return true; } return false; } /** * 獲取鎖并執行 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLockAndExecute(List currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) // 子節點為空 return false; System.out.println("子節點:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition); boolean isGetLock = isGetLock(currentChilds, node, nodePosition); if (isGetLock) { System.out.println(node + " 成功獲取到鎖,開始執行耗時任務"); doSomething(); boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node); if (isSuccess) System.out.println(node + " 成功執行完任務并刪除節點"); } else { System.out.println(node + " 未獲取到鎖"); } return isGetLock; } private static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
測試以上代碼會發現,當獲取鎖的節點過多時,某一節點變更會通知所有節點,會對 ZooKeeper 服務器造成巨大的性能影響和網絡沖擊,服務器會發送給客戶端大量的事件通知。比如有以下節點,當 w-24 節點變更時,會通知給其余節點。
因為當獲取共享鎖時,要判斷比自己序號小的節點,所以應該只給 r-25 節點發送通知。針對此情況,改進后判斷讀寫順序為:
創建完臨時順序節點后,獲取 "/xxx" 下的所有子節點。
客戶端調用 getChildren() 來獲取子節點列表,注意,這里不注冊任何監聽。
如果未獲取到共享鎖,那么找到比自己序號小的節點來注冊監聽,分為以下倆種情況:
讀節點:比自己序號小的最后一個寫節點注冊監聽
寫節點:比自己序號小的最后一個節點注冊監聽
等待監聽通知,重復步驟2
改進后的共享鎖代碼實現:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分布式共享鎖最優 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLockOptimal { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null); public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { ListcurrentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1); boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node); System.out.println("當前所有節點:" + currentChilds.toString() + ", 該" + (isReadNode ? "讀" : "寫") + "節點:" + node); if (isGetLock) { execute(node); System.out.println("退出程序"); System.exit(1); } else { String monitorNode = getMonitorNode(currentChilds, node); System.out.println(node + " 未獲取到鎖,注冊監聽節點:" + monitorNode); if (null != monitorNode) { zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List currentChilds) throws Exception { main(null); // 遞歸調用 } }); } Thread.sleep(Integer.MAX_VALUE); } } /** * 排序節點 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List sortNodes(List nodes) { Collections.sort(nodes, new Comparator () { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號 } }); return nodes; } /** * 獲取節點位置 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static Integer getNodePosition(List currentChilds, String node) { for (int i = 0, size = currentChilds.size(); i < size; i++) { if (currentChilds.get(i).equals(node)) return i; } return null; } /** * 獲取監聽節點 * @author alexnevsky * @date 2018年5月25日 * @param currentChilds * @param node * @return */ private static String getMonitorNode(List currentChilds, String node) { String monitorNode = null; Integer nodePosition = getNodePosition(currentChilds, node); if (0 < nodePosition) { // 非首節點 if (node.indexOf("r-") > -1) { // 讀節點 // 獲取比當前序號小的最后一個寫節點 for (int i = nodePosition - 1; i >= 0; i--) { String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) return tempNode; } } else { // 獲取比當前序號小的最后一個節點 return currentChilds.get(nodePosition - 1); } } return monitorNode; } /** * 獲取鎖 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLock(List currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) return false; if (nodePosition == 0) // 無序號更小的節點 return true; if (node.indexOf("r-") > -1) { // 讀節點 for (int i = 0; i < nodePosition; i++) { // 遍歷前面序號的節點 String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) // 存在寫節點,返回失敗 return false; } return true; } return false; } /** * 執行 * @author alexnevsky * @date 2018年5月24日 * @param node * @return */ private static void execute(String node) { System.out.println(node + " 成功獲取到鎖,開始執行耗時任務"); doSomething(); boolean isDeletedLock = zkClient.delete(nodeFullPath); System.out.println(node + " 成功執行完任務,刪除節點" + (isDeletedLock ? "成功" : "失敗")); } /** * 模擬耗時任務 * @author alexnevsky * @date 2018年5月25日 */ public static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69512.html
摘要:可簡單地認為它是的擴展,負載均衡自然成為不可或缺的特性。是基于開發的服務代理組件,在使用場景中,它與和整合,打造具備服務動態更新和負載均衡能力的服務網關。類似的特性在項目也有體現,它是另一種高性能代理的方案,提供服務發現健康和負載均衡。 摘要: Cloud Native 應用架構隨著云技術的發展受到業界特別重視和關注,尤其是 CNCF(Cloud Native Computing Fo...
摘要:作為面試官,我是如何甄別應聘者的包裝程度語言和等其他語言的對比分析和主從復制的原理詳解和持久化的原理是什么面試中經常被問到的持久化與恢復實現故障恢復自動化詳解哨兵技術查漏補缺最易錯過的技術要點大掃盲意外宕機不難解決,但你真的懂數據恢復嗎每秒 作為面試官,我是如何甄別應聘者的包裝程度Go語言和Java、python等其他語言的對比分析 Redis和MySQL Redis:主從復制的原理詳...
摘要:作為面試官,我是如何甄別應聘者的包裝程度語言和等其他語言的對比分析和主從復制的原理詳解和持久化的原理是什么面試中經常被問到的持久化與恢復實現故障恢復自動化詳解哨兵技術查漏補缺最易錯過的技術要點大掃盲意外宕機不難解決,但你真的懂數據恢復嗎每秒 作為面試官,我是如何甄別應聘者的包裝程度Go語言和Java、python等其他語言的對比分析 Redis和MySQL Redis:主從復制的原理詳...
摘要:可簡單地認為它是的擴展,負載均衡自然成為不可或缺的特性。類似的特性在項目也有體現,它是另一種高性能代理的方案,提供服務發現健康和負載均衡。 Dubbo Cloud Native 實踐與思考 分享簡介 Cloud Native 應用架構隨著云技術的發展受到業界特別重視和關注,尤其是 CNCF(Cloud Native Computing Foundation)項目蓬勃發展之際。Dubbo...
閱讀 1297·2021-11-22 09:34
閱讀 2162·2021-10-08 10:18
閱讀 1724·2021-09-29 09:35
閱讀 2453·2019-08-29 17:20
閱讀 2137·2019-08-29 15:36
閱讀 3398·2019-08-29 13:52
閱讀 775·2019-08-29 12:29
閱讀 1183·2019-08-28 18:10