AsyncJS 异步流程控制DEMO详细介绍
发布于 6 年前 作者 oliyg 4250 次浏览 来自 分享

1. 基本流程

串行流程、并行流程、混合执行 series, waterfall; parallel, parallelLimit; auto;

1.1. 串行流程

1.1.1. series(多个函数依次执行,之间没有数据交换)

有多个异步函数需要依次调用,一个完成之后才能执行下一个。各函数之间没有数据的交换,仅仅需要保证其执行顺序。这时可使用series。

async.series([
    function(callback) {
        // do some stuff ...
        callback(null, 'one');
    },
    function(callback) {
        // do some more stuff ...
        callback(null, 'two');
    }
],
// optional callback
function(err, results) {
    // results is now equal to ['one', 'two']
});

另外还需要注意的是:多个series调用之间是不分先后的,因为series本身也是异步调用。

1.1.2. waterfall(多个函数依次执行,且前一个的输出为后一个的输入)

与seires相似,按顺序依次执行多个函数。不同之处,每一个函数产生的值,都将传给下一个函数。如果中途出错,后面的函数将不会被执行。错误信息以及之前产生的结果,将传给waterfall最终的callback。

注意,该函数不支持json格式的tasks

async.waterfall([
    function(callback) {
        callback(null, 'one', 'two');
    },
    function(arg1, arg2, callback) {
        // arg1 now equals 'one' and arg2 now equals 'two'
        callback(null, 'three');
    },
    function(arg1, callback) {
        // arg1 now equals 'three'
        callback(null, 'done');
    }
], function (err, result) {
    // result now equals 'done'
});

1.2. 并行流程

1.2.1. parallel(多个函数并行执行)

并行执行多个函数,每个函数都是立即执行,不需要等待其它函数先执行。传给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序。

如果某个函数出错,则立刻将err和已经执行完的函数的结果值传给parallel最终的callback。其它未执行完的函数的值不会传到最终数据,但要占个位置。

同时支持json形式的tasks,其最终callback的结果也为json形式。

示例代码:

async.parallel([
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two');
        }, 100);
    }
],
// optional callback
function(err, results) {
    // the results array will equal ['one','two'] even though
    // the second function had a shorter timeout.
});

以json形式传入tasks

async.parallel({
    one: function(callback) {
        setTimeout(function() {
            callback(null, 1);
        }, 200);
    },
    two: function(callback) {
        setTimeout(function() {
            callback(null, 2);
        }, 100);
    }
}, function(err, results) {
    // results is now equals to: {one: 1, two: 2}
});

1.2.2. parallelLimit(多个函数并行执行并限制同时执行的最大数量)

parallelLimit 与上面的 parallel 最大的不同就是限制最大同时执行的数量

async.parallelLimit([(cb) => {
  setTimeout(() => {
    cb(null, 'one');
  }, 1000);
}, (cb) => {
  setTimeout(() => {
    cb(null, 'two');
  }, 2000);
}], 1, (err, value) => {
  log(value);
});
// 需时3s左右

1.3. 混合执行

1.3.1. auto(tasks, [callback]) (多个函数有依赖关系,有的并行执行,有的依次执行)

用来处理有依赖关系的多个任务的执行。比如某些任务之间彼此独立,可以并行执行;但某些任务依赖于其它某些任务,只能等那些任务完成后才能执行。

如异步获取两个数据并打印:

async.auto({
  getData: function (callback) {
    setTimeout(() => {
      log('data got')
      callback(null, 'data');
    }, 3000);
  },
  getAnotherData: function (callback) {
    setTimeout(() => {
      log('another data got')
      callback(null, 'another data');
    }, 1000);
  },
  printData: ['getData', 'getAnotherData', function (result, callback) {
    log(result);
  }]
});
// another data got
// data got
// { getAnotherData: 'another data', getData: 'data' }

1.3.2. autoInject(tasks, callbackopt) (与auto类似但回调函数被作为参数传入下一个串行函数)

Dependent tasks are specified as parameters to the function

可以说算是 auto 方法的语法糖

async.autoInject({
    getData: function (callback) {
        axios({ methods: 'get', url: 'http://baidu.com/' }).then(d => callback(null, d.status))
    },
    getAnotherData: function (callback) {
        axios({ methods: 'get', url: 'http://sogou.com/' }).then(d => callback(null, d.status))
    },
    writeFile: function (getData, getAnotherData, callback) {
        fs.writeFile('./d.json', JSON.stringify([getData, getAnotherData]), function (err) {
            if (err) { callback(err) } else { callback(null, 'finish') }
        })
    }
}, function (err, result) {
    if (err) { console.log(err) } else {
        console.log(result) // { getData: 200, getAnotherData: 200, writeFile: 'finish' }
    }
})

2. 循环流程

2.1. whilst(用可于异步调用的while)

相当于while,但其中的异步调用将在完成后才会进行下一次循环。举例如下:

var count = 0;
async.whilst(
  () => { return count < 5; }, // 是否满足条件
  (callback) => { // 满足条件执行函数
    count++;
    setTimeout(() => {
      log(count)
      callback(null, count);
    }, 1000);
  },
  (err, value) => { // 不满足条件完成循环
    log('result: ' + count);
  }
);

2.2. doWhilst (后验证的异步while)

首先执行函数

count = 0;
async.doWhilst((callback) => { // 先执行函数
  count++;
  setTimeout(() => {
    log(count);
    callback(null, count);
  }, 1000);
}, () => { // 后验证条件
  return count < 5;
}, (err, value) => { // 主回调
  log('result: ' + count);
});

2.3. until (与while相似,但判断条件相反)

var count = 0;
async.until(
  () => { return count >= 5; },
  (callback) => {
    count++;
    setTimeout(() => {
      log(count)
      callback(null, count);
    }, 1000);
  },
  (err, value) => {
    log('result: ' + count);
  }
);

2.4. doUntil (后验证Until循环)

var count = 0;
async.doUntil(
  (callback) => { // 先执行
    count++;
    setTimeout(() => {
      log(count)
      callback(null, count);
    }, 1000);
  },
  () => { return count >= 5; }, // 后验证
  (err, value) => {
    log('result: ' + count);
  }
);

2.5. during (类似whilst,回调判断)

与whilst类似,但测试的是一个异步函数的回调(err, true or false),判断回调第二参数是否为真

count = 0
async.during(callback => {
    callback(null, count < 5)
}, callback => {
    count++
    console.log(count)
    setTimeout(callback, 1000);
}, err => {
    if (err) console.log(err)
    console.log('finish')
})

2.6. doDuring (类似doWhilst,回调判断)

count = 0
async.doDuring(
    function (callback) { // 先执行一次
        count++;
        log(count);
        setTimeout(callback, 1000);
    },
    function (callback) { // 判断是否满足条件
        return callback(null, count < 5);
    },
    function (err) {
        // 5 seconds have passed
        log('finished');
    }
);

2.7. retry (按照频率重复执行多次直到成功位置)

在返回错误之前,尝试从函数获得成功响应的次数不超过规定的次数。 如果任务成功,则回调将传递成功任务的结果。 如果所有尝试都失败,回调将传递最终尝试的错误和结果。

async.retry({times: 3, interval: 200}, apiMethod, function(err, result) {
    log(result);
}); // 尝试三次,间隔200,打印结果

2.8. retryable

包装任务使其可以retry

async.auto({
    dep1: async.retryable(3, getFromFlakyService),
    process: ["dep1", async.retryable(3, function (results, cb) {
        maybeProcessData(results.dep1, cb);
    })]
}, callback);

2.9. times (重复调用 n 次)

调用 n 次,序号 n 可作为参数传入函数

async.times(5, (n, next) => {
    axios({ methods: 'get', url: 'http://sogou.com/' }).then(d => next(null, d.status)).catch(e => next(e))
}, (err, result) => {
    if (err) {
        console.log(err)
    } else {
        console.log(result) // [ 200, 200, 200, 200, 200 ]
    }
})

2.10. timesLimit (与times类似,但可限制并行执行最大数量)

timesLimit(count, limit, iteratee, callback)

2.11. timesSeries (只可并行执行一个)

The same as times but runs only a single async operation at a time.

timesSeries(n, iteratee, callback)

2.12. forever (除非捕获到错误,将允许一直执行——调用自己)

记得调用 next

async.forever((next) => { setTimeout(() => { axios({ methods: ‘get’, url: ‘http://www.baidu.com/’ }).then(d => console.log(d.status)).then(next) }, 1000) }, error => { if (error) { console.log(error) } })

3. 集合流程

Async提供了很多针对集合的函数,可以简化我们对集合进行异步操作时的步骤

3.1. each 集合中的元素执行函数

对所有集合中的所有元素,都执行同一个函数,并行:

Note, that since this function applies iteratee to each item in parallel, there is no guarantee that the iteratee functions will complete in order.不能够保证按照顺序完成

let data = [
  'http://www.baidu.com',
  'http://www.sogou.com',
  'http://www.bing.com',
];

async.each(data, (item, callback) => {
  console.log('processing...' + item);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    console.log(d.data.length);
    callback();
  });

}, (err) => {
  if (err) console.log(err);
});

// processing...http://www.baidu.com
// processing...http://www.sogou.com
// processing...http://www.bing.com
// 22430
// 111984
// 116593

3.2. eachLimit

The same as each but runs a maximum of limit async operations at a time.

let data = [
  'http://www.baidu.com',
  'http://www.sogou.com',
  'http://www.bing.com',
];

async.eachLimit(data, 2, (item, callback) => {
  console.log('processing...' + item);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    console.log(d.data.length);
    callback();
  });

}, (err) => {
  if (err) console.log(err);
});

// processing...http://www.baidu.com
// processing...http://www.sogou.com
// 22430
// processing...http://www.bing.com
// 112087
// 116593

3.3. eachOf/forEachOf

Like each, except that it passes the key (or index) as the second argument to the iteratee. 执行的函数有三个参数分别是:item, index, callback

let data = [
  'http://www.baidu.com',
  'http://www.sogou.com',
  'http://www.bing.com',
];

// async.forEachOf(data, (item, index, callback) => { forEachOf 与 eachOf 相同
async.eachOf(data, (item, index, callback) => {
  console.log('processing...NO.' + index);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    console.log(d.data.length);
    callback();
  });

}, (err) => {
  if (err) console.log(err);
});
// processing...NO.0
// processing...NO.1
// processing...NO.2
// 112477
// 22430
// 116593

3.4. eachOfLimit/forEachOfLimit

The same as eachOf but runs a maximum of limit async operations at a time. 多个限制参数,不再累述

3.5. eachOfSeries/forEachOfSeries

The same as eachOf but runs only a single async operation at a time. 相当于eachOfLimit 限制为 1

let data = [
  'http://www.baidu.com',
  'http://www.sogou.com',
  'http://www.bing.com',
];

async.eachOfSeries(data, (item, index, callback) => {
  console.log('processing...NO.' + index);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    console.log(d.data.length);
    callback();
  });

}, (err) => {
  if (err) console.log(err);
});
// processing...NO.0
// 111979
// processing...NO.1
// 22430
// processing...NO.2
// 116593

3.6. eachSeries/forEachSeries

相当于 eachLimit 限制为 1

3.7. map 返回集合数据执行函数的结果集合

there is no guarantee that the iteratee functions will complete in order 不保证按顺序完成

let data = [
  'http://www.baidu.com',
  'http://www.sogou.com',
  'http://www.bing.com',
];

async.map(data, (item, callback) => {
  console.log('processing...NO.' + item);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    callback(null, d.data.length);
  });

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// processing...NO.http://www.baidu.com
// processing...NO.http://www.sogou.com
// processing...NO.http://www.bing.com
// [112677, 22430, 116593]

3.8. mapLimit

与 map 相似,多了限制参数

3.9. mapSeries

相当于 mapLimit 限制 1

3.10. mapValues

用于处理对象

A relative of map, designed for use with objects. 针对遍历对象值的情况

let obj = {
  site1: 'http://www.baidu.com',
  site2: 'http://www.sogou.com',
  site3: 'http://www.bing.com'
};

async.mapValues(obj, (value, key, callback) => {
  console.log('processing...NO.' + key);

  axios({
    methods: 'get',
    url: value
  }).then(d => {
    callback(null, d.data.length);
  });

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// processing...NO.site1
// processing...NO.site2
// processing...NO.site3
// { site1: 112122, site2: 22430, site3: 116593 }

3.11. mapValuesLimit

多了限制参数

3.12. mapValuesSeries

相当于 mapValuesLimit 限制 1

3.13. filter 返回满足条件的原始元素

返回的是通过测试的原始数据,只有筛选,没有修改原始数据

Returns a new array of all the values in coll which pass an async truth test. 过滤返回经过验证结果为真的

This operation is performed in parallel, but the results array will be in the same order as the original. 最终结果将与原始数据的顺序一致

let data = [
  'http://www.baidu.com',
  'http://www.sogou.com',
  'http://www.bing.com',
];

async.filter(data, (item, callback) => {
  console.log('processing...NO.' + item);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    let length = d.data.length;
    if (length > 100000) {
      callback(null, true);
    } else {
      callback(null, false);
    }
  });

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// processing...NO.http://www.baidu.com
// processing...NO.http://www.sogou.com
// processing...NO.http://www.bing.com
// ['http://www.baidu.com', 'http://www.bing.com']

3.14. filterLimit

3.15. filterSeries

3.16. reject 剔除满足条件的 (与filter相反)

与filter相反,通过验证的,予以剔除

The opposite of filter. Removes values that pass an async truth test.

3.17. rejectLimit

3.18. rejectSeries

3.19. detect 第一个满足条件的

Returns the first value in coll that passes an async truth test. 获取集合中第一个满足测试的数据

let data = [
  'http://www.sogou.com',
  'http://www.baidu.com',
  'http://www.bing.com',
];

async.detect(data, (item, callback) => {
  console.log('processing...NO.' + item);

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    let length = d.data.length;
    if (length > 100000) {
      callback(null, true);
    } else {
      callback(null, false);
    }
  });

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// processing...NO.http://www.sogou.com
// processing...NO.http://www.baidu.com
// processing...NO.http://www.bing.com
// http://www.baidu.com

3.20. detectLimit

3.21. detectSeries

3.22. some/any 至少有一个符合条件的触发主回调

If any iteratee call returns true, the main callback is immediately called. 一旦有符合条件的数据,马上触发主回调函数

let data = [
  'http://www.sogou.com',
  'http://www.baidu.com',
  'http://www.bing.com',
];

async.some(data, (item, callback) => {

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    let length = d.data.length;
    if (length > 100000) {
      callback(null, true);
    } else {
      callback(null, false);
    }
  });

}, (err, isSatisfied) => {
  if (err) console.log(err);
  console.log(isSatisfied); // true
});

3.23. someLimit/anyLimit

3.24. someSeries/anySeries

3.25. every/all 全部满足条件返回true

Returns true if every element in coll satisfies an async test. If any iteratee call returns false, the main callback is immediately called. 全部满足条件时返回true

let data = [
  'http://www.sogou.com',
  'http://www.baidu.com',
  'http://www.bing.com',
];

async.every(data, (item, callback) => {

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    let length = d.data.length;
    if (length > 0) {
      callback(null, true);
    } else {
      callback(null, false);
    }
  });

}, (err, isSatisfied) => {
  if (err) console.log(err);
  console.log(isSatisfied); // true
});

3.26. everyLimit/allLimit

3.27. everySeries/allSeries

3.28. concat 合并结果

将结果合并,同样的,不能保证结果按照原来的顺序排序

let data = [
  'http://www.sogou.com',
  'http://www.baidu.com',
  'http://www.bing.com',
];

async.concat(data, (item, callback) => {

  axios({
    methods: 'get',
    url: item
  }).then(d => {
    callback(null, { item: item, length: d.data.length });
  });

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// [ { item: 'http://www.sogou.com', length: 22430 },
//   { item: 'http://www.baidu.com', length: 111979 },
//   { item: 'http://www.bing.com', length: 116550 } ]

3.29. concatLimit

3.30. concatSeries

3.31. groupBy 结果根据对象的key分组

传入一系列对象,并根据设置的 key 进行分组

let data = [{
  year: 2001,
  url: 'http://www.baidu.com'
}, {
  year: 2001,
  url: 'http://www.sogou.com'
}, {
  year: 2017,
  url: 'http://www.bing.com'
}]

async.groupBy(data, (item, callback) => {

  axios({
    methods: 'get',
    url: item.url
  }).then(d => {
    callback(null, item.year); // 按照 year 分组
  });

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// {
//   '2001':
//   [{ year: 2001, url: 'http://www.baidu.com' },
//   { year: 2001, url: 'http://www.sogou.com' }],
//     '2017': [{ year: 2017, url: 'http://www.bing.com' }]
// }

3.32. groupByLimit

3.33. groupBySeries

3.34. reduce 串行累加集合数据

Reduces coll into a single value 逐渐累加

This function only operates in series. 只支持 series 模式,不支持并行

This function is for situations where each step in the reduction needs to be async; if you can get the data before reducing it, then it’s probably a good idea to do so. 适合每一 reduce 步都需要异步获取数据的情况

async.reduce([2, 3, 4], 1, (memo, item, callback) => { // 1 为 memo

  setTimeout(() => {
    callback(null, memo + item);
  }, 100);

}, (err, value) => {
  if (err) console.log(err);
  console.log(value); // 10
});

3.35. reduceRight

Same as reduce, only operates on array in reverse order. 与reduce类似

3.36. srotBy 按顺序排列

排列顺序(item / item * -1)

async.sortBy([13, 21, 321, 421, 3, 21, , , , 23121, 1], (item, callback) => {

  setTimeout(() => {
    callback(null, item * -1); // 从大到小
    // callback(null, item); // 从小到大
  }, 100);

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// [23121, 421, 321, 21, 21, 13, undefined, undefined, undefined, 3, 1]

3.37. transform 通过某种规则转化集合

acc 意为 accumulate,obj 则是 object

async.transform([1, 2, 3], (acc, item, index, callback) => {

  setTimeout(() => {
    acc.push(index + ': ' + item);
    callback(null);
  }, 100);

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// ['0: 1', '1: 2', '2: 3']

对象的例子:

async.transform({ name: 'oli', age: 12 }, (obj, val, key, callback) => {

  setTimeout(() => {
    obj[key] = val + '...';
    callback(null);
  }, 100);

}, (err, value) => {
  if (err) console.log(err);
  console.log(value);
});
// { name: 'oli...', age: '12...' }

4. 其他

4.1. tryEach ()

If one of the tasks were successful, the callback will be passed the result of the successful task 一旦其中一个成功,则callback返回该成功的任务的返回值

It runs each task in series but stops whenever any of the functions were successful. 测试哪个成功

async.tryEach([
    function getDataFromFirstWebsite(callback) {
        // Try getting the data from the first website
        callback(err, data);
    },
    function getDataFromSecondWebsite(callback) {
        // First website failed,
        // Try getting the data from the backup website
        callback(err, data);
    }
],
// optional callback
function(err, results) {
    Now do something with the data.
});

4.2. race (哪个优先结束)

并行运行任务函数的数组,一旦任何一个完成或传递错误信息,主回调将立即调用。相当于 Promise.race()

async.race([
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two'); // 优先触发
        }, 100);
    }
],
// 主回调
function(err, result) {
    // the result will be equal to 'two' as it finishes earlier
});

4.3. compose (将多个异步函数串联在一起返回一个新的函数 类似 f(g(h())))

把f(),g(),h()异步函数,组合成f(g(h()))的形式

Each function consumes the return value of the function that follows

注意执行的顺序,这里的 add1mul3 即为 先执行 mul3 然后执行 add1

function add1(n, callback) {
  setTimeout(function () {
    callback(null, n + 1);
  }, 10);
}

function mul3(n, callback) {
  setTimeout(function () {
    callback(null, n * 3);
  }, 10);
}

var add1mul3 = async.compose(mul3, add1); // 1) add1() 2) mul3()
add1mul3(4, function (err, result) {
  console.log(result); // 15
});

4.4. seq

Each function consumes the return value of the previous function. 与 compose 类似

pp.get('/cats', function(request, response) {
    var User = request.models.User;
    async.seq(
        _.bind(User.get, User),  // 'User.get' has signature (id, callback(err, data))
        function(user, fn) {
            user.getCats(fn);      // 'getCats' has signature (callback(err, data))
        }
    )(req.session.user_id, function (err, cats) {
        if (err) {
            console.error(err);
            response.json({ status: 'error', message: err.message });
        } else {
            response.json({ status: 'ok', message: 'Cats found', data: cats });
        }
    });
});

4.5. apply (给函数预绑定参数)

async.parallel([
    async.apply(fs.writeFile, 'testfile1', 'test1'),
    async.apply(fs.writeFile, 'testfile2', 'test2')
]);

4.6. applyEach

async.applyEach([enableSearch, updateSchema], 'bucket', callback);

4.7. applyEachSeries

only a single async operation at a time.

4.8. queue (串行的消息队列)

是一个串行的消息队列,通过限制了worker数量,不再一次性全部执行。当worker数量不够用时,新加入的任务将会排队等候,直到有新的worker可用

If all workers are in progress, the task is queued until one becomes available

queue(worker, concurrency) concurrency 用来定义worker的数量

let q = async.queue(function (task, callback) {
  console.log('deal with ' + task.url);
  setTimeout(() => {
    axios({
      methods: 'get',
      url: task.url
    }).then((d) => {
      console.log(d.data.length);
      callback();
    });
  }, 2000);
}, 1);

q.drain = function () {
  console.log('all done');
}
q.push({ url: 'http://www.baidu.com' }, function (err) {
  if (err) {
    console.log(err);
  }
});
q.unshift([{ url: 'http://www.baidu.com' }, { url: 'http://www.sogou.com' }], function (err) {
  if (err) {
    console.log(err);
  }
});
// deal with http://www.sogou.com
// 22430
// deal with http://www.baidu.com
// 112041
// deal with http://www.baidu.com
// 112220
// all done

4.9. priorityQueue(worker, concurrency)

与 queue 不同的是,push 可以设置 priority push(task, priority, [callback])

4.10. cargo (串行的消息队列,每次负责一组任务)

While queue passes only one task to one of a group of workers at a time, cargo passes an array of tasks to a single worker, repeating when the worker is finished. queue 一个 worker 负责一个任务,但 cargo 则是一个 worker 负责一组任务。

// create a cargo object with payload 2
var cargo = async.cargo(function(tasks, callback) {
    for (var i=0; i<tasks.length; i++) {
        console.log('hello ' + tasks[i].name);
    }
    callback();
}, 2);

// add some items
cargo.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});
cargo.push({name: 'bar'}, function(err) {
    console.log('finished processing bar');
});
cargo.push({name: 'baz'}, function(err) {
    console.log('finished processing baz');
});

4.11. nextTick ()

浏览器使用 setImmediate

Calls callback on a later loop around the event loop

var call_order = [];

async.nextTick(function () {
  call_order.push('two'); // 4
  console.log(call_order);
});

setTimeout(() => {
  call_order.push('four'); // 7
  console.log(call_order);
}, 100);

async.nextTick(function () {
  call_order.push('three'); // 5
  console.log(call_order);
});

console.log(call_order); // 1

call_order.push('one'); // 2

console.log(call_order); // 3

async.setImmediate(function (a, b, c) {
  console.log(a, b, c); // 6
}, 1, 2, 3);

4.12. asyncify (同步函数转化成异步函数)

直接传入同步方法:

async.waterfall([
    async.apply(fs.readFile, './d.json', "utf8"),
    async.asyncify(JSON.parse), // 同步函数
    function (data, next) { // 转成异步
        console.log(data) // data...
        next(data) // passing data...
    }
], (data) => {
    console.log(data); // data...
});

包裹一层函数:

async.waterfall([
    async.asyncify(function () {
        // 同步函数转称异步函数传入参数需要用到一层function并return结果
        return fs.readFileSync('./d.json', "utf8")
    }),
    function (data, next) { // 执行回调并调用 next
        console.log(data.length)
        next(data)
    },
    async.asyncify(JSON.parse),
    function (data, next) {
        console.log(data)
        next(data)
    }
], (data) => {
    console.log(data);
});

4.13. constant (一般用来给串行流程的下个阶段传值)

如asyncify的例子可以修改为:

async.waterfall([
    async.constant('./d.json', 'utf8'), // 设置参数
    fs.readFile, // 调用参数
    function (data, next) {
        console.log(data.length)
        next(data)
    },
    async.asyncify(JSON.parse),
    function (data, next) {
        console.log(data)
        next(data)
    }
], (data) => {
    console.log(data);
});

4.14. dir (调用console.dir)

Logs the result of an async function to the console using console.dir to display the properties of the resulting object. 注意浏览器兼容性 console.dir 适用于 FireFox and Chrome

// in a module
var hello = function(name, callback) {
    setTimeout(function() {
        callback(null, {hello: name});
    }, 1000);
};

// in the node repl
node> async.dir(hello, 'world');
{hello: 'world'}

4.15. ensureAsync (确保任务异步执行)

function sometimesAsync(arg, callback) {
    if (cache[arg]) {
        return callback(null, cache[arg]); // this would be synchronous!!
    } else {
        doSomeIO(arg, callback); // this IO would be asynchronous
    }
}

// this has a risk of stack overflows if many results are cached in a row
async.mapSeries(args, sometimesAsync, done);

// this will defer sometimesAsync's callback if necessary,
// preventing stack overflows
async.mapSeries(args, async.ensureAsync(sometimesAsync), done);

4.16. log (调用console.log)

// in a module
var hello = function(name, callback) {
    setTimeout(function() {
        callback(null, 'hello ' + name);
    }, 1000);
};

// in the node repl
node> async.log(hello, 'world');
'hello world'

4.17. memoize (缓存结果)

Caches the results of an async function. When creating a hash to store function results against, the callback is omitted from the hash and an optional hash function can be used.

var slow_fn = function(name, callback) {
    // do something
    callback(null, result);
};
var fn = async.memoize(slow_fn);

// fn can now be used as if it were slow_fn
fn('some name', function() {
    // callback
});

例如:

var slow_fn = function (url, callback) {
    axios({ methods: 'get', url }).then(e => {
        console.log(e.status)
        callback(null, e.data)
    })
};
var fn = async.memoize(slow_fn);

fn('http://baidu.com', function (e, data) {
    console.dir('done')
});
fn('http://sogou.com', function (e, data) {
    console.dir('done')
});
// 200
// 'done'
// 200
// 'done'

4.18. unmemoize

Undoes a memoized function, reverting it to the original

4.19. nextTick

var call_order = [];
async.nextTick(function() {
    call_order.push('two');
    // call_order now equals ['one','two']
});
call_order.push('one');

async.setImmediate(function (a, b, c) {
    // a, b, and c equal 1, 2, and 3
}, 1, 2, 3);

4.20. reflect (遇到error继续执行)

always completes with a result object, even when it errors.

async.parallel([
    async.reflect(function(callback) {
        // do some stuff ...
        callback(null, 'one');
    }),
    async.reflect(function(callback) {
        // do some more stuff but error ...
        callback('bad stuff happened');
    }),
    async.reflect(function(callback) {
        // do some more stuff ...
        callback(null, 'two');
    })
],
// optional callback
function(err, results) {
    // values
    // results[0].value = 'one'
    // results[1].error = 'bad stuff happened'
    // results[2].value = 'two'
});

例如:

async.parallel([
    async.reflect((callback) => {
        setTimeout(() => {
            callback('error')
        }, 1000)
    }),
    (callback) => {
        setTimeout(() => {
            callback(null, 'data')
        }, 2000)
    }
], (err, result) => {
    if (err) {
        console.error(err)
    } else {
        console.dir(result) // [ { error: 'error' }, 'data' ]
    }
})

4.21. reflectAll (包裹reflect)

A helper function that wraps an array or an object of functions with reflect.

let tasks = [
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        // do some more stuff but error ...
        callback(new Error('bad stuff happened'));
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two');
        }, 100);
    }
];

async.parallel(async.reflectAll(tasks), // reflectAll 包含了tasks任务列表
// optional callback
function(err, results) {
    // values
    // results[0].value = 'one'
    // results[1].error = Error('bad stuff happened')
    // results[2].value = 'two'
});

4.22. setImmediate (浏览器中相当于 setTimeout(callback, 0))

var call_order = [];
async.nextTick(function() {
    call_order.push('two');
    // call_order now equals ['one','two']
});
call_order.push('one');

async.setImmediate(function (a, b, c) {
    // a, b, and c equal 1, 2, and 3
}, 1, 2, 3);

4.23. timeout (限制异步函数timeout最长延迟,时间过了返回错误)

Sets a time limit on an asynchronous function.

If the function does not call its callback within the specified milliseconds, it will be called with a timeout error. 超过时间返回错误:‘ETIMEDOUT’

function getData(url, callback) {
    axios({ methods: 'get', url }).then(d => {
        setTimeout(() => {
            return callback(null, d.status)
        }, 1000);
    })
}

let wrappeed = async.timeout(getData, 1000)

wrappeed('http://baidu.com', (err, data) => {
    if (err) {
        console.log(err)
    } else {
        console.log(data)
    }
})
// { Error: Callback function "getData" timed out.
//     at Timeout.timeoutCallback [as _onTimeout] (/Users/apple/test/node_modules/_async@2.6.0@async/dist/async.js:4916:26)
//     at ontimeout (timers.js:386:14)
//     at tryOnTimeout (timers.js:250:5)
//     at Timer.listOnTimeout (timers.js:214:5) code: 'ETIMEDOUT' }
3 回复

有 Async Functions,async 模块就是浮云啦

为毛不用原生的 async/await ?

老哥 为啥不用原生呢 如果是新项目基本上都上async await 了吧 你这个 我之前也在用 也不错

回到顶部