April 11, 2023|8 min|Backend Engineering
Back to posts

Quarkus 响应式编程 - Mutiny

AI Summary

Mutiny 是 Quarkus 生态的响应式编程库,提供 Uni(单值异步)和 Multi(多值异步)两种类型。本文从响应式编程的核心概念讲起,对比了 Mutiny 与 Spring WebFlux(Reactor)的区别,详细介绍了 Uni、Multi 的基本用法、转换操作符、组合操作符,以及在 RESTEasy Reactive 和消息总线中的应用实战。

TL;DR

  • Mutiny = Quarkus 的响应式编程库,提供 UniMulti 两种核心类型
  • Uni<T> = 异步单值,类似 RxJava 的 Maybe
  • Multi<T> = 异步多值,类似 RxJava 的 Flowable
  • 👉 对比 Spring 的 Mono / Flux,Mutiny 更好上手
  • 👉 结合 Quarkus 的 RESTEasy Reactive,写响应式 REST 接口非常舒服

前言

看过 Go 并发模型的一定看到过一句话

不要通过共享内存来通信,而是通过通信来共享内存。

响应式编程其实是同样的哲学,只是表达方式不同。

  • Spring 生态:ReactorMono / Flux
  • Quarkus 生态:Mutiny

一、为什么需要响应式编程?

先不急着讲概念,说说痛点。

假设有个接口要调用三个服务,然后把结果拼在一起:

java
// 阻塞写法
public UserVO getUser(Long id) {
    User user = userService.getUser(id);           // 等 100ms
    List<Order> orders = orderService.getOrders(id); // 等 200ms
    List<Address> addresses = addressService.getAddresses(id); // 等 150ms
    return new UserVO(user, orders, addresses);    // 组装
}

三个请求是串行的,总耗时 = 100 + 200 + 150 = 450ms。

优化一下:

java
// 异步写法
public CompletableFuture<UserVO> getUser(Long id) {
    CompletableFuture<User> userFuture = userService.getUserAsync(id);
    CompletableFuture<List<Order>> ordersFuture = orderService.getOrdersAsync(id);
    CompletableFuture<List<Address>> addressesFuture = addressService.getAddressesAsync(id);

    return CompletableFuture.allOf(userFuture, ordersFuture, addressesFuture)
        .thenApply(v -> new UserVO(
            userFuture.join(),
            ordersFuture.join(),
            addressesFuture.join()
        ));
}

三个请求并行了,总耗时 = max(100, 200, 150) = 200ms。

这已经比串行快很多了,但还不够优雅。

如果订单服务返回空、地址服务超时了呢?错误处理怎么写?重试逻辑怎么加?

Mutiny 就是来解决这些问题的


二、Mutiny 核心概念:Uni 和 Multi

Mutiny 的核心就两种类型:

类型含义比喻类似
Uni<T>异步单值未来的某个 TOptional<T>, CompletionStage<T>
Multi<T>异步多值未来的某个列表Flux<T>

2.1 Uni<T>:异步单值

一个 Uni 代表一个异步结果,有也行,没有也行。

java
Uni<User> userUni = userService.findById(1L);

创建方式

java
// 从已存在的值
Uni<User> fromValue = Uni.createFrom().item(user);

// 从 null
Uni<User> fromNull = Uni.createFrom().nullItem();

// 从异常
Uni<User> fromError = Uni.createFrom().failure(new RuntimeException("oops"));

// 从异步任务
Uni<User> fromCallable = Uni.createFrom().call(() -> {
    return CompletableFuture.supplyAsync(() -> findUser());
});

// 手动完成
Uni<User> manual = Uni.createFrom().publisher(subscriber -> {
    subscriber.emit(user);
    subscriber.complete();
});

订阅和处理

java
userUni
    .onItem().transform(user -> user.getName().toUpperCase())
    .onItem().invoke(name -> System.out.println("Name: " + name))
    .onFailure().invoke(ex -> System.err.println("Error: " + ex.getMessage()))
    .subscribe().with(
        name -> System.out.println("Final: " + name),
        error -> System.err.println("Failed: " + error)
    );

链式调用:可以看到 Mutiny 大量使用链式调用,每个操作符都返回新的 Uni,这种风格非常流畅。

2.2 Multi<T>:异步多值

一个 Multi 代表多个异步结果,可以是零个、一个或多个。

java
Multi<User> userMulti = userService.findAll();

使用场景

  • 分页查询
  • 实时数据流
  • SSE(Server-Sent Events)

创建方式

java
// 从集合
Multi<Integer> numbers = Multi.createFrom().items(1, 2, 3, 4, 5);

// 从 publisher
Multi<String> fromPublisher = Multi.createFrom().publisher(Flux.just("a", "b", "c"));

// 手动控制
Multi<Integer> manual = Multi.createFrom().producer(subscriber -> {
    subscriber.emit(1);
    subscriber.emit(2);
    subscriber.emit(3);
    subscriber.onCompletion();
});

订阅

java
userMulti
    .onItem().transform(user -> user.getName())
    .filter(name -> name.startsWith("A"))
    .subscribe().with(
        name -> System.out.println("User: " + name),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Completed!")
    );

三、基本操作符

Mutiny 提供了丰富的操作符,跟 RxJava 类似。

3.1 转换操作符

java
// transform: 同步转换
userUni.onItem().transform(user -> user.getName());

// transformToUni: 异步转换(链式调用另一个异步操作)
userUni.onItem().transformToUni(user -> userService.getDetail(user.getId()));

// flatMap: 拍平(类似 JS 的 Promise chain)
userUni.onItem().flatMap(user -> detailService.getDetail(user.getId()));

3.2 过滤操作符

java
// filter: 过滤
userMulti.filter(user -> user.getAge() > 18);

// take: 取前 N 个
userMulti.take().atMost(10);

// distinct: 去重
userMulti.distinct();

3.3 组合操作符

java
// merge: 合并多个 Multi
Multi<User> merged = Multi.createBy().merging()
    .streams(userMulti1, userMulti2, userMulti3);

// combine: 组合(配对)- Uni 版本
Uni.combine().all()
    .unis(userUni, orderUni)
    .asTuple()
    .onItem().transform(tuple -> new UserOrder(tuple.getItem1(), tuple.getItem2()));

// zip: 拉链式组合
Uni<UserOrder> zipped = Uni.zip().all()
    .unis(userUni, orderUni)
    .combinedWith(UserOrder::new);

3.4 错误处理

java
// onFailure: 捕获异常
userUni.onFailure().invoke(ex -> log.error("Error", ex));

// recoverWithItem: 异常时返回默认值
userUni.onFailure().recoverWithItem(fallbackUser);

// recoverWithUni: 异常时执行另一个异步操作
userUni.onFailure().recoverWithUni(ex -> fallbackService.getDefault());

// retry: 重试
userUni.retry().withBackOff(Duration.ofSeconds(1)).atMost(3);

四、实战:RESTEasy Reactive

4.1 环境准备

添加依赖:

xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-jackson</artifactId>
</dependency>

4.2 返回 Uni

java
@Path("/users")
public class UserResource {

    @Inject
    UserService userService;

    @GET
    @Path("/{id}")
    public Uni<User> getUser(@PathParam Long id) {
        return userService.findById(id);
    }
}

注意:返回值是 Uni<User>,不是 User

Quarkus 会自动订阅这个 Uni,等结果返回后再序列化成 JSON。

4.3 返回 Multi(流式响应)

java
@GET
@Path("/users")
public Multi<User> getAllUsers() {
    return userService.findAll();
}

这种方式适合做流式响应,比如 SSE。

java
@GET
@Path("/stream")
public Multi<String> streamMessages() {
    return messageService.getMessageStream();
}

4.4 接收 Uni 参数

java
@POST
@Path("/users")
public Uni<Response> createUser(User user) {
    return userService.create(user)
        .onItem().transform(saved -> Response.status(201).entity(saved).build());
}

五、实战:响应式消息

Mutiny 不仅能用在 HTTP 层,还能用在消息通信上。

5.1 Kafka 响应式消费

添加依赖:

xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

消费者

java
@Incoming("orders")
@Outgoing("processed-orders")
public Uni<Order> processOrder(Order order) {
    return orderService.process(order);
}

消息配置(application.properties):

properties
mp.messaging.incoming.orders.connector=smallrye-kafka
mp.messaging.incoming.orders.topic=order-requests
mp.messaging.incoming.orders.group.id=order-processor

mp.messaging.outgoing.processed-orders.connector=smallrye-kafka
mp.messaging.outgoing.processed-orders.topic=order-results

生产者

java
@Inject
@Channel("order-events")
Emitter<Order> orderEmitter;

public void createOrder(Order order) {
    doCreate(order)
        .onItem().invoke(created -> orderEmitter.send(created))
        .subscribe().with(result -> {});
}

注意:Emitter.send() 是 fire-and-forget 的,不需要返回值。

5.2 AMQP 消息

java
@Incoming("prices")
public Uni<Void> processPrice(Message<Double> msg) {
    Double price = msg.getPayload();
    return priceService.update(price)
        .onItem().invoke(() -> msg.ack())
        .onFailure().invoke(e -> msg.nack(e))
        .replaceWithVoid();
}

六、Mutiny vs Reactor 对比

之前用过 Spring WebFlux(Reactor),对比一下:

对比维度Reactor (Spring)Mutiny (Quarkus)
核心类型Mono<T>, Flux<T>Uni<T>, Multi<T>
概念复杂度较高(冷/热流、背压)较低(概念直观)
学习曲线陡峭平缓
文档质量完善清晰
调试友好度一般更好

Mutiny 的优势

  • Uni / Multi 命名更直观,不容易混淆
  • 操作符设计更符合直觉
  • 错误信息更友好

Reactor 的优势

  • 生态更成熟,资料更多
  • 与 Spring WebFlux 无缝集成
  • 冷热流概念更完整

七、最佳实践

7.1 什么时候用 Uni,什么时候用 Multi?

java
// 查询单个 -> Uni
Uni<User> findById(Long id);

// 查询列表 -> Multi
Multi<User> findAll();

// 分页 -> Multi(流式返回每一页)
Multi<User> findAll(Paging paging);

7.2 避免嵌套 Uni

java
// ❌ 不好:嵌套的 Uni
userUni.onItem().transformToUni(user -> {
    return orderService.findByUser(user.getId())
        .onItem().transformToUni(order -> {
            return addressService.findByUser(user.getId())
                .onItem().transform(address -> new UserDetail(user, order, address));
        });
});

// ✅ 好:链式扁平化
userUni
    .flatMap(user -> Multi.createFrom().item(user)
        .onItem().transformToMulti(u -> orderService.findByUser(u.getId()))
        .onItem().transformToUni(order -> addressService.findByUser(user.getId())
            .onItem().transform(address -> new UserDetail(user, order, address))))

7.3 善用备用值和默认值

java
// 异常时返回默认值
userService.findById(id)
    .onFailure().recoverWithItem(User.DEFAULT);

// 异常时执行替代逻辑
userService.findById(id)
    .onFailure().recoverWithUni(ex -> fallbackService.getDefault());

总结

Mutiny 是 Quarkus 生态中非常优秀的响应式编程库:

  • 概念简单Uni = 单值,Multi = 多值,一目了然
  • API 友好:链式调用,操作符命名符合直觉
  • 集成完善:RESTEasy Reactive、Messaging(Kafka、AMQP)都有原生支持
  • 性能优异:底层基于 Vert.x,非阻塞、事件驱动

如果你在用 Quarkus,想写响应式代码,Mutiny 是首选。

如果你之前用过 Spring WebFlux,但觉得 Reactor 太复杂,Mutiny 会让你有「原来可以这么简单」的感觉。


参考

Command Palette

Search for a command to run...