client/preprocessing/daemon.js

module.exports = function (jiffClient) {
  var linkedList = require('../../common/linkedlist.js');
  var currentBatchLoad = 0;
  var suspendedTasks = 0;

  var getFirstTask = function (task) {
    if (task.count > 1) {
      var remainingTasks = Object.assign({}, task);

      var deferred1 = new jiffClient.helpers.Deferred();
      var deferred2 = new jiffClient.helpers.Deferred();
      Promise.all([deferred1.promise, deferred2.promise]).then(task.deferred.resolve);
      task.deferred = deferred1;
      remainingTasks.deferred = deferred2;

      remainingTasks.count--;
      task.count = 1;
      if (task.id_list != null) {
        task.id = remainingTasks.id_list.shift();
        task.id_list = null;
      }
      jiffClient.currentPreprocessingTasks.pushHead(remainingTasks);
    }
    if (task.id_list != null) {
      task.id = task.id_list[0];
      task.id_list = null;
    }
    return task;
  };

  var checkIfDone = function () {
    if (currentBatchLoad === 0 && suspendedTasks === 0) {
      var callback = jiffClient.preprocessingCallback;
      jiffClient.preprocessingCallback = null;
      callback(jiffClient);
    }
  };

  var buildID = function (task) {
    // Two kinds of operations: one that relies on different sets of senders and receivers, and one that has a set of holders
    if (task.dependent_op === 'open' || task.dependent_op === 'bits.open') { // TODO: make this part of the description in table
      var open_parties = task.params['open_parties'] != null ? task.params['open_parties'] : task.receivers_list;
      task.id = jiffClient.counters.gen_op_id2_preprocessing(task.dependent_op, open_parties, task.receivers_list);
    } else {
      task.id = jiffClient.counters.gen_op_id_preprocessing(task.dependent_op, task.receivers_list);
    }
  };

  var taskIsExecutable = function (task) {
    // if the protocol name is in the map, it can be directly executed
    var namespace = find_closest_namespace(task.dependent_op, task.params['namespace']);
    return (namespace == null);
  };

  var find_closest_namespace = function (op, starting_namespace) {
    var namespace_index = jiffClient.extensions.indexOf(starting_namespace);
    while (namespace_index >= 0) {
      var namespace = jiffClient.extensions[namespace_index];
      if (jiffClient.preprocessing_function_map[namespace] != null && jiffClient.preprocessing_function_map[namespace][op] != null) {
        return namespace;
      }
      namespace_index--;
    }

    return null;
  };

  // execute a task and handle it upon completion
  var executeTask = function (task) {
    currentBatchLoad++;

    var _params = Object.assign({}, task.params);
    _params.output_op_id = task.id;

    var protocol = task.protocols[task.dependent_op] || jiffClient.default_preprocessing_protocols[task.dependent_op];
    var result = protocol(task.threshold, task.receivers_list, task.compute_list, task.Zp, _params, task.protocols);

    if (result.promise == null || result.promise.then == null) {
      taskFinished(task, result);
    } else {
      result.promise.then(taskFinished.bind(null, task, result));
    }
  };
  var taskFinished = function (task, result) {
    currentBatchLoad--;

    if (task.receivers_list.indexOf(jiffClient.id) > -1) {
      jiffClient.store_preprocessing(task.id, result.share);
    }
    task.deferred.resolve();
    jiffClient.preprocessingDaemon();
  };

  // expand task by one level and replace the node in the task list
  var expandTask = function (task) {
    // copy of params
    var _params = Object.assign({}, task.params);

    // Recursively follow jiffClient.preprocessing_function_map
    // to figure out the sub-components/nested primitives of the given operation
    // and pre-process those with the right op_ids.

    // ID should never be null
    var namespace = find_closest_namespace(task.dependent_op, _params['namespace']);
    var preprocessing_dependencies = jiffClient.preprocessing_function_map[namespace][task.dependent_op];

    if (typeof(preprocessing_dependencies) === 'function') {
      preprocessing_dependencies = preprocessing_dependencies(task.dependent_op, task.count, task.protocols, task.threshold, task.receivers_list, task.compute_list, task.Zp, task.id, _params, task, jiffClient);
    }

    var newTasks = linkedList();
    var deferredChain = [];
    // build linked list of new dependencies, afterwords merge them with current tasks list
    for (var k = 0; k < preprocessing_dependencies.length; k++) {
      var dependency = preprocessing_dependencies[k];
      var next_op = dependency['op'];

      // copy both the originally given extra_params and the extra params of the dependency and merge them
      // together, dependency params overwrite duplicate keys.
      // If params are ever needed in non-leaf operations, this must be changed to accommodate
      var extra_params = Object.assign({}, _params, dependency['params']);
      extra_params['namespace'] = dependency['namespace'] != null ? dependency['namespace'] : 'base';
      if (dependency.handler != null) {
        extra_params = dependency.handler(task.threshold, task.receivers_list, task.compute_list, task.Zp, task.id, extra_params, task, jiffClient);
      }
      if (extra_params.ignore === true) {
        continue;
      }

      // compose ids similar to how the actual operation is implemented
      var next_id_list = [];
      var next_count = dependency['count'];

      if (next_count == null) {
        next_count = 1;
        next_id_list[0] = dependency['absolute_op_id'] || (task.id + dependency['op_id']);
      } else {
        next_count = next_count(task.threshold, task.receivers_list, task.compute_list, task.Zp, task.id, extra_params);
        for (var j = 0; j < next_count; j++) {
          next_id_list.push(dependency['absolute_op_id'] || (task.id + dependency['op_id'] + j));
        }
      }

      var nextTask = {
        dependent_op : next_op,
        count : next_count,
        threshold : dependency['threshold'] || task.threshold,
        receivers_list : dependency['receivers_list'] || task.receivers_list,
        compute_list : dependency['compute_list'] || task.compute_list,
        Zp : dependency['Zp'] || task.Zp,
        id_list : next_id_list,
        id : null,
        params : extra_params,
        protocols : task.protocols,
        deferred: new jiffClient.helpers.Deferred()
      };

      deferredChain.push(nextTask.deferred.promise);
      if (dependency.requires != null) {
        nextTask.wait = true;
        var required_ops = [];
        for (var r = 0; r < dependency.requires.length; r++) {
          if (dependency.requires[r] >= k) {
            throw new Error('Preprocessing dependency "' + next_op + '" in preprocessingMap for "' + task.dependent_op
              + '" at namespace "' + namespace + '" cannot require subsequent dependency ' + dependency.requires[r]);
          }
          required_ops.push(deferredChain[dependency.requires[r]]);
        }

        Promise.all(required_ops).then(function (nextTask) {
          delete nextTask['wait'];
          jiffClient.preprocessingDaemon();
        }.bind(null, nextTask));

        // add waiting task to the tail of the queue
        // jiffClient.currentPreprocessingTasks.add(nextTask);
        newTasks.add(nextTask);
      } else {
        // non-waiting tasks are added to the head of the queue
        newTasks.add(nextTask);
      }
    }

    Promise.all(deferredChain).then(task.deferred.resolve);
    jiffClient.currentPreprocessingTasks = newTasks.extend(jiffClient.currentPreprocessingTasks);
  };

  /**
   * Preprocessing Daemon that executes all currently scheduled preprocessing tasks (entries in jiffClient.currentPreprocessingTasks array) in order.
   * @method preprocessingDaemon
   * @memberof module:jiff-client~JIFFClient
   * @instance
   */
  jiffClient.preprocessingDaemon = function () {
    while (currentBatchLoad < jiffClient.preprocessingBatchSize) {
      var task = jiffClient.currentPreprocessingTasks.popHead();

      if (task == null) {
        checkIfDone();
        return;
      }

      if (task.object.wait) {
        jiffClient.currentPreprocessingTasks.pushHead(task.object);
        break;
      }

      task = getFirstTask(task.object);
      if (task.id == null) {
        buildID(task);
      }

      // check if task is executable or no
      if (taskIsExecutable(task)) {
        executeTask(task); // co-recursively calls preprocessingDaemon()
      } else {
        //expand single task
        expandTask(task);
      }
    }
  };
};