From 6117aff4e4cc817c921eb6dd69da13f52632e629 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 8 Feb 2017 14:57:13 +1100 Subject: [PATCH 1/3] DO NOT COMMIT: remove ListenableFuture from surface This won't compile yet, since it needs change in gax: https://github.com/googleapis/gax-java/pull/198 --- .../spi/v1beta1/ErrorGroupServiceClient.java | 4 +- .../spi/v1beta1/ErrorStatsServiceClient.java | 6 +-- .../v1beta1/ReportErrorsServiceClient.java | 2 +- .../spi/v1/LanguageServiceClient.java | 9 ++-- .../logging/spi/v2/ConfigServiceV2Client.java | 10 ++-- .../spi/v2/LoggingServiceV2Client.java | 10 ++-- .../spi/v2/MetricsServiceV2Client.java | 10 ++-- .../monitoring/spi/v3/GroupServiceClient.java | 12 ++--- .../spi/v3/MetricServiceClient.java | 17 +++---- .../pubsub/spi/v1/MessageDispatcher.java | 13 ++++- .../cloud/pubsub/spi/v1/MessageReceiver.java | 17 +++++-- .../google/cloud/pubsub/spi/v1/Publisher.java | 50 +++++++++++++++++-- .../cloud/pubsub/spi/v1/PublisherClient.java | 18 +++---- .../cloud/pubsub/spi/v1/SubscriberClient.java | 22 ++++---- .../pubsub/spi/v1/PublisherImplTest.java | 36 ++++++------- .../pubsub/spi/v1/SubscriberImplTest.java | 19 ++++--- .../speech/spi/v1beta1/SpeechClient.java | 4 +- .../trace/spi/v1/TraceServiceClient.java | 6 +-- .../vision/spi/v1/ImageAnnotatorClient.java | 2 +- 19 files changed, 163 insertions(+), 104 deletions(-) diff --git a/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorGroupServiceClient.java b/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorGroupServiceClient.java index 33e1edc3d84a..c745a1643fbf 100644 --- a/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorGroupServiceClient.java +++ b/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorGroupServiceClient.java @@ -214,7 +214,7 @@ private final ErrorGroup getGroup(GetGroupRequest request) { * GetGroupRequest request = GetGroupRequest.newBuilder() * .setGroupNameWithGroupName(groupName) * .build(); - * ListenableFuture<ErrorGroup> future = errorGroupServiceClient.getGroupCallable().futureCall(request); + * RpcFuture<ErrorGroup> future = errorGroupServiceClient.getGroupCallable().futureCall(request); * // Do something * ErrorGroup response = future.get(); * } @@ -281,7 +281,7 @@ private final ErrorGroup updateGroup(UpdateGroupRequest request) { * UpdateGroupRequest request = UpdateGroupRequest.newBuilder() * .setGroup(group) * .build(); - * ListenableFuture<ErrorGroup> future = errorGroupServiceClient.updateGroupCallable().futureCall(request); + * RpcFuture<ErrorGroup> future = errorGroupServiceClient.updateGroupCallable().futureCall(request); * // Do something * ErrorGroup response = future.get(); * } diff --git a/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorStatsServiceClient.java b/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorStatsServiceClient.java index 4a53ccaf92bb..71e545c4cb16 100644 --- a/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorStatsServiceClient.java +++ b/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ErrorStatsServiceClient.java @@ -251,7 +251,7 @@ public final ListGroupStatsPagedResponse listGroupStats(ListGroupStatsRequest re * .setProjectNameWithProjectName(projectName) * .setTimeRange(timeRange) * .build(); - * ListenableFuture<ListGroupStatsPagedResponse> future = errorStatsServiceClient.listGroupStatsPagedCallable().futureCall(request); + * RpcFuture<ListGroupStatsPagedResponse> future = errorStatsServiceClient.listGroupStatsPagedCallable().futureCall(request); * // Do something * for (ErrorGroupStats element : future.get().iterateAllElements()) { * // doThingsWith(element); @@ -370,7 +370,7 @@ public final ListEventsPagedResponse listEvents(ListEventsRequest request) { * .setProjectNameWithProjectName(projectName) * .setGroupId(groupId) * .build(); - * ListenableFuture<ListEventsPagedResponse> future = errorStatsServiceClient.listEventsPagedCallable().futureCall(request); + * RpcFuture<ListEventsPagedResponse> future = errorStatsServiceClient.listEventsPagedCallable().futureCall(request); * // Do something * for (ErrorEvent element : future.get().iterateAllElements()) { * // doThingsWith(element); @@ -475,7 +475,7 @@ private final DeleteEventsResponse deleteEvents(DeleteEventsRequest request) { * DeleteEventsRequest request = DeleteEventsRequest.newBuilder() * .setProjectNameWithProjectName(projectName) * .build(); - * ListenableFuture<DeleteEventsResponse> future = errorStatsServiceClient.deleteEventsCallable().futureCall(request); + * RpcFuture<DeleteEventsResponse> future = errorStatsServiceClient.deleteEventsCallable().futureCall(request); * // Do something * DeleteEventsResponse response = future.get(); * } diff --git a/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ReportErrorsServiceClient.java b/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ReportErrorsServiceClient.java index 8189c716e86c..9d7fafe286ef 100644 --- a/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ReportErrorsServiceClient.java +++ b/google-cloud-errorreporting/src/main/java/com/google/cloud/errorreporting/spi/v1beta1/ReportErrorsServiceClient.java @@ -241,7 +241,7 @@ public final ReportErrorEventResponse reportErrorEvent(ReportErrorEventRequest r * .setProjectNameWithProjectName(projectName) * .setEvent(event) * .build(); - * ListenableFuture<ReportErrorEventResponse> future = reportErrorsServiceClient.reportErrorEventCallable().futureCall(request); + * RpcFuture<ReportErrorEventResponse> future = reportErrorsServiceClient.reportErrorEventCallable().futureCall(request); * // Do something * ReportErrorEventResponse response = future.get(); * } diff --git a/google-cloud-language/src/main/java/com/google/cloud/language/spi/v1/LanguageServiceClient.java b/google-cloud-language/src/main/java/com/google/cloud/language/spi/v1/LanguageServiceClient.java index 870869cee965..3064eba369ec 100644 --- a/google-cloud-language/src/main/java/com/google/cloud/language/spi/v1/LanguageServiceClient.java +++ b/google-cloud-language/src/main/java/com/google/cloud/language/spi/v1/LanguageServiceClient.java @@ -24,7 +24,6 @@ import com.google.cloud.language.v1.AnalyzeSyntaxRequest; import com.google.cloud.language.v1.AnalyzeSyntaxResponse; import com.google.cloud.language.v1.AnnotateTextRequest; -import com.google.cloud.language.v1.AnnotateTextRequest.Features; import com.google.cloud.language.v1.AnnotateTextResponse; import com.google.cloud.language.v1.Document; import com.google.cloud.language.v1.EncodingType; @@ -225,7 +224,7 @@ private final AnalyzeSentimentResponse analyzeSentiment(AnalyzeSentimentRequest * AnalyzeSentimentRequest request = AnalyzeSentimentRequest.newBuilder() * .setDocument(document) * .build(); - * ListenableFuture<AnalyzeSentimentResponse> future = languageServiceClient.analyzeSentimentCallable().futureCall(request); + * RpcFuture<AnalyzeSentimentResponse> future = languageServiceClient.analyzeSentimentCallable().futureCall(request); * // Do something * AnalyzeSentimentResponse response = future.get(); * } @@ -307,7 +306,7 @@ public final AnalyzeEntitiesResponse analyzeEntities(AnalyzeEntitiesRequest requ * .setDocument(document) * .setEncodingType(encodingType) * .build(); - * ListenableFuture<AnalyzeEntitiesResponse> future = languageServiceClient.analyzeEntitiesCallable().futureCall(request); + * RpcFuture<AnalyzeEntitiesResponse> future = languageServiceClient.analyzeEntitiesCallable().futureCall(request); * // Do something * AnalyzeEntitiesResponse response = future.get(); * } @@ -388,7 +387,7 @@ public final AnalyzeSyntaxResponse analyzeSyntax(AnalyzeSyntaxRequest request) { * .setDocument(document) * .setEncodingType(encodingType) * .build(); - * ListenableFuture<AnalyzeSyntaxResponse> future = languageServiceClient.analyzeSyntaxCallable().futureCall(request); + * RpcFuture<AnalyzeSyntaxResponse> future = languageServiceClient.analyzeSyntaxCallable().futureCall(request); * // Do something * AnalyzeSyntaxResponse response = future.get(); * } @@ -476,7 +475,7 @@ public final AnnotateTextResponse annotateText(AnnotateTextRequest request) { * .setFeatures(features) * .setEncodingType(encodingType) * .build(); - * ListenableFuture<AnnotateTextResponse> future = languageServiceClient.annotateTextCallable().futureCall(request); + * RpcFuture<AnnotateTextResponse> future = languageServiceClient.annotateTextCallable().futureCall(request); * // Do something * AnnotateTextResponse response = future.get(); * } diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/ConfigServiceV2Client.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/ConfigServiceV2Client.java index 6a930936adf8..b8e74eb4f962 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/ConfigServiceV2Client.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/ConfigServiceV2Client.java @@ -231,7 +231,7 @@ public final ListSinksPagedResponse listSinks(ListSinksRequest request) { * ListSinksRequest request = ListSinksRequest.newBuilder() * .setParentWithParentNameOneof(parent) * .build(); - * ListenableFuture<ListSinksPagedResponse> future = configServiceV2Client.listSinksPagedCallable().futureCall(request); + * RpcFuture<ListSinksPagedResponse> future = configServiceV2Client.listSinksPagedCallable().futureCall(request); * // Do something * for (LogSink element : future.get().iterateAllElements()) { * // doThingsWith(element); @@ -335,7 +335,7 @@ private final LogSink getSink(GetSinkRequest request) { * GetSinkRequest request = GetSinkRequest.newBuilder() * .setSinkNameWithSinkNameOneof(sinkName) * .build(); - * ListenableFuture<LogSink> future = configServiceV2Client.getSinkCallable().futureCall(request); + * RpcFuture<LogSink> future = configServiceV2Client.getSinkCallable().futureCall(request); * // Do something * LogSink response = future.get(); * } @@ -421,7 +421,7 @@ public final LogSink createSink(CreateSinkRequest request) { * .setParentWithParentNameOneof(parent) * .setSink(sink) * .build(); - * ListenableFuture<LogSink> future = configServiceV2Client.createSinkCallable().futureCall(request); + * RpcFuture<LogSink> future = configServiceV2Client.createSinkCallable().futureCall(request); * // Do something * LogSink response = future.get(); * } @@ -515,7 +515,7 @@ public final LogSink updateSink(UpdateSinkRequest request) { * .setSinkNameWithSinkNameOneof(sinkName) * .setSink(sink) * .build(); - * ListenableFuture<LogSink> future = configServiceV2Client.updateSinkCallable().futureCall(request); + * RpcFuture<LogSink> future = configServiceV2Client.updateSinkCallable().futureCall(request); * // Do something * LogSink response = future.get(); * } @@ -591,7 +591,7 @@ private final void deleteSink(DeleteSinkRequest request) { * DeleteSinkRequest request = DeleteSinkRequest.newBuilder() * .setSinkNameWithSinkNameOneof(sinkName) * .build(); - * ListenableFuture<Void> future = configServiceV2Client.deleteSinkCallable().futureCall(request); + * RpcFuture<Void> future = configServiceV2Client.deleteSinkCallable().futureCall(request); * // Do something * future.get(); * } diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Client.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Client.java index 502d6b1d950e..bcebf5b2ea2c 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Client.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Client.java @@ -256,7 +256,7 @@ private final void deleteLog(DeleteLogRequest request) { * DeleteLogRequest request = DeleteLogRequest.newBuilder() * .setLogNameWithLogNameOneof(logName) * .build(); - * ListenableFuture<Void> future = loggingServiceV2Client.deleteLogCallable().futureCall(request); + * RpcFuture<Void> future = loggingServiceV2Client.deleteLogCallable().futureCall(request); * // Do something * future.get(); * } @@ -356,7 +356,7 @@ public final WriteLogEntriesResponse writeLogEntries(WriteLogEntriesRequest requ * WriteLogEntriesRequest request = WriteLogEntriesRequest.newBuilder() * .addAllEntries(entries) * .build(); - * ListenableFuture<WriteLogEntriesResponse> future = loggingServiceV2Client.writeLogEntriesCallable().futureCall(request); + * RpcFuture<WriteLogEntriesResponse> future = loggingServiceV2Client.writeLogEntriesCallable().futureCall(request); * // Do something * WriteLogEntriesResponse response = future.get(); * } @@ -452,7 +452,7 @@ public final ListLogEntriesPagedResponse listLogEntries(ListLogEntriesRequest re * ListLogEntriesRequest request = ListLogEntriesRequest.newBuilder() * .addAllResourceNames(resourceNames) * .build(); - * ListenableFuture<ListLogEntriesPagedResponse> future = loggingServiceV2Client.listLogEntriesPagedCallable().futureCall(request); + * RpcFuture<ListLogEntriesPagedResponse> future = loggingServiceV2Client.listLogEntriesPagedCallable().futureCall(request); * // Do something * for (LogEntry element : future.get().iterateAllElements()) { * // doThingsWith(element); @@ -530,7 +530,7 @@ public final ListMonitoredResourceDescriptorsPagedResponse listMonitoredResource *

    * try (LoggingServiceV2Client loggingServiceV2Client = LoggingServiceV2Client.create()) {
    *   ListMonitoredResourceDescriptorsRequest request = ListMonitoredResourceDescriptorsRequest.newBuilder().build();
-   *   ListenableFuture<ListMonitoredResourceDescriptorsPagedResponse> future = loggingServiceV2Client.listMonitoredResourceDescriptorsPagedCallable().futureCall(request);
+   *   RpcFuture<ListMonitoredResourceDescriptorsPagedResponse> future = loggingServiceV2Client.listMonitoredResourceDescriptorsPagedCallable().futureCall(request);
    *   // Do something
    *   for (MonitoredResourceDescriptor element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
@@ -636,7 +636,7 @@ public final ListLogsPagedResponse listLogs(ListLogsRequest request) {
    *   ListLogsRequest request = ListLogsRequest.newBuilder()
    *     .setParentWithParentNameOneof(parent)
    *     .build();
-   *   ListenableFuture<ListLogsPagedResponse> future = loggingServiceV2Client.listLogsPagedCallable().futureCall(request);
+   *   RpcFuture<ListLogsPagedResponse> future = loggingServiceV2Client.listLogsPagedCallable().futureCall(request);
    *   // Do something
    *   for (String element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/MetricsServiceV2Client.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/MetricsServiceV2Client.java
index f54d3c3b3de5..85c13df7fa34 100644
--- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/MetricsServiceV2Client.java
+++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/MetricsServiceV2Client.java
@@ -233,7 +233,7 @@ public final ListLogMetricsPagedResponse listLogMetrics(ListLogMetricsRequest re
    *   ListLogMetricsRequest request = ListLogMetricsRequest.newBuilder()
    *     .setParentWithParentNameOneof(parent)
    *     .build();
-   *   ListenableFuture<ListLogMetricsPagedResponse> future = metricsServiceV2Client.listLogMetricsPagedCallable().futureCall(request);
+   *   RpcFuture<ListLogMetricsPagedResponse> future = metricsServiceV2Client.listLogMetricsPagedCallable().futureCall(request);
    *   // Do something
    *   for (LogMetric element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
@@ -337,7 +337,7 @@ private final LogMetric getLogMetric(GetLogMetricRequest request) {
    *   GetLogMetricRequest request = GetLogMetricRequest.newBuilder()
    *     .setMetricNameWithMetricNameOneof(metricName)
    *     .build();
-   *   ListenableFuture<LogMetric> future = metricsServiceV2Client.getLogMetricCallable().futureCall(request);
+   *   RpcFuture<LogMetric> future = metricsServiceV2Client.getLogMetricCallable().futureCall(request);
    *   // Do something
    *   LogMetric response = future.get();
    * }
@@ -416,7 +416,7 @@ public final LogMetric createLogMetric(CreateLogMetricRequest request) {
    *     .setParentWithParentNameOneof(parent)
    *     .setMetric(metric)
    *     .build();
-   *   ListenableFuture<LogMetric> future = metricsServiceV2Client.createLogMetricCallable().futureCall(request);
+   *   RpcFuture<LogMetric> future = metricsServiceV2Client.createLogMetricCallable().futureCall(request);
    *   // Do something
    *   LogMetric response = future.get();
    * }
@@ -497,7 +497,7 @@ public final LogMetric updateLogMetric(UpdateLogMetricRequest request) {
    *     .setMetricNameWithMetricNameOneof(metricName)
    *     .setMetric(metric)
    *     .build();
-   *   ListenableFuture<LogMetric> future = metricsServiceV2Client.updateLogMetricCallable().futureCall(request);
+   *   RpcFuture<LogMetric> future = metricsServiceV2Client.updateLogMetricCallable().futureCall(request);
    *   // Do something
    *   LogMetric response = future.get();
    * }
@@ -566,7 +566,7 @@ private final void deleteLogMetric(DeleteLogMetricRequest request) {
    *   DeleteLogMetricRequest request = DeleteLogMetricRequest.newBuilder()
    *     .setMetricNameWithMetricNameOneof(metricName)
    *     .build();
-   *   ListenableFuture<Void> future = metricsServiceV2Client.deleteLogMetricCallable().futureCall(request);
+   *   RpcFuture<Void> future = metricsServiceV2Client.deleteLogMetricCallable().futureCall(request);
    *   // Do something
    *   future.get();
    * }
diff --git a/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/GroupServiceClient.java b/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/GroupServiceClient.java
index c927f6dc3ee2..8f4a2750e503 100644
--- a/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/GroupServiceClient.java
+++ b/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/GroupServiceClient.java
@@ -226,7 +226,7 @@ public final ListGroupsPagedResponse listGroups(ListGroupsRequest request) {
    *   ListGroupsRequest request = ListGroupsRequest.newBuilder()
    *     .setNameWithProjectName(name)
    *     .build();
-   *   ListenableFuture<ListGroupsPagedResponse> future = groupServiceClient.listGroupsPagedCallable().futureCall(request);
+   *   RpcFuture<ListGroupsPagedResponse> future = groupServiceClient.listGroupsPagedCallable().futureCall(request);
    *   // Do something
    *   for (Group element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
@@ -327,7 +327,7 @@ private final Group getGroup(GetGroupRequest request) {
    *   GetGroupRequest request = GetGroupRequest.newBuilder()
    *     .setNameWithGroupName(name)
    *     .build();
-   *   ListenableFuture<Group> future = groupServiceClient.getGroupCallable().futureCall(request);
+   *   RpcFuture<Group> future = groupServiceClient.getGroupCallable().futureCall(request);
    *   // Do something
    *   Group response = future.get();
    * }
@@ -403,7 +403,7 @@ public final Group createGroup(CreateGroupRequest request) {
    *     .setNameWithProjectName(name)
    *     .setGroup(group)
    *     .build();
-   *   ListenableFuture<Group> future = groupServiceClient.createGroupCallable().futureCall(request);
+   *   RpcFuture<Group> future = groupServiceClient.createGroupCallable().futureCall(request);
    *   // Do something
    *   Group response = future.get();
    * }
@@ -471,7 +471,7 @@ public final Group updateGroup(UpdateGroupRequest request) {
    *   UpdateGroupRequest request = UpdateGroupRequest.newBuilder()
    *     .setGroup(group)
    *     .build();
-   *   ListenableFuture<Group> future = groupServiceClient.updateGroupCallable().futureCall(request);
+   *   RpcFuture<Group> future = groupServiceClient.updateGroupCallable().futureCall(request);
    *   // Do something
    *   Group response = future.get();
    * }
@@ -539,7 +539,7 @@ private final void deleteGroup(DeleteGroupRequest request) {
    *   DeleteGroupRequest request = DeleteGroupRequest.newBuilder()
    *     .setNameWithGroupName(name)
    *     .build();
-   *   ListenableFuture<Void> future = groupServiceClient.deleteGroupCallable().futureCall(request);
+   *   RpcFuture<Void> future = groupServiceClient.deleteGroupCallable().futureCall(request);
    *   // Do something
    *   future.get();
    * }
@@ -611,7 +611,7 @@ public final ListGroupMembersPagedResponse listGroupMembers(ListGroupMembersRequ
    *   ListGroupMembersRequest request = ListGroupMembersRequest.newBuilder()
    *     .setNameWithGroupName(name)
    *     .build();
-   *   ListenableFuture<ListGroupMembersPagedResponse> future = groupServiceClient.listGroupMembersPagedCallable().futureCall(request);
+   *   RpcFuture<ListGroupMembersPagedResponse> future = groupServiceClient.listGroupMembersPagedCallable().futureCall(request);
    *   // Do something
    *   for (MonitoredResource element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
diff --git a/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/MetricServiceClient.java b/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/MetricServiceClient.java
index 7dbe8909633d..b1f5dba7b6ab 100644
--- a/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/MetricServiceClient.java
+++ b/google-cloud-monitoring/src/main/java/com/google/cloud/monitoring/spi/v3/MetricServiceClient.java
@@ -33,7 +33,6 @@
 import com.google.monitoring.v3.ListMonitoredResourceDescriptorsRequest;
 import com.google.monitoring.v3.ListMonitoredResourceDescriptorsResponse;
 import com.google.monitoring.v3.ListTimeSeriesRequest;
-import com.google.monitoring.v3.ListTimeSeriesRequest.TimeSeriesView;
 import com.google.monitoring.v3.ListTimeSeriesResponse;
 import com.google.monitoring.v3.MetricDescriptorName;
 import com.google.monitoring.v3.MonitoredResourceDescriptorName;
@@ -281,7 +280,7 @@ public final ListMonitoredResourceDescriptorsPagedResponse listMonitoredResource
    *   ListMonitoredResourceDescriptorsRequest request = ListMonitoredResourceDescriptorsRequest.newBuilder()
    *     .setNameWithProjectName(name)
    *     .build();
-   *   ListenableFuture<ListMonitoredResourceDescriptorsPagedResponse> future = metricServiceClient.listMonitoredResourceDescriptorsPagedCallable().futureCall(request);
+   *   RpcFuture<ListMonitoredResourceDescriptorsPagedResponse> future = metricServiceClient.listMonitoredResourceDescriptorsPagedCallable().futureCall(request);
    *   // Do something
    *   for (MonitoredResourceDescriptor element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
@@ -396,7 +395,7 @@ private final MonitoredResourceDescriptor getMonitoredResourceDescriptor(
    *   GetMonitoredResourceDescriptorRequest request = GetMonitoredResourceDescriptorRequest.newBuilder()
    *     .setNameWithMonitoredResourceDescriptorName(name)
    *     .build();
-   *   ListenableFuture<MonitoredResourceDescriptor> future = metricServiceClient.getMonitoredResourceDescriptorCallable().futureCall(request);
+   *   RpcFuture<MonitoredResourceDescriptor> future = metricServiceClient.getMonitoredResourceDescriptorCallable().futureCall(request);
    *   // Do something
    *   MonitoredResourceDescriptor response = future.get();
    * }
@@ -473,7 +472,7 @@ public final ListMetricDescriptorsPagedResponse listMetricDescriptors(
    *   ListMetricDescriptorsRequest request = ListMetricDescriptorsRequest.newBuilder()
    *     .setNameWithProjectName(name)
    *     .build();
-   *   ListenableFuture<ListMetricDescriptorsPagedResponse> future = metricServiceClient.listMetricDescriptorsPagedCallable().futureCall(request);
+   *   RpcFuture<ListMetricDescriptorsPagedResponse> future = metricServiceClient.listMetricDescriptorsPagedCallable().futureCall(request);
    *   // Do something
    *   for (MetricDescriptor element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
@@ -579,7 +578,7 @@ private final MetricDescriptor getMetricDescriptor(GetMetricDescriptorRequest re
    *   GetMetricDescriptorRequest request = GetMetricDescriptorRequest.newBuilder()
    *     .setNameWithMetricDescriptorName(name)
    *     .build();
-   *   ListenableFuture<MetricDescriptor> future = metricServiceClient.getMetricDescriptorCallable().futureCall(request);
+   *   RpcFuture<MetricDescriptor> future = metricServiceClient.getMetricDescriptorCallable().futureCall(request);
    *   // Do something
    *   MetricDescriptor response = future.get();
    * }
@@ -662,7 +661,7 @@ public final MetricDescriptor createMetricDescriptor(CreateMetricDescriptorReque
    *     .setNameWithProjectName(name)
    *     .setMetricDescriptor(metricDescriptor)
    *     .build();
-   *   ListenableFuture<MetricDescriptor> future = metricServiceClient.createMetricDescriptorCallable().futureCall(request);
+   *   RpcFuture<MetricDescriptor> future = metricServiceClient.createMetricDescriptorCallable().futureCall(request);
    *   // Do something
    *   MetricDescriptor response = future.get();
    * }
@@ -736,7 +735,7 @@ private final void deleteMetricDescriptor(DeleteMetricDescriptorRequest request)
    *   DeleteMetricDescriptorRequest request = DeleteMetricDescriptorRequest.newBuilder()
    *     .setNameWithMetricDescriptorName(name)
    *     .build();
-   *   ListenableFuture<Void> future = metricServiceClient.deleteMetricDescriptorCallable().futureCall(request);
+   *   RpcFuture<Void> future = metricServiceClient.deleteMetricDescriptorCallable().futureCall(request);
    *   // Do something
    *   future.get();
    * }
@@ -841,7 +840,7 @@ public final ListTimeSeriesPagedResponse listTimeSeries(ListTimeSeriesRequest re
    *     .setInterval(interval)
    *     .setView(view)
    *     .build();
-   *   ListenableFuture<ListTimeSeriesPagedResponse> future = metricServiceClient.listTimeSeriesPagedCallable().futureCall(request);
+   *   RpcFuture<ListTimeSeriesPagedResponse> future = metricServiceClient.listTimeSeriesPagedCallable().futureCall(request);
    *   // Do something
    *   for (TimeSeries element : future.get().iterateAllElements()) {
    *     // doThingsWith(element);
@@ -969,7 +968,7 @@ public final void createTimeSeries(CreateTimeSeriesRequest request) {
    *     .setNameWithProjectName(name)
    *     .addAllTimeSeries(timeSeries)
    *     .build();
-   *   ListenableFuture<Void> future = metricServiceClient.createTimeSeriesCallable().futureCall(request);
+   *   RpcFuture<Void> future = metricServiceClient.createTimeSeriesCallable().futureCall(request);
    *   // Do something
    *   future.get();
    * }
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java
index 62b338618499..ad53b3accf99 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java
@@ -281,12 +281,23 @@ public void processReceivedMessages(List r
       final PubsubMessage message = userMessage.getMessage();
       final AckHandler ackHandler = acksIterator.next();
       final SettableFuture response = SettableFuture.create();
+      final MessageReceiver.AckReplyConsumer consumer =
+          new MessageReceiver.AckReplyConsumer() {
+            @Override
+            public void accept(AckReply reply, Throwable t) {
+              if (reply != null) {
+                response.set(reply);
+              } else {
+                response.setException(t);
+              }
+            }
+          };
       Futures.addCallback(response, ackHandler);
       executor.submit(
           new Runnable() {
             @Override
             public void run() {
-              receiver.receiveMessage(message, response);
+              receiver.receiveMessage(message, consumer);
             }
           });
     }
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java
index abafebf22e48..7f0eb9dd9a19 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java
@@ -16,7 +16,6 @@
 
 package com.google.cloud.pubsub.spi.v1;
 
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.pubsub.v1.PubsubMessage;
 
 /** This interface can be implemented by users of {@link Subscriber} to receive messages. */
@@ -36,8 +35,18 @@ enum AckReply {
   }
 
   /**
-   * Called when a message is received by the subscriber. The implementation must arrange for {@code
-   * reponse} to be set after processing the {@code message}.
+   * Accepts a reply, sending it to the service.
+   *
+   * 

Both the interface and its method is named after the Java 8's {@code BiConsumer} interface + * to ease migration when we finally move. */ - void receiveMessage(final PubsubMessage message, final SettableFuture response); + interface AckReplyConsumer { + void accept(AckReply ackReply, Throwable t); + } + + /** + * Called when a message is received by the subscriber. The implementation must arrange for {@link + * AckReplyConsumer#accept} to be called after processing the {@code message}. + */ + void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index b0175e9467eb..7a002da09754 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -16,17 +16,22 @@ package com.google.cloud.pubsub.spi.v1; +import com.google.api.gax.core.Function; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.core.RpcFuture; +import com.google.api.gax.core.RpcFutureCallback; import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FlowControlSettings; import com.google.api.gax.grpc.FlowController; import com.google.api.gax.grpc.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.RpcFutures; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -80,7 +85,7 @@ * Publisher.newBuilder(MY_TOPIC) * .setMaxBundleDuration(new Duration(10 * 1000)) * .build(); - * List<ListenableFuture<String>> results = new ArrayList<>(); + * List<RpcFuture<String>> results = new ArrayList<>(); * * for (PubsubMessage messages : messagesToPublish) { * results.add(publisher.publish(message)); @@ -206,7 +211,7 @@ public TopicName getTopicName() { * @param message the message to publish. * @return the message ID wrapped in a future. */ - public ListenableFuture publish(PubsubMessage message) { + public RpcFuture publish(PubsubMessage message) { if (shutdown.get()) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } @@ -215,7 +220,7 @@ public ListenableFuture publish(PubsubMessage message) { try { flowController.reserve(1, messageSize); } catch (FlowController.FlowControlException e) { - return Futures.immediateFailedFuture(e); + return RpcFutures.immediateFailedFuture(e); } OutstandingBundle bundleToSend = null; SettableFuture publishResult = SettableFuture.create(); @@ -287,7 +292,44 @@ public void run() { }); } - return publishResult; + return new ListenableFutureDelegate(publishResult); + } + + private static class ListenableFutureDelegate extends SimpleForwardingListenableFuture + implements RpcFuture { + ListenableFutureDelegate(ListenableFuture delegate) { + super(delegate); + } + + public void addCallback(final RpcFutureCallback callback) { + Futures.addCallback( + this, + new FutureCallback() { + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + + @Override + public void onSuccess(V v) { + callback.onSuccess(v); + } + }); + } + + public RpcFuture catching( + Class exceptionType, final Function callback) { + return new ListenableFutureDelegate( + Futures.catching( + this, + exceptionType, + new com.google.common.base.Function() { + @Override + public V apply(X input) { + return callback.apply(input); + } + })); + } } private void setupDurationBasedPublishAlarm() { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java index e159bf016dc3..75b8f3a08ecd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java @@ -265,7 +265,7 @@ private final Topic createTopic(Topic request) { * Topic request = Topic.newBuilder() * .setNameWithTopicName(name) * .build(); - * ListenableFuture<Topic> future = publisherClient.createTopicCallable().futureCall(request); + * RpcFuture<Topic> future = publisherClient.createTopicCallable().futureCall(request); * // Do something * Topic response = future.get(); * } @@ -358,7 +358,7 @@ public final PublishResponse publish(PublishRequest request) { * .setTopicWithTopicName(topic) * .addAllMessages(messages) * .build(); - * ListenableFuture<PublishResponse> future = publisherClient.publishCallable().futureCall(request); + * RpcFuture<PublishResponse> future = publisherClient.publishCallable().futureCall(request); * // Do something * PublishResponse response = future.get(); * } @@ -425,7 +425,7 @@ private final Topic getTopic(GetTopicRequest request) { * GetTopicRequest request = GetTopicRequest.newBuilder() * .setTopicWithTopicName(topic) * .build(); - * ListenableFuture<Topic> future = publisherClient.getTopicCallable().futureCall(request); + * RpcFuture<Topic> future = publisherClient.getTopicCallable().futureCall(request); * // Do something * Topic response = future.get(); * } @@ -497,7 +497,7 @@ public final ListTopicsPagedResponse listTopics(ListTopicsRequest request) { * ListTopicsRequest request = ListTopicsRequest.newBuilder() * .setProjectWithProjectName(project) * .build(); - * ListenableFuture<ListTopicsPagedResponse> future = publisherClient.listTopicsPagedCallable().futureCall(request); + * RpcFuture<ListTopicsPagedResponse> future = publisherClient.listTopicsPagedCallable().futureCall(request); * // Do something * for (Topic element : future.get().iterateAllElements()) { * // doThingsWith(element); @@ -603,7 +603,7 @@ public final ListTopicSubscriptionsPagedResponse listTopicSubscriptions( * ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() * .setTopicWithTopicName(topic) * .build(); - * ListenableFuture<ListTopicSubscriptionsPagedResponse> future = publisherClient.listTopicSubscriptionsPagedCallable().futureCall(request); + * RpcFuture<ListTopicSubscriptionsPagedResponse> future = publisherClient.listTopicSubscriptionsPagedCallable().futureCall(request); * // Do something * for (SubscriptionName element : future.get().iterateAllAsSubscriptionName()) { * // doThingsWith(element); @@ -715,7 +715,7 @@ private final void deleteTopic(DeleteTopicRequest request) { * DeleteTopicRequest request = DeleteTopicRequest.newBuilder() * .setTopicWithTopicName(topic) * .build(); - * ListenableFuture<Void> future = publisherClient.deleteTopicCallable().futureCall(request); + * RpcFuture<Void> future = publisherClient.deleteTopicCallable().futureCall(request); * // Do something * future.get(); * } @@ -793,7 +793,7 @@ public final Policy setIamPolicy(SetIamPolicyRequest request) { * .setResource(formattedResource) * .setPolicy(policy) * .build(); - * ListenableFuture<Policy> future = publisherClient.setIamPolicyCallable().futureCall(request); + * RpcFuture<Policy> future = publisherClient.setIamPolicyCallable().futureCall(request); * // Do something * Policy response = future.get(); * } @@ -865,7 +865,7 @@ private final Policy getIamPolicy(GetIamPolicyRequest request) { * GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder() * .setResource(formattedResource) * .build(); - * ListenableFuture<Policy> future = publisherClient.getIamPolicyCallable().futureCall(request); + * RpcFuture<Policy> future = publisherClient.getIamPolicyCallable().futureCall(request); * // Do something * Policy response = future.get(); * } @@ -950,7 +950,7 @@ public final TestIamPermissionsResponse testIamPermissions(TestIamPermissionsReq * .setResource(formattedResource) * .addAllPermissions(permissions) * .build(); - * ListenableFuture<TestIamPermissionsResponse> future = publisherClient.testIamPermissionsCallable().futureCall(request); + * RpcFuture<TestIamPermissionsResponse> future = publisherClient.testIamPermissionsCallable().futureCall(request); * // Do something * TestIamPermissionsResponse response = future.get(); * } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java index 53cf5d9d2f04..251885c603ea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java @@ -328,7 +328,7 @@ public final Subscription createSubscription(Subscription request) { * .setNameWithSubscriptionName(name) * .setTopicWithTopicName(topic) * .build(); - * ListenableFuture<Subscription> future = subscriberClient.createSubscriptionCallable().futureCall(request); + * RpcFuture<Subscription> future = subscriberClient.createSubscriptionCallable().futureCall(request); * // Do something * Subscription response = future.get(); * } @@ -399,7 +399,7 @@ private final Subscription getSubscription(GetSubscriptionRequest request) { * GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder() * .setSubscriptionWithSubscriptionName(subscription) * .build(); - * ListenableFuture<Subscription> future = subscriberClient.getSubscriptionCallable().futureCall(request); + * RpcFuture<Subscription> future = subscriberClient.getSubscriptionCallable().futureCall(request); * // Do something * Subscription response = future.get(); * } @@ -471,7 +471,7 @@ public final ListSubscriptionsPagedResponse listSubscriptions(ListSubscriptionsR * ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() * .setProjectWithProjectName(project) * .build(); - * ListenableFuture<ListSubscriptionsPagedResponse> future = subscriberClient.listSubscriptionsPagedCallable().futureCall(request); + * RpcFuture<ListSubscriptionsPagedResponse> future = subscriberClient.listSubscriptionsPagedCallable().futureCall(request); * // Do something * for (Subscription element : future.get().iterateAllElements()) { * // doThingsWith(element); @@ -586,7 +586,7 @@ private final void deleteSubscription(DeleteSubscriptionRequest request) { * DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() * .setSubscriptionWithSubscriptionName(subscription) * .build(); - * ListenableFuture<Void> future = subscriberClient.deleteSubscriptionCallable().futureCall(request); + * RpcFuture<Void> future = subscriberClient.deleteSubscriptionCallable().futureCall(request); * // Do something * future.get(); * } @@ -685,7 +685,7 @@ public final void modifyAckDeadline(ModifyAckDeadlineRequest request) { * .addAllAckIds(ackIds) * .setAckDeadlineSeconds(ackDeadlineSeconds) * .build(); - * ListenableFuture<Void> future = subscriberClient.modifyAckDeadlineCallable().futureCall(request); + * RpcFuture<Void> future = subscriberClient.modifyAckDeadlineCallable().futureCall(request); * // Do something * future.get(); * } @@ -776,7 +776,7 @@ public final void acknowledge(AcknowledgeRequest request) { * .setSubscriptionWithSubscriptionName(subscription) * .addAllAckIds(ackIds) * .build(); - * ListenableFuture<Void> future = subscriberClient.acknowledgeCallable().futureCall(request); + * RpcFuture<Void> future = subscriberClient.acknowledgeCallable().futureCall(request); * // Do something * future.get(); * } @@ -869,7 +869,7 @@ public final PullResponse pull(PullRequest request) { * .setSubscriptionWithSubscriptionName(subscription) * .setMaxMessages(maxMessages) * .build(); - * ListenableFuture<PullResponse> future = subscriberClient.pullCallable().futureCall(request); + * RpcFuture<PullResponse> future = subscriberClient.pullCallable().futureCall(request); * // Do something * PullResponse response = future.get(); * } @@ -1018,7 +1018,7 @@ public final void modifyPushConfig(ModifyPushConfigRequest request) { * .setSubscriptionWithSubscriptionName(subscription) * .setPushConfig(pushConfig) * .build(); - * ListenableFuture<Void> future = subscriberClient.modifyPushConfigCallable().futureCall(request); + * RpcFuture<Void> future = subscriberClient.modifyPushConfigCallable().futureCall(request); * // Do something * future.get(); * } @@ -1096,7 +1096,7 @@ public final Policy setIamPolicy(SetIamPolicyRequest request) { * .setResource(formattedResource) * .setPolicy(policy) * .build(); - * ListenableFuture<Policy> future = subscriberClient.setIamPolicyCallable().futureCall(request); + * RpcFuture<Policy> future = subscriberClient.setIamPolicyCallable().futureCall(request); * // Do something * Policy response = future.get(); * } @@ -1168,7 +1168,7 @@ private final Policy getIamPolicy(GetIamPolicyRequest request) { * GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder() * .setResource(formattedResource) * .build(); - * ListenableFuture<Policy> future = subscriberClient.getIamPolicyCallable().futureCall(request); + * RpcFuture<Policy> future = subscriberClient.getIamPolicyCallable().futureCall(request); * // Do something * Policy response = future.get(); * } @@ -1253,7 +1253,7 @@ public final TestIamPermissionsResponse testIamPermissions(TestIamPermissionsReq * .setResource(formattedResource) * .addAllPermissions(permissions) * .build(); - * ListenableFuture<TestIamPermissionsResponse> future = subscriberClient.testIamPermissionsCallable().futureCall(request); + * RpcFuture<TestIamPermissionsResponse> future = subscriberClient.testIamPermissionsCallable().futureCall(request); * // Do something * TestIamPermissionsResponse response = future.get(); * } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index b01e7a4a3a6d..d078c496f475 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; +import com.google.api.gax.core.RpcFuture; import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; @@ -30,7 +31,6 @@ import com.google.api.gax.grpc.FlowControlSettings; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.cloud.pubsub.spi.v1.Publisher.Builder; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; @@ -136,8 +136,8 @@ public void testPublishByDuration() throws Exception { testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); - ListenableFuture publishFuture2 = sendTestMessage(publisher, "B"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture2 = sendTestMessage(publisher, "B"); assertFalse(publishFuture1.isDone()); assertFalse(publishFuture2.isDone()); @@ -169,9 +169,9 @@ public void testPublishByNumBundledMessages() throws Exception { .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) .addPublishResponse(PublishResponse.newBuilder().addMessageIds("3").addMessageIds("4")); - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); - ListenableFuture publishFuture2 = sendTestMessage(publisher, "B"); - ListenableFuture publishFuture3 = sendTestMessage(publisher, "C"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture2 = sendTestMessage(publisher, "B"); + RpcFuture publishFuture3 = sendTestMessage(publisher, "C"); // Note we are not advancing time but message should still get published @@ -180,7 +180,7 @@ public void testPublishByNumBundledMessages() throws Exception { assertFalse(publishFuture3.isDone()); - ListenableFuture publishFuture4 = + RpcFuture publishFuture4 = publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("D")).build()); assertEquals("3", publishFuture3.get()); @@ -209,9 +209,9 @@ public void testSinglePublishByNumBytes() throws Exception { .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) .addPublishResponse(PublishResponse.newBuilder().addMessageIds("3").addMessageIds("4")); - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); - ListenableFuture publishFuture2 = sendTestMessage(publisher, "B"); - ListenableFuture publishFuture3 = sendTestMessage(publisher, "C"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture2 = sendTestMessage(publisher, "B"); + RpcFuture publishFuture3 = sendTestMessage(publisher, "C"); // Note we are not advancing time but message should still get published @@ -219,7 +219,7 @@ public void testSinglePublishByNumBytes() throws Exception { assertEquals("2", publishFuture2.get()); assertFalse(publishFuture3.isDone()); - ListenableFuture publishFuture4 = sendTestMessage(publisher, "D"); + RpcFuture publishFuture4 = sendTestMessage(publisher, "D"); assertEquals("3", publishFuture3.get()); assertEquals("4", publishFuture4.get()); @@ -245,18 +245,18 @@ public void testPublishMixedSizeAndDuration() throws Exception { PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); fakeExecutor.advanceTime(Duration.standardSeconds(2)); assertFalse(publishFuture1.isDone()); - ListenableFuture publishFuture2 = sendTestMessage(publisher, "B"); + RpcFuture publishFuture2 = sendTestMessage(publisher, "B"); // Publishing triggered by bundle size assertEquals("1", publishFuture1.get()); assertEquals("2", publishFuture2.get()); - ListenableFuture publishFuture3 = sendTestMessage(publisher, "C"); + RpcFuture publishFuture3 = sendTestMessage(publisher, "C"); assertFalse(publishFuture3.isDone()); @@ -272,7 +272,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { publisher.shutdown(); } - private ListenableFuture sendTestMessage(Publisher publisher, String data) { + private RpcFuture sendTestMessage(Publisher publisher, String data) { return publisher.publish( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } @@ -293,7 +293,7 @@ public void testPublishFailureRetries() throws Exception { testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); assertEquals("1", publishFuture1.get()); @@ -323,7 +323,7 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception for (int i = 0; i < 11; ++i) { testPublisherServiceImpl.addPublishError(new FakeException()); } - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); try { publishFuture1.get(); @@ -357,7 +357,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce .build(); // To demonstrate that reaching duration will trigger publish testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); + RpcFuture publishFuture1 = sendTestMessage(publisher, "A"); try { publishFuture1.get(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 0e38968dc73d..943e646300df 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -28,7 +28,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.SettableFuture; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; @@ -84,7 +83,7 @@ public static Collection data() { private TestReceiver testReceiver; static class TestReceiver implements MessageReceiver { - private final LinkedBlockingQueue> outstandingMessageReplies = + private final LinkedBlockingQueue outstandingMessageReplies = new LinkedBlockingQueue<>(); private AckReply ackReply = AckReply.ACK; private Optional messageCountLatch = Optional.absent(); @@ -114,17 +113,17 @@ void waitForExpectedMessages() throws InterruptedException { } @Override - public void receiveMessage(PubsubMessage message, SettableFuture response) { + public void receiveMessage(PubsubMessage message, MessageReceiver.AckReplyConsumer consumer) { if (explicitAckReplies) { try { - outstandingMessageReplies.put(response); + outstandingMessageReplies.put(consumer); } catch (InterruptedException e) { throw new IllegalStateException(e); } } else { - replyTo(response); + replyTo(consumer); } - + if (messageCountLatch.isPresent()) { messageCountLatch.get().countDown(); } @@ -141,17 +140,17 @@ public void replyNextOutstandingMessage() { public void replyAllOutstandingMessage() { Preconditions.checkState(explicitAckReplies); - SettableFuture reply; + MessageReceiver.AckReplyConsumer reply; while ((reply = outstandingMessageReplies.poll()) != null) { replyTo(reply); } } - private void replyTo(SettableFuture reply) { + private void replyTo(MessageReceiver.AckReplyConsumer reply) { if (error.isPresent()) { - reply.setException(error.get()); + reply.accept(null, error.get()); } else { - reply.set(ackReply); + reply.accept(ackReply, null); } } } diff --git a/google-cloud-speech/src/main/java/com/google/cloud/speech/spi/v1beta1/SpeechClient.java b/google-cloud-speech/src/main/java/com/google/cloud/speech/spi/v1beta1/SpeechClient.java index a652215d5180..2a64e33e085f 100644 --- a/google-cloud-speech/src/main/java/com/google/cloud/speech/spi/v1beta1/SpeechClient.java +++ b/google-cloud-speech/src/main/java/com/google/cloud/speech/spi/v1beta1/SpeechClient.java @@ -260,7 +260,7 @@ public final SyncRecognizeResponse syncRecognize(SyncRecognizeRequest request) { * .setConfig(config) * .setAudio(audio) * .build(); - * ListenableFuture<SyncRecognizeResponse> future = speechClient.syncRecognizeCallable().futureCall(request); + * RpcFuture<SyncRecognizeResponse> future = speechClient.syncRecognizeCallable().futureCall(request); * // Do something * SyncRecognizeResponse response = future.get(); * } @@ -370,7 +370,7 @@ public final OperationFuture asyncRecognizeAsync( * .setConfig(config) * .setAudio(audio) * .build(); - * ListenableFuture<Operation> future = speechClient.asyncRecognizeCallable().futureCall(request); + * RpcFuture<Operation> future = speechClient.asyncRecognizeCallable().futureCall(request); * // Do something * Operation response = future.get(); * } diff --git a/google-cloud-trace/src/main/java/com/google/cloud/trace/spi/v1/TraceServiceClient.java b/google-cloud-trace/src/main/java/com/google/cloud/trace/spi/v1/TraceServiceClient.java index 9329fa206f44..2a18efdd56df 100644 --- a/google-cloud-trace/src/main/java/com/google/cloud/trace/spi/v1/TraceServiceClient.java +++ b/google-cloud-trace/src/main/java/com/google/cloud/trace/spi/v1/TraceServiceClient.java @@ -237,7 +237,7 @@ public final void patchTraces(PatchTracesRequest request) { * .setProjectId(projectId) * .setTraces(traces) * .build(); - * ListenableFuture<Void> future = traceServiceClient.patchTracesCallable().futureCall(request); + * RpcFuture<Void> future = traceServiceClient.patchTracesCallable().futureCall(request); * // Do something * future.get(); * } @@ -311,7 +311,7 @@ private final Trace getTrace(GetTraceRequest request) { * .setProjectId(projectId) * .setTraceId(traceId) * .build(); - * ListenableFuture<Trace> future = traceServiceClient.getTraceCallable().futureCall(request); + * RpcFuture<Trace> future = traceServiceClient.getTraceCallable().futureCall(request); * // Do something * Trace response = future.get(); * } @@ -381,7 +381,7 @@ public final ListTracesPagedResponse listTraces(ListTracesRequest request) { * ListTracesRequest request = ListTracesRequest.newBuilder() * .setProjectId(projectId) * .build(); - * ListenableFuture<ListTracesPagedResponse> future = traceServiceClient.listTracesPagedCallable().futureCall(request); + * RpcFuture<ListTracesPagedResponse> future = traceServiceClient.listTracesPagedCallable().futureCall(request); * // Do something * for (Trace element : future.get().iterateAllElements()) { * // doThingsWith(element); diff --git a/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java b/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java index 73d5cd08fe95..7ebe53e6ba62 100644 --- a/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java +++ b/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java @@ -208,7 +208,7 @@ private final BatchAnnotateImagesResponse batchAnnotateImages( * BatchAnnotateImagesRequest request = BatchAnnotateImagesRequest.newBuilder() * .addAllRequests(requests) * .build(); - * ListenableFuture<BatchAnnotateImagesResponse> future = imageAnnotatorClient.batchAnnotateImagesCallable().futureCall(request); + * RpcFuture<BatchAnnotateImagesResponse> future = imageAnnotatorClient.batchAnnotateImagesCallable().futureCall(request); * // Do something * BatchAnnotateImagesResponse response = future.get(); * } From 15ded7e3dcd06ce91ac66b05fb3c2b6e70993e95 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 9 Feb 2017 16:54:04 +1100 Subject: [PATCH 2/3] pr comment --- .../CreateSubscriptionAndPullMessages.java | 8 ++--- .../CreateTopicAndPublishMessages.java | 11 +++---- .../google/cloud/pubsub/spi/v1/AckReply.java | 31 +++++++++++++++++++ .../cloud/pubsub/spi/v1/AckReplyConsumer.java | 27 ++++++++++++++++ .../pubsub/spi/v1/MessageDispatcher.java | 5 ++- .../cloud/pubsub/spi/v1/MessageReceiver.java | 24 -------------- .../cloud/pubsub/spi/v1/Subscriber.java | 1 - .../pubsub/spi/v1/SubscriberImplTest.java | 9 +++--- pom.xml | 4 +-- 9 files changed, 75 insertions(+), 45 deletions(-) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java index 31411e2b227c..7c013dd5a08f 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java @@ -16,11 +16,12 @@ package com.google.cloud.examples.pubsub.snippets; +import com.google.cloud.pubsub.spi.v1.AckReply; +import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Subscriber; import com.google.cloud.pubsub.spi.v1.SubscriberClient; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.SubscriptionName; @@ -43,10 +44,9 @@ public static void main(String... args) throws Exception { MessageReceiver receiver = new MessageReceiver() { @Override - public void receiveMessage( - PubsubMessage message, SettableFuture response) { + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { System.out.println("got message: " + message.getData().toStringUtf8()); - response.set(MessageReceiver.AckReply.ACK); + consumer.accept(AckReply.ACK, null); } }; Subscriber subscriber = null; diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java index 3dd75a6c91ff..af2e672490b1 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java @@ -16,10 +16,9 @@ package com.google.cloud.examples.pubsub.snippets; +import com.google.api.gax.core.RpcFuture; import com.google.cloud.pubsub.spi.v1.Publisher; import com.google.cloud.pubsub.spi.v1.PublisherClient; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; @@ -42,15 +41,15 @@ public static void main(String... args) throws Exception { try { publisher = Publisher.newBuilder(topic).build(); List messages = Arrays.asList("first message", "second message"); - List> messageIds = new ArrayList<>(); + List> messageIds = new ArrayList<>(); for (String message : messages) { ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); - ListenableFuture messageIdFuture = publisher.publish(pubsubMessage); + RpcFuture messageIdFuture = publisher.publish(pubsubMessage); messageIds.add(messageIdFuture); } - for (String messageId : Futures.allAsList(messageIds).get()) { - System.out.println("published with message ID: " + messageId); + for (RpcFuture messageId : messageIds) { + System.out.println("published with message ID: " + messageId.get()); } } finally { if (publisher != null) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java new file mode 100644 index 000000000000..64d4505ae16f --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi.v1; + +/** A reply to a Pubsub message, to be sent back to the service. */ +public enum AckReply { + /** + * Acknowledges that the message has been successfully processed. The service will not send the + * message again. + */ + ACK, + /** + * Signals that the message has not been successfully processed. The service will resend the + * message. + */ + NACK +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java new file mode 100644 index 000000000000..223d4546d813 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi.v1; + +/** + * Used by {@link MessageReceiver}, {@code AckReplyConsumer} accepts an {@link MessageReceiver.AckReply}, sending it to the service. + * + *

Both the interface and its method is named after the Java 8's {@code BiConsumer} interface + * to make migration to Java 8 and adopting its patterns easier. + */ +public interface AckReplyConsumer { + void accept(AckReply ackReply, Throwable t); +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index c061c4aaaa1b..e14108946669 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -19,7 +19,6 @@ import com.google.api.gax.grpc.FlowController; import com.google.api.stats.Distribution; import com.google.cloud.Clock; -import com.google.cloud.pubsub.spi.v1.MessageReceiver.AckReply; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; @@ -281,8 +280,8 @@ public void processReceivedMessages(List r final PubsubMessage message = userMessage.getMessage(); final AckHandler ackHandler = acksIterator.next(); final SettableFuture response = SettableFuture.create(); - final MessageReceiver.AckReplyConsumer consumer = - new MessageReceiver.AckReplyConsumer() { + final AckReplyConsumer consumer = + new AckReplyConsumer() { @Override public void accept(AckReply reply, Throwable t) { if (reply != null) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java index 7f0eb9dd9a19..52dd6ea8262b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java @@ -20,30 +20,6 @@ /** This interface can be implemented by users of {@link Subscriber} to receive messages. */ public interface MessageReceiver { - /** A reply to a message, to be sent back to the service. */ - enum AckReply { - /** - * Acknowledges that the message has been successfully processed. The service will not send the - * message again. - */ - ACK, - /** - * Signals that the message has not been successfully processed. The service will resend the - * message. - */ - NACK - } - - /** - * Accepts a reply, sending it to the service. - * - *

Both the interface and its method is named after the Java 8's {@code BiConsumer} interface - * to ease migration when we finally move. - */ - interface AckReplyConsumer { - void accept(AckReply ackReply, Throwable t); - } - /** * Called when a message is received by the subscriber. The implementation must arrange for {@link * AckReplyConsumer#accept} to be called after processing the {@code message}. diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 27fbbacad2e1..81239fa9a8fe 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -24,7 +24,6 @@ import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; -import com.google.cloud.pubsub.spi.v1.MessageReceiver.AckReply; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 943e646300df..4ef2c1dc7c98 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -22,7 +22,6 @@ import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.cloud.pubsub.spi.v1.FakeSubscriberServiceImpl.ModifyAckDeadline; -import com.google.cloud.pubsub.spi.v1.MessageReceiver.AckReply; import com.google.cloud.pubsub.spi.v1.Subscriber.Builder; import com.google.common.base.Function; import com.google.common.base.Optional; @@ -83,7 +82,7 @@ public static Collection data() { private TestReceiver testReceiver; static class TestReceiver implements MessageReceiver { - private final LinkedBlockingQueue outstandingMessageReplies = + private final LinkedBlockingQueue outstandingMessageReplies = new LinkedBlockingQueue<>(); private AckReply ackReply = AckReply.ACK; private Optional messageCountLatch = Optional.absent(); @@ -113,7 +112,7 @@ void waitForExpectedMessages() throws InterruptedException { } @Override - public void receiveMessage(PubsubMessage message, MessageReceiver.AckReplyConsumer consumer) { + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { if (explicitAckReplies) { try { outstandingMessageReplies.put(consumer); @@ -140,13 +139,13 @@ public void replyNextOutstandingMessage() { public void replyAllOutstandingMessage() { Preconditions.checkState(explicitAckReplies); - MessageReceiver.AckReplyConsumer reply; + AckReplyConsumer reply; while ((reply = outstandingMessageReplies.poll()) != null) { replyTo(reply); } } - private void replyTo(MessageReceiver.AckReplyConsumer reply) { + private void replyTo(AckReplyConsumer reply) { if (error.isPresent()) { reply.accept(null, error.get()); } else { diff --git a/pom.xml b/pom.xml index 2334310fdde1..df07c6f64743 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.0.28 + 0.0.29 0.1.5 0.8.4-alpha-SNAPSHOT 0.8.4-beta-SNAPSHOT @@ -177,7 +177,7 @@ [1.7,) - + From 900effea06f8e5d8b6dd641039429cdbbe2e8a60 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 10 Feb 2017 10:01:54 +1100 Subject: [PATCH 3/3] pr comment --- .../java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java index 223d4546d813..952a24f5ce0d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsub.spi.v1; /** - * Used by {@link MessageReceiver}, {@code AckReplyConsumer} accepts an {@link MessageReceiver.AckReply}, sending it to the service. + * Accepts a reply, sending it to the service. * *

Both the interface and its method is named after the Java 8's {@code BiConsumer} interface * to make migration to Java 8 and adopting its patterns easier.