Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions lib/control-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ControlConnection extends events.EventEmitter {
this._addressTranslator = this.options.policies.addressResolution;
this._reconnectionPolicy = this.options.policies.reconnection;
this._reconnectionSchedule = this._reconnectionPolicy.newSchedule();
this._refreshPromise = null;
this._isShuttingDown = false;

// Reference to the encoder of the last valid connection
Expand Down Expand Up @@ -216,7 +217,6 @@ class ControlConnection extends events.EventEmitter {

_setHealthListeners(host, connection) {
const self = this;
let wasRefreshCalled = 0;

function removeListeners() {
host.removeListener('down', downOrIgnoredHandler);
Expand All @@ -225,15 +225,11 @@ class ControlConnection extends events.EventEmitter {
}

function startReconnecting(hostDown) {
if (wasRefreshCalled++ !== 0) {
// Prevent multiple calls to reconnect
return;
}

removeListeners();

// Don't attempt to reconnect when the ControlConnection is being shutdown
if (self._isShuttingDown) {
Comment thread
toptobes marked this conversation as resolved.
// Don't attempt to reconnect when the ControlConnection is being shutdown
this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown');
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown');
self.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown');

return;
}

Expand Down Expand Up @@ -412,15 +408,13 @@ class ControlConnection extends events.EventEmitter {
// To acquire metadata we need to specify the cassandra version
this.metadata.setCassandraVersion(this.host.getCassandraVersion());
this.metadata.buildTokens(this.hosts);
}

if (!this.options.isMetadataSyncEnabled) {
this.metadata.initialized = true;
return;
}

if (isReconnecting && this.options.isMetadataSyncEnabled) {
await this.metadata.refreshKeyspacesInternal(false);
this.metadata.initialized = true;
}

this.metadata.initialized = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this will have the side effect that if metadata sync is not enabled, this.metadata.initialized = true; will still run everytime it refreshes, not just once at initialization.

}

async _refreshControlConnection(hostIterator) {
Expand All @@ -446,13 +440,32 @@ class ControlConnection extends events.EventEmitter {
}

/**
* Acquires a new connection and refreshes topology and keyspace metadata.
* Acquires a new connection and refreshes topology and keyspace metadata, with protection against concurrent refreshes.
* <p>When it fails obtaining a connection and there aren't any more hosts, it schedules reconnection.</p>
* <p>When it fails obtaining the metadata, it marks connection and/or host unusable and retries using the same
* iterator from query plan / host list</p>
* @param {Iterator<Host>} [hostIterator]
*/
async _refresh(hostIterator) {
if (this._refreshPromise) {
return await this._refreshPromise;
}

this._refreshPromise = this._unsafeDoRefresh(hostIterator);

try {
return await this._refreshPromise;
} finally {
this._refreshPromise = null;
}
}

/**
* The actual implementation of the refresh logic, without protection against concurrent executions.
* <p>Should only be used via _refresh.</p>
* @param {Iterator<Host>} [hostIterator]
*/
async _unsafeDoRefresh(hostIterator) {
if (this._isShuttingDown) {
this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown');
return;
Expand Down Expand Up @@ -499,7 +512,7 @@ class ControlConnection extends events.EventEmitter {
}

// Retry the whole thing with the same query plan
return await this._refresh(hostIterator);
return await this._unsafeDoRefresh(hostIterator);
}

this._reconnectionSchedule = this._reconnectionPolicy.newSchedule();
Expand Down
19 changes: 19 additions & 0 deletions test/integration/short/control-connection-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ describe('ControlConnection', function () {
assert.strictEqual(cc.hosts.length, 1);
});

it('should not break when refreshing concurrently', async () => {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may need a better heuristic for ensuring the refreshes are okay... not sure... I just "borrowed" this from Jane's original PR

const cc = newInstance();
cc.options.policies.loadBalancing = new policies.loadBalancing.RoundRobinPolicy();
disposeAfter(cc);

await cc.init();
await new Promise(r => cc.options.policies.loadBalancing.init(null, cc.hosts, r));

const refreshPromises = [];
// randomly emit cc._refresh 100 times
for (let i = 0; i < 100; i++) {
refreshPromises.push(cc._refresh());
await helper.delayAsync(~~(Math.random() * 100));
}
await Promise.all(refreshPromises);
Comment thread
toptobes marked this conversation as resolved.
assert.ok(cc.host);
assert.ok(cc.connection);
});

it('should reconnect when host used goes down', async () => {
const options = clientOptions.extend(
utils.extend({ pooling: helper.getPoolingOptions(1, 1, 500) }, helper.baseOptions));
Expand Down
Loading