精华 一个简单的 mysql 队列问题
发布于 9 年前 作者 alsotang 12561 次浏览 最后一次编辑是 8 年前 来自 分享

最近有个朋友要实现队列任务方面的工作,我们就 mysql(innodb) 的事务和锁的特性聊了一些有趣的话题。

其中,最终的解决方案来自大神 https://github.com/fengmk2 之前的一个队列实现。 我做了一个小改进,使得之前表级锁的表现可以恢复到行级锁水平。

任务的大致描述是这样的:

有一个表,里面存了很多的用户id,大概100w条,表的结构简化如下:

create table user_block_status {
  user_id bigint // 用户的id
  status int // 用户的状态。1 ok 2 not ok
  updated_time timestamp // 更新时间戳
}

这个表里面,每隔10秒就要去检查用户是否存在违规页面。如果存在的话,则需要把 status 置为 2,默认是 1。

有 100 个 worker 会并发地从表里面读取 user_id,所以我们要设计一个策略,使得这 100 个 worker 在并发时, 读到的是独立的 100 个条目。

方案1

一开始的方案是这样的:

// 这一句不一定会发请求,可能会优化成跟接下来的第一个 query 一起发出
sql.begin_transaction

// 第一次io发生。
// 如果一个用户在 10s 内没有被更新,那么取出来
// 这时候由于程序拿得到 user_id 的值,所以网络io是发生了的。否则拿不到 user_id 的值
outdate_time = now() - 10s
line = sql.query('select user_id where updated_time < ? order by updated_time asc limit 1', [outdate_time])

// 第二次 io 发生
// 更新这一行的 updated_time,免得被其他worker重复读取
user_id = line.user_id
sql.query('update user_block_status set updated_time=now() where user_id = ?',
  [user_id])

// 第三次 io 发生
sql.commit

// do something with user_id

可以看到,这个地方我们发起了 3 次 io 请求。当然,请求数不是很关键,因为请求数以及对应的时间是一个恒定量, 而随着 worker 的增加,这一块并不会带来额外的性能瓶颈。但由于我们使用了事务,所以当 worker 由 100 增加到 1000 的时候,数据库由于存在大量的事务操作,这些事务都需要掌握写锁,所以有潜在的写锁排队问题。

而且关键是,方案是不可行的,根本没有起到队列的效果。

为什么呢?我们假设网络io无限快,而数据库每条语句的执行时间是1s,那么我们这个事务的执行时间是 2s。 这时如果 3 个 worker 并发地在同一秒(00:00)执行,那么假设 worker1 读到的 user_id 是 10086, 由于读锁是共享的,worker2 和 worker3 读到的 user_id 也是 10086。这时他们三个都想要更新 10086 的值, 而 worker1 抢先加了写锁,所以 worker2 和 worker3 就需要等待 worker1 的事务执行完毕, 才能重新获得 10086 的写锁并进行写入。 所以当 worker2 执行的时候,是 00:02 的时候,当 worker3 执行的时候,是 00:04 的时刻。 而且由于他们都是在对 10086 进行更新,所以没有起到队列的效果。

这里的查询条件太特殊,导致所有并发的事务需要的都是同一条数据, 这时候 innodb 行级锁的特性也没有发挥出来。

这个方案不仅并发时的表现类似表级锁的特性,而且也没有达到队列的效果。

方案2

将 update 语句在先,select 语句在后。

update 语句改成

outdate_time = now() - 10s
result = sql.query('update user_block_status set updated_time=now() where updated_time < ?   order by updated_time asc limit 1',
  [outdate_time])

## each worker can get different result.user_id

这样在 update 的时候,3 个 worker 会排队,分别更新不同的 user_id 条目。然后返回来的 也是不同的 user_id。

可关键是,update 语句并不会将被 update 了的 id 返回给程序,所以我们后面的 select 语句拿不到对应的 user_id。 这个方案先否决。

方案3

方案1的基础上,在 select 语句中,手工地干扰一下,使得不同的 worker 取到不同的条目

outdate_time = now() - 10s
random_number = random_int(0, worker_count * 2)
line = sql.query('select user_id where updated_time < ?  order by updated_time asc limit 1 offset ?',
  [outdate_time, random_number])

这时,我们的 worker 有很大的几率可以取出不同 user_id。但这里也还有个问题就是,很可能两个 worker 的 random_number 是同一个值。那么就发生了两次重复读取,不过对于我们的业务来说,重复读取只会造成资源的浪费, 而不会带来数据一致性的问题。只要尽量减少重复读的几率,那么这个方案就是可被接受的。

其中 worker_count * 2 是拍脑袋决定的数,如果数据库中始终有大量需要处理的数据,可以加大点。

方案4

方案3还是挺不完美的,虽然能解决问题,但是从概念上来说,我们需要的是队列。 队列的意思就是:排队!排队!排队!

方案3只是从业务逻辑层面出发,做出了一些规避,模拟了我们需要的效果。

那么回到方案2,其实方案2是更接近队列的。因为不同的 worker 真正在等待另一个 worker 更新东西。 可方案2无奈的是,我们拿不到被更新的id。那么有没有办法拿到呢?

其实是有的,用 mysql 的 LAST_INSERT_ID() 函数。

LAST_INSERT_ID(): Value of the AUTOINCREMENT column for the last INSERT

关于这个函数可以看看 https://dev.mysql.com/doc/refman/5.7/en/information-functions.html 这里的详细介绍。

这个函数本来的含义是,拿到 AUTO_INCREMENT 那一列的最新值。也就是我们最新 insert 进表的那个 id。 但实际上,它也可以作为一个 sql 语句中的变量来使用,它可以被赋值,然后取出。 而且它的作用域是同一 connection 内,这样我们多个 worker 如果对 LAST_INSERT_ID 赋了不同的值, 也不会互相干扰,因为不同的 worker 使用不同的 connection。

这时,我们的查询在方案2的基础上就变成:

sql.begin_transaction

outdate_time = now() - 10s
sql.query('update user_block_status set updated_time=now(),
 id=LAST_INSERT_ID(id) where updated_time < ?  order by updated_time asc limit 1',
  [outdate_time])

line = sql.query('select user_id where id = LAST_INSERT_ID()')

## do sth with line.user_id

sql.commit

ok,已经能排队了,业务上已经可以满足了。

目前性能上说,网络io还是三个,而且,【行级锁】没有被利用的特定依然存在。 写锁依然要排队,为什么这么说?因为不管 worker 有多少个,当他们并发的时候,where 条件都始终把它们 指向同一行数据,所以还是要为了同一行数据排队。即使目前我们已经达成了【排队之后,互相更新不同条目】这个目的。

方案4就总的性价比来说,目前跟方案3相比,还不一定谁好谁坏。 方案4的性能在于多个worker抢一个锁,大家总是等;方案3是无脑乱取,造成资源浪费,降低worker的效率,浪费机器。

什么情况下方案3好? 如果总是有一大堆数据没有被处理的话,那么把方案3的乱取范围开大点,就能更好避免浪费。 而当一大堆数据等待处理的时候,方案4却不停在排队,这就等于堵住了。

还有一种情况就是,方案4的写锁排队已经成为瓶颈。但其实这跟上面是一回事,当总是有一大堆 worker 来取 东西的话,说明就是有一大堆数据没有被处理。否则开那么多 worker 干嘛。

什么情况下方案4好? 前提就是,写锁排队并不成为瓶颈。如果要处理的数据并不是那么多,那么使用方案4的话,可以降低我们需要的 worker 数量,节约机器。 而且 worker 数量评估可以更加理性。

方案5

那么,我们把方案3的 offset 思想加进来吧。可惜啊可惜,update 语法只支持 limit,不支持 offset。

UPDATE [LOW_PRIORITY] [IGNORE] table_reference
    SET col_name1={expr1|DEFAULT} [, col_name2={expr2|DEFAULT}] ...
    [WHERE where_condition]
    [ORDER BY ...]
    [LIMIT row_count]

那就绕一绕。

不用 offset,而是通过更改 outdate_time 的值,让他们获得不同的行数据。

我们的程序是要求 10s 算作过期,那么 11s、20s、30s 肯定也算过期吧。那就这样写:

// 在 10 到 30s 之间随机取值
outdate_time = now() - (random(10, 30))s
sql.query('update user_block_status set updated_time=now(),
 id=LAST_INSERT_ID(id) where updated_time < ?  order by updated_time asc  limit 1',
  [outdate_time])

where updated_time < now() - 10swhere updated_time < now() - 12swhere updated_time < now() - 15s(不要在 where 条件里面写计算,这只是示例) 还是有可能锁定同一条数据。但至少,这个方案既利用上了行级锁,也不会造成多个 worker 处理同一 user_id 的 资源浪费。

方案6

锁的问题差不多就这么解决了。

我们再回头看看,发现还有个 io 问题可以再弄弄。现在还是 3 个 io 嘛。

其实到了现在这步,

begin_transaction 可以去掉了。因为我们只有一个涉及写锁的操作在里面,这个操作本身作为单一语句, 就已经是原子性的了。

但由于我们利用了 LAST_INSERT_ID,所以我们要保证 update 语句和它之后的 select 语句在同一个 connection 中。

很多的 mysql 库实现都是用了连接池的,所以同一段代码中的两条 sql 有可能会利用两条 connection, 导致得到我们非预期的 user_id。

但就我们的业务来说,LAST_INSERT_ID 混了其实是没关系的。每个 worker 始终还是会得到一个 unique 的 user_id。 这就够了。那么我们也不必加一些多余的逻辑,保证这两条语句取到同一个 connection。

这时,io 操作从 3,降低到了 2。

那么,有没有可能降到 1 呢。

其实也可以啊…因为基本所有 mysql 库都支持 multistatements 特性。

我们可以在一条 query 写两个语句,返回接口会是一个数组,分别表示这两个语句的值。

类似这样,sql.query('update .....; select ....;')。这是支持的。而且这么一来, 同一 connection 的问题也解决了。避免为以后留坑。

重写方案

outdate_time = now() - (random(10, 30))s
result = sql.query('update user_block_status set updated_time=now(),
 user_id=LAST_INSERT_ID(user_id) where updated_time < ?  order by updated_time asc limit 1;

 select * from user_block_status where user_id = LAST_INSERT_ID()',
  [outdate_time])

// do something with result[1].user_id

。。。。。。。。。。。。。

还是有坑的。。。。。。。。。。。。。。。

如果 where updated_time < ? 一条都不命中,那么会发生什么结果?

首先,update 没有改变任何行。而 LAST_INSERT_ID 还是会返回一个合理的 id,有可能是真正的 LAST_INSERT_ID, 也可能是这条 connection 中上次手工设置的。

在这里可以多说一下 LAST_INSERT_ID 的特性。默认情况下,LAST_INSERT_ID() 不带参数会返回最新插入那条的 id。 带参数的情况下 LAST_INSERT_ID(id) 本身的返回值就是参数,然后在接下来的调用中,如果不发生任何 insert,那么 值会在 connection 中一直保持。如果发生了 insert,就会被更新。

如果不处理这个 update nothing 的异常情况,当队列全部被处理完的时候, 我们的 worker 会一直工作,不会停下来。所以我们要在取 LAST_INSERT_ID 的值时, 判断一下上一条 update 语句到底有没有发生作用。

这时候我们需要用到另一个跟 LAST_INSERT_ID 一起出现在文档中的函数,

ROW_COUNT(): The number of rows updated

判断一下 ROW_COUNT,如果是 0 的话,就条件不符,这时候我们在程序里面拿到的值就是空。

最终方案

outdate_time = now() - (random(10, 30))s
result = sql.query('update user_block_status set updated_time=now(),
  user_id=LAST_INSERT_ID(user_id) where updated_time < ?  order by updated_time asc limit 1;

  select * from user_block_status where user_id = LAST_INSERT_ID()
    and ROW_COUNT() <> 0',
  [outdate_time])

// do something with result[1].user_id

当然,mysql 用来解决这种队列问题可能不是一个好的方案。队列相关的知识,我还在努力学习中。

参考资料:

  1. http://www.cnblogs.com/zhoujinyi/p/3437475.html
  2. https://dev.mysql.com/doc/refman/5.7/en/information-functions.html
19 回复

要获取 last insert id,相比 Mysql ,Postgres 就比较爽了 UPDATE users SET xxx = yyy RETURNING id

-. - 其实我没怎么看明白需求。

@cfddream 不能同意更多。支持postges

来自炫酷的 CNodeMD

@xadillax 。。。

@cfddream @klesh 这个屌啊。。

@i5ting 100w条数据存js内存中不合适,而且一条也不能丢

@xadillax 我重新描述一下

数据表里面有 100w 条数据,每条都有独立的 user_id,有一百个后台任务进程会来获取 user_id,要求获取到的 user_id 都是独立的。

大神,能解锁我的话题吗?pls~~~~~~

我觉得可以从程序层面解决这一类问题:无关数据库,无关队列
在写数据库时,给已经更新的数据一个唯一标识符(由worker程序给定,前提worker可控),然后通过这个worker指定的标识符查找,具体可以参考mongodb objectID的实现。

@hanyuzhou2006 可以详细点吗?还没太听明白

@alsotang

  1. worker端使用一个生成hashCode的方法
  2. update数据时将code写进去
  3. select数据where code=上述code 以上面为例
//1.生成code的函数
 var hashCode = (function(){  
   var  suffix = parseInt(worker.id); //每个worker是独一无二的 
   var count = 0;  
   return function(){  
       return (Date.now()*10000+(count++%1000)*10+suffix).toString(16);      //允许同一毫秒内有1000并发,足够了,不够再放大
	 }  
  })();  
//2.update and get
code = hashCode()
now_time = now()
outdate_time = now_time - (random(10, 30))s
result = sql.query('update user_block_status set updated_time=?,code=? where  updated_time <? order by updated_time asc limit 1;
select * from user_block_status where code=?',now_time,code,outdate_time,code)

@alsotang 存内存没法分布式检查吧

@hanyuzhou2006 是的,这个方法也很好!

可以使用自定义变量

SET @ids := '';

UPDATE table_name
SET updated_time = now(), extra = (SELECT @ids := CONCAT_WS(',', id, @ids))
WHERE xxx
ORDER BY xxx
LIMIT 1 or 更多;

SELECT @ids;
  • 这个方法需要有一个多余列 extra
  • node-mysql 使用 multiStatements

要慢慢看😂😂

来自酷炫的 CNodeMD

刚看到这个 pg 的 http://git.oschina.net/brh/mq2 pg 支持 update return, 支持 FOR UPDATE SKIP LOCKED 可惜 mysql innodb 不支持, 我现在 innodb update where 条件太多, 表锁没法降级 … 只好增大 LIMIT, 减少 DB hit

<p style=“text-align:right”><a href=“https://github.com/lumia2046/cnode”> — — 来自lumia2046-react-cnode</a></p>

呃,竟然是我 自豪地采用 CNodeJS ionic

回到顶部