国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

分布式鎖(zookeeper)與接口冪等性實現

zhigoo / 544人閱讀

摘要:分布式系統錯綜復雜,今天,我們著重對分布式系統的互斥性與冪等性進行分析與解決。阻塞鎖與自旋鎖。公平鎖與非公平鎖。實現今天重點講解使用實現分布式鎖。個人感覺是最適合實現分布式鎖。如以上流程,接口無法冪等,可能導致重復扣款。

背景

隨著數據量的增大,用戶的增多,系統的并發訪問越來越大,傳統的單機已經滿足不了需求,分布式系統成為一種必然的趨勢。分布式系統錯綜復雜,今天,我們著重對分布式系統的互斥性與冪等性進行分析與解決。

互斥性

互斥性問題也就是共享資源的搶占問題。如何解決呢?也就是鎖,保證對共享資源的串行化訪問。互斥性要如何實現?。在java中,最常用的是synchronized和lock這兩種內置的鎖,但這只適用于單進程中的多線程。對于在同一操作系統下的多個進程間,常見的鎖實現有pv信號量等。然而,當問題擴展到多臺機器的多個操作系統時,也就是分布式鎖,情況就復雜多了。

鎖要存在哪里。必須提供一個所有主機都能訪問到的存儲空間

加鎖的進程在掛掉之后,如何確保鎖被解開,釋放資源。可以通過超時機制或者定時檢測心跳來實現

不同進程間如何獲取相同的唯一標識來競爭鎖。可以利用要保護的資源生成一個唯一的id

獲取鎖操作的原子性。必須保證讀取鎖狀態、加鎖兩步的原子性

鎖的可重入性。某個線程試圖再次獲取由自己持有的鎖,這個操作會百分百成功,這就是可重入性。如果不能保證可重入性,就會有死鎖的可能。

阻塞鎖與自旋鎖。當獲取不到鎖時,阻塞鎖就是線程阻塞自身,等待喚醒,自旋鎖就是不斷的嘗試重新獲取鎖。

公平鎖與非公平鎖。公平鎖保證按照請求的順序獲取鎖,非公平鎖就是可以插隊。公平鎖一般要維持一個隊列來實現,所以非公平鎖的性能會更好一點。

避免驚群效應。如果分布式鎖是阻塞鎖,當鎖的占有者釋放鎖時,要避免同時喚醒多個阻塞的線程,產生驚群效應。

zookeeper實現

今天重點講解使用zookeeper實現分布式鎖。個人感覺zookeeper是最適合實現分布式鎖。它的幾個特性:

順序節點:可以避免驚群效應

臨時節點:避免機器宕機倒是鎖無法釋放

watch機制:可以及時喚醒等待的線程

zk實現分布式鎖的流程如下

我這里用zk實現了一個可重入的、阻塞的、公平的分布式鎖,代碼如下:

package locks;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import utils.ZkUtils;
import watcher.PredecessorNodeWatcher;
import watcher.SessionWatcher;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by huangwt on 2018/3/21.
*/
@Slf4j
public class ReentrantZKLock {

    private final static String BASE_NODE = "/baseNode";
    private final static String CHILDREN_NODE = "/node_";

    private final Lock localLock;
    private final Condition condition;

    //用于重入檢測
    private static ThreadLocal threadLocal = new ThreadLocal();

    private ZooKeeper zooKeeper = null;

    private String node = null;

    ReentrantZKLock(String addr, int timeout) {
        try {
            zooKeeper = new ZooKeeper(addr, timeout, new SessionWatcher());
            localLock = new ReentrantLock();
            condition = localLock.newCondition();
        } catch (IOException e) {
            log.error("get zookeeper failed", e);
            throw new RuntimeException(e);
        }
    }

    public void lock() {
        //重入檢測
        if (checkReentrant()) {
            return;
        }
        try {
            node = zooKeeper.create(BASE_NODE + CHILDREN_NODE, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                localLock.lock();
                try {
                    List childrenNodes = zooKeeper.getChildren(BASE_NODE, false);
                    ZkUtils.childNodeSort(childrenNodes);
                    //當前節點的索引
                    int myNodeIndex = childrenNodes.indexOf(node);
                    //當前節點的前一個節點
                    int beforeNodeIndex = myNodeIndex - 1;
                    Stat stat = null;
                    while (beforeNodeIndex >= 0) {
                        stat = zooKeeper.exists(childrenNodes.get(beforeNodeIndex), new PredecessorNodeWatcher(condition));
                        if (stat != null) {
                            break;
                        }
                    }

                    if (stat != null) {  //前序節點存在,等待前序節點被刪除,釋放鎖
                        condition.await();
                    } else { // 獲取到鎖
                        threadLocal.set(new AtomicInteger(1));
                        return;
                    }
                } finally {
                    localLock.unlock();
                }
            }
        } catch (Exception e) {
            log.error("lock failed", e);
            throw new RuntimeException(e);
        }

    }

    public void unlock() {
        AtomicInteger times = threadLocal.get();
        if (times == null) {
            return;
        }
        if (times.decrementAndGet() == 0) {
            threadLocal.remove();
            try {
                zooKeeper.delete(node, -1);
            } catch (Exception e) {
                log.error("unlock faild", e);
                throw new RuntimeException(e);
            }
        }

    }

    private boolean checkReentrant() {
        AtomicInteger times = threadLocal.get();
        if (times != null) {
            times.incrementAndGet();
            return true;
        }

        return false;
    }
}
package utils;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
* Created by huangwt on 2018/3/24.
*/
public class ZkUtils {

    /**
    * 對子節點排序
    *
    * @param node
    */
    public static void childNodeSort(List node) {
        Collections.sort(node, new ChildNodeCompare());
    }

    private static class ChildNodeCompare implements Comparator {

        public int compare(String childNode1, String childNode2) {
            return childNode1.compareTo(childNode2);
        }
    }

}
package watcher;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.locks.Condition;

/**
* Created by huangwt on 2018/3/24.
*/
public class PredecessorNodeWatcher implements Watcher {
    private Condition condition = null;

    public PredecessorNodeWatcher(Condition condition) {
        this.condition = condition;
    }

    public void process(WatchedEvent event) {
        //前序節點被刪除,鎖被釋放,喚醒當前等待線程
        if(event.getType() == Event.EventType.NodeDeleted){
            condition.signal();
        }
    }
}
package watcher;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
* Created by huangwt on 2018/3/24.
*/
@Slf4j
public class SessionWatcher implements Watcher {
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            log.info("get zookeeper success");
        }
    }
}

主要是使用了ThreadLocal實現了鎖的可重入性,使用watch機制實現了阻塞鎖,使用臨時節點實現的公平鎖。
這段代碼只是一個demo供大家參考,還有很多問題沒解決。比如當zookper掛掉的時候,阻塞的線程就無法被喚醒,這時候就需要監聽zk的心跳。

冪等性

冪等性是系統接口對外的一種承諾,數學表達為:f(f(x)) = f(x)。
冪等性指的是,使用相同參數對同一資源重復調用某個接口的結果與調用一次的結果相同。

為什么需要冪等性?

假設現在有一個方法 :Boolean withdraw(account_id, amount) ,作用是從account_id對應的賬戶中扣除amount數額的錢,如果扣除成功則返回true,賬戶余額減少amount; 如果扣除失敗則返回false,賬戶余額不變。
如以上流程,接口無法冪等,可能導致重復扣款。

解決

請求獲取ticketId

請求扣款,傳入ticketId

根據ticketId查詢此次操作是否存在,如果存在則表示該操作已經執行過,直接返回結果;如果不存在,扣款,保存結果

返回結果到客戶端

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69250.html

相關文章

  • 【最全】Java 進階面試總結

    摘要:這里有一份面試題相關總結,涉及高并發分布式高可用相關知識點,在此分享給大家,希望大家能拿到一份理想的知識點會陸續更新在上,覺得還算湊和的話可以關注一下噢高并發架構消息隊列為什么使用消息隊列消息隊列有什么優點和缺點都有什么優點和缺點如何保證消 這里有一份面試題相關總結,涉及高并發、分布式、高可用相關知識點,在此分享給大家,希望大家能拿到一份理想的 Offer! 知識點會陸續更新在 Git...

    nifhlheimr 評論0 收藏0
  • 五萬字15張導圖Java自學路線,小白零基礎入門,程序員進階,收藏這篇就夠了

    摘要:本文收錄于技術專家修煉文中配套資料合集路線導圖高清源文件點擊跳轉到文末點擊底部卡片回復資料領取哈嘍,大家好,我是一條最近粉絲問我有沒有自學路線,有了方向才能按圖索驥,事半功倍。 ...

    suosuopuo 評論0 收藏0
  • 微服務化之無狀態化容器化

    摘要:然而在微服務化之前,建議先進行容器化,在容器化之前,建議先無狀態化,當整個流程容器化了,以后的微服務拆分才會水到渠成。 此文已由作者劉超授權網易云社區發布。 歡迎訪問網易云社區,了解更多網易技術產品運營經驗。 一、為什么要做無狀態化和容器化 很多應用拆分成微服務,是為了承載高并發,往往一個進程扛不住這么大的量,因而需要拆分成多組進程,每組進程承載特定的工作,根據并發的壓力用多個副本公共...

    seanlook 評論0 收藏0
  • 【推薦】最新200篇:技術文章整理

    摘要:作為面試官,我是如何甄別應聘者的包裝程度語言和等其他語言的對比分析和主從復制的原理詳解和持久化的原理是什么面試中經常被問到的持久化與恢復實現故障恢復自動化詳解哨兵技術查漏補缺最易錯過的技術要點大掃盲意外宕機不難解決,但你真的懂數據恢復嗎每秒 作為面試官,我是如何甄別應聘者的包裝程度Go語言和Java、python等其他語言的對比分析 Redis和MySQL Redis:主從復制的原理詳...

    BicycleWarrior 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<