首页 > 文章列表 > 如何在Java 9中使用Flow API实现响应式流?

如何在Java 9中使用Flow API实现响应式流?

477 2023-08-30

Flow API 是自 Java 9 以来对反应式流规范的官方支持。它是 Iterator Observer 的组合< /strong>模式。 Flow API 是一种互操作规范,而不是像 RxJava 这样的最终用户 API。

Flow API 由四个基本接口组成:

  • 订阅者订阅者向发布者订阅回调。
  • 发布者发布者向注册订阅者发布数据项流。
  • 订阅发布者和订阅者之间的链接。
  • 处理器处理器位于发布者和订阅者之间,并将一个流转换为另一个流。

在下面的示例中,我们创建了一个基本订阅者,它请求一个数据对象,打印它并请求另一个数据对象。我们可以使用 Java 提供的发布者实现 (SubmissionPublisher) 来完成我们的会话。

示例

import java.util.concurrent.Flow;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;

class MySubscriber<T>implements Flow.Subscriber<T> {
   private Flow.Subscription subscription;
   @Override
   public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      this.subscription.request(1);
   }
   @Override
   public void onNext(T item) {
      System.out.println(item);
      subscription.request(1);
   }
   @Override
   public void onError(Throwable throwable) {
      throwable.printStackTrace();
   }
   @Override
   public void onComplete() {
      System.out.println("Done");
   }
}

// main class
public class FlowTest {
   public static void main(String args[]) {
      List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
      publisher.subscribe(new MySubscriber<>());
      items.forEach(s -> {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         publisher.submit(s);
      });
      publisher.close();
   }
}

输出

1
2
3
4
5
6
7
8
9
10
Done