首页 > 文章列表 > RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?

RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?

331 2024-12-14

RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?

rxjava的observable订阅时在oncomplete被调用时取消订阅

在非android环境中使用rxjava2.x时,如果需要在observable的oncomplete被调用时取消订阅,可以采用以下方法:

在oncomplete中设置completablefuture.complete通知调用方已结束:

table.subscribe(tableins -> {
            // system.out.println("-------information-------");
            system.out.println(tableins);
        }, throwable -> {
            throw new schemaexportexception(throwable);
        }, new action() {
            @override
            public void run() throws exception {
                system.out.println("complete");
                // 在这里取消订阅
                completablefuture.complete();
            }
        });

调用方可以根据completablefuture来判断observable是否已完成:

long startStamp = System.currentTimeMillis();
// Flowable
Flowable<Table> tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function<Table, Publisher<Table>>() {
            @Override
            public Publisher<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {
                    @Override
                    public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Single.just(table.fillColumn(columns));
                    }
                }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {
                    @Override
                    public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {
                        return Flowable.just(table);
                    }
                });
            }
        });
Disposable disposable = null;
try {
            disposable = out.flush(info, tableFlowable);
            CompletableFuture<String> future = out.getFuture();
            while (!future.isDone()) {
                logger.info("[ERE-Flowable]未完成,线程休眠1秒");
                Thread.currentThread().sleep(1000, 0);
            }
            String result = future.get();
            logger.info("[ERE-Flowable]完成, 结果:" + result);
            if (result.equals("OK")) {
                long finishStamp = System.currentTimeMillis();
                clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: " + (finishStamp - startStamp));
            }
        } catch (Exception e) {
            clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: " + e.getMessage());
        }
来源:1732953211