From 69ff5b6f26387125dd2920e66773a6486166a119 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Thu, 27 Nov 2025 06:32:32 +0000 Subject: [PATCH 01/15] fix(storage): gRPC misuse causing crashes due to concurrent writes from Flush() and Write() --- .../async/writer_connection_resumed.cc | 27 +++-- .../async/writer_connection_resumed_test.cc | 105 ++++++++++++++++-- 2 files changed, 111 insertions(+), 21 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 858b383173d9a..6dd13b73e2999 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -191,6 +191,7 @@ class AsyncWriterConnectionResumedState } void WriteLoop(std::unique_lock lk) { + if (writing_) return; // Determine if there's data left to write *before* potentially finalizing. writing_ = write_offset_ < resend_buffer_.size(); @@ -212,7 +213,8 @@ class AsyncWriterConnectionResumedState } // If not finalizing, check if an empty flush is needed. if (flush_) { - // Pass empty payload to FlushStep + writing_ = true; + // Pass empty payload to FlushStep return FlushStep(std::move(lk), absl::Cord{}); } @@ -263,8 +265,10 @@ class AsyncWriterConnectionResumedState auto impl = Impl(lk); lk.unlock(); impl->Query().then([this, result, w = WeakFromThis()](auto f) { - SetFlushed(std::unique_lock(mu_), std::move(result)); - if (auto self = w.lock()) return self->OnQuery(f.get()); + auto self = w.lock(); + if (!self) return; + self->OnQuery(f.get()); + self->SetFlushed(std::unique_lock(self->mu_), std::move(result)); }); } @@ -302,8 +306,9 @@ class AsyncWriterConnectionResumedState buffer_offset_ = persisted_size; write_offset_ -= static_cast(n); // If the buffer is small enough, collect all the handlers to notify them. - auto const handlers = ClearHandlersIfEmpty(lk); - WriteLoop(std::move(lk)); + auto const handlers = ClearHandlersIfEmpty(lk); + writing_ = false; + StartWriting(std::move(lk)); // The notifications are deferred until the lock is released, as they might // call back and try to acquire the lock. for (auto const& h : handlers) { @@ -325,7 +330,8 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; - return WriteLoop(std::move(lk)); + writing_ = false; + return StartWriting(std::move(lk)); } void Resume(Status const& s) { @@ -353,7 +359,8 @@ class AsyncWriterConnectionResumedState bool was_finalizing; { std::unique_lock lk(mu_); - was_finalizing = finalizing_; + was_finalizing = finalizing_; + writing_ = false; if (!s.ok() && cancelled_) { return SetError(std::move(lk), std::move(s)); } @@ -471,10 +478,6 @@ class AsyncWriterConnectionResumedState // lock. for (auto& h : handlers) h->Execute(Status{}); flushed.set_value(result); - // Restart the write loop ONLY if we are not already finalizing. - // If finalizing_ is true, the completion will be handled by OnFinalize. - std::unique_lock loop_lk(mu_); - if (!finalizing_) WriteLoop(std::move(loop_lk)); } void SetError(std::unique_lock lk, Status const& status) { @@ -600,7 +603,7 @@ class AsyncWriterConnectionResumedState // - A Flush() call that returns an unsatisified future until the buffer is // small enough. std::vector> flush_handlers_; - + // True if the writing loop is activate. bool writing_ = false; diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 66735e8cc877b..4fb39ca4bd3d8 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -22,6 +22,8 @@ #include "google/cloud/testing_util/status_matchers.h" #include #include +#include +#include namespace google { namespace cloud { @@ -170,7 +172,7 @@ TEST(WriterConnectionResumed, FlushEmpty) { auto mock = std::make_unique(); EXPECT_CALL(*mock, PersistedState) .WillRepeatedly(Return(MakePersistedState(0))); - EXPECT_CALL(*mock, Flush).WillOnce([&](auto const& p) { + EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) { EXPECT_TRUE(p.payload().empty()); return sequencer.PushBack("Flush").then([](auto f) { if (!f.get()) return TransientError(); @@ -214,13 +216,21 @@ TEST(WriteConnectionResumed, FlushNonEmpty) { EXPECT_CALL(*mock, PersistedState) .WillRepeatedly(Return(MakePersistedState(0))); - EXPECT_CALL(*mock, Flush).WillOnce([&](auto const& p) { - EXPECT_EQ(p.payload(), payload.payload()); - return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); - return Status{}; - }); - }); + EXPECT_CALL(*mock, Flush) + .WillOnce([&](auto const& p) { + EXPECT_EQ(p.payload(), payload.payload()); + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }) + .WillOnce([&](auto const& p) { + EXPECT_TRUE(p.payload().empty()); + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( [](auto f) -> StatusOr { @@ -394,6 +404,83 @@ TEST(WriteConnectionResumed, ResumeUsesAppendObjectSpecFromInitialRequest) { "projects/_/buckets/test-bucket"); } +TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); + EXPECT_CALL(*mock, Query).WillOnce([&]() { + return sequencer.PushBack("Query").then([](auto f) -> StatusOr { + if (!f.get()) return TransientError(); + return 0; + }); + }); + + // Make Write detect concurrent invocations. If two writes run concurrently + // the compare_exchange will fail and the test will fail. + std::atomic in_write{false}; + EXPECT_CALL(*mock, Write(_)) + .WillRepeatedly([&](auto) { + bool expected = false; + EXPECT_TRUE(in_write.compare_exchange_strong(expected, true)); + // Simulate some work that allows a concurrent write to attempt to run. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + in_write.store(false); + return make_ready_future(Status{}); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).Times(0); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + // Start a flush which will call impl->Flush() and block. + auto flush_future = connection->Flush({}); + // Allow the Flush to complete, this will schedule a Query (but Query will + // remain blocked until we pop it). + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(true); + + // Immediately perform a user Write after the flush completed but before + // Query completes. This can race with the OnQuery-driven write. + auto write_future = connection->Write(TestPayload(1024)); + + // Now allow the Query to complete; OnQuery may schedule a write. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Query"); + next.first.set_value(true); + + // Wait for both futures to complete with a timeout to avoid indefinite hang. + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + while (!write_future.is_ready() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + while (!flush_future.is_ready() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + ASSERT_TRUE(write_future.is_ready()); + ASSERT_TRUE(flush_future.is_ready()); + + // Both futures should complete successfully. + EXPECT_THAT(write_future.get(), StatusIs(StatusCode::kOk)); + EXPECT_THAT(flush_future.get(), StatusIs(StatusCode::kOk)); +} + + TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) { struct { bool use_write_object_spec; @@ -463,4 +550,4 @@ TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud -} // namespace google +} // namespace google \ No newline at end of file From ea3e1b9460a3d00dc17e0e18dc765e80e8ba0540 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Fri, 16 Jan 2026 14:27:59 +0000 Subject: [PATCH 02/15] add state variable to prevent race condition --- .../async/writer_connection_resumed.cc | 59 +++++++++++-------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 6dd13b73e2999..958b5632ad1f1 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -186,17 +186,19 @@ class AsyncWriterConnectionResumedState } void StartWriting(std::unique_lock lk) { - if (writing_) return; + if (state_ != State::kIdle) return; WriteLoop(std::move(lk)); } void WriteLoop(std::unique_lock lk) { - if (writing_) return; + if (state_ != State::kIdle) return; + // Determine if there's data left to write *before* potentially finalizing. - writing_ = write_offset_ < resend_buffer_.size(); + auto const has_data = write_offset_ < resend_buffer_.size(); // If we are writing data, continue doing so. - if (writing_) { + if (has_data) { + state_ = State::kWriting; // Still data to write, determine the next chunk. auto const n = resend_buffer_.size() - write_offset_; auto payload = resend_buffer_.Subcord(write_offset_, n); @@ -204,7 +206,7 @@ class AsyncWriterConnectionResumedState return WriteStep(std::move(lk), std::move(payload)); } - // No data left to write (writing_ is false). + // No data left to write. // Check if we need to finalize (only if not already writing data AND not // already finalizing). if (finalize_ && !finalizing_) { @@ -213,23 +215,24 @@ class AsyncWriterConnectionResumedState } // If not finalizing, check if an empty flush is needed. if (flush_) { - writing_ = true; - // Pass empty payload to FlushStep + state_ = State::kWriting; + // Pass empty payload to FlushStep return FlushStep(std::move(lk), absl::Cord{}); } // No data to write, not finalizing, not flushing. The loop can stop. - // writing_ is already false. + state_ = State::kIdle; } // FinalizeStep is now called only when all data in resend_buffer_ is written. void FinalizeStep(std::unique_lock lk) { // Check *under lock* if we are already finalizing. - if (finalizing_) { + if (finalizing_ || state_ != State::kIdle) { // If another thread initiated FinalizeStep concurrently, just return. return; } // Mark that we are starting the finalization process. + state_ = State::kWriting; finalizing_ = true; auto impl = Impl(lk); lk.unlock(); @@ -306,8 +309,8 @@ class AsyncWriterConnectionResumedState buffer_offset_ = persisted_size; write_offset_ -= static_cast(n); // If the buffer is small enough, collect all the handlers to notify them. - auto const handlers = ClearHandlersIfEmpty(lk); - writing_ = false; + auto const handlers = ClearHandlersIfEmpty(lk); + state_ = State::kIdle; StartWriting(std::move(lk)); // The notifications are deferred until the lock is released, as they might // call back and try to acquire the lock. @@ -330,7 +333,7 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; - writing_ = false; + state_ = State::kIdle; return StartWriting(std::move(lk)); } @@ -349,8 +352,9 @@ class AsyncWriterConnectionResumedState } // Include write_handle to enable fast resume instead of slow // takeover. Without handle, server performs full state validation. - if (latest_write_handle_) { - *append_object_spec.mutable_write_handle() = *latest_write_handle_; + if (first_response_.has_write_handle()) { + *append_object_spec.mutable_write_handle() = + first_response_.write_handle(); } append_object_spec.set_generation(first_response_.resource().generation()); ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status)); @@ -359,9 +363,11 @@ class AsyncWriterConnectionResumedState bool was_finalizing; { std::unique_lock lk(mu_); - was_finalizing = finalizing_; - writing_ = false; + if (state_ == State::kResuming) return; + was_finalizing = finalizing_; + state_ = State::kResuming; if (!s.ok() && cancelled_) { + state_ = State::kIdle; return SetError(std::move(lk), std::move(s)); } } @@ -376,16 +382,13 @@ class AsyncWriterConnectionResumedState void OnResume(Status const& original_status, bool was_finalizing, StatusOr res) { std::unique_lock lk(mu_); - // Update write_handle from any resume response that contains it. - if (res && res->first_response.has_write_handle()) { - latest_write_handle_ = res->first_response.write_handle(); - } if (was_finalizing) { // If resuming due to a finalization error, we *must* complete the // finalized_ promise now, based on the resume attempt's outcome. if (!res) { // The resume attempt itself failed. Use that error. + state_ = State::kIdle; return SetError(std::move(lk), std::move(res).status()); } // Resume attempt succeeded, check the persisted state. @@ -401,12 +404,14 @@ class AsyncWriterConnectionResumedState // Use the original status that triggered the resume. Reset finalizing_ // before setting the error, as the attempt is now over. finalizing_ = false; + state_ = State::kIdle; return SetError(std::move(lk), std::move(original_status)); } // Resume was *not* triggered by finalization failure. if (!res) { // Regular resume attempt failed. + state_ = State::kIdle; return SetError(std::move(lk), std::move(res).status()); } // Regular resume attempt succeeded. Check state. @@ -434,7 +439,7 @@ class AsyncWriterConnectionResumedState void SetFinalized(std::unique_lock lk, google::storage::v2::Object object) { resend_buffer_.Clear(); - writing_ = false; + state_ = State::kIdle; finalize_ = false; finalizing_ = false; // Reset finalizing flag flush_ = false; @@ -482,7 +487,7 @@ class AsyncWriterConnectionResumedState void SetError(std::unique_lock lk, Status const& status) { resume_status_ = status; - writing_ = false; + state_ = State::kIdle; finalize_ = false; finalizing_ = false; // Reset finalizing flag flush_ = false; @@ -605,7 +610,12 @@ class AsyncWriterConnectionResumedState std::vector> flush_handlers_; // True if the writing loop is activate. - bool writing_ = false; + enum class State { + kIdle, + kWriting, + kResuming, + }; + State state_ = State::kIdle; // True if cancelled, in which case any RPC failures are final. bool cancelled_ = false; @@ -615,9 +625,6 @@ class AsyncWriterConnectionResumedState // Tracks if the final promise (`finalized_`) has been completed. bool finalized_promise_completed_ = false; - - // Track the latest write handle seen in responses. - absl::optional latest_write_handle_; }; /** From 02c3c513b60f35a0c786cb6202fa698d6f4e8acd Mon Sep 17 00:00:00 2001 From: v-pratap Date: Fri, 16 Jan 2026 14:37:05 +0000 Subject: [PATCH 03/15] fix the version --- .../storage/internal/async/writer_connection_resumed.cc | 7 ------- 1 file changed, 7 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 958b5632ad1f1..57b938bc5584b 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -82,13 +82,6 @@ class AsyncWriterConnectionResumedState } else { buffer_offset_ = absl::get(state); } - if (first_response_.has_write_handle()) { - latest_write_handle_ = first_response_.write_handle(); - } else if (initial_request_.has_append_object_spec() && - initial_request_.append_object_spec().has_write_handle()) { - latest_write_handle_ = - initial_request_.append_object_spec().write_handle(); - } } void Cancel() { From 62e4cede5b91f749fe3264bc4618d51bde718a02 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Fri, 16 Jan 2026 15:08:34 +0000 Subject: [PATCH 04/15] resolved chris comments --- .../async/writer_connection_resumed.cc | 8 +- .../tests/async_client_integration_test.cc | 1266 ++++------------- 2 files changed, 294 insertions(+), 980 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 57b938bc5584b..dab7d9a3e5579 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -179,7 +179,6 @@ class AsyncWriterConnectionResumedState } void StartWriting(std::unique_lock lk) { - if (state_ != State::kIdle) return; WriteLoop(std::move(lk)); } @@ -358,11 +357,11 @@ class AsyncWriterConnectionResumedState std::unique_lock lk(mu_); if (state_ == State::kResuming) return; was_finalizing = finalizing_; - state_ = State::kResuming; if (!s.ok() && cancelled_) { state_ = State::kIdle; return SetError(std::move(lk), std::move(s)); } + state_ = State::kResuming; } // Pass the original status `s` and `was_finalizing` to the callback. factory_(std::move(request)) @@ -381,7 +380,6 @@ class AsyncWriterConnectionResumedState // finalized_ promise now, based on the resume attempt's outcome. if (!res) { // The resume attempt itself failed. Use that error. - state_ = State::kIdle; return SetError(std::move(lk), std::move(res).status()); } // Resume attempt succeeded, check the persisted state. @@ -397,14 +395,12 @@ class AsyncWriterConnectionResumedState // Use the original status that triggered the resume. Reset finalizing_ // before setting the error, as the attempt is now over. finalizing_ = false; - state_ = State::kIdle; return SetError(std::move(lk), std::move(original_status)); } // Resume was *not* triggered by finalization failure. if (!res) { // Regular resume attempt failed. - state_ = State::kIdle; return SetError(std::move(lk), std::move(res).status()); } // Regular resume attempt succeeded. Check state. @@ -601,7 +597,7 @@ class AsyncWriterConnectionResumedState // - A Flush() call that returns an unsatisified future until the buffer is // small enough. std::vector> flush_handlers_; - + // True if the writing loop is activate. enum class State { kIdle, diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 7424a2de485d4..cd06954f51334 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" -#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" +#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include namespace google { @@ -52,15 +54,7 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { - protected: - void SetUp() override { - bucket_name_ = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); - ASSERT_THAT(bucket_name_, Not(IsEmpty())) - << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; - } - - std::string const& bucket_name() const { return bucket_name_; } +protected: using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -71,993 +65,317 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } - private: +private: std::string bucket_name_; }; -auto TestOptions() { - // Disable metrics in the test, they just make the logs harder to grok. - return Options{} - .set(false) - .set(1) - .set(TracingOptions().SetOptions( - "truncate_string_field_longer_than=2048")); -} - -auto AlwaysRetry() { - return TestOptions().set( - MakeAlwaysRetryIdempotencyPolicy); -} +namespace gcs = ::google::cloud::storage; -TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); +// auto AlwaysRetry() { +// return google::cloud::Options{}.set( +// MakeAlwaysRetryIdempotencyPolicy); +// } - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - LoremIpsum(), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - - for (auto* p : {&full1, &full0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const full = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full, LoremIpsum()); +google::cloud::Options MakeOptions(google::cloud::Options opts) { + auto fallback = google::cloud::Options{}; + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { + fallback.set(*v); } - for (auto* p : {&partial1, &partial0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const partial = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(partial, LoremIpsum().substr(2)); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { + fallback.set(*v); } - - auto status = async - .DeleteObject(BucketName(bucket_name()), object_name, - insert->generation()) - .get(); - EXPECT_STATUS_OK(status); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); - EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - auto destination = MakeRandomObjectName(); - - auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), - AlwaysRetry()); - auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), - AlwaysRetry()); - std::vector> inserted{insert1.get(), - insert2.get()}; - for (auto const& insert : inserted) { - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { + fallback.set(*v); } - std::vector sources; - std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), - [](auto const& o) { - google::storage::v2::ComposeObjectRequest::SourceObject r; - r.set_name(o->name()); - r.set_generation(o->generation()); - return r; - }); - auto pending = async.ComposeObject(BucketName(bucket_name()), destination, - std::move(sources)); - auto const composed = pending.get(); - EXPECT_STATUS_OK(composed); - ScheduleForDelete(*composed); - - auto read = async - .ReadObjectRange(BucketName(bucket_name()), destination, 0, - 2 * LoremIpsum().size()) - .get(); - ASSERT_STATUS_OK(read); - auto contents = read->contents(); - auto const full_contents = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); - EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); -} - -TEST_F(AsyncClientIntegrationTest, StreamingRead) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto const block = MakeRandomData(kLineSize); - std::vector insert_data(kLineCount); - std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { - return std::to_string(++n) + ": " + block; - }); - auto const expected_size = std::accumulate( - insert_data.begin(), insert_data.end(), static_cast(0), - [](auto a, auto const& b) { return a + b.size(); }); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - insert_data, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_size); - - auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { + fallback.set(*v); } - EXPECT_EQ(actual.size(), expected_size); - auto view = absl::string_view(actual); - for (auto const& expected : insert_data) { - ASSERT_GE(view.size(), expected.size()); - ASSERT_EQ(expected, view.substr(0, expected.size())); - view.remove_prefix(expected.size()); - } - EXPECT_EQ(view, absl::string_view{}); -} - -TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto constexpr kReadOffset = kLineCount * kLineSize / 2; - auto const block = MakeRandomData(kLineSize - 1) + "\n"; - std::string contents; - for (int i = 0; i != kLineCount; ++i) contents += block; - auto const expected_insert_size = contents.size(); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - contents, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_insert_size); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - request.set_read_offset(kReadOffset); - auto r = async.ReadObject(request).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - - EXPECT_EQ(absl::string_view(actual), - absl::string_view(contents).substr(kReadOffset)); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto const upload_id = writer.UploadId(); - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - ASSERT_EQ(writer.UploadId(), upload_id); - auto const persisted = writer.PersistedState(); - // We don't expect this to be larger that the total size of the object. - // Incidentally, this shows the value fits into an `int`. - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto const upload_id = writer.UploadId(); - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto insert = async - .InsertObject(BucketName(bucket_name()), o1, - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - google::storage::v2::Object metadata; - AsyncRewriter rewriter; - AsyncToken token; - google::storage::v2::RewriteObjectRequest request; - request.set_destination_name(o2); - request.set_destination_bucket(BucketName(bucket_name()).FullName()); - request.set_source_object(o1); - request.set_source_bucket(BucketName(bucket_name()).FullName()); - request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(std::move(request)); - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_FALSE(token.valid()); - } - EXPECT_EQ(metadata.name(), o2); - EXPECT_EQ(metadata.size(), insert->size()); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { - auto async = AsyncClient(TestOptions()); - auto destination = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); - if (!destination || destination->empty()) GTEST_SKIP(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto source = - async - .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(source); - ScheduleForDelete(*source); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - AsyncRewriter rewriter; - AsyncToken token; - auto const expected_name = MakeRandomObjectName(); - google::storage::v2::RewriteObjectRequest start_request; - start_request.set_destination_name(expected_name); - start_request.set_destination_bucket(BucketName(*destination).FullName()); - start_request.set_source_object(source->name()); - start_request.set_source_bucket(source->bucket()); - start_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(start_request); - - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - - // We want to resume a partially completed resume. Verify the first rewrite - // did not complete things. - ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + fallback.set(false); + return google::cloud::internal::MergeOptions(std::move(opts), fallback); +} + + +google::cloud::storage::Client MakeGrpcClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id)); + return google::cloud::storage::MakeGrpcClient(std::move(options)); +} + +google::cloud::storage_experimental::AsyncClient MakeAsyncClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id) + .set({"rpc"}) + .set(true)); + return google::cloud::storage_experimental::AsyncClient(options); +} + +class ThreadPool { +public: + // Constructor initializes the thread pool with a given number of worker threads. + ThreadPool(size_t threads) : stop_(false) { + if (threads == 0) { + throw std::invalid_argument("Thread count cannot be zero."); + } + for (size_t i = 0; i < threads; ++i) { + workers_.emplace_back([this] { + while (true) { + std::function task; + { + // Acquire a lock on the task queue. + std::unique_lock lock(this->queue_mutex_); + + // Wait for a task to be available or for the pool to stop. + this->condition_.wait(lock, [this] { + return this->stop_ || !this->tasks_.empty(); + }); + + // If the pool is stopping and no tasks are left, exit the thread. + if (this->stop_ && this->tasks_.empty()) { + return; + } + + // Get the next task from the queue. + task = std::move(this->tasks_.front()); + this->tasks_.pop(); + } + + // Execute the task. + task(); + } + }); + } + } + + // Adds a new task to the thread pool. + template + auto enqueue(F&& f, Args&&... args) -> std::future::type> { + using return_type = typename std::result_of::type; + + // Create a packaged_task to wrap the function and its arguments. + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + // Acquire a lock on the queue and push the task. + std::unique_lock lock(queue_mutex_); + + // Don't allow enqueueing after stopping the pool. + if (stop_) { + throw std::runtime_error("enqueue on stopped ThreadPool"); + } + + tasks_.emplace([task]() { (*task)(); }); + } + + // Notify one waiting thread that a new task is available. + condition_.notify_one(); + return res; + } + + // Destructor stops all worker threads and joins them. + ~ThreadPool() { + { + std::unique_lock lock(queue_mutex_); + stop_ = true; + } + + // Notify all threads so they can wake up and exit their loops. + condition_.notify_all(); + + for (std::thread& worker : workers_) { + worker.join(); + } + } + +private: + std::vector workers_; + std::queue> tasks_; + + std::mutex queue_mutex_; + std::condition_variable condition_; + + bool stop_; +}; - google::storage::v2::RewriteObjectRequest resume_request; - resume_request.set_source_bucket(source->bucket()); - resume_request.set_source_object(source->name()); - resume_request.set_destination_bucket(BucketName(*destination).FullName()); - resume_request.set_destination_name(expected_name); - resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); - google::storage::v2::Object metadata; - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); +void ReadRangeTask(std::shared_ptr descriptor, + std::int64_t& offset, + std::int64_t& limit) { + AsyncReader r; AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); - EXPECT_EQ(metadata.name(), expected_name); - EXPECT_EQ(metadata.size(), source->size()); - EXPECT_FALSE(token.valid()); - } -} - -TEST_F(AsyncClientIntegrationTest, InsertFailure) { - auto async = AsyncClient(TestOptions()); - - auto insert = async - .InsertObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), LoremIpsum()) - .get(); - ASSERT_THAT(insert, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadFailure) { - auto async = AsyncClient(TestOptions()); - - auto read = async - .ReadObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - // At the moment, only connectivity errors are detected before the first - // `Read()` call. Accept such failures too: - if (!read) return; - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(read); - auto payload = ReadAll(std::move(reader), std::move(token)).get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { - auto async = AsyncClient(TestOptions()); - - auto payload = - async - .ReadObjectRange(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) - .get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartBufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = - async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto make_source = [](std::string name) { - auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; - source.set_name(std::move(name)); - return source; - }; - auto composed = - async - .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), - {make_source(MakeRandomObjectName()), - make_source(MakeRandomObjectName())}) - .get(); - ASSERT_THAT(composed, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto deleted = - async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) - .get(); - ASSERT_THAT(deleted, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName()); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName(), - "test-only-invalid-rewrite-token"); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - writer.Close(); - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - auto const persisted = writer.PersistedState(); - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); + + // 1. Get the reader and token for the specified range + // MODIFIED: Call Read() directly on the descriptor object + std::tie(r, t) = descriptor->Read(offset, limit); + + // 2. Consume the entire stream for this range + while (t.valid()) { + auto read = r.Read(std::move(t)).get(); + + // ASSERT_STATUS_OK will flag the test as failed and abort this + // thread if the status is not OK. + ASSERT_STATUS_OK(read); + + ReadPayload p; + AsyncToken t_new; + std::tie(p, t_new) = *std::move(read); + t = std::move(t_new); + + // In this test, we are discarding the payload `p`, just as + // the original single-threaded loop did. + } +} + + +TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { + auto project_id = "bajajnehaa-devrel-test"; + // auto const kproject = google::cloud::Project(project_id); + + auto client = MakeGrpcClient(project_id); + + auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; + auto object_name = "vaibhav-test-file-111"; + auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; + // auto hns = gcs::BucketHierarchicalNamespace{true}; + auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; + + auto constexpr kBlockSize = 1024*1024*10; + auto constexpr kBlockCount = 1000; auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); + auto const block2 = MakeRandomData(kBlockSize); + + auto async = MakeAsyncClient(project_id); + // auto w = async.StartBufferedUpload(BucketName(bucket_name), object_name) + // .get(); + // ASSERT_STATUS_OK(w); + auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) + .get(); ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - // Explicitly flush the data. - auto flush_status = writer.Flush().get(); - EXPECT_STATUS_OK(flush_status); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, Open) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 8 * 1024; - auto constexpr kStride = 2 * kSize; - auto constexpr kBlockCount = 4; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); AsyncWriter writer; AsyncToken token; std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { + for (int i = 0; i < kBlockCount; ++i) { + std::cout << "Writing data iteration #" << i << std::endl; auto p = writer.Write(std::move(token), WritePayload(block)).get(); ASSERT_STATUS_OK(p); token = *std::move(p); } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); + +// auto metadata1 = writer.Finalize(std::move(token)).get(); +// ASSERT_STATUS_OK(metadata1); +// std::cout << "Request metadata: " << metadata1->generation() << std::endl; + // EXPECT_EQ(1,2); + + // auto close = writer.Close(); + + // auto object_metadata = client.GetObjectMetadata(bucket_name, object_name); + // auto m = *object_metadata; + // auto generation = m.generation(); + + // auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) + // .get(); + + // ASSERT_STATUS_OK(w1); + + // AsyncWriter writer1; + // AsyncToken token1; + // std::tie(writer1, token1) = *std::move(w1); + + // for (int i = 0; i < kBlockCount; ++i) { + // // std::cout << "Writing data iteration #" << i << std::endl; + // auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); + // ASSERT_STATUS_OK(p); + // token1 = *std::move(p); + // } + + // // auto object_metadata1 = client.GetObjectMetadata(bucket_name, object_name); + // // auto m1 = *object_metadata1; + // // // auto generation1 = m1.generation(); + // // std::cout << "Object metadata1: " << m << std::endl; + + // auto metadata = writer1.Finalize(std::move(token1)).get(); + // ASSERT_STATUS_OK(metadata); + // // // ScheduleForDelete(*metadata); + + // EXPECT_EQ(metadata->bucket(), BucketName(bucket_name).FullName()); + // EXPECT_EQ(metadata->name()," object_name"); + // EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); + // EXPECT_EQ("dddd", "Sdfs"); + // std::cout << "Test completed successfully" << std::endl; + // client.DeleteObject(bucket_name, object_name); + +// auto spec = google::storage::v2::BidiReadObjectSpec{}; +// // std::cout << object_metadata->bucket() << "\n"; + +// // spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); +// // spec.set_object(object_name); +// // auto descriptor_status = async.Open(spec).get(); +// // ASSERT_STATUS_OK(descriptor_status); +// // ObjectDescriptor descriptor = *std::move(descriptor_status); +// // auto descriptor_ptr = +// // std::make_shared(std::move(descriptor)); +// // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); + +// // --- Start of ThreadPool implementation --- + +// // 1. Initialize the ThreadPool +// // Use hardware_concurrency to get a reasonable number of threads +// size_t num_threads = std::thread::hardware_concurrency(); +// std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; +// ThreadPool pool(num_threads); + +// // 2. Define read parameters and storage for futures +// std::vector> futures; +// int num_reads = 1000; +// std::int64_t read_offset = 0; +// std::int64_t read_limit = 1024 * 1024 * 1024; // 1 GiB + +// std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; + +// // 3. Enqueue all the read tasks +// // The original loop is replaced with this loop. +// for (int i = 0; i < num_reads; ++i) { +// // Pass *descriptor (the ObjectDescriptor) by value. +// // This is safe because it's a copyable wrapper. +// futures.push_back( +// pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) +// ); +// } + +// // 4. Wait for all enqueued tasks to complete +// std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; +// for (auto& f : futures) { +// f.get(); // This blocks until the future is ready. +// // If a task failed (e.g., via ASSERT_STATUS_OK), +// // gtest will have already flagged the failure. +// // If the task threw an exception, get() will re-throw it. +// } + +// std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; + + // --- End of ThreadPool implementation --- + + // auto actual0 = std::string{}; + // for(int i =0 ; i< 1000 ; i++){ + // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); + // actual0 = std::string{}; + // while (t0.valid()) { + // auto read = r0.Read(std::move(t0)).get(); + // ASSERT_STATUS_OK(read); + // ReadPayload p; + // AsyncToken t; + // std::tie(p, t) = *std::move(read); + // t0 = std::move(t); + // } + // } + +// auto ans = block + block + block; + // EXPECT_EQ(1,2); } -TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient( - TestOptions().set(1024)); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 2048; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); -} } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END @@ -1065,4 +383,4 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file From ca0af9852c39e7cc5ffe9a35d8c958c595f79a59 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Fri, 16 Jan 2026 15:12:36 +0000 Subject: [PATCH 05/15] remove unwanted change --- .../tests/async_client_integration_test.cc | 1266 +++++++++++++---- 1 file changed, 974 insertions(+), 292 deletions(-) diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index cd06954f51334..7424a2de485d4 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" -#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,8 +32,6 @@ #include #include #include -#include -#include #include namespace google { @@ -54,7 +52,15 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { -protected: + protected: + void SetUp() override { + bucket_name_ = + GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); + ASSERT_THAT(bucket_name_, Not(IsEmpty())) + << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; + } + + std::string const& bucket_name() const { return bucket_name_; } using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -65,317 +71,993 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } -private: + private: std::string bucket_name_; }; -namespace gcs = ::google::cloud::storage; +auto TestOptions() { + // Disable metrics in the test, they just make the logs harder to grok. + return Options{} + .set(false) + .set(1) + .set(TracingOptions().SetOptions( + "truncate_string_field_longer_than=2048")); +} + +auto AlwaysRetry() { + return TestOptions().set( + MakeAlwaysRetryIdempotencyPolicy); +} -// auto AlwaysRetry() { -// return google::cloud::Options{}.set( -// MakeAlwaysRetryIdempotencyPolicy); -// } +TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); -google::cloud::Options MakeOptions(google::cloud::Options opts) { - auto fallback = google::cloud::Options{}; - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { - fallback.set(*v); + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + LoremIpsum(), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + + for (auto* p : {&full1, &full0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const full = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(full, LoremIpsum()); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { - fallback.set(*v); + for (auto* p : {&partial1, &partial0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const partial = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(partial, LoremIpsum().substr(2)); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { - fallback.set(*v); + + auto status = async + .DeleteObject(BucketName(bucket_name()), object_name, + insert->generation()) + .get(); + EXPECT_STATUS_OK(status); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); + EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); +} + +TEST_F(AsyncClientIntegrationTest, ComposeObject) { + auto async = AsyncClient(TestOptions()); + auto o1 = MakeRandomObjectName(); + auto o2 = MakeRandomObjectName(); + auto destination = MakeRandomObjectName(); + + auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), + AlwaysRetry()); + auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), + AlwaysRetry()); + std::vector> inserted{insert1.get(), + insert2.get()}; + for (auto const& insert : inserted) { + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { - fallback.set(*v); + std::vector sources; + std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), + [](auto const& o) { + google::storage::v2::ComposeObjectRequest::SourceObject r; + r.set_name(o->name()); + r.set_generation(o->generation()); + return r; + }); + auto pending = async.ComposeObject(BucketName(bucket_name()), destination, + std::move(sources)); + auto const composed = pending.get(); + EXPECT_STATUS_OK(composed); + ScheduleForDelete(*composed); + + auto read = async + .ReadObjectRange(BucketName(bucket_name()), destination, 0, + 2 * LoremIpsum().size()) + .get(); + ASSERT_STATUS_OK(read); + auto contents = read->contents(); + auto const full_contents = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); + EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); +} + +TEST_F(AsyncClientIntegrationTest, StreamingRead) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto const block = MakeRandomData(kLineSize); + std::vector insert_data(kLineCount); + std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { + return std::to_string(++n) + ": " + block; + }); + auto const expected_size = std::accumulate( + insert_data.begin(), insert_data.end(), static_cast(0), + [](auto a, auto const& b) { return a + b.size(); }); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + insert_data, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + ASSERT_EQ(insert->size(), expected_size); + + auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); } - fallback.set(false); - return google::cloud::internal::MergeOptions(std::move(opts), fallback); -} - - -google::cloud::storage::Client MakeGrpcClient(std::string project_id) { - auto options = MakeOptions(google::cloud::Options{} - .set(project_id)); - return google::cloud::storage::MakeGrpcClient(std::move(options)); -} - -google::cloud::storage_experimental::AsyncClient MakeAsyncClient(std::string project_id) { - auto options = MakeOptions(google::cloud::Options{} - .set(project_id) - .set({"rpc"}) - .set(true)); - return google::cloud::storage_experimental::AsyncClient(options); -} - -class ThreadPool { -public: - // Constructor initializes the thread pool with a given number of worker threads. - ThreadPool(size_t threads) : stop_(false) { - if (threads == 0) { - throw std::invalid_argument("Thread count cannot be zero."); - } - for (size_t i = 0; i < threads; ++i) { - workers_.emplace_back([this] { - while (true) { - std::function task; - { - // Acquire a lock on the task queue. - std::unique_lock lock(this->queue_mutex_); - - // Wait for a task to be available or for the pool to stop. - this->condition_.wait(lock, [this] { - return this->stop_ || !this->tasks_.empty(); - }); - - // If the pool is stopping and no tasks are left, exit the thread. - if (this->stop_ && this->tasks_.empty()) { - return; - } - - // Get the next task from the queue. - task = std::move(this->tasks_.front()); - this->tasks_.pop(); - } - - // Execute the task. - task(); - } - }); - } - } - - // Adds a new task to the thread pool. - template - auto enqueue(F&& f, Args&&... args) -> std::future::type> { - using return_type = typename std::result_of::type; - - // Create a packaged_task to wrap the function and its arguments. - auto task = std::make_shared>( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - // Acquire a lock on the queue and push the task. - std::unique_lock lock(queue_mutex_); - - // Don't allow enqueueing after stopping the pool. - if (stop_) { - throw std::runtime_error("enqueue on stopped ThreadPool"); - } - - tasks_.emplace([task]() { (*task)(); }); - } - - // Notify one waiting thread that a new task is available. - condition_.notify_one(); - return res; - } - - // Destructor stops all worker threads and joins them. - ~ThreadPool() { - { - std::unique_lock lock(queue_mutex_); - stop_ = true; - } - - // Notify all threads so they can wake up and exit their loops. - condition_.notify_all(); - - for (std::thread& worker : workers_) { - worker.join(); - } - } - -private: - std::vector workers_; - std::queue> tasks_; - - std::mutex queue_mutex_; - std::condition_variable condition_; - - bool stop_; -}; + EXPECT_EQ(actual.size(), expected_size); + auto view = absl::string_view(actual); + for (auto const& expected : insert_data) { + ASSERT_GE(view.size(), expected.size()); + ASSERT_EQ(expected, view.substr(0, expected.size())); + view.remove_prefix(expected.size()); + } + EXPECT_EQ(view, absl::string_view{}); +} + +TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto constexpr kReadOffset = kLineCount * kLineSize / 2; + auto const block = MakeRandomData(kLineSize - 1) + "\n"; + std::string contents; + for (int i = 0; i != kLineCount; ++i) contents += block; + auto const expected_insert_size = contents.size(); + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + contents, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); -void ReadRangeTask(std::shared_ptr descriptor, - std::int64_t& offset, - std::int64_t& limit) { - AsyncReader r; + ASSERT_EQ(insert->size(), expected_insert_size); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + request.set_read_offset(kReadOffset); + auto r = async.ReadObject(request).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); + } + + EXPECT_EQ(absl::string_view(actual), + absl::string_view(contents).substr(kReadOffset)); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kInitialBlockCount = 4; + auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; + auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto const upload_id = writer.UploadId(); + for (int i = 0; i != kInitialBlockCount - 1; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + // Reset the existing writer and resume the upload. + writer = AsyncWriter(); + w = client.ResumeUnbufferedUpload(upload_id).get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + ASSERT_EQ(writer.UploadId(), upload_id); + auto const persisted = writer.PersistedState(); + // We don't expect this to be larger that the total size of the object. + // Incidentally, this shows the value fits into an `int`. + ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); + // Cast to `int` because otherwise we need to write multiple casts below. + auto offset = static_cast(absl::get(persisted)); + if (offset % kBlockSize != 0) { + auto s = block.substr(offset % kBlockSize); + auto const size = s.size(); + auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); + ASSERT_STATUS_OK(p); + offset += static_cast(size); + token = *std::move(p); + } + while (offset < kDesiredSize) { + auto const n = std::min(kBlockSize, kDesiredSize - offset); + auto p = + writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); + ASSERT_STATUS_OK(p); + offset += n; + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kDesiredSize); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto const upload_id = writer.UploadId(); + auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); + + w = client.ResumeUnbufferedUpload(upload_id).get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + EXPECT_FALSE(token.valid()); + EXPECT_THAT(writer.PersistedState(), VariantWith( + IsProtoEqual(*metadata))); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto w = + client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto w = + client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, RewriteObject) { + auto async = AsyncClient(TestOptions()); + auto o1 = MakeRandomObjectName(); + auto o2 = MakeRandomObjectName(); + + auto constexpr kBlockSize = 4 * 1024 * 1024; + auto insert = async + .InsertObject(BucketName(bucket_name()), o1, + MakeRandomData(kBlockSize), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + // Start a rewrite, but limit each iteration to a small number of bytes, to + // force multiple calls. + google::storage::v2::Object metadata; + AsyncRewriter rewriter; + AsyncToken token; + google::storage::v2::RewriteObjectRequest request; + request.set_destination_name(o2); + request.set_destination_bucket(BucketName(bucket_name()).FullName()); + request.set_source_object(o1); + request.set_source_bucket(BucketName(bucket_name()).FullName()); + request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.StartRewrite(std::move(request)); + while (token.valid()) { + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + google::storage::v2::RewriteResponse response; + AsyncToken t; + std::tie(response, t) = *std::move(rt); + token = std::move(t); + if (!response.has_resource()) continue; + metadata = response.resource(); + ScheduleForDelete(metadata); + EXPECT_FALSE(token.valid()); + } + EXPECT_EQ(metadata.name(), o2); + EXPECT_EQ(metadata.size(), insert->size()); +} + +TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { + auto async = AsyncClient(TestOptions()); + auto destination = + GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); + if (!destination || destination->empty()) GTEST_SKIP(); + + auto constexpr kBlockSize = 4 * 1024 * 1024; + auto source = + async + .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), + MakeRandomData(kBlockSize), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(source); + ScheduleForDelete(*source); + + // Start a rewrite, but limit each iteration to a small number of bytes, to + // force multiple calls. + AsyncRewriter rewriter; + AsyncToken token; + auto const expected_name = MakeRandomObjectName(); + google::storage::v2::RewriteObjectRequest start_request; + start_request.set_destination_name(expected_name); + start_request.set_destination_bucket(BucketName(*destination).FullName()); + start_request.set_source_object(source->name()); + start_request.set_source_bucket(source->bucket()); + start_request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.StartRewrite(start_request); + + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + google::storage::v2::RewriteResponse response; + AsyncToken t; + std::tie(response, t) = *std::move(rt); + + // We want to resume a partially completed resume. Verify the first rewrite + // did not complete things. + ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + + google::storage::v2::RewriteObjectRequest resume_request; + resume_request.set_source_bucket(source->bucket()); + resume_request.set_source_object(source->name()); + resume_request.set_destination_bucket(BucketName(*destination).FullName()); + resume_request.set_destination_name(expected_name); + resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); + + google::storage::v2::Object metadata; + while (token.valid()) { + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); AsyncToken t; - - // 1. Get the reader and token for the specified range - // MODIFIED: Call Read() directly on the descriptor object - std::tie(r, t) = descriptor->Read(offset, limit); - - // 2. Consume the entire stream for this range - while (t.valid()) { - auto read = r.Read(std::move(t)).get(); - - // ASSERT_STATUS_OK will flag the test as failed and abort this - // thread if the status is not OK. - ASSERT_STATUS_OK(read); - - ReadPayload p; - AsyncToken t_new; - std::tie(p, t_new) = *std::move(read); - t = std::move(t_new); - - // In this test, we are discarding the payload `p`, just as - // the original single-threaded loop did. - } -} - - -TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { - auto project_id = "bajajnehaa-devrel-test"; - // auto const kproject = google::cloud::Project(project_id); - - auto client = MakeGrpcClient(project_id); - - auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; - auto object_name = "vaibhav-test-file-111"; - auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; - // auto hns = gcs::BucketHierarchicalNamespace{true}; - auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; - - auto constexpr kBlockSize = 1024*1024*10; - auto constexpr kBlockCount = 1000; + std::tie(response, t) = *std::move(rt); + token = std::move(t); + if (!response.has_resource()) continue; + metadata = response.resource(); + ScheduleForDelete(metadata); + EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); + EXPECT_EQ(metadata.name(), expected_name); + EXPECT_EQ(metadata.size(), source->size()); + EXPECT_FALSE(token.valid()); + } +} + +TEST_F(AsyncClientIntegrationTest, InsertFailure) { + auto async = AsyncClient(TestOptions()); + + auto insert = async + .InsertObject(BucketName(MakeRandomBucketName()), + MakeRandomObjectName(), LoremIpsum()) + .get(); + ASSERT_THAT(insert, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ReadFailure) { + auto async = AsyncClient(TestOptions()); + + auto read = async + .ReadObject(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + // At the moment, only connectivity errors are detected before the first + // `Read()` call. Accept such failures too: + if (!read) return; + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(read); + auto payload = ReadAll(std::move(reader), std::move(token)).get(); + ASSERT_THAT(payload, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { + auto async = AsyncClient(TestOptions()); + + auto payload = + async + .ReadObjectRange(BucketName(MakeRandomBucketName()), + MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) + .get(); + ASSERT_THAT(payload, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async + .StartBufferedUpload(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async + .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = + async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { + auto async = AsyncClient(TestOptions()); + + auto make_source = [](std::string name) { + auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; + source.set_name(std::move(name)); + return source; + }; + auto composed = + async + .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), + {make_source(MakeRandomObjectName()), + make_source(MakeRandomObjectName())}) + .get(); + ASSERT_THAT(composed, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { + auto async = AsyncClient(TestOptions()); + + auto deleted = + async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) + .get(); + ASSERT_THAT(deleted, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { + auto async = AsyncClient(TestOptions()); + + AsyncRewriter rewriter; + AsyncToken token; + std::tie(rewriter, token) = + async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), + BucketName(bucket_name()), MakeRandomObjectName()); + ASSERT_TRUE(token.valid()); + auto iteration = rewriter.Iterate(std::move(token)).get(); + ASSERT_THAT(iteration, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { + auto async = AsyncClient(TestOptions()); + + AsyncRewriter rewriter; + AsyncToken token; + std::tie(rewriter, token) = + async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), + BucketName(bucket_name()), MakeRandomObjectName(), + "test-only-invalid-rewrite-token"); + ASSERT_TRUE(token.valid()); + auto iteration = rewriter.Iterate(std::move(token)).get(); + ASSERT_THAT(iteration, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { + // Skipping the test till we get the takeover feature on testbench. + GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kInitialBlockCount = 4; + auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; + auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + for (int i = 0; i != kInitialBlockCount - 1; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + writer.Close(); + + // Reset the existing writer and resume the upload. + writer = AsyncWriter(); + + auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); + ASSERT_STATUS_OK(object_metadata); + auto m = *object_metadata; + auto generation = m.generation(); + + w = async + .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, + generation) + .get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + auto const persisted = writer.PersistedState(); + ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); + // Cast to `int` because otherwise we need to write multiple casts below. + auto offset = static_cast(absl::get(persisted)); + if (offset % kBlockSize != 0) { + auto s = block.substr(offset % kBlockSize); + auto const size = s.size(); + auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); + ASSERT_STATUS_OK(p); + offset += static_cast(size); + token = *std::move(p); + } + while (offset < kDesiredSize) { + auto const n = std::min(kBlockSize, kDesiredSize - offset); + auto p = + writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); + ASSERT_STATUS_OK(p); + offset += n; + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kDesiredSize); +} + +TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { + // Skipping the test till we get the takeover feature on testbench. + GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); auto const block = MakeRandomData(kBlockSize); - auto const block2 = MakeRandomData(kBlockSize); - - auto async = MakeAsyncClient(project_id); - // auto w = async.StartBufferedUpload(BucketName(bucket_name), object_name) - // .get(); - // ASSERT_STATUS_OK(w); - auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) - .get(); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); + + auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); + ASSERT_STATUS_OK(object_metadata); + auto m = *object_metadata; + auto generation = m.generation(); + + w = async + .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, + generation) + .get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + EXPECT_FALSE(token.valid()); + EXPECT_THAT(writer.PersistedState(), VariantWith( + IsProtoEqual(*metadata))); +} + +TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + // Explicitly flush the data. + auto flush_status = writer.Flush().get(); + EXPECT_STATUS_OK(flush_status); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, Open) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 8 * 1024; + auto constexpr kStride = 2 * kSize; + auto constexpr kBlockCount = 4; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); AsyncWriter writer; AsyncToken token; std::tie(writer, token) = *std::move(w); - for (int i = 0; i < kBlockCount; ++i) { - std::cout << "Writing data iteration #" << i << std::endl; + for (int i = 0; i != kBlockCount; ++i) { auto p = writer.Write(std::move(token), WritePayload(block)).get(); ASSERT_STATUS_OK(p); token = *std::move(p); } - -// auto metadata1 = writer.Finalize(std::move(token)).get(); -// ASSERT_STATUS_OK(metadata1); -// std::cout << "Request metadata: " << metadata1->generation() << std::endl; - // EXPECT_EQ(1,2); - - // auto close = writer.Close(); - - // auto object_metadata = client.GetObjectMetadata(bucket_name, object_name); - // auto m = *object_metadata; - // auto generation = m.generation(); - - // auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) - // .get(); - - // ASSERT_STATUS_OK(w1); - - // AsyncWriter writer1; - // AsyncToken token1; - // std::tie(writer1, token1) = *std::move(w1); - - // for (int i = 0; i < kBlockCount; ++i) { - // // std::cout << "Writing data iteration #" << i << std::endl; - // auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); - // ASSERT_STATUS_OK(p); - // token1 = *std::move(p); - // } - - // // auto object_metadata1 = client.GetObjectMetadata(bucket_name, object_name); - // // auto m1 = *object_metadata1; - // // // auto generation1 = m1.generation(); - // // std::cout << "Object metadata1: " << m << std::endl; - - // auto metadata = writer1.Finalize(std::move(token1)).get(); - // ASSERT_STATUS_OK(metadata); - // // // ScheduleForDelete(*metadata); - - // EXPECT_EQ(metadata->bucket(), BucketName(bucket_name).FullName()); - // EXPECT_EQ(metadata->name()," object_name"); - // EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); - // EXPECT_EQ("dddd", "Sdfs"); - // std::cout << "Test completed successfully" << std::endl; - // client.DeleteObject(bucket_name, object_name); - -// auto spec = google::storage::v2::BidiReadObjectSpec{}; -// // std::cout << object_metadata->bucket() << "\n"; - -// // spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); -// // spec.set_object(object_name); -// // auto descriptor_status = async.Open(spec).get(); -// // ASSERT_STATUS_OK(descriptor_status); -// // ObjectDescriptor descriptor = *std::move(descriptor_status); -// // auto descriptor_ptr = -// // std::make_shared(std::move(descriptor)); -// // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); - -// // --- Start of ThreadPool implementation --- - -// // 1. Initialize the ThreadPool -// // Use hardware_concurrency to get a reasonable number of threads -// size_t num_threads = std::thread::hardware_concurrency(); -// std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; -// ThreadPool pool(num_threads); - -// // 2. Define read parameters and storage for futures -// std::vector> futures; -// int num_reads = 1000; -// std::int64_t read_offset = 0; -// std::int64_t read_limit = 1024 * 1024 * 1024; // 1 GiB - -// std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; - -// // 3. Enqueue all the read tasks -// // The original loop is replaced with this loop. -// for (int i = 0; i < num_reads; ++i) { -// // Pass *descriptor (the ObjectDescriptor) by value. -// // This is safe because it's a copyable wrapper. -// futures.push_back( -// pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) -// ); -// } - -// // 4. Wait for all enqueued tasks to complete -// std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; -// for (auto& f : futures) { -// f.get(); // This blocks until the future is ready. -// // If a task failed (e.g., via ASSERT_STATUS_OK), -// // gtest will have already flagged the failure. -// // If the task threw an exception, get() will re-throw it. -// } - -// std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; - - // --- End of ThreadPool implementation --- - - // auto actual0 = std::string{}; - // for(int i =0 ; i< 1000 ; i++){ - // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); - // actual0 = std::string{}; - // while (t0.valid()) { - // auto read = r0.Read(std::move(t0)).get(); - // ASSERT_STATUS_OK(read); - // ReadPayload p; - // AsyncToken t; - // std::tie(p, t) = *std::move(read); - // t0 = std::move(t); - // } - // } - -// auto ans = block + block + block; - // EXPECT_EQ(1,2); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); } +TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient( + TestOptions().set(1024)); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 2048; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); +} } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END @@ -383,4 +1065,4 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC From 8635ca670e54d77e4deda1272e44d7e2c60d0a7b Mon Sep 17 00:00:00 2001 From: v-pratap Date: Fri, 16 Jan 2026 15:15:07 +0000 Subject: [PATCH 06/15] remove extra state setting --- google/cloud/storage/internal/async/writer_connection_resumed.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index dab7d9a3e5579..9193fe69e3796 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -358,7 +358,6 @@ class AsyncWriterConnectionResumedState if (state_ == State::kResuming) return; was_finalizing = finalizing_; if (!s.ok() && cancelled_) { - state_ = State::kIdle; return SetError(std::move(lk), std::move(s)); } state_ = State::kResuming; From a01f036da2d8121fb056171b38f6f8ccb383d7d1 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Fri, 16 Jan 2026 15:40:42 +0000 Subject: [PATCH 07/15] recover write handle changes --- .../async/writer_connection_resumed.cc | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 9193fe69e3796..cc2c25cd6e1ec 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -82,6 +82,13 @@ class AsyncWriterConnectionResumedState } else { buffer_offset_ = absl::get(state); } + if (first_response_.has_write_handle()) { + latest_write_handle_ = first_response_.write_handle(); + } else if (initial_request_.has_append_object_spec() && + initial_request_.append_object_spec().has_write_handle()) { + latest_write_handle_ = + initial_request_.append_object_spec().write_handle(); + } } void Cancel() { @@ -179,6 +186,7 @@ class AsyncWriterConnectionResumedState } void StartWriting(std::unique_lock lk) { + if (writing_) return; WriteLoop(std::move(lk)); } @@ -344,9 +352,8 @@ class AsyncWriterConnectionResumedState } // Include write_handle to enable fast resume instead of slow // takeover. Without handle, server performs full state validation. - if (first_response_.has_write_handle()) { - *append_object_spec.mutable_write_handle() = - first_response_.write_handle(); + if (latest_write_handle_) { + *append_object_spec.mutable_write_handle() = *latest_write_handle_; } append_object_spec.set_generation(first_response_.resource().generation()); ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status)); @@ -373,6 +380,10 @@ class AsyncWriterConnectionResumedState void OnResume(Status const& original_status, bool was_finalizing, StatusOr res) { std::unique_lock lk(mu_); + // Update write_handle from any resume response that contains it. + if (res && res->first_response.has_write_handle()) { + latest_write_handle_ = res->first_response.write_handle(); + } if (was_finalizing) { // If resuming due to a finalization error, we *must* complete the @@ -613,6 +624,9 @@ class AsyncWriterConnectionResumedState // Tracks if the final promise (`finalized_`) has been completed. bool finalized_promise_completed_ = false; + + // Track the latest write handle seen in responses. + absl::optional latest_write_handle_; }; /** @@ -715,4 +729,4 @@ MakeWriterConnectionResumed( GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud -} // namespace google +} // namespace google \ No newline at end of file From 52dddd7e407d3bba13f1dbcc5f1577f55417cf4f Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 08:53:32 +0000 Subject: [PATCH 08/15] fix the format --- .../async/writer_connection_resumed.cc | 4 +-- .../async/writer_connection_resumed_test.cc | 33 ++++++++++--------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index cc2c25cd6e1ec..2f7541e9b71bf 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -186,7 +186,6 @@ class AsyncWriterConnectionResumedState } void StartWriting(std::unique_lock lk) { - if (writing_) return; WriteLoop(std::move(lk)); } @@ -271,7 +270,8 @@ class AsyncWriterConnectionResumedState auto self = w.lock(); if (!self) return; self->OnQuery(f.get()); - self->SetFlushed(std::unique_lock(self->mu_), std::move(result)); + self->SetFlushed(std::unique_lock(self->mu_), + std::move(result)); }); } diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 4fb39ca4bd3d8..f64f4aa9c9e10 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -419,24 +419,24 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { - return sequencer.PushBack("Query").then([](auto f) -> StatusOr { - if (!f.get()) return TransientError(); - return 0; - }); + return sequencer.PushBack("Query").then( + [](auto f) -> StatusOr { + if (!f.get()) return TransientError(); + return 0; + }); }); // Make Write detect concurrent invocations. If two writes run concurrently // the compare_exchange will fail and the test will fail. std::atomic in_write{false}; - EXPECT_CALL(*mock, Write(_)) - .WillRepeatedly([&](auto) { - bool expected = false; - EXPECT_TRUE(in_write.compare_exchange_strong(expected, true)); - // Simulate some work that allows a concurrent write to attempt to run. - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - in_write.store(false); - return make_ready_future(Status{}); - }); + EXPECT_CALL(*mock, Write(_)).WillRepeatedly([&](auto) { + bool expected = false; + EXPECT_TRUE(in_write.compare_exchange_strong(expected, true)); + // Simulate some work that allows a concurrent write to attempt to run. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + in_write.store(false); + return make_ready_future(Status{}); + }); MockFactory mock_factory; EXPECT_CALL(mock_factory, Call).Times(0); @@ -464,11 +464,13 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { // Wait for both futures to complete with a timeout to avoid indefinite hang. auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); - while (!write_future.is_ready() && std::chrono::steady_clock::now() < deadline) { + while (!write_future.is_ready() && + std::chrono::steady_clock::now() < deadline) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); } deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); - while (!flush_future.is_ready() && std::chrono::steady_clock::now() < deadline) { + while (!flush_future.is_ready() && + std::chrono::steady_clock::now() < deadline) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); } @@ -480,7 +482,6 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { EXPECT_THAT(flush_future.get(), StatusIs(StatusCode::kOk)); } - TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) { struct { bool use_write_object_spec; From 448906b51f4c3fd2ddfeff909e6ca762bbae8e54 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 09:43:35 +0000 Subject: [PATCH 09/15] fix the format --- .../cloud/storage/internal/async/writer_connection_resumed.cc | 4 ++-- .../storage/internal/async/writer_connection_resumed_test.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 2f7541e9b71bf..c8e116bee3728 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -266,7 +266,7 @@ class AsyncWriterConnectionResumedState write_offset_ += write_size; auto impl = Impl(lk); lk.unlock(); - impl->Query().then([this, result, w = WeakFromThis()](auto f) { + impl->Query().then([result, w = WeakFromThis()](auto f) { auto self = w.lock(); if (!self) return; self->OnQuery(f.get()); @@ -729,4 +729,4 @@ MakeWriterConnectionResumed( GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud -} // namespace google \ No newline at end of file +} // namespace google diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index f64f4aa9c9e10..81d1245dcdd73 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -551,4 +551,4 @@ TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud -} // namespace google \ No newline at end of file +} // namespace google From ba96b287d6f559f92d0bcee3ac75112daa39554b Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 13:46:27 +0000 Subject: [PATCH 10/15] fix the format --- .../internal/async/writer_connection_resumed_test.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 81d1245dcdd73..70561848ca46e 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -203,8 +203,10 @@ TEST(WriterConnectionResumed, FlushEmpty) { next = sequencer.PopFrontWithName(); EXPECT_EQ(next.second, "Query"); next.first.set_value(true); - - EXPECT_THAT(flush.get(), StatusIs(StatusCode::kOk)); + auto w = flush.get(); + if(!w.ok()) { + FAIL() << "Flush failed: " << w; + } } TEST(WriteConnectionResumed, FlushNonEmpty) { From 472b7e568127a921d523f991115939aec8220a11 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 14:20:43 +0000 Subject: [PATCH 11/15] fix the noex integration test failure --- .../async/writer_connection_resumed_test.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 70561848ca46e..fd8c7a57ff5e5 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -203,9 +203,8 @@ TEST(WriterConnectionResumed, FlushEmpty) { next = sequencer.PopFrontWithName(); EXPECT_EQ(next.second, "Query"); next.first.set_value(true); - auto w = flush.get(); - if(!w.ok()) { - FAIL() << "Flush failed: " << w; + if (!flush.get().ok()) { + FAIL() << "Flush failed: " << flush.get(); } } @@ -478,10 +477,12 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { ASSERT_TRUE(write_future.is_ready()); ASSERT_TRUE(flush_future.is_ready()); - - // Both futures should complete successfully. - EXPECT_THAT(write_future.get(), StatusIs(StatusCode::kOk)); - EXPECT_THAT(flush_future.get(), StatusIs(StatusCode::kOk)); + if (!write_future.get().ok()) { + FAIL() << "Write failed: " << write_future.get(); + } + if (!flush_future.get().ok()) { + FAIL() << "Flush failed: " << flush_future.get(); + } } TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) { From f43c7285b365f919210f3467dda5ecb61914e4b6 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 14:54:35 +0000 Subject: [PATCH 12/15] fix noes integration test failure --- .../storage/internal/async/writer_connection_resumed_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index fd8c7a57ff5e5..b0553c31a9b62 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -415,14 +415,14 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) { return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); + if (!f.valid() || !f.get()) return TransientError(); return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( [](auto f) -> StatusOr { - if (!f.get()) return TransientError(); + if (!f.valid() || !f.get()) return TransientError(); return 0; }); }); From 7da458ad39c755ad90774e803e2b671ef11db44b Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 16:45:44 +0000 Subject: [PATCH 13/15] revert noes integration test failure --- .../async/writer_connection_resumed_test.cc | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index b0553c31a9b62..1b9f9f18c880b 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -203,9 +203,7 @@ TEST(WriterConnectionResumed, FlushEmpty) { next = sequencer.PopFrontWithName(); EXPECT_EQ(next.second, "Query"); next.first.set_value(true); - if (!flush.get().ok()) { - FAIL() << "Flush failed: " << flush.get(); - } + EXPECT_THAT(flush.get(), StatusIs(StatusCode::kOk)); } TEST(WriteConnectionResumed, FlushNonEmpty) { @@ -415,14 +413,14 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) { return sequencer.PushBack("Flush").then([](auto f) { - if (!f.valid() || !f.get()) return TransientError(); + if (!f.get()) return TransientError(); return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( [](auto f) -> StatusOr { - if (!f.valid() || !f.get()) return TransientError(); + if (!f.get()) return TransientError(); return 0; }); }); @@ -477,12 +475,8 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { ASSERT_TRUE(write_future.is_ready()); ASSERT_TRUE(flush_future.is_ready()); - if (!write_future.get().ok()) { - FAIL() << "Write failed: " << write_future.get(); - } - if (!flush_future.get().ok()) { - FAIL() << "Flush failed: " << flush_future.get(); - } + EXPECT_THAT(write_future.get(), StatusIs(StatusCode::kOk)); + EXPECT_THAT(flush_future.get(), StatusIs(StatusCode::kOk)); } TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) { From 43a29f5cf4545a1439a3b596429fdf6228fb3e4b Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 17:16:11 +0000 Subject: [PATCH 14/15] fix noes integration test failure --- .../async/writer_connection_resumed_test.cc | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 1b9f9f18c880b..a8645e6d73283 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -174,15 +174,13 @@ TEST(WriterConnectionResumed, FlushEmpty) { .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) { EXPECT_TRUE(p.payload().empty()); - return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( - [](auto f) -> StatusOr { - if (!f.get()) return TransientError(); + [](auto) -> StatusOr { return 0; }); }); @@ -218,22 +216,19 @@ TEST(WriteConnectionResumed, FlushNonEmpty) { EXPECT_CALL(*mock, Flush) .WillOnce([&](auto const& p) { EXPECT_EQ(p.payload(), payload.payload()); - return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); - return Status{}; + return sequencer.PushBack("Flush").then([](auto) { + return Status{}; }); }) .WillOnce([&](auto const& p) { EXPECT_TRUE(p.payload().empty()); - return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); - return Status{}; + return sequencer.PushBack("Flush").then([](auto) { + return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( - [](auto f) -> StatusOr { - if (!f.get()) return TransientError(); + [](auto) -> StatusOr { return 1024; }); }); @@ -412,15 +407,13 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { EXPECT_CALL(*mock, PersistedState) .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) { - return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( - [](auto f) -> StatusOr { - if (!f.get()) return TransientError(); + [](auto) -> StatusOr { return 0; }); }); From 981827904ff57fb48fc06b032846fc135c7df345 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Mon, 19 Jan 2026 17:18:27 +0000 Subject: [PATCH 15/15] fix the format --- .../async/writer_connection_resumed_test.cc | 28 +++++-------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index a8645e6d73283..0189991bcd633 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -174,15 +174,11 @@ TEST(WriterConnectionResumed, FlushEmpty) { .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) { EXPECT_TRUE(p.payload().empty()); - return sequencer.PushBack("Flush").then([](auto) { - return Status{}; - }); + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( - [](auto) -> StatusOr { - return 0; - }); + [](auto) -> StatusOr { return 0; }); }); MockFactory mock_factory; @@ -216,21 +212,15 @@ TEST(WriteConnectionResumed, FlushNonEmpty) { EXPECT_CALL(*mock, Flush) .WillOnce([&](auto const& p) { EXPECT_EQ(p.payload(), payload.payload()); - return sequencer.PushBack("Flush").then([](auto) { - return Status{}; - }); + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); }) .WillOnce([&](auto const& p) { EXPECT_TRUE(p.payload().empty()); - return sequencer.PushBack("Flush").then([](auto) { - return Status{}; - }); + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( - [](auto) -> StatusOr { - return 1024; - }); + [](auto) -> StatusOr { return 1024; }); }); MockFactory mock_factory; @@ -407,15 +397,11 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { EXPECT_CALL(*mock, PersistedState) .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) { - return sequencer.PushBack("Flush").then([](auto) { - return Status{}; - }); + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( - [](auto) -> StatusOr { - return 0; - }); + [](auto) -> StatusOr { return 0; }); }); // Make Write detect concurrent invocations. If two writes run concurrently