用gRPC实现adb调用的限流策略

今天这篇是平时工作时的技术总结,我是一个测试开发工程师,目前开发了一套智能硬件的测试框架,还开发了一套智能硬件的云真机测试平台。

在云真机平台的日常运行中,我发现了一个比较奇怪的问题,当长时间批量设备执行测试时,有时候会出现设备大量掉线,并且重插usb电脑不识别设备,就好像usb接口坏了一样,只有重启电脑才能解决这个问题。

在经过一段时间的排查后,发现这是因为批量执行测试时,adb调用的并发量太大,超过了电脑所能承受的上限。当每秒adb的调用次数在10次左右时,可以稳定运行测试,调用速率再快就会导致usb出问题。最终,通过对adb调用进行限流解决了这个问题。

限流方案我采用的是令牌桶的方案,每秒重置桶里令牌数为10个,当测试引擎成功获取到令牌时,调用adb,获取失败时循环等待令牌。限流方案的示意图如下:

在具体实现时,测试执行器与测试控制器分别是不同的进程,内存变量不共享,需要进程间通讯来实现令牌的获取。因此采用grpc调用来实现限流方案。

测试控制端在启动时初始化一个令牌桶变量,另外再启动一个每秒定时任务,每秒重置令牌个数为10个,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const MaxAdbProcess=10

var Sema int
var SemaMu sync.Mutex

// 初始化Sema为10
func init() {
Sema = MaxAdbProcess
}
// 每秒重置令牌桶的令牌个数
globalVariable.Cron.AddFunc(
"* * * * * *", func() {
SemaMu.Lock()
Sema = MaxAdbProcess
SemaMu.Unlock()
})

获取及归还令牌API的protobuf 定义:

1
2
3
4
5
6
7
8
9
10
11
service UiAutomation{
rpc AdbSemaphore(AdbSemaphereReq)returns(AdbSemaphereRes);
}
// 如果UseAdb为true则获取一个信号量,如果为false则归还一个信号量
message AdbSemaphereReq{
bool UseAdb=1;
}
// 如果Permission为true则成功获取一个信号量,为false则获取信号量失败
message AdbSemaphereRes{
bool Permission=1;
}

获取及归还令牌API的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (s *server) AdbSemaphore(ctx context.Context, in *pb.AdbSemaphereReq) (*pb.AdbSemaphereRes, error) {
defer func() {
if err := recover(); err != nil {
globalVariable.Println("出现了问题,原因:", "[panic] err: %v\nstack: %s\n", err, utility.GetCurrentGoroutineStack())
}
}()
// 如果是获取令牌
if in.GetUseAdb() {
SemaMu.Lock()
defer SemaMu.Unlock()
if Sema > 0 {
Sema--
return &pb.AdbSemaphereRes{Permission: true}, nil
} else {
return &pb.AdbSemaphereRes{Permission: false}, nil
}
} else {
// 如果是释放令牌
SemaMu.Lock()
defer SemaMu.Unlock()
if Sema < consts.MaxAdbProcess {
Sema++
}
return &pb.AdbSemaphereRes{Permission: true}, nil
}
}

测试执行器获取令牌:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func GetLock(ctx context.Context) {
if globalVariable.UiAutomationGrpcClient == nil {
return
}
// 如果没有获取到令牌则死循环卡在这里,
// 如果grpc连接出现问题则不再获取令牌直接放行,
// 以免卡住测试
for {
semaphore, err := globalVariable.UiAutomationGrpcClient.AdbSemaphore(context.Background(), &uiautomation.AdbSemaphereReq{UseAdb: true})
if err != nil {
globalVariable.Println("[hjd debug]:", "进程锁出问题 ", err)
break
}
if semaphore.GetPermission() {
break
}
time.Sleep(time.Millisecond * 10)
}
}

测试执行器归还令牌:

1
2
3
4
5
6
7
8
9
10
11
func ReleaseLock(ctx context.Context) {
if globalVariable.UiAutomationGrpcClient == nil {
return
}
_, err := globalVariable.UiAutomationGrpcClient.AdbSemaphore(context.Background(), &uiautomation.AdbSemaphereReq{UseAdb: false})
if err != nil {
globalVariable.Println("[hjd debug]", "释放锁失败", err)
globalVariable.UiAutomationGrpcClient = nil
return
}
}

令牌桶机制搭建好之后,就要把所有调用adb的地方都加上获取令牌及释放令牌的函数调用,类似于这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func ExecAdb(ctx context.Context, sn util.SN, args []string) (string, error) {
var cmd *exec.Cmd
if sn != "" {
finalArgs := []string{"-s", sn.String()}
finalArgs = append(finalArgs, args...)
cmd = exec.Command(adbExecutable, finalArgs...)
} else {
cmd = exec.Command(adbExecutable, args...)
}
// 获取令牌
progressLock.GetLock(ctx)
stdout, err := cmd.Output()
// 释放令牌
progressLock.ReleaseLock(ctx)
if err != nil {
globalVariable.Println(err.Error())
return "", err
}
return string(stdout), nil
}

测试引擎在执行测试时,每当要调用adb命令时都先获取令牌,只有拿到令牌的进程才能调用adb命令。

这样就能把adb调用的频率限制住了,不论同时有多少台设备进行测试,adb的调用频率始终保持在10次左右。

这一篇就到这里啦。欢迎大家点赞、转发、私信。

江达小记