How To Solve Async Queue Process Issues In Node.js

In one of our projects we had to process data asynchronously, one task after another. The queue from the async library was a handy tool for this. It worked well until we found that something in our code was making the queue freeze. Whenever a task hit an error, such as missing or invalid keys in the task object, the queue went idle and never drained: the current task never finished and the remaining tasks were never processed.

In our case, processing a task involved multiple levels of async/await function calls, so adding a return or calling drain in every function would have been a complex fix. We started debugging with setInterval to find the cause and found a few lines that threw an error because of the task object's parameters.

const async = require('async')
// queue worker function that processes the task
var Queue = async.queue(async function (task, callback) {
 console.log("#########################")
 console.log('given task is ' + task.process + ' ' + task.wallet.value);
 await process1(task, callback)
}, 1);

// assign a drain callback, called when the last task has finished
Queue.drain = function () {
 console.log('task is processed');
};

process1 is the function the worker calls for each task taken from the queue. It reads the current wallet amount and updates it.

async function process1(task, callback) {
 var wallet = await getWallet();  // get user wallet from database
 let value = task.wallet.value // processing logic
 task.process == 'credit' ? wallet += value : wallet -= value
 await setWallet(wallet); // save updated wallet data
 callback()
}

Here are the tasks to be performed by the queue. Note that the task at index 3 of the tasks array (the fourth task) is missing its wallet. Reading task.wallet.value for this task throws a TypeError inside the async worker, which surfaces as an UnhandledPromiseRejectionWarning. The callback is never called, so the queue stops.

var tasks = [
 {
   process: 'credit', wallet: { value: 100 }
 },
 {
   process: 'credit', wallet: { value: 200 }
 },
 {
   process: 'debit', wallet: { value: 300 },
 },
 {
   process: 'credit', // wallet: { value: 400 },
 },
 {
   process: 'credit', wallet: { value: 500 },
 }]
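
To make the failure mode concrete, here is a minimal sketch that does not use the async library at all. The names worker and probe are hypothetical and exist only for this illustration. It shows that when an async function throws before reaching its callback, the callback is never invoked and the error only appears as a rejected promise, which is why a callback-driven queue is left waiting.

```javascript
// Hypothetical stand-in for a queue worker: it reads task.wallet.value
// before it gets a chance to call the callback.
async function worker(task, callback) {
  const value = task.wallet.value; // throws TypeError when task.wallet is undefined
  callback(null, value);
}

// Runs the worker once and reports whether the callback fired
// and whether the returned promise rejected.
async function probe(task) {
  let callbackCalled = false;
  let rejection = null;
  try {
    await worker(task, () => { callbackCalled = true; });
  } catch (err) {
    rejection = err;
  }
  return { callbackCalled, rejection };
}

// Valid task: the callback fires and nothing is rejected.
probe({ process: 'credit', wallet: { value: 100 } })
  .then((r) => console.log('valid task  ->', r.callbackCalled, r.rejection));

// Task without a wallet: the callback never fires, the promise rejects.
probe({ process: 'credit' })
  .then((r) => console.log('broken task ->', r.callbackCalled, r.rejection instanceof TypeError));
```

If nothing awaits or catches that rejected promise, Node.js reports it as an unhandled rejection, and the code waiting on the callback has no way to learn that the task failed.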

getWallet function returns the current wallet amount.
setWallet function updates the wallet amount. 

async function getWallet() {
 return balance
}

async function setWallet(newBalance) {
 console.log("new balance: ", newBalance)
 balance = newBalance
 console.log('updated wallet:' + balance)
}

Initialize the wallet amount to 0 and push the tasks to the queue:

var balance = 0;
console.log(balance)
Queue.push(tasks, function () {
 console.log(`successfully processed a task  `)
})

Log generated after running the code:

[Log output not available]

From the above log, we can observe that the async queue stopped at the 4th task and the 5th task was never processed.
We then decided to validate the keys of each task and process it only when the task object is valid. If it is not valid, we call the callback function directly. This resolved the queue stops.

Created Validation Function:
This function checks whether the given task is a valid task by checking the key-value pairs.

function validateTask(task) {
 let operations = ['credit', 'debit']
 // a valid task has a known operation and a wallet with a positive value
 if (task && task.process && operations.includes(task.process) && task.wallet && task.wallet.value > 0) {
   console.log('given task is valid')
   return true
 }
 else {
   console.log('given task is invalid');
   return false
 }
}
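
As a quick illustration of what this check accepts and rejects, the sketch below runs a condensed copy of validateTask (without the logging) over a few sample tasks. The sampleTasks array, including the 'refund' operation, is made up for this example and is not part of the original project.

```javascript
const operations = ['credit', 'debit'];

// Condensed copy of the check above, without the logging.
function validateTask(task) {
  return Boolean(
    task &&
    operations.includes(task.process) &&
    task.wallet &&
    task.wallet.value > 0
  );
}

// Hypothetical sample tasks covering the main cases.
const sampleTasks = [
  { process: 'credit', wallet: { value: 100 } }, // valid
  { process: 'credit' },                         // wallet missing
  { process: 'refund', wallet: { value: 50 } },  // unknown operation
  { process: 'debit', wallet: { value: 0 } },    // non-positive amount
];

const results = sampleTasks.map(validateTask);
console.log(results); // [ true, false, false, false ]
```

Only the first task passes. The other three would be skipped by the worker instead of throwing halfway through processing.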

Modified Worker Definition:
In the worker definition, a given task is validated and if it is valid, then we process the task, else we call the callback directly.

// queue worker function that validates the task and processes it
var Queue = async.queue(function (task, callback) {
 console.log("#########################")
 console.log('given task is ' + JSON.stringify(task));
 if (validateTask(task)) {
   process1(task, function () {
     callback();
   })
 } else { callback() }
}, 1);

With this change, the queue no longer stopped. Result:

[Log output not available]

Here, in the 4th task, where no wallet is defined, validateTask rejects the task before it is processed, and the queue moves on to the next task.
However, this is not a permanent solution. From the async documentation, we observed that this type of error occurred in async v1.5.x and was resolved in v2.6.x using the queue.error function.
So updating the async module to v2.6.x or later (we used 3.2.0, the latest at the time of writing), with a few changes to the worker function, resolved the issue completely without the need for separate validation.
Use this command to update the async module:

 npm i async@3.2.0

Add the queue.error function:
This function is called whenever the queue worker experiences an error. It receives the error and the task that caused it.

Queue.error(function (err, task) {
 console.error('task experienced an error:' + err);
});

Update the worker definition. In async v3 an async worker function is not passed a callback: the promise it returns signals completion, and a thrown error is passed to the error hook.

// queue worker function that processes the task
var Queue = async.queue(async function (task) {
 console.log("#########################")
 console.log('given task is ' + task.process + ' ' + task.wallet.value);
 await process1(task)
}, 1);
Queue.drain(() => {
 console.log('task is processed')
})

async function process1(task) {
 var wallet = await getWallet();  // get user wallet from database
 // processing logic
 let value = task.wallet.value
 task.process == 'credit' ? wallet += value : wallet -= value
 await setWallet(wallet); // save updated wallet data
 // no callback() here: the resolved promise tells the queue the task is done
}

Final Result:

[Log output not available]

While processing the 4th task, an error is thrown and it is caught by queue.error() and the error hook is executed. After this task, the fifth task is also processed successfully.

So from now on whenever the queue experiences an error due to any task while processing, the queue calls the error hook (queue.error()) and starts processing the next task.
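
The behaviour described above, report the failing task and carry on with the next one, can be illustrated with a small hand-rolled sketch. This is not how the async library is implemented. runSequentially, applyTask and the sample tasks are hypothetical names and data used only to show the idea of an error hook in a one-at-a-time queue.

```javascript
// Process tasks one at a time. A failing task is handed to onError
// and the loop moves on to the next task instead of stopping.
async function runSequentially(tasks, worker, onError) {
  for (const task of tasks) {
    try {
      await worker(task);
    } catch (err) {
      onError(err, task);
    }
  }
}

let balance = 0;

// Hypothetical worker mirroring process1: credit or debit the balance.
async function applyTask(task) {
  const value = task.wallet.value; // throws when task.wallet is missing
  balance = task.process === 'credit' ? balance + value : balance - value;
}

const failed = [];
const done = runSequentially(
  [
    { process: 'credit', wallet: { value: 100 } },
    { process: 'credit' }, // wallet missing, will be reported to the hook
    { process: 'debit', wallet: { value: 30 } },
  ],
  applyTask,
  (err, task) => failed.push({ task, message: err.message })
).then(() => {
  console.log('balance:', balance, 'failed tasks:', failed.length); // balance: 70 failed tasks: 1
});
```

The second task fails and is recorded by the hook, while the third task is still applied, which is the same outcome the queue.error hook gives in the final version of the code.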
