Go语言中的并发模式:扇入扇出

并发编程是Go语言的核心特性之一,而扇入(Fan-In)和扇出(Fan-Out)是两种常用的并发模式。它们能帮助我们高效处理数据流,充分利用多核优势。

什么是扇入和扇出?

扇出(Fan-Out):多个goroutine从同一个channel读取数据,实现并行处理
扇入(Fan-In):多个goroutine向同一个channel发送数据,实现结果汇聚

基础示例:生产者-消费者模型

package main

import (
	"fmt"
	"sync"
	"time"
)

// 生产者:生成数据
func producer(id int, out chan<- int) {
	for i := 0; i < 5; i++ {
		out <- i
		fmt.Printf("生产者%d: 生成数据 %d\n", id, i)
		time.Sleep(time.Millisecond * 100)
	}
	close(out)
}

// 消费者:处理数据
func consumer(id int, in <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for data := range in {
		fmt.Printf("消费者%d: 处理数据 %d\n", id, data)
		time.Sleep(time.Millisecond * 200)
	}
}

func main() {
	dataStream := make(chan int, 10)
	var wg sync.WaitGroup

	// 启动单个生产者
	go producer(1, dataStream)

	// 扇出:启动3个消费者并行处理
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go consumer(i, dataStream, &wg)
	}

	wg.Wait()
	fmt.Println("处理完成")
}

完整的扇入扇出模式

package main

import (
	"fmt"
	"sync"
	"time"
)

// 工作生成器
func generateWork(workCount int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < workCount; i++ {
			out <- i
		}
		close(out)
	}()
	return out
}

// 工作者:处理单个任务
func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range in {
		// 模拟处理耗时
		time.Sleep(time.Millisecond * 50)
		result := task * 2
		out <- result
		fmt.Printf("工作者%d: 处理任务 %d -> 结果 %d\n", id, task, result)
	}
}

// 扇入:合并多个channel的结果
func fanIn(inputChans ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	merged := make(chan int, 100)

	// 从每个输入channel收集数据
	output := func(in <-chan int) {
		defer wg.Done()
		for data := range in {
			merged <- data
		}
	}

	wg.Add(len(inputChans))
	for _, ch := range inputChans {
		go output(ch)
	}

	// 等待所有输入channel关闭
	go func() {
		wg.Wait()
		close(merged)
	}()

	return merged
}

func main() {
	const workerCount = 4
	const taskCount = 20

	// 生成任务
	tasks := generateWork(taskCount)

	// 创建工作者channel
	workerChans := make([]<-chan int, workerCount)
	var wg sync.WaitGroup

	// 扇出:启动多个工作者并行处理任务
	for i := 0; i < workerCount; i++ {
		workerOut := make(chan int, 10)
		workerChans[i] = workerOut
		wg.Add(1)
		go worker(i, tasks, workerOut, &wg)
	}

	// 扇入:合并所有工作者的结果
	results := fanIn(workerChans...)

	// 等待所有工作者完成
	go func() {
		wg.Wait()
		// 关闭每个工作者的输出channel
		for i := 0; i < workerCount; i++ {
			if ch, ok := any(workerChans[i]).(chan int); ok {
				close(ch)
			}
		}
	}()

	// 收集并显示结果
	var allResults []int
	for result := range results {
		allResults = append(allResults, result)
	}

	fmt.Printf("\n处理完成!共处理 %d 个任务\n", len(allResults))
	fmt.Printf("结果列表: %v\n", allResults)
}

实际应用:并发下载器

package main

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

// 下载单个URL
func downloadURL(url string, resultChan chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	start := time.Now()
	resp, err := http.Get(url)
	if err != nil {
		resultChan <- fmt.Sprintf("下载失败 %s: %v", url, err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		resultChan <- fmt.Sprintf("读取失败 %s: %v", url, err)
		return
	}

	elapsed := time.Since(start)
	resultChan <- fmt.Sprintf("下载成功 %s: %d 字节, 耗时 %v", 
		url, len(body), elapsed)
}

func main() {
	urls := []string{
		"https://httpbin.org/delay/1",
		"https://httpbin.org/delay/2",
		"https://httpbin.org/delay/1",
		"https://httpbin.org/delay/3",
	}

	const maxConcurrent = 2
	semaphore := make(chan struct{}, maxConcurrent)
	resultChan := make(chan string, len(urls))
	var wg sync.WaitGroup

	// 扇出:并发下载
	for _, url := range urls {
		wg.Add(1)
		go func(u string) {
			semaphore <- struct{}{}        // 获取信号量
			defer func() { <-semaphore }() // 释放信号量
			downloadURL(u, resultChan, &wg)
		}(url)
	}

	// 等待所有下载完成
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// 扇入:收集结果
	fmt.Println("开始下载...")
	for result := range resultChan {
		fmt.Println(result)
	}
	fmt.Println("所有下载完成!")
}

模式优势与最佳实践

优势:

  1. 提高吞吐量:并行处理加速任务执行
  2. 资源控制:通过worker数量控制并发度
  3. 解耦合:生产、处理、收集阶段分离
  4. 弹性扩展:易于增加或减少工作者数量

最佳实践:

// 1. 使用带缓冲的channel避免死锁
ch := make(chan int, 100)

// 2. 明确关闭channel
defer close(ch)

// 3. 使用WaitGroup同步goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // 工作代码
}()

// 4. 设置超时控制
select {
case result := <-ch:
    fmt.Println(result)
case <-time.After(5 * time.Second):
    fmt.Println("超时")
}

// 5. 优雅停止
stopChan := make(chan struct{})
go func() {
    for {
        select {
        case <-stopChan:
            return
        case data := <-inputChan:
            // 处理数据
        }
    }
}()
// 需要停止时
close(stopChan)

总结

扇入扇出模式是Go并发编程的强大工具,特别适合处理数据管道和并行计算场景。通过合理运用这些模式,我们可以:
  1. 构建高效的数据处理流水线
  2. 充分利用多核CPU性能
  3. 实现优雅的并发控制
  4. 创建可扩展的并发架构
掌握这些模式后,你就能更好地设计高并发的Go应用程序,处理大量数据时游刃有余。

购买须知/免责声明
1.本文部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责。
2.若您需要商业运营或用于其他商业活动,请您购买正版授权并合法使用。
3.如果本站有侵犯、不妥之处的资源,请在网站右边客服联系我们。将会第一时间解决!
4.本站所有内容均由互联网收集整理、网友上传,仅供大家参考、学习,不存在任何商业目的与商业用途。
5.本站提供的所有资源仅供参考学习使用,版权归原著所有,禁止下载本站资源参与商业和非法行为,请在24小时之内自行删除!
6.不保证任何源码框架的完整性。
7.侵权联系邮箱:aliyun6168@gail.com / aliyun666888@gail.com
8.若您最终确认购买,则视为您100%认同并接受以上所述全部内容。

会员源码网 后端编程 Go语言中的并发模式:扇入扇出 https://svipm.com/21583.html

相关文章

猜你喜欢
发表评论
暂无评论