nodejs 生产者跟消费者有没有好的解决方法
发布于 6 个月前 作者 papandadj 1400 次浏览 来自 问答

在写代码的时候碰见了一个问题。 首先, 我需要维护一个队列, 有多个方法可以在队列里面插入对象。比如:

let fifo = [];
function productor1(){
	fifo.push[obj1]
}
function productor2(){
	fifo.push[obj2]
}

然后, 有一个方法来消耗这个数组, 这个是同步的, 第一个数组元素消耗完才能消耗第二个数组元素。

function consumer(){
	let obj = fifo.shift()
	//处理
}

我应该怎样做到生产者跟消费者同时进行, 并且当数组为空的时候消费者休息, 当生产者往里面加入数据的时候, 消费者又马上开始消费。 尽量用emit来实现, 不要用定时器来判断, 感觉太占资源了

9 回复

@zy445566 这个数组里面的东西是同步执行的, 就是数组第一个索引位置执行完才能执行第二个, 你上面这么应该会出现两个同时执行吧?

刚刚点错了,还没写完,楼主真是秒回啊😓。加个是否在运行的判断就好了 消费者再调一下消费者,这个刚刚加的,这样消费者就不断运行了,队列空了就结束消费就好了 还有自己调自己要取消isRun判断

const Events= require('events');
const emitter = new Events();

let fifo = [];
function productor1(){
    fifo.push(1)
    emitMsg()
}
function productor2(){
    fifo.push(2)
    emitMsg()
}

function emitMsg() {
    if (fifo.length==1) {
        emitter.emit('msg');
    } 
}

let isRun = false;

function consumer(self=false) {
    if (isRun && !self){return;}
    isRun=true;
    // 模拟处理耗时
    setTimeout(()=>{
        let obj = fifo.shift();
        console.log(obj)
        if (fifo.length==0) {isRun=false;return;}
        consumer(true);
    },1000)
}

emitter.on('msg', () => {
    consumer()
});

(()=>{
    productor1();
    productor2();
})();

@zy445566 好的 谢谢。 理解意思

@atian25 自己生写算了

@zy445566 把isRun=false;写在上面是不是就不需要那个self了呀

@heguangda 写在上面只是一个默认值

试试 rxjs6。以下为 typescript 代码,可抹掉类型相关代码换成 js

import { Subject } from 'rxjs';

 // 定义类型 
export interface FifoItemType {
  ip: string
}

// 生产者流。 FifoItemType 为入口参数类型,省事可用 any 类型
const fifo$ = new Subject<FifoItemType>();
// 消费者订阅
const consumerSub = fifo$.subscribe(consumer)

// 生产数据
fifo$.next({ ip: '192.168.1.1' })
fifo$.next({ ip: '192.168.1.2' })

// 具体消费实现
function consumer(data: FifoItemType){
	//处理
	console.log(data.ip)
}

可以 export fifo$ 这样,其他地方都可以调用 .next() 方法生产数据. 上面的 consumerSub 表示 消费者订阅(consumer subscription),可执行 consumerSub.unsubscribe() 来取消订阅。

ps: rxjs 是(异步/同步)流控制的利器。用熟了对于开发效率提升很多(当然调试堆栈不那么友好……)


混合同步及异步消费且耦合的升级版本(需考虑生产速度高于消费速度的背压情况):

import { defer, of, Observable, Subject } from 'rxjs'
import { concatMap, map, tap } from 'rxjs/operators'

// 定义类型 
export interface FifoItemType {
  ip: string
}

// 生产者流。 FifoItemType 为入口参数类型,省事可用 any 类型
const fifo$ = new Subject<FifoItemType>().pipe(
  concatMap(consumerAsync),
  concatMap(consumerSync),
  tap(consumer),
  tap(data => consumer(data)),
)

// 消费者订阅
const consumerSub = fifo$.subscribe()

// 生产数据
fifo$.next({ ip: '192.168.1.1' })
fifo$.next({ ip: '192.168.1.2' })

// 异步消费实现
function consumerAsync(data: FifoItemType): Observable<FifoItemType> {
  const ret$ = defer(() => fetch('saveip/' + data.ip)).pipe(
    map(() => data),
  )
  return ret$
}
// 同步消费实现
function consumerSync(data: FifoItemType): Observable<FifoItemType> {
	//处理
	console.log(data.ip)
	return of(data)
}
// 有副作用消费实现
let outerIp = ''
function consumer(data: FifoItemType): void {
  outerIp = data.ip
}

用js的协程, yield切换,next恢复

来自酷炫的 CNodeMD

回到顶部