国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

聊聊jdk httpclient的ConnectionPool

Worktile / 925人閱讀

摘要:調用計算的時間,這個方法會清理移除并過期的連接除了清理過期的連接外,還通過間接觸發,去清理關閉或異常的連接

本文主要研究一下jdk httpclient的ConnectionPool

HttpConnection HttpConnection.getConnection

java.net.http/jdk/internal/net/http/HttpConnection.java

    /**
     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     *
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. Returns a concrete instance which
     * is one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainTunnelingConnection}
     *
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
     */
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        // The default proxy selector may select a proxy whose  address is
        // unresolved. We must resolve the address before connecting to it.
        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
        HttpConnection c = null;
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                String[] alpn = null;
                if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                return getSSLConnection(addr, proxy, alpn, request, client);
            }
        }
    }

這里非https、https1.1的,走pool.getConnection(true, addr, proxy)

HttpConnection.closeOrReturnToCache

java.net.http/jdk/internal/net/http/HttpConnection.java

    void closeOrReturnToCache(HttpHeaders hdrs) {
        if (hdrs == null) {
            // the connection was closed by server, eof
            close();
            return;
        }
        if (!isOpen()) {
            return;
        }
        HttpClientImpl client = client();
        if (client == null) {
            close();
            return;
        }
        ConnectionPool pool = client.connectionPool();
        boolean keepAlive = hdrs.firstValue("Connection")
                .map((s) -> !s.equalsIgnoreCase("close"))
                .orElse(true);

        if (keepAlive) {
            Log.logTrace("Returning connection to the pool: {0}", this);
            pool.returnToPool(this);
        } else {
            close();
        }
    }

調用pool.returnToPool(this)歸還連接

ConnectionPool

java.net.http/jdk/internal/net/http/ConnectionPool.java

/**
 * Http 1.1 connection pool.
 */
final class ConnectionPool {

    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections

    private final HashMap> plainPool;
    private final HashMap> sslPool;
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    boolean stopped;

    //......
    /**
     * Entries in connection pool are keyed by destination address and/or
     * proxy address:
     * case 1: plain TCP not via proxy (destination only)
     * case 2: plain TCP via proxy (proxy only)
     * case 3: SSL not via proxy (destination only)
     * case 4: SSL over tunnel (destination and proxy)
     */
    static class CacheKey {
        final InetSocketAddress proxy;
        final InetSocketAddress destination;

        CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
            this.proxy = proxy;
            this.destination = destination;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (!Objects.equals(this.proxy, other.proxy)) {
                return false;
            }
            if (!Objects.equals(this.destination, other.destination)) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(proxy, destination);
        }
    }

    synchronized HttpConnection getConnection(boolean secure,
                                              InetSocketAddress addr,
                                              InetSocketAddress proxy) {
        if (stopped) return null;
        CacheKey key = new CacheKey(addr, proxy);
        HttpConnection c = secure ? findConnection(key, sslPool)
                                  : findConnection(key, plainPool);
        //System.out.println ("getConnection returning: " + c);
        return c;
    }

    private HttpConnection
    findConnection(CacheKey key,
                   HashMap> pool) {
        LinkedList l = pool.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        } else {
            HttpConnection c = l.removeFirst();
            expiryList.remove(c);
            return c;
        }
    }
    /**
     * Returns the connection to the pool.
     */
    void returnToPool(HttpConnection conn) {
        returnToPool(conn, Instant.now(), KEEP_ALIVE);
    }

    // Called also by whitebox tests
    void returnToPool(HttpConnection conn, Instant now, long keepAlive) {

        // Don"t call registerCleanupTrigger while holding a lock,
        // but register it before the connection is added to the pool,
        // since we don"t want to trigger the cleanup if the connection
        // is not in the pool.
        CleanupTrigger cleanup = registerCleanupTrigger(conn);

        // it"s possible that cleanup may have been called.
        HttpConnection toClose = null;
        synchronized(this) {
            if (cleanup.isDone()) {
                return;
            } else if (stopped) {
                conn.close();
                return;
            }
            if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
                toClose = expiryList.removeOldest();
                if (toClose != null) removeFromPool(toClose);
            }
            if (conn instanceof PlainHttpConnection) {
                putConnection(conn, plainPool);
            } else {
                assert conn.isSecure();
                putConnection(conn, sslPool);
            }
            expiryList.add(conn, now, keepAlive);
        }
        if (toClose != null) {
            if (debug.on()) {
                debug.log("Maximum pool size reached: removing oldest connection %s",
                          toClose.dbgString());
            }
            close(toClose);
        }
        //System.out.println("Return to pool: " + conn);
    }

    private void removeFromPool(HttpConnection c) {
        assert Thread.holdsLock(this);
        if (c instanceof PlainHttpConnection) {
            removeFromPool(c, plainPool);
        } else {
            assert c.isSecure();
            removeFromPool(c, sslPool);
        }
    }

    private boolean
    removeFromPool(HttpConnection c,
                   HashMap> pool) {
        //System.out.println("cacheCleaner removing: " + c);
        assert Thread.holdsLock(this);
        CacheKey k = c.cacheKey();
        List l = pool.get(k);
        if (l == null || l.isEmpty()) {
            pool.remove(k);
            return false;
        }
        return l.remove(c);
    }

    private void
    putConnection(HttpConnection c,
                  HashMap> pool) {
        CacheKey key = c.cacheKey();
        LinkedList l = pool.get(key);
        if (l == null) {
            l = new LinkedList<>();
            pool.put(key, l);
        }
        l.add(c);
    }

    void stop() {
        List closelist = Collections.emptyList();
        try {
            synchronized (this) {
                stopped = true;
                closelist = expiryList.stream()
                    .map(e -> e.connection)
                    .collect(Collectors.toList());
                expiryList.clear();
                plainPool.clear();
                sslPool.clear();
            }
        } finally {
            closelist.forEach(this::close);
        }
    }
}

借用連接調用getConnection方法,最后是調用findConnection方法,從LinkedList>移除掉第一個,再從expiryList移除掉該連接

歸還連接調用returnToPool方法,如果當前expiryList超出MAX_POOL_SIZE,則移除掉最老的一個,再將其從ExpiryList、HashMap>移除并且close掉;之后調用putConnection往HashMap>添加該連接,最后再往expiryList添加該連接

可以看見ConnectionPool維護了HashMap>以及ExpiryList兩個重要的屬性,借用時從這兩個地方移除,歸還時往這兩個地方添加;不一樣的是歸還時如果MAX_POOL_SIZE大于0,則會對expiryList大小進行判斷,超過最大值則移除最老的連接,并將其從這兩個地方移除掉

MAX_POOL_SIZE讀取的是jdk.httpclient.connectionPoolSize,讀取不到默認為0,表示無限

ConnectionPool有個stop方法,在HttpClient的stop時候調用(SelectorManager線程退出時觸發),stop方法會清除連接池并關閉連接

ExpiryList

java.net.http/jdk/internal/net/http/ConnectionPool.java

    /**
     * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
     * deadline is at the tail of the list, and the entry with the farther
     * deadline is at the head. In the most common situation, new elements
     * will need to be added at the head (or close to it), and expired elements
     * will need to be purged from the tail.
     */
    private static final class ExpiryList {
        private final LinkedList list = new LinkedList<>();
        private volatile boolean mayContainEntries;

        int size() { return list.size(); }

        // A loosely accurate boolean whose value is computed
        // at the end of each operation performed on ExpiryList;
        // Does not require synchronizing on the ConnectionPool.
        boolean purgeMaybeRequired() {
            return mayContainEntries;
        }

        // Returns the next expiry deadline
        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        Optional nextExpiryDeadline() {
            if (list.isEmpty()) return Optional.empty();
            else return Optional.of(list.getLast().expiry);
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        HttpConnection removeOldest() {
            ExpiryEntry entry = list.pollLast();
            return entry == null ? null : entry.connection;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void add(HttpConnection conn) {
            add(conn, Instant.now(), KEEP_ALIVE);
        }

        // Used by whitebox test.
        void add(HttpConnection conn, Instant now, long keepAlive) {
            Instant then = now.truncatedTo(ChronoUnit.SECONDS)
                    .plus(keepAlive, ChronoUnit.SECONDS);

            // Elements with the farther deadline are at the head of
            // the list. It"s more likely that the new element will
            // have the farthest deadline, and will need to be inserted
            // at the head of the list, so we"re using an ascending
            // list iterator to find the right insertion point.
            ListIterator li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();

                if (then.isAfter(entry.expiry)) {
                    li.previous();
                    // insert here
                    li.add(new ExpiryEntry(conn, then));
                    mayContainEntries = true;
                    return;
                }
            }
            // last (or first) element of list (the last element is
            // the first when the list is empty)
            list.add(new ExpiryEntry(conn, then));
            mayContainEntries = true;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void remove(HttpConnection c) {
            if (c == null || list.isEmpty()) return;
            ListIterator li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry e = li.next();
                if (e.connection.equals(c)) {
                    li.remove();
                    mayContainEntries = !list.isEmpty();
                    return;
                }
            }
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool.
        // Purge all elements whose deadline is before now (now included).
        List purgeUntil(Instant now) {
            if (list.isEmpty()) return Collections.emptyList();

            List closelist = new ArrayList<>();

            // elements with the closest deadlines are at the tail
            // of the queue, so we"re going to use a descending iterator
            // to remove them, and stop when we find the first element
            // that has not expired yet.
            Iterator li = list.descendingIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();
                // use !isAfter instead of isBefore in order to
                // remove the entry if its expiry == now
                if (!entry.expiry.isAfter(now)) {
                    li.remove();
                    HttpConnection c = entry.connection;
                    closelist.add(c);
                } else break; // the list is sorted
            }
            mayContainEntries = !list.isEmpty();
            return closelist;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        java.util.stream.Stream stream() {
            return list.stream();
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void clear() {
            list.clear();
            mayContainEntries = false;
        }
    }

    static final class ExpiryEntry {
        final HttpConnection connection;
        final Instant expiry; // absolute time in seconds of expiry time
        ExpiryEntry(HttpConnection connection, Instant expiry) {
            this.connection = connection;
            this.expiry = expiry;
        }
    }

ExpiryList內部使用了LinkedList,而且使用ExpiryEntry對connection進行包裝

ExpiryEntry里頭除了HttpConnection,還維護了expiry時間,表示該連接的失效時間

對ExpiryList的添加操作是根據當前時間的秒數+KEEP_ALIVE參數計算出expiry時間,KEEP_ALIVE讀取的是jdk.httpclient.keepalive.timeout,讀取不到默認是1200秒;之后根據失效時間插入到LinkedList,失效時間長的在list頭部,快失效的在list尾部

對ExpiryList的移除操作有兩類,一類是移除最老的,通過pollLast操作完成,一類是移除指定連接,即使用ListIterator遍歷LinkedList進行匹配再移除

這里維護了mayContainEntries變量,在LinkedList進行操作時更新,用于返回ExpiryList是否有連接,避免需要時同步調用ConnectionPool來計算

ConnectionPool.purgeExpiredConnectionsAndReturnNextDeadline

java.net.http/jdk/internal/net/http/ConnectionPool.java

   /**
     * Purge expired connection and return the number of milliseconds
     * in which the next connection is scheduled to expire.
     * If no connections are scheduled to be purged return 0.
     * @return the delay in milliseconds in which the next connection will
     *         expire.
     */
    long purgeExpiredConnectionsAndReturnNextDeadline() {
        if (!expiryList.purgeMaybeRequired()) return 0;
        return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
    }

    // Used for whitebox testing
    long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
        long nextPurge = 0;

        // We may be in the process of adding new elements
        // to the expiry list - but those elements will not
        // have outlast their keep alive timer yet since we"re
        // just adding them.
        if (!expiryList.purgeMaybeRequired()) return nextPurge;

        List closelist;
        synchronized (this) {
            closelist = expiryList.purgeUntil(now);
            for (HttpConnection c : closelist) {
                if (c instanceof PlainHttpConnection) {
                    boolean wasPresent = removeFromPool(c, plainPool);
                    assert wasPresent;
                } else {
                    boolean wasPresent = removeFromPool(c, sslPool);
                    assert wasPresent;
                }
            }
            nextPurge = now.until(
                    expiryList.nextExpiryDeadline().orElse(now),
                    ChronoUnit.MILLIS);
        }
        closelist.forEach(this::close);
        return nextPurge;
    }

由于ExpiryList的connection具有失效時間,因而存在清理失效連接的步驟,這個步驟是通過purgeExpiredConnectionsAndReturnNextDeadline來完成

purgeExpiredConnectionsAndReturnNextDeadline方法被SelectorManager調用,用于計算selector.select的timeout時間

該方法首先調用expiryList.purgeMaybeRequired()訪問mayContainEntries,看expiryList有無連接,沒有連接直接返回0;之后調用expiryList.purgeUntil(now)移除并獲取目前過期的連接,然后挨個從HashMap>移除并計算nextPurge,最后挨個close掉移除的連接

CleanupTrigger

java.net.http/jdk/internal/net/http/ConnectionPool.java

    private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
        // Connect the connection flow to a pub/sub pair that will take the
        // connection out of the pool and close it if anything happens
        // while the connection is sitting in the pool.
        CleanupTrigger cleanup = new CleanupTrigger(conn);
        FlowTube flow = conn.getConnectionFlow();
        if (debug.on()) debug.log("registering %s", cleanup);
        flow.connectFlows(cleanup, cleanup);
        return cleanup;
    }

    void cleanup(HttpConnection c, Throwable error) {
        if (debug.on())
            debug.log("%s : ConnectionPool.cleanup(%s)",
                    String.valueOf(c.getConnectionFlow()), error);
        synchronized(this) {
            removeFromPool(c);
            expiryList.remove(c);
        }
        c.close();
    }

    /**
     * An object that subscribes to the flow while the connection is in
     * the pool. Anything that comes in will cause the connection to be closed
     * and removed from the pool.
     */
    private final class CleanupTrigger implements
            FlowTube.TubeSubscriber, FlowTube.TubePublisher,
            Flow.Subscription {

        private final HttpConnection connection;
        private volatile boolean done;

        public CleanupTrigger(HttpConnection connection) {
            this.connection = connection;
        }

        public boolean isDone() { return done;}

        private void triggerCleanup(Throwable error) {
            done = true;
            cleanup(connection, error);
        }

        @Override public void request(long n) {}
        @Override public void cancel() {}

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }
        @Override
        public void onError(Throwable error) { triggerCleanup(error); }
        @Override
        public void onComplete() { triggerCleanup(null); }
        @Override
        public void onNext(List item) {
            triggerCleanup(new IOException("Data received while in pool"));
        }

        @Override
        public void subscribe(Flow.Subscriber> subscriber) {
            subscriber.onSubscribe(this);
        }

        @Override
        public String toString() {
            return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
        }
    }

在調用returnToPool的時候,會調用registerCleanupTrigger,創建一個CleanupTrigger,然后調用conn.getConnectionFlow()獲取flow,再調用flow.connectFlows(cleanup, cleanup)

CleanupTrigger既是FlowTube.TubeSubscriber也是FlowTube.TubePublisher,在onComplete及onError方法里頭調用了cleanup方法,將連接從HashMap>及expiryList移除

這個CleanupTrigger的功能可能類似于主動式的連接健康檢查,在底層連接發生異常關閉的時候,通知到連接池這邊,觸發清理這些臟的連接

小結

jdk httpclient的ConnectionPool相對于apache common pools而言比較簡單,有幾個參數(實際作用于ExpiryList):

MAX_POOL_SIZE(jdk.httpclient.connectionPoolSize),默認為0,表示無限
KEEP_ALIVE(jdk.httpclient.keepalive.timeout),默認是1200秒

ConnectionPool同時維護了兩個屬性:HashMap>及expiryList,前者使用目標ip地址及代理地址作為CacheKey,每個地址維護一個連接池;后者不分cacheKey,對每個在連接池中的connection進行包裝,根據KEEP_ALIVE記錄了失效時間。

SelectorManager調用purgeExpiredConnectionsAndReturnNextDeadline計算select的timeout時間,這個方法會清理(移除并close)過期的連接

除了SelectorManager清理過期的連接外,connection還通過FlowTube間接觸發CleanupTrigger,去清理關閉或異常的連接

doc

java.net.http javadoc

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77366.html

相關文章

  • [case39]聊聊jdk httpclientexecutor

    摘要:序本文主要研究一下的這里如果的為,則會創建這里如果是的話,參數傳遞的是如果是同步的方法,則傳的值是這里創建了一個,然后調用,這里使用了可以看到這里使用的是的方法注意這個方法是才有的,也是在這里使用的由于默認是使用創建的, 序 本文主要研究一下jdk httpclient的executor HttpClientImpl java.net.http/jdk/internal/net/htt...

    dabai 評論0 收藏0
  • 淺析 jdk11 中 HttpClient 使用

    摘要:在中也可以直接使用返回的是,然后通過來獲取結果阻塞線程,從中獲取結果四一點嘮叨非常的年輕,網絡資料不多,且代碼非常精細和復雜,目前來看底層應該是使用了線程池搭配進行異步通訊。 零 前期準備 0 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 1 HttpClient 簡介 java.net.http.HttpClient 是 jdk11 中正式...

    Eminjannn 評論0 收藏0
  • 聊聊jdk httpclientconnect timeout異常

    摘要:序本文主要研究一下的異常實例代碼異常日志如下最后調用這里調用獲取連接如果沒有連接會新創建一個,走的是這里先是調用了獲取連接,然后調用進行連接這里委托給這里如果有設置的話,則會創建一個調用進行連接,如果連接未 序 本文主要研究一下httpclient的connect timeout異常 實例代碼 @Test public void testConnectTimeout()...

    張利勇 評論0 收藏0
  • 聊聊jdk httpclientretry參數

    摘要:序本文主要研究一下的參數這里有一個類型的變量,用來記錄請求次數另外還有一個,讀取的是值,讀取不到默認取,為進入該方法的時候,調用,遞增請求次數,然后判斷有無超出限制,有則返回帶有異常的,即通過返回如果沒有超出限制,但是執行請求失敗,則 序 本文主要研究一下jdk httpclient的retry參數 DEFAULT_MAX_ATTEMPTS java.net.http/jdk/inte...

    ityouknow 評論0 收藏0
  • 記一次OkHttpClient導致線程過多排查

    摘要:首先先解讀下這個報警內容,原因活躍線程數過多,是監聽的端口號用來獲取虛擬機各項信息,代表著此時的線程數,是設置的報警閾值。 前言 前天,一位21世紀的好好青年正在工位上默念社會主義大法好的時候,釘釘上又報警了(公司項目接入了open-faclon監控,指標不正常會報警給釘釘的機器人),無奈默默流淚揮手告別社會主義大法開始定位線上問題。 報警內容 首先我們先來看下報警信息,為防止泄露公...

    tianyu 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<