diff --git a/lib/control-connection.js b/lib/control-connection.js index fd707529..7c11cca2 100644 --- a/lib/control-connection.js +++ b/lib/control-connection.js @@ -87,6 +87,7 @@ class ControlConnection extends events.EventEmitter { this._addressTranslator = this.options.policies.addressResolution; this._reconnectionPolicy = this.options.policies.reconnection; this._reconnectionSchedule = this._reconnectionPolicy.newSchedule(); + this._refreshPromise = null; this._isShuttingDown = false; // Reference to the encoder of the last valid connection @@ -216,7 +217,6 @@ class ControlConnection extends events.EventEmitter { _setHealthListeners(host, connection) { const self = this; - let wasRefreshCalled = 0; function removeListeners() { host.removeListener('down', downOrIgnoredHandler); @@ -225,15 +225,11 @@ class ControlConnection extends events.EventEmitter { } function startReconnecting(hostDown) { - if (wasRefreshCalled++ !== 0) { - // Prevent multiple calls to reconnect - return; - } - removeListeners(); + // Don't attempt to reconnect when the ControlConnection is being shutdown if (self._isShuttingDown) { - // Don't attempt to reconnect when the ControlConnection is being shutdown + this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown'); return; } @@ -412,15 +408,13 @@ class ControlConnection extends events.EventEmitter { // To acquire metadata we need to specify the cassandra version this.metadata.setCassandraVersion(this.host.getCassandraVersion()); this.metadata.buildTokens(this.hosts); + } - if (!this.options.isMetadataSyncEnabled) { - this.metadata.initialized = true; - return; - } - + if (isReconnecting && this.options.isMetadataSyncEnabled) { await this.metadata.refreshKeyspacesInternal(false); - this.metadata.initialized = true; } + + this.metadata.initialized = true; } async _refreshControlConnection(hostIterator) { @@ -446,13 +440,32 @@ class ControlConnection extends events.EventEmitter { } /** - * Acquires a new connection and refreshes topology and keyspace metadata. + * Acquires a new connection and refreshes topology and keyspace metadata, with protection against concurrent refreshes. *

When it fails obtaining a connection and there aren't any more hosts, it schedules reconnection.

*

When it fails obtaining the metadata, it marks connection and/or host unusable and retries using the same * iterator from query plan / host list

* @param {Iterator} [hostIterator] */ async _refresh(hostIterator) { + if (this._refreshPromise) { + return await this._refreshPromise; + } + + this._refreshPromise = this._unsafeDoRefresh(hostIterator); + + try { + return await this._refreshPromise; + } finally { + this._refreshPromise = null; + } + } + + /** + * The actual implementation of the refresh logic, without protection against concurrent executions. + *

Should only be used via _refresh.

+ * @param {Iterator} [hostIterator] + */ + async _unsafeDoRefresh(hostIterator) { if (this._isShuttingDown) { this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown'); return; @@ -499,7 +512,7 @@ class ControlConnection extends events.EventEmitter { } // Retry the whole thing with the same query plan - return await this._refresh(hostIterator); + return await this._unsafeDoRefresh(hostIterator); } this._reconnectionSchedule = this._reconnectionPolicy.newSchedule(); diff --git a/test/integration/short/control-connection-tests.js b/test/integration/short/control-connection-tests.js index b4b77cfc..a190e078 100644 --- a/test/integration/short/control-connection-tests.js +++ b/test/integration/short/control-connection-tests.js @@ -153,6 +153,25 @@ describe('ControlConnection', function () { assert.strictEqual(cc.hosts.length, 1); }); + it('should not break when refreshing concurrently', async () => { + const cc = newInstance(); + cc.options.policies.loadBalancing = new policies.loadBalancing.RoundRobinPolicy(); + disposeAfter(cc); + + await cc.init(); + await new Promise(r => cc.options.policies.loadBalancing.init(null, cc.hosts, r)); + + const refreshPromises = []; + // randomly emit cc._refresh 100 times + for (let i = 0; i < 100; i++) { + refreshPromises.push(cc._refresh()); + await helper.delayAsync(~~(Math.random() * 100)); + } + await Promise.all(refreshPromises); + assert.ok(cc.host); + assert.ok(cc.connection); + }); + it('should reconnect when host used goes down', async () => { const options = clientOptions.extend( utils.extend({ pooling: helper.getPoolingOptions(1, 1, 500) }, helper.baseOptions));