摘要:為了控制壓測時的,則需要實現邏輯。則是獲取屬性并初始化客戶端客戶端配置則提供了設置泛化調用入參的以及接下來要介紹的部分的全鏈路壓測中,我們都使用校驗請求結果,壓測插件中,我們也實現了基于的校驗。
Dubbo 壓測插件已開源,本文涉及代碼詳見gatling-dubbo
Gatling 是一個開源的基于 Scala、Akka、Netty 實現的高性能壓測框架,較之其他基于線程實現的壓測框架,Gatling 基于 AKKA Actor 模型實現,請求由事件驅動,在系統資源消耗上低于其他壓測框架(如內存、連接池等),使得單臺施壓機可以模擬更多的用戶。此外,Gatling 提供了一套簡單高效的 DSL(領域特定語言)方便我們編排業務場景,同時也具備流量控制、壓力控制的能力并提供了良好的壓測報告,所以有贊選擇在 Gatling 基礎上擴展分布式能力,開發了自己的全鏈路壓測引擎 MAXIM。全鏈路壓測中我們主要模擬用戶實際使用場景,使用 HTTP 接口作為壓測入口,但有贊目前后端服務中 Dubbo 應用比重越來越高,如果可以知道 Dubbo 應用單機水位將對我們把控系統后端服務能力大有裨益。基于 Gatling 的優勢和在有贊的使用基礎,我們擴展 Gatling 開發了 gatling-dubbo 壓測插件。
插件主要結構實現 Dubbo 壓測插件,需實現以下四部分內容:
Protocol 和 ProtocolBuild
協議部分,這里主要定義 Dubbo 客戶端相關內容,如協議、泛化調用、服務 URL、注冊中心等內容,ProtocolBuild 則為 DSL 使用 Protocol 的輔助類
Action 和 ActionBuild
執行部分,這里的作用是發起 Dubbo 請求,校驗請求結果并記錄日志以便后續生成壓測報告。ActionBuild 則為 DSL 使用 Action 的輔助類
Check 和 CheckBuild
檢查部分,全鏈路壓測中我們都使用Json Path檢查請求結果,這里我們實現了一樣的檢查邏輯。CheckBuild 則為 DSL 使用 Check 的輔助類
DSL
Dubbo 插件的領域特定語言,我們提供了一套簡單易用的 API 方便編寫 Duboo 壓測腳本,風格上與原生 HTTP DSL 保持一致
Protocol協議部分由 5 個屬性組成,這些屬性將在 Action 初始化 Dubbo 客戶端時使用,分別是:
protocol
協議,設置為dubbo
generic
泛化調用設置,Dubbo 壓測插件使用泛化調用發起請求,所以這里設置為true,有贊優化了泛化調用的性能,為了使用該特性,引入了一個新值result_no_change(去掉優化前泛化調用的序列化開銷以提升性能)
url
Dubbo 服務的地址:dubbo://IP地址:端口
registryProtocol
Dubbo 注冊中心的協議,設置為ETCD3
registryAddress
Dubbo 注冊中心的地址
如果是測試 Dubbo 單機水位,則設置 url,注冊中心設置為空;如果是測試 Dubbo 集群水位,則設置注冊中心(目前支持 ETCD3),url 設置為空。由于目前注冊中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏靈活性,所以我們又實現了客戶端層面的負載均衡,如此便可拋開特定的注冊中心來測試 Dubbo 集群水位。該特性目前正在內測中。
object DubboProtocol { val DubboProtocolKey = new ProtocolKey { type Protocol = DubboProtocol type Components = DubboComponents def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]] def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can"t provide a default value for DubboProtocol") def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = { dubboProtocol => DubboComponents(dubboProtocol) } } } case class DubboProtocol( protocol: String, //dubbo generic: String, //泛化調用? url: String, //use url or registryProtocol: String, //use registry registryAddress: String //use registry ) extends Protocol { type Components = DubboComponents }
為了方便 Action 中使用上面這些屬性,我們將其裝進了 Gatling 的 ProtocolComponents:
case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents { def onStart: Option[Session => Session] = None def onExit: Option[Session => Unit] = None }
以上就是關于 Protocol 的定義。為了能在 DSL 中配置上述 Protocol,我們定義了 DubboProtocolBuilder,包含了 5 個方法分別設置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 個屬性。
object DubboProtocolBuilderBase { def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol) } case class DubboProtocolBuilderGenericStep(protocol: String) { def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic) } case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) { def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url) } case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) { def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol) } case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) { def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress) } case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) { def build = DubboProtocol( protocol = protocol, generic = generic, url = url, registryProtocol = registryProtocol, registryAddress = registryAddress ) }Action
DubboAction 包含了 Duboo 請求邏輯、請求結果校驗邏輯以及壓力控制邏輯,需要擴展 ExitableAction 并實現 execute 方法。
DubboAction 類的域 argTypes、argValues 分別是泛化調用請求參數類型和請求參數值,需為 Expression[] 類型,這樣當使用數據 Feeder 作為壓測腳本參數輸入時,可以使用類似 ${args_types}、${args_values}這樣的表達式從數據 Feeder 中解析對應字段的值。
execute 方法必須以異步方式執行 Dubbo 請求,這樣前一個 Dubbo 請求執行后但還未等響應返回時虛擬用戶就可以通過 AKKA Message 立即發起下一個請求,如此一個虛擬用戶可以在很短的時間內構造大量請求。請求方式方面,相比于泛化調用,原生 API 調用需要客戶端載入 Dubbo 服務相應的 API 包,但有時候卻拿不到,此外,當被測 Dubbo 應用多了,客戶端需要載入多個 API 包,所以出于使用上的便利性,Dubbo 壓測插件使用泛化調用發起請求。
異步請求響應后會執行 onComplete 方法,校驗請求結果,并根據校驗結果記錄請求成功或失敗日志,壓測報告就是使用這些日志統計計算的。
為了控制壓測時的 RPS,則需要實現 throttle 邏輯。實踐中發現,高并發情況下,泛化調用性能遠不如原生 API 調用性能,且響應時間成倍增長(如此不能表征 Dubbo 應用的真正性能),導致 Dubbo 壓測插件壓力控制不準,解決辦法是優化泛化調用性能,使之與原生 API 調用的性能相近,請參考dubbo 泛化調用性能優化。
class DubboAction( interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], genericService: GenericService, checks: List[DubboCheck], coreComponents: CoreComponents, throttled: Boolean, val objectMapper: ObjectMapper, val next: Action ) extends ExitableAction with NameGen { override def statsEngine: StatsEngine = coreComponents.statsEngine override def name: String = genName("dubboRequest") override def execute(session: Session): Unit = recover(session) { argTypes(session) flatMap { argTypesArray => argValues(session) map { argValuesArray => val startTime = System.currentTimeMillis() val f = Future { try { genericService.$invoke(method, argTypes(session).get, argValues(session).get) } finally { } } f.onComplete { case Success(result) => val endTime = System.currentTimeMillis() val resultMap = result.asInstanceOf[JMap[String, Any]] val resultJson = objectMapper.writeValueAsString(resultMap) val (newSession, error) = Check.check(resultJson, session, checks) error match { case None => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None) throttle(newSession(session)) case Some(Failure(errorMessage)) => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage)) throttle(newSession(session).markAsFailed) } case FuFailure(e) => val endTime = System.currentTimeMillis() statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage)) throttle(session.markAsFailed) } } } } private def throttle(s: Session): Unit = { if (throttled) { coreComponents.throttler.throttle(s.scenario, () => next ! s) } else { next ! s } } }
DubboActionBuilder 則是獲取 Protocol 屬性并初始化 Dubbo 客戶端:
case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder { private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents = protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey) override def build(ctx: ScenarioContext, next: Action): Action = { import ctx._ val protocol = components(protocolComponentsRegistry).dubboProtocol //Dubbo客戶端配置 val reference = new ReferenceConfig[GenericService] val application = new ApplicationConfig application.setName("gatling-dubbo") reference.setApplication(application) reference.setProtocol(protocol.protocol) reference.setGeneric(protocol.generic) if (protocol.url == "") { val registry = new RegistryConfig registry.setProtocol(protocol.registryProtocol) registry.setAddress(protocol.registryAddress) reference.setRegistry(registry) } else { reference.setUrl(protocol.url) } reference.setInterface(interface) val cache = ReferenceConfigCache.getCache val genericService = cache.get(reference) val objectMapper: ObjectMapper = new ObjectMapper() new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next) } }
LambdaProcessBuilder 則提供了設置 Dubbo 泛化調用入參的 DSL 以及接下來要介紹的 Check 部分的 DSL
case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport { def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes) def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues) def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList) def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks) }Check
全鏈路壓測中,我們都使用Json Path校驗 HTTP 請求結果,Dubbo 壓測插件中,我們也實現了基于Json Path的校驗。實現 Check,必須實現 Gatling check 中的 Extender 和 Preparer:
package object dubbo { type DubboCheck = Check[String] val DubboStringExtender: Extender[DubboCheck, String] = (check: DubboCheck) => check val DubboStringPreparer: Preparer[String, String] = (result: String) => Success(result) }
基于Json Path的校驗邏輯:
trait DubboJsonPathOfType { self: DubboJsonPathCheckBuilder[String] => def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers) } object DubboJsonPathCheckBuilder { val CharsParsingThreshold = 200 * 1000 def preparer(jsonParsers: JsonParsers): Preparer[String, Any] = response => { if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson) jsonParsers.safeParseJackson(response) else jsonParsers.safeParseBoon(response) } def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType } class DubboJsonPathCheckBuilder[X: JsonFilter]( private[check] val path: Expression[String], private[check] val jsonParsers: JsonParsers )(implicit extractorFactory: JsonPathExtractorFactory) extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X]( DubboStringExtender, DubboJsonPathCheckBuilder.preparer(jsonParsers) ) { import extractorFactory._ def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence)) def findAllExtractor = path.map(newMultipleExtractor[X]) def countExtractor = path.map(newCountExtractor) }
DubboCheckSupport 則提供了設置 jsonPath 表達式的 DSL
trait DubboCheckSupport { def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = DubboJsonPathCheckBuilder.jsonPath(path) }
Dubbo 壓測腳本中可以設置一個或多個 check 校驗請求結果,使用 DSL check 方法*
DSLtrait AwsDsl提供頂層 DSL。我們還定義了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 兩個 Scala 隱式方法,以自動構造 DubboProtocol 和 ActionBuilder。
此外,泛化調用中使用的參數類型為 Java 類型,而我們的壓測腳本使用 Scala 編寫,所以這里需要做兩種語言間的類型轉換,所以我們定義了 transformJsonDubboData 方法
trait DubboDsl extends DubboCheckSupport { val Dubbo = DubboProtocolBuilderBase def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method) implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build() def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = { session.set(argTypeName, toArray(session(argTypeName).as[JList[String]])) .set(argValueName, toArray(session(argValueName).as[JList[Any]])) } private def toArray[T:ClassTag](value: JList[T]): Array[T] = { value.asScala.toArray } }
object Predef extends DubboDslDubbo 壓測腳本和數據 Feeder 示例
壓測腳本示例:
import io.gatling.core.Predef._ import io.gatling.dubbo.Predef._ import scala.concurrent.duration._ class DubboTest extends Simulation { val dubboConfig = Dubbo .protocol("dubbo") .generic("true") //直連某臺Dubbo機器,只多帶帶壓測一臺機器的水位 .url("dubbo://IP地址:端口") //或設置注冊中心,壓測該Dubbo應用集群的水位,支持ETCD3注冊中心 .registryProtocol("") .registryAddress("") val jsonFileFeeder = jsonFile("data.json").circular //數據Feeder val dubboScenario = scenario("load test dubbo") .forever("repeated") { feed(jsonFileFeeder) .exec(session => transformJsonDubboData("args_types1", "args_values1", session)) .exec(dubbo("com.xxx.xxxService", "methodName") .argTypes("${args_types1}") .argValues("${args_values1}") .check(jsonPath("$.code").is("200")) ) } setUp( dubboScenario.inject(atOnceUsers(10)) .throttle( reachRps(10) in (1 seconds), holdFor(30 seconds)) ).protocols(dubboConfig) }
data.json 示例:
[ { "args_types1": ["com.xxx.xxxDTO"], "args_values1": [{ "field1": "111", "field2": "222", "field3": "333" }] } ]Dubbo 壓測報告示例
我的系列博客
混沌工程 - 軟件系統高可用、彈性化的必由之路
異步系統的兩種測試方法
我的其他測試相關開源項目
捉蟲記:方便產品、開發、測試三方協同自測的管理工具
招聘
有贊測試組在持續招人中,大量崗位空缺,只要你來,就能幫你點亮全棧開發技能樹,有意向換工作的同學可以發簡歷到 sunjun【@】youzan.com
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/11449.html
摘要:為了控制壓測時的,則需要實現邏輯。則是獲取屬性并初始化客戶端客戶端配置則提供了設置泛化調用入參的以及接下來要介紹的部分的全鏈路壓測中,我們都使用校驗請求結果,壓測插件中,我們也實現了基于的校驗。 Dubbo 壓測插件已開源,本文涉及代碼詳見gatling-dubbo Gatling 是一個開源的基于 Scala、Akka、Netty 實現的高性能壓測框架,較之其他基于線程實現的壓測框架...
摘要:工欲善其事,必先利其器,我們拿什么工具來壓測呢我們做了很多前期調研和論證,最終決定基于開發有贊自己的分布式全鏈路壓測引擎。 一年以前,有贊準備在雙十一到來之前對系統進行一次性能摸底,以便提前發現并解決系統潛在性能問題,好讓系統在雙十一期間可以從容應對劇增的流量。工欲善其事,必先利其器,我們拿什么工具來壓測呢?我們做了很多前期調研和論證,最終決定基于 Gatling 開發有贊自己的分布式...
摘要:被測的網關都沒有添加額外業務,只做反向代理吞吐量下圖是吞吐量的情況,可以看到均比直壓低一點點,而和則要低得多。結論三者的表現均很不錯,其對于吞吐量和響應時間的性能損耗很低,可以忽略不計。 用于實現API網關的技術有很多,大致分為這么幾類: 通用反向代理:Nginx、Haproxy、…… 網絡編程框架:Netty、Servlet、…… API網關框架:Spring Cloud Gate...
摘要:被測的網關都沒有添加額外業務,只做反向代理吞吐量下圖是吞吐量的情況,可以看到均比直壓低一點點,而和則要低得多。結論三者的表現均很不錯,其對于吞吐量和響應時間的性能損耗很低,可以忽略不計。 用于實現API網關的技術有很多,大致分為這么幾類: 通用反向代理:Nginx、Haproxy、…… 網絡編程框架:Netty、Servlet、…… API網關框架:Spring Cloud Gate...
閱讀 1123·2021-11-24 09:39
閱讀 3623·2021-09-02 15:21
閱讀 2161·2021-08-24 10:01
閱讀 722·2021-08-19 10:55
閱讀 2447·2019-08-30 15:55
閱讀 1212·2019-08-30 14:16
閱讀 2992·2019-08-29 15:17
閱讀 3235·2019-08-29 13:53