w4lle's Notes

人生如逆旅,我亦是行人。

w4lle's avatar w4lle

OkHttp3 源码浅析

背景

之前的底层网络库基本就是Apache HttpClient和HttpURLConnection。由于HttClient比较难用,官方在Android2.3以后就不建议用了,并且在Android5.0以后废弃了HttpClient,在Android6.0更是删除了HttpClient。

HttpURLConnection是一种多用途、轻量极的HTTP客户端,使用它来进行HTTP操作可以适用于大多数的应用程序,但是在Android 2.2版本之前存在一些bug,所以官方建议在Android2.3以后替代HttpClient,Volley就是按版本分区使用这两个网络库。

然而随着开源届扛把子Square的崛起,OkHttp的开源,这两个网络库只能被淹没在历史洪流中。Android4.4以后HttpURLConnection的底层已经替换成OkHttp实现。OkHttp配合同样是Square开源的Retrofit,网络请求变得更简便,功能更强大。

OkHttp

OkHttp是一个现代,快速,高效的网络库,OkHttp 库的设计和实现的首要目标是高效。

  • 支持 HTTP/2和SPDY,这使得对同一个主机发出的所有请求都可以共享相同的套接字连接;
  • 如果 HTTP/2和SPDY不可用,OkHttp会使用连接池来复用连接以提高效率。
  • 支持Gzip降低传输内容的大小
  • 支持Http缓存
  • 会从很多常用的连接问题中自动恢复。如果服务器配置了多个IP地址,OkHttp 会自动重试一个主机的多个 IP 地址。
  • 使用Okio来大大简化数据的访问与存储,提高性能

简单使用

简单的异步请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.build();
client.newCall(request).enqueue(new Callback() {
public void onFailure(Request request, IOException e) {
}
public void onResponse(Response response) throws IOException {
System.out.println(response.body().string());
}
});

使用非常的简答,发送请求,拿到异步结果。

OkHttpClient

跟下源码,OkHttpClient.newCall实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class OkHttpClient implements Cloneable, Call.Factory{
public static final class Builder {
Dispatcher dispatcher;
Proxy proxy;
List<Protocol> protocols;
List<ConnectionSpec> connectionSpecs;
final List<Interceptor> interceptors = new ArrayList<>();
final List<Interceptor> networkInterceptors = new ArrayList<>();
ProxySelector proxySelector;
CookieJar cookieJar;
Cache cache;
InternalCache internalCache;
SocketFactory socketFactory;
SSLSocketFactory sslSocketFactory;
CertificateChainCleaner certificateChainCleaner;
HostnameVerifier hostnameVerifier;
CertificatePinner certificatePinner;
Authenticator proxyAuthenticator;
Authenticator authenticator;
ConnectionPool connectionPool;
Dns dns;
boolean followSslRedirects;
boolean followRedirects;
boolean retryOnConnectionFailure;
int connectTimeout;
int readTimeout;
int writeTimeout;
}
...
...
@Override public Call newCall(Request request) {
return new RealCall(this, request);
...
...
}
}

OkHttpClient通过Builder实例化,实现了Call.Factory接口创建了一个RealCall的实例,而RealCall是Call接口的实现。

1
2
3
4
5
6
7
8
9
10
11
public interface Call {
Request request();
Response execute() throws IOException;
void enqueue(Callback responseCallback);
void cancel();
boolean isExecuted();
boolean isCanceled();
interface Factory {
Call newCall(Request request);
}
}

RealCall

RealCall中封装了OKHttpClient和Request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
protected RealCall(OkHttpClient client, Request originalRequest) {
this.client = client;
this.originalRequest = originalRequest;
}
@Override public void enqueue(Callback responseCallback) {
enqueue(responseCallback, false);
}
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private final boolean forWebSocket;
private AsyncCall(Callback responseCallback, boolean forWebSocket) {
super("OkHttp %s", redactedUrl().toString());
this.responseCallback = responseCallback;
this.forWebSocket = forWebSocket;
}
...
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//注意这一句代码
client.dispatcher().finished(this);
}
}
}

调用enqueue封装成AsyncCall交给OKHttpClient的dispatcher线程池执行。

Dispatcher线程池

OkHttp的dispatcher参数是直接new出来的。先看下enqueue方法,将AsyncCall当做参数传递进来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final class Dispatcher {
/** 最大并发请求数为64 */
private int maxRequests = 64;
/** 每个主机最大请求数为5 */
private int maxRequestsPerHost = 5;
/** 线程池 */
private ExecutorService executorService;
/** 准备执行的请求 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在执行的异步请求,包含已经取消但未执行完的请求 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在执行的同步请求,包含已经取消单未执行完的请求 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}

构造一个线程池ExecutorService:

1
2
3
4
5
6
executorService = new ThreadPoolExecutor(
0, //corePoolSize 最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁。
Integer.MAX_VALUE, //maximumPoolSize: 最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
60, //keepAliveTime: 当线程数大于corePoolSize时,多余的空闲线程的最大存活时间
TimeUnit.SECONDS,//单位秒
new SynchronousQueue<Runnable>(),//工作队列,先进先出 Util.threadFactory("OkHttp Dispatcher", false));//单个线程的工厂

构建了一个最大线程数为Integer.MAX_VALUE的线程池,也就是说,是个不设最大上限的线程池(其实有限制64个),有多少任务添加进来就新建多少线程,以保证I/O任务中高阻塞低占用的过程中,不会长时间卡在阻塞上。当工作完成后,线程池会在60s内相继关闭所有线程。

还记得刚才在AsyncCall.execute() finally中的内容吗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
finally {
client.dispatcher().finished(this);
}
...
/** Used by {@code AsyncCall#run} to signal completion. */
synchronized void finished(AsyncCall call) {
if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
//Dispatcher.java
private void promoteCalls() {
//超过阈值 返回
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

当AsyncCall执行完成后,调用Disptcher的finish()方法,调用promoteCalls()方法,如果超过阈值,继续等待,否则取出缓存区的任务执行,顺序是先进先出。

Dispatcher线程池总结

  • 调度线程池Disptcher实现了高并发,低阻塞的实现
  • 采用Deque作为缓存,先进先出的顺序执行
  • 任务在try/finally中调用了finished函数,控制任务队列的执行顺序,而不是采用锁,减少了编码复杂性提高性能

Interceptor

调度基本整明白了,AsyncCall 中的execute具体内容还没有分析,主要就一行代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override protected void execute() {
boolean signalledCallback = false;
try {
...
Response response = getResponseWithInterceptorChain(forWebSocket);
...
} finally {
client.dispatcher().finished(this);
}
}
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}

从方法名字基本可以猜到是干嘛的,调用chain.proceed(originalRequest);将request传递进来,从拦截器链里拿到返回结果。那么拦截器Interceptor是干嘛的,Chain是干嘛的呢?继续往下看ApplicationInterceptorChain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ApplicationInterceptorChain implements Interceptor.Chain {
private final int index;
private final Request request;
ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) {
this.index = index;
this.request = request;
this.forWebSocket = forWebSocket;
}
@Override public Connection connection() {
return null;
}
@Override public Request request() {
return request;
}
@Override public Response proceed(Request request) throws IOException {
// If there's another interceptor in the chain, call that.
if (index < client.interceptors().size()) {
Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
Interceptor interceptor = client.interceptors().get(index);
Response interceptedResponse = interceptor.intercept(chain);
if (interceptedResponse == null) {
throw new NullPointerException("application interceptor " + interceptor
+ " returned null");
}
return interceptedResponse;
}
// No more interceptors. Do HTTP.
return getResponse(request, forWebSocket);
}
}

ApplicationInterceptorChain实现了Interceptor.Chain接口,持有Request的引用。

1
2
3
4
5
6
7
8
9
10
11
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
Connection connection();
}
}

proceed方法中判断index(此时为0)是否小于client.interceptors(List )的大小,如果小于也就是说client.interceptors还有Interceptor,那么就再封装一个ApplicationInterceptorChain,只不过index + 1,然后取出第index个Interceptor将chain传递进去。传递进去干嘛呢?我们看一个用法,以实际项目为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(new RetrofitLogger());
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(interceptor)
.retryOnConnectionFailure(true)
.connectTimeout(15, TimeUnit.SECONDS)
.addInterceptor(getCommonParameterInterceptor())
.addNetworkInterceptor(getTokenInterceptor())
.build();
@Override
protected Interceptor getCommonParameterInterceptor() {
return new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request originalRequest = chain.request();
Request request = originalRequest;
if (!originalRequest.method().equalsIgnoreCase("POST")) {
HttpUrl modifiedUrl = originalRequest.url().newBuilder()
.addQueryParameter("version_code", String.valueOf(AppUtils.getVersionCode()))
.addQueryParameter("app_key", "nicepro")
.addQueryParameter("app_device", "Android")
.addQueryParameter("app_version", AppUtils.getVersionName())
.addQueryParameter("token", AccountUtils.getToken())
.build();
request = originalRequest.newBuilder().url(modifiedUrl).build();
}
return chain.proceed(request);
}
};
}
@Override
protected Interceptor getTokenInterceptor() {
return new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request originalRequest = chain.request();
Request authorised = originalRequest.newBuilder()
.header("app-key", "nicepro")
.header("app-device", "Android")
.header("app-version", AppUtils.getVersionName())
.header("os", AppUtils.getOs())
.header("os-version", AppUtils.getAndroidVersion() + "")
.header("Accept", "application/json")
.header("User-Agent", "Android/retrofit")
.header("token", AccountUtils.getToken())
.build();
return chain.proceed(authorised);
}
};
}

可以看到每个Interceptor的intercept方法中做了一些操作后,最后都会调用chain.proceed(request)方法,而这个chain就是每次prceed方法中生成的ApplicationInterceptorChain,用index+1的方式递归调用OkHttClient中的Interceptors,进行拦截操作,比如可以用来监控log,修改请求,修改结果,供开发者自定义参数添加等等,然后最终调用的还是最初的index=0的那个chain的proceed方法中的getResponse(request, forWebSocket);

可以说OkHttp是用chain串联起拦截器,而每个拦截器都有能力返回Response,返回Response即终止整个调用链,这种设计模式称为责任链模式。这种模式为OkHttp提供了强大的装配能力,极大的提高了OkHttp的扩展性和可维护性。

在Android系统中最典型的责任链模式就是View的Touch传递机制,一层一层传递直到被消费。

官方的一张图就能很好的解释Interceptor

整个流程很清晰。这种设计真是太棒了,值得学习!

连接池复用

我们知道进行一次tcp网络请求,一般要三次握手连接,四次握手断开连接。一次完整的http请求过程见下图。

如果请求重复的地址,那么重复的连接和断开连接就成了延长整个时间的的重要因素,特别是在复杂的网络环境下,每次请求传输数据的大小将不再是请求速度的决定性因素。

http有一种keepalive connections的机制,可以在传输后仍然保持连接,当客户端需要再次获取数据时,直接使用刚刚空闲下来的连接而不需要再次握手。

Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间),关于OkHttp连接池复用详细请看这篇文章 OkHttp3源码分析[复用连接池]

DNS解析

对比上一张图的一次完整的Http请求,在复杂的天朝网络环境下,相信大多数开发者都碰到过很奇怪的网络问题,比如运营商动态插入辣鸡html代码嵌入广告,比如运营商缓存请求数据导致用户请求到的数据不是最新的问题,比如某些运营商只支持put\post请求,而不支持delete请求,比如运营商。。。这些问题大部分都跟DNS相关。

为了解决DNS劫持的问题,我们在薄荷app上做了很多优化工作,比如使用HTTP DNS(我们使用的DNSPod)代替系统自带的libc库去查询运营商的DNS服务器,直接拿到IP地址进行IP直连,其中又做了一些缓存和选择最优IP的一些操作。解决掉了很大一部分用户反馈的网络问题。

而在OkHttp中,可以直接配置DNS,默认是系统自带的Dns.SYSTEM

1
2
3
4
5
6
// Try each address for best behavior in mixed IPv4/IPv6 environments.
List<InetAddress> addresses = address.dns().lookup(socketHost);
for (int i = 0, size = addresses.size(); i < size; i++) {
InetAddress inetAddress = addresses.get(i);
inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
}

注意结果是数组,即一个域名可能会有多个IP,如果一个IP不通,会自动重连下一个IP。

开发者就可以新建一个Dns类复写lookup方法通过HTTP DNS请求IP地址,其中新建一个HttpDNSClient来请求DNS,插入拦截器来配置缓存时间,容错处理等等,然后在构建OkHttpClient时加入dns方法即可。

1
2
3
4
5
client = new OkHttpClient.Builder().addNetworkInterceptor(getLogger())
.dispatcher(getDispatcher())
//配置DNS查询实现
.dns(HTTP_DNS)
.build();

这样的全局HTTP DNS解析真是足够简单高效,并且完全是无侵入性的,丝毫不影响正常的网络请求。

总结

本文基本讲了下OkHttp3的大概流程,Interceptor的基本原理,DNS的可选配置等。涉及到socket和Okio流相关的都没有讲到,有兴趣的读者可以在参考文章自行搜索。总结来说,OkHttp基本可以满足日常开发的需求,并且性能足够强大,配合Retrofit + Rxjava更是效率翻倍。如果你在开发新的项目,强烈建议你扔掉Volley,拥抱Retrofit。

参考

本文链接: http://w4lle.com/2016/12/06/OkHttp/

版权声明:本文为 w4lle 原创文章,可以随意转载,但必须在明确位置注明出处!
本文链接: http://w4lle.com/2016/12/06/OkHttp/