Source

queryindexmanager.js

'use strict';

const PromiseHelper = require('./promisehelper');
const HttpExecutor = require('./httpexecutor');
const CompoundTimeout = require('./compoundtimeout');
const errors = require('./errors');

class QueryIndex {
  constructor() {
    this.name = '';
    this.isPrimary = false;
    this.type = '';
    this.state = '';
    this.keyspace = '';
    this.indexKey = [];
    this.condition = [];
  }

  _fromData(data) {
    this.name = data.name;
    this.isPrimary = data.is_primary;
    this.type = data.using;
    this.state = data.state;
    this.keyspace = data.keyspace_id;
    this.indexKey = data.index_key;
    this.condition = data.condition;

    return this;
  }
}

/**
 * QueryIndexManager provides an interface for managing the
 * query indexes on the cluster.
 *
 * @category Management
 */
class QueryIndexManager {
  /**
   * @hideconstructor
   */
  constructor(cluster) {
    this._cluster = cluster;
  }

  get _http() {
    return new HttpExecutor(this._cluster._getClusterConn());
  }

  async _createIndex(bucketName, options, callback) {
    return PromiseHelper.wrapAsync(async () => {
      var qs = '';

      if (!options.fields) {
        qs += 'CREATE PRIMARY INDEX';
      } else {
        qs += 'CREATE INDEX';
      }

      if (options.name) {
        qs += ' `' + options.name + '`';
      }

      qs += ' ON `' + bucketName + '`';

      if (options.fields && options.fields.length > 0) {
        qs += '(';
        for (var i = 0; i < options.fields.length; ++i) {
          if (i > 0) {
            qs += ', ';
          }

          qs += '`' + options.fields[i] + '`';
        }
        qs += ')';
      }

      var withOpts = {};

      if (options.deferred) {
        withOpts.defer_build = true;
      }

      if (options.numReplicas) {
        withOpts.num_replica = options.numReplicas;
      }

      if (Object.keys(withOpts).length > 0) {
        qs += ' WITH ' + JSON.stringify(withOpts);
      }

      try {
        await this._cluster.query(qs, {
          timeout: options.timeout,
        });
      } catch (err) {
        if (options.ignoreIfExists && err instanceof errors.IndexExistsError) {
          // swallow the error if the user wants us to
        } else {
          throw err;
        }
      }

      return true;
    }, callback);
  }

  /**
   * @typedef {function(Error, boolean)} CreateQueryIndexCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {string} indexName
   * @param {string[]} fields
   * @param {*} [options]
   * @param {boolean} [options.ignoreIfExists]
   * @param {boolean} [options.deferred]
   * @param {integer} [options.timeout]
   * @param {CreateQueryIndexCallback} [callback]
   *
   * @throws {IndexExistsError}
   * @throws {CouchbaseError}
   * @returns {Promise<boolean>}
   */
  async createIndex(bucketName, indexName, fields, options, callback) {
    if (options instanceof Function) {
      callback = arguments[2];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    return PromiseHelper.wrapAsync(async () => {
      return this._createIndex(bucketName, {
        name: indexName,
        fields: fields,
        ignoreIfExists: options.ignoreIfExists,
        deferred: options.deferred,
        timeout: options.timeout,
      });
    }, callback);
  }

  /**
   * @typedef {function(Error, boolean)} CreatePrimaryIndexCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {*} [options]
   * @param {boolean} [options.ignoreIfExists]
   * @param {boolean} [options.deferred]
   * @param {integer} [options.timeout]
   * @param {CreatePrimaryIndexCallback} [callback]
   *
   * @throws {IndexExistsError}
   * @throws {CouchbaseError}
   * @returns {Promise<boolean>}
   */
  async createPrimaryIndex(bucketName, options, callback) {
    if (options instanceof Function) {
      callback = arguments[0];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    return PromiseHelper.wrapAsync(async () => {
      return this._createIndex(bucketName, {
        name: options.name,
        ignoreIfExists: options.ignoreIfExists,
        deferred: options.deferred,
        timeout: options.timeout,
      });
    }, callback);
  }

  async _dropIndex(bucketName, options, callback) {
    return PromiseHelper.wrapAsync(async () => {
      var qs = '';

      if (!options.name) {
        qs += 'DROP PRIMARY INDEX `' + bucketName + '`';
      } else {
        qs += 'DROP INDEX `' + bucketName + '`.`' + options.name + '`';
      }

      try {
        await this._cluster.query(qs, {
          timeout: options.timeout,
        });
      } catch (err) {
        if (
          options.ignoreIfNotExists &&
          err instanceof errors.QueryIndexNotFoundError
        ) {
          // swallow the error if the user wants us to
        } else {
          throw err;
        }
      }

      return true;
    }, callback);
  }

  /**
   * @typedef {function(Error, boolean)} DropQueryIndexCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {string} indexName
   * @param {*} [options]
   * @param {boolean} [options.ignoreIfNotExists]
   * @param {integer} [options.timeout]
   * @param {DropQueryIndexCallback} [callback]
   *
   * @throws {QueryIndexNotFoundError}
   * @throws {CouchbaseError}
   * @returns {Promise<boolean>}
   */
  async dropIndex(bucketName, indexName, options, callback) {
    if (options instanceof Function) {
      callback = arguments[2];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    return PromiseHelper.wrapAsync(async () => {
      return this._dropIndex(bucketName, {
        name: indexName,
        ignoreIfNotExists: options.ignoreIfNotExists,
        timeout: options.timeout,
      });
    }, callback);
  }

  /**
   * @typedef {function(Error, boolean)} DropPrimaryIndexCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {*} [options]
   * @param {boolean} [options.ignoreIfNotExists]
   * @param {integer} [options.timeout]
   * @param {DropPrimaryIndexCallback} [callback]
   *
   * @throws {QueryIndexNotFoundError}
   * @throws {CouchbaseError}
   * @returns {Promise<boolean>}
   */
  async dropPrimaryIndex(bucketName, options, callback) {
    if (options instanceof Function) {
      callback = arguments[0];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    return PromiseHelper.wrapAsync(async () => {
      return this._dropIndex(bucketName, {
        name: options.name,
        ignoreIfNotExists: options.ignoreIfNotExists,
        timeout: options.timeout,
      });
    }, callback);
  }

  /**
   * @typedef {Object} QueryIndex
   * @property {string} name
   * @property {boolean} isPrimary
   * @property {string} type
   * @property {string} state
   * @property {string} keyspace
   * @property {string} indexKey
   */

  /**
   * @typedef {function(Error, QueryIndex[])} GetAllIndexesCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {*} [options]
   * @param {integer} [options.timeout]
   * @param {GetAllIndexesCallback} [callback]
   *
   * @throws {CouchbaseError}
   * @returns {Promise<QueryIndex[]>}
   */
  async getAllIndexes(bucketName, options, callback) {
    if (options instanceof Function) {
      callback = arguments[1];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    return PromiseHelper.wrapAsync(async () => {
      var qs = '';
      qs += 'SELECT idx.* FROM system:indexes AS idx';
      qs += ' WHERE keyspace_id="' + bucketName + '"';
      qs += ' AND `using`="gsi" ORDER BY is_primary DESC, name ASC';

      var res = await this._cluster.query(qs, {
        timeout: options.timeout,
      });

      var indexes = [];
      res.rows.forEach((row) => {
        indexes.push(new QueryIndex()._fromData(row));
      });

      return indexes;
    }, callback);
  }

  /**
   * @typedef {function(Error, string[])} BuildDeferredIndexesCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {*} [options]
   * @param {integer} [options.timeout]
   * @param {BuildDeferredIndexesCallback} [callback]
   *
   * @throws {CouchbaseError}
   * @returns {Promise<string[]>}
   */
  async buildDeferredIndexes(bucketName, options, callback) {
    if (options instanceof Function) {
      callback = arguments[1];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    var timer = new CompoundTimeout(options.timeout);

    return PromiseHelper.wrapAsync(async () => {
      var indices = await this.getAllIndexes({
        timeout: timer.left(),
      });

      var deferredList = [];
      for (var i = 0; i < indices.length; ++i) {
        var index = indices[i];

        if (index.state === 'deferred' || index.state === 'pending') {
          deferredList.push(index.name);
        }
      }

      // If there are no deferred indexes, we have nothing to do.
      if (!deferredList) {
        return [];
      }

      var qs = '';
      qs += 'BUILD INDEX ON `' + bucketName + '` ';
      qs += '(';
      for (var j = 0; j < deferredList.length; ++j) {
        if (j > 0) {
          qs += ', ';
        }
        qs += '`' + deferredList[j] + '`';
      }
      qs += ')';

      // Run our deferred build query
      await this._cluster.query(qs, {
        timeout: timer.left(),
      });

      // Return the list of indices that we built
      return deferredList;
    }, callback);
  }

  /**
   * @typedef {function(Error, boolean)} WatchIndexesCallback
   */
  /**
   *
   * @param {string} bucketName
   * @param {string[]} indexNames
   * @param {number} duration
   * @param {*} [options]
   * @param {integer} [options.watchPrimary]
   * @param {WatchIndexesCallback} [callback]
   *
   * @throws {CouchbaseError}
   * @returns {Promise<boolean>}
   */
  async watchIndexes(bucketName, indexNames, duration, options, callback) {
    if (options instanceof Function) {
      callback = arguments[1];
      options = undefined;
    }
    if (!options) {
      options = {};
    }

    var timer = new CompoundTimeout(duration);

    return PromiseHelper.wrapAsync(async () => {
      if (options.watchPrimary) {
        indexNames = [...indexNames, '#primary'];
      }

      var curInterval = 50;
      for (;;) {
        // Get all the indexes that are currently registered
        var foundIdxs = await this.getAllIndexes(bucketName, {
          timeout: timer.left(),
        });
        var onlineIdxs = foundIdxs.filter((idx) => idx.state === 'online');
        var onlineIdxNames = onlineIdxs.map((idx) => idx.name);

        // Check if all the indexes we want are online
        var allOnline = true;
        indexNames.forEach((indexName) => {
          allOnline &= onlineIdxNames.indexOf(indexName) !== -1;
        });

        // If all the indexes are online, we've succeeded
        if (allOnline) {
          break;
        }

        // Add 500 to our interval to a max of 1000
        curInterval = Math.min(curInterval, curInterval + 500);

        // Make sure we don't go past our user-specified duration
        curInterval = Math.min(curInterval, timer.left());

        if (curInterval <= 0) {
          throw new errors.CouchbaseError(
            'Failed to find all indexes online within the alloted time.'
          );
        }

        // Wait until curInterval expires
        await new Promise((resolve) => setTimeout(resolve, curInterval));
      }
    }, callback);
  }
}
module.exports = QueryIndexManager;