本文会先简单说下OkHttp3的工作流程,然后介绍OkHttp3的一些核心类(如连接池StreamAllocation以及各式各样的Interceptor),接着从源码角度分析一次HTTP请求在OkHttp3中所经历的过程,在不同的Interceptor(拦截器)可以看到一些OkHttp3设计的一些巧妙思想,最后对上述分析做个简单的总结。
Okhttp3是Square公司开源的强大高效的Java网络请求库,旨于替换Java的HttpUrlConnection和Apache的HttpClient的轻量级网络框架,已经被运用到很多开源库以及Android的源码中(Android Studio 在6.0之后,移除了HttpClient,并且用OKHttp代替了HttpUrlConnection)。
支持Http2/SPDY;
默认启用长连接,使用连接池管理,支持Cache(目前仅支持GET请求的缓存);
路由节点管理,提升访问速度;
透明的Gzip处理,节省网络流量。
灵活的拦截器,行为类似Java EE的Filter或者函数编程中的Map。
OKhttp3也主要由以下6个概念一起运作:
OkHttpClient:客户端;
Dispatcher:线程池;
Interceptor:拦截器;
Request:请求;
Response:响应;
CallBack:回调。
OkHttp的简单使用方法 OkHttp 使用方法,直接抄官网的 (╯-╰)/ 。
GET 请求:
1
2
3
4
5
6
7
8
9
10
OkHttpClient client = new OkHttpClient();
String run (String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
Response response = client.newCall(request).execute();
return response.body().string();
}
POST 请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static final MediaType JSON
= MediaType.parse("application/json; charset=utf-8" );
OkHttpClient client = new OkHttpClient();
String post (String url, String json) throws IOException {
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
Response response = client.newCall(request).execute();
return response.body().string();
}
深入源码 在这里,先分析下同步请求的源码,之后再回过头来看异步请求的源码。
Let’s go !
同步请求 OkHttpClient 首先创建一个 OkHttpClient
对象,那我们看看在构造器中做了什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public OkHttpClient () {
this (new Builder());
}
OkHttpClient(Builder builder) {
this .dispatcher = okHttpClient.dispatcher;
this .proxy = okHttpClient.proxy;
this .protocols = okHttpClient.protocols;
this .connectionSpecs = okHttpClient.connectionSpecs;
this .interceptors.addAll(okHttpClient.interceptors);
this .networkInterceptors.addAll(okHttpClient.networkInterceptors);
this .eventListenerFactory = okHttpClient.eventListenerFactory;
this .proxySelector = okHttpClient.proxySelector;
this .cookieJar = okHttpClient.cookieJar;
this .internalCache = okHttpClient.internalCache;
this .cache = okHttpClient.cache;
this .socketFactory = okHttpClient.socketFactory;
boolean isTLS = false ;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this .sslSocketFactory = builder.sslSocketFactory;
this .certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = systemDefaultTrustManager();
this .sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
this .certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
this .hostnameVerifier = builder.hostnameVerifier;
this .certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this .proxyAuthenticator = builder.proxyAuthenticator;
this .authenticator = builder.authenticator;
this .connectionPool = builder.connectionPool;
this .dns = builder.dns;
this .followSslRedirects = builder.followSslRedirects;
this .followRedirects = builder.followRedirects;
this .retryOnConnectionFailure = builder.retryOnConnectionFailure;
this .connectTimeout = builder.connectTimeout;
this .readTimeout = builder.readTimeout;
this .writeTimeout = builder.writeTimeout;
this .pingInterval = builder.pingInterval;
}
在构造器中利用建造者模式来构建 OkHttpClient
的对象。当然,如果你想自定义 OkHttpClient
配置的话,就要 new 一个 OkHttpClient.Builder
来配置自己的参数了。相信大家都干过这种事情了(∩_∩)。
OkHttpClient
的构造器中主要是扎堆扎堆的配置,没别的。
所以OKHttpClient这个类作用就是把配置全部生成好相当于一个机关枪,工厂已经把所有需要的配件都生成好了。就差扣扳机钥匙来开枪了。那么子弹就是我们的Request这个类。但是看这个类之前我们先去看看这里面为什么能做连续发射(Dispatcher)。
之后再调用 newCall(Request request)
:
1
2
3
4
@Override
public Call newCall (Request request) {
return RealCall.newRealCall(this , request, false );
}
在方法里面其实是创建了一个 RealCall
的对象,那么我们就进入 RealCall
中去看看吧。
RealCall 在 RealCall
的构造器中只是给一些变量赋值或初始化而已,没什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
static RealCall newRealCall (OkHttpClient client, Request originalRequest, boolean forWebSocket) {
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
private RealCall (OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this .client = client;
this .originalRequest = originalRequest;
this .forWebSocket = forWebSocket;
this .retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
然后再把目光转向 RealCall
中的 execute()
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public Response execute () throws IOException {
synchronized (this ) {
if (executed) throw new IllegalStateException("Already Executed" );
executed = true ;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this );
Response result = getResponseWithInterceptorChain();
if (result == null ) throw new IOException("Canceled" );
return result;
} finally {
client.dispatcher().finished(this );
}
}
execute()
方法为执行该 RealCall
,在方法里面一开始检查了该 call 时候被执行。
然后又加入了 Dispatcher
的 runningSyncCalls
中。runningSyncCalls
队列只是用来记录正在同步请求中的 call ,在 call 完成请求后又会从 runningSyncCalls
中移除。
可见,在同步请求中 Dispatcher
参与的部分很少。但是在异步请求中, Dispatcher
可谓是大展身手。
最重要的方法,那就是 getResponseWithInterceptorChain()
。我们可以看到这方法是直接返回 Response
对象的,所以,在这个方法中一定做了很多很多的事情。
那就继续深入吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Response getResponseWithInterceptorChain () throws IOException {
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null , null , null , 0 ,
originalRequest, this , eventListener, client.readTimeoutMillis());
return chain.proceed(originalRequest);
}
在 getResponseWithInterceptorChain()
方法中有一堆的拦截器!!!
我们都知道,拦截器是 OkHttp 的精髓。
client.interceptors()
,首先加入 interceptors
的是用户自定义的拦截器,比如修改请求头的拦截器等;
RetryAndFollowUpInterceptor
是用来重试和重定向的拦截器,在下面我们会讲到;
BridgeInterceptor
是用来将用户友好的请求转化为向服务器的请求,之后又把服务器的响应转化为对用户友好的响应;
CacheInterceptor
是缓存拦截器,若存在缓存并且可用就直接返回该缓存,否则会向服务器请求;
ConnectInterceptor
用来建立连接的拦截器;
client.networkInterceptors()
加入用户自定义的 networkInterceptors
;
CallServerInterceptor
是真正向服务器发出请求且得到响应的拦截器;
最后在聚合了这些拦截器后,利用 RealInterceptorChain
来链式调用这些拦截器,利用的就是责任链模式。
RealInterceptorChain RealInterceptorChain
可以说是真正把这些拦截器串起来的一个角色。一个个拦截器就像一颗颗珠子,而 RealInterceptorChain
就是把这些珠子串连起来的那根绳子。
进入 RealInterceptorChain
,主要是 proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection)
这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public Response proceed (Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
if (this .httpCodec != null && !this .connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1 )
+ " must retain the same host and port" );
}
if (this .httpCodec != null && calls > 1 ) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1 )
+ " must call proceed() exactly once" );
}
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1 , request, call, eventListener, readTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1 ) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once" );
}
if (response == null ) {
throw new NullPointerException("interceptor " + interceptor + " returned null" );
}
if (response.body() == null ) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body" );
}
return response;
}
下面就要进入分析拦截器的步骤了,至于用户自定义的拦截器在这就略过了。还有,拦截器只分析主要的 intercept(Chain chain)
代码。
RetryAndFollowUpInterceptor 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Override
public Response intercept (Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
call, eventListener, callStackTrace);
int followUpCount = 0 ;
Response priorResponse = null ;
while (true ) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled" );
}
Response response = null ;
boolean releaseConnection = true ;
try {
response = realChain.proceed(request, streamAllocation, null , null );
releaseConnection = false ;
} catch (RouteException e) {
if (!recover(e.getLastConnectException(), false , request)) {
throw e.getLastConnectException();
}
releaseConnection = false ;
continue ;
} catch (IOException e) {
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false ;
continue ;
} finally {
if (releaseConnection) {
streamAllocation.streamFailed(null );
streamAllocation.release();
}
}
if (priorResponse != null ) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null )
.build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null ) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body" , response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
} else if (streamAllocation.codec() != null ) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?" );
}
request = followUp;
priorResponse = response;
}
}
总体来说,RetryAndFollowUpInterceptor
是用来失败重试以及重定向的拦截器。
BridgeInterceptor 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Override
public Response intercept (Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null ) {
MediaType contentType = body.contentType();
if (contentType != null ) {
requestBuilder.header("Content-Type" , contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1 ) {
requestBuilder.header("Content-Length" , Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding" );
} else {
requestBuilder.header("Transfer-Encoding" , "chunked" );
requestBuilder.removeHeader("Content-Length" );
}
}
if (userRequest.header("Host" ) == null ) {
requestBuilder.header("Host" , hostHeader(userRequest.url(), false ));
}
if (userRequest.header("Connection" ) == null ) {
requestBuilder.header("Connection" , "Keep-Alive" );
}
boolean transparentGzip = false ;
if (userRequest.header("Accept-Encoding" ) == null && userRequest.header("Range" ) == null ) {
transparentGzip = true ;
requestBuilder.header("Accept-Encoding" , "gzip" );
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie" , cookieHeader(cookies));
}
if (userRequest.header("User-Agent" ) == null ) {
requestBuilder.header("User-Agent" , Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip" .equalsIgnoreCase(networkResponse.header("Content-Encoding" ))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding" )
.removeAll("Content-Length" )
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
在 BridgeInterceptor
这一步,先把用户友好的请求进行重新构造,变成了向服务器发送的请求。
之后调用 chain.proceed(requestBuilder.build())
进行下一个拦截器的处理。
等到后面的拦截器都处理完毕,得到响应。再把 networkResponse
转化成对用户友好的 response
。
CacheInterceptor 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@Override
public Response intercept (Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null ;
long now = System.currentTimeMillis();
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null ) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null ) {
closeQuietly(cacheCandidate.body());
}
if (networkRequest == null && cacheResponse == null ) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504 )
.message("Unsatisfiable Request (only-if-cached)" )
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L )
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
if (networkRequest == null ) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null ;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
if (networkResponse == null && cacheCandidate != null ) {
closeQuietly(cacheCandidate.body());
}
}
if (cacheResponse != null ) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null ) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
}
}
}
return response;
}
CacheInterceptor
做的事情就是根据请求拿到缓存,若没有缓存或者缓存失效,就进入网络请求阶段,否则会返回缓存。
ConnectInterceptor 1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Response intercept (Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET" );
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
这里调用了 streamAllocation.newStream
创建了一个 HttpCodec
的对象。
而 HttpCodec
是一个抽象类,其实现类分别是 Http1Codec
和 Http2Codec
。相对应的就是 HTTP/1.1 和 HTTP/2.0 。
我们来看下 streamAllocation.newStream
的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public HttpCodec newStream (OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this );
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
在 newStream(OkHttpClient client, boolean doExtensiveHealthChecks)
中先在连接池中找到可用的连接 resultConnection
,再结合 sink
和 source
创建出 HttpCodec
的对象。
CallServerInterceptor 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Override
public Response intercept (Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null ;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null ) {
if ("100-continue" .equalsIgnoreCase(request.header("Expect" ))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true );
}
if (responseBuilder == null ) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null ) {
responseBuilder = httpCodec.readResponseHeaders(false );
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (forWebSocket && code == 101 ) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close" .equalsIgnoreCase(response.request().header("Connection" ))
|| "close" .equalsIgnoreCase(response.header("Connection" ))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205 ) && response.body().contentLength() > 0 ) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
在 CallServerInterceptor
中可见,关于请求和响应部分都是通过 HttpCodec
来实现的。而在 HttpCodec
内部又是通过 sink
和 source
来实现的。所以说到底还是 IO 流在起作用。
小结 到这里,我们也完全明白了 OkHttp 中的分层思想,每一个 interceptor 只处理自己的事,而剩余的就交给其他的 interceptor 。这种思想可以简化一些繁琐复杂的流程,从而达到逻辑清晰、互不干扰的效果。
异步请求 与同步请求直接调用 execute()
不同的是,异步请求是调用了 enqueue(Callback responseCallback)
这个方法。那么我们对异步请求探究的入口就是 enqueue(Callback responseCallback)
了。
RealCall 1
2
3
4
5
6
7
8
9
10
@Override
public void enqueue (Callback responseCallback) {
synchronized (this ) {
if (executed) throw new IllegalStateException("Already Executed" );
executed = true ;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
主要的方法就是调用了 Dispatcher
的 enqueue(AsyncCall call)
方法。这里需要注意的是,传入的是 AsyncCall
对象,而不是同步中的 RealCall
。
那么我们就跟进到 Dispatcher
的源码中吧,至于 AsyncCall
我们会在下面详细讲到。
Dispatcher 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
synchronized void enqueue (AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
public synchronized ExecutorService executorService () {
if (executorService == null ) {
executorService = new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60 , TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher" , false ));
}
return executorService;
}
从 enqueue(AsyncCall call)
中可以知道,OkHttp 在运行中的异步请求数最多为 63 ,而同一个 host 的异步请求数最多为 4 。否则会加入到 readyAsyncCalls
中。
在加入到 runningAsyncCalls
后,就会进入线程池中被执行。到了这里,我们就要到 AsyncCall
中一探究竟了。
AsyncCall 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super ("OkHttp %s" , redactedUrl());
this .responseCallback = responseCallback;
}
String host () {
return originalRequest.url().host();
}
Request request () {
return originalRequest;
}
RealCall get () {
return RealCall.this ;
}
@Override
protected void execute () {
boolean signalledCallback = false ;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true ;
responseCallback.onFailure(RealCall.this , new IOException("Canceled" ));
} else {
signalledCallback = true ;
responseCallback.onResponse(RealCall.this , response);
}
} catch (IOException e) {
if (signalledCallback) {
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this , e);
}
} finally {
client.dispatcher().finished(this );
}
}
}
在 AsyncCall
的 execute()
方法中,也是调用了 getResponseWithInterceptorChain()
方法来得到 Response
对象。从这里开始,就和同步请求的流程是一样的,就没必要讲了。
在得到 Response
后,进行结果的回调。
最后,调用了 Dispatcher
的 finished
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void finished (AsyncCall call) {
finished(runningAsyncCalls, call, true );
}
private <T> void finished (Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this ) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!" );
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this .idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null ) {
idleCallback.run();
}
}
在 finished(Deque<T> calls, T call, boolean promoteCalls)
中对该 call 移除。
若在 readyAsyncCalls
中其他的 call ,就移动到 runningAsyncCalls
中并加入线程池中。
这样,完整的流程就循环起来了。
基本上 OkHttp 的请求响应的流程就讲完了,篇幅有点长长长啊。不过还有很多点没有涉及到的