diff --git a/cassandra/query.py b/cassandra/query.py index 6c6878fdb4..cf7552ae5e 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -23,6 +23,24 @@ import re import struct import time + +# Regex to detect LWT (Lightweight Transaction) queries in CQL strings. +# Matches: INSERT ... IF NOT EXISTS, UPDATE/DELETE ... IF EXISTS, +# and conditional updates/deletes (e.g. UPDATE ... IF col = ..., +# UPDATE ... IF "col" = ...). +# Uses word boundaries and case-insensitive matching. +# This is a best-effort heuristic for SimpleStatement; PreparedStatement +# gets the authoritative is_lwt flag from the server. +_LWT_PATTERN = re.compile( + r'\bIF\s+(?:NOT\s+)?EXISTS\b' # IF [NOT] EXISTS + r'|\bIF\s+[a-zA-Z_"]', # IF or IF "" (conditional) + re.IGNORECASE +) + +# DML verbs that can be LWT queries. Only these statement types can contain +# Paxos/LWT clauses. DDL statements (CREATE/ALTER/DROP) also use IF [NOT] EXISTS +# but those are not LWT operations. +_LWT_DML_VERBS = frozenset({'INSERT', 'UPDATE', 'DELETE', 'BEGIN'}) import warnings from cassandra import ConsistencyLevel, OperationTimedOut @@ -416,6 +434,32 @@ def __str__(self): (self.query_string, consistency)) __repr__ = __str__ + def is_lwt(self): + """ + Detect whether this query is a Lightweight Transaction (LWT) by + inspecting the query string for ``IF [NOT] EXISTS`` or ``IF `` + clauses in DML statements (INSERT, UPDATE, DELETE, BEGIN BATCH). + + DDL statements like ``CREATE TABLE IF NOT EXISTS`` are excluded. + + This is a best-effort heuristic. For authoritative LWT detection, + use :class:`.PreparedStatement` which gets the ``is_lwt`` flag from + the server during PREPARE. + + The result is cached after the first call. + """ + try: + return self._cached_is_lwt + except AttributeError: + # Quick check: only DML statements can be LWT + query = self._query_string.lstrip() + first_word = query.split(None, 1)[0].upper() if query else '' + if first_word not in _LWT_DML_VERBS: + self._cached_is_lwt = False + else: + self._cached_is_lwt = bool(_LWT_PATTERN.search(query)) + return self._cached_is_lwt + class PreparedStatement(object): """ diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index 6b0ebe690e..38a0cfb28b 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -103,15 +103,174 @@ def test_is_lwt_propagates_from_statements(self): batch_with_bound.add(bound_lwt) assert batch_with_bound.is_lwt() is True - class LwtSimpleStatement(SimpleStatement): - def __init__(self): - super(LwtSimpleStatement, self).__init__( - "INSERT INTO test.table (id) VALUES (2) IF NOT EXISTS" - ) - - def is_lwt(self): - return True + # SimpleStatement now detects LWT from query string (no subclass needed) + lwt_simple = SimpleStatement( + "INSERT INTO test.table (id) VALUES (2) IF NOT EXISTS" + ) + assert lwt_simple.is_lwt() is True batch_with_simple = BatchStatement() - batch_with_simple.add(LwtSimpleStatement()) + batch_with_simple.add(lwt_simple) assert batch_with_simple.is_lwt() is True + +class SimpleStatementIsLwtTest(unittest.TestCase): + """Tests for SimpleStatement.is_lwt() CQL-based LWT detection.""" + + # --- INSERT IF NOT EXISTS --- + + def test_insert_if_not_exists(self): + s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS") + assert s.is_lwt() is True + + def test_insert_if_not_exists_lowercase(self): + s = SimpleStatement("insert into ks.t (a) values (1) if not exists") + assert s.is_lwt() is True + + def test_insert_if_not_exists_mixed_case(self): + s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) If Not Exists") + assert s.is_lwt() is True + + # --- UPDATE IF EXISTS --- + + def test_update_if_exists(self): + s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF EXISTS") + assert s.is_lwt() is True + + # --- DELETE IF EXISTS --- + + def test_delete_if_exists(self): + s = SimpleStatement("DELETE FROM ks.t WHERE k=1 IF EXISTS") + assert s.is_lwt() is True + + # --- Conditional UPDATE (IF = ) --- + + def test_conditional_update_equals(self): + s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a = 2") + assert s.is_lwt() is True + + def test_conditional_update_not_equals(self): + s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a != 2") + assert s.is_lwt() is True + + def test_conditional_update_greater_than(self): + s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a > 2") + assert s.is_lwt() is True + + def test_conditional_update_multiple_conditions(self): + s = SimpleStatement( + "UPDATE ks.t SET a=1 WHERE k=1 IF a = 2 AND b = 3") + assert s.is_lwt() is True + + # --- Conditional DELETE --- + + def test_conditional_delete(self): + s = SimpleStatement("DELETE FROM ks.t WHERE k=1 IF a = 2") + assert s.is_lwt() is True + + # --- Non-LWT queries (should return False) --- + + def test_select_not_lwt(self): + s = SimpleStatement("SELECT * FROM ks.t WHERE k=1") + assert s.is_lwt() is False + + def test_insert_without_if(self): + s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1)") + assert s.is_lwt() is False + + def test_update_without_if(self): + s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1") + assert s.is_lwt() is False + + def test_delete_without_if(self): + s = SimpleStatement("DELETE FROM ks.t WHERE k=1") + assert s.is_lwt() is False + + def test_create_table_with_if_not_exists(self): + """DDL IF NOT EXISTS is correctly excluded — only DML can be LWT.""" + s = SimpleStatement("CREATE TABLE IF NOT EXISTS ks.t (a int PRIMARY KEY)") + assert s.is_lwt() is False + + def test_create_index_if_not_exists(self): + s = SimpleStatement("CREATE INDEX IF NOT EXISTS idx ON ks.t (a)") + assert s.is_lwt() is False + + def test_create_keyspace_if_not_exists(self): + s = SimpleStatement( + "CREATE KEYSPACE IF NOT EXISTS ks WITH replication = " + "{'class': 'SimpleStrategy', 'replication_factor': 1}") + assert s.is_lwt() is False + + def test_drop_table_if_exists(self): + s = SimpleStatement("DROP TABLE IF EXISTS ks.t") + assert s.is_lwt() is False + + def test_alter_table_not_lwt(self): + s = SimpleStatement("ALTER TABLE ks.t ADD col int") + assert s.is_lwt() is False + + # --- Caching --- + + def test_result_is_cached(self): + s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS") + assert s.is_lwt() is True + assert s.is_lwt() is True # should use cache + assert s._cached_is_lwt is True + + def test_non_lwt_result_is_cached(self): + s = SimpleStatement("SELECT * FROM ks.t") + assert s.is_lwt() is False + assert s._cached_is_lwt is False + + # --- Edge cases --- + + def test_multiline_query(self): + s = SimpleStatement(""" + INSERT INTO ks.t (a, b) + VALUES (1, 2) + IF NOT EXISTS + """) + assert s.is_lwt() is True + + def test_extra_whitespace(self): + s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF EXISTS") + assert s.is_lwt() is True + + def test_tab_separated(self): + s = SimpleStatement("DELETE FROM ks.t WHERE k=1\tIF\tEXISTS") + assert s.is_lwt() is True + + # --- Quoted identifiers --- + + def test_conditional_with_quoted_identifier(self): + s = SimpleStatement('UPDATE ks.t SET a=1 WHERE k=1 IF "my_col" = 2') + assert s.is_lwt() is True + + def test_conditional_delete_quoted_identifier(self): + s = SimpleStatement('DELETE FROM ks.t WHERE k=1 IF "Col" = 2') + assert s.is_lwt() is True + + # --- BEGIN BATCH --- + + def test_begin_batch_with_lwt(self): + s = SimpleStatement( + "BEGIN BATCH " + "INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS " + "APPLY BATCH") + assert s.is_lwt() is True + + def test_begin_batch_without_lwt(self): + s = SimpleStatement( + "BEGIN BATCH " + "INSERT INTO ks.t (a) VALUES (1) " + "APPLY BATCH") + assert s.is_lwt() is False + + # --- Leading whitespace --- + + def test_leading_whitespace(self): + s = SimpleStatement(" \n INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS") + assert s.is_lwt() is True + + def test_leading_whitespace_ddl(self): + s = SimpleStatement(" \n CREATE TABLE IF NOT EXISTS ks.t (a int PRIMARY KEY)") + assert s.is_lwt() is False