If you are here, you probably want to understand how to implement background jobs in Node.js. It took me some time to figure this out, so I will write down what I learned along with working solutions.
Problem Statement
Run jobs in the background and act on their results while the main application continues to serve requests.
The Application
Let’s consider an application that accepts requests for report generation. Generating a report takes time, and you can’t keep the user waiting for it. So the application has two parts: a server side that accepts user requests for new reports, and a background worker that processes them without disturbing the main application.
I gave the design some thought and came up with two approaches: one with MongoDB and one with Redis. Let’s look at them.
MongoDB Approach
Server Module
This server module is a simple REST API written with Express and the MongoDB Node.js driver. It simply stores the job request in MongoDB and moves on. Since MongoDB stores documents, persisting plain JavaScript objects coming from the application is straightforward. Here is the code.
const express = require('express')
const bodyParser = require('body-parser')
const app = express()

const url = "mongodb://192.168.99.100:32774/"
const dbName = 'offlinereport'
var MongoClient = require('mongodb').MongoClient

// requests are spread round-robin across batches 1..batchMax
var batchStart = 1
const batchMax = 5

var addRequest = function (request) {
  if (batchStart > batchMax)
    batchStart = 1
  return new Promise(function (resolve, reject) {
    // status 'N' (new) is what the worker polls for
    var myObj = { ...request, batch: batchStart, status: 'N', submitted: new Date().getTime() }
    MongoClient.connect(url, function (err, db) {
      if (err) return reject(err)
      var dbo = db.db(dbName)
      dbo.collection("requests").insertOne(myObj, function (err, res) {
        if (err) return reject(err)
        db.close()
        batchStart++
        resolve(res)
      })
    })
  })
}

app.use(bodyParser.json())

app.put('/add', function (req, res) {
  addRequest(req.body).then(function (result) {
    res.send(req.body)
  }).catch(function (err) {
    res.status(500).send({ error: err.message })
  })
})

app.listen(3000, () => {
  console.log('listening on 3000')
})
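To try the API, submit a request with any HTTP client. Below is a minimal sketch using Node's built-in http module; the payload fields (reportType, userId) are just placeholders for whatever your report request actually contains.
var http = require('http')

// placeholder payload for a report request
var payload = JSON.stringify({ reportType: 'sales', userId: 42 })
var req = http.request({
  host: 'localhost',
  port: 3000,
  path: '/add',
  method: 'PUT',
  headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(payload) }
}, function (res) {
  res.setEncoding('utf8')
  res.on('data', function (chunk) { console.log('server replied:', chunk) })
})
req.write(payload)
req.end()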
Worker Module
This worker module is a Node.js program that polls MongoDB for new records. Once it reads a record belonging to its own batch, it marks it as in progress and carries out the job. The trick here is that each worker needs to know which batch it owns, so we pass the batch number as a command line argument when starting the program. Bottom line: it picks up pending jobs from the database and processes them one by one.
var MongoClient = require('mongodb').MongoClient
var url = "mongodb://192.168.99.100:32774/"
const { ObjectId } = require('mongodb')

// batch number is passed on the command line, e.g. `node worker.js 1`
const batch = Number(process.argv[2])
const dbName = 'offlinereport'

// fetch the oldest pending request for this worker's batch
var getRecords = function () {
  return new Promise(function (resolve, reject) {
    MongoClient.connect(url, function (err, db) {
      if (err) return reject(err)
      var dbo = db.db(dbName)
      var query = { batch: batch, status: 'N' }
      var sort = { submitted: 1 }
      dbo.collection("requests")
        .find(query)
        .sort(sort)
        .limit(1)
        .toArray(function (err, result) {
          if (err) return reject(err)
          db.close()
          resolve(result)
        })
    })
  })
}

// update the status of a request: 'P' = processing, 'D' = done
var updateRecordStatus = function (oid, status) {
  return new Promise(function (resolve, reject) {
    MongoClient.connect(url, function (err, db) {
      if (err) return reject(err)
      var dbo = db.db(dbName)
      var myquery = { _id: ObjectId(oid) }
      var newvalues = { $set: { status: status } }
      dbo.collection("requests")
        .updateOne(myquery, newvalues, function (err, res) {
          if (err) return reject(err)
          db.close()
          resolve(res)
        })
    })
  })
}

function checkForUpdates () {
  console.log(new Date(), 'check-for-updates started')
  // get new data to process
  getRecords().then(function (result) {
    if (result.length > 0) {
      var oid = result[0]._id
      // mark record as processing
      updateRecordStatus(oid, 'P').then(function () {
        console.log(new Date(), 'marked record as processing')
        // ... do the actual report generation here ...
        // processing done, mark the record as processed
        updateRecordStatus(oid, 'D').then(function () {
          console.log(new Date(), 'marked record as processed')
          console.log(new Date(), 'check-for-updates completed')
          console.log(new Date(), 'going to sleep..')
          setTimeout(checkForUpdates, 10000)
        })
      })
    } else {
      console.log(new Date(), 'no new records, next time..')
      setTimeout(checkForUpdates, 2000)
    }
  })
}

checkForUpdates()
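One thing worth adding if the number of requests grows: the polling query filters on batch and status and sorts on submitted, so an index covering those fields keeps each poll cheap. A minimal sketch, run once (for example from the worker at startup); the field order shown is just one reasonable choice.
// optional: index matching the polling query (batch + status filter, submitted sort)
MongoClient.connect(url, function (err, db) {
  if (err) throw err
  db.db(dbName).collection("requests")
    .createIndex({ batch: 1, status: 1, submitted: 1 }, function (err, name) {
      if (err) throw err
      console.log('index created:', name)
      db.close()
    })
})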
Redis Approach
Server Module
This server module is a simple REST API written with Express and the Redis-backed queue library Bee-Queue. It simply puts the job request on the queue and moves on.
const express = require('express')
const Queue = require('bee-queue');
const bodyParser = require('body-parser')
const app = express()

const requests = new Queue('requests', {
  redis: {
    host: '192.168.99.100',
    port: 32775,
    db: 0,
    options: {}
  },
  isWorker: false   // this process only enqueues jobs
});

var addRequest = function (request) {
  var myObj = { ...request, submitted: new Date().getTime() }
  const job = requests.createJob(myObj);
  // fail the job if it runs longer than 3s, and retry it up to 2 times
  return job.timeout(3000).retries(2).save();
}

app.use(bodyParser.json())

app.put('/add', function (req, res) {
  addRequest(req.body).then(function (result) {
    console.log('job submitted', result.id)
    res.send(req.body)
  }).catch(function (err) {
    res.status(500).send({ error: err.message })
  })
})

app.listen(3000, () => {
  console.log('listening on 3000')
})
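Bee-Queue also publishes job events over Redis, so the server can optionally watch the outcome of a job it enqueued even though it is not a worker (this relies on the queue's getEvents setting, which defaults to true). Here is a hedged sketch of such a variant; addRequestAndWatch is just a hypothetical name for illustration.
// sketch: enqueue a job and log its outcome from the server process
var addRequestAndWatch = function (request) {
  var myObj = { ...request, submitted: new Date().getTime() }
  const job = requests.createJob(myObj)
  return job.timeout(3000).retries(2).save().then(function (savedJob) {
    savedJob.on('succeeded', function (result) {
      console.log('job', savedJob.id, 'succeeded with', result)
    })
    savedJob.on('failed', function (err) {
      console.log('job', savedJob.id, 'failed:', err.message)
    })
    return savedJob
  })
}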
Worker Module
This is straightforward, as bee-queue provides the right method for the job.
const Queue = require('bee-queue');

const requests = new Queue('requests', {
  redis: {
    host: '192.168.99.100',
    port: 32775,
    db: 0,
    options: {}
  },
  isWorker: true,          // this process consumes jobs
  removeOnSuccess: true,   // clean up job data from Redis once done
});

requests.process(function (job, done) {
  console.log(new Date(), `processing started for job ${job.id}`);
  // ... do the actual report generation with job.data here ...
  console.log(new Date(), `processing completed for job ${job.id}`);
  return done(null, job.data);
});

console.log(new Date(), 'worker started');
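Before running this for real, I would also add a graceful shutdown so the worker can finish in-flight jobs before exiting. A minimal sketch using bee-queue's close(), which waits up to the given timeout for active jobs to complete:
// sketch: drain in-flight jobs on SIGTERM before exiting
process.on('SIGTERM', function () {
  requests.close(30 * 1000).then(function () {
    console.log(new Date(), 'worker shut down gracefully')
    process.exit(0)
  })
})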
Which approach you choose depends on your requirements. My personal preference is Redis. Let me know your views.