Node.js高级程序员晋升系列之-从0开始实现一个锁
发布于 4 年前 作者 zhuozhongcao 3803 次浏览 来自 分享

接上一篇 Node.js高级程序员晋升系列之-Node进程模型解析和服务器端多进程部署 不是说Node是单线程,为什么会需要锁? 回答这个问题之前,我们先来看上一篇文章的一个例子 class WeixinTokenService { private token = null

async getToken() {
    if (!this.token) {
        this.token = await someHttpResquest()
    }
    return this.token
}

} 复制代码这段代码具体在什么时候会出bug呢?假设遇到一种这样的顺序

第一个请求到达,调用await someHttpRequest()之后交出控制权 第二个请求到达,在判断!this.token时为真,也开始请求await someHttpRequest() 之后哪一个请求后返回结果,this.token的值就是谁,但这个先后顺序是无法保证的,同时两次调用http接口这个行为也不是我们想要的

我们希望的应该是:当第一个接口请求的时候,另外一个接口就等着,等到第一个接口请求结束了,第二个接口就可以直接用第一个接口的结果 那我,我们第一反应就可以写出这样的代码 class WeixinTokenService { private token = null private isRequest = false private peddingCallback = []

getToken(callback) {
    if (this.token) {
        // 已经有token了
        return callback(this.token)
    }
    this.peddingCallback.push(callback)
    if (this.isRequest) {
        return
    }
    this.isRequest = true
    someHttpResquest().then(token => {
        this.token = token
        this.paddingCallback.forEach(call => call())
        this.paddingCallback = []
        this.isRequest = false
    })
}

} 复制代码这么写下来,我们发现了几个问题

代码逻辑很繁琐 无法使用Promise了,外层的async函数也会受到影响,当然我们也可以在外面再套一层return new Promise,但总体来说这个解决方案很差

让我们回忆一下如果是在Java这种语言中,我们如何解决这个问题(为了引出后续的话题,我们使用了Lock来解决这个问题而非此处更适合的synchronized关键字) public class WeixinTokenService { private String token = null private Lock lock = new Lock()

public String getToken() {
    if (this.token != null) {
        return this.token
    }
    this.lock.lock()
    // 思考一下这里为什么会重复一句
    if (this.token != null) {
        return this.token
    }
    try {
        this.token = someHttpResquest()
    } finally {
        this.lock.unlock()
    }
    return this.token
}

} 复制代码是的,java直接用一个进程锁在开始http请求之前锁住,这样第一个请求将this.token赋值之后再释放出锁,等其他的线程再重新拿到锁的时候,就可以直接用之前的结果(见注释处) 这给了我们一个灵感,我们也希望我们能在某一个把代码锁住,等其他的请求完成之后,再继续进行,但是很遗憾,node是没有阻塞这个概念的(Java的Lock是基于阻塞,即将一个线程停止掉,知道某一个事件发生之后重新恢复),而node只有一根线程,你要是这个这个线程阻塞住了(比如while (true) {}),整个应用也基本挂了。 然后我们有另外一个武器,他就是Promise,我们可以这么思考,将 this.lock.lock() //替换为 await this.lock.lock() 复制代码基于这个思路,你能否写出一个Lock类来? 下面是我们的实现(下面的实现可以同时锁住多个key,同时为了说清楚中间变量,我们用到了typescript)

interface ResolveRejectStorage { key: string; resolve: (data: Unlocker) => void; reject: (err: any) => void; }

class Unlocker { constructor(private lock: LockService, private key: string) {}

release() {
    this.lock.releaseKeys(this.key);
    this.key = null;
}

}

export class LockService { // 当前正在运行的任务的key集合 private keyList = new Set<string>(); private waiting: ResolveRejectStorage[] = [];

lock(key: string): Promise<Unlocker> {
    // 返回这个promise给外层的代码等待
    return new Promise<Unlocker>((resolve, reject) => {
        this.waiting.push({
            resolve,
            reject,
            key,
        });
        this.lockReally();
    });
}

releaseKeys(key: string) {
    this.keyList.delete(key);
    // 尝试其他任务
    this.lockReally();
}

private lockReally() {
    for (let task of this.waiting) {
        // 如果keyList中任意一个key被keys包含,就认为有冲突 需要等待 否则可以运行
        if (!this.keyList.has(task.key)) {
            // 加入keyList
            this.keyList.add(task.key);
            // 在waiting中remove掉
            let index = this.waiting.indexOf(task);
            this.waiting.splice(index, 1);

            // 最终的外层返回值
            let unlock = new Unlocker(this, task.key);
            task.resolve(unlock);

            // 如果10s之后还没有release自动unlock(程序可能出错了)
            setTimeout(() => {
                unlock.release();
            }, 10 * 1000);
        }
    }
}

}

复制代码基于LockService,我们的代码会变成这样 class WeixinTokenService { private token = null private lockService = new LockService()

async getToken() {
    let unlocker = await this.lockService.lock('fetch-wx-token')
    try {
        if (this.token) {
            return this.token
        }
        this.token = await someHttpResquest()
        return this.token
    } finally {
        unlocker.release()
    }
    
}

} 复制代码或者我们可以给LockService封装一个统一的方法 export class LockService { async runInLock<T>(key: string, fn: () => T | Promise<T>) { const unlocker = await this.lock(key) try { return await fn() } finally { unlocker.release() } } } 复制代码那么我们的调用代码会变成这样 class WeixinTokenService { private token = null private lockService = new LockService()

async getToken() {
    return this.lockService.runInLock('fetch-wx-token', async () => {
        if (this.token) {
            return this.token
        }
        this.token = await someHttpResquest()
        return this.token
    })
}

} 复制代码到现在为止,我们已经解决了单进程下的代码执行控制问题,我们只需要进行一个很小的推广就能适用于多进程的情况,我们下期再见。 欢迎加入Node开发高级进阶群,群主有时间就会给大家解决一些Node实战中会遇到的各种问题 173618474ba9c470 (1).png

作者:曹酌中 链接:https://juejin.im/post/5f156c26f265da22fb287f07 来源:掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

回到顶部