Tutorial: 5-QueueWorker

The QueueWorker does not accept ForkTasks. It manages each task's start/stop lifecycle itself, so a Task must not be started before it is added.

The QueueWorker expects tasks to exit once their job is done; it then starts queued tasks according to the concurrency setting. Whether things run in series or in parallel is controlled by that setting: a concurrency of 1 starts each task only after the previous one exits, while a concurrency of 10 runs at most 10 tasks at once.
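Both modes use the same API; only the option changes. A minimal sketch:

var QueueWorker = require('relieve').workers.QueueWorker

var serial = new QueueWorker({concurrency: 1})    //tasks run one after another, in series
var parallel = new QueueWorker({concurrency: 10}) //at most 10 tasks run at once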

A complete example:

var relieve = require('relieve')
var ScriptTask = relieve.tasks.ScriptTask
var QueueWorker = relieve.workers.QueueWorker

var worker = new QueueWorker({concurrency: 10})

worker
.add(new ScriptTask('sometask.js'))
.add(new ScriptTask('sometask.js'))

worker.run()
//each task has exited; we can remove them from the worker
.then(function() {
  for(let t of worker.tasks) {
    worker.remove(t.name)
  }
})
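Note that run() resolves only once every queued task has exited, which is why the cleanup loop can safely remove them all.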

You can find working use cases below.

Compressing Task

//task.js
'use strict';
var archiver = require('archiver')
var fs = require('fs')
var channel = process.relieve.ipc

module.exports = {
  start: function() {
    var archive = archiver('zip')
    var action = process.argv[3] //the action given through task.arguments
    var output = fs.createWriteStream(action.dest)

    output.on('close', function() {
      channel.send('finish', archive.pointer() + ' bytes written')
      process.exit(0) //Note that we exit this task when it's done
    })

    archive.on('error', function(err) { throw err })
    archive.pipe(output)
    archive.bulk([action.src]) //add every source path to the archive
    archive.finalize()
  }
}

This task, when started, compresses whatever was given through the task arguments. The QueueWorker can run many of these in separate processes.

'use strict';
var relieve = require('relieve')
var ScriptTask = relieve.tasks.ScriptTask
var QueueWorker = relieve.workers.QueueWorker

var worker = new QueueWorker({concurrency: 10})

//assuming that I have a request for 5 compressions
var actions = [
  {src: ['fixtures/somepath', 'fixtures/somedir/'], dest: 'one.zip'},
  {src: ['fixtures/foo'], dest: 'two.zip'},
  {src: ['fixtures/bar'], dest: 'three.zip'},
  {src: ['fixtures/video.mkv', 'fixtures/audio.mp3'], dest: 'four.zip'},
  {src: ['fixtures/directory/*'], dest: 'five.zip'},
]

//I'm creating one task per action
for(let action of actions) {
  let task = new ScriptTask(__dirname + '/task.js')
  task.name = action.dest //the name is the destination name
  task.arguments = [action] //give my action as arguments to the task

  worker.add(task)
}

//compress everything
worker.run()
//remove tasks when done
.then(function() {
  for(let t of worker.tasks) {
    worker.remove(t.name)
  }
})
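Each task also reports back over its channel (the 'finish' message in task.js); as in the cksfv example below, you could subscribe to it with task.once('finish', ...) to log the bytes written.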

Cksfv

cksfv is a tool that verifies CRC checksums. It is commonly used to check the integrity of a multi-part RAR archive: it calculates the hash of each file and compares it to the expected one. This example shows how the QueueWorker can be used to spread a CPU-intensive job across multiple processes. My benchmarks show an improvement with many large parts; there is no improvement with small parts (i.e. < 100 MB).

For example, with 50 parts of 250 MB each and a concurrency of 20:

  • without workers: 832125ms (13.8 min), CPU use is about 32% on one process
  • with workers: 225141ms (3.75 min), CPU use is about 2% on each of the 20 child processes
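That is roughly a 3.7x speedup for the same work.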

The Task:

//task.js
'use strict';
var channel = process.relieve.ipc
var readLine = require('./readLine.js')

module.exports = {
  readLine: function(path, line) {
    readLine(path, line)
    .then(function(resp) {
      channel.send('cksfv', resp) //report the result back to the worker
      process.exit(0) //exit so the queue can start the next task
    })
  }
}
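Once started, a CallableTask lets the worker invoke the module's exported methods inside the child process; that is how the worker below triggers readLine. A sketch (the task name and line here are hypothetical):

//assuming task.js was added as a CallableTask named 'line-1' and started
worker.task('line-1').call('readLine', 'path/to/file.sfv', 'fixtures/foo a1b2c3d4')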

This is the part that calculates the CRC hash:

//readLine.js
'use strict';
var Promise = require('bluebird')
var crc = require('crc')
var fs = Promise.promisifyAll(require('fs'))
var p = require('path')

function readLine(path, line) {
  if(!line.trim())
    return Promise.resolve()

  var original = line.trim().slice(-8) //the expected crc: the last 8 characters
  var filepath = line.slice(0, -9).trim() //the file path before the crc
  var resp = {original: original, filepath: filepath}

  console.log('Processing %s', p.resolve(p.dirname(path), filepath))

  return fs.readFileAsync(p.resolve(p.dirname(path), filepath))
  .then(function(buffer) {
    var str = crc.crc32(buffer).toString(16)

    //left-pad the crc to 8 hexadecimal characters
    while(str.length < 8) {
      str = '0'+str
    }

    resp.calculate = str

    return resp
  })
}

module.exports = readLine
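readLine can also be exercised on its own: given the .sfv path and one of its lines, it resolves with the expected and computed CRC. A sketch with hypothetical paths:

var readLine = require('./readLine.js')

readLine('path/to/file.sfv', 'fixtures/foo a1b2c3d4')
.then(function(resp) {
  //resp is {original: 'a1b2c3d4', filepath: 'fixtures/foo', calculate: '...'}
  console.log(resp)
})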

The Worker reads the .sfv file and sends every line to a new Task.

It's then executed with: node worker.js path/to/file.sfv.
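Each line of an .sfv file is a file path followed by its expected CRC32 as eight hexadecimal characters, which is exactly what readLine slices apart. A hypothetical file.sfv:

fixtures/foo a1b2c3d4
fixtures/bar 00fe9a7c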

//worker.js
'use strict';
var relieve = require('relieve')
var assert = require('assert')
var Promise = require('bluebird')
var p = require('path')
var fs = Promise.promisifyAll(require('fs'))
var eol = require('os').EOL
var CallableTask = relieve.tasks.CallableTask
var QueueWorker = relieve.workers.QueueWorker

assert.ok(
  typeof process.argv[2] == 'string' && p.extname(process.argv[2]) == '.sfv',
  'Sfv file must be provided'
);

var path = process.argv[2]
var map = []

//Call my task method
function call(line) {
  return function() {
    worker.task(line).call('readLine', path, line)
  }
}

var worker = new QueueWorker({concurrency: 20})

console.time('cksfv')

fs.readFileAsync(path)
.then(function(data) {
  data = data.toString()
    .split(eol)
    .map(e => e.trim())
    .filter(e => e.length)

  //Create a task for each line; they are not started yet
  data.forEach(function(line) {
    let task = new CallableTask(__dirname + '/task.js')
    task.name = line
    task.once('start', call(line))
    task.once('cksfv', function(resp) {
      map.push(resp)
    })
    worker.add(task)
  })

  //Start the queued tasks, keeping at most 20 running at once
  worker.run()
  .then(function() {
    let errors = 0
    for(let entry of map) {
      if(entry.original != entry.calculate) {
        console.error('File %s does not match crc', entry.filepath)
        errors++
      }
    }

    console.log('Done checking with %d errors', errors)
    console.timeEnd('cksfv')
  })
})
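Note that the chain above has no rejection handler; in practice you would likely append a catch so that read or task errors are reported instead of being silently swallowed.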