From eaf137e1499649ff1e7936357e50c93e7fef7bdd Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 22 Feb 2021 12:13:29 +0200 Subject: [PATCH 1/4] adds test that reproduces the ` reactor.core.Exceptions$OverflowException` Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequesterTest.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 45770d375..a10f73141 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -1142,6 +1142,45 @@ public void testWorkaround858() { rule.assertHasNoLeaks(); } + @Test + // see https://github.com/rsocket/rsocket-java/issues/959 + public void testWorkaround959() { + for (int i = 1; i < 100000; i += 2) { + ByteBuf buffer = rule.alloc().buffer(); + buffer.writeCharSequence("test", CharsetUtil.UTF_8); + + final AssertSubscriber assertSubscriber = new AssertSubscriber<>(3); + rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber); + + final ByteBuf payloadFrame = + PayloadFrameCodec.encode( + rule.alloc(), i, false, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER); + + RaceTestUtils.race( + () -> { + rule.connection.addToReceivedBuffer(payloadFrame.copy()); + rule.connection.addToReceivedBuffer(payloadFrame.copy()); + rule.connection.addToReceivedBuffer(payloadFrame); + }, + () -> { + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + }); + + Assertions.assertThat(rule.connection.getSent()) + .allMatch(ByteBuf::release); + + Assertions.assertThat(rule.socket.isDisposed()).isFalse(); + + assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease); + assertSubscriber.assertNoError(); + + rule.connection.clearSendReceiveBuffers(); + rule.assertHasNoLeaks(); + } + } + public static class ClientSocketRule extends AbstractSocketRule { @Override protected RSocketRequester newRSocket() { From 682fca339ea357f351399908d1cd7e8c7d35f859 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 22 Feb 2021 12:19:35 +0200 Subject: [PATCH 2/4] adds problem fix Signed-off-by: Oleh Dokuka --- rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index 38c392408..0db4c767c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -93,7 +93,6 @@ public Context currentContext() { @Override public void request(long n) { - this.s.request(n); if (!firstRequest) { try { this.hookOnRemainingRequests(n); @@ -115,6 +114,7 @@ public void request(long n) { if (firstLoop) { firstLoop = false; try { + this.s.request(Long.MAX_VALUE); this.hookOnFirstRequest(n); } catch (Throwable throwable) { onError(throwable); From 48fef8b1f0440b1678a8aac59c7a4dcd7de03fca Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 22 Feb 2021 12:24:32 +0200 Subject: [PATCH 3/4] fixes googleJavaFormat Signed-off-by: Oleh Dokuka --- .../src/test/java/io/rsocket/core/RSocketRequesterTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index a10f73141..74a2b22af 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -1168,8 +1168,7 @@ public void testWorkaround959() { assertSubscriber.request(1); }); - Assertions.assertThat(rule.connection.getSent()) - .allMatch(ByteBuf::release); + Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release); Assertions.assertThat(rule.socket.isDisposed()).isFalse(); From bcab4eb2cc7edda0865d162b8bb6e14fb487f6b1 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 22 Feb 2021 16:40:11 +0200 Subject: [PATCH 4/4] more improvements Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/core/RequestOperator.java | 8 ++++++++ .../io/rsocket/core/RSocketRequesterTest.java | 15 +++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index 0db4c767c..09eeadb6c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -114,6 +114,14 @@ public void request(long n) { if (firstLoop) { firstLoop = false; try { + // since in all the scenarios where RequestOperator is used, the + // CorePublisher is either UnicastProcessor or UnicastProcessor.next() + // we are free to propagate unbounded demand to that publisher right after + // the first request happens. UnicastProcessor is only there to allow sending signals from + // the + // connection to a real subscriber and does not have to check the real demand + // For more info see + // https://github.com/rsocket/rsocket/blob/master/Protocol.md#handling-the-unexpected this.s.request(Long.MAX_VALUE); this.hookOnFirstRequest(n); } catch (Throwable throwable) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 74a2b22af..1ce68cfeb 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -1142,15 +1142,22 @@ public void testWorkaround858() { rule.assertHasNoLeaks(); } - @Test + @ParameterizedTest + @ValueSource(strings = {"stream", "channel"}) // see https://github.com/rsocket/rsocket-java/issues/959 - public void testWorkaround959() { - for (int i = 1; i < 100000; i += 2) { + public void testWorkaround959(String type) { + for (int i = 1; i < 20000; i += 2) { ByteBuf buffer = rule.alloc().buffer(); buffer.writeCharSequence("test", CharsetUtil.UTF_8); final AssertSubscriber assertSubscriber = new AssertSubscriber<>(3); - rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber); + if (type.equals("stream")) { + rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber); + } else if (type.equals("channel")) { + rule.socket + .requestChannel(Flux.just(ByteBufPayload.create(buffer))) + .subscribe(assertSubscriber); + } final ByteBuf payloadFrame = PayloadFrameCodec.encode(