带有反压力的反应式SQS轮询器 - java

尝试创建SQS轮询器,它:

进行指数轮询(以减少队列中没有消息的请求数)
如果队列中有很多消息,则更频繁地查询SQS
如果收到一定数量的消息,将产生反压,它将停止轮询
不受AWS API速率限制的限制

作为示例,我使用的是this JavaRx实现,该实现可以轻松转换为Project Reactor并使用反压功能对其进行丰富。

private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;

public Flux<Message> pollMessages(GetQueueUrlResult q)
{
    return Flux.create(sink -> {
        long backoff = DEFAULT_BACKOFF;

        while (!stopRequested)
        {
            if (sink.isCancelled())
            {
                sink.error(new RuntimeException("Stop requested"));
                break;
            }

            Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                    ? amazonSQS.receiveMessageAsync(createRequest(q))
                    : completedFuture(new ReceiveMessageResult());

            try
            {
                ReceiveMessageResult result = future.get();

                if (result != null && !result.getMessages().isEmpty())
                {
                    backoff = DEFAULT_BACKOFF;

                    LOGGER.info("New messages found in queue size={}", result.getMessages().size());

                    result.getMessages().forEach(m -> {
                        if (sink.requestedFromDownstream() > 0L)
                        {
                            sink.next(m);
                        }
                    });
                }
                else
                {
                    if (backoff < MAX_BACKOFF)
                    {
                        backoff = backoff * 2;
                    }

                    LOGGER.debug("No messages found on queue.  Sleeping for {} ms.", backoff);

                    // This is to prevent rate limiting by the AWS api
                    Thread.sleep(backoff);
                }
            }
            catch (InterruptedException e)
            {
                stopRequested = true;
            }
            catch (ExecutionException e)
            {
                sink.error(e);
            }
        }
    });
}

实施似乎可行,但有几个问题:

看起来可以使用Reactor Primitives在循环中查询Future结果,并使用Flux.generate进行了尝试,但无法控制对SqsClient的异步调用的次数
如果使用Flux.interval方法,则不了解如何正确实施退避政策
不喜欢Thread.sleep调用任何想法如何替换它?
在取消信号的情况下如何正确停止循环?现在使用sink.error可以解决这种情况。

参考方案

您如何看待以下解决方案:

    private static final Integer batchSize = 1;
    private static final Integer intervalRequest = 3000;
    private static final Integer waitTimeout = 10;
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private static final SqsAsyncClient sqsAsync =
       SqsAsyncClient
         .builder()
         .endpointOverride(URI.create(queueUrl))
         .build();

    public static Flux<Message> sqsPublisher =
        Flux.create(sink -> {
                if (sink.isCancelled()) {
                    sink.error(new RuntimeException("Stop requested"));
                }

            scheduler.scheduleWithFixedDelay(() -> {
                long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
                if (numberOfRequests > 0) {
                    ReceiveMessageRequest request = ReceiveMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages((int) numberOfRequests)
                            .waitTimeSeconds(waitTimeout).build();

                    CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);

                    response.thenApply(responseValue -> {
                        if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
                            responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
                        }
                        return responseValue;
                    });

                }
            }, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
        });

java:继承 - java

有哪些替代继承的方法? java大神给出的解决方案 有效的Java:偏重于继承而不是继承。 (这实际上也来自“四人帮”)。他提出的理由是,如果扩展类未明确设计为继承,则继承会引起很多不正常的副作用。例如,对super.someMethod()的任何调用都可以引导您通过未知代码的意外路径。取而代之的是,持有对本来应该扩展的类的引用,然后委托给它。这是与Eric…

Java-如何将此字符串转换为日期? - java

我从服务器收到此消息,我不明白T和Z的含义,2012-08-24T09:59:59Z将此字符串转换为Date对象的正确SimpleDateFormat模式是什么? java大神给出的解决方案 这是ISO 8601标准。您可以使用SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM…

JAVA:如何检查对象数组中的所有对象是否都是子类的对象? - java

我有一个对象数组。现在,我要检查所有这些对象是否都是MyObject的实例。有没有比这更好的选择:boolean check = true; for (Object o : justAList){ if (!(o instanceof MyObject)){ check = false; break; } } java大神给出的解决方案 如果您不喜欢循环,则…

JAVA 8具有任何匹配属性的对象的过滤器列表 - java

我的要求是通过匹配任何属性的字符串来过滤对象列表。例如,假设Contact类具有三个属性:街道,城市,电话。我知道java流过滤器是如何工作的,在这里我必须将输入字符串与每个属性进行比较,如下所示:contactList.stream().filter(contact -> contact.getStreet().equals("dubai&…

Java-固定大小的列表与指定初始容量的列表之间的差异 - java

我在理解这一点上遇到了问题。当我们做 List<Integer> list = Arrays.asList(array); 我们不能在该列表上使用添加,删除之类的方法。我知道Arrays.asList()返回固定大小的列表。我不明白的是,如果我们创建一个具有指定初始容量的列表,例如List<Integer> list2 = new A…