通过 MapReduce 降低服务响应时间

kevinwan

通过 MapReduce 降低服务响应时间

在微服务中开发中,api 网关扮演对外提供 restful api 的角色,而 api 的数据往往会依赖其他服务,复杂的 api 更是会依赖多个甚至数十个服务。虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个 api 的耗时将会大大增加。

那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go 基础库中为我们提供了 WaitGroup 工具用来进行并发控制,但实际业务场景中多个依赖如果有一个出错我们期望能立即返回而不是等所有依赖都执行完再返回结果,而且 WaitGroup 中对变量的赋值往往需要加锁,每个依赖函数都需要添加 Add 和 Done 对于新手来说比较容易出错

基于以上的背景,go-zero 框架中为我们提供了并发处理工具MapReduce,该工具开箱即用,不需要做什么初始化,我们通过下图看下使用 MapReduce 和没使用的耗时对比:

通过 MapReduce 降低服务响应时间插图

相同的依赖,串行处理的话需要 200ms,使用 MapReduce 后的耗时等于所有依赖中最大的耗时为 100ms,可见 MapReduce 可以大大降低服务耗时,而且随着依赖的增加效果就会越明显,减少处理耗时的同时并不会增加服务器压力

并发处理工具MapReduce

MapReduce是 Google 提出的一个软件架构,用于大规模数据集的并行运算,go-zero 中的 MapReduce 工具正是借鉴了这种架构思想

go-zero 框架中的 MapReduce 工具主要用来对批量数据进行并发的处理,以此来提升服务的性能

通过 MapReduce 降低服务响应时间插图(1)

我们通过几个示例来演示 MapReduce 的用法

MapReduce 主要有三个参数,第一个参数为 generate 用以生产数据,第二个参数为 mapper 用以对数据进行处理,第三个参数为 reducer 用以对 mapper 后的数据做聚合返回,还可以通过 opts 选项设置并发处理的线程数量

场景一: 某些功能的结果往往需要依赖多个服务,比如商品详情的结果往往会依赖用户服务、库存服务、订单服务等等,一般被依赖的服务都是以 rpc 的形式对外提供,为了降低依赖的耗时我们往往需要对依赖做并行处理

func productDetail(uid, pid int64) (*ProductDetail, error) {
	var pd ProductDetail
	err := mr.Finish(func() (err error) {
		pd.User, err = userRpc.User(uid)
		return
	}, func() (err error) {
		pd.Store, err = storeRpc.Store(pid)
		return
	}, func() (err error) {
		pd.Order, err = orderRpc.Order(pid)
		return
	})

	if err != nil {
		log.Printf("product detail error: %v", err)
		return nil, err
	}

	return &pd, nil
}

该示例中返回商品详情依赖了多个服务获取数据,因此做并发的依赖处理,对接口的性能有很大的提升

场景二: 很多时候我们需要对一批数据进行处理,比如对一批用户 id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的结果为效验合法的用户 id

func checkLegal(uids []int64) ([]int64, error) {
	r, err := mr.MapReduce(func(source chan<- interface{}) {
		for _, uid := range uids {
			source <- uid
		}
	}, func(item interface{}, writer mr.Writer, cancel func(error)) {
		uid := item.(int64)
		ok, err := check(uid)
		if err != nil {
			cancel(err)
		}
		if ok {
			writer.Write(uid)
		}
	}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
		var uids []int64
		for p := range pipe {
			uids = append(uids, p.(int64))
		}
		writer.Write(uids)
	})
	if err != nil {
        log.Printf("check error: %v", err)
		return nil, err
	}

	return r.([]int64), nil
}

func check(uid int64) (bool, error) {
	// do something check user legal
	return true, nil
}

该示例中,如果 check 过程出现错误则通过 cancel 方法结束效验过程,并返回 error 整个效验过程结束,如果某个 uid 效验结果为 false 则最终结果不返回该 uid

MapReduce 使用注意事项

  • mapper 和 reducer 中都可以调用 cancel,参数为 error,调用后立即返回,返回结果为 nil, error
  • mapper 中如果不调用 writer.Write 则 item 最终不会被 reducer 聚合
  • reducer 中如果不调用 writer.Wirte 则返回结果为 nil, ErrReduceNoOutput
  • reducer 为单线程,所有 mapper 出来的结果在这里串行聚合

实现原理分析:

MapReduce 中首先通过 buildSource 方法通过执行 generate(参数为无缓冲 channel)产生数据,并返回无缓冲的 channel,mapper 会从该 channel 中读取数据

func buildSource(generate GenerateFunc) chan interface{} {
    source := make(chan interface{})
    go func() {
        defer close(source)
        generate(source)
    }()

    return source
}

在 MapReduceWithSource 方法中定义了 cancel 方法,mapper 和 reducer 中都可以调用该方法,调用后主线程收到 close 信号会立马返回

cancel := once(func(err error) {
    if err != nil {
        retErr.Set(err)
    } else {
        // 默认的 error
        retErr.Set(ErrCancelWithNil)
    }

    drain(source)
    // 调用 close(ouput)主线程收到 Done 信号,立马返回
    finish()
})

在 mapperDispatcher 方法中调用了 executeMappers,executeMappers 消费 buildSource 产生的数据,每一个 item 都会起一个 goroutine 单独处理,默认最大并发数为 16,可以通过 WithWorkers 进行设置

var wg sync.WaitGroup
defer func() {
    wg.Wait() // 保证所有的 item 都处理完成
    close(collector)
}()

pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // 将 mapper 处理完的数据写入 collector
for {
    select {
    case <-done: // 当调用了 cancel 会触发立即返回
        return
    case pool <- lang.Placeholder: // 控制最大并发数
        item, ok := <-input
        if !ok {
            <-pool
            return
        }

        wg.Add(1)
        go func() {
            defer func() {
                wg.Done()
                <-pool
            }()

            mapper(item, writer) // 对 item 进行处理,处理完调用 writer.Write 把结果写入 collector 对应的 channel 中
        }()
    }
}

reducer 单 goroutine 对数 mapper 写入 collector 的数据进行处理,如果 reducer 中没有手动调用 writer.Write 则最终会执行 finish 方法对 output 进行 close 避免死锁

go func() {
    defer func() {
        if r := recover(); r != nil {
            cancel(fmt.Errorf("%v", r))
        } else {
            finish()
        }
    }()
    reducer(collector, writer, cancel)
}()

在该工具包中还提供了许多针对不同业务场景的方法,实现原理与 MapReduce 大同小异,感兴趣的同学可以查看源码学习

  • MapReduceVoid 功能和 MapReduce 类似但没有结果返回只返回 error
  • Finish 处理固定数量的依赖,返回 error,有一个 error 立即返回
  • FinishVoid 和 Finish 方法功能类似,没有返回值
  • Map 只做 generate 和 mapper 处理,返回 channel
  • MapVoid 和 Map 功能类似,无返回

本文主要介绍了 go-zero 框架中的 MapReduce 工具,在实际的项目中非常实用。用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。

项目地址

https://github.com/tal-tech/go-zero

微信交流群

通过 MapReduce 降低服务响应时间

Jump Desktop 通过 DRP 连接 windows10,只要没操作,马上就断开连接,怎么破

ubuntuGary:家里整了台小 nas,用 windows10 lts 系统,平时不在家自己睡眠,回家用 Jump Desktop 连接会网络唤醒并登陆,但是没操作一两分钟,就会断开,很烦,可有解决方法?

Hadoop Mapreduce:是否可以将mapper输出写入单独的输出文件(而非中间文件),而无需将reducer的数量设置为零? - java

我需要匿名化包含数千个文件的GB数据。这样做通常需要永远。因此,我计划在服务器上使用一个已经安装的伪分布式Hadoop集群。每个文件中的每个记录都需要在几列上进行匿名化,这些匿名化的列将存储在哈希图中。理想情况下,我希望一个映射器实例处理每个文件并生成一个相应的匿名输出文件。此外,映射器应吐出匿名列作为键值对,reducer将这些键值对聚合到单个文件中。在h…

通过>和<运算符比较日期 - php

在我的所有php代码中,我都在UTC中存储日期和时间,但是我也在使用mysql存储日期时间(也在utc中)。大于和小于运算符会导致日期比较失败吗? $curdate=date('Y-m-d H:i:s'); if($start_datetime>$curdate) 参考方案 不。他们没有办法失败。为此,特意制作了MySQL日期格式。…

Hadoop发行差异 - java

有人可以概述可用的各种Hadoop发行版之间的各种差异吗? Cloudera -http://www.cloudera.com/hadoop 雅虎-http://developer.yahoo.net/blogs/hadoop/ 以Apache Hadoop发行版为基准。在标准Apache Hadoop发行版上使用这些发行版之一,是否有充分理由? 参考方案 …

在这种情况下,如何为Hadoop编写分区程序? - java

我有一个像“签名[空白]日期[空白]时间”这样的钥匙,但我希望所有在同一容器中具有相同签名的钥匙。对于这种情况如何写一个partioner?最好的祝福, 参考方案 默认情况下,使用HashParititioner,整个哈希码用于计算reducer数量。为了完成您的任务,您可以仅散列部分密钥(在您的情况下为签名),并且具有相同签名的所有密钥都将分配给同一red…