Home Reference Source

src/worker/ThreadPool.js

import { EventDispatcher } from "three";
import { ConfigurationMessage, Message } from "./messages";
import { Action } from "./Action";
import * as events from "./thread-pool-events";

import worker from "../../tmp/worker.txt";

/**
 * Manages worker threads.
 *
 * @implements {Disposable}
 * @implements {EventListener}
 */

export class ThreadPool extends EventDispatcher {

	/**
	 * Constructs a new thread pool.
	 *
	 * @param {Number} [maxWorkers=navigator.hardwareConcurrency] - Limits the amount of active workers. The default limit is the amount of logical processors.
	 */

	constructor(maxWorkers = navigator.hardwareConcurrency) {

		super();

		/**
		 * An object URL that points to the worker program.
		 *
		 * @type {String}
		 * @private
		 */

		this.workerURL = URL.createObjectURL(new Blob([worker], { type: "text/javascript" }));

		/**
		 * The maximum number of active worker threads.
		 *
		 * @type {Number}
		 */

		this.maxWorkers = Math.min(navigator.hardwareConcurrency, Math.max(maxWorkers, 1));

		/**
		 * A list of existing workers.
		 *
		 * @type {Worker[]}
		 * @private
		 */

		this.workers = [];

		/**
		 * Keeps track of workers that are currently busy.
		 *
		 * @type {WeakSet}
		 * @private
		 */

		this.busyWorkers = new WeakSet();

		/**
		 * A configuration message.
		 *
		 * This object will be sent to each newly created worker.
		 *
		 * @type {ConfigurationMessage}
		 */

		this.configurationMessage = new ConfigurationMessage();

	}

	/**
	 * Handles events.
	 *
	 * @param {Event} event - An event.
	 */

	handleEvent(event) {

		switch(event.type) {

			case "message": {

				this.busyWorkers.delete(event.target);

				events.message.worker = event.target;
				events.message.response = event.data;

				this.dispatchEvent(events.message);

				if(this.workers.length > this.maxWorkers) {

					this.closeWorker(event.target);

				}

				break;

			}

			case "error": {

				// Errors are being handled in the worker.
				console.error("Encountered an unexpected error", event);
				break;

			}

		}

	}

	/**
	 * Closes a worker.
	 *
	 * @param {Worker} worker - The worker to close.
	 */

	closeWorker(worker) {

		const index = this.workers.indexOf(worker);

		if(this.busyWorkers.has(worker)) {

			this.busyWorkers.delete(worker);
			worker.terminate();

		} else {

			worker.postMessage(new Message(Action.CLOSE));

		}

		worker.removeEventListener("message", this);
		worker.removeEventListener("error", this);

		if(index >= 0) {

			this.workers.splice(index, 1);

		}

	}

	/**
	 * Creates a new worker.
	 *
	 * @private
	 * @return {Worker} The worker.
	 */

	createWorker() {

		const worker = new Worker(this.workerURL);

		this.workers.push(worker);

		worker.addEventListener("message", this);
		worker.addEventListener("error", this);

		worker.postMessage(this.configurationMessage);

		return worker;

	}

	/**
	 * Polls an available worker and returns it. The worker will be excluded from
	 * subsequent polls until it finishes its task and sends a message back.
	 *
	 * @return {Worker} A worker or null if all resources are currently exhausted.
	 */

	getWorker() {

		let worker = null;

		let i, l;

		// Check if an existing worker is available.
		for(i = 0, l = this.workers.length; i < l; ++i) {

			if(!this.busyWorkers.has(this.workers[i])) {

				worker = this.workers[i];
				this.busyWorkers.add(worker);

				break;

			}

		}

		// Try to create a new worker if all existing ones are busy.
		if(worker === null && this.workers.length < this.maxWorkers) {

			if(this.workerURL !== null) {

				worker = this.createWorker();
				this.busyWorkers.add(worker);

			}

		}

		return worker;

	}

	/**
	 * Resets this thread pool by closing all workers.
	 */

	clear() {

		while(this.workers.length > 0) {

			this.closeWorker(this.workers.pop());

		}

	}

	/**
	 * Removes all active workers and releases the worker program blob.
	 */

	dispose() {

		this.clear();

		URL.revokeObjectURL(this.workerURL);

		this.workerURL = null;

	}

}