java documentation - 如何使用Junit來測試異步進程





jar (14)


另一種方法是使用CountDownLatch類。

public class DatabaseTest {

    /**
     * Data limit
     */
    private static final int DATA_LIMIT = 5;

    /**
     * Countdown latch
     */
    private CountDownLatch lock = new CountDownLatch(1);

    /**
     * Received data
     */
    private List<Data> receiveddata;

    @Test
    public void testDataRetrieval() throws Exception {
        Database db = new MockDatabaseImpl();
        db.getData(DATA_LIMIT, new DataCallback() {
            @Override
            public void onSuccess(List<Data> data) {
                receiveddata = data;
                lock.countDown();
            }
        });

        lock.await(2000, TimeUnit.MILLISECONDS);

        assertNotNull(receiveddata);
        assertEquals(DATA_LIMIT, receiveddata.size());
    }
}

注意 ,不能僅將與常規對象syncronized用作鎖,因為快速回調可以在調用鎖的等待方法之前釋放鎖。 請參閱Joe Walnes的博客文章。

編輯刪除CountDownLatch周圍的同步塊,感謝來自@jtahlborn和@Ring的評論

你如何測試使用Junit激發異步過程的方法?

我不知道如何讓我的測試等待進程結束(這不完全是一個單元測試,它更像是一個集成測試,因為它涉及幾個類而不只是一個)




這是我現在使用的測試結果是異步生成的。

public class TestUtil {

    public static <R> R await(Consumer<CompletableFuture<R>> completer) {
        return await(20, TimeUnit.SECONDS, completer);
    }

    public static <R> R await(int time, TimeUnit unit, Consumer<CompletableFuture<R>> completer) {
        CompletableFuture<R> f = new CompletableFuture<>();
        completer.accept(f);
        try {
            return f.get(time, unit);
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException("Future timed out", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Future failed", e.getCause());
        }
    }
}

使用靜態導入,測試讀起來不錯。 (注意,在這個例子中,我開始一個線索來說明這個想法)

    @Test
    public void testAsync() {
        String result = await(f -> {
            new Thread(() -> f.complete("My Result")).start();
        });
        assertEquals("My Result", result);
    }

如果未調用f.complete ,則在超時後測試將失敗。 您也可以使用f.completeExceptionally失敗。




如果你想測試邏輯只是不要異步測試它。

例如測試這個代碼對異步方法的結果有效。

public class Example {
    private Dependency dependency;

    public Example(Dependency dependency) {
        this.dependency = dependency;            
    }

    public CompletableFuture<String> someAsyncMethod(){
        return dependency.asyncMethod()
                .handle((r,ex) -> {
                    if(ex != null) {
                        return "got exception";
                    } else {
                        return r.toString();
                    }
                });
    }
}

public class Dependency {
    public CompletableFuture<Integer> asyncMethod() {
        // do some async stuff       
    }
}

在測試模擬與同步實現的依賴關係。 單元測試是完全同步的,運行時間為150ms。

public class DependencyTest {
    private Example sut;
    private Dependency dependency;

    public void setup() {
        dependency = Mockito.mock(Dependency.class);;
        sut = new Example(dependency);
    }

    @Test public void success() throws InterruptedException, ExecutionException {
        when(dependency.asyncMethod()).thenReturn(CompletableFuture.completedFuture(5));

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("5")));
    }

    @Test public void failed() throws InterruptedException, ExecutionException {
        // Given
        CompletableFuture<Integer> c = new CompletableFuture<Integer>();
        c.completeExceptionally(new RuntimeException("failed"));
        when(dependency.asyncMethod()).thenReturn(c);

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("got exception")));
    }
}

您不測試異步行為,但可以測試邏輯是否正確。




啟動流程並使用Future來等待結果。




這裡有很多答案,但最簡單的就是創建一個完整的CompletableFuture並使用它:

CompletableFuture.completedFuture("donzo")

所以在我的測試中:

this.exactly(2).of(mockEventHubClientWrapper).sendASync(with(any(LinkedList.class)));
this.will(returnValue(new CompletableFuture<>().completedFuture("donzo")));

我只是確保所有這些東西都會被調用。 如果您使用此代碼,此技術可以工作:

CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();

隨著所有CompletableFutures完成,它將通過它正確拉鍊!




如果您使用CompletableFuture (在Java 8中引入)或SettableFuture (來自Google Guava ),則可以在完成測試後立即完成測試,而不是等待預設的時間量。 你的測試看起來像這樣:

CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable() {         
    @Override
    public void run() {
        future.complete("Hello World!");                
    }
});
assertEquals("Hello World!", future.get());



測試線程/異步代碼沒有任何內在的錯誤,尤其是線程是您正在測試的代碼的重點 。 測試這些東西的一般方法是:

  • 阻止主測試線程
  • 從其他線程捕獲失敗的斷言
  • 解鎖主測試線程
  • 反思任何失敗

但這是一個測試的很多樣板。 更好/更簡單的方法是使用ConcurrentUnit

  final Waiter waiter = new Waiter();

  new Thread(() -> {
    doSomeWork();
    waiter.assertTrue(true);
    waiter.resume();
  }).start();

  // Wait for resume() to be called
  waiter.await(1000);

CountdownLatch方法相比,這樣做的好處在於它不那麼冗長,因為任何線程中發生的斷言失敗都會​​正確地報告給主線程,這意味著測試在應該執行時會失敗。 將CountdownLatch方法與ConcurrentUnit進行比較的CountdownLatchhere

我還為那些想要了解更多細節的人寫了here關於該主題的here




我找到一個庫socket.io來測試異步邏輯。 它使用LinkedBlockingQueue看起來簡單而簡單。 這裡是一個example

    @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException {
    final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();

    socket = client();
    socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
        @Override
        public void call(Object... objects) {
            socket.send("foo", "bar");
        }
    }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
        @Override
        public void call(Object... args) {
            values.offer(args);
        }
    });
    socket.connect();

    assertThat((Object[])values.take(), is(new Object[] {"hello client"}));
    assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"}));
    socket.disconnect();
}

使用LinkedBlockingQueue將API阻塞,直到像同步方式一樣獲得結果。 並設置超時以避免假設有太多時間來等待結果。




您可以嘗試使用Awaitility庫。 它可以很容易地測試你正在討論的系統。




我發現一種​​非常有用的測試異步方法的方法是在對象測試的構造函數中註入一個Executor實例。 在生產中,執行程序實例被配置為異步運行,而在測試中它可以被模擬為同步運行。

所以假設我試圖測試異步方法Foo#doAsync(Callback c)

class Foo {
  private final Executor executor;
  public Foo(Executor executor) {
    this.executor = executor;
  }

  public void doAsync(Callback c) {
    executor.execute(new Runnable() {
      @Override public void run() {
        // Do stuff here
        c.onComplete(data);
      }
    });
  }
}

在生產中,我會用Executors.newSingleThreadExecutor() Executor實例構建Foo ,而在測試中我可能會用一個同步執行程序來構建它,它執行以下操作 -

class SynchronousExecutor implements Executor {
  @Override public void execute(Runnable r) {
    r.run();
  }
}

現在我對異步方法的JUnit測試非常乾淨 -

@Test public void testDoAsync() {
  Executor executor = new SynchronousExecutor();
  Foo objectToTest = new Foo(executor);

  Callback callback = mock(Callback.class);
  objectToTest.doAsync(callback);

  // Verify that Callback#onComplete was called using Mockito.
  verify(callback).onComplete(any(Data.class));

  // Assert that we got back the data that we expected.
  assertEquals(expectedData, callback.getData());
}



值得一提的是,在並發實踐中有非常有用的章節Testing Concurrent Programs ,它描述了一些單元測試方法並給出了問題的解決方案。




如何調用SomeObject.waitnotifyAll或者使用Robotiums Solo.waitForCondition(...)方法,或者使用我寫來完成此操作(請參閱註釋和測試類以了解如何使用)




恕我直言,不好的做法是讓單元測試在線程上創建或等待,等等。你希望這些測試在分秒內運行。 這就是為什麼我想提出一個測試異步流程的兩步法。

  1. 測試你的異步過程是否正確提交。 您可以模擬接受異步請求的對象,並確保提交的作業具有正確的屬性等。
  2. 測試你的異步回調是否正確。 在這裡,您可以嘲笑最初提交的作業,並假設它已正確初始化並確認您的回調是正確的。



我認為最好的解決方案是使用unittest 命令行界面 ,該界面將目錄添加到sys.path因此您不必(在TestLoader類中完成)。

例如,對於像這樣的目錄結構:

new_project
├── antigravity.py
└── test_antigravity.py

你可以運行:

$ cd new_project
$ python -m unittest test_antigravity

對於像你這樣的目錄結構:

new_project
├── antigravity
│   ├── __init__.py         # make it a package
│   └── antigravity.py
└── test
    ├── __init__.py         # also make test a package
    └── test_antigravity.py

test包內的測試模塊中,您可以照常導入antigravity包及其模塊:

# import the package
import antigravity

# import the antigravity module
from antigravity import antigravity

# or an object inside the antigravity module
from antigravity.antigravity import my_object

運行一個測試模塊:

要運行一個測試模塊,在這種情況下是test_antigravity.py

$ cd new_project
$ python -m unittest test.test_antigravity

只需引用測試模塊的方式與導入它的方式相同。

運行單個測試用例或測試方法:

您也可以運行單個TestCase或單個測試方法:

$ python -m unittest test.test_antigravity.GravityTestCase
$ python -m unittest test.test_antigravity.GravityTestCase.test_method

運行所有測試:

你也可以使用測試發現來發現和運行你所有的測試,它們必須是名為test*.py (可以用-p, --pattern標誌改變)的模塊或者包:

$ cd new_project
$ python -m unittest discover

這將運行test包內的所有test*.py模塊。





java unit-testing asynchronous junit