接上一篇 Node.js高级程序员晋升系列之-Node进程模型解析和服务器端多进程部署 不是说Node是单线程,为什么会需要锁? 回答这个问题之前,我们先来看上一篇文章的一个例子 class WeixinTokenService { private token = null
async getToken() {
if (!this.token) {
this.token = await someHttpResquest()
}
return this.token
}
} 复制代码这段代码具体在什么时候会出bug呢?假设遇到一种这样的顺序
第一个请求到达,调用await someHttpRequest()之后交出控制权 第二个请求到达,在判断!this.token时为真,也开始请求await someHttpRequest() 之后哪一个请求后返回结果,this.token的值就是谁,但这个先后顺序是无法保证的,同时两次调用http接口这个行为也不是我们想要的
我们希望的应该是:当第一个接口请求的时候,另外一个接口就等着,等到第一个接口请求结束了,第二个接口就可以直接用第一个接口的结果 那我,我们第一反应就可以写出这样的代码 class WeixinTokenService { private token = null private isRequest = false private peddingCallback = []
getToken(callback) {
if (this.token) {
// 已经有token了
return callback(this.token)
}
this.peddingCallback.push(callback)
if (this.isRequest) {
return
}
this.isRequest = true
someHttpResquest().then(token => {
this.token = token
this.paddingCallback.forEach(call => call())
this.paddingCallback = []
this.isRequest = false
})
}
} 复制代码这么写下来,我们发现了几个问题
代码逻辑很繁琐 无法使用Promise了,外层的async函数也会受到影响,当然我们也可以在外面再套一层return new Promise,但总体来说这个解决方案很差
让我们回忆一下如果是在Java这种语言中,我们如何解决这个问题(为了引出后续的话题,我们使用了Lock来解决这个问题而非此处更适合的synchronized关键字) public class WeixinTokenService { private String token = null private Lock lock = new Lock()
public String getToken() {
if (this.token != null) {
return this.token
}
this.lock.lock()
// 思考一下这里为什么会重复一句
if (this.token != null) {
return this.token
}
try {
this.token = someHttpResquest()
} finally {
this.lock.unlock()
}
return this.token
}
} 复制代码是的,java直接用一个进程锁在开始http请求之前锁住,这样第一个请求将this.token赋值之后再释放出锁,等其他的线程再重新拿到锁的时候,就可以直接用之前的结果(见注释处) 这给了我们一个灵感,我们也希望我们能在某一个把代码锁住,等其他的请求完成之后,再继续进行,但是很遗憾,node是没有阻塞这个概念的(Java的Lock是基于阻塞,即将一个线程停止掉,知道某一个事件发生之后重新恢复),而node只有一根线程,你要是这个这个线程阻塞住了(比如while (true) {}),整个应用也基本挂了。 然后我们有另外一个武器,他就是Promise,我们可以这么思考,将 this.lock.lock() //替换为 await this.lock.lock() 复制代码基于这个思路,你能否写出一个Lock类来? 下面是我们的实现(下面的实现可以同时锁住多个key,同时为了说清楚中间变量,我们用到了typescript)
interface ResolveRejectStorage { key: string; resolve: (data: Unlocker) => void; reject: (err: any) => void; }
class Unlocker { constructor(private lock: LockService, private key: string) {}
release() {
this.lock.releaseKeys(this.key);
this.key = null;
}
}
export class LockService { // 当前正在运行的任务的key集合 private keyList = new Set<string>(); private waiting: ResolveRejectStorage[] = [];
lock(key: string): Promise<Unlocker> {
// 返回这个promise给外层的代码等待
return new Promise<Unlocker>((resolve, reject) => {
this.waiting.push({
resolve,
reject,
key,
});
this.lockReally();
});
}
releaseKeys(key: string) {
this.keyList.delete(key);
// 尝试其他任务
this.lockReally();
}
private lockReally() {
for (let task of this.waiting) {
// 如果keyList中任意一个key被keys包含,就认为有冲突 需要等待 否则可以运行
if (!this.keyList.has(task.key)) {
// 加入keyList
this.keyList.add(task.key);
// 在waiting中remove掉
let index = this.waiting.indexOf(task);
this.waiting.splice(index, 1);
// 最终的外层返回值
let unlock = new Unlocker(this, task.key);
task.resolve(unlock);
// 如果10s之后还没有release自动unlock(程序可能出错了)
setTimeout(() => {
unlock.release();
}, 10 * 1000);
}
}
}
}
复制代码基于LockService,我们的代码会变成这样 class WeixinTokenService { private token = null private lockService = new LockService()
async getToken() {
let unlocker = await this.lockService.lock('fetch-wx-token')
try {
if (this.token) {
return this.token
}
this.token = await someHttpResquest()
return this.token
} finally {
unlocker.release()
}
}
} 复制代码或者我们可以给LockService封装一个统一的方法 export class LockService { async runInLock<T>(key: string, fn: () => T | Promise<T>) { const unlocker = await this.lock(key) try { return await fn() } finally { unlocker.release() } } } 复制代码那么我们的调用代码会变成这样 class WeixinTokenService { private token = null private lockService = new LockService()
async getToken() {
return this.lockService.runInLock('fetch-wx-token', async () => {
if (this.token) {
return this.token
}
this.token = await someHttpResquest()
return this.token
})
}
} 复制代码到现在为止,我们已经解决了单进程下的代码执行控制问题,我们只需要进行一个很小的推广就能适用于多进程的情况,我们下期再见。 欢迎加入Node开发高级进阶群,群主有时间就会给大家解决一些Node实战中会遇到的各种问题
作者:曹酌中 链接:https://juejin.im/post/5f156c26f265da22fb287f07 来源:掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。