尝试创建SQS轮询器,它:
进行指数轮询(以减少队列中没有消息的请求数)
如果队列中有很多消息,则更频繁地查询SQS
如果收到一定数量的消息,将产生反压,它将停止轮询
不受AWS API速率限制的限制
作为示例,我使用的是this JavaRx实现,该实现可以轻松转换为Project Reactor并使用反压功能对其进行丰富。
private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;
public Flux<Message> pollMessages(GetQueueUrlResult q)
{
return Flux.create(sink -> {
long backoff = DEFAULT_BACKOFF;
while (!stopRequested)
{
if (sink.isCancelled())
{
sink.error(new RuntimeException("Stop requested"));
break;
}
Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
? amazonSQS.receiveMessageAsync(createRequest(q))
: completedFuture(new ReceiveMessageResult());
try
{
ReceiveMessageResult result = future.get();
if (result != null && !result.getMessages().isEmpty())
{
backoff = DEFAULT_BACKOFF;
LOGGER.info("New messages found in queue size={}", result.getMessages().size());
result.getMessages().forEach(m -> {
if (sink.requestedFromDownstream() > 0L)
{
sink.next(m);
}
});
}
else
{
if (backoff < MAX_BACKOFF)
{
backoff = backoff * 2;
}
LOGGER.debug("No messages found on queue. Sleeping for {} ms.", backoff);
// This is to prevent rate limiting by the AWS api
Thread.sleep(backoff);
}
}
catch (InterruptedException e)
{
stopRequested = true;
}
catch (ExecutionException e)
{
sink.error(e);
}
}
});
}
实施似乎可行,但有几个问题:
看起来可以使用Reactor Primitives在循环中查询Future结果,并使用Flux.generate
进行了尝试,但无法控制对SqsClient的异步调用的次数
如果使用Flux.interval
方法,则不了解如何正确实施退避政策
不喜欢Thread.sleep
调用任何想法如何替换它?
在取消信号的情况下如何正确停止循环?现在使用sink.error
可以解决这种情况。
参考方案
您如何看待以下解决方案:
private static final Integer batchSize = 1;
private static final Integer intervalRequest = 3000;
private static final Integer waitTimeout = 10;
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static final SqsAsyncClient sqsAsync =
SqsAsyncClient
.builder()
.endpointOverride(URI.create(queueUrl))
.build();
public static Flux<Message> sqsPublisher =
Flux.create(sink -> {
if (sink.isCancelled()) {
sink.error(new RuntimeException("Stop requested"));
}
scheduler.scheduleWithFixedDelay(() -> {
long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
if (numberOfRequests > 0) {
ReceiveMessageRequest request = ReceiveMessageRequest
.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages((int) numberOfRequests)
.waitTimeSeconds(waitTimeout).build();
CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);
response.thenApply(responseValue -> {
if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
}
return responseValue;
});
}
}, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
});
java:继承 - java有哪些替代继承的方法? java大神给出的解决方案 有效的Java:偏重于继承而不是继承。 (这实际上也来自“四人帮”)。他提出的理由是,如果扩展类未明确设计为继承,则继承会引起很多不正常的副作用。例如,对super.someMethod()的任何调用都可以引导您通过未知代码的意外路径。取而代之的是,持有对本来应该扩展的类的引用,然后委托给它。这是与Eric…
Java-如何将此字符串转换为日期? - java我从服务器收到此消息,我不明白T和Z的含义,2012-08-24T09:59:59Z将此字符串转换为Date对象的正确SimpleDateFormat模式是什么? java大神给出的解决方案 这是ISO 8601标准。您可以使用SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM…
JAVA:如何检查对象数组中的所有对象是否都是子类的对象? - java我有一个对象数组。现在,我要检查所有这些对象是否都是MyObject的实例。有没有比这更好的选择:boolean check = true; for (Object o : justAList){ if (!(o instanceof MyObject)){ check = false; break; } } java大神给出的解决方案 如果您不喜欢循环,则…
JAVA 8具有任何匹配属性的对象的过滤器列表 - java我的要求是通过匹配任何属性的字符串来过滤对象列表。例如,假设Contact类具有三个属性:街道,城市,电话。我知道java流过滤器是如何工作的,在这里我必须将输入字符串与每个属性进行比较,如下所示:contactList.stream().filter(contact -> contact.getStreet().equals("dubai&…
Java-固定大小的列表与指定初始容量的列表之间的差异 - java我在理解这一点上遇到了问题。当我们做 List<Integer> list = Arrays.asList(array); 我们不能在该列表上使用添加,删除之类的方法。我知道Arrays.asList()返回固定大小的列表。我不明白的是,如果我们创建一个具有指定初始容量的列表,例如List<Integer> list2 = new A…