futureStream from Queue<T> does not fire on timeout. #1102

Open
DarekDan opened this issue Dec 30, 2020 · 2 comments
DarekDan commented Dec 30, 2020

Describe the bug
I have created a reproducible test case, and it seems that this old issue still persists:
https://stackoverflow.com/questions/42363084/using-cyclops-react-for-batching-on-a-async-queue-stream

To Reproduce
Steps to reproduce the behavior:
Run the test case attached in the comment below.

Expected behavior
The future should run after 200ms, once the timeout elapses, even if the current batch is not full.
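To make the expected behavior concrete, here is a minimal plain-Java sketch of size-and-time batching over a `java.util.concurrent.BlockingQueue`. It is not how cyclops-react implements `groupedBySizeAndTime`; the class `TimedBatcher` and the method `nextBatch` are hypothetical names used only for illustration. The point is that a batch is released either when it is full or when the time limit elapses, whichever comes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of cyclops-react.
public class TimedBatcher {

    /**
     * Collects up to maxSize items, waiting at most maxWait in total.
     * Returns whatever arrived in time, which may be a partial or empty batch.
     */
    public static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize,
                                        long maxWait, TimeUnit unit) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // time limit reached: release the partial batch
                }
                T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt and hand back what was collected so far.
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 11; i++) {
            queue.offer(Integer.toString(i));
        }
        // Same size and time limits as the test case: 3 items or 20 ms.
        List<String> batch;
        while (!(batch = nextBatch(queue, 3, 20, TimeUnit.MILLISECONDS)).isEmpty()) {
            System.out.println("Received " + String.join(",", batch));
        }
    }
}
```

In this sketch the two leftover messages are released as soon as the time limit passes, without the queue having to be closed first. That is the behavior this issue expects from the stream.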

DarekDan commented Dec 30, 2020

Here is a test case:

    @Test
    public void isolatedQueueTest() throws InterruptedException {
        Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
        // Batch by size (3) or time (20 ms) and log each batch.
        StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
                .parallel()
                .groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
                .runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
                        bufferedMessages -> log.info("Received {}",
                                bufferedMessages.stream().collect(Collectors.joining(",")))));
        // Offer 11 messages ("0" to "10") asynchronously, then wait far longer than the 20 ms limit.
        ReactiveSeq.range(0, 11)
                .forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
        TimeUnit.MILLISECONDS.sleep(1000);
    }

The last pair (the two messages left over after three full batches of 3) is never logged.
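The arithmetic behind "the last pair" can be checked with a short plain-Java sketch. It assumes the 11 messages arrive in order, which the parallel stream does not guarantee; `ExpectedBatches` and `grouped` are hypothetical names, not cyclops-react APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration only; not part of cyclops-react.
public class ExpectedBatches {

    /** Splits items into consecutive groups of at most size elements. */
    public static <T> List<List<T>> grouped(List<T> items, int size) {
        List<List<T>> groups = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            groups.add(new ArrayList<>(items.subList(from, to)));
        }
        return groups;
    }

    public static void main(String[] args) {
        // Same values the test offers to the queue: "0" through "10".
        List<String> messages = IntStream.range(0, 11)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        for (List<String> group : grouped(messages, 3)) {
            System.out.println("Received " + String.join(",", group));
        }
    }
}
```

Eleven messages in groups of 3 give three full groups and one partial group of two. Only the time limit can release that partial group, so it is the one that goes missing when the timeout does not fire.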

@DarekDan

Here is a more accurate test case:

    @Test
    public void isolatedQueueTest() throws InterruptedException {
        Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
        ListX<String> receiver = ListX.empty();
        StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
                .parallel()
                .groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
                .runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
                        bufferedMessages -> receiver.addAll(bufferedMessages.stream().toList())));
        ReactiveSeq.range(0,11)
                .forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
        TimeUnit.MILLISECONDS.sleep(1000);
        pubSubMessageQueue.close();
        TimeUnit.MILLISECONDS.sleep(1000);
        assertEquals(11,receiver.size());
    }

Without the close() call and the second wait, this test case fails.
