我们来自字节跳动飞书商业应用研发部(Lark Business Applications),目前我们在北京、深圳、上海、武汉、杭州、成都、广州、三亚都设立了办公区域。我们关注的产品领域主要在企业经验管理软件上,包括飞书 OKR、飞书绩效、飞书招聘、飞书人事等 HCM 领域系统,也包括飞书审批、OA、法务、财务、采购、差旅与报销等系统。欢迎各位加入我们。
本文作者:飞书商业应用研发部 高世凯
欢迎大家关注飞书技术,每周定期更新飞书技术团队技术干货内容,想看什么内容,欢迎大家评论区留言~
前言
前段时间接入一个业务需求,需要对用户输入的数据进行优先级排序。按优先级排序后处理数据且不能有循环依赖,便想到了有向无环图,以及对应的依赖解析算法-拓扑排序。
拓扑排序介绍
拓扑排序(Topological Order)是指,将一个有向无环图(Directed Acyclic Graph简称DAG)进行排序进而得到一个有序的线性序列。
举例来说,如果我们将一系列需要运行的任务构成一个有向图,图中的有向边则代表某一任务必须在另一个任务之前完成这一限制。那么运用拓扑排序,我们就能得到满足执行顺序限制条件的一系列任务所需执行的先后顺序。
例如一个有向无环图如下:
根据图中的边的方向,我们可以看出,若要满足得到其拓扑排序,则结点被遍历的顺序必须满足如下要求:
- 结点 1 必须在结点 2、3 之前
- 结点 2 必须在结点 3、4 之前
- 结点 3 必须在结点 4、5 之前
- 结点 4 必须在结点 5 之前
则一个满足条件的拓扑排序为[ 1, 2, 3, 4, 5]。
入度和出度
- 入度:设有向图中有一结点v,其入度即为当前所有从其他结点出发,终点为v的的边的数目。也就是所有指向v的有向边的数目。
- 出度:设有向图中有一结点v,其出度即为当前所有起点为v,指向其他结点的边的数目。也就是所有由v发出的边的数目。
如上图中,结点1的入度为 0,出度为 2;结点5的出度为 0,入度为 2。
要想完成拓扑排序,我们每次都应当从入度为 0 的结点开始 BFS 遍历。因为只有入度为0的结点才能够成为拓扑排序的起点。
业务背景
- 用户在人事系统录入一批具有上下级关系的人员,这批人员没有顺序,每个人会同时录入其实线上级和虚线上级。
- 后端业务系统对用户录入的人员按实线上级、虚线上级关系进行排序,下级需要等上级录入完成生成上级的实体id 后,才能获取到上级的 id 并继续录入下级自身。
以下图用户输入为例,图中实线代表实线上级关系,虚线代表虚线上级关系。如员工A的实线上级为B,虚线上级为C。
程序解析传参结果:
[
[A,B],
[B,C],
[B,D],
[A,C],
[D,E],
[D,F],
[C,E],
[E,F],
[Q,W],
[Q,R],
[W,R],
[W,T],
[R,T],
[T,Y],
[T,U],
]
业务诉求
- 传参校验,不能有环。找出环上结点给到用户提示。
- 排序,顺序为先插入最高等级依赖结点,即出度为 0 的结点。
- 分组,对传入的结点排序后,图之间分隔开来,排序结果交给不同机器实例执行后续。如上图中执行排序后结果大概为:
[[F,E,D,C,B,A],[U,Y,T,W,R,Q]]
算法实现
成环检测
这块实现比较简单,看代码就可以理解,不再赘述
func DetectParentCircles(arr interface{}, keyFunc func(idx int) (id, parentID int)) ([][]int, error) {
value := reflect.ValueOf(arr)
if value.Kind() != reflect.Array && value.Kind() != reflect.Slice {
return nil, ErrNotArray
}
if value.Len() == 0 {
return nil, nil
}
var ret [][]int
inCircle := make(map[int]bool)
parentMap := make(map[int]int)
ids := make([]int, 0, value.Len())
for i := 0; i < value.Len(); i++ {
id, pid := keyFunc(i)
parentMap[id] = pid
ids = append(ids, id)
}
for _, id := range ids {
if _, ok := inCircle[id]; ok {
continue
}
visit := make(map[int]int)
circle := []int{id}
curr := id
visit[id] = 0
for {
if leader, ok := parentMap[curr]; ok {
if _, ok := inCircle[leader]; ok {
break
}
if circleIdx, ok := visit[leader]; ok {
ret = append(ret, circle[circleIdx:])
for _, tmp := range circle[circleIdx:] {
inCircle[tmp] = true
}
break
}
visit[leader] = len(circle)
circle = append(circle, leader)
curr = leader
} else {
break
}
}
}
return ret, nil
}
拓扑排序分组
思路
- 由于排序顺序需要是出度为 0 结点优先,所以我们采用 DFS 遍历。对结点用未访问、访问中,已访问三个状态标识,递归找到出度为 0 的结点。
- 分组,用一个 map 保存结点对应分组 identity,每遍历一个出度为 0 结点时创建一个 identity。后续遍历的结点记录的 identity 和此结点的前置依赖结点保持一致。
代码
代码实现有些暴力,抛砖引玉~
func topologicalShard(relatedList [][]int) [][]int {
if len(relatedList) == 0 {
return [][]int{}
}
node2partMap := make(map[int]int)
partMap := make(map[int][]int)
dfsMap := make(map[int][]int)
// 构建依赖关系
for i := 0; i < len(relatedList); i++ {
dfsMap[relatedList[i][0]] = append(dfsMap[relatedList[i][0]], relatedList[i][1])
}
visited := make(map[int]int) // 标记某元素是否使用过:未访问、访问中,已访问
var searchDfs func(nums []int)
var order []int
var mapKey int
partIDs := make(map[int]struct{})
searchDfs = func(nums []int) {
for _, num := range nums {
if visited[num] == NOTVISIT {
visited[num] = VISITING
searchDfs(dfsMap[num])
order = append(order, num)
node2partMap[num] = mapKey
visited[num] = VISITED
} else if visited[num] == VISITED {
partIDs[node2partMap[num]] = struct{}{}
}
}
}
for key := range dfsMap {
mapKey = key
searchDfs([]int{key})
// 多个有关联的组合并成一个组
if len(partMap) > 0 {
var tmp []int
for k, _ := range partIDs {
for _, it := range partMap[k] {
tmp = append(tmp, it)
node2partMap[it] = key
}
delete(partMap, k)
}
tmp = append(tmp, order...)
partMap[key] = tmp
} else {
partMap[key] = order
}
order = []int{}
partIDs = make(map[int]struct{})
}
ret := make([][]int, 0, len(partMap))
for _, value := range partMap {
ret = append(ret, value)
}
return ret
}
Benchmark 测试常规导入5000人随机依赖关系排序分组结果:
加入我们
扫码发现职位&投递简历
官网投递