OkHttp源码阅读

1. OkHttp的使用

/**

 * OkHttp测试类
 * @author LTP  2023/6/29
 */


class OkHttpTest {


    /** 创建OkHttpClient对象 */
    private val okHttpClient = OkHttpClient.Builder().build()



    /** 创建Request对象 */
    private val request = Request.Builder().url("http://www.baidu.com").build()




    /** 创建RealCall对象(Call的唯一接口实现类) */
    private val realCall = okHttpClient.newCall(request)



    /**
     * 同步请求
     */
    @Test
    fun getRequestWithSync() {
        // 使用同步请求,realCall执行
        realCall.execute().use {
            print(it.body?.string())
        }
    }


    /**
     * 异步请求
     */
    @Test
    fun getRequestWithASync() {
        // 使用异步请求,将realCall加入请求队列
        realCall.enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {
                e.printStackTrace()
            }



            override fun onResponse(call: Call, response: Response) {
                print(response.body?.string() ?: "123")
            }
        })


        // 得阻塞一会儿线程不让其结束以拿到请求结果
        Thread.sleep(2000)
    }
}
  • 主要步骤有
    1. 创建OkHttpClient对象
    2. 创建Request对象
    3. 创建RealCall对象
    4. 开始发起请求,enqueue为异步请求,execute为同步请求

2. 请求过程

  • 源码基于OkHttp V4.11.0(直接下载到本地引入lib,BTPJ/OkHttpStudy

2.1 OkHttpClient对象的创建

OkHttpClient() // 方法1
OkHttpClient.Builder().build() // 方法2
OkHttpClient().newBuilder().build() // 方法3
  • 三种方式本质上都是一样的,这里以方法2为例
  • 从创建方式来猜测就是一个典型的建造者模式,OkHttpClient.Builder()创建了一个okHttpClient的内部Builder类对象,里面初始化了一系列的成员变量,着重分析一下三个
/***
 * 建造者模式
 */




class Builder constructor() {
    /** 调度器 */
    internal var dispatcher: Dispatcher = Dispatcher()
    /** 拦截器集合 */
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    /** 网络拦截器集合 */
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
  • 接着使用Builder对象调用了build()方法
/** 调用OkHttpClient构造创建OkHttpClient对象 */
fun build(): OkHttpClient = OkHttpClient(this)
  • 其实就是调用的OkHttpClient(builder:Builder)构造
open class OkHttpClient internal constructor(
    builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {


    @get:JvmName("dispatcher")
    val dispatcher: Dispatcher = builder.dispatcher
    
    @get:JvmName("interceptors")
    val interceptors: List<Interceptor> =
        builder.interceptors.toImmutableList()


    @get:JvmName("networkInterceptors")
    val networkInterceptors: List<Interceptor> =
        builder.networkInterceptors.toImmutableList()
  • 其实就是典型的建造者模式,通过Builder创建的调度器、拦截器等成员变量最终都会赋值给OkHttpClient

2.2 Request对象的创建

Request.Builder().url("http://www.baidu.com").build()
  • 也是个建造者模式…
open class Builder {
  // 请求地址
  internal var url: HttpUrl? = null
  // 请求方法(例如Get、Post)
  internal var method: String
  // 请求头Header集合
  internal var headers: Headers.Builder
  // 请求体Body
  internal var body: RequestBody? = null


  /** A mutable map of tags, or an immutable empty map if we don't have any. */
  internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()


  constructor() {
    this.method = "GET"
    this.headers = Headers.Builder()
  }
  • 看下build()方法
open fun build(): Request {
  return Request(
      checkNotNull(url) { "url == null" },
      method,
      headers.build(),
      body,
      tags.toImmutableMap()
  )
}

  • 其实就是创建了一个Request对象,并把Bulder对象的成员属性赋值给Request

2.3 创建Call对象

okHttpClient.newCall(request)
  • 看下newCall
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
  • 返回了Call接口的唯一实现类RealCall
class RealCall(
  val client: OkHttpClient,
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
...
  • 看下Call接口具体定义了哪些方法
/** Call.kt
 * 调用准备好的执行请求。请求可以被取消。由于此对象表示单个请求/响应对(流),因此不能执行两次
 */




interface Call : Cloneable {
  /** 返回启动此调用的原始请求. */
  fun request(): Request




  /** 同步请求(请求会被立即调用,一直阻塞线程直至请求被响应或请求出错) */
  @Throws(IOException::class)
  fun execute(): Response


 /**
  * 异步请求,安排在将来某个时间点执行的请求,具体调度者是OkHttpClient.dispatcher调度器,
  * 调度请求返回到Callback中
  */
  fun enqueue(responseCallback: Callback)



  /** 取消请求(已经完成的请求无法被取消) */ 
  fun cancel()



  /** 请求是否被执行(包含executed和enqueued),多次执行该方法是错误的 */
  fun isExecuted(): Boolean


  /** 请求是否被取消 */
  fun isCanceled(): Boolean


  /** 请求超时时间 */
  fun timeout(): Timeout


  /** 克隆一个新Call(无论旧Call是否已被执行) */
  public override fun clone(): Call

  /** 对象创建工厂 */
  fun interface Factory {
    fun newCall(request: Request): Call
  }
}
  • RealCall对Call中定义的这些方法进行了实现,具体可看RealCall的实现逻辑,这里就不贴出来了

2.4 发起请求

  • 先分析下发起异步请求
// RealCall.kt


override fun enqueue(responseCallback: Callback) {
  // 检测是否已被执行过,执行过抛异常
  check(executed.compareAndSet(false, true)) { "Already Executed" }

  // 事件监听回调相关,监听请求发起事件
  callStart()
  // 将请求交给调度器,调度器决定什么时候开始请求
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}



private fun callStart() {
  this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
  // 事件监听回调
  eventListener.callStart(this)
}

  • 真正的逻辑在Dispatcher中
/** Dispatcher.kt

 * 何时执行异步请求的调度器,内部默认是使用ExecutorService实现的
 */




class Dispatcher constructor() {
  /** 同时执行的最大请求数,超过的请求会在内存中排队等待 */
  @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }



  /** 每台主机可并发执行的最大请求数(通过URL的主机名) */
  @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }



  /** 请求队列空闲回调 */
  @set:Synchronized
  @get:Synchronized
  var idleCallback: Runnable? = null


  private var executorServiceOrNull: ExecutorService? = null




  /** 线程池(懒汉式保证单例) */
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }


  /** 待发送到请求队列的异步请求 */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** 异步请求队列 */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()


 /** 同步请求队列 */
  private val runningSyncCalls = ArrayDeque<RealCall>()
  ...
  • 看下dispatcher.enqueue方法
/** Dispatcher.kt

 * 加入队列
 */




internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        // 添加进准备发送的集合
        readyAsyncCalls.add(call)


        // 修改AsyncCall,使其共享对同一主机的现有运行调用的AtomicInteger
        if (!call.call.forWebSocket) {
            val existingCall = findExistingCallWithHost(call.host)
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
    }
    // 处理队列移动并按序执行
    promoteAndExecute()
}


/**Dispatcher.kt
 * 将符合条件的call从readyAsyncCalls(准备发送的队列)添加到runningAsyncCalls(异步发送队列)中,
 * 并在executor服务上运行它们。不能在同步的情况下调用,因为执行调用可能会调用用户代码
 *
 * @return true if the dispatcher is currently running calls. 如果调度程序当前正在运行,则为true
 */
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()



    // 正在运行的请求
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        val i = readyAsyncCalls.iterator()
        while (i.hasNext()) {
            val asyncCall = i.next()



            // 超过最大请求数64,跳出整个循环
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            // 单服务器超过最大请求数,跳出当前单个循环
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
            // 从请求准备队列中移除
            i.remove()
            asyncCall.callsPerHost.incrementAndGet()
            executableCalls.add(asyncCall)
            // 加入异步请求队列中
            runningAsyncCalls.add(asyncCall)
        }
        isRunning = runningCallsCount() > 0
    }




    for (i in 0 until executableCalls.size) {
        val asyncCall = executableCalls[i]
        // 把请求任务交给线程池
        asyncCall.executeOn(executorService)
    }



    return isRunning
}
/** RealCall.kt
 * 尝试将此异步调用加入[executorService]的队列。如果执行器已关闭,则会尝试调用失败回调来进行清理
 */




fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()


  var success = false
  try {

    // 使用线程执行
    executorService.execute(this)
    success = true
  } catch (e: RejectedExecutionException) {
    val ioException = InterruptedIOException("executor rejected")
    ioException.initCause(e)
    noMoreExchanges(ioException)
    // 执行失败的回调调用
    responseCallback.onFailure(this@RealCall, ioException)
  } finally {
    if (!success) {
      // 发送请求完毕
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}
  • 最终是交给线程池executorService来处理的,executorService.execute方法接收的是一个Runnable实现类,这个类就是AsyncCall,它也是RealCall的非静态内部类,持有对RealCall的引用
/**

 * RealCall.kt
 * Runnable实现类,会被加入到线程池执行
 */


internal inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
  ...



  val host: String
    get() = originalRequest.url.host




  val request: Request
      get() = originalRequest



  val call: RealCall
      get() = this@RealCall



  /**
   * 尝试将此异步调用加入[executorService]的队列。如果执行器已关闭,则会尝试调用失败回调来进行清理
   */
  fun executeOn(executorService: ExecutorService) {
    ...
  }



  override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      var signalledCallback = false
      timeout.enter()
      try {
        // 发送请求,经过一系列的拦截器链最终返回响应结果
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        // 回调请求成功的结果
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        // 失败后取消请求
        cancel()
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          // 请求异常,回调请求失败的结果
          responseCallback.onFailure(this@RealCall, canceledException)
        }

        throw t
      } finally {
        // 发送请求完毕
        client.dispatcher.finished(this)
      }
    }
  }
}
  • 看下请求完毕的方法client.dispatcher.finished(this)
// Dispatcher.kt

/** 被[AsyncCall.run]调用表示请求完成(异步) */
internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
}




/** 被[Call.execute]调用表示请求完成(同步) */
internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
}




/** 请求完毕的逻辑处理(将当前请求从请求队列移除、继续执行下一个请求、判断是否回调闲置Callback) */
private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
        // 从请求任务队列中移除
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        idleCallback = this.idleCallback
    }


    // 继续将等待队列中的请求移入异步队列,并交由线程池执行
    val isRunning = promoteAndExecute()



    // 如果没有请求需要被执行,回调请求队列闲置的callback
    if (!isRunning && idleCallback != null) {
        idleCallback.run()
    }
}
  • 简单分析下同步请求
realCall.execute()
// RealCall.kt


/** 同步请求 */
override fun execute(): Response {
  check(executed.compareAndSet(false, true)) { "Already Executed" }


  timeout.enter()
  callStart()
  try {

    // 直接交由dispatcher执行
    client.dispatcher.executed(this)
    // 通过getResponseWithInterceptorChain返回请求响应
    return getResponseWithInterceptorChain()
  } finally {
    // 发送请求完毕
    client.dispatcher.finished(this)
  }
}


// Dispatcher.kt

/** 被 [Call.execute] 调用表示正在执行请求. */
@Synchronized
internal fun executed(call: RealCall) {
    // 直接加入请求队列
    runningSyncCalls.add(call)
}
  • 可见同步请求会将请求Call加入到同步请求队列,并直接调用getResponseWithInterceptorChain()返回响应结果
  • 来张图对整个流程做个总结
    image.png

3. 拦截器链

  • 上面知道返回响应是通过getResponseWithInterceptorChain()这个方法的,看下源码
// RealCall.kt


@Throws(IOException::class)

internal fun getResponseWithInterceptorChain(): Response {
  // 构建一个完整的拦截器链
  val interceptors = mutableListOf<Interceptor>()
  // 用户自定义的拦截器(全局拦截器,可以在任何请求过程中使用)
  interceptors += client.interceptors
  // 重试和重定向拦截器
  interceptors += RetryAndFollowUpInterceptor(client)
  // 桥接拦截器,主要负责请求和响应的转换
  interceptors += BridgeInterceptor(client.cookieJar)
  // 缓存拦截器,主要负责缓存的相关处理
  interceptors += CacheInterceptor(client.cache)
  // 连接拦截器,主要负责建立连接,建立 TCP 连接或者 TLS 连接
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    // 用户自定义的网络拦截器(针对特定类型的请求使用的拦截器)
    interceptors += client.networkInterceptors
  }
  // 调用服务器拦截器,主要负责网络数据的请求和响应,也就是实际的网络I/O操作
  interceptors += CallServerInterceptor(forWebSocket)

  // 构造具体的拦截器链
  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )

  var calledNoMoreExchanges = false
  try {
    // 拦截器链的执行关键,拦截器的层层调用、层层返回
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }

    // 返回响应结果
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}
  • getResponseWithInterceptorChain方法是整个OkHttp实现责任链模式的核心,在这个方法中除了众多拦截器是重点之外还有chain.proceed,它利用了责任链模式进行层层调用proceed,层层返回response
// RealInterceptorChain.kt
@Throws(IOException::class)

override fun proceed(request: Request): Response {
  ...

  // 创建并调用拦截器链中的下一个拦截器的intercept方法
  val next = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]


  @Suppress("USELESS_ELVIS")
  // 调用下一个拦截器的intercept方法(intercept内部还会调用proceed)
  val response = interceptor.intercept(next) ?: throw NullPointerException(
      "interceptor $interceptor returned null")
  ...



  return response
}
  • 从源码得知,Chain 是用来描述责任链的,并通过其中的 process 方法开始依次执行链上的每个节点,并返回处理后的 Response。 Chain 的唯一实现为 RealInterceptorChain,可以称之为拦截器责任链,其中的节点由 RealCall 中添加进来的 Interceptor 们组成
  • Interceptor 与 Chain 彼此互相依赖,互相调用,形成了一个完美的调用链,下面是大致的调用关系链

image.png

3.1 RetryAndFollowUpInterceptor

  • 除去自定义拦截器之外第一个执行的拦截器。主要作用是进行请求的重试与重定向(还创建了一个连接管理池)。在某些情况下,当网络请求失败时,这个拦截器可以提供一种机制,尝试重新连接到网络,并再次发送请求。这个过程可以帮助应用程序在出现短暂的网络问题时保持连接,提高了系统的可靠性和稳定性。
// RetryAndFollowUpInterceptor.kt
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        var request = chain.request
        val call = realChain.call
        var followUpCount = 0
        var priorResponse: Response? = null
        var newExchangeFinder = true
        var recoveredFailures = listOf<IOException>()
        while (true) {
            // newExchangeFinder传true,enterNetworkInterceptorExchange方法会创建一个新的
            // ExchangeFinder对象(管理链接池)
            call.enterNetworkInterceptorExchange(request, newExchangeFinder)



            var response: Response
            var closeActiveExchange = true
            try {
                if (call.isCanceled()) {
                    throw IOException("Canceled")
                }



                try {
                    // 责任链模式,让下一个拦截器处理
                    response = realChain.proceed(request)
                    newExchangeFinder = true
                } catch (e: RouteException) {
                   ...




                val exchange = call.interceptorScopedExchange

                // 计算出接收[userResponse]时要发出的HTTP请求。
                // 这将添加身份验证标头、遵循重定向或处理客户端请求超时。如果后续操作不必要或不适用,则返回null
                val followUp = followUpRequest(response, exchange)



                // 获取到的request为空则直接返回response
                if (followUp == null) {
                    if (exchange != null && exchange.isDuplex) {
                        call.timeoutEarlyExit()
                    }
                    closeActiveExchange = false
                    return response
                }


                val followUpBody = followUp.body
                if (followUpBody != null && followUpBody.isOneShot()) {
                    closeActiveExchange = false
                    return response
                }

                response.body?.closeQuietly()

                // 超过最大重试或者重定向次数则抛出异常
                if (++followUpCount > MAX_FOLLOW_UPS) {
                    throw ProtocolException("Too many follow-up requests: $followUpCount")
                }



                // 返回的request若不为空则重新赋值后继续发起请求
                request = followUp
                priorResponse = response
            } finally {
                call.exitNetworkInterceptorExchange(closeActiveExchange)
            }
        }
    }

    ...

    /**
     * 计算出接收[userResponse]时要发出的HTTP请求。
     * 这将添加身份验证标头、遵循重定向或处理客户端请求超时。如果后续操作不必要或不适用,则返回null
     */
    @Throws(IOException::class)
    private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
        val route = exchange?.connection?.route()
        val responseCode = userResponse.code

        val method = userResponse.request.method
        when (responseCode) {
            HTTP_PROXY_AUTH -> {
               // 407异常处理
               ...
            }

            HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)

            HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
                // 300、301、302、303、307、308,重定向请求处理
                return buildRedirectRequest(userResponse, method)
            }

            HTTP_CLIENT_TIMEOUT -> {
                // 408异常处理
                ...
            }

            HTTP_UNAVAILABLE -> {
                // 503异常处理
                ...
            }

            HTTP_MISDIRECTED_REQUEST -> {
                // 421异常处理
                ...
            }

            else -> return null
        }
    }

    /** 构建重定向请求 */
    private fun buildRedirectRequest(userResponse: Response, method: String): Request? {
        ...

        // 大多数重定向不包括请求正文
        val requestBuilder = userResponse.request.newBuilder()
        if (HttpMethod.permitsRequestBody(method)) {
            val responseCode = userResponse.code
            val maintainBody = HttpMethod.redirectsWithBody(method) ||
                    responseCode == HTTP_PERM_REDIRECT ||
                    responseCode == HTTP_TEMP_REDIRECT
            if (HttpMethod.redirectsToGet(method) && responseCode != HTTP_PERM_REDIRECT && responseCode != HTTP_TEMP_REDIRECT) {
                requestBuilder.method("GET", null)
            } else {
                val requestBody = if (maintainBody) userResponse.request.body else null
                requestBuilder.method(method, requestBody)
            }
            if (!maintainBody) {
                requestBuilder.removeHeader("Transfer-Encoding")
                requestBuilder.removeHeader("Content-Length")
                requestBuilder.removeHeader("Content-Type")
            }
        }

        // 跨主机重定向时,需要删除所有身份验证标头(应用层无法保留它们)
        if (!userResponse.request.url.canReuseConnectionFor(url)) {
            requestBuilder.removeHeader("Authorization")
        }

        return requestBuilder.url(url).build()
    }

    companion object {
        /**
         * 重试或者重定向的次数
         */
        private const val MAX_FOLLOW_UPS = 20
    }
}

3.2 BridgeInterceptor

  • 桥接拦截器,主要负责请求和响应的转换。把用户构造的 request 对象转换成发送到服务器 request对象,并把服务器返回的响应转换为对用户友好的响应。
// BridgeInterceptor.kt
/**


 * 从应用程序代码到网络代码的桥接。
 * 首先,它根据用户请求构建一个网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应
 */

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {




    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        // 请求阶段:BridgeInterceptor负责将用户构建的Request请求转化为能够进行网络访问的请求。
        // 这个过程可能包括一些对请求头部的处理,例如设置请求头信息,如Content-Type,Content-Length等
        val userRequest = chain.request()
        val requestBuilder = userRequest.newBuilder()



        val body = userRequest.body
        if (body != null) {
            val contentType = body.contentType()
            if (contentType != null) {
                requestBuilder.header("Content-Type", contentType.toString())
            }


            val contentLength = body.contentLength()
            if (contentLength != -1L) {
                requestBuilder.header("Content-Length", contentLength.toString())
                requestBuilder.removeHeader("Transfer-Encoding")
            } else {
                requestBuilder.header("Transfer-Encoding", "chunked")
                requestBuilder.removeHeader("Content-Length")
            }
        }




        if (userRequest.header("Host") == null) {
            requestBuilder.header("Host", userRequest.url.toHostHeader())
        }

        if (userRequest.header("Connection") == null) {
            requestBuilder.header("Connection", "Keep-Alive")
        }

        // 如果添加了“Accept-Encoding:gzip”的Header,拦截器还负责解压缩传输流
        var transparentGzip = false
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
            transparentGzip = true
            requestBuilder.header("Accept-Encoding", "gzip")
        }


        val cookies = cookieJar.loadForRequest(userRequest.url)
        if (cookies.isNotEmpty()) {
            requestBuilder.header("Cookie", cookieHeader(cookies))
        }

        if (userRequest.header("User-Agent") == null) {
            requestBuilder.header("User-Agent", userAgent)
        }


        // 返回阶段:当网络请求返回响应时,BridgeInterceptor会将网络请求返回的Response转化为用户可用的Response。
        // 这个过程可能包括对返回数据进行解析,处理响应头信息,或者将处理后的响应返回给用户
        val networkResponse = chain.proceed(requestBuilder.build())



        cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

        val responseBuilder = networkResponse.newBuilder()
            .request(userRequest)

        if (transparentGzip &&
            "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
            networkResponse.promisesBody()
        ) {
            val responseBody = networkResponse.body
            if (responseBody != null) {
                val gzipSource = GzipSource(responseBody.source())
                val strippedHeaders = networkResponse.headers.newBuilder()
                    .removeAll("Content-Encoding")
                    .removeAll("Content-Length")
                    .build()
                responseBuilder.headers(strippedHeaders)
                val contentType = networkResponse.header("Content-Type")
                responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
            }
        }

        return responseBuilder.build()
    }

3.3 CacheInterceptor

  • 缓存拦截器,主要负责缓存的读取与写入,将 Http 的请求结果放到到缓存中,以便在下次进行相同的请求时,直接从缓存中读取结果,提高响应速度。
// CacheInterceptor.kt
/**


 * 主要负责缓存的读取和写入,
 * 将 Http 的请求结果放到到缓存中,以便在下次进行相同的请求时,直接从缓存中读取结果,提高响应速度
 */

class CacheInterceptor(internal val cache: Cache?) : Interceptor {




  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    // 获取候选缓存
    val cacheCandidate = cache?.get(chain.request())


    val now = System.currentTimeMillis()



    // 缓存策略,请求是使用网络、缓存还是两者兼用
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse



    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE


    if (cacheCandidate != null && cacheResponse == null) {
      // 候选缓存不适用,直接关闭
      cacheCandidate.body?.closeQuietly()
    }

    // 如果我们被禁止使用网络,并且缓存不足,抛出异常504。
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }


    // 如果不需要网络则直接返回缓存即可
    if (networkRequest == null) {
      // 返回缓存请求结果
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }



    var networkResponse: Response? = null
    try {
      // 责任链,让下一个拦截器处理
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // 有缓存响应
    if (cacheResponse != null) {
      // 服务器返回状态码为304则返回缓存结果
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()


        // 在合并标头之后但在剥离Content-Encoding标头之前更新缓存
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    // 读取网络请求结果
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // 将网络请求结果加入缓存
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            listener.cacheMiss(call)
          }
        }
      }

      // 缓存失效时需要清除
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
        }
      }
    }

    // 返回网络请求结果
    return response
  }

3.4 ConnectInterceptor

  • 连接拦截器,主要负责建立连接,建立 TCP 连接或者 TLS 连接
// ConnectInterceptor.kt
/**


 * 打开与目标服务器的连接,然后转到下一个拦截器。网络可能用于返回的响应,或者使用条件GET验证缓存的响应
 */


object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    // 查找新连接或池连接以承载即将到来的请求和响应
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    // 让下一层拦截器处理,同时将exchange一起传递过去
    return connectedChain.proceed(realChain.request)
  }
}

  • 建立连接的部分核心代码
// realCall.kt
/** 查找新连接或池连接以承载即将到来的请求和响应 */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
  ...

  val codec = exchangeFinder.find(client, chain)
  val result = Exchange(this, eventListener, exchangeFinder, codec)
  return result
}
// ExchangeFinder.kt


fun find(
  client: OkHttpClient,
  chain: RealInterceptorChain
): ExchangeCodec {
  try {
    val resultConnection = findHealthyConnection(
        connectTimeout = chain.connectTimeoutMillis,
        readTimeout = chain.readTimeoutMillis,
        writeTimeout = chain.writeTimeoutMillis,
        pingIntervalMillis = client.pingIntervalMillis,
        connectionRetryEnabled = client.retryOnConnectionFailure,
        doExtensiveHealthChecks = chain.request.method != "GET"
    )
    return resultConnection.newCodec(client, chain)
 ...
}


// ExchangeFinder.kt


/**  查找可用的链接,如果不可用则一直重复查找,知道找到为止 */
@Throws(IOException::class)


private fun findHealthyConnection(
  connectTimeout: Int,


  readTimeout: Int,


  writeTimeout: Int,

  pingIntervalMillis: Int,

  connectionRetryEnabled: Boolean,
  doExtensiveHealthChecks: Boolean
): RealConnection {
  while (true) {

    val candidate = findConnection(
        connectTimeout = connectTimeout,
        readTimeout = readTimeout,
        writeTimeout = writeTimeout,
        pingIntervalMillis = pingIntervalMillis,
        connectionRetryEnabled = connectionRetryEnabled
    )



    // 确保连接可用
    if (candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate
    }
    ...
  }
}
// ExchangeFinder.kt


/** 获取连接,先是获取已经存在的进行重用,没有从池中取,再没有就创建一个新连接 */
@Throws(IOException::class)


private fun findConnection(
  connectTimeout: Int,


  readTimeout: Int,


  writeTimeout: Int,

  pingIntervalMillis: Int,

  connectionRetryEnabled: Boolean
): RealConnection {
    ...
    // 如果连接还没有被释放则会重用。这里没有调用connectionAcquired()是由于我们之前已经获取了
    if (call.connection != null) {
      check(toClose == null)
      return callConnection
    }
    ...



  // 能从连接池拿到连接直接返回
  if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
    val result = call.connection!!
    eventListener.connectionAcquired(call, result)
    return result
  }



  // 连接池中无连接时,创建新连接并添加到连接池
  ...
  val newConnection = RealConnection(connectionPool, route)
  call.connectionToCancel = newConnection
  try {
    // 使用新连接来进行服务器连接
    newConnection.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
  } finally {
  ...
  synchronized(newConnection) {
     // 新连接添加到连接池
    connectionPool.put(newConnection)
    call.acquireConnectionNoEvents(newConnection)
  }


  eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }
  • exchangeFinder.find开始到exchangeFinder.findConnection只做了一件事,就是先尝试重用连接,如果不能重用就从连接池中取出一个新的连接,如果无法取出就直接创建一个新的连接并添加到连接池中。
  • 看看服务器是怎么连接的,看这行代码newConnection.connect
// RealConnection.kt

fun connect(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean,
  call: Call,
  eventListener: EventListener
) {
...
  while (true) {

    try {
      if (route.requiresTunnel()) {
        ...
      } else {
        // 通过Socket构建完整HTTP或HTTPS连接
        connectSocket(connectTimeout, readTimeout, call, eventListener)
  ...
}

// RealConnection.kt

/** 通过Socket来构建完整HTTP或HTTPS连接 */
@Throws(IOException::class)


private fun connectSocket(
  connectTimeout: Int,


  readTimeout: Int,


  call: Call,
  eventListener: EventListener
) {
 ...
  try {
    Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
  } catch (e: ConnectException) {
 ...
try {
  // okio的接口,用于输入,类似InputStream
  source = rawSocket.source().buffer()
  //okio的接口,用于输出,类似OutputStream
  sink = rawSocket.sink().buffer()
}

...
}
  • 主要是创建了一个socket对象然后使用socket建立连接,利用okio的输入输出接口获取输入/输出流

3.5 CallServerInterceptor

  • 主要负责网络数据的请求和响应,也就是实际的网络I/O操作。将请求头与请求体发送给服务器,以及解析服务器返回的response
// CallServerInterceptor.kt
/** 最后一个拦截器,实现对服务器的网络调用 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {


  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()




    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    var sendRequestException: IOException? = null
    try {
      exchange.writeRequestHeaders(request)



      // 如果不是GET/HEAD请求
      if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
        // Continue" response before transmitting the request body. If we don't get that, return
        // what we did get (such as a 4xx response) without ever transmitting the request body.
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
          exchange.flushRequest()
          responseBuilder = exchange.readResponseHeaders(expectContinue = true)
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
        if (responseBuilder == null) {
          if (requestBody.isDuplex()) {
            // Prepare a duplex body so that the application can send a request body later.
            // 准备双工正文,以便应用程序稍后可以发送请求正文
            exchange.flushRequest()
            val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
            requestBody.writeTo(bufferedRequestBody)
          } else {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
            requestBody.writeTo(bufferedRequestBody)
            bufferedRequestBody.close()
          }
        } else {
          exchange.noRequestBody()
          if (!exchange.connection.isMultiplexed) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
            // from being reused. Otherwise we're still obligated to transmit the request body to
            // leave the connection in a consistent state.
            exchange.noNewExchangesOnConnection()
          }
        }

      } else {
        exchange.noRequestBody()
      }


      if (requestBody == null || !requestBody.isDuplex()) {
        exchange.finishRequest()
      }
    } catch (e: IOException) {
      if (e is ConnectionShutdownException) {
        throw e // No request was sent so there's no response to read.
      }
      if (!exchange.hasFailure) {
        throw e // Don't attempt to read the response; we failed to send the request.
      }
      sendRequestException = e
    }


    try {
      if (responseBuilder == null) {
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        if (invokeStartEvent) {
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
      }
      var response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      var code = response.code


      if (shouldIgnoreAndWaitForRealResponse(code)) {
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        if (invokeStartEvent) {
          exchange.responseHeadersStart()
        }
        response = responseBuilder
            .request(request)
            .handshake(exchange.connection.handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build()
        code = response.code
      }

      exchange.responseHeadersEnd(response)

      response = if (forWebSocket && code == 101) {
        // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
        response.newBuilder()
            .body(EMPTY_RESPONSE)
            .build()
      } else {
        response.newBuilder()
            .body(exchange.openResponseBody(response))
            .build()
      }
     ...
      return response
   ...
  }

3.6 拦截器总结

  1. client.interceptors:用户自定义的拦截器,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义 header、自定义 log 等等。
  2. RetryAndFollowUpIntercept:重试和重定向拦截器,重试或者重定向的次数不能大于20次,同时它还创建了一个ExchangeFinder对象用于管理连接池为后续的连接做准备
  3. BridgeInterceptor:桥接拦截器,主要负责请求和响应的转换。补充请求头,把用户请求转换成网络请求,网络响应转换成用户可以接收的响应,同时还需要注意的一点是如果用户手动添加了Accept-Encoding那就需要处理解压操作
  4. CacheInterceptor:缓存拦截器,主要负责缓存的读取与写入,将 Http 的请求结果放到到缓存中,以便在下次进行相同的请求时,直接从缓存中读取结果,提高响应速度(内部是通过okio来处理缓存的)
  5. ConnectInterceptor:连接拦截器,负责建立连接的。最终是通过RealConnection对象建立socket连接的并且获得了输入输出流为下一步读写做准备,RealConnection对象的获取时优先复用的,如果无法复用则从连接池中获取,如果无法获取则创建一个新的连接并将其放入连接池中
  6. client.networkInterceptors:用户自定义的网络拦截器(针对特定类型的请求使用的拦截器)
  7. CallServerInterceptor:调用服务器拦截器,主要负责网络数据的请求和响应,也就是实际的网络I/O操作。将请求头与请求体发送给服务器,以及解析服务器返回的 response

3.6.1 interceptors与networkInterceptors对比

  • 两者都是用户自定义的拦截器,前者是在第一个,后者是在倒数第2个
  • interceptors是应用拦截器,networkInterceptors是网络拦截器;
  • 应用拦截器是用于在请求发送前和网络响应后的拦截器,只能触发一次。而网络拦截器在发生错误重试或者重定向时可以执行多次,相当于进行了二次请求;
  • 如果CacheInterceptor命中了缓存就不在进行网络请求了,因此会存在短路网络拦截器的情况;
  • 应用拦截器通常用于统计客户端的网络请求发起情况;网络拦截器中可以获取到最终发送请求的request也包括重定向的数据,也可以获取真正发生网络请求的回来的response,从而修改对应的请求和响应数据。
  • 用法上:interceptors可用于添加一些公共参数,如自定义 header、自定义 log 等,而networkInterceptors主要针对特定类型的请求使用的拦截器

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY3zsPCl' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片