A concurrent cache, as the name suggests, is a cache that can be shared and used interleaved across multiple concurrent goroutines. This is especially common in crawlers: every new crawl task wants to reuse, in real time, the results of previous or in-flight tasks, avoiding the cost of duplicate network requests.
This article uses an image crawler for yande.re as an example to show how to share a concurrent cache among multiple goroutines and build an efficient concurrent crawler. yande.re is a fairly well-known anime image download site; most of its images are original illustrations collected from various sources.
First, let's implement a simple serial version with no concurrency. To keep things simple we only collect the most basic fields, since they are not the focus of this article:
type Artwork struct {
	ID    string `json:"id"`    // id of the artwork, e.g. "yande:932473"
	Image string `json:"image"` // original image url
	Title string `json:"title"` // title of the artwork
}

func (a *Artwork) String() string {
	return fmt.Sprintf("ID: %s", a.ID) // for convenience to debug, we print the ID only
}
Now let's implement it quickly:
// yande.go
type Yande struct {
occur map[string]struct{}
}
func (y *Yande) id(u string) string {
matches := regexp.MustCompile(`/post/show/(\d+)`).FindStringSubmatch(u)
if len(matches) == 2 {
return matches[1]
}
return ""
}
func (y *Yande) parse(id string, r io.Reader) (artworks []*Artwork, err error) {
doc, err := goquery.NewDocumentFromReader(r)
if err != nil {
return nil, err
}
image := doc.Find(".original-file-unchanged").AttrOr("href", "")
if image == "" { // there is no original file, use the changed one
image = doc.Find(".original-file-changed").AttrOr("href", "")
}
artworks = append(artworks, &Artwork{
ID: "yande:" + id, // prefix the site name so it matches e.g. "yande:932473"
Image: image,
Title: doc.Find("title").Text(),
})
// Find all related artworks, and Fetch them recursively in turn.
doc.Find("div.status-notice > a[href^='/post/show/']").Each(func(_ int, s *goquery.Selection) {
if id := y.id(s.AttrOr("href", "")); id != "" {
a, _ := y.Fetch("https://yande.re/post/show/" + id) // we don't care about the error in recursion
artworks = append(artworks, a...)
}
})
return artworks, nil
}
func (y *Yande) Fetch(u string) ([]*Artwork, error) {
id := y.id(u) // id of the artwork, e.g. "932473"
if id == "" {
return nil, errors.New("invalid url")
} else if _, ok := y.occur[id]; ok {
return nil, errors.New("the artwork has been fetched")
} else {
y.occur[id] = struct{}{}
}
r, err := http.Get(u)
if err != nil {
return nil, err
}
defer r.Body.Close()
return y.parse(id, r.Body)
}
func NewYande() *Yande {
return &Yande{
occur: make(map[string]struct{}),
}
}
The code above uses the PuerkitoBio/goquery package to parse and query the HTML. Install it with go get:
go get -u github.com/PuerkitoBio/goquery
Finally, write a simple test, yande_test.go, to verify that the result is correct:
func TestYande_Fetch(t *testing.T) {
	y := NewYande()
	artworks, err := y.Fetch("https://yande.re/post/show/932473")
	if err != nil {
		panic(err)
	}
	for _, a := range artworks {
		fmt.Println(a)
	}
}
Here and in the rest of this article we use https://yande.re/post/show/932473 as the test case. It is currently related to 4 other artworks, so a correct result should include all 5 of them.
Run the test; as expected, we get exactly these 5:
ID: yande:932473
ID: yande:932468
ID: yande:932469
ID: yande:932472
ID: yande:933175
--- PASS: TestYande_Fetch (2.58s)
The biggest drawback of the "serial version" above is that it is slow. Now let's rework it, starting with a modest goal: make the requests concurrent.
First, modify the Yande struct:
type Yande struct {
	doing map[string]struct{}
	done  map[string]struct{}
	mu    sync.Mutex
	bus   chan *Artwork
}
- doing: tasks currently in progress; done: tasks already finished
- mu: since doing and done are modified from multiple goroutines, a mutex is required. You could use an RWMutex or sync.Map instead; we use a plain Mutex for simplicity
- bus: every goroutine pushes its results here; it is also returned from Fetch, and closed once all goroutine tasks have finished
Now let's implement it:
func (y *Yande) parse(id string, r io.Reader) (err error) {
doc, err := goquery.NewDocumentFromReader(r)
if err != nil {
return err
}
image := doc.Find(".original-file-unchanged").AttrOr("href", "")
if image == "" { // there is no original file, use the changed one
image = doc.Find(".original-file-changed").AttrOr("href", "")
}
// Find all related artworks, and Fetch them recursively in turn.
doc.Find("div.status-notice > a[href^='/post/show/']").Each(func(_ int, s *goquery.Selection) {
if id := y.id(s.AttrOr("href", "")); id != "" {
_, _ = y.Fetch("https://yande.re/post/show/" + id) // we don't care about the error in recursion
}
})
y.bus <- &Artwork{
ID: "yande:" + id,
Image: image,
Title: doc.Find("title").Text(),
}
y.end(id)
return nil
}
func (y *Yande) end(id string) {
defer y.mu.Unlock()
y.mu.Lock()
y.done[id] = struct{}{}
delete(y.doing, id)
if len(y.doing) == 0 {
close(y.bus)
}
}
func (y *Yande) Fetch(u string) (<-chan *Artwork, error) {
id := y.id(u) // id of the artwork, e.g. "932473"
if id == "" {
return nil, errors.New("invalid url")
}
defer y.mu.Unlock()
y.mu.Lock()
if _, ok := y.doing[id]; ok {
return nil, errors.New("already doing")
} else if _, ok := y.done[id]; ok {
return nil, errors.New("already done")
} else {
y.doing[id] = struct{}{}
}
go func() (succeed bool) {
defer func() {
if !succeed {
y.end(id)
}
}()
log.Println("Fetching", u)
r, err := http.Get(u)
if err != nil {
return false
}
defer r.Body.Close()
return y.parse(id, r.Body) == nil
}()
return y.bus, nil
}
func NewYande() *Yande {
return &Yande{
doing: make(map[string]struct{}),
done: make(map[string]struct{}),
bus: make(chan *Artwork, 30),
}
}
The test case needs a small change:
func TestYande_Fetch(t *testing.T) {
	ch, err := NewYande().Fetch("https://yande.re/post/show/932473")
	if err != nil {
		panic(err)
	}
	for a := range ch {
		fmt.Println(a)
	}
}
Finally, run it and check the result:
2022/08/30 19:31:30 Fetching https://yande.re/post/show/932473
2022/08/30 19:31:31 Fetching https://yande.re/post/show/932468
ID: yande:932473
2022/08/30 19:31:31 Fetching https://yande.re/post/show/932472
2022/08/30 19:31:31 Fetching https://yande.re/post/show/932469
2022/08/30 19:31:31 Fetching https://yande.re/post/show/933175
ID: yande:932472
ID: yande:932468
ID: yande:932469
ID: yande:933175
--- PASS: TestYande_Fetch (1.56s)
Correct! It took 1.56s, nearly 40% faster than the serial version's 2.58s, and the gap widens as the data volume grows.
Notice that the requests run concurrently, and the first result is ready shortly after the second request starts. This means the caller can start consuming ready results while the crawl is still in progress, further reducing end-to-end latency. It feels like magic!
The "concurrent version" is a local optimum, but viewed globally it still has problems. Let's change the test case:
func TestYande_Fetch(t *testing.T) {
	ch1, _ := NewYande().Fetch("https://yande.re/post/show/932473")
	ch2, _ := NewYande().Fetch("https://yande.re/post/show/932473")
	for a := range ch1 {
		fmt.Println("from ch1:", a)
	}
	for a := range ch2 {
		fmt.Println("from ch2:", a)
	}
}
Suppose two user requests arrive at the same moment, both asking to crawl the same artwork. The output after running:
2022/08/30 19:43:05 Fetching https://yande.re/post/show/932473
2022/08/30 19:43:05 Fetching https://yande.re/post/show/932473
2022/08/30 19:43:06 Fetching https://yande.re/post/show/932469
2022/08/30 19:43:06 Fetching https://yande.re/post/show/933175
2022/08/30 19:43:06 Fetching https://yande.re/post/show/932472
2022/08/30 19:43:06 Fetching https://yande.re/post/show/932468
2022/08/30 19:43:07 Fetching https://yande.re/post/show/932468
2022/08/30 19:43:07 Fetching https://yande.re/post/show/932469
2022/08/30 19:43:07 Fetching https://yande.re/post/show/932472
from ch1: ID: yande:932473
2022/08/30 19:43:07 Fetching https://yande.re/post/show/933175
from ch1: ID: yande:932468
from ch1: ID: yande:932472
from ch1: ID: yande:932469
from ch1: ID: yande:933175
from ch2: ID: yande:932473
from ch2: ID: yande:932469
from ch2: ID: yande:932468
from ch2: ID: yande:932472
from ch2: ID: yande:933175
--- PASS: TestYande_Fetch (1.73s)
As you can see, our 5 requests were doubled: every request was repeated once. This is an unnecessary waste of I/O, because independent Fetches cannot benefit from each other's results, even when one of those results may already be ready.
So we need to introduce a "global cache", and one that works on channels. Yes, we are going to cache channels, concurrently!
Implementing a concurrent channel cache is not easy; there are many edge cases to consider. But that doesn't stop us from imagining the happy path first:
ch1, _ := NewYande().Fetch1("https://yande.re/post/show/932473") // 932473 is an artwork related to 932469
ch2, _ := NewYande().Fetch2("https://yande.re/post/show/932469") // 932469 is an artwork related to 932473
The output of the code above might look like this:
# 1 yande:932473 from ch1 // Fetch1() called
# 2 yande:932469 from ch2 // Fetch2() called
# 3 yande:932468 from ch1 // first related artwork from Fetch1()
# 4 yande:932469 from ch1 // second related artwork from Fetch1(), hit cache #2
# 5 yande:932473 from ch2 // first related artwork from Fetch2(), hit cache #1
# 6 yande:932468 from ch2 // second related artwork from Fetch2(), hit cache #3
# 7 yande:932472 from ch2 // ...
# 8 yande:933175 from ch2 // ...
# 9 yande:932472 from ch1 // hit cache #7
#10 yande:933175 from ch1 // hit cache #8
Notice how Fetch1 and Fetch2 interleave, each working toward obtaining all 5 artworks, yet Fetch1 knows nothing about Fetch2 on the other side, and vice versa: neither holds any intelligence about the other. We therefore need to build a bridge between them, namely a Graph structure. Fetch1 and Fetch2 are both builders of this Graph, and beneficiaries of each other's work.
In short, every time a new Artwork is produced, we add it to the Graph and merge it with the []Artwork coming from other goroutines. This is really just a sub-graph merge, swallowing the pieces into one larger Graph.
Reality is not flawless, though. The "near-perfect" interleaving of Fetch1 and Fetch2 in the output above is not only unnecessary, it also hurts performance. It is unnecessary because a single Fetch is already concurrent internally; we don't need multiple Fetches to achieve concurrency. It hurts performance because whenever one side fetches and uses the other's cache, extra coordination overhead is incurred.
The best approach, then, is to keep only one of the Fetches active while the other(s) stay idle. We achieve this by adding two methods, Wait and Done, to the Graph. Here is the Graph implementation:
type Node struct {
idx int
val string
}
type wait struct {
ch []chan struct{}
quality uint
}
type Graph struct {
mu sync.RWMutex
waits map[int]*wait
nodes map[string]*Node
edges map[string]map[string]struct{}
}
func (g *Graph) addNode(id string, quality uint) (idx int) {
if _, ok := g.nodes[id]; ok {
return -1
}
idx = len(g.nodes)
g.nodes[id] = &Node{idx: idx, val: id}
if quality == 0 {
g.waits[idx] = &wait{quality: 0}
} else {
g.waits[idx] = &wait{ch: []chan struct{}{make(chan struct{})}, quality: quality}
}
return
}
func (g *Graph) IncNode(id string, quality uint) (idx int, exists bool) {
defer g.mu.Unlock()
g.mu.Lock()
if n, ok := g.nodes[id]; !ok {
return g.addNode(id, quality), false
} else {
g.waits[n.idx].quality += quality
return n.idx, true
}
}
func (g *Graph) mergeGraph(a, b *Node) {
log.Printf("edge changed: %s(%d) -> %s(%d)\n", b.val, b.idx, a.val, a.idx)
// quality
g.waits[a.idx].quality += g.waits[b.idx].quality
// blockers
g.waits[a.idx].ch = append(g.waits[a.idx].ch, g.waits[b.idx].ch...)
delete(g.waits, b.idx)
// sync to all nodes in that same graph of original
for _, n := range g.bfs(b.val, make(map[string]struct{})) {
g.nodes[n].idx = a.idx
}
}
func (g *Graph) hasEdge(a, b string) bool {
if _, ok := g.edges[a]; !ok {
return false
}
if _, ok := g.edges[a][b]; ok {
return true
}
return false
}
func (g *Graph) AddEdge(a, b string) bool {
if a == b {
return false
}
defer g.mu.Unlock()
g.mu.Lock()
if g.hasEdge(a, b) {
return false
}
_ = g.addNode(a, 0)
if n, ok := g.nodes[b]; !ok {
g.nodes[b] = &Node{idx: g.nodes[a].idx, val: b}
} else if idx := g.nodes[a].idx; n.idx != idx {
g.mergeGraph(g.nodes[a], n)
}
if _, ok := g.edges[a]; !ok {
g.edges[a] = make(map[string]struct{})
}
if _, ok := g.edges[b]; !ok {
g.edges[b] = make(map[string]struct{})
}
g.edges[a][b] = struct{}{}
g.edges[b][a] = struct{}{}
return true
}
func (g *Graph) Wait(id string) {
g.mu.RLock()
n, ok := g.nodes[id]
if !ok {
g.mu.RUnlock()
return
}
var ch chan struct{}
if w, ok := g.waits[n.idx]; ok && len(w.ch) > 0 {
ch = g.waits[n.idx].ch[0]
}
g.mu.RUnlock()
if ch != nil {
<-ch
}
}
func (g *Graph) Done(id string) int {
defer g.mu.Unlock()
g.mu.Lock()
n, ok := g.nodes[id]
if !ok {
return -1
}
w, ok := g.waits[n.idx]
if !ok {
return -1
}
if len(w.ch) < 1 {
return 0
}
w.quality--
if w.quality > 0 { // still waiting for other workers
return int(w.quality)
}
for _, ch := range w.ch {
close(ch)
}
w.ch = w.ch[:0]
return 0
}
func (g *Graph) DoneForWait(id string) []string {
g.Done(id)
g.Wait(id)
return g.Siblings(id)
}
func (g *Graph) bfs(id string, visited map[string]struct{}) (nodes []string) {
queue := []string{id}
for len(queue) > 0 {
head := queue[0]
if _, ok := visited[head]; ok {
queue = queue[1:]
} else {
visited[head] = struct{}{}
nodes = append(nodes, head)
queue = append(queue[1:], collect.Keys(g.edges[head])...)
}
}
return
}
func (g *Graph) Siblings(id string) []string {
defer g.mu.RUnlock()
g.mu.RLock()
return g.bfs(id, make(map[string]struct{}))
}
func NewGraph() *Graph {
return &Graph{
waits: make(map[int]*wait),
nodes: make(map[string]*Node),
edges: make(map[string]map[string]struct{}),
}
}
A Go channel can only be consumed once, so how do we cache it for later, repeated use? I once considered implementing a shared channel model with concurrent reads and writes: one goroutine writes into the cached channel, and every goroutine sharing that channel cache reads the written data in real time. This pattern is known as fan-out, and there are good articles introducing it.
I implemented it, somewhat presumptuously, with a complex pub/sub scheme, but found the performance poor, so in the end I chose a different, "non-real-time" approach:
func someExpensiveComputation() <-chan int {
	dist := make(chan int)
	go func() {
		defer close(dist)
		for i := 0; i < 5; i++ {
			time.Sleep(time.Second)
			dist <- i
		}
	}()
	return cache.Put("some-id", dist)
}
ch1 := someExpensiveComputation()
ch2 := cache.Pull("some-id")
ch3 := cache.Pull("some-id")
Whenever someExpensiveComputation produces a new result, ch1 can read it in real time, but ch2 and ch3 only receive all 5 buffered results at once, when all 5 tasks have finished and close(dist) is called. The reasons for this design are as follows:
First, we only want a "concurrent cache", not a "concurrent channel": concurrent interaction between channels is more expensive.
Second, for small computations a "concurrent channel" is unnecessary. Say someExpensiveComputation still produces 5 results, and all we do is sum them:
go func() {
	sum := 0
	for i := range ch1 {
		sum += i
	}
}()

go func() {
	sum := 0
	for i := range ch2 {
		sum += i
	}
}()
Since both the workload and the number of results are tiny, the two goroutines, one on the real-time ch1 and one on the non-real-time ch2, perform about the same.
Third, for large computations a "concurrent channel" makes even less sense. Say someExpensiveComputation now produces 50,000 results and each result takes 1ms to process: with a "concurrent channel", every goroutine would spend 50s computing a completely identical result, wasting compute.
The right approach here is to let only one goroutine do the computation and cache its final result for the other goroutines, avoiding duplicate work. And that is exactly the "non-concurrent channel" we are about to implement:
go func() {
	result := make(chan int, 1)
	cache.Put("my-expensive-result", result)
	sum := 0
	for i := range ch1 {
		time.Sleep(time.Millisecond) // assume that each task takes 1ms
		sum += i
	}
	result <- sum
	close(result)
}()

go func() {
	if c := cache.Pull("my-expensive-result"); c != nil {
		fmt.Println(<-c) // wait for, then use, the cached result directly
		return
	}
	// TODO: fallback to compute like above
}()
OK, after all this explanation you should have the idea. Let's implement it:
type Cache struct {
futures sync.Map
caches sync.Map
}
func (a *Cache) Get(id string) ([]*Artwork, bool) {
if d, ok := a.caches.Load(id); ok {
return d.([]*Artwork), true
}
return nil, false
}
func (a *Cache) Has(id string) bool {
_, ok := a.caches.Load(id)
return ok
}
func (a *Cache) Set(id string, s []*Artwork) {
if len(s) > 0 {
a.caches.Store(id, s) // TODO: maybe it needs that clone each artwork into a new slice
}
}
func (a *Cache) Put(ch <-chan *Artwork, ids ...string) <-chan *Artwork {
dist := make(chan *Artwork, cap(ch))
for _, id := range ids {
if id != "" {
a.futures.LoadOrStore(id, make(chan struct{}))
}
}
go func() {
defer close(dist)
buf := make([]*Artwork, 0, cap(ch))
occur := make(map[string]struct{}, cap(ch))
for i := range ch {
if _, ok := occur[i.ID]; !ok && i.ID != "" {
occur[i.ID] = struct{}{}
dist <- i
buf = append(buf, i)
}
}
for _, id := range ids {
if id == "" {
continue
}
a.Set(id, buf)
if c, ok := a.futures.LoadAndDelete(id); ok {
close(c.(chan struct{}))
}
}
}()
return dist
}
func (a *Cache) Pull(id string) <-chan *Artwork {
future, ok := a.futures.Load(id)
if !ok && !a.Has(id) {
return nil
}
ch := make(chan *Artwork, 10)
go func() {
defer close(ch)
if ok {
<-future.(chan struct{})
}
if s, ok := a.Get(id); ok {
for _, a := range s {
ch <- a
}
}
}()
return ch
}
func (a *Cache) Merge(ids ...string) <-chan *Artwork {
if len(ids) < 1 {
return nil
}
var buf []*Artwork
occur := make(map[string]struct{})
for _, id := range ids {
if c, ok := a.futures.Load(id); ok {
<-c.(chan struct{})
}
s, _ := a.Get(id)
for _, i := range s {
if _, ok := occur[i.ID]; !ok {
occur[i.ID] = struct{}{}
buf = append(buf, i)
}
}
}
for _, id := range ids {
a.Set(id, buf)
}
ch := make(chan *Artwork, len(buf))
for _, i := range buf {
ch <- i
}
close(ch)
return ch
}
func NewCache() *Cache {
return &Cache{}
}
With all the groundwork in place, let's implement our final version:
var rel = NewGraph()
var cache = NewCache()
type Yande struct {
doing map[string]struct{}
mu sync.Mutex
master string
masterBus chan *Artwork
}
func (y *Yande) parse(id string, r io.Reader, ch chan<- *Artwork) (err error) {
doc, err := goquery.NewDocumentFromReader(r)
if err != nil {
return err
}
image := doc.Find(".original-file-unchanged").AttrOr("href", "")
if image == "" { // there is no original file, use the changed one
image = doc.Find(".original-file-changed").AttrOr("href", "")
}
// Find all related artworks, and Fetch them recursively in turn.
doc.Find("div.status-notice > a[href^='/post/show/']").Each(func(_ int, s *goquery.Selection) {
if id := y.id(s.AttrOr("href", "")); id != "" {
_, _ = y.Fetch("https://yande.re/post/show/" + id) // we don't care about the error in recursion
}
})
ch <- &Artwork{
ID: "yande:" + id,
Image: image,
Title: doc.Find("title").Text(),
}
y.end(id, ch)
return nil
}
func (y *Yande) end(id string, ch chan<- *Artwork) {
if ch != nil {
close(ch)
}
defer y.mu.Unlock()
y.mu.Lock()
delete(y.doing, id)
if len(y.doing) == 0 {
siblings := rel.DoneForWait(id)
log.Println("Merging graph for yande:", y.master, siblings)
ids := collect.Map(siblings, func(id string, _ int) string {
return "yande:" + id
})
go func() {
// We need to get a collection, that is all artworks in this graph.
for i := range cache.Merge(ids...) {
y.masterBus <- i
}
close(y.masterBus)
}()
}
}
func (y *Yande) Fetch(u string) (<-chan *Artwork, error) {
id := y.id(u) // id of the artwork, e.g. "932473"
if id == "" {
return nil, errors.New("invalid url")
}
isMaster, exists := y.master == "", false
if isMaster {
y.master = id
_, exists = rel.IncNode(y.master, 1)
} else {
exists = !rel.AddEdge(y.master, id)
}
y.mu.Lock()
if _, ok := y.doing[id]; exists || ok {
y.mu.Unlock()
if isMaster { // If it's a brand-new master, and the id is fetching on another goroutine with occurred in graph.
y.masterBus = make(chan *Artwork, 30)
y.end(id, nil)
return y.masterBus, nil
}
return nil, errors.New("the artwork has been fetched")
} else {
y.doing[id] = struct{}{}
}
y.mu.Unlock()
if c := cache.Pull("yande:" + id); c != nil {
log.Println("Hits the cache for", u)
y.end(id, nil)
return c, nil
}
ch := make(chan *Artwork, 30)
dist := cache.Put(ch, "yande:"+id)
if isMaster {
y.master = id
dist = y.masterBus
}
go func() (succeed bool) {
defer func() {
if !succeed {
y.end(id, ch)
}
}()
log.Println("Fetching", u)
r, err := http.Get(u)
if err != nil {
return false
}
defer r.Body.Close()
return y.parse(id, r.Body, ch) == nil
}()
return dist, nil
}
func NewYande() *Yande {
return &Yande{
doing: make(map[string]struct{}),
masterBus: make(chan *Artwork, 30),
}
}
The code uses another package of mine, sxyazi/go-collection; install it with the command below, or swap it out for your own helpers:
go get -u github.com/sxyazi/go-collection
The test case is still the same one as before:
func TestYande_Fetch(t *testing.T) {
	ch1, _ := NewYande().Fetch("https://yande.re/post/show/932473")
	ch2, _ := NewYande().Fetch("https://yande.re/post/show/932473")
	for a := range ch1 {
		fmt.Println("from ch1:", a)
	}
	for a := range ch2 {
		fmt.Println("from ch2:", a)
	}
}
Run it:
2022/08/30 20:50:51 Fetching https://yande.re/post/show/932473
2022/08/30 20:50:52 Fetching https://yande.re/post/show/932468
2022/08/30 20:50:52 Fetching https://yande.re/post/show/932469
2022/08/30 20:50:52 Fetching https://yande.re/post/show/933175
2022/08/30 20:50:52 Fetching https://yande.re/post/show/932472
2022/08/30 20:50:53 Merging graph for yande: 932473 [932472 932473 933175 932468 932469]
2022/08/30 20:50:53 Merging graph for yande: 932473 [932473 932468 932469 932472 933175]
from ch1: ID: yande:932472
from ch1: ID: yande:932473
from ch1: ID: yande:933175
from ch1: ID: yande:932468
from ch1: ID: yande:932469
from ch2: ID: yande:932472
from ch2: ID: yande:932473
from ch2: ID: yande:933175
from ch2: ID: yande:932468
from ch2: ID: yande:932469
--- PASS: TestYande_Fetch (1.67s)
Perfect! There are no redundant requests, each Fetch's graph is merged correctly, and both receive the correct 5 artworks. Now let's change the URLs:
ch1, _ := NewYande().Fetch("https://yande.re/post/show/932473") // 932473 is an artwork related to 932469
ch2, _ := NewYande().Fetch("https://yande.re/post/show/932469") // 932469 is an artwork related to 932473
Run it again and take a look:
=== RUN TestYande_Fetch
2022/08/30 20:53:47 Fetching https://yande.re/post/show/932469
2022/08/30 20:53:47 Fetching https://yande.re/post/show/932473
2022/08/30 20:53:48 edge changed: 932473(0) -> 932469(1)
2022/08/30 20:53:48 Hits the cache for https://yande.re/post/show/932473
2022/08/30 20:53:48 Fetching https://yande.re/post/show/932468
2022/08/30 20:53:48 Fetching https://yande.re/post/show/932472
2022/08/30 20:53:48 Fetching https://yande.re/post/show/933175
2022/08/30 20:53:49 Merging graph for yande: 932473 [932468 932473 932469 932472 933175]
2022/08/30 20:53:49 Merging graph for yande: 932469 [932469 932473 932472 933175 932468]
from ch1: ID: yande:932468
from ch1: ID: yande:932473
from ch1: ID: yande:932469
from ch1: ID: yande:932472
from ch1: ID: yande:933175
from ch2: ID: yande:932469
from ch2: ID: yande:932473
from ch2: ID: yande:932472
from ch2: ID: yande:933175
from ch2: ID: yande:932468
--- PASS: TestYande_Fetch (1.59s)
PASS
Notice the new edge changed line: during the dynamic construction of our Graph, the 932473 sub-graph was automatically merged into 932469, i.e. swallowed by 932469 into a larger Graph, exactly as we expected!
This is probably the longest post I have ever written. I considered splitting it in two, but worried it would lose coherence, so it stayed as one. Implementing a concurrent cache is harder than it looks, and writing it up as an article is even harder :(