Concurrent Caching in Go, in Practice

go-concurrent-cache

A concurrent cache, as the name suggests, is a cache that can be shared and used interleaved across multiple concurrent goroutines. This is especially common in crawlers: every new crawl task wants to reuse, in real time, the results of previous or still-running tasks, avoiding the cost of duplicate network requests.

This article uses an image crawler for yande.re as a running example to show how to share a concurrent cache across goroutines and build an efficient concurrent crawler. yande.re is a fairly well-known anime image download site, hosting mostly original illustrations collected from various sources.

Serial Version

First, let's implement a simple serial version with no concurrency. To keep things short, we only scrape a few basic fields, since they are not the focus of this article:

type Artwork struct {
	ID    string `json:"id"`    // id of the artwork, e.g. "yande:932473"
	Image string `json:"image"` // original image url
	Title string `json:"title"` // title of the artwork
}

func (a *Artwork) String() string {
	return fmt.Sprintf("ID: %s", a.ID) // print only the ID, for easier debugging
}

Then a quick implementation:

// yande.go
type Yande struct {
	occur map[string]struct{}
}

func (y *Yande) id(u string) string {
	matches := regexp.MustCompile(`/post/show/(\d+)`).FindStringSubmatch(u)
	if len(matches) == 2 {
		return matches[1]
	}
	return ""
}

func (y *Yande) parse(id string, r io.Reader) (artworks []*Artwork, err error) {
	doc, err := goquery.NewDocumentFromReader(r)
	if err != nil {
		return nil, err
	}

	image := doc.Find(".original-file-unchanged").AttrOr("href", "")
	if image == "" { // there is no original file, use the changed one
		image = doc.Find(".original-file-changed").AttrOr("href", "")
	}

	artworks = append(artworks, &Artwork{
		ID:    "yande:" + id,
		Image: image,
		Title: doc.Find("title").Text(),
	})

	// Find all related artworks, and Fetch them recursively in turn.
	doc.Find("div.status-notice > a[href^='/post/show/']").Each(func(_ int, s *goquery.Selection) {
		if id := y.id(s.AttrOr("href", "")); id != "" {
			a, _ := y.Fetch("https://yande.re/post/show/" + id) // we don't care about the error in recursion
			artworks = append(artworks, a...)
		}
	})
	return artworks, nil
}

func (y *Yande) Fetch(u string) ([]*Artwork, error) {
	id := y.id(u) // id of the artwork, e.g. "932473"
	if id == "" {
		return nil, errors.New("invalid url")
	} else if _, ok := y.occur[id]; ok {
		return nil, errors.New("the artwork has been fetched")
	} else {
		y.occur[id] = struct{}{}
	}

	r, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()
	return y.parse(id, r.Body)
}

func NewYande() *Yande {
	return &Yande{
		occur: make(map[string]struct{}),
	}
}

The code above uses the PuerkitoBio/goquery package to parse and select HTML; install it with go get:

go get -u github.com/PuerkitoBio/goquery

Finally, write a simple test, yande_test.go, to verify the result:

func TestYande_Fetch(t *testing.T) {
	y := NewYande()

	artworks, err := y.Fetch("https://yande.re/post/show/932473")
	if err != nil {
		panic(err)
	}

	for _, a := range artworks {
		fmt.Println(a)
	}
}

Here and in what follows we test with https://yande.re/post/show/932473. It is currently related to 4 other illustrations, so a correct result should contain the following links:

Run the test; just as expected, we get exactly these 5:

ID: yande:932473
ID: yande:932468
ID: yande:932469
ID: yande:932472
ID: yande:933175
--- PASS: TestYande_Fetch (2.58s)

Concurrent Version

The biggest drawback of the "serial version" above is that it is slow. Now let's rework it, starting with a couple of small goals:

  • Run all network requests in goroutines, to make the most of I/O resources
  • Return results through a Channel instead of a Slice, which lets goroutines exchange information more flexibly

First, change the Yande struct:

type Yande struct {
	doing map[string]struct{}
	done  map[string]struct{}
	mu    sync.Mutex
	bus   chan *Artwork
}

  • doing holds in-progress tasks, done holds completed tasks
  • mu: since doing and done are modified from multiple goroutines, we need a Mutex. You could also use an RWMutex, sync.Map, and so on; a plain Mutex keeps things simple here
  • bus: every goroutine puts its results here; it is returned from Fetch, and closed once all goroutines have finished

Then a quick implementation:

func (y *Yande) parse(id string, r io.Reader) (err error) {
	doc, err := goquery.NewDocumentFromReader(r)
	if err != nil {
		return err
	}

	image := doc.Find(".original-file-unchanged").AttrOr("href", "")
	if image == "" { // there is no original file, use the changed one
		image = doc.Find(".original-file-changed").AttrOr("href", "")
	}

	// Find all related artworks, and Fetch them recursively in turn.
	doc.Find("div.status-notice > a[href^='/post/show/']").Each(func(_ int, s *goquery.Selection) {
		if id := y.id(s.AttrOr("href", "")); id != "" {
			_, _ = y.Fetch("https://yande.re/post/show/" + id) // we don't care about the error in recursion
		}
	})

	y.bus <- &Artwork{
		ID:    "yande:" + id,
		Image: image,
		Title: doc.Find("title").Text(),
	}
	y.end(id)
	return nil
}

func (y *Yande) end(id string) {
	defer y.mu.Unlock()
	y.mu.Lock()

	y.done[id] = struct{}{}
	delete(y.doing, id)
	if len(y.doing) == 0 {
		close(y.bus)
	}
}

func (y *Yande) Fetch(u string) (<-chan *Artwork, error) {
	id := y.id(u) // id of the artwork, e.g. "932473"
	if id == "" {
		return nil, errors.New("invalid url")
	}

	defer y.mu.Unlock()
	y.mu.Lock()
	if _, ok := y.doing[id]; ok {
		return nil, errors.New("already doing")
	} else if _, ok := y.done[id]; ok {
		return nil, errors.New("already done")
	} else {
		y.doing[id] = struct{}{}
	}

	go func() (succeed bool) {
		defer func() {
			if !succeed {
				y.end(id)
			}
		}()

		log.Println("Fetching", u)
		r, err := http.Get(u)
		if err != nil {
			return false
		}
		defer r.Body.Close()
		return y.parse(id, r.Body) == nil
	}()
	return y.bus, nil
}

func NewYande() *Yande {
	return &Yande{
		doing: make(map[string]struct{}),
		done:  make(map[string]struct{}),
		bus:   make(chan *Artwork, 30),
	}
}

The test case needs a small change:

func TestYande_Fetch(t *testing.T) {
	ch, err := NewYande().Fetch("https://yande.re/post/show/932473")
	if err != nil {
		panic(err)
	}
	for a := range ch {
		fmt.Println(a)
	}
}

Finally, run it and look at the result:

2022/08/30 19:31:30 Fetching https://yande.re/post/show/932473
2022/08/30 19:31:31 Fetching https://yande.re/post/show/932468
ID: yande:932473
2022/08/30 19:31:31 Fetching https://yande.re/post/show/932472
2022/08/30 19:31:31 Fetching https://yande.re/post/show/932469
2022/08/30 19:31:31 Fetching https://yande.re/post/show/933175
ID: yande:932472
ID: yande:932468
ID: yande:932469
ID: yande:933175
--- PASS: TestYande_Fetch (1.56s)

Correct! It took 1.56s, nearly 40% faster than the serial version's 2.58s, and the gap only widens with larger data sets.

Notice that the requests run concurrently, and the first result is "ready" right after the second request starts. This means that while crawling is still underway, the caller can already pick up ready results and run other tasks on them, further cutting processing latency. It's like magic!

Concurrent Cache Version

The "concurrent version" is already a local optimum, but viewed globally it still has problems. Let's modify the test case:

func TestYande_Fetch(t *testing.T) {
	ch1, _ := NewYande().Fetch("https://yande.re/post/show/932473")
	ch2, _ := NewYande().Fetch("https://yande.re/post/show/932473")

	for a := range ch1 {
		fmt.Println("from ch1:", a)
	}
	for a := range ch2 {
		fmt.Println("from ch2:", a)
	}
}

Assume that at the same moment, two user requests come in, both asking to crawl the same illustration. The output after running:

2022/08/30 19:43:05 Fetching https://yande.re/post/show/932473
2022/08/30 19:43:05 Fetching https://yande.re/post/show/932473
2022/08/30 19:43:06 Fetching https://yande.re/post/show/932469
2022/08/30 19:43:06 Fetching https://yande.re/post/show/933175
2022/08/30 19:43:06 Fetching https://yande.re/post/show/932472
2022/08/30 19:43:06 Fetching https://yande.re/post/show/932468
2022/08/30 19:43:07 Fetching https://yande.re/post/show/932468
2022/08/30 19:43:07 Fetching https://yande.re/post/show/932469
2022/08/30 19:43:07 Fetching https://yande.re/post/show/932472
from ch1: ID: yande:932473
2022/08/30 19:43:07 Fetching https://yande.re/post/show/933175
from ch1: ID: yande:932468
from ch1: ID: yande:932472
from ch1: ID: yande:932469
from ch1: ID: yande:933175
from ch2: ID: yande:932473
from ch2: ID: yande:932469
from ch2: ID: yande:932468
from ch2: ID: yande:932472
from ch2: ID: yande:933175
--- PASS: TestYande_Fetch (1.73s)

We can see that our 5 requests were amplified 2x: every request was repeated once. That is an unnecessary waste of I/O, because independent Fetches cannot benefit from results that another Fetch may already have ready.

So we need to introduce a "global cache", and one for Channels at that. Yes, we are going to cache Channels, "concurrently"!

The Relationship Model

Implementing a concurrent cache of Channels is not easy; there are many edge cases to consider. But that shouldn't stop us from first imagining a scenario where everything goes smoothly:

ch1, _ := NewYande().Fetch1("https://yande.re/post/show/932473")  // 932473 is an artwork related to 932469
ch2, _ := NewYande().Fetch2("https://yande.re/post/show/932469")  // 932469 is an artwork related to 932473

The output of the code above might look like this:

# 1 yande:932473 from ch1  // Fetch1() called
# 2 yande:932469 from ch2  // Fetch2() called
# 3 yande:932468 from ch1  // first related artwork from Fetch1()
# 4 yande:932469 from ch1  // second related artwork from Fetch1(), hit cache #2
# 5 yande:932473 from ch2  // first related artwork from Fetch2(), hit cache #1
# 6 yande:932468 from ch2  // second related artwork from Fetch2(), hit cache #3
# 7 yande:932472 from ch2  // ...
# 8 yande:933175 from ch2  // ...
# 9 yande:932472 from ch1  // hit cache #7
#10 yande:933175 from ch1  // hit cache #8

可以发现 Fetch1Fetch2 彼此交错执行,并且双方都为获得所有(5 个)插画而努力,但 Fetch1 并不知晓来自彼岸的 Fetch2,反之亦然,双方各自不掌握对方的任何情报。因此,我们需要为它们建立纽带,即实现一个 Graph 结构,Fetch1Fetch2 是这个 Graph 的构建者,同时也是彼此的受益者。

In short, every time a new Artwork is produced, we add it to the Graph and merge it with the []Artwork coming from other goroutines. It is really a sub-graph merge operation, absorbing the pieces into one larger Graph.

Reality, however, is not that flawless. The "near-perfect" interleaving of Fetch1 and Fetch2 in the output above is both unnecessary and a performance loss. Unnecessary, because a single Fetch is already concurrent internally; we don't need multiple Fetches to achieve concurrency. A performance loss, because whenever either side fetches and uses the other's cache, it pays extra coordination overhead.

So the best practice is to keep only one of the Fetches active, leaving the other(s) idle. We implement this by adding two methods to the Graph, Wait and Done. Here is the Graph implementation:

type Node struct {
	idx int
	val string
}
type wait struct {
	ch      []chan struct{}
	quality uint
}
type Graph struct {
	mu    sync.RWMutex
	waits map[int]*wait
	nodes map[string]*Node
	edges map[string]map[string]struct{}
}

func (g *Graph) addNode(id string, quality uint) (idx int) {
	if _, ok := g.nodes[id]; ok {
		return -1
	}

	idx = len(g.nodes)
	g.nodes[id] = &Node{idx: idx, val: id}
	if quality == 0 {
		g.waits[idx] = &wait{quality: 0}
	} else {
		g.waits[idx] = &wait{ch: []chan struct{}{make(chan struct{})}, quality: quality}
	}
	return
}

func (g *Graph) IncNode(id string, quality uint) (idx int, exists bool) {
	defer g.mu.Unlock()
	g.mu.Lock()

	if n, ok := g.nodes[id]; !ok {
		return g.addNode(id, quality), false
	} else {
		g.waits[n.idx].quality += quality
		return n.idx, true
	}
}

func (g *Graph) mergeGraph(a, b *Node) {
	log.Printf("edge changed: %s(%d) -> %s(%d)\n", b.val, b.idx, a.val, a.idx)

	// quality
	g.waits[a.idx].quality += g.waits[b.idx].quality

	// blockers
	g.waits[a.idx].ch = append(g.waits[a.idx].ch, g.waits[b.idx].ch...)
	delete(g.waits, b.idx)
	// sync to all nodes in that same graph of original
	for _, n := range g.bfs(b.val, make(map[string]struct{})) {
		g.nodes[n].idx = a.idx
	}
}

func (g *Graph) hasEdge(a, b string) bool {
	if _, ok := g.edges[a]; !ok {
		return false
	}
	if _, ok := g.edges[a][b]; ok {
		return true
	}
	return false
}

func (g *Graph) AddEdge(a, b string) bool {
	if a == b {
		return false
	}

	defer g.mu.Unlock()
	g.mu.Lock()
	if g.hasEdge(a, b) {
		return false
	}

	_ = g.addNode(a, 0)
	if n, ok := g.nodes[b]; !ok {
		g.nodes[b] = &Node{idx: g.nodes[a].idx, val: b}
	} else if idx := g.nodes[a].idx; n.idx != idx {
		g.mergeGraph(g.nodes[a], n)
	}

	if _, ok := g.edges[a]; !ok {
		g.edges[a] = make(map[string]struct{})
	}
	if _, ok := g.edges[b]; !ok {
		g.edges[b] = make(map[string]struct{})
	}
	g.edges[a][b] = struct{}{}
	g.edges[b][a] = struct{}{}
	return true
}

func (g *Graph) Wait(id string) {
	g.mu.RLock()
	n, ok := g.nodes[id]
	if !ok {
		g.mu.RUnlock()
		return
	}

	var ch chan struct{}
	if w, ok := g.waits[n.idx]; ok && len(w.ch) > 0 {
		ch = g.waits[n.idx].ch[0]
	}

	g.mu.RUnlock()
	if ch != nil {
		<-ch
	}
}

func (g *Graph) Done(id string) int {
	defer g.mu.Unlock()
	g.mu.Lock()

	n, ok := g.nodes[id]
	if !ok {
		return -1
	}
	w, ok := g.waits[n.idx]
	if !ok {
		return -1
	}

	if len(w.ch) < 1 {
		return 0
	}

	w.quality--
	if w.quality > 0 { // still waiting for other workers
		return int(w.quality)
	}

	for _, ch := range w.ch {
		close(ch)
	}
	w.ch = w.ch[:0]
	return 0
}

func (g *Graph) DoneForWait(id string) []string {
	g.Done(id)
	g.Wait(id)
	return g.Siblings(id)
}

func (g *Graph) bfs(id string, visited map[string]struct{}) (nodes []string) {
	queue := []string{id}
	for len(queue) > 0 {
		head := queue[0]
		if _, ok := visited[head]; ok {
			queue = queue[1:]
		} else {
			visited[head] = struct{}{}
			nodes = append(nodes, head)
			queue = append(queue[1:], collect.Keys(g.edges[head])...)
		}
	}
	return
}

func (g *Graph) Siblings(id string) []string {
	defer g.mu.RUnlock()
	g.mu.RLock()

	return g.bfs(id, make(map[string]struct{}))
}

func NewGraph() *Graph {
	return &Graph{
		waits: make(map[int]*wait),
		nodes: make(map[string]*Node),
		edges: make(map[string]map[string]struct{}),
	}
}

Caching Channels

Values sent on a Go Channel can only be received once, so how do we cache them for repeated later use? I had considered implementing a shared Channel model with concurrent reads and writes: one goroutine writes data into a cached channel, and the other goroutines sharing that channel cache read the written data in real time. This pattern is known as fan-out, and there is a good article that gives a brief introduction to it.

I implemented it, somewhat over-confidently, with a complex pub/sub, only to find the performance disappointing. So in the end I chose a different, non-realtime approach:

func someExpensiveComputation() <-chan int {
	dist := make(chan int)
	go func() {
		defer close(dist)
		for i := 0; i < 5; i++ {
			time.Sleep(time.Second)
			dist <- i
		}
	}()
	return cache.Put("some-id", dist)
}

ch1 := someExpensiveComputation()
ch2 := cache.Pull("some-id")
ch3 := cache.Pull("some-id")

Whenever someExpensiveComputation produces a new result, ch1 reads it in real time, but ch2 and ch3 read all 5 buffered results in one go, only after all 5 tasks have finished, i.e. at close(dist). The reasons for this design are as follows:

  • We only want a "concurrent cache", not "concurrent Channels", since concurrent interaction between Channels costs more

  • For small computation tasks, "concurrent Channels" are unnecessary. Say our someExpensiveComputation still produces 5 results, and we merely sum them up:

    go func() {
    	sum := 0
    	for i := range ch1 {
    		sum += i
    	}
    }()
    go func() {
    	sum := 0
    	for i := range ch2 {
    		sum += i
    	}
    }()
    

    Since both the computation and the number of results are small, the two goroutines, one on the realtime ch1 and one on the non-realtime ch2, show little performance difference.

  • For large computation tasks, "concurrent Channels" make even less sense. Say our someExpensiveComputation now produces 50k results and each result takes 1ms to process. With "concurrent Channels", every goroutine would spend 50s computing a "completely identical" result, wasting compute resources.

    The right approach here is to let only one goroutine do the computation, and cache the final result for the other goroutines, avoiding duplicate work. And yes, that is the "non-concurrent Channel" we are about to implement:

    go func() {
    	result := make(chan int, 1)
    	cache.Put("my-expensive-result", result)
    
    	sum := 0
    	for i := range ch1 {
    		time.Sleep(time.Millisecond)  // assume each task takes 1ms
    		sum += i
    	}
    	result <- sum
    	close(result)
    }()
    go func() {
    	if c := cache.Pull("my-expensive-result"); c != nil {
    		fmt.Println(<-c)  // wait and use the cached result directly
    		return
    	}
    
    	// TODO: fall back to computing as above
    }()
    

OK, that was a lot of talk; I trust the idea is clear by now, so let's implement it:

type Cache struct {
	futures sync.Map
	caches  sync.Map
}

func (a *Cache) Get(id string) ([]*Artwork, bool) {
	if d, ok := a.caches.Load(id); ok {
		return d.([]*Artwork), true
	}
	return nil, false
}

func (a *Cache) Has(id string) bool {
	_, ok := a.caches.Load(id)
	return ok
}

func (a *Cache) Set(id string, s []*Artwork) {
	if len(s) > 0 {
		a.caches.Store(id, s) // TODO: maybe each artwork should be cloned into a new slice
	}
}

func (a *Cache) Put(ch <-chan *Artwork, ids ...string) <-chan *Artwork {
	dist := make(chan *Artwork, cap(ch))
	for _, id := range ids {
		if id != "" {
			a.futures.LoadOrStore(id, make(chan struct{}))
		}
	}

	go func() {
		defer close(dist)
		buf := make([]*Artwork, 0, cap(ch))
		occur := make(map[string]struct{}, cap(ch))
		for i := range ch {
			if _, ok := occur[i.ID]; !ok && i.ID != "" {
				occur[i.ID] = struct{}{}
				dist <- i
				buf = append(buf, i)
			}
		}

		for _, id := range ids {
			if id == "" {
				continue
			}
			a.Set(id, buf)
			if c, ok := a.futures.LoadAndDelete(id); ok {
				close(c.(chan struct{}))
			}
		}
	}()
	return dist
}

func (a *Cache) Pull(id string) <-chan *Artwork {
	future, ok := a.futures.Load(id)
	if !ok && !a.Has(id) {
		return nil
	}

	ch := make(chan *Artwork, 10)
	go func() {
		defer close(ch)
		if ok {
			<-future.(chan struct{})
		}
		if s, ok := a.Get(id); ok {
			for _, a := range s {
				ch <- a
			}
		}
	}()
	return ch
}

func (a *Cache) Merge(ids ...string) <-chan *Artwork {
	if len(ids) < 1 {
		return nil
	}

	var buf []*Artwork
	occur := make(map[string]struct{})

	for _, id := range ids {
		if c, ok := a.futures.Load(id); ok {
			<-c.(chan struct{})
		}

		s, _ := a.Get(id)
		for _, i := range s {
			if _, ok := occur[i.ID]; !ok {
				occur[i.ID] = struct{}{}
				buf = append(buf, i)
			}
		}
	}

	for _, id := range ids {
		a.Set(id, buf)
	}

	ch := make(chan *Artwork, len(buf))
	for _, i := range buf {
		ch <- i
	}
	close(ch)
	return ch
}

func NewCache() *Cache {
	return &Cache{}
}

Implementing the Concurrent Cache

With all the groundwork in place, let's now build the final version:

var rel = NewGraph()
var cache = NewCache()
type Yande struct {
	doing     map[string]struct{}
	mu        sync.Mutex
	master    string
	masterBus chan *Artwork
}

func (y *Yande) parse(id string, r io.Reader, ch chan<- *Artwork) (err error) {
	doc, err := goquery.NewDocumentFromReader(r)
	if err != nil {
		return err
	}

	image := doc.Find(".original-file-unchanged").AttrOr("href", "")
	if image == "" { // there is no original file, use the changed one
		image = doc.Find(".original-file-changed").AttrOr("href", "")
	}

	// Find all related artworks, and Fetch them recursively in turn.
	doc.Find("div.status-notice > a[href^='/post/show/']").Each(func(_ int, s *goquery.Selection) {
		if id := y.id(s.AttrOr("href", "")); id != "" {
			_, _ = y.Fetch("https://yande.re/post/show/" + id) // we don't care about the error in recursion
		}
	})

	ch <- &Artwork{
		ID:    "yande:" + id,
		Image: image,
		Title: doc.Find("title").Text(),
	}
	y.end(id, ch)
	return nil
}

func (y *Yande) end(id string, ch chan<- *Artwork) {
	if ch != nil {
		close(ch)
	}

	defer y.mu.Unlock()
	y.mu.Lock()

	delete(y.doing, id)
	if len(y.doing) == 0 {
		siblings := rel.DoneForWait(id)
		log.Println("Merging graph for yande:", y.master, siblings)

		ids := collect.Map(siblings, func(id string, _ int) string {
			return "yande:" + id
		})
		go func() {
			// We need to get a collection, that is all artworks in this graph.
			for i := range cache.Merge(ids...) {
				y.masterBus <- i
			}
			close(y.masterBus)
		}()
	}
}

func (y *Yande) Fetch(u string) (<-chan *Artwork, error) {
	id := y.id(u) // id of the artwork, e.g. "932473"
	if id == "" {
		return nil, errors.New("invalid url")
	}

	isMaster, exists := y.master == "", false
	if isMaster {
		y.master = id
		_, exists = rel.IncNode(y.master, 1)
	} else {
		exists = !rel.AddEdge(y.master, id)
	}

	y.mu.Lock()
	if _, ok := y.doing[id]; exists || ok {
		y.mu.Unlock()
		if isMaster { // a brand-new master whose id is already being fetched by another goroutine, or already present in the graph
			y.masterBus = make(chan *Artwork, 30)
			y.end(id, nil)
			return y.masterBus, nil
		}
		return nil, errors.New("the artwork has been fetched")
	} else {
		y.doing[id] = struct{}{}
	}
	y.mu.Unlock()

	if c := cache.Pull("yande:" + id); c != nil {
		log.Println("Hits the cache for", u)
		y.end(id, nil)
		return c, nil
	}

	ch := make(chan *Artwork, 30)
	dist := cache.Put(ch, "yande:"+id)
	if isMaster {
		y.master = id
		dist = y.masterBus
	}

	go func() (succeed bool) {
		defer func() {
			if !succeed {
				y.end(id, ch)
			}
		}()

		log.Println("Fetching", u)
		r, err := http.Get(u)
		if err != nil {
			return false
		}
		defer r.Body.Close()
		return y.parse(id, r.Body, ch) == nil
	}()
	return dist, nil
}

func NewYande() *Yande {
	return &Yande{
		doing:     make(map[string]struct{}),
		masterBus: make(chan *Artwork, 30),
	}
}

The code uses another package I wrote, sxyazi/go-collection. Install it with the following command, or swap in your own replacement:

go get -u github.com/sxyazi/go-collection

The test case is the same as before:

func TestYande_Fetch(t *testing.T) {
	ch1, _ := NewYande().Fetch("https://yande.re/post/show/932473")
	ch2, _ := NewYande().Fetch("https://yande.re/post/show/932473")

	for a := range ch1 {
		fmt.Println("from ch1:", a)
	}
	for a := range ch2 {
		fmt.Println("from ch2:", a)
	}
}

Run it:

2022/08/30 20:50:51 Fetching https://yande.re/post/show/932473
2022/08/30 20:50:52 Fetching https://yande.re/post/show/932468
2022/08/30 20:50:52 Fetching https://yande.re/post/show/932469
2022/08/30 20:50:52 Fetching https://yande.re/post/show/933175
2022/08/30 20:50:52 Fetching https://yande.re/post/show/932472
2022/08/30 20:50:53 Merging graph for yande: 932473 [932472 932473 933175 932468 932469]
2022/08/30 20:50:53 Merging graph for yande: 932473 [932473 932468 932469 932472 933175]
from ch1: ID: yande:932472
from ch1: ID: yande:932473
from ch1: ID: yande:933175
from ch1: ID: yande:932468
from ch1: ID: yande:932469
from ch2: ID: yande:932472
from ch2: ID: yande:932473
from ch2: ID: yande:933175
from ch2: ID: yande:932468
from ch2: ID: yande:932469
--- PASS: TestYande_Fetch (1.67s)

Perfect! No redundant requests, each Fetch's Graph was merged correctly, and both got the correct 5 illustrations. Now let's change the URLs a bit:

ch1, _ := NewYande().Fetch("https://yande.re/post/show/932473")  // 932473 is an artwork related to 932469
ch2, _ := NewYande().Fetch("https://yande.re/post/show/932469")  // 932469 is an artwork related to 932473

Run it again and take a look:

=== RUN   TestYande_Fetch
2022/08/30 20:53:47 Fetching https://yande.re/post/show/932469
2022/08/30 20:53:47 Fetching https://yande.re/post/show/932473
2022/08/30 20:53:48 edge changed: 932473(0) -> 932469(1)
2022/08/30 20:53:48 Hits the cache for https://yande.re/post/show/932473
2022/08/30 20:53:48 Fetching https://yande.re/post/show/932468
2022/08/30 20:53:48 Fetching https://yande.re/post/show/932472
2022/08/30 20:53:48 Fetching https://yande.re/post/show/933175
2022/08/30 20:53:49 Merging graph for yande: 932473 [932468 932473 932469 932472 933175]
2022/08/30 20:53:49 Merging graph for yande: 932469 [932469 932473 932472 933175 932468]
from ch1: ID: yande:932468
from ch1: ID: yande:932473
from ch1: ID: yande:932469
from ch1: ID: yande:932472
from ch1: ID: yande:933175
from ch2: ID: yande:932469
from ch2: ID: yande:932473
from ch2: ID: yande:932472
from ch2: ID: yande:933175
from ch2: ID: yande:932468
--- PASS: TestYande_Fetch (1.59s)
PASS

Notice the new edge changed line: during the dynamic construction of our Graph, the 932473 sub-graph was automatically merged into 932469, that is, absorbed by 932469 into a larger Graph, exactly as we expected!

Final Words

This is probably the longest post I have ever written. I considered splitting it into two, but worried it would lose coherence, so it stayed as one. Implementing a concurrent cache is harder than it looks, and writing it up as an article is harder still :(