并发编程是Go语言的核心特性之一,而扇入(Fan-In)和扇出(Fan-Out)是两种常用的并发模式。它们能帮助我们高效处理数据流,充分利用多核优势。
什么是扇入和扇出?
扇出(Fan-Out):多个goroutine从同一个channel读取数据,实现并行处理
扇入(Fan-In):多个goroutine向同一个channel发送数据,实现结果汇聚
基础示例:生产者-消费者模型
package main
import (
"fmt"
"sync"
"time"
)
// 生产者:生成数据
func producer(id int, out chan<- int) {
for i := 0; i < 5; i++ {
out <- i
fmt.Printf("生产者%d: 生成数据 %d\n", id, i)
time.Sleep(time.Millisecond * 100)
}
close(out)
}
// 消费者:处理数据
func consumer(id int, in <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for data := range in {
fmt.Printf("消费者%d: 处理数据 %d\n", id, data)
time.Sleep(time.Millisecond * 200)
}
}
func main() {
dataStream := make(chan int, 10)
var wg sync.WaitGroup
// 启动单个生产者
go producer(1, dataStream)
// 扇出:启动3个消费者并行处理
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, dataStream, &wg)
}
wg.Wait()
fmt.Println("处理完成")
}
完整的扇入扇出模式
package main
import (
"fmt"
"sync"
"time"
)
// 工作生成器
func generateWork(workCount int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < workCount; i++ {
out <- i
}
close(out)
}()
return out
}
// 工作者:处理单个任务
func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range in {
// 模拟处理耗时
time.Sleep(time.Millisecond * 50)
result := task * 2
out <- result
fmt.Printf("工作者%d: 处理任务 %d -> 结果 %d\n", id, task, result)
}
}
// 扇入:合并多个channel的结果
func fanIn(inputChans ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int, 100)
// 从每个输入channel收集数据
output := func(in <-chan int) {
defer wg.Done()
for data := range in {
merged <- data
}
}
wg.Add(len(inputChans))
for _, ch := range inputChans {
go output(ch)
}
// 等待所有输入channel关闭
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
const workerCount = 4
const taskCount = 20
// 生成任务
tasks := generateWork(taskCount)
// 创建工作者channel
workerChans := make([]<-chan int, workerCount)
var wg sync.WaitGroup
// 扇出:启动多个工作者并行处理任务
for i := 0; i < workerCount; i++ {
workerOut := make(chan int, 10)
workerChans[i] = workerOut
wg.Add(1)
go worker(i, tasks, workerOut, &wg)
}
// 扇入:合并所有工作者的结果
results := fanIn(workerChans...)
// 等待所有工作者完成
go func() {
wg.Wait()
// 关闭每个工作者的输出channel
for i := 0; i < workerCount; i++ {
if ch, ok := any(workerChans[i]).(chan int); ok {
close(ch)
}
}
}()
// 收集并显示结果
var allResults []int
for result := range results {
allResults = append(allResults, result)
}
fmt.Printf("\n处理完成!共处理 %d 个任务\n", len(allResults))
fmt.Printf("结果列表: %v\n", allResults)
}
实际应用:并发下载器
package main
import (
"fmt"
"io"
"net/http"
"sync"
"time"
)
// 下载单个URL
func downloadURL(url string, resultChan chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
start := time.Now()
resp, err := http.Get(url)
if err != nil {
resultChan <- fmt.Sprintf("下载失败 %s: %v", url, err)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
resultChan <- fmt.Sprintf("读取失败 %s: %v", url, err)
return
}
elapsed := time.Since(start)
resultChan <- fmt.Sprintf("下载成功 %s: %d 字节, 耗时 %v",
url, len(body), elapsed)
}
func main() {
urls := []string{
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/3",
}
const maxConcurrent = 2
semaphore := make(chan struct{}, maxConcurrent)
resultChan := make(chan string, len(urls))
var wg sync.WaitGroup
// 扇出:并发下载
for _, url := range urls {
wg.Add(1)
go func(u string) {
semaphore <- struct{}{} // 获取信号量
defer func() { <-semaphore }() // 释放信号量
downloadURL(u, resultChan, &wg)
}(url)
}
// 等待所有下载完成
go func() {
wg.Wait()
close(resultChan)
}()
// 扇入:收集结果
fmt.Println("开始下载...")
for result := range resultChan {
fmt.Println(result)
}
fmt.Println("所有下载完成!")
}
模式优势与最佳实践
优势:
-
提高吞吐量:并行处理加速任务执行
-
资源控制:通过worker数量控制并发度
-
解耦合:生产、处理、收集阶段分离
-
弹性扩展:易于增加或减少工作者数量
最佳实践:
// 1. 使用带缓冲的channel避免死锁
ch := make(chan int, 100)
// 2. 明确关闭channel
defer close(ch)
// 3. 使用WaitGroup同步goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// 工作代码
}()
// 4. 设置超时控制
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(5 * time.Second):
fmt.Println("超时")
}
// 5. 优雅停止
stopChan := make(chan struct{})
go func() {
for {
select {
case <-stopChan:
return
case data := <-inputChan:
// 处理数据
}
}
}()
// 需要停止时
close(stopChan)
总结
扇入扇出模式是Go并发编程的强大工具,特别适合处理数据管道和并行计算场景。通过合理运用这些模式,我们可以:
-
构建高效的数据处理流水线
-
充分利用多核CPU性能
-
实现优雅的并发控制
-
创建可扩展的并发架构
掌握这些模式后,你就能更好地设计高并发的Go应用程序,处理大量数据时游刃有余。