Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,6 @@ def _deserialize_value(val: Any) -> Any:
return val


def _unpack_value_pb(value):
which = value.WhichOneof("kind")
if which == "null_value":
return None
elif which == "number_value":
return value.number_value
elif which == "string_value":
return value.string_value
elif which == "bool_value":
return value.bool_value
elif which == "struct_value":
return {k: _unpack_value_pb(v) for k, v in value.struct_value.fields.items()}
elif which == "list_value":
return [_unpack_value_pb(v) for v in value.list_value.values]
return None


def decode_from_string(encoded_partition_id):
gzip_bytes = base64.b64decode(bytes(encoded_partition_id, "utf-8"))
partition_id_bytes = gzip.decompress(gzip_bytes)
Expand All @@ -124,9 +107,7 @@ def decode_from_string(encoded_partition_id):
if "query" in partition_result and "params" in partition_result["query"]:
params_pb = partition_result["query"]["params"]
if params_pb:
partition_result["query"]["params"] = {
k: _unpack_value_pb(v) for k, v in params_pb.fields.items()
}
partition_result["query"]["params"] = MessageToDict(params_pb)
Comment thread
sinhasubham marked this conversation as resolved.

return PartitionId(btid, partition_result)

Expand Down
117 changes: 117 additions & 0 deletions packages/google-cloud-spanner/samples/samples/async_snippets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python

# Copyright 2026 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This application demonstrates how to do basic asynchronous operations using
Cloud Spanner.
"""

import asyncio
from google.cloud.spanner_v1 import AsyncClient
from google.cloud.spanner_v1 import KeySet

# [START spanner_async_create_client]
async def async_create_client(instance_id, database_id):
"""Instantiates an asynchronous Spanner client."""
spanner_client = AsyncClient()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

print("Async Spanner client instantiated successfully.")
return database
# [END spanner_async_create_client]


# [START spanner_async_query_data]
async def async_query_data(instance_id, database_id):
"""Queries sample data from the database using asynchronous SQL."""
spanner_client = AsyncClient()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

async with database.snapshot() as snapshot:
results = await snapshot.execute_sql(
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
)

async for row in results:
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
# [END spanner_async_query_data]


# [START spanner_async_insert_data]
async def async_insert_data(instance_id, database_id):
"""Inserts sample data into the database using DML asynchronously."""
spanner_client = AsyncClient()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

async def insert_singers(transaction):
dml = (
"INSERT INTO Singers (SingerId, FirstName, LastName) VALUES "
"(12, 'Melissa', 'Garcia'), "
"(13, 'Russell', 'Morales')"
)
await transaction.execute_update(dml)

await database.run_in_transaction(insert_singers)
print("Async DML Insert transaction complete.")
# [END spanner_async_insert_data]


# [START spanner_async_read_write_transaction]
async def async_read_write_transaction(instance_id, database_id):
"""Performs an asynchronous read-write transaction."""
spanner_client = AsyncClient()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

async def update_singer_lastname(transaction):
# Retrieve current name
results = await transaction.execute_sql(
"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = 12"
)
async for row in results:
print("Before Update - SingerId: {}, FirstName: {}, LastName: {}".format(*row))

# Update LastName
await transaction.execute_update(
"UPDATE Singers SET LastName = 'Jackson' WHERE SingerId = 12"
)

await database.run_in_transaction(update_singer_lastname)
print("Async read-write transaction complete.")
# [END spanner_async_read_write_transaction]


# [START spanner_async_read_only_transaction]
async def async_read_only_transaction(instance_id, database_id):
"""Performs an asynchronous read-only transaction."""
spanner_client = AsyncClient()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

async with database.snapshot() as snapshot:
# Execute a read using standard KeySet
keyset = KeySet(all_=True)
results = await snapshot.read(
table="Singers",
columns=("SingerId", "FirstName", "LastName"),
keyset=keyset,
)

async for row in results:
print("Read Row - SingerId: {}, FirstName: {}, LastName: {}".format(*row))
# [END spanner_async_read_only_transaction]
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2026 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import async_snippets

@pytest.fixture(scope="module")
def database_ddl():
"""DDL statements to set up the database for testing async snippets."""
return [
"""CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)""",
"""CREATE TABLE Albums (
SingerId INT64 NOT NULL,
AlbumId INT64 NOT NULL,
AlbumTitle STRING(MAX)
) PRIMARY KEY (SingerId, AlbumId),
INTERLEAVE IN PARENT Singers ON DELETE CASCADE"""
]


@pytest.mark.asyncio
async def test_async_snippets_flow(capsys, instance_id, sample_database):
# 1. Test Async Spanner Client Creation
db = await async_snippets.async_create_client(instance_id, sample_database.database_id)
assert db is not None
out, _ = capsys.readouterr()
assert "Async Spanner client instantiated successfully." in out

# 2. Test Async DML Insert
await async_snippets.async_insert_data(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "Async DML Insert transaction complete." in out

# 3. Seed additional albums data via sync batch write for query testing
with sample_database.batch() as batch:
batch.insert(
table="Albums",
columns=("SingerId", "AlbumId", "AlbumTitle"),
values=[
(12, 1, "Total Junk"),
(13, 2, "Go, Go, Go"),
],
)

# 4. Test Async Query Data
await async_snippets.async_query_data(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "SingerId: 12, AlbumId: 1, AlbumTitle: Total Junk" in out
assert "SingerId: 13, AlbumId: 2, AlbumTitle: Go, Go, Go" in out

# 5. Test Async Read-Write Transaction
await async_snippets.async_read_write_transaction(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "Before Update - SingerId: 12, FirstName: Melissa, LastName: Garcia" in out
assert "Async read-write transaction complete." in out

# 6. Test Async Read-Only Transaction
await async_snippets.async_read_only_transaction(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "Read Row - SingerId: 12, FirstName: Melissa, LastName: Jackson" in out
assert "Read Row - SingerId: 13, FirstName: Russell, LastName: Morales" in out
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright 2024 Google LLC All rights reserved.
#
# Copyright 2026 Google LLC All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright 2024 Google LLC All rights reserved.
#
# Copyright 2026 Google LLC All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down
Loading