如何使用Jersey Client 2.2.x取消待处理的异步请求并取消注册调用回调? - java

我有一个带有sleep方法的简单REST服务,该方法除了在指定的时间(毫秒)内不进行任何睡眠外,然后返回No Content响应。我的RESTTest类尝试首先调用http://localhost:8080/myapp/rest/sleep/7500(睡眠7.5秒),但仅等待5秒。 5秒后,它将取消接收到的Future(尝试取消待处理的请求),并调用http://localhost:8080/myapp/rest/sleep/5000(休眠5秒)并等待5秒。

public class RESTTest {
    private final Client client = ClientBuilder.newClient();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition responseReceived = lock.newCondition();

    public static void main(final String... arguments) {
        new RESTTest().listen(10000);
    }

    public void listen(final long time) {
        System.out.println("Listen for " + time + " ms.");
        Future<Response> _response =
            client.
                target("http://localhost:8080/myapp/rest/sleep/" + time)).
                request().
                async().
                    get(
                        new InvocationCallback<Response>() {
                            public void completed(final Response response) {
                                System.out.println("COMPLETED");
                                lock.lock();
                                try {
                                    responseReceived.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }

                            public void failed(final Throwable throwable) {
                                lock.lock();
                                try {
                                    responseReceived.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
                        });
        lock.lock();
        try {
            System.out.println("Waiting for 5000 ms.");
            if (!responseReceived.await(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out!");
                _response.cancel(true);
                listen(5000);
            } else {
                System.out.println("Response received.");
            }
        } catch (final InterruptedException exception) {
            // Do nothing.
        } finally {
            lock.unlock();
        }
    }
}

现在,我希望看到“ COMPLETED”字符串仅打印一次,并且“收到响应”。字符串也只打印一次。但是,“ COMPLETED”字符串将被打印两次!

Listen for 7500 ms.
Waiting for 5000 ms.
Timed out!
Listen for 5000 ms.
Waiting for 5000 ms.
COMPLETED
Response received.
COMPLETED

我在这里想念什么?

谢谢,

参考方案

我相信您已经弄清楚了,但是这里有一个非常模块化的解决方案,您可以将其与简单的Guava ListenableFuture一起使用。您不必像我在Futures.allAsList中那样汇总响应,但是您可以在末尾执行类似的操作并删除CountDownLatch。

顺便说一句,我很确定您的问题是线程问题。您正在看到COMPLETED,因为在您下次调用listen(5000)之后调用了该回调。请记住,异步将被线程化,因此输出到控制台的时间可能会延迟到下一次上下文切换之前。 7500信号量解锁后,服务器可能正在响应。

private Client client;

@Before
public void setup() {
    final ClientConfig clientConfig = new ClientConfig();
    clientConfig.register(OrtbBidRequestBodyReader.class);
    clientConfig.register(OrtbBidRequestBodyWriter.class);
    clientConfig.connectorProvider(new CachingConnectorProvider(new HttpUrlConnectorProvider()));
    clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 3);
    client = ClientBuilder.newClient(clientConfig);
}

@Test
public void testAsync() throws InterruptedException, ExecutionException, JsonProcessingException {

    final WebTarget target = client
            .target("http://localhost:8081/dsp-receiver-0.0.1-SNAPSHOT/ortb/bid/123123?testbid=bid");

    final AtomicInteger successcount = new AtomicInteger();
    final AtomicInteger noBid = new AtomicInteger();
    final AtomicInteger clientError = new AtomicInteger();

    final InvocationCallback<Response> callback = new InvocationCallback<Response>() {
        @Override
        public void completed(final Response response) {
            if (response.getStatus() == 200) {
                successcount.incrementAndGet();
            } else if (response.getStatus() == 204) {
                noBid.incrementAndGet();
            } else {
                clientError.incrementAndGet();
            }
        }

        @Override
        public void failed(final Throwable e) {
            clientError.incrementAndGet();
            logger.info("Client Error", e);
        }
    };

    final Entity<OrtbBidRequest> entity = Entity.entity(testBidRequest, MediaType.APPLICATION_JSON);
    final List<ListenableFuture<Response>> allFutures = Lists.newArrayList();
    final Stopwatch stopwatch = Stopwatch.createStarted();
    for (int i = 0; i < 100000; i++) {
        logger.info("Running request {}", i);
        final Future<Response> future = target.request().accept(MediaType.APPLICATION_JSON).async().post(entity,
                callback);
        final ListenableFuture<Response> response = JdkFutureAdapters.listenInPoolThread(future);
        allFutures.add(response);

        // For each 100 of requests we will wait on them, otherwise we
        // may run out of memory. This is really just to test the stamina
        // of the dsp
        if (i % 200 == 0) {
            Futures.allAsList(allFutures).get();
            allFutures.clear();
        }
    }

    logger.info("success count {}  nobid {} client error {} ", successcount, noBid, clientError);
    logger.info("Total time {} ms ", stopwatch.stop());
}

Java内存收集用法 - java

我试过了,最后像这样挡住了。Client client = new Client(); try { ... } catch { ... } finally { client = null; } 我想问如果发生异常,是否需要client = null清除客户端对象的内存使用。 参考方案 每当对象由于任何原因没有引用时,它就有资格进行垃圾回收,包括变量是否由于程…

Java:将文件上传到FTP问题(数据包丢失) - java

我正在尝试将文件从Java应用程序传输到FTP服务器该程序可以正常工作,文件已传输,但是当我在FTO文件夹中打开文件时,文件已损坏,我认为在文件传输过程中数据包丢失了。为什么?我该如何解决?另一个问题,如果要停止文件上传,如何停止while?谢谢大家!我班上的代码:FTPClient client = new FTPClient(); InputStream…

Java-搜索字符串数组中的字符串 - java

在Java中,我们是否有任何方法可以发现特定字符串是字符串数组的一部分。我可以避免出现一个循环。例如String [] array = {"AA","BB","CC" }; string x = "BB" 我想要一个if (some condition to tell wheth…

如何使用泛型实现比较器? - java

Arraylist只是一个小问题。我想按名称对ArrayList<Client>进行排序。Class Client{ String name; int phonenumber ..} 这段代码可以完成工作,但是我有一个编译器警告:“使用未经检查或不安全的操作”。有什么问题? public void sortByName(){ Collection…

Java Globbing模式以匹配目录和文件 - java

我正在使用递归函数遍历根目录下的文件。我只想提取*.txt文件,但不想排除目录。现在,我的代码如下所示:val stream = Files.newDirectoryStream(head, "*.txt") 但是这样做将不会匹配任何目录,并且返回的iterator()是False。我使用的是Mac,所以我不想包含的噪音文件是.DS_ST…