1. OkHttp的使用
/**
* OkHttp测试类
* @author LTP 2023/6/29
*/
class OkHttpTest {
/** 创建OkHttpClient对象 */
private val okHttpClient = OkHttpClient.Builder().build()
/** 创建Request对象 */
private val request = Request.Builder().url("http://www.baidu.com").build()
/** 创建RealCall对象(Call的唯一接口实现类) */
private val realCall = okHttpClient.newCall(request)
/**
* 同步请求
*/
@Test
fun getRequestWithSync() {
// 使用同步请求,realCall执行
realCall.execute().use {
print(it.body?.string())
}
}
/**
* 异步请求
*/
@Test
fun getRequestWithASync() {
// 使用异步请求,将realCall加入请求队列
realCall.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
e.printStackTrace()
}
override fun onResponse(call: Call, response: Response) {
print(response.body?.string() ?: "123")
}
})
// 得阻塞一会儿线程不让其结束以拿到请求结果
Thread.sleep(2000)
}
}
- 主要步骤有
- 创建OkHttpClient对象
- 创建Request对象
- 创建RealCall对象
- 开始发起请求,
enqueue
为异步请求,execute
为同步请求
2. 请求过程
- 源码基于OkHttp V4.11.0(直接下载到本地引入lib,BTPJ/OkHttpStudy)
2.1 OkHttpClient对象的创建
OkHttpClient() // 方法1
OkHttpClient.Builder().build() // 方法2
OkHttpClient().newBuilder().build() // 方法3
- 三种方式本质上都是一样的,这里以方法2为例
- 从创建方式来猜测就是一个典型的建造者模式,OkHttpClient.Builder()创建了一个okHttpClient的内部Builder类对象,里面初始化了一系列的成员变量,着重分析一下三个
/***
* 建造者模式
*/
class Builder constructor() {
/** 调度器 */
internal var dispatcher: Dispatcher = Dispatcher()
/** 拦截器集合 */
internal val interceptors: MutableList<Interceptor> = mutableListOf()
/** 网络拦截器集合 */
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
- 接着使用Builder对象调用了build()方法
/** 调用OkHttpClient构造创建OkHttpClient对象 */
fun build(): OkHttpClient = OkHttpClient(this)
- 其实就是调用的OkHttpClient(builder:Builder)构造
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
@get:JvmName("dispatcher")
val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("interceptors")
val interceptors: List<Interceptor> =
builder.interceptors.toImmutableList()
@get:JvmName("networkInterceptors")
val networkInterceptors: List<Interceptor> =
builder.networkInterceptors.toImmutableList()
- 其实就是典型的建造者模式,通过
Builder
创建的调度器、拦截器等成员变量最终都会赋值给OkHttpClient
2.2 Request对象的创建
Request.Builder().url("http://www.baidu.com").build()
- 也是个建造者模式…
open class Builder {
// 请求地址
internal var url: HttpUrl? = null
// 请求方法(例如Get、Post)
internal var method: String
// 请求头Header集合
internal var headers: Headers.Builder
// 请求体Body
internal var body: RequestBody? = null
/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}
- 看下build()方法
open fun build(): Request {
return Request(
checkNotNull(url) { "url == null" },
method,
headers.build(),
body,
tags.toImmutableMap()
)
}
- 其实就是创建了一个Request对象,并把Bulder对象的成员属性赋值给Request
2.3 创建Call对象
okHttpClient.newCall(request)
- 看下newCall
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
- 返回了Call接口的唯一实现类RealCall
class RealCall(
val client: OkHttpClient,
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
- 看下Call接口具体定义了哪些方法
/** Call.kt
* 调用准备好的执行请求。请求可以被取消。由于此对象表示单个请求/响应对(流),因此不能执行两次
*/
interface Call : Cloneable {
/** 返回启动此调用的原始请求. */
fun request(): Request
/** 同步请求(请求会被立即调用,一直阻塞线程直至请求被响应或请求出错) */
@Throws(IOException::class)
fun execute(): Response
/**
* 异步请求,安排在将来某个时间点执行的请求,具体调度者是OkHttpClient.dispatcher调度器,
* 调度请求返回到Callback中
*/
fun enqueue(responseCallback: Callback)
/** 取消请求(已经完成的请求无法被取消) */
fun cancel()
/** 请求是否被执行(包含executed和enqueued),多次执行该方法是错误的 */
fun isExecuted(): Boolean
/** 请求是否被取消 */
fun isCanceled(): Boolean
/** 请求超时时间 */
fun timeout(): Timeout
/** 克隆一个新Call(无论旧Call是否已被执行) */
public override fun clone(): Call
/** 对象创建工厂 */
fun interface Factory {
fun newCall(request: Request): Call
}
}
- RealCall对Call中定义的这些方法进行了实现,具体可看RealCall的实现逻辑,这里就不贴出来了
2.4 发起请求
- 先分析下发起异步请求
// RealCall.kt
override fun enqueue(responseCallback: Callback) {
// 检测是否已被执行过,执行过抛异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
// 事件监听回调相关,监听请求发起事件
callStart()
// 将请求交给调度器,调度器决定什么时候开始请求
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
// 事件监听回调
eventListener.callStart(this)
}
- 真正的逻辑在Dispatcher中
/** Dispatcher.kt
* 何时执行异步请求的调度器,内部默认是使用ExecutorService实现的
*/
class Dispatcher constructor() {
/** 同时执行的最大请求数,超过的请求会在内存中排队等待 */
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
/** 每台主机可并发执行的最大请求数(通过URL的主机名) */
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
/** 请求队列空闲回调 */
@set:Synchronized
@get:Synchronized
var idleCallback: Runnable? = null
private var executorServiceOrNull: ExecutorService? = null
/** 线程池(懒汉式保证单例) */
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** 待发送到请求队列的异步请求 */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 异步请求队列 */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 同步请求队列 */
private val runningSyncCalls = ArrayDeque<RealCall>()
...
- 看下
dispatcher.enqueue
方法
/** Dispatcher.kt
* 加入队列
*/
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 添加进准备发送的集合
readyAsyncCalls.add(call)
// 修改AsyncCall,使其共享对同一主机的现有运行调用的AtomicInteger
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 处理队列移动并按序执行
promoteAndExecute()
}
/**Dispatcher.kt
* 将符合条件的call从readyAsyncCalls(准备发送的队列)添加到runningAsyncCalls(异步发送队列)中,
* 并在executor服务上运行它们。不能在同步的情况下调用,因为执行调用可能会调用用户代码
*
* @return true if the dispatcher is currently running calls. 如果调度程序当前正在运行,则为true
*/
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
// 正在运行的请求
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// 超过最大请求数64,跳出整个循环
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// 单服务器超过最大请求数,跳出当前单个循环
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// 从请求准备队列中移除
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
// 加入异步请求队列中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 把请求任务交给线程池
asyncCall.executeOn(executorService)
}
return isRunning
}
/** RealCall.kt
* 尝试将此异步调用加入[executorService]的队列。如果执行器已关闭,则会尝试调用失败回调来进行清理
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 使用线程执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
// 执行失败的回调调用
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 发送请求完毕
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
- 最终是交给线程池executorService来处理的,executorService.execute方法接收的是一个Runnable实现类,这个类就是
AsyncCall
,它也是RealCall的非静态内部类,持有对RealCall的引用
/**
* RealCall.kt
* Runnable实现类,会被加入到线程池执行
*/
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
/**
* 尝试将此异步调用加入[executorService]的队列。如果执行器已关闭,则会尝试调用失败回调来进行清理
*/
fun executeOn(executorService: ExecutorService) {
...
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// 发送请求,经过一系列的拦截器链最终返回响应结果
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 回调请求成功的结果
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
// 失败后取消请求
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 请求异常,回调请求失败的结果
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 发送请求完毕
client.dispatcher.finished(this)
}
}
}
}
- 看下请求完毕的方法
client.dispatcher.finished(this)
// Dispatcher.kt
/** 被[AsyncCall.run]调用表示请求完成(异步) */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** 被[Call.execute]调用表示请求完成(同步) */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
/** 请求完毕的逻辑处理(将当前请求从请求队列移除、继续执行下一个请求、判断是否回调闲置Callback) */
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
// 从请求任务队列中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
// 继续将等待队列中的请求移入异步队列,并交由线程池执行
val isRunning = promoteAndExecute()
// 如果没有请求需要被执行,回调请求队列闲置的callback
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
- 简单分析下同步请求
realCall.execute()
// RealCall.kt
/** 同步请求 */
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
// 直接交由dispatcher执行
client.dispatcher.executed(this)
// 通过getResponseWithInterceptorChain返回请求响应
return getResponseWithInterceptorChain()
} finally {
// 发送请求完毕
client.dispatcher.finished(this)
}
}
// Dispatcher.kt
/** 被 [Call.execute] 调用表示正在执行请求. */
@Synchronized
internal fun executed(call: RealCall) {
// 直接加入请求队列
runningSyncCalls.add(call)
}
- 可见同步请求会将请求Call加入到同步请求队列,并直接调用
getResponseWithInterceptorChain()
返回响应结果 - 来张图对整个流程做个总结
3. 拦截器链
- 上面知道返回响应是通过
getResponseWithInterceptorChain()
这个方法的,看下源码
// RealCall.kt
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// 构建一个完整的拦截器链
val interceptors = mutableListOf<Interceptor>()
// 用户自定义的拦截器(全局拦截器,可以在任何请求过程中使用)
interceptors += client.interceptors
// 重试和重定向拦截器
interceptors += RetryAndFollowUpInterceptor(client)
// 桥接拦截器,主要负责请求和响应的转换
interceptors += BridgeInterceptor(client.cookieJar)
// 缓存拦截器,主要负责缓存的相关处理
interceptors += CacheInterceptor(client.cache)
// 连接拦截器,主要负责建立连接,建立 TCP 连接或者 TLS 连接
interceptors += ConnectInterceptor
if (!forWebSocket) {
// 用户自定义的网络拦截器(针对特定类型的请求使用的拦截器)
interceptors += client.networkInterceptors
}
// 调用服务器拦截器,主要负责网络数据的请求和响应,也就是实际的网络I/O操作
interceptors += CallServerInterceptor(forWebSocket)
// 构造具体的拦截器链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
// 拦截器链的执行关键,拦截器的层层调用、层层返回
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
// 返回响应结果
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
getResponseWithInterceptorChain
方法是整个OkHttp实现责任链模式的核心,在这个方法中除了众多拦截器是重点之外还有chain.proceed
,它利用了责任链模式进行层层调用proceed,层层返回response
// RealInterceptorChain.kt
@Throws(IOException::class)
override fun proceed(request: Request): Response {
...
// 创建并调用拦截器链中的下一个拦截器的intercept方法
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 调用下一个拦截器的intercept方法(intercept内部还会调用proceed)
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
...
return response
}
- 从源码得知,Chain 是用来描述责任链的,并通过其中的 process 方法开始依次执行链上的每个节点,并返回处理后的 Response。 Chain 的唯一实现为 RealInterceptorChain,可以称之为拦截器责任链,其中的节点由 RealCall 中添加进来的 Interceptor 们组成
- Interceptor 与 Chain 彼此互相依赖,互相调用,形成了一个完美的调用链,下面是大致的调用关系链
3.1 RetryAndFollowUpInterceptor
- 除去自定义拦截器之外第一个执行的拦截器。主要作用是
进行请求的重试与重定向
(还创建了一个连接管理池)。在某些情况下,当网络请求失败时,这个拦截器可以提供一种机制,尝试重新连接到网络,并再次发送请求。这个过程可以帮助应用程序在出现短暂的网络问题时保持连接,提高了系统的可靠性和稳定性。
// RetryAndFollowUpInterceptor.kt
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
// newExchangeFinder传true,enterNetworkInterceptorExchange方法会创建一个新的
// ExchangeFinder对象(管理链接池)
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
// 责任链模式,让下一个拦截器处理
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
...
val exchange = call.interceptorScopedExchange
// 计算出接收[userResponse]时要发出的HTTP请求。
// 这将添加身份验证标头、遵循重定向或处理客户端请求超时。如果后续操作不必要或不适用,则返回null
val followUp = followUpRequest(response, exchange)
// 获取到的request为空则直接返回response
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// 超过最大重试或者重定向次数则抛出异常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 返回的request若不为空则重新赋值后继续发起请求
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
...
/**
* 计算出接收[userResponse]时要发出的HTTP请求。
* 这将添加身份验证标头、遵循重定向或处理客户端请求超时。如果后续操作不必要或不适用,则返回null
*/
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode) {
HTTP_PROXY_AUTH -> {
// 407异常处理
...
}
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
// 300、301、302、303、307、308,重定向请求处理
return buildRedirectRequest(userResponse, method)
}
HTTP_CLIENT_TIMEOUT -> {
// 408异常处理
...
}
HTTP_UNAVAILABLE -> {
// 503异常处理
...
}
HTTP_MISDIRECTED_REQUEST -> {
// 421异常处理
...
}
else -> return null
}
}
/** 构建重定向请求 */
private fun buildRedirectRequest(userResponse: Response, method: String): Request? {
...
// 大多数重定向不包括请求正文
val requestBuilder = userResponse.request.newBuilder()
if (HttpMethod.permitsRequestBody(method)) {
val responseCode = userResponse.code
val maintainBody = HttpMethod.redirectsWithBody(method) ||
responseCode == HTTP_PERM_REDIRECT ||
responseCode == HTTP_TEMP_REDIRECT
if (HttpMethod.redirectsToGet(method) && responseCode != HTTP_PERM_REDIRECT && responseCode != HTTP_TEMP_REDIRECT) {
requestBuilder.method("GET", null)
} else {
val requestBody = if (maintainBody) userResponse.request.body else null
requestBuilder.method(method, requestBody)
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding")
requestBuilder.removeHeader("Content-Length")
requestBuilder.removeHeader("Content-Type")
}
}
// 跨主机重定向时,需要删除所有身份验证标头(应用层无法保留它们)
if (!userResponse.request.url.canReuseConnectionFor(url)) {
requestBuilder.removeHeader("Authorization")
}
return requestBuilder.url(url).build()
}
companion object {
/**
* 重试或者重定向的次数
*/
private const val MAX_FOLLOW_UPS = 20
}
}
3.2 BridgeInterceptor
- 桥接拦截器,主要负责请求和响应的转换。把用户构造的 request 对象转换成发送到服务器 request对象,并把服务器返回的响应转换为对用户友好的响应。
// BridgeInterceptor.kt
/**
* 从应用程序代码到网络代码的桥接。
* 首先,它根据用户请求构建一个网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应
*/
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// 请求阶段:BridgeInterceptor负责将用户构建的Request请求转化为能够进行网络访问的请求。
// 这个过程可能包括一些对请求头部的处理,例如设置请求头信息,如Content-Type,Content-Length等
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// 如果添加了“Accept-Encoding:gzip”的Header,拦截器还负责解压缩传输流
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
// 返回阶段:当网络请求返回响应时,BridgeInterceptor会将网络请求返回的Response转化为用户可用的Response。
// 这个过程可能包括对返回数据进行解析,处理响应头信息,或者将处理后的响应返回给用户
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()
) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
3.3 CacheInterceptor
- 缓存拦截器,主要
负责缓存的读取与写入
,将 Http 的请求结果放到到缓存中,以便在下次进行相同的请求时,直接从缓存中读取结果,提高响应速度。
// CacheInterceptor.kt
/**
* 主要负责缓存的读取和写入,
* 将 Http 的请求结果放到到缓存中,以便在下次进行相同的请求时,直接从缓存中读取结果,提高响应速度
*/
class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
// 获取候选缓存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 缓存策略,请求是使用网络、缓存还是两者兼用
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) {
// 候选缓存不适用,直接关闭
cacheCandidate.body?.closeQuietly()
}
// 如果我们被禁止使用网络,并且缓存不足,抛出异常504。
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// 如果不需要网络则直接返回缓存即可
if (networkRequest == null) {
// 返回缓存请求结果
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
// 责任链,让下一个拦截器处理
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 有缓存响应
if (cacheResponse != null) {
// 服务器返回状态码为304则返回缓存结果
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// 在合并标头之后但在剥离Content-Encoding标头之前更新缓存
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
// 读取网络请求结果
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将网络请求结果加入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
// 缓存失效时需要清除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
}
}
}
// 返回网络请求结果
return response
}
3.4 ConnectInterceptor
- 连接拦截器,主要
负责建立连接,建立 TCP 连接或者 TLS 连接
// ConnectInterceptor.kt
/**
* 打开与目标服务器的连接,然后转到下一个拦截器。网络可能用于返回的响应,或者使用条件GET验证缓存的响应
*/
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 查找新连接或池连接以承载即将到来的请求和响应
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
// 让下一层拦截器处理,同时将exchange一起传递过去
return connectedChain.proceed(realChain.request)
}
}
- 建立连接的部分核心代码
// realCall.kt
/** 查找新连接或池连接以承载即将到来的请求和响应 */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
return result
}
// ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
...
}
// ExchangeFinder.kt
/** 查找可用的链接,如果不可用则一直重复查找,知道找到为止 */
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 确保连接可用
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
...
}
}
// ExchangeFinder.kt
/** 获取连接,先是获取已经存在的进行重用,没有从池中取,再没有就创建一个新连接 */
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
...
// 如果连接还没有被释放则会重用。这里没有调用connectionAcquired()是由于我们之前已经获取了
if (call.connection != null) {
check(toClose == null)
return callConnection
}
...
// 能从连接池拿到连接直接返回
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// 连接池中无连接时,创建新连接并添加到连接池
...
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
// 使用新连接来进行服务器连接
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
...
synchronized(newConnection) {
// 新连接添加到连接池
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
- 从
exchangeFinder.find
开始到exchangeFinder.findConnection
只做了一件事,就是先尝试重用连接,如果不能重用就从连接池中取出一个新的连接,如果无法取出就直接创建一个新的连接并添加到连接池中。 - 看看服务器是怎么连接的,看这行代码
newConnection.connect
// RealConnection.kt
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
...
while (true) {
try {
if (route.requiresTunnel()) {
...
} else {
// 通过Socket构建完整HTTP或HTTPS连接
connectSocket(connectTimeout, readTimeout, call, eventListener)
...
}
// RealConnection.kt
/** 通过Socket来构建完整HTTP或HTTPS连接 */
@Throws(IOException::class)
private fun connectSocket(
connectTimeout: Int,
readTimeout: Int,
call: Call,
eventListener: EventListener
) {
...
try {
Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
} catch (e: ConnectException) {
...
try {
// okio的接口,用于输入,类似InputStream
source = rawSocket.source().buffer()
//okio的接口,用于输出,类似OutputStream
sink = rawSocket.sink().buffer()
}
...
}
- 主要是创建了一个
socket
对象然后使用socket
建立连接,利用okio
的输入输出接口获取输入/输出流
3.5 CallServerInterceptor
- 主要负责网络数据的请求和响应,也就是实际的网络I/O操作。将请求头与请求体发送给服务器,以及解析服务器返回的response
// CallServerInterceptor.kt
/** 最后一个拦截器,实现对服务器的网络调用 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
var sendRequestException: IOException? = null
try {
exchange.writeRequestHeaders(request)
// 如果不是GET/HEAD请求
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
// 准备双工正文,以便应用程序稍后可以发送请求正文
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
} catch (e: IOException) {
if (e is ConnectionShutdownException) {
throw e // No request was sent so there's no response to read.
}
if (!exchange.hasFailure) {
throw e // Don't attempt to read the response; we failed to send the request.
}
sendRequestException = e
}
try {
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (shouldIgnoreAndWaitForRealResponse(code)) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
...
return response
...
}
3.6 拦截器总结
client.interceptors
:用户自定义的拦截器,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义 header、自定义 log 等等。RetryAndFollowUpIntercept
:重试和重定向拦截器,重试或者重定向的次数不能大于20次,同时它还创建了一个ExchangeFinder
对象用于管理连接池为后续的连接做准备BridgeInterceptor
:桥接拦截器,主要负责请求和响应的转换。补充请求头,把用户请求转换成网络请求,网络响应转换成用户可以接收的响应,同时还需要注意的一点是如果用户手动添加了Accept-Encoding
那就需要处理解压操作CacheInterceptor
:缓存拦截器,主要负责缓存的读取与写入,将 Http 的请求结果放到到缓存中,以便在下次进行相同的请求时,直接从缓存中读取结果,提高响应速度(内部是通过okio
来处理缓存的)ConnectInterceptor
:连接拦截器,负责建立连接的。最终是通过RealConnection
对象建立socket
连接的并且获得了输入输出流为下一步读写做准备,RealConnection
对象的获取时优先复用的,如果无法复用则从连接池中获取,如果无法获取则创建一个新的连接并将其放入连接池中client.networkInterceptors
:用户自定义的网络拦截器(针对特定类型的请求使用的拦截器)CallServerInterceptor
:调用服务器拦截器,主要负责网络数据的请求和响应,也就是实际的网络I/O操作。将请求头与请求体发送给服务器,以及解析服务器返回的 response
3.6.1 interceptors与networkInterceptors对比
- 两者都是用户自定义的拦截器,前者是在第一个,后者是在倒数第2个
interceptors
是应用拦截器,networkInterceptors
是网络拦截器;- 应用拦截器是用于在请求发送前和网络响应后的拦截器,只能触发一次。而网络拦截器在发生错误重试或者重定向时可以执行多次,相当于进行了二次请求;
- 如果CacheInterceptor命中了缓存就不在进行网络请求了,因此会存在短路网络拦截器的情况;
- 应用拦截器通常用于统计客户端的网络请求发起情况;网络拦截器中可以获取到最终发送请求的request也包括重定向的数据,也可以获取真正发生网络请求的回来的response,从而修改对应的请求和响应数据。
- 用法上:
interceptors可用于添加一些公共参数,如自定义 header、自定义 log 等,而networkInterceptors主要针对特定类型的请求使用的拦截器
© 版权声明
文章版权归作者所有,未经允许请勿转载,侵权请联系 admin@trc20.tw 删除。
THE END