摘要:序本文主要研究一下的這里如果的為,則會創(chuàng)建這里如果是的話,參數(shù)傳遞的是如果是同步的方法,則傳的值是這里創(chuàng)建了一個,然后調(diào)用,這里使用了可以看到這里使用的是的方法注意這個方法是才有的,也是在這里使用的由于默認(rèn)是使用創(chuàng)建的,
序
本文主要研究一下jdk httpclient的executor
HttpClientImpljava.net.http/jdk/internal/net/http/HttpClientImpl.java
private HttpClientImpl(HttpClientBuilderImpl builder, SingleFacadeFactory facadeFactory) { id = CLIENT_IDS.incrementAndGet(); dbgTag = "HttpClientImpl(" + id +")"; if (builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); } catch (NoSuchAlgorithmException ex) { throw new InternalError(ex); } } else { sslContext = builder.sslContext; } Executor ex = builder.executor; if (ex == null) { ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); isDefaultExecutor = true; } else { isDefaultExecutor = false; } delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex); facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); client2 = new Http2ClientImpl(this); cookieHandler = builder.cookieHandler; connectTimeout = builder.connectTimeout; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; this.userProxySelector = Optional.ofNullable(builder.proxy); this.proxySelector = userProxySelector .orElseGet(HttpClientImpl::getDefaultProxySelector); if (debug.on()) debug.log("proxySelector is %s (user-supplied=%s)", this.proxySelector, userProxySelector.isPresent()); authenticator = builder.authenticator; if (builder.version == null) { version = HttpClient.Version.HTTP_2; } else { version = builder.version; } if (builder.sslParams == null) { sslParams = getDefaultParams(sslContext); } else { sslParams = builder.sslParams; } connections = new ConnectionPool(id); connections.start(); timeouts = new TreeSet<>(); try { selmgr = new SelectorManager(this); } catch (IOException e) { // unlikely throw new InternalError(e); } selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); assert facadeRef.get() != null; }
這里如果HttpClientBuilderImpl的executor為null,則會創(chuàng)建Executors.newCachedThreadPool(new DefaultThreadFactory(id))
HttpClientImpl.sendAsyncjava.net.http/jdk/internal/net/http/HttpClientImpl.java
@Override publicCompletableFuture > sendAsync(HttpRequest userRequest, BodyHandler responseHandler) { return sendAsync(userRequest, responseHandler, null); } @Override public CompletableFuture > sendAsync(HttpRequest userRequest, BodyHandler responseHandler, PushPromiseHandler pushPromiseHandler) { return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate); } private CompletableFuture > sendAsync(HttpRequest userRequest, BodyHandler responseHandler, PushPromiseHandler pushPromiseHandler, Executor exchangeExecutor) { Objects.requireNonNull(userRequest); Objects.requireNonNull(responseHandler); AccessControlContext acc = null; if (System.getSecurityManager() != null) acc = AccessController.getContext(); // Clone the, possibly untrusted, HttpRequest HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector); if (requestImpl.method().equals("CONNECT")) throw new IllegalArgumentException("Unsupported method CONNECT"); long start = DEBUGELAPSED ? System.nanoTime() : 0; reference(); try { if (debugelapsed.on()) debugelapsed.log("ClientImpl (async) send %s", userRequest); // When using sendAsync(...) we explicitly pass the // executor"s delegate as exchange executor to force // asynchronous scheduling of the exchange. // When using send(...) we don"t specify any executor // and default to using the client"s delegating executor // which only spawns asynchronous tasks if it detects // that the current thread is the selector manager // thread. This will cause everything to execute inline // until we need to schedule some event with the selector. Executor executor = exchangeExecutor == null ? this.delegatingExecutor : exchangeExecutor; MultiExchange mex = new MultiExchange<>(userRequest, requestImpl, this, responseHandler, pushPromiseHandler, acc); CompletableFuture > res = mex.responseAsync(executor).whenComplete((b,t) -> unreference()); if (DEBUGELAPSED) { res = res.whenComplete( (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); } // makes sure that any dependent actions happen in the CF default // executor. This is only needed for sendAsync(...), when // exchangeExecutor is non-null. if (exchangeExecutor != null) { res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL); } return res; } catch(Throwable t) { unreference(); debugCompleted("ClientImpl (async)", start, userRequest); throw t; } }
這里如果是sendAsync的話,executor參數(shù)傳遞的是delegatingExecutor.delegate;如果是同步的send方法,則executor傳的值是null
這里創(chuàng)建了一個MultiExchange,然后調(diào)用mex.responseAsync(executor).whenComplete((b,t) -> unreference()),這里使用了executor
MultiExchange.responseAsyncjava.net.http/jdk/internal/net/http/MultiExchange.java
public CompletableFuture> responseAsync(Executor executor) { CompletableFuture start = new MinimalFuture<>(); CompletableFuture > cf = responseAsync0(start); start.completeAsync( () -> null, executor); // trigger execution return cf; } private CompletableFuture > responseAsync0(CompletableFuture start) { return start.thenCompose( v -> responseAsyncImpl()) .thenCompose((Response r) -> { Exchange exch = getExchange(); return exch.readBodyAsync(responseHandler) .thenApply((T body) -> { this.response = new HttpResponseImpl<>(r.request(), r, this.response, body, exch); return this.response; }); }); }
可以看到這里使用的是CompletableFuture的completeAsync方法(注意這個方法是java9才有的),executor也是在這里使用的
由于默認(rèn)是使用Executors.newCachedThreadPool創(chuàng)建的executor,要注意控制并發(fā)數(shù)及任務(wù)執(zhí)行時間,防止線程數(shù)無限制增長過度消耗系統(tǒng)資源
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueueRejectedExecutionException(), threadFactory); }
實例代碼
@Test public void testAsyncPool(){ ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool() .setPoolSize(2) .setQueueSize(5) .setThreadNamePrefix("test-") .build(); List> futureList = IntStream.rangeClosed(1,100) .mapToObj(i -> new CompletableFuture ()) .collect(Collectors.toList()); futureList.stream() .forEach(future -> { future.completeAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e1) { e1.printStackTrace(); } return "message"; },executor); }); CompletableFuture.allOf(futureList .toArray(new CompletableFuture>[futureList.size()])) .join(); }
這里創(chuàng)建的是fixedPool,指定queueSize為5
日志輸出
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@76b10754 rejected from java.util.concurrent.ThreadPoolExecutor@2bea5ab4[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0] at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355) at java.base/java.util.concurrent.CompletableFuture.completeAsync(CompletableFuture.java:2591)
可以看到線程池隊列大小起到了限制作用小結(jié)
jdk httpclient的executor在進(jìn)行異步操作的時候使用,默認(rèn)創(chuàng)建的是使用Executors.newCachedThreadPool創(chuàng)建的executor,其線程池大小是Integer.MAX_VALUE,因此在使用的時候要注意,最好是改為有界隊列,然后再加上線程池的監(jiān)控。
docjava.net.http javadoc
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/77340.html
摘要:序本文主要研究一下的參數(shù)這里有一個類型的變量,用來記錄請求次數(shù)另外還有一個,讀取的是值,讀取不到默認(rèn)取,為進(jìn)入該方法的時候,調(diào)用,遞增請求次數(shù),然后判斷有無超出限制,有則返回帶有異常的,即通過返回如果沒有超出限制,但是執(zhí)行請求失敗,則 序 本文主要研究一下jdk httpclient的retry參數(shù) DEFAULT_MAX_ATTEMPTS java.net.http/jdk/inte...
摘要:序本文主要研究一下的異常實例代碼異常日志如下最后調(diào)用這里調(diào)用獲取連接如果沒有連接會新創(chuàng)建一個,走的是這里先是調(diào)用了獲取連接,然后調(diào)用進(jìn)行連接這里委托給這里如果有設(shè)置的話,則會創(chuàng)建一個調(diào)用進(jìn)行連接,如果連接未 序 本文主要研究一下httpclient的connect timeout異常 實例代碼 @Test public void testConnectTimeout()...
摘要:調(diào)用計算的時間,這個方法會清理移除并過期的連接除了清理過期的連接外,還通過間接觸發(fā),去清理關(guān)閉或異常的連接 序 本文主要研究一下jdk httpclient的ConnectionPool HttpConnection HttpConnection.getConnection java.net.http/jdk/internal/net/http/HttpConnection.java ...
摘要:在中也可以直接使用返回的是,然后通過來獲取結(jié)果阻塞線程,從中獲取結(jié)果四一點嘮叨非常的年輕,網(wǎng)絡(luò)資料不多,且代碼非常精細(xì)和復(fù)雜,目前來看底層應(yīng)該是使用了線程池搭配進(jìn)行異步通訊。 零 前期準(zhǔn)備 0 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 1 HttpClient 簡介 java.net.http.HttpClient 是 jdk11 中正式...
摘要:異常重試默認(rèn)重試次,三次都失敗則拋出或其他異常 maven org.apache.httpcomponents httpclient 4.5.2 異常重試log 2017-01-31 19:31:39.057 INFO 3873 --- [askScheduler-13] o....
閱讀 3431·2021-10-14 09:42
閱讀 2718·2021-09-08 10:44
閱讀 1300·2021-09-02 10:18
閱讀 3600·2021-08-30 09:43
閱讀 2794·2021-07-29 13:49
閱讀 3719·2019-08-29 17:02
閱讀 1577·2019-08-29 15:09
閱讀 1035·2019-08-29 11:01