Solon-Rx（约2Kb）是基于 reactive-streams 封装的 RxJava 极简版（约 2Mb 左右）。目前仅一个接口 Completable，意为：可完成的发布者。

使用场景及接口：

| 接口                                                 | 说明                 | 备注    | 
| ------------------------------ | ------------ | ------ | 
| `Completable`                                     | 作为返回类型      |   |
| | |   |
| `Completable::doOnError(err->{...})`                                    | 当出错时      |   |
| `Completable::doOnErrorResume(err->Completable)`         | 当出错时，恢复为一个新流      | v3.7.2 后支持  |
| `Completable::doOnComplete(()->{...})`                               | 当完成时      |   |
| | |   |
| `Completable.complete()`                     | 构建完成发布者     |   |
| `Completable.error(cause)`                   | 构建异常发布者     |   |
| | |   |
| `Completable.create(emitter->{...})`      | 构建发射器发布者     |   |
| `Completable.then(()->Completable)`                   | 当完成后（然后），下一个新流     |   |
| `Completable.then(Completable)`                         | 当完成后（然后），下一个新流     |   |
| | |   |
| `Completable.subscribeOn(executor)`      | 订阅于         | v3.7.2 后支持  |
| `Completable.delay(delay, unit)`              | 订阅延时     | v3.7.2 后支持  |


### 1、作为返回类型（主要用于过滤器）

```java
@FunctionalInterface
public interface ExFilter {
    /**
     * 过滤
     *
     * @param ctx   交换上下文
     * @param chain 过滤链
     */
    Completable doFilter(ExContext ctx, ExFilterChain chain);
}
```

### 2、构建返回对象（即，发布者）

```java
@Component
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        String token = ctx.rawHeader("TOKEN");
        if (token == null) {
            ctx.newResponse().status(401);
            return Completable.complete();
        }
        
        return chain.doFilter(ctx);
    }
}
```

### 3、主要事件应用示例



* doOnError 事件应用

当出错时，记录异常日志。//事件，还会传递给后续的观察者。

```java
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return chain.doFilter(ctx).doOnError(e -> {
            log.error("{}", e);
        });
    }
}
```


* doOnErrorResume 事件应用

当出错时，调整输出状态。//以新的流，替代旧的流（之前的 OnError 事件，不再传递）

```java
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return chain.doFilter(ctx).doOnErrorResume(e -> {
            if (e instanceof StatusException) {
                StatusException se = (StatusException) e;

                ctx.newResponse().status(se.getCode());
            } else {
                ctx.newResponse().status(500);
            }

            return Completable.complete();
        });
    }
}
```


* doOnComplete 事件应用

调整响应头和响应体。//事件，还会传递给后续的观察者。

```java
//同步修改
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return chain.doFilter(ctx).doOnComplete(e -> {
            ctx.newResponse().headerAdd("X-TraceId", "xxx");
            ctx.newResponse().body(Buffer.buffer("no!"));
        });
    }
}

//异步修改，可以再用 `Completable.create` 嵌套下
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return Completable.create(emitter -> {
            chain.doFilter(ctx)
                    .doOnComplete(() -> {
                        ExBody exBody = ctx.newResponse().getBody();
                        if (exBody instanceof ExBodyOfStream) {
                            ExBodyOfStream streamBody = (ExBodyOfStream) exBody;
                            ((HttpClientResponse) streamBody.getStream()).body().andThen(bodyAr -> {
                                if (bodyAr.succeeded()) {
                                    // 获取响应体内容
                                    String content = bodyAr.result().toString();
                                    ctx.newResponse().header("MD5", Utils.md5(content));
                                    ctx.newResponse().body(Buffer.buffer(content + "#demo"));
                                    emitter.onComplete();
                                } else {
                                    emitter.onError(bodyAr.cause());
                                }
                            });
                        }
                    })
                    .doOnError(err -> {
                        emitter.onError(err);
                    })
                    .subscribe();
        });
    }
}
```

