go语言批量处理大量数据的方法

Golang被证明非常适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。

想象这样的应用情景:

从数据库A(Cassandra)加载用户评论(量巨大,例如10亿条);根据每条评论的用户ID、从数据库B(MySQL)关联用户资料;调用NLP服务(自然语言处理),处理每条评论;将处理结果写入数据库C(ElasticSearch)。

由于应用中遇到的各种问题,归纳出这些需求:
需求一:应分批处理数据,例如规定每批100条。出现问题时(例如任意一个数据库故障)则中断,下次程序启动时使用checkpoint从中断处恢复。
需求二:每个流程设置合理的并发数、让数据库和NLP服务有合理的负载(不影响其它业务的基础上,尽可能占用更多资源以提高ETL性能)。例如,步骤(1)-(4)分别设置并发数1、4、8、2。

这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看作流水线上的产品,4个步骤对应流水线上4个处理工序,每个工序处理完毕后就把半成品交给下一个工序。每个工序可以同时处理的产品数各不相同。

你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码非常复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你自己都不记得有什么用。

重用的Pipeline模块

为了更高效完成ETL工作,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。

使用这个Pipeline组件,我们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:

package main
 
import "log"
 
func main() {
    //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外执行,最后一个工序是保存checkpoint
    pipeline := NewPipeline(4, 8, 2, 1) 
    for {
        //(1)
        //加载100条数据,并修改变量checkpoint
        //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        
        //这里有个Golang著名的坑。
        //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。
        //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //处理完毕
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

以上就是golang 如何处理大数据的详细内容,更多请关注北单博客其它相关文章!

原创文章,作者:YVIPG,如若转载,请注明出处:https://www.beidanyezhu.com/a/26094.html

(0)
YVIPG的头像YVIPG
上一篇 2024-12-17 14:10:17
下一篇 2025-01-01 17:24:57

相关推荐

  • go语言中的byte是什么

    字符串中的每一个元素叫做“字符”,在遍历或者单个获取字符串元素时可以获得字符。 Go语言的字符有以下两种: 一种是 uint8 类型,或者叫 byte 型,代表了 ASCII 码的…

  • go语言使用json隐藏字段的方法

    使用场景:在 go 中给 API 调用者响应 json 数据。 1. 有些字段不暴露给用户。 2. 有些字段是根据用户的级别控制是否有这些数据。 Id字段不暴露给用户,则使用 `j…

    2025-01-03
  • go语言适合开发什么

    其实Go语言主要用作服务器端开发,其定位是用来开发“大型软件”的,适合于需要很多程序员一起开发,并且开发周期较长的大型软件和支持云计算的网络服务。 Go语言融合了传统编译型语言的高…

  • golang中导入包的方法

    这篇文章运用简单易懂的例子给大家介绍golang中导入包的方法,代码非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。 import Go 使用包(package)作为…

  • go语言中的nil是什么

    大家都清楚,当你声明了一个变量 但却还并木优赋值时,golang中会自动给你的变量类型给一个对应的默认零值。 这是每种类型对应的零值: bool -> false numbe…

  • go语言中可以把包名去掉吗

    Golang不可以把包名去掉,包名是一种类似命名空间的管理和组织代码的方式,而Golang的包有两种类型,一种是“main”包,该包的可以有唯一的一个“main”函数,这个函数也是…

  • go语言支持泛型吗

    Golang团队认为在类型系统和运行时的复杂性花费太大,还没找到可以和这个复杂性相抵的良好设计。 内置的map和slice其实都有泛型的味道,加上可以用interface{}来构造…

  • go语言适合开发web吗

    go语言适合开发web吗?相信大部分人都不太了解,今天小编为了让大家更加了解,给大家总结了以下内容,跟随小编一起来看看吧。 网络编程方面,Go语言广泛应用于 Web 应用、API …

  • go语言中的反射

    反射是什么? 反射是一种计算机处理方式。有程序可以访问、检测和修改它本身状态或行为的这种能力。能提供封装程序集、类型的对象。(程序集包含模块,而模块包含类型,类型又包含成员。)Go…

  • go语言中读取文本乱码怎么解决

    1、当文件中存在中文字符时,读取文件出现乱码,解决方法: 使用"github.com/axgle/mahonia"第三方包解译码。 package functi…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

分享本页
返回顶部