From 43c2fede379af45ddf2121af996a68be7eecc4ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Thu, 29 Jan 2026 15:32:31 +0100 Subject: [PATCH 1/4] Convert suport elements to class --- test/test-helper.js | 100 +++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 52 deletions(-) diff --git a/test/test-helper.js b/test/test-helper.js index 363ffa657..6dacb68ac 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -41,6 +41,54 @@ process.on("unhandledRejection", (reason) => { testUnhandledError = reason; }); +/** + * A polyfill of Map, valid for testing. It does not support update of values + */ +class MapPolyFill { + constructor(arr) { + this.arr = arr || []; + const self = this; + Object.defineProperty(this, "size", { + get: function () { + return self.arr.length; + }, + configurable: false, + }); + } + set(k, v) { + this.arr.push([k, v]); + } + get(k) { + return this.arr.filter(function (item) { + return item[0] === k; + })[0]; + } + forEach(callback) { + this.arr.forEach(function (item) { + // first the value, then the key + callback(item[1], item[0]); + }); + } + toString() { + return this.arr.toString(); + } +} + +class SetPolyFill { + constructor(arr) { + this.arr = arr || []; + } + forEach(cb, thisArg) { + this.arr.forEach(cb, thisArg); + } + add(x) { + this.arr.push(x); + } + toString() { + return this.arr.toString(); + } +} + const helper = { /** * Creates a ccm cluster, initializes a Client instance the before() and after() hooks, create @@ -1055,58 +1103,6 @@ const helper = { }, }; -/** - * A polyfill of Map, valid for testing. It does not support update of values - * @constructor - */ -function MapPolyFill(arr) { - this.arr = arr || []; - const self = this; - Object.defineProperty(this, "size", { - get: function () { - return self.arr.length; - }, - configurable: false, - }); -} - -MapPolyFill.prototype.set = function (k, v) { - this.arr.push([k, v]); -}; - -MapPolyFill.prototype.get = function (k) { - return this.arr.filter(function (item) { - return item[0] === k; - })[0]; -}; - -MapPolyFill.prototype.forEach = function (callback) { - this.arr.forEach(function (item) { - // first the value, then the key - callback(item[1], item[0]); - }); -}; - -MapPolyFill.prototype.toString = function () { - return this.arr.toString(); -}; - -function SetPolyFill(arr) { - this.arr = arr || []; -} - -SetPolyFill.prototype.forEach = function (cb, thisArg) { - this.arr.forEach(cb, thisArg); -}; - -SetPolyFill.prototype.add = function (x) { - this.arr.push(x); -}; - -SetPolyFill.prototype.toString = function () { - return this.arr.toString(); -}; - // Core driver used ccmHelper helper.ccmHelper = helper.ccm; From b996bc11f5a94ba7cf6bc7ddfadb92cdbcb9c5b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Thu, 29 Jan 2026 15:48:08 +0100 Subject: [PATCH 2/4] Remove helper FallthroughRetryPolicy This helper policy was used only in broken integration tests. This policy appears to be re-implementation of the driver FallthroughRetryPolicy. Why: who knows... --- test/test-helper.js | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/test/test-helper.js b/test/test-helper.js index 6dacb68ac..aaad9168a 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -639,7 +639,6 @@ const helper = { Map: MapPolyFill, Set: SetPolyFill, AllowListPolicy: AllowListPolicy, - FallthroughRetryPolicy: FallthroughRetryPolicy, /** * Determines if test tracing is enabled */ @@ -1971,21 +1970,6 @@ AllowListPolicy.prototype.newQueryPlan = function (keyspace, info, callback) { }); }; -function FallthroughRetryPolicy() {} - -util.inherits(FallthroughRetryPolicy, policies.retry.RetryPolicy); - -FallthroughRetryPolicy.prototype.onUnavailable = function () { - this.rethrowResult(); -}; - -FallthroughRetryPolicy.prototype.onReadTimeout = - FallthroughRetryPolicy.prototype.onUnavailable; -FallthroughRetryPolicy.prototype.onWriteTimeout = - FallthroughRetryPolicy.prototype.onUnavailable; -FallthroughRetryPolicy.prototype.onRequestError = - FallthroughRetryPolicy.prototype.onUnavailable; - /** * Conditionally executes func if testVersion is <= the current cassandra version. * @param {String} testVersion Minimum version of Cassandra/Scylla needed. From d81ff7dc1722c9019d4e155116bd59a649547f60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Thu, 29 Jan 2026 15:50:00 +0100 Subject: [PATCH 3/4] Remove helper AllowListPolicy This helper policy was used only in broken integration tests. This policy appears to be re-implementation of the driver AllowListPolicy. At least here there is an explanation, that it was used to filter by last octet of IP address, instead of full IP address as the base policy. Still, we cannot use this policy as we lack support for custom policies. If it becomes necessary to have such filtering again, we can implement it at that point in time. --- test/test-helper.js | 37 ------------------------------------- 1 file changed, 37 deletions(-) diff --git a/test/test-helper.js b/test/test-helper.js index aaad9168a..e1a8bc781 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -638,7 +638,6 @@ const helper = { }, Map: MapPolyFill, Set: SetPolyFill, - AllowListPolicy: AllowListPolicy, /** * Determines if test tracing is enabled */ @@ -1934,42 +1933,6 @@ const dataProviderWithCollections = dataProvider ]); helper.dataProviderWithCollections = dataProviderWithCollections; -/** - * For test purposes, filters the child policy by last octet of the ip address - * @param {Array} list - * @param [childPolicy] - * @constructor - */ -function AllowListPolicy(list, childPolicy) { - this.list = list; - this.childPolicy = - childPolicy || new policies.loadBalancing.RoundRobinPolicy(); -} - -util.inherits(AllowListPolicy, policies.loadBalancing.LoadBalancingPolicy); - -AllowListPolicy.prototype.init = function (client, hosts, callback) { - this.childPolicy.init(client, hosts, callback); -}; - -AllowListPolicy.prototype.newQueryPlan = function (keyspace, info, callback) { - const list = this.list; - this.childPolicy.newQueryPlan(keyspace, info, function (err, iterator) { - callback(err, { - next: function () { - let item = iterator.next(); - while (!item.done) { - if (list.indexOf(helper.lastOctetOf(item.value)) >= 0) { - break; - } - item = iterator.next(); - } - return item; - }, - }); - }); -}; - /** * Conditionally executes func if testVersion is <= the current cassandra version. * @param {String} testVersion Minimum version of Cassandra/Scylla needed. From da661b258e566bb4641b321c9cb1408abaebda4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Thu, 29 Jan 2026 16:17:13 +0100 Subject: [PATCH 4/4] Refactor ccm helpers This is done to allow editor to properly recognize the types of the helper.ccmHelper --- test/test-helper.js | 989 ++++++++++++++++++++++---------------------- 1 file changed, 504 insertions(+), 485 deletions(-) diff --git a/test/test-helper.js b/test/test-helper.js index e1a8bc781..807215f6f 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -89,6 +89,507 @@ class SetPolyFill { } } +const ccm = { + /** + * Removes previous and creates a new cluster (create, populate and start) + * @param {Number|String} nodeLength number of nodes in the cluster. If multiple dcs, use the notation x:y:z:... + * @param {{[vnodes]: Boolean, [yaml]: Array., [jvmArgs]: Array., [ssl]: Boolean, + * [dseYaml]: Array., [workloads]: Array., [sleep]: Number, [ipFormat]: String|null, partitioner: String}} options + * @param {Function} callback + */ + startAll: function (nodeLength, options, callback) { + const self = helper.ccm; + options = options || {}; + // adapt to multi dc format so data center naming is consistent. + if (typeof nodeLength === "number") { + nodeLength = nodeLength + ":0"; + } + + const serverInfo = helper.getServerInfo(); + + helper.trace( + `Starting ${serverInfo.isScylla ? "Scylla" : "Cassandra"} cluster v${serverInfo.version} with ${nodeLength} node(s)`, + ); + + utils.series( + [ + function (next) { + // it wont hurt to remove + self.exec(["remove"], function () { + // ignore error + next(); + }); + }, + function (next) { + const clusterName = helper.getRandomName("test"); + let create = ["create", clusterName]; + + if (serverInfo.isScylla) { + create.push("--scylla"); + } + + create.push("-v", serverInfo.version); + + if (process.env["CCM_INSTALL_DIR"]) { + create = [ + "create", + clusterName, + "--install-dir=" + process.env["CCM_INSTALL_DIR"], + ]; + helper.trace("With", create[2]); + } + + if (options.ssl) { + create.push("--ssl", self.getPath("ssl")); + } + + if (options.partitioner) { + create.push("-p"); + create.push(options.partitioner); + } + + self.exec(create, helper.waitCallback(options.sleep, next)); + }, + function (next) { + const populate = ["populate", "-n", nodeLength.toString()]; + if (options.vnodes) { + populate.push("--vnodes"); + } + if (options.ipFormat) { + populate.push("--ip-format=" + options.ipFormat); + } + self.exec( + populate, + helper.waitCallback(options.sleep, next), + ); + }, + function (next) { + if (!options.yaml || !options.yaml.length) { + return next(); + } + helper.trace("With cassandra yaml options", options.yaml); + self.exec(["updateconf"].concat(options.yaml), next); + }, + function (next) { + if (!options.dseYaml || !options.dseYaml.length) { + return next(); + } + helper.trace("With dse yaml options", options.dseYaml); + self.exec(["updatedseconf"].concat(options.dseYaml), next); + }, + function (next) { + if (!options.workloads || !options.workloads.length) { + return next(); + } + helper.trace("With workloads", options.workloads); + self.exec( + ["setworkload", options.workloads.join(",")], + next, + ); + }, + function (next) { + const start = ["start", "--wait-for-binary-proto"]; + + if ( + helper.isWin() && + helper.isCassandraGreaterThan("2.2.4") + ) { + start.push("--quiet-windows"); + } + + if ( + Array.isArray(options.jvmArgs) && + !helper.getServerInfo().isScylla + ) { + options.jvmArgs.forEach(function (arg) { + start.push("--jvm_arg", arg); + }, this); + helper.trace("With jvm args", options.jvmArgs); + } + + self.exec(start, helper.waitCallback(options.sleep, next)); + }, + self.waitForUp.bind(self), + ], + function (err) { + callback(err); + }, + ); + }, + + start: function (nodeLength, options) { + return function executeStartAll(next) { + helper.ccm.startAll(nodeLength, options, next); + }; + }, + + /** + * Adds a new node to the cluster + * @param {number|{nodeIndex: number, dc?: string}} options 1 based index of the node or options. + * @param {Function} callback + */ + bootstrapNode: function (options, callback) { + if (typeof options === "number") { + options = { nodeIndex: options }; + } + + const ipPrefix = helper.ipPrefix; + helper.trace("bootstrapping node", options.nodeIndex); + const ccmArgs = [ + "add", + "node" + options.nodeIndex, + "-i", + ipPrefix + options.nodeIndex, + "-j", + (7000 + 100 * options.nodeIndex).toString(), + "-b", + ]; + + if (helper.getServerInfo().isScylla) { + ccmArgs.push("--scylla"); + } + + if (options.dc) { + ccmArgs.push("-d", options.dc); + } + + helper.ccm.exec(ccmArgs, callback); + }, + + decommissionNode: function (nodeIndex, callback) { + helper.trace("decommissioning node", nodeIndex); + const args = ["node" + nodeIndex, "decommission"]; + helper.ccm.exec(args, callback); + }, + + /** + * Sets the workload(s) for a given node. + * @param {Number} nodeIndex 1 based index of the node + * @param {Array} workloads workloads to set. + * @param {Function} callback + */ + setWorkload: function (nodeIndex, workloads, callback) { + helper.trace("node", nodeIndex, "with workloads", workloads); + helper.ccm.exec( + ["node" + nodeIndex, "setworkload", workloads.join(",")], + callback, + ); + }, + + /** + * @param {Number} nodeIndex 1 based index of the node + * @param {Function} callback + */ + startNode: function (nodeIndex, callback) { + const args = [ + "node" + nodeIndex, + "start", + "--wait-other-notice", + "--wait-for-binary-proto", + ]; + + if (helper.isWin() && helper.isCassandraGreaterThan("2.2.4")) { + args.push("--quiet-windows"); + } + + helper.ccm.exec(args, callback); + }, + + /** + * @param {Number} nodeIndex 1 based index of the node + * @param {Function} callback + */ + stopNode: function (nodeIndex, callback) { + helper.ccm.exec(["node" + nodeIndex, "stop"], callback); + }, + + pauseNode: function (nodeIndex, callback) { + helper.ccm.exec(["node" + nodeIndex, "pause"], callback); + }, + + resumeNode: function (nodeIndex, callback) { + helper.ccm.exec(["node" + nodeIndex, "resume"], callback); + }, + + exec: function (params, callback) { + helper.ccm.spawn("ccm", params, callback); + }, + + spawn: function (processName, params, callback) { + if (!callback) { + callback = function () {}; + } + params = params || []; + const originalProcessName = processName; + if (process.platform.indexOf("win") === 0) { + params = ["/c", processName].concat(params); + processName = "cmd.exe"; + } + const p = spawn(processName, params); + const stdoutArray = []; + const stderrArray = []; + let closing = 0; + p.stdout.setEncoding("utf8"); + p.stderr.setEncoding("utf8"); + p.stdout.on("data", function (data) { + stdoutArray.push(data); + }); + + p.stderr.on("data", function (data) { + stderrArray.push(data); + }); + + p.on("close", function (code) { + if (closing++ > 0) { + // avoid calling multiple times + return; + } + const info = { + code: code, + stdout: stdoutArray, + stderr: stderrArray, + }; + let err = null; + if (code !== 0) { + err = new Error( + "Error executing " + + originalProcessName + + ":\n" + + info.stderr.join("\n") + + info.stdout.join("\n"), + ); + err.info = info; + } + callback(err, info); + }); + }, + + remove: function (callback) { + helper.ccm.exec(["remove"], callback); + }, + + removeIfAny: function (callback) { + helper.ccm.exec(["remove"], function () { + // Ignore errors + callback(); + }); + }, + + /** + * Reads the logs to see if the cql protocol is up + * @param callback + */ + waitForUp: function (callback) { + let started = false; + let retryCount = 0; + const self = helper.ccm; + utils.whilst( + function () { + return !started && retryCount < 60; + }, + function iterator(next) { + self.exec(["node1", "showlog"], function (err, info) { + if (err) { + return next(err); + } + const regex = /Starting listening for CQL clients/im; + started = regex.test(info.stdout.join("")); + retryCount++; + if (!started) { + // wait 1 sec between retries + return setTimeout(next, 1000); + } + return next(); + }); + }, + callback, + ); + }, + + /** + * Gets the path of the ccm + * @param subPath + */ + getPath: function (subPath) { + let ccmPath = process.env.CCM_PATH; + if (!ccmPath) { + ccmPath = + process.platform === "win32" + ? process.env.HOMEPATH + : process.env.HOME; + ccmPath = path.join(ccmPath, "workspace/tools/ccm"); + } + return path.join(ccmPath, subPath); + }, +}; + +const ads = { + _spawnAndWait: function (processName, params, cb) { + if (!cb) { + throw new Error("Callback is required"); + } + + const originalProcessName = processName; + if (process.platform.indexOf("win") === 0) { + params = ["/c", processName].concat(params); + processName = "cmd.exe"; + } + helper.trace("Executing: " + processName + " " + params.join(" ")); + + let timeout; + + const callbackOnce = (err) => { + if (timeout) { + clearTimeout(timeout); + } + cb(err); + cb = utils.noop; + }; + + // If process hasn't completed in 10 seconds. + timeout = setTimeout(function () { + callbackOnce( + new Error( + "Timed out while waiting for " + + processName + + " to complete.", + ), + ); + }, 10000); + + const p = spawn(processName, params, { + env: Object.assign( + { KRB5_CONFIG: this.getKrb5ConfigPath() }, + process.env, + ), + }); + + p.stdout.on("data", function (data) { + helper.trace("%s_out> %s", originalProcessName, data); + if (data.indexOf("Principal Initialization Complete.") !== -1) { + callbackOnce(); + } + }); + + p.stderr.on("data", function (data) { + helper.trace("%s_err> %s", originalProcessName, data); + }); + + p.on("error", function (err) { + helper.trace("Sub-process emitted error", processName, err); + callbackOnce(err); + }); + + p.on("close", function (code) { + helper.trace("%s exited with code %d", originalProcessName, code); + clearTimeout(timeout); + if (code !== 0) { + callbackOnce( + new Error( + "Process exited with non-zero exit code: " + code, + ), + ); + } + }); + + return p; + }, + + /** + * Starts the embedded-ads jar with ldap (port 10389) and kerberos enabled (port 10088). Depends on ADS_JAR + * environment variable to resolve the absolute file path of the embedded-ads jar. + * + * @param {Function} cb Callback to invoke when server is started and listening. + */ + start: function (cb) { + const self = this; + temp.mkdir("ads", function (err, dir) { + if (err) { + cb(err); + } + self.dir = dir; + const jarFile = self.getJar(); + const params = ["-jar", jarFile, "-k", "--confdir", self.dir]; + + self.process = self._spawnAndWait("java", params, function (err) { + if (!err) { + // Set KRB5_CONFIG environment variable so kerberos module knows to use it. + process.env.KRB5_CONFIG = self.getKrb5ConfigPath(); + } + + cb(err); + }); + }); + }, + + _exec: function (processName, params, callback) { + if (params.length === 0) { + childProcessExec(processName, callback); + return; + } + + childProcessExec(`${processName} ${params.join(" ")}`, callback); + }, + + /** + * Stops the server process. + * @param {Function} cb Callback to invoke when server stopped or with an error. + */ + stop: function (cb) { + if (this.process !== undefined) { + if (this.process.exitCode) { + helper.trace( + "Server already stopped with exit code %d.", + this.process.exitCode, + ); + cb(); + } else { + this.process.on("close", function () { + cb(); + }); + this.process.on("error", cb); + this.process.kill("SIGINT"); + } + } else { + cb(Error("Process is not defined.")); + } + }, + + /** + * Gets the path of the embedded-ads jar. Resolved from ADS_JAR environment variable or $HOME/embedded-ads.jar. + */ + getJar: function () { + let adsJar = process.env.ADS_JAR; + if (!adsJar) { + helper.trace( + "ADS_JAR environment variable not set, using $HOME/embedded-ads.jar", + ); + adsJar = + process.platform === "win32" + ? process.env.HOMEPATH + : process.env.HOME; + adsJar = path.join(adsJar, "embedded-ads.jar"); + } + helper.trace("Using %s for embedded ADS server.", adsJar); + return adsJar; + }, + + /** + * Returns the file path to the keytab for the given user. + * @param {String} username User to resolve keytab for. + */ + getKeytabPath: function (username) { + return path.join(this.dir, username + ".keytab"); + }, + + /** + * Returns the file path to the krb5.conf file generated by ads. + */ + getKrb5ConfigPath: function () { + return path.join(this.dir, "krb5.conf"); + }, +}; + const helper = { /** * Creates a ccm cluster, initializes a Client instance the before() and after() hooks, create @@ -209,8 +710,9 @@ const helper = { return prefix + ("000000000000000" + value.toString()).slice(-16); }, ipPrefix: "127.0.0.", - ccm: {}, - ads: {}, + ccm: ccm, + ccmHelper: ccm, + ads: ads, /** * Returns a cql string with a CREATE TABLE command containing all common types * @param {String} tableName @@ -1101,489 +1603,6 @@ const helper = { }, }; -// Core driver used ccmHelper -helper.ccmHelper = helper.ccm; - -/** - * Removes previous and creates a new cluster (create, populate and start) - * @param {Number|String} nodeLength number of nodes in the cluster. If multiple dcs, use the notation x:y:z:... - * @param {{[vnodes]: Boolean, [yaml]: Array., [jvmArgs]: Array., [ssl]: Boolean, - * [dseYaml]: Array., [workloads]: Array., [sleep]: Number, [ipFormat]: String|null, partitioner: String}} options - * @param {Function} callback - */ -helper.ccm.startAll = function (nodeLength, options, callback) { - const self = helper.ccm; - options = options || {}; - // adapt to multi dc format so data center naming is consistent. - if (typeof nodeLength === "number") { - nodeLength = nodeLength + ":0"; - } - - const serverInfo = helper.getServerInfo(); - - helper.trace( - `Starting ${serverInfo.isScylla ? "Scylla" : "Cassandra"} cluster v${serverInfo.version} with ${nodeLength} node(s)`, - ); - - utils.series( - [ - function (next) { - // it wont hurt to remove - self.exec(["remove"], function () { - // ignore error - next(); - }); - }, - function (next) { - const clusterName = helper.getRandomName("test"); - let create = ["create", clusterName]; - - if (serverInfo.isScylla) { - create.push("--scylla"); - } - - create.push("-v", serverInfo.version); - - if (process.env["CCM_INSTALL_DIR"]) { - create = [ - "create", - clusterName, - "--install-dir=" + process.env["CCM_INSTALL_DIR"], - ]; - helper.trace("With", create[2]); - } - - if (options.ssl) { - create.push("--ssl", self.getPath("ssl")); - } - - if (options.partitioner) { - create.push("-p"); - create.push(options.partitioner); - } - - self.exec(create, helper.waitCallback(options.sleep, next)); - }, - function (next) { - const populate = ["populate", "-n", nodeLength.toString()]; - if (options.vnodes) { - populate.push("--vnodes"); - } - if (options.ipFormat) { - populate.push("--ip-format=" + options.ipFormat); - } - self.exec(populate, helper.waitCallback(options.sleep, next)); - }, - function (next) { - if (!options.yaml || !options.yaml.length) { - return next(); - } - helper.trace("With cassandra yaml options", options.yaml); - self.exec(["updateconf"].concat(options.yaml), next); - }, - function (next) { - if (!options.dseYaml || !options.dseYaml.length) { - return next(); - } - helper.trace("With dse yaml options", options.dseYaml); - self.exec(["updatedseconf"].concat(options.dseYaml), next); - }, - function (next) { - if (!options.workloads || !options.workloads.length) { - return next(); - } - helper.trace("With workloads", options.workloads); - self.exec(["setworkload", options.workloads.join(",")], next); - }, - function (next) { - const start = ["start", "--wait-for-binary-proto"]; - - if (helper.isWin() && helper.isCassandraGreaterThan("2.2.4")) { - start.push("--quiet-windows"); - } - - if ( - Array.isArray(options.jvmArgs) && - !helper.getServerInfo().isScylla - ) { - options.jvmArgs.forEach(function (arg) { - start.push("--jvm_arg", arg); - }, this); - helper.trace("With jvm args", options.jvmArgs); - } - - self.exec(start, helper.waitCallback(options.sleep, next)); - }, - self.waitForUp.bind(self), - ], - function (err) { - callback(err); - }, - ); -}; - -helper.ccm.start = function (nodeLength, options) { - return function executeStartAll(next) { - helper.ccm.startAll(nodeLength, options, next); - }; -}; - -/** - * Adds a new node to the cluster - * @param {number|{nodeIndex: number, dc?: string}} options 1 based index of the node or options. - * @param {Function} callback - */ -helper.ccm.bootstrapNode = function (options, callback) { - if (typeof options === "number") { - options = { nodeIndex: options }; - } - - const ipPrefix = helper.ipPrefix; - helper.trace("bootstrapping node", options.nodeIndex); - const ccmArgs = [ - "add", - "node" + options.nodeIndex, - "-i", - ipPrefix + options.nodeIndex, - "-j", - (7000 + 100 * options.nodeIndex).toString(), - "-b", - ]; - - if (helper.getServerInfo().isScylla) { - ccmArgs.push("--scylla"); - } - - if (options.dc) { - ccmArgs.push("-d", options.dc); - } - - helper.ccm.exec(ccmArgs, callback); -}; - -helper.ccm.decommissionNode = function (nodeIndex, callback) { - helper.trace("decommissioning node", nodeIndex); - const args = ["node" + nodeIndex, "decommission"]; - helper.ccm.exec(args, callback); -}; - -/** - * Sets the workload(s) for a given node. - * @param {Number} nodeIndex 1 based index of the node - * @param {Array} workloads workloads to set. - * @param {Function} callback - */ -helper.ccm.setWorkload = function (nodeIndex, workloads, callback) { - helper.trace("node", nodeIndex, "with workloads", workloads); - helper.ccm.exec( - ["node" + nodeIndex, "setworkload", workloads.join(",")], - callback, - ); -}; - -/** - * @param {Number} nodeIndex 1 based index of the node - * @param {Function} callback - */ -helper.ccm.startNode = function (nodeIndex, callback) { - const args = [ - "node" + nodeIndex, - "start", - "--wait-other-notice", - "--wait-for-binary-proto", - ]; - - if (helper.isWin() && helper.isCassandraGreaterThan("2.2.4")) { - args.push("--quiet-windows"); - } - - helper.ccm.exec(args, callback); -}; - -/** - * @param {Number} nodeIndex 1 based index of the node - * @param {Function} callback - */ -helper.ccm.stopNode = function (nodeIndex, callback) { - helper.ccm.exec(["node" + nodeIndex, "stop"], callback); -}; - -helper.ccm.pauseNode = function (nodeIndex, callback) { - helper.ccm.exec(["node" + nodeIndex, "pause"], callback); -}; - -helper.ccm.resumeNode = function (nodeIndex, callback) { - helper.ccm.exec(["node" + nodeIndex, "resume"], callback); -}; - -helper.ccm.exec = function (params, callback) { - helper.ccm.spawn("ccm", params, callback); -}; - -helper.ccm.spawn = function (processName, params, callback) { - if (!callback) { - callback = function () {}; - } - params = params || []; - const originalProcessName = processName; - if (process.platform.indexOf("win") === 0) { - params = ["/c", processName].concat(params); - processName = "cmd.exe"; - } - const p = spawn(processName, params); - const stdoutArray = []; - const stderrArray = []; - let closing = 0; - p.stdout.setEncoding("utf8"); - p.stderr.setEncoding("utf8"); - p.stdout.on("data", function (data) { - stdoutArray.push(data); - }); - - p.stderr.on("data", function (data) { - stderrArray.push(data); - }); - - p.on("close", function (code) { - if (closing++ > 0) { - // avoid calling multiple times - return; - } - const info = { code: code, stdout: stdoutArray, stderr: stderrArray }; - let err = null; - if (code !== 0) { - err = new Error( - "Error executing " + - originalProcessName + - ":\n" + - info.stderr.join("\n") + - info.stdout.join("\n"), - ); - err.info = info; - } - callback(err, info); - }); -}; - -helper.ccm.remove = function (callback) { - helper.ccm.exec(["remove"], callback); -}; - -helper.ccm.removeIfAny = function (callback) { - helper.ccm.exec(["remove"], function () { - // Ignore errors - callback(); - }); -}; - -/** - * Reads the logs to see if the cql protocol is up - * @param callback - */ -helper.ccm.waitForUp = function (callback) { - let started = false; - let retryCount = 0; - const self = helper.ccm; - utils.whilst( - function () { - return !started && retryCount < 60; - }, - function iterator(next) { - self.exec(["node1", "showlog"], function (err, info) { - if (err) { - return next(err); - } - const regex = /Starting listening for CQL clients/im; - started = regex.test(info.stdout.join("")); - retryCount++; - if (!started) { - // wait 1 sec between retries - return setTimeout(next, 1000); - } - return next(); - }); - }, - callback, - ); -}; - -/** - * Gets the path of the ccm - * @param subPath - */ -helper.ccm.getPath = function (subPath) { - let ccmPath = process.env.CCM_PATH; - if (!ccmPath) { - ccmPath = - process.platform === "win32" - ? process.env.HOMEPATH - : process.env.HOME; - ccmPath = path.join(ccmPath, "workspace/tools/ccm"); - } - return path.join(ccmPath, subPath); -}; - -helper.ads._spawnAndWait = function (processName, params, cb) { - if (!cb) { - throw new Error("Callback is required"); - } - - const originalProcessName = processName; - if (process.platform.indexOf("win") === 0) { - params = ["/c", processName].concat(params); - processName = "cmd.exe"; - } - helper.trace("Executing: " + processName + " " + params.join(" ")); - - let timeout; - - const callbackOnce = (err) => { - if (timeout) { - clearTimeout(timeout); - } - cb(err); - cb = utils.noop; - }; - - // If process hasn't completed in 10 seconds. - timeout = setTimeout(function () { - callbackOnce( - new Error( - "Timed out while waiting for " + processName + " to complete.", - ), - ); - }, 10000); - - const p = spawn(processName, params, { - env: Object.assign( - { KRB5_CONFIG: this.getKrb5ConfigPath() }, - process.env, - ), - }); - - p.stdout.on("data", function (data) { - helper.trace("%s_out> %s", originalProcessName, data); - if (data.indexOf("Principal Initialization Complete.") !== -1) { - callbackOnce(); - } - }); - - p.stderr.on("data", function (data) { - helper.trace("%s_err> %s", originalProcessName, data); - }); - - p.on("error", function (err) { - helper.trace("Sub-process emitted error", processName, err); - callbackOnce(err); - }); - - p.on("close", function (code) { - helper.trace("%s exited with code %d", originalProcessName, code); - clearTimeout(timeout); - if (code !== 0) { - callbackOnce( - new Error("Process exited with non-zero exit code: " + code), - ); - } - }); - - return p; -}; - -/** - * Starts the embedded-ads jar with ldap (port 10389) and kerberos enabled (port 10088). Depends on ADS_JAR - * environment variable to resolve the absolute file path of the embedded-ads jar. - * - * @param {Function} cb Callback to invoke when server is started and listening. - */ -helper.ads.start = function (cb) { - const self = this; - temp.mkdir("ads", function (err, dir) { - if (err) { - cb(err); - } - self.dir = dir; - const jarFile = self.getJar(); - const params = ["-jar", jarFile, "-k", "--confdir", self.dir]; - - self.process = self._spawnAndWait("java", params, function (err) { - if (!err) { - // Set KRB5_CONFIG environment variable so kerberos module knows to use it. - process.env.KRB5_CONFIG = self.getKrb5ConfigPath(); - } - - cb(err); - }); - }); -}; - -helper.ads._exec = function (processName, params, callback) { - if (params.length === 0) { - childProcessExec(processName, callback); - return; - } - - childProcessExec(`${processName} ${params.join(" ")}`, callback); -}; - -/** - * Stops the server process. - * @param {Function} cb Callback to invoke when server stopped or with an error. - */ -helper.ads.stop = function (cb) { - if (this.process !== undefined) { - if (this.process.exitCode) { - helper.trace( - "Server already stopped with exit code %d.", - this.process.exitCode, - ); - cb(); - } else { - this.process.on("close", function () { - cb(); - }); - this.process.on("error", cb); - this.process.kill("SIGINT"); - } - } else { - cb(Error("Process is not defined.")); - } -}; - -/** - * Gets the path of the embedded-ads jar. Resolved from ADS_JAR environment variable or $HOME/embedded-ads.jar. - */ -helper.ads.getJar = function () { - let adsJar = process.env.ADS_JAR; - if (!adsJar) { - helper.trace( - "ADS_JAR environment variable not set, using $HOME/embedded-ads.jar", - ); - adsJar = - process.platform === "win32" - ? process.env.HOMEPATH - : process.env.HOME; - adsJar = path.join(adsJar, "embedded-ads.jar"); - } - helper.trace("Using %s for embedded ADS server.", adsJar); - return adsJar; -}; - -/** - * Returns the file path to the keytab for the given user. - * @param {String} username User to resolve keytab for. - */ -helper.ads.getKeytabPath = function (username) { - return path.join(this.dir, username + ".keytab"); -}; - -/** - * Returns the file path to the krb5.conf file generated by ads. - */ -helper.ads.getKrb5ConfigPath = function () { - return path.join(this.dir, "krb5.conf"); -}; - /** * @type {Array<{subtypeString : string, typeInfo: import('../lib/encoder').VectorColumnInfo, value: Array}>} */