摘要:序本文主要研究一下的異常實例代碼異常日志如下最后調(diào)用這里調(diào)用獲取連接如果沒有連接會新創(chuàng)建一個,走的是這里先是調(diào)用了獲取連接,然后調(diào)用進(jìn)行連接這里委托給這里如果有設(shè)置的話,則會創(chuàng)建一個調(diào)用進(jìn)行連接,如果連接未
序
本文主要研究一下httpclient的connect timeout異常
實例代碼@Test public void testConnectTimeout() throws IOException, InterruptedException { HttpClient client = HttpClient.newBuilder() .build(); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create("https://twitter.com")) .build(); long start = System.currentTimeMillis(); try{ HttpResponseresult = client.send(request, HttpResponse.BodyHandlers.ofString()); System.out.println(result.body()); }finally { long cost = System.currentTimeMillis() - start; System.out.println("cost:"+cost); } }
異常日志如下:
cost:75814 java.net.ConnectException: Operation timed out at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:561) at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119) at com.example.HttpClientTest.testConnectTimeout(HttpClientTest.java:464) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131) Caused by: java.net.ConnectException: Operation timed out at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) at java.net.http/jdk.internal.net.http.PlainHttpConnection$ConnectEvent.handle(PlainHttpConnection.java:128) at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.handleEvent(HttpClientImpl.java:957) at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.lambda$run$3(HttpClientImpl.java:912) at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:912)Exchange.responseAsync
java.net.http/jdk/internal/net/http/Exchange.java
public CompletableFutureresponseAsync() { return responseAsyncImpl(null); } CompletableFuture responseAsyncImpl(HttpConnection connection) { SecurityException e = checkPermissions(); if (e != null) { return MinimalFuture.failedFuture(e); } else { return responseAsyncImpl0(connection); } } CompletableFuture responseAsyncImpl0(HttpConnection connection) { Function , CompletableFuture > after407Check; bodyIgnored = null; if (request.expectContinue()) { request.addSystemHeader("Expect", "100-Continue"); Log.logTrace("Sending Expect: 100-Continue"); // wait for 100-Continue before sending body after407Check = this::expectContinue; } else { // send request body and proceed. after407Check = this::sendRequestBody; } // The ProxyAuthorizationRequired can be triggered either by // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy // Therefore we handle it with a call to this checkFor407(...) after these // two places. Function , CompletableFuture > afterExch407Check = (ex) -> ex.sendHeadersAsync() .handle((r,t) -> this.checkFor407(r, t, after407Check)) .thenCompose(Function.identity()); return establishExchange(connection) .handle((r,t) -> this.checkFor407(r,t, afterExch407Check)) .thenCompose(Function.identity()); } // get/set the exchange impl, solving race condition issues with // potential concurrent calls to cancel() or cancel(IOException) private CompletableFuture extends ExchangeImpl > establishExchange(HttpConnection connection) { if (debug.on()) { debug.log("establishing exchange for %s,%n proxy=%s", request, request.proxy()); } // check if we have been cancelled first. Throwable t = getCancelCause(); checkCancelled(); if (t != null) { return MinimalFuture.failedFuture(t); } CompletableFuture extends ExchangeImpl > cf, res; cf = ExchangeImpl.get(this, connection); // We should probably use a VarHandle to get/set exchangeCF // instead - as we need CAS semantics. synchronized (this) { exchangeCF = cf; }; res = cf.whenComplete((r,x) -> { synchronized(Exchange.this) { if (exchangeCF == cf) exchangeCF = null; } }); checkCancelled(); return res.thenCompose((eimpl) -> { // recheck for cancelled, in case of race conditions exchImpl = eimpl; IOException tt = getCancelCause(); checkCancelled(); if (tt != null) { return MinimalFuture.failedFuture(tt); } else { // Now we"re good to go. Because exchImpl is no longer // null cancel() will be able to propagate directly to // the impl after this point ( if needed ). return MinimalFuture.completedFuture(eimpl); } }); }
responseAsync最后調(diào)用ExchangeImpl.get(this, connection)
ExchangeImpl.getjava.net.http/jdk/internal/net/http/ExchangeImpl.java
/** * Initiates a new exchange and assigns it to a connection if one exists * already. connection usually null. */ static CompletableFuture extends ExchangeImpl> get(Exchange exchange, HttpConnection connection) { if (exchange.version() == HTTP_1_1) { if (debug.on()) debug.log("get: HTTP/1.1: new Http1Exchange"); return createHttp1Exchange(exchange, connection); } else { Http2ClientImpl c2 = exchange.client().client2(); // #### improve HttpRequestImpl request = exchange.request(); CompletableFuturec2f = c2.getConnectionFor(request, exchange); if (debug.on()) debug.log("get: Trying to get HTTP/2 connection"); return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection)) .thenCompose(Function.identity()); } }
這里調(diào)用Http2ClientImpl.getConnectionFor獲取連接
Http2ClientImpl.getConnectionForjava.net.http/jdk/internal/net/http/Http2ClientImpl.java
/** * When HTTP/2 requested only. The following describes the aggregate behavior including the * calling code. In all cases, the HTTP2 connection cache * is checked first for a suitable connection and that is returned if available. * If not, a new connection is opened, except in https case when a previous negotiate failed. * In that case, we want to continue using http/1.1. When a connection is to be opened and * if multiple requests are sent in parallel then each will open a new connection. * * If negotiation/upgrade succeeds then * one connection will be put in the cache and the others will be closed * after the initial request completes (not strictly necessary for h2, only for h2c) * * If negotiate/upgrade fails, then any opened connections remain open (as http/1.1) * and will be used and cached in the http/1 cache. Note, this method handles the * https failure case only (by completing the CF with an ALPN exception, handled externally) * The h2c upgrade is handled externally also. * * Specific CF behavior of this method. * 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded. * 2. completes with other exception: failure not recorded. Caller must handle * 3. completes normally with null: no connection in cache for h2c or h2 failed previously * 4. completes normally with connection: h2 or h2c connection in cache. Use it. */ CompletableFuturegetConnectionFor(HttpRequestImpl req, Exchange> exchange) { URI uri = req.uri(); InetSocketAddress proxy = req.proxy(); String key = Http2Connection.keyFor(uri, proxy); synchronized (this) { Http2Connection connection = connections.get(key); if (connection != null) { try { if (connection.closed || !connection.reserveStream(true)) { if (debug.on()) debug.log("removing found closed or closing connection: %s", connection); deleteConnection(connection); } else { // fast path if connection already exists if (debug.on()) debug.log("found connection in the pool: %s", connection); return MinimalFuture.completedFuture(connection); } } catch (IOException e) { // thrown by connection.reserveStream() return MinimalFuture.failedFuture(e); } } if (!req.secure() || failures.contains(key)) { // secure: negotiate failed before. Use http/1.1 // !secure: no connection available in cache. Attempt upgrade if (debug.on()) debug.log("not found in connection pool"); return MinimalFuture.completedFuture(null); } } return Http2Connection .createAsync(req, this, exchange) .whenComplete((conn, t) -> { synchronized (Http2ClientImpl.this) { if (conn != null) { try { conn.reserveStream(true); } catch (IOException e) { throw new UncheckedIOException(e); // shouldn"t happen } offerConnection(conn); } else { Throwable cause = Utils.getCompletionCause(t); if (cause instanceof Http2Connection.ALPNException) failures.add(key); } } }); }
如果沒有連接會新創(chuàng)建一個,走的是Http2Connection.createAsync
Http2Connection.createAsyncjava.net.http/jdk/internal/net/http/Http2Connection.java
// Requires TLS handshake. So, is really async static CompletableFuturecreateAsync(HttpRequestImpl request, Http2ClientImpl h2client, Exchange> exchange) { assert request.secure(); AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) HttpConnection.getConnection(request.getAddress(), h2client.client(), request, HttpClient.Version.HTTP_2); // Expose the underlying connection to the exchange"s aborter so it can // be closed if a timeout occurs. exchange.connectionAborter.connection(connection); return connection.connectAsync(exchange) .thenCompose(unused -> connection.finishConnect()) .thenCompose(unused -> checkSSLConfig(connection)) .thenCompose(notused-> { CompletableFuture cf = new MinimalFuture<>(); try { Http2Connection hc = new Http2Connection(request, h2client, connection); cf.complete(hc); } catch (IOException e) { cf.completeExceptionally(e); } return cf; } ); }
這里先是調(diào)用了HttpConnection.getConnection獲取連接,然后調(diào)用connectAsync進(jìn)行連接
AsyncSSLConnectionjava.net.http/jdk/internal/net/http/AsyncSSLConnection.java
@Override public CompletableFutureconnectAsync(Exchange> exchange) { return plainConnection .connectAsync(exchange) .thenApply( unused -> { // create the SSLTube wrapping the SocketTube, with the given engine flow = new SSLTube(engine, client().theExecutor(), client().getSSLBufferSupplier()::recycle, plainConnection.getConnectionFlow()); return null; } ); }
這里委托給plainConnection.connectAsync
PlainHttpConnection.connectAsyncjava.net.http/jdk/internal/net/http/PlainHttpConnection.java
@Override public CompletableFutureconnectAsync(Exchange> exchange) { CompletableFuture cf = new MinimalFuture<>(); try { assert !connected : "Already connected"; assert !chan.isBlocking() : "Unexpected blocking channel"; boolean finished; connectTimerEvent = newConnectTimer(exchange, cf); if (connectTimerEvent != null) { if (debug.on()) debug.log("registering connect timer: " + connectTimerEvent); client().registerTimer(connectTimerEvent); } PrivilegedExceptionAction pa = () -> chan.connect(Utils.resolveAddress(address)); try { finished = AccessController.doPrivileged(pa); } catch (PrivilegedActionException e) { throw e.getCause(); } if (finished) { if (debug.on()) debug.log("connect finished without blocking"); cf.complete(null); } else { if (debug.on()) debug.log("registering connect event"); client().registerEvent(new ConnectEvent(cf)); } } catch (Throwable throwable) { cf.completeExceptionally(Utils.toConnectException(throwable)); try { close(); } catch (Exception x) { if (debug.on()) debug.log("Failed to close channel after unsuccessful connect"); } } return cf; }
這里如果client有設(shè)置connectTimeout的話,則會創(chuàng)建一個connectTimerEvent
調(diào)用chan.connect進(jìn)行連接,如果連接未完成,則注冊ConnectEvent
SocketChannelImpl.connectjava.base/sun/nio/ch/SocketChannelImpl.java
@Override public boolean connect(SocketAddress sa) throws IOException { InetSocketAddress isa = Net.checkAddress(sa); SecurityManager sm = System.getSecurityManager(); if (sm != null) sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); InetAddress ia = isa.getAddress(); if (ia.isAnyLocalAddress()) ia = InetAddress.getLocalHost(); try { readLock.lock(); try { writeLock.lock(); try { int n = 0; boolean blocking = isBlocking(); try { beginConnect(blocking, isa); do { n = Net.connect(fd, ia, isa.getPort()); } while (n == IOStatus.INTERRUPTED && isOpen()); } finally { endConnect(blocking, (n > 0)); } assert IOStatus.check(n); return n > 0; } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } catch (IOException ioe) { // connect failed, close the channel close(); throw SocketExceptions.of(ioe, isa); } }
通過Net.connect調(diào)用本地方法進(jìn)行連接
ConnectEventjava.net.http/jdk/internal/net/http/PlainHttpConnection.java
final class ConnectEvent extends AsyncEvent { private final CompletableFuturecf; ConnectEvent(CompletableFuture cf) { this.cf = cf; } @Override public SelectableChannel channel() { return chan; } @Override public int interestOps() { return SelectionKey.OP_CONNECT; } @Override public void handle() { try { assert !connected : "Already connected"; assert !chan.isBlocking() : "Unexpected blocking channel"; if (debug.on()) debug.log("ConnectEvent: finishing connect"); boolean finished = chan.finishConnect(); assert finished : "Expected channel to be connected"; if (debug.on()) debug.log("ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress()); // complete async since the event runs on the SelectorManager thread cf.completeAsync(() -> null, client().theExecutor()); } catch (Throwable e) { Throwable t = Utils.toConnectException(e); client().theExecutor().execute( () -> cf.completeExceptionally(t)); close(); } } @Override public void abort(IOException ioe) { client().theExecutor().execute( () -> cf.completeExceptionally(ioe)); close(); } }
SelectorManager對準(zhǔn)備好的事件觸發(fā)handle操作,對于ConnectEvent,就是調(diào)用ConnectEvent.handle
ConnectEvent的handle方法執(zhí)行chan.finishConnect(),如果捕獲到異常,則調(diào)用cf.completeExceptionally(t)
SocketChannelImpl.finishConnectjava.base/sun/nio/ch/SocketChannelImpl.java
@Override public boolean finishConnect() throws IOException { try { readLock.lock(); try { writeLock.lock(); try { // no-op if already connected if (isConnected()) return true; boolean blocking = isBlocking(); boolean connected = false; try { beginFinishConnect(blocking); int n = 0; if (blocking) { do { n = checkConnect(fd, true); } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen()); } else { n = checkConnect(fd, false); } connected = (n > 0); } finally { endFinishConnect(blocking, connected); } assert (blocking && connected) ^ !blocking; return connected; } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } catch (IOException ioe) { // connect failed, close the channel close(); throw SocketExceptions.of(ioe, remoteAddress); } }
checkConnect是一個本地方法,如果是連接超時,則拋出java.net.ConnectException: Operation timed out
tcp連接syn超時(net.ipv4.tcp_syn_retries)
當(dāng)client端與server端建立連接,client發(fā)出syn包,如果等待一定時間沒有收到server端發(fā)來的SYN+ACK,則會進(jìn)行重試,重試次數(shù)由具體由net.ipv4.tcp_syn_retries決定
/ # sysctl -a | grep tcp_syn_retries sysctl: error reading key "net.ipv6.conf.all.stable_secret": I/O error net.ipv4.tcp_syn_retries = 6 sysctl: error reading key "net.ipv6.conf.default.stable_secret": I/O error sysctl: error reading key "net.ipv6.conf.eth0.stable_secret": I/O error sysctl: error reading key "net.ipv6.conf.lo.stable_secret": I/O error
linux默認(rèn)是6次,第一次發(fā)送等待2^0秒沒收到回包則重試第一次,之后等待2^1,以此類推,第六次重試等待2^6秒,因此一共是1s+2s+4s+8s+16s+32s+64s=127s,因而在linux平臺下,如果httpclient沒有設(shè)置connect timeout,則依賴系統(tǒng)tcp的syn超時,即127s之后超時,java的本地調(diào)用拋出java.net.ConnectException: Operation timed out
如果是mac系統(tǒng),根據(jù)Overriding the default Linux kernel 20-second TCP socket connect timeout的描述,超時是75s,與本實例代碼輸出的75814ms近似一致。小結(jié)
使用jdk httpclient進(jìn)行連接,如果沒有設(shè)置client的connectTimeout,則具體的超時時間依賴系統(tǒng)的tcp相關(guān)設(shè)置
如果client端sync發(fā)送超時,則依賴tcp_syn_retries的配置來決定本地方法拋出java.net.ConnectException: Operation timed out異常的時間
linux下默認(rèn)tcp_syn_retries默認(rèn)為6,即重試6次,一共需要1s+2s+4s+8s+16s+32s+64s=127s,若再沒有收到server端發(fā)來的SYN+ACK則拋出java.net.ConnectException: Operation timed out異常
docTCP協(xié)議的那些超時
Linux 建立 TCP 連接的超時時間分析
Overriding the default Linux kernel 20-second TCP socket connect timeout
設(shè)置linux中tcp默認(rèn)的20秒connect超時時間
SYN丟包的幾個例子
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/77351.html
摘要:序本文主要研究一下的參數(shù)這里有一個類型的變量,用來記錄請求次數(shù)另外還有一個,讀取的是值,讀取不到默認(rèn)取,為進(jìn)入該方法的時候,調(diào)用,遞增請求次數(shù),然后判斷有無超出限制,有則返回帶有異常的,即通過返回如果沒有超出限制,但是執(zhí)行請求失敗,則 序 本文主要研究一下jdk httpclient的retry參數(shù) DEFAULT_MAX_ATTEMPTS java.net.http/jdk/inte...
摘要:調(diào)用計算的時間,這個方法會清理移除并過期的連接除了清理過期的連接外,還通過間接觸發(fā),去清理關(guān)閉或異常的連接 序 本文主要研究一下jdk httpclient的ConnectionPool HttpConnection HttpConnection.getConnection java.net.http/jdk/internal/net/http/HttpConnection.java ...
摘要:序本文主要研究一下的這里如果的為,則會創(chuàng)建這里如果是的話,參數(shù)傳遞的是如果是同步的方法,則傳的值是這里創(chuàng)建了一個,然后調(diào)用,這里使用了可以看到這里使用的是的方法注意這個方法是才有的,也是在這里使用的由于默認(rèn)是使用創(chuàng)建的, 序 本文主要研究一下jdk httpclient的executor HttpClientImpl java.net.http/jdk/internal/net/htt...
序 本文主要研究一下Java11的HttpClient的基本使用。 變化 從java9的jdk.incubator.httpclient模塊遷移到j(luò)ava.net.http模塊,包名由jdk.incubator.http改為java.net.http 原來的諸如HttpResponse.BodyHandler.asString()方法變更為HttpResponse.BodyHandlers.of...
摘要:在中也可以直接使用返回的是,然后通過來獲取結(jié)果阻塞線程,從中獲取結(jié)果四一點嘮叨非常的年輕,網(wǎng)絡(luò)資料不多,且代碼非常精細(xì)和復(fù)雜,目前來看底層應(yīng)該是使用了線程池搭配進(jìn)行異步通訊。 零 前期準(zhǔn)備 0 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 1 HttpClient 簡介 java.net.http.HttpClient 是 jdk11 中正式...
閱讀 6866·2021-09-22 15:36
閱讀 5687·2021-09-02 10:20
閱讀 1868·2019-08-30 15:44
閱讀 2652·2019-08-29 14:06
閱讀 1152·2019-08-29 11:17
閱讀 1585·2019-08-26 14:05
閱讀 3092·2019-08-26 13:50
閱讀 1550·2019-08-26 10:26