一.情景介绍: 1.有一个接口提供update数据的功能,但这个接口在update时,需要做一些额外的操作,即需要通过先find后update的方式来修改数据,这样在并发的情况下如何保证数据的一致性? 2.使用mongoose 二.相关伪代码如下:
const user = await user.find({"id":id}).exec();
//要对user的属性做一些修改
user.xxx = xxxxxx;
await user.save();
当这个接口被并发调用时,就会出现值覆盖的问题,比如第一个请求是把user.age + 1,第二个请求同第一个请求,但实际效果user.age可能只加了1,因为第二个请求取到的user.age时,第一个请求还没有执行完save操作,导致第二个请求读到了脏数据,所以后面的值会覆盖前面的值. 三.局限性 1.中间的这部分修改操作是必须的,且无法转换成mongo原生支持的操作符(指$inc等),通常见于利用原属性值计算新值,比如user.age = (new Date().getTime() - user.birthday).必须在内存中计算属性值. 2.这个接口是并发调用,不能很好的实现队列的机制. 3.mongodb不支持行级锁,且没有暴露出来,所以通过mongodb的锁机制去搞这个感觉也没什么思路 四.方法一 1.把最后那个user.save()改成 user.findOneAndUpdate({“id”:id,“updated_at”:user.updated_at},{update属性}) ,返回找不到对应的document时,可以重新走一遍这个流程,如果返回数据了,证明修改成功,这个依赖的是每次update操作,都会更新其updated_at字段,变相的用程序实现乐观锁,这样做会不会陷入死循环状态?这样会不会有其它问题? 2.希望各位大神有其它的思路和方法指教一下.
mongodb其实不是很适合做这类操作,因为它只支持文档级别的原子操作,并且没有事务,由于node单线程,天生异步,因此node里面没有锁的概念,而且node也不需要锁。所以在这种情况下只能考虑从第三方来实现,比如可以借助redis实现一个分布式锁。
@nullcc 基于redis实现一个分布式锁,刚才刷其它网站时看到了,对redis的分布式锁没有经验,我要去看下如何实现…因为目前的这个项目是以cluster模式启动了(以后可能是发展为集群),所以借助redis来实现分布式锁我觉得是很不错的方向.感觉回答.
@Fov6363 具体来说,你有一个资源A,你想通过一个接口来更新这个资源A,并且在这个接口中除了更新A还有一些其他操作。比如我们需要先从DB中获取A,做一些相关操作,计算出A的一个值,然后真正更新A。这个过程中有可能产生竞争,导致数据一致性有问题。
如果用redis实现一个分布式锁,考虑最简单的情况,一般来说mongodb的文档都有一个_id
,就拿这个_id
做key,value为1,表示同一时间只能有一个操作方操作资源A。redis的incr和decr操作都是原子的,所以在你要操作这个资源A的时候,要像在多线程编程中先acquire lock,操作完毕要release lock。这是最基本的情况,如果要做到稳定高效,你还要考虑获取锁超时、执行失败时release lock的问题。大致是这个思路。
@Fov6363 这个库应该是可以用的。文档的第一句已经表明了它的用处: This is a node.js implementation of the redlock algorithm for distributed redis locks. It provides strong guarantees in both single-redis and multi-redis environments, and provides fault tolerance through use of multiple independent redis instances or clusters.
具体到例子里面也看到了一些ttl的设置,error handling之类的,仔细研究一下用法就行。
const debug = require('debug');
const Promise = require('bluebird');
const http = require('http');
var client1 = require('redis').createClient(6379, '127.0.0.1');
var Redlock = require('redlock');
const Db = require('mongodb').Db;
const MongoClient = require('mongodb').MongoClient;
const Server = require('mongodb').Server;
var collection;
var redlock = new Redlock(
// you should have one client for each redis node
// in your cluster
[client1],
{
// the expected clock drift; for more details
// see http://redis.io/topics/distlock
driftFactor: 0.01, // time in ms
// the max number of times Redlock will attempt
// to lock a resource before erroring
retryCount: 10,
// the time in ms between attempts
retryDelay: 200, // time in ms
// the max time in ms randomly added to retries
// to improve performance under high contention
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
retryJitter: 200 // time in ms
}
);
const hostname = '127.0.0.1';
const port = 3000;
const server = http.createServer((req, res) => {
res.statusCode = 200;
res.setHeader('Content-Type', 'text/plain');
// res.end('Hello World\n');
update(res);
});
var resource = "1219924275036161";
var ttl = 10000;
MongoClient.connect('mongodb://localhost:27017/baas',async function(err, db) {
// Create a collection
collection = db.collection('users');
// Insert the docs
server.listen(port, hostname, () => {
console.log(`Server running at http://${hostname}:${port}/`);
});
});
function update(res) {
console.log('-----');
redlock.lock(resource,ttl).then(lock => {
collection.find({'id':1219924275036161}).toArray(function (err,result) {
let age = result[0].user_data.age;
age += 1;
// age = 0;
collection.updateOne({'id':1219924275036161},{'$set':{'user_data.age':age}},function (err,result) {
lock.unlock();
res.end('nihao');
})
});
})
}
赋上一个小demo,其中运行了两次,第一次不使用redlock包,第二次使用redloack包.使用wrk工具测试如下: 第一次
➜ wrk wrk -t4 -c200 -d30s --latency http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
4 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 82.95ms 35.47ms 408.34ms 83.17%
Req/Sec 620.24 185.87 1.15k 67.79%
Latency Distribution
50% 74.00ms
75% 95.12ms
90% 121.55ms
99% 236.99ms
73916 requests in 30.10s, 9.16MB read
Socket errors: connect 0, read 67, write 0, timeout 0
Requests/sec: 2455.65
Transfer/sec: 311.75KB
最后的age为976,起始为0. 第二次:
➜ wrk wrk -t4 -c200 -d30s --latency http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
4 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 155.27ms 341.91ms 2.00s 86.25%
Req/Sec 165.63 144.66 0.91k 76.46%
Latency Distribution
50% 1.38ms
75% 3.83ms
90% 658.86ms
99% 1.50s
18377 requests in 30.09s, 2.28MB read
Socket errors: connect 0, read 87, write 0, timeout 64
Requests/sec: 610.74
Transfer/sec: 77.53KB
QPS显示下降的很厉害,age此时为19380,起始值为976,其中在运行时,报一些未捕捉reject的错误,应该是这些错误导致了实际结果与预测结果有误.
第二个例子里的
Socket errors: connect 0, read 87, write 0, timeout 64
出现了64个连接超时的,这部分影响也很大。 还有就是一些参数的调优,因为这个锁是独占锁,可以计算出在一段时间内,一共能够成功进行几次操作。
./wrk -t4 -c200 -d30s --latency http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
4 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 52.63ms 209.61ms 2.00s 93.22%
Req/Sec 437.32 414.88 1.65k 73.17%
Latency Distribution
50% 766.00us
75% 0.88ms
90% 1.51ms
99% 1.17s
30797 requests in 30.09s, 3.82MB read
Socket errors: connect 0, read 90, write 0, timeout 32
Requests/sec: 1023.42
Transfer/sec: 129.93KB
@Fov6363 我机器上的结果。
@nullcc 收到,很强