Using Web Workers to Handle Memory Workload While Processing a Data Stream


Introduction

When you process a data stream gradually over time, you usually don’t want to keep all of the data in memory. Imagine we are parsing a log file and saving a subset of its entries to a local database. If we read the entire file into RAM before parsing it, the app would likely crash with an out-of-memory error on large files. This is where Web Workers come in: they let us run scripts in separate background threads that communicate with the main thread by passing messages, so we can offload the buffering and processing work and keep the main thread responsive.

A High-Throughput Data Stream

Let’s say we have a data stream that we want to process. We can use a generator function to create an infinite stream of numbers:

function* generateNumbers(numbersPerChunk) {
  while (true) {
    for (let i = 0; i < numbersPerChunk; i++) {
      yield Math.random() * 1000;
    }
  }
}
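Because the generator never finishes, it has to be consumed lazily, one value at a time. As a minimal sketch, the `take` helper below (a hypothetical name, not part of the original code) pulls a fixed number of values from the stream and then stops:

```javascript
// Same generator as above: an endless stream of random numbers in [0, 1000).
function* generateNumbers(numbersPerChunk) {
  while (true) {
    for (let i = 0; i < numbersPerChunk; i++) {
      yield Math.random() * 1000;
    }
  }
}

// Hypothetical helper: pull at most `count` values from any iterator.
function take(iterator, count) {
  const values = [];
  for (let i = 0; i < count; i++) {
    const { value, done } = iterator.next();
    if (done) break; // never true for generateNumbers, but safe for finite iterators
    values.push(value);
  }
  return values;
}

const sample = take(generateNumbers(1000), 5);
console.log(sample.length); // 5
```

Only the five requested values are ever created, which is what makes a generator a convenient stand-in for a real stream.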

Handling a Data Stream in The Browser's Main Thread

Now we want to process the data stream on the main thread. We'll assume a stream of numbers whose average we want to calculate. We can do this with a producer that pushes numbers from the generator into a buffer every millisecond, and a consumer that averages the buffered numbers once per second and then clears the buffer:

function* generateNumbers(numbersPerChunk) {
  while (true) {
    for (let i = 0; i < numbersPerChunk; i++) {
      yield Math.random() * 1000;
    }
  }
}

const dataStream = generateNumbers(1000);
let buffer = [];

const processBuffer = () => {
  const sum = buffer.reduce((acc, val) => acc + val, 0);
  const avg = sum / buffer.length;
  console.clear();
  console.log("Average:", avg);
  buffer = [];
};

let producer, consumer;

const runProducerConsumer = function () {
  producer = setInterval(() => {
    buffer.push(dataStream.next().value);
  }, 1);

  consumer = setInterval(() => {
    processBuffer(); // processBuffer also resets the buffer
  }, 1000);
};

const stopProducerConsumer = function () {
  clearInterval(producer);
  clearInterval(consumer);
};

runProducerConsumer();
setTimeout(stopProducerConsumer, 10000);

If we monitor memory consumption, we can see that usage grows until the consumer runs and clears the buffer, and then drops again. This is because every value is held in the buffer until it is processed.

We can shorten the interval at which the consumer runs, but running it more often means more work on the main thread, which can block it. If we want to process a large data stream, we’ll need a web worker so that we can process the data more often without blocking the main thread.

Worker Threads to The Rescue

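JavaScript workers let us offload that task easily. In the example below, a consumer worker spawns a nested producer worker, buffers the numbers it receives, and posts the average back to the main thread after every 10 values: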

const blobConsumer = new Blob(
  [
    `
    let blobProducer = new Blob(
        [
          \`
          let producerInterval, dataStream;

          function* generateNumbers() {
            while (true) {
              yield Math.random() * 1000;
            }
          }

          self.onmessage = function(e) {
            if (e.data === "stop") {
              clearInterval(producerInterval);
            }
          };

          dataStream = generateNumbers();
          producerInterval = setInterval(() => {
            self.postMessage(dataStream.next().value);
          }, 1);
          \`,
        ],
        { type: "text/javascript" }
    );

    let buffer = [];
    let producer; // handle to the nested producer worker

    function processBuffer() {
      const sum = buffer.reduce((acc, val) => acc + val, 0);
      const avg = sum / buffer.length;
      self.postMessage(avg);
      buffer = [];
    }

    self.onmessage = function (e) {
      if (e.data === "start") {
        producer = new Worker(self.URL.createObjectURL(blobProducer));

        producer.onmessage = (e) => {
          buffer.push(e.data);
          if (buffer.length === 10) {
            processBuffer();
          }
        };
      } else if (e.data === "stop") {
        producer.terminate();
      }
    };
    `,
  ],
  { type: "text/javascript" }
);
const worker = new Worker(window.URL.createObjectURL(blobConsumer));

worker.onmessage = function (e) {
  console.clear();
  console.log("Average:", e.data);
};

worker.postMessage("start");
setTimeout(() => {
  worker.postMessage("stop");
}, 10000);

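Now the memory consumption stays roughly constant, because the consumer worker never holds more than 10 values before averaging them and clearing the buffer, and the average is still reported over time.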
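The size-based flush is what bounds the buffer, independent of the worker plumbing around it. As a sketch of just that logic, the `createAverager` factory below (a hypothetical name, not part of the code above) flushes whenever a threshold is reached and can be run without any workers:

```javascript
// Hypothetical factory: returns a push function that buffers values and
// reports their average through `onAverage` every `flushSize` values.
function createAverager(flushSize, onAverage) {
  let buffer = [];
  return function push(value) {
    buffer.push(value);
    if (buffer.length >= flushSize) {
      const sum = buffer.reduce((acc, val) => acc + val, 0);
      onAverage(sum / buffer.length);
      buffer = []; // drop the processed batch
    }
    return buffer.length; // number of values still held after this push
  };
}

const averages = [];
const push = createAverager(10, (avg) => averages.push(avg));

let maxHeld = 0;
for (let i = 1; i <= 25; i++) {
  maxHeld = Math.max(maxHeld, push(i));
}

console.log(averages); // [5.5, 15.5]
console.log(maxHeld); // 9
```

However many values flow through, the buffer never holds more than `flushSize` of them, which is why the worker version's memory stays flat.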

Conclusion

Web workers allow us to run expensive tasks in parallel with the main thread, so long-running operations don't block it, and processing data in small batches as it arrives keeps memory usage low.
