// src/worker/ThreadPool.js
import { EventDispatcher } from "three";
import { ConfigurationMessage, Message } from "./messages";
import { Action } from "./Action";
import * as events from "./thread-pool-events";
import worker from "../../tmp/worker.txt";
/**
 * Manages a pool of worker threads.
 *
 * Workers are spawned on demand from the bundled worker program, up to
 * `maxWorkers`. Every response a worker posts back is re-dispatched through
 * this pool as the shared `message` event from `thread-pool-events`.
 *
 * @implements {Disposable}
 * @implements {EventListener}
 */
export class ThreadPool extends EventDispatcher {

	/**
	 * Constructs a new thread pool.
	 *
	 * @param {Number} [maxWorkers=navigator.hardwareConcurrency] - Limits the amount of active workers. The default limit is the amount of logical processors. Values below one are raised to one and values above the amount of logical processors are lowered to it.
	 */
	constructor(maxWorkers = navigator.hardwareConcurrency) {

		super();

		/**
		 * An object URL that points to the worker program.
		 *
		 * Set to null once the pool has been disposed.
		 *
		 * @type {String}
		 * @private
		 */
		this.workerURL = URL.createObjectURL(new Blob([worker], { type: "text/javascript" }));

		// navigator.hardwareConcurrency is not exposed by every engine. Treat a
		// missing value as "no known hardware limit" instead of letting undefined
		// turn the whole computation into NaN, which would prevent any worker
		// from ever being created.
		const logicalProcessors = navigator.hardwareConcurrency;
		const hardwareLimit = (logicalProcessors > 0) ? logicalProcessors : Number.POSITIVE_INFINITY;

		// NaN here means that the requested limit was missing or not numeric.
		const requestedLimit = Math.max(maxWorkers, 1);

		const limit = Math.min(
			hardwareLimit,
			Number.isNaN(requestedLimit) ? Number.POSITIVE_INFINITY : requestedLimit
		);

		/**
		 * The maximum number of active worker threads.
		 *
		 * If this value is lowered while more workers exist, surplus workers are
		 * closed one by one as they report back with a message.
		 *
		 * @type {Number}
		 */
		// Fall back to a single worker if neither the request nor the hardware
		// information yields a finite limit.
		this.maxWorkers = Number.isFinite(limit) ? limit : 1;

		/**
		 * A list of existing workers.
		 *
		 * @type {Worker[]}
		 * @private
		 */
		this.workers = [];

		/**
		 * Keeps track of workers that are currently busy.
		 *
		 * @type {WeakSet}
		 * @private
		 */
		this.busyWorkers = new WeakSet();

		/**
		 * A configuration message.
		 *
		 * This object will be sent to each newly created worker.
		 *
		 * @type {ConfigurationMessage}
		 */
		this.configurationMessage = new ConfigurationMessage();

	}

	/**
	 * Handles worker events.
	 *
	 * A `message` event marks the sending worker as available again and is
	 * forwarded to the listeners of this pool. An `error` event is only logged.
	 *
	 * @param {Event} event - An event.
	 */
	handleEvent(event) {

		switch(event.type) {

			case "message": {
				const sender = event.target;

				// The worker has answered, so it may be polled again.
				this.busyWorkers.delete(sender);

				// The event object is shared and refilled for every dispatch.
				events.message.worker = sender;
				events.message.response = event.data;
				this.dispatchEvent(events.message);

				// Shrink the pool if the limit was lowered in the meantime.
				// NOTE(review): if a listener re-acquires this worker via getWorker()
				// during the dispatch above, it is busy again at this point and
				// closeWorker() will terminate it - confirm that this is intended.
				if(this.workers.length > this.maxWorkers) {
					this.closeWorker(sender);
				}

				break;
			}

			case "error": {
				// Errors are being handled in the worker.
				console.error("Encountered an unexpected error", event);
				break;
			}

		}

	}

	/**
	 * Closes a worker and removes it from this pool.
	 *
	 * A busy worker is terminated immediately. An idle worker is sent a
	 * `CLOSE` message instead. In both cases this pool stops listening to the
	 * worker.
	 *
	 * @param {Worker} worker - The worker to close.
	 */
	closeWorker(worker) {

		// Look the worker up first; it may already have been taken out of the list
		// by the caller (see clear()).
		const index = this.workers.indexOf(worker);

		if(this.busyWorkers.has(worker)) {
			this.busyWorkers.delete(worker);
			worker.terminate();
		} else {
			worker.postMessage(new Message(Action.CLOSE));
		}

		worker.removeEventListener("message", this);
		worker.removeEventListener("error", this);

		if(index >= 0) {
			this.workers.splice(index, 1);
		}

	}

	/**
	 * Creates a new worker, registers it with this pool and sends it the current
	 * configuration message.
	 *
	 * @private
	 * @return {Worker} The worker.
	 */
	createWorker() {

		const worker = new Worker(this.workerURL);

		this.workers.push(worker);
		worker.addEventListener("message", this);
		worker.addEventListener("error", this);
		worker.postMessage(this.configurationMessage);

		return worker;

	}

	/**
	 * Polls an available worker and returns it. The worker will be excluded from
	 * subsequent polls until it finishes its task and sends a message back.
	 *
	 * A new worker is only created if all existing ones are busy, the worker
	 * limit has not been reached and the pool has not been disposed.
	 *
	 * @return {Worker} A worker or null if all resources are currently exhausted.
	 */
	getWorker() {

		// Prefer an existing worker that is currently idle.
		let worker = this.workers.find((candidate) => !this.busyWorkers.has(candidate));

		if(worker === undefined) {
			const canGrow = (this.workers.length < this.maxWorkers && this.workerURL !== null);
			worker = canGrow ? this.createWorker() : null;
		}

		if(worker !== null) {
			this.busyWorkers.add(worker);
		}

		return worker;

	}

	/**
	 * Resets this thread pool by closing all workers.
	 */
	clear() {

		while(this.workers.length > 0) {
			this.closeWorker(this.workers.pop());
		}

	}

	/**
	 * Removes all active workers and releases the worker program blob.
	 *
	 * No new workers can be created afterwards. Calling this method more than
	 * once has no further effect.
	 */
	dispose() {

		this.clear();

		if(this.workerURL !== null) {
			URL.revokeObjectURL(this.workerURL);
			this.workerURL = null;
		}

	}

}