国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

[case39]聊聊jdk httpclient的executor

dabai / 3087人閱讀

摘要:序本文主要研究一下的這里如果的為,則會創(chuàng)建這里如果是的話,參數(shù)傳遞的是如果是同步的方法,則傳的值是這里創(chuàng)建了一個,然后調(diào)用,這里使用了可以看到這里使用的是的方法注意這個方法是才有的,也是在這里使用的由于默認(rèn)是使用創(chuàng)建的,

本文主要研究一下jdk httpclient的executor

HttpClientImpl

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
        id = CLIENT_IDS.incrementAndGet();
        dbgTag = "HttpClientImpl(" + id +")";
        if (builder.sslContext == null) {
            try {
                sslContext = SSLContext.getDefault();
            } catch (NoSuchAlgorithmException ex) {
                throw new InternalError(ex);
            }
        } else {
            sslContext = builder.sslContext;
        }
        Executor ex = builder.executor;
        if (ex == null) {
            ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
            isDefaultExecutor = true;
        } else {
            isDefaultExecutor = false;
        }
        delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
        facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
        client2 = new Http2ClientImpl(this);
        cookieHandler = builder.cookieHandler;
        connectTimeout = builder.connectTimeout;
        followRedirects = builder.followRedirects == null ?
                Redirect.NEVER : builder.followRedirects;
        this.userProxySelector = Optional.ofNullable(builder.proxy);
        this.proxySelector = userProxySelector
                .orElseGet(HttpClientImpl::getDefaultProxySelector);
        if (debug.on())
            debug.log("proxySelector is %s (user-supplied=%s)",
                      this.proxySelector, userProxySelector.isPresent());
        authenticator = builder.authenticator;
        if (builder.version == null) {
            version = HttpClient.Version.HTTP_2;
        } else {
            version = builder.version;
        }
        if (builder.sslParams == null) {
            sslParams = getDefaultParams(sslContext);
        } else {
            sslParams = builder.sslParams;
        }
        connections = new ConnectionPool(id);
        connections.start();
        timeouts = new TreeSet<>();
        try {
            selmgr = new SelectorManager(this);
        } catch (IOException e) {
            // unlikely
            throw new InternalError(e);
        }
        selmgr.setDaemon(true);
        filters = new FilterFactory();
        initFilters();
        assert facadeRef.get() != null;
    }

這里如果HttpClientBuilderImpl的executor為null,則會創(chuàng)建Executors.newCachedThreadPool(new DefaultThreadFactory(id))

HttpClientImpl.sendAsync

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    @Override
    public  CompletableFuture>
    sendAsync(HttpRequest userRequest, BodyHandler responseHandler)
    {
        return sendAsync(userRequest, responseHandler, null);
    }

    @Override
    public  CompletableFuture>
    sendAsync(HttpRequest userRequest,
              BodyHandler responseHandler,
              PushPromiseHandler pushPromiseHandler) {
        return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
    }

    private  CompletableFuture>
    sendAsync(HttpRequest userRequest,
              BodyHandler responseHandler,
              PushPromiseHandler pushPromiseHandler,
              Executor exchangeExecutor)    {

        Objects.requireNonNull(userRequest);
        Objects.requireNonNull(responseHandler);

        AccessControlContext acc = null;
        if (System.getSecurityManager() != null)
            acc = AccessController.getContext();

        // Clone the, possibly untrusted, HttpRequest
        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
        if (requestImpl.method().equals("CONNECT"))
            throw new IllegalArgumentException("Unsupported method CONNECT");

        long start = DEBUGELAPSED ? System.nanoTime() : 0;
        reference();
        try {
            if (debugelapsed.on())
                debugelapsed.log("ClientImpl (async) send %s", userRequest);

            // When using sendAsync(...) we explicitly pass the
            // executor"s delegate as exchange executor to force
            // asynchronous scheduling of the exchange.
            // When using send(...) we don"t specify any executor
            // and default to using the client"s delegating executor
            // which only spawns asynchronous tasks if it detects
            // that the current thread is the selector manager
            // thread. This will cause everything to execute inline
            // until we need to schedule some event with the selector.
            Executor executor = exchangeExecutor == null
                    ? this.delegatingExecutor : exchangeExecutor;

            MultiExchange mex = new MultiExchange<>(userRequest,
                                                            requestImpl,
                                                            this,
                                                            responseHandler,
                                                            pushPromiseHandler,
                                                            acc);
            CompletableFuture> res =
                    mex.responseAsync(executor).whenComplete((b,t) -> unreference());
            if (DEBUGELAPSED) {
                res = res.whenComplete(
                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
            }

            // makes sure that any dependent actions happen in the CF default
            // executor. This is only needed for sendAsync(...), when
            // exchangeExecutor is non-null.
            if (exchangeExecutor != null) {
                res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
            }
            return res;
        } catch(Throwable t) {
            unreference();
            debugCompleted("ClientImpl (async)", start, userRequest);
            throw t;
        }
    }

這里如果是sendAsync的話,executor參數(shù)傳遞的是delegatingExecutor.delegate;如果是同步的send方法,則executor傳的值是null

這里創(chuàng)建了一個MultiExchange,然后調(diào)用mex.responseAsync(executor).whenComplete((b,t) -> unreference()),這里使用了executor

MultiExchange.responseAsync

java.net.http/jdk/internal/net/http/MultiExchange.java

    public CompletableFuture> responseAsync(Executor executor) {
        CompletableFuture start = new MinimalFuture<>();
        CompletableFuture> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    private CompletableFuture>
    responseAsync0(CompletableFuture start) {
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        Exchange exch = getExchange();
                        return exch.readBodyAsync(responseHandler)
                            .thenApply((T body) -> {
                                this.response =
                                    new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                                return this.response;
                            });
                    });
    }

可以看到這里使用的是CompletableFuture的completeAsync方法(注意這個方法是java9才有的),executor也是在這里使用的

由于默認(rèn)是使用Executors.newCachedThreadPool創(chuàng)建的executor,要注意控制并發(fā)數(shù)及任務(wù)執(zhí)行時間,防止線程數(shù)無限制增長過度消耗系統(tǒng)資源

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     *
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue(),
                                      threadFactory);
    }
RejectedExecutionException

實例代碼

    @Test
    public void testAsyncPool(){
        ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool()
                .setPoolSize(2)
                .setQueueSize(5)
                .setThreadNamePrefix("test-")
                .build();

        List> futureList = IntStream.rangeClosed(1,100)
                .mapToObj(i -> new CompletableFuture())
                .collect(Collectors.toList());
        futureList.stream()
                .forEach(future -> {
                    future.completeAsync(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }
                        return "message";
                    },executor);
                });
        CompletableFuture.allOf(futureList
                .toArray(new CompletableFuture[futureList.size()]))
                .join();
    }
這里創(chuàng)建的是fixedPool,指定queueSize為5

日志輸出

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@76b10754 rejected from java.util.concurrent.ThreadPoolExecutor@2bea5ab4[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]

    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.CompletableFuture.completeAsync(CompletableFuture.java:2591)
可以看到線程池隊列大小起到了限制作用
小結(jié)

jdk httpclient的executor在進(jìn)行異步操作的時候使用,默認(rèn)創(chuàng)建的是使用Executors.newCachedThreadPool創(chuàng)建的executor,其線程池大小是Integer.MAX_VALUE,因此在使用的時候要注意,最好是改為有界隊列,然后再加上線程池的監(jiān)控。

doc

java.net.http javadoc

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/77340.html

相關(guān)文章

  • 聊聊jdk httpclientretry參數(shù)

    摘要:序本文主要研究一下的參數(shù)這里有一個類型的變量,用來記錄請求次數(shù)另外還有一個,讀取的是值,讀取不到默認(rèn)取,為進(jìn)入該方法的時候,調(diào)用,遞增請求次數(shù),然后判斷有無超出限制,有則返回帶有異常的,即通過返回如果沒有超出限制,但是執(zhí)行請求失敗,則 序 本文主要研究一下jdk httpclient的retry參數(shù) DEFAULT_MAX_ATTEMPTS java.net.http/jdk/inte...

    ityouknow 評論0 收藏0
  • 聊聊jdk httpclientconnect timeout異常

    摘要:序本文主要研究一下的異常實例代碼異常日志如下最后調(diào)用這里調(diào)用獲取連接如果沒有連接會新創(chuàng)建一個,走的是這里先是調(diào)用了獲取連接,然后調(diào)用進(jìn)行連接這里委托給這里如果有設(shè)置的話,則會創(chuàng)建一個調(diào)用進(jìn)行連接,如果連接未 序 本文主要研究一下httpclient的connect timeout異常 實例代碼 @Test public void testConnectTimeout()...

    張利勇 評論0 收藏0
  • 聊聊jdk httpclientConnectionPool

    摘要:調(diào)用計算的時間,這個方法會清理移除并過期的連接除了清理過期的連接外,還通過間接觸發(fā),去清理關(guān)閉或異常的連接 序 本文主要研究一下jdk httpclient的ConnectionPool HttpConnection HttpConnection.getConnection java.net.http/jdk/internal/net/http/HttpConnection.java ...

    Worktile 評論0 收藏0
  • 淺析 jdk11 中 HttpClient 使用

    摘要:在中也可以直接使用返回的是,然后通過來獲取結(jié)果阻塞線程,從中獲取結(jié)果四一點嘮叨非常的年輕,網(wǎng)絡(luò)資料不多,且代碼非常精細(xì)和復(fù)雜,目前來看底層應(yīng)該是使用了線程池搭配進(jìn)行異步通訊。 零 前期準(zhǔn)備 0 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 1 HttpClient 簡介 java.net.http.HttpClient 是 jdk11 中正式...

    Eminjannn 評論0 收藏0
  • apacheHttpClient默認(rèn)重試機(jī)制

    摘要:異常重試默認(rèn)重試次,三次都失敗則拋出或其他異常 maven org.apache.httpcomponents httpclient 4.5.2 異常重試log 2017-01-31 19:31:39.057 INFO 3873 --- [askScheduler-13] o....

    MartinDai 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<