[java] 如何使用Junit来测试异步进程



Answers

另一种方法是使用CountDownLatch类。

public class DatabaseTest {

    /**
     * Data limit
     */
    private static final int DATA_LIMIT = 5;

    /**
     * Countdown latch
     */
    private CountDownLatch lock = new CountDownLatch(1);

    /**
     * Received data
     */
    private List<Data> receiveddata;

    @Test
    public void testDataRetrieval() throws Exception {
        Database db = new MockDatabaseImpl();
        db.getData(DATA_LIMIT, new DataCallback() {
            @Override
            public void onSuccess(List<Data> data) {
                receiveddata = data;
                lock.countDown();
            }
        });

        lock.await(2000, TimeUnit.MILLISECONDS);

        assertNotNull(receiveddata);
        assertEquals(DATA_LIMIT, receiveddata.size());
    }
}

注意 ,不能仅将与常规对象syncronized用作锁,因为快速回调可以在调用锁的等待方法之前释放锁。 请参阅Joe Walnes的博客文章。

编辑删除CountDownLatch周围的同步块,感谢来自@jtahlborn和@Ring的评论

Question

你如何测试使用Junit激发异步过程的方法?

我不知道如何让我的测试等待进程结束(这不完全是一个单元测试,它更像是一个集成测试,因为它涉及几个类而不只是一个)




测试线程/异步代码没有任何内在的错误,尤其是线程是您正在测试的代码的重点 。 测试这些东西的一般方法是:

  • 阻止主测试线程
  • 从其他线程捕获失败的断言
  • 解锁主测试线程
  • 反思任何失败

但这是一个测试的很多样板。 更好/更简单的方法是使用ConcurrentUnit

  final Waiter waiter = new Waiter();

  new Thread(() -> {
    doSomeWork();
    waiter.assertTrue(true);
    waiter.resume();
  }).start();

  // Wait for resume() to be called
  waiter.await(1000);

CountdownLatch方法相比,这样做的好处在于它不那么冗长,因为任何线程中发生的断言失败都会正确地报告给主线程,这意味着测试在应该执行时会失败。 将CountdownLatch方法与ConcurrentUnit进行比较的CountdownLatchhere

我还为那些想要了解更多细节的人写了here关于该主题的here




如果您使用CompletableFuture (在Java 8中引入)或SettableFuture (来自Google Guava ),则可以在完成测试后立即完成测试,而不是等待预设的时间量。 你的测试看起来像这样:

CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable() {         
    @Override
    public void run() {
        future.complete("Hello World!");                
    }
});
assertEquals("Hello World!", future.get());



我发现一种非常有用的测试异步方法的方法是在对象测试的构造函数中注入一个Executor实例。 在生产中,执行程序实例被配置为异步运行,而在测试中它可以被模拟为同步运行。

所以假设我试图测试异步方法Foo#doAsync(Callback c)

class Foo {
  private final Executor executor;
  public Foo(Executor executor) {
    this.executor = executor;
  }

  public void doAsync(Callback c) {
    executor.execute(new Runnable() {
      @Override public void run() {
        // Do stuff here
        c.onComplete(data);
      }
    });
  }
}

在生产中,我会用Executors.newSingleThreadExecutor() Executor实例构建Foo ,而在测试中我可能会用一个同步执行程序来构建它,它执行以下操作 -

class SynchronousExecutor implements Executor {
  @Override public void execute(Runnable r) {
    r.run();
  }
}

现在我对异步方法的JUnit测试非常干净 -

@Test public void testDoAsync() {
  Executor executor = new SynchronousExecutor();
  Foo objectToTest = new Foo(executor);

  Callback callback = mock(Callback.class);
  objectToTest.doAsync(callback);

  // Verify that Callback#onComplete was called using Mockito.
  verify(callback).onComplete(any(Data.class));

  // Assert that we got back the data that we expected.
  assertEquals(expectedData, callback.getData());
}



我找到一个库socket.io来测试异步逻辑。 它使用LinkedBlockingQueue看起来简单而简单。 这里是一个example

    @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException {
    final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();

    socket = client();
    socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
        @Override
        public void call(Object... objects) {
            socket.send("foo", "bar");
        }
    }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
        @Override
        public void call(Object... args) {
            values.offer(args);
        }
    });
    socket.connect();

    assertThat((Object[])values.take(), is(new Object[] {"hello client"}));
    assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"}));
    socket.disconnect();
}

使用LinkedBlockingQueue将API阻塞,直到像同步方式一样获得结果。 并设置超时以避免假设有太多时间来等待结果。




尽可能避免使用并行线程进行测试(这是大部分时间)。 这只会让你的测试失败(有时候通过,有时会失败)。

只有当你需要调用其他库/系统时,你可能需要等待其他线程,在这种情况下,总是使用Awaitility库而不是Thread.sleep()

切勿在测试中调用get()join() ,否则您的测试可能会永久运行在您的CI服务器上,以防将来永远无法完成。 在调用get()之前,始终在测试中首先声明isDone() get() 。 对于CompletionStage,即.toCompletableFuture().isDone()

当你测试这样的非阻塞方法时:

public static CompletionStage<Foo> doSomething(BarService service) {
    CompletionStage<Bar> future = service.getBar();
    return future.thenApply(bar -> fooToBar());
}

那么你不应该仅仅通过在测试中传递完整的Future来测试结果,还应该确保你的方法doSomething()不会通过调用join()get()阻塞。 如果您使用非阻塞框架,这一点尤为重要。

为此,请测试您设置为手动完成的未完成未来:

@Test
public void testDoSomething() {
    CompletableFuture<Bar> innerFuture = new CompletableFuture<>();
    fooResult = doSomething(() -> innerFuture).toCompletableFuture();
    assertFalse(fooResult.isDone());

    // this triggers the future to complete
    innerFuture.complete(new Bar());
    assertTrue(fooResult.isDone());

    // futher asserts about fooResult here
}

这样,如果将future.join()添加到doSomething(),则测试将失败。

如果你的服务使用了一个ExecutorService,比如在thenApplyAsync(..., executorService) ,那么在你的测试中注入一个单线程的ExecutorService,比如来自guava的ExecutorService:

ExecutorService executorService = Executors.newSingleThreadExecutor();

如果您的代码使用thenApplyAsync(...)类的thenApplyAsync(...) ,请重写代码以使用ExecutorService(有很多很好的理由),或者使用Awaitility。

为了缩短这个例子,我在测试中将BarService作为一个Java8 lambda实现的方法参数,通常它会是一个你会模拟的注入引用。




值得一提的是,在并发实践中有非常有用的章节Testing Concurrent Programs ,它描述了一些单元测试方法并给出了问题的解决方案。




这是我现在使用的测试结果是异步生成的。

public class TestUtil {

    public static <R> R await(Consumer<CompletableFuture<R>> completer) {
        return await(20, TimeUnit.SECONDS, completer);
    }

    public static <R> R await(int time, TimeUnit unit, Consumer<CompletableFuture<R>> completer) {
        CompletableFuture<R> f = new CompletableFuture<>();
        completer.accept(f);
        try {
            return f.get(time, unit);
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException("Future timed out", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Future failed", e.getCause());
        }
    }
}

使用静态导入,测试读起来不错。 (注意,在这个例子中,我开始一个线索来说明这个想法)

    @Test
    public void testAsync() {
        String result = await(f -> {
            new Thread(() -> f.complete("My Result")).start();
        });
        assertEquals("My Result", result);
    }

如果未调用f.complete ,则在超时后测试将失败。 您也可以使用f.completeExceptionally失败。




Links