节点异步循环 - 如何使此代码按顺序运行?

IT技术 javascript node.js asynchronous
2021-03-21 15:19:08

我知道有几篇关于这个的帖子,但根据我发现的那些帖子,这应该可以正常工作。

我想在一个循环中发出一个 http 请求,并且我不希望循环在请求回调被触发之前进行迭代。我正在使用异步库,如下所示:

const async = require("async");
const request = require("request");

let data = [
    "Larry",
    "Curly",
    "Moe"
];

async.forEachOf(data, (result, idx, callback) => {
    console.log("Loop iterated", idx);
    let fullUri = "https://jsonplaceholder.typicode.com/posts";
    request({
        url: fullUri
    }, 
    (err, res, body) => {
        console.log("Request callback fired...");
        if (err || res.statusCode !== 200) return callback(err);
        console.log(result);
        callback();
    });
});

我看到的是:

Loop iterated 0
Loop iterated 1
Loop iterated 2
Request callback fired...
Curly
Request callback fired...
Larry
Request callback fired...
Moe

我需要看到的是:

Loop iterated 0
Request callback fired...
Curly
Loop iterated 1
Request callback fired...
Larry
Loop iterated 2
Request callback fired...
Moe

另外,如果有一个内置的方法来做同样的事情(async/await?Promise?)并且可以删除异步库,那就更好了。

我已经看到了一些很聪明的递归示例,但是当我将其用于更复杂的情况(例如每个循环多次请求调用等)时,我觉得这种方法很难遵循,并且不那么可读。

2个回答

你可以async完全放弃,async/await很容易地

Promise您的请求和使用 async/await

只是request变成一个Promise这样你就可以await了。

更好的是使用request-promise-native已经使用原生 Promise 包装了请求。

串行示例

从那时起,这是一个灌篮async/await

const rp = require('request-promise-native')

const users = [1, 2, 3, 4]
const results = []

for (const idUser of users) {
  const result = await rp('http://foo.com/users/' + idUser)

  results.push(result)
}

并行示例

现在,上述解决方案的问题在于它很慢 - 请求是串行运行的。大多数时候这并不理想。

如果您不需要下一个请求的前一个请求的结果,只需继续执行 aPromise.all来触发并行请求。

const users = [1, 2, 3, 4]

const pendingPromises = []
for (const idUser of users) {
  // Here we won't `await` on *each and every* request.
  // We'll just prepare it and push it into an Array
  pendingPromises.push(rp('http://foo.com/users/' + idUser))
}

// Then we `await` on a a `Promise.all` of those requests
// which will fire all the prepared promises *simultaneously*, 
// and resolve when all have been completed
const results = await Promise.all(pendingPromises)

错误处理

错误处理async/await由普通try..catch块提供,为简洁起见,我已将其省略。

哦哦 - 您可能会在创建这么多函数时遇到调用堆栈限制问题 - 不过,您的回调样式代码也会出现同样的问题。还有内存呢?您是否打算将整个 CSV 加载到内存中?合身吗?您可能需要查看您具体需要做什么?
2021-04-22 15:19:08
我也是这么想的。现在使用小文件进行测试,但预计会更大。我已经通过multer将文件流式传输到API端点。然后我需要处理每一行,将其拆分为几条记录,然后将这些记录插入到分散在其他几个微服务端点的表中......因此是“请求”调用。
2021-04-30 15:19:08
完毕。非常感谢!可以在这里找到:stackoverflow.com/questions/47999919/...
2021-05-05 15:19:08
谢谢,尼古拉斯。试图标记你,但不能。此例程将是非常大的 CSV 上传的一部分。有多少未决的Promise太多了?如果我有 100,000 条记录怎么办?还是……几百万?
2021-05-11 15:19:08
好的,不错。您能否为此提出另一个问题,以便我可以帮助您?
2021-05-12 15:19:08

如果您有许多(数千)个 url 需要处理,最好定义一个批处理大小并递归调用 process 函数来处理一个批处理。

最好限制活动连接的数量,您可以使用来限制活动连接或一定时间内的连接(每秒仅 5 个)。

最后但并非最不重要的; 如果你使用Promise.all你想确保当一个Promise被拒绝时,不是所有的成功都会丢失。您可以捕获被拒绝的请求并返回一个Fail类型对象,然后它会使用此 Fail 类型进行解析。

代码看起来像这样:

const async = require("async");
//lib comes from: https://github.com/amsterdamharu/lib/blob/master/src/index.js
const lib = require("lib");
const request = require("request");

const Fail = function(reason){this.reason=reason;};
const isFail = o=>(o&&o.constructor)===Fail;
const requestAsPromise = fullUri =>
  new Promise(
    (resolve,reject)=>
      request({
        url: fullUri
      }, 
      (err, res, body) => {
        console.log("Request callback fired...");
        if (err || res.statusCode !== 200) reject(err);
        console.log("Success:",fullUri);
        resolve([res,body]);
      })
  )
const process = 
  handleBatchResult =>
  batchSize =>
  maxFunction =>
  urls =>
    Promise.all(
      urls.slice(0,batchSize)
      .map(
        url=>
          maxFunction(requestAsPromise)(url)
          .catch(err=>new Fail([err,url]))//catch reject and resolve with fail object
      )
    )
    .then(handleBatch)
    .catch(panic=>console.error(panic))
    .then(//recursively call itself with next batch
      _=>
        process(handleBatchResult)(batchSize)(maxFunction)(urls.slice(batchSize))
    );

const handleBatch = results =>{//this will handle results of a batch
  //maybe write successes to file but certainly write failed
  //  you can retry later     
  const successes = results.filter(result=>!isFail(result));
  //failed are the requests that failed
  const failed = results.filter(isFail);
  //To get the failed urls you can do
  const failedUrls = failed.map(([error,url])=>url);
};

const per_batch_1000_max_10_active = 
  process (handleBatch) (1000) (lib.throttle(10));

//start the process
per_batch_1000_max_10_active(largeArrayOfUrls)
.then(
  result=>console.log("Process done")
  ,err=>console.error("This should not happen:".err)
);

在您handleBatchResult可以将失败的请求存储到文件中以供稍后尝试const [error,uri] = failedResultItem;,如果大量请求失败,您应该放弃。

之后handleBatchResult还有一个.catch,那就是你的恐慌模式,它不应该,所以我倒是建议失败有管道错误记录到文件(Linux版)。