Solon v3.6.0

V2 to V1 兼容转换参考

</> markdown

下面仅为参考,并不能完全兼容。比如:

  • v1 的 sendAndSubscribe 也可以发送数据被 lpc 接收并返回。v2 不支持。
  • v2 的 call 支持等待或回调(可以获取 CompletableFuture);v1 的 sendAndRequest 则只能等待。

v2 只使用 call 作为 lpc 的基础(对应 v1 的是 sendAndRequest),相对更专业些。v2 对泛型的使用,是比较漂亮的。

1、to V1 兼容转换应用示例

final String topic = "demo.hello";

// for send
DamiBusV1.<String>listen(topic, event -> {
    System.out.println("Received data: " + event.getPayload());
});
DamiBusV1.send(topic, "hi");


// for sendAndRequest
DamiBusV1.<String, String>listen(topic, (event, data, sink) -> {
    System.out.println("Received data: " + data);
    sink.complete("hello!"); //sink = CompletableFuture
});
DamiBusV1.<String, String>sendAndRequest(topic, "hi"); //(支持 lpc 发送)


// for sendAndSubscribe
DamiBusV1.<String, String>listen(topic, (event, att, data, sink) -> {
    System.out.println("Received data: " + data);
    sink.onNext("hello!"); //sink = Subscriber
    sink.onNext("miss you!"); //sink = Subscriber
    sink.onComplete();
});
DamiBusV1.<String, String>sendAndSubscribe(topic, "hi", item -> {
    System.out.println("Callback data: " + item);
});

2、to V1 兼容转换参考

当 “监听侧” 异步处理时。 v1 的接口 sendAndRequest、sendAndSubscribe 是无法传递异常的。如无必要,不建议使用 v1 的接口(或者只做个中间临时过渡)。另外,下面的 listen 为了方便使用,是各有小区别的(需要注意下)。

泛型可以结合需求,进一步限制范围。

package lab;

import org.noear.dami2.Dami;
import org.noear.dami2.bus.EventListener;
import org.noear.dami2.bus.receivable.CallEventListener;
import org.noear.dami2.bus.receivable.StreamEventListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public interface DamiBusV1 {
    static Logger logger = LoggerFactory.getLogger(DamiBusV1.class);

    /**
     * 发送
     *
     * @param <P> 荷载类型
     */
    static <P> void send(String topic, P payload) {
        Dami.bus().send(topic, payload);
    }

    /**
     * 发送并要求一次答复(支持 lpc 发送)
     *
     * @param <D> 发送数据类型
     * @param <R> 响应数据类型
     *
     */
    static <D, R> R sendAndRequest(String topic, D data) throws InterruptedException, ExecutionException {
        return Dami.bus().<D, R>call(topic, data).get();
    }

    /**
     * 发送并要求一次答复(支持 lpc 发送)
     *
     * @param <D>      发送数据类型
     * @param <R>      响应数据类型
     * @param fallback 应急处理(如果没有订阅)
     *
     */
    static <D, R> R sendAndRequest(String topic, D data, Supplier<R> fallback) throws InterruptedException, ExecutionException {
        if (fallback == null) {
            return Dami.bus().<D, R>call(topic, data, r -> {
                r.complete(fallback.get());
            }).get();
        } else {
            return Dami.bus().<D, R>call(topic, data, r -> {
                r.complete(fallback.get());
            }).get();
        }

    }

    /**
     * 发送并要求多次答复(响应式流)(不支持 lpc)
     *
     * @param <D>      发送数据类型
     * @param <R>      响应数据类型
     * @param callback 回调
     *
     */
    static <D, R> void sendAndSubscribe(String topic, D data, Consumer<R> callback) {
        sendAndSubscribe(topic, data, callback, null);
    }

    /**
     * 发送并要求多次答复(响应式流)(不支持 lpc)
     * 
     * @param <D>      发送数据类型
     * @param <R>      响应数据类型
     * @param callback 回调
     * @param fallback 应急处理(如果没有订阅)
     */
    static <D, R> void sendAndSubscribe(String topic, D data, Consumer<R> callback, Supplier<R> fallback) {
        Subscriber<R> subscriber = new Subscriber<R>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(R r) {
                callback.accept(r);
                if (subscription != null) {
                    subscription.request(1);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                logger.error(throwable.getMessage(), throwable);
            }

            @Override
            public void onComplete() {

            }
        };

        if (fallback == null) {
            Dami.bus().<D, R>stream(topic, data).subscribe(subscriber);
        } else {
            Dami.bus().<D, R>stream(topic, data, r -> {
                r.onNext(fallback.get());
                r.onComplete();
            }).subscribe(subscriber);
        }
    }

    /**
     * on send
     *
     * @param <P> 荷载类型
     */
    static <P> void listen(String topic, EventListener<P> listener) {
        Dami.bus().listen(topic, listener);
    }

    /**
     * on sendAndRequest
     *
     * @param <D> 发送数据类型
     * @param <R> 响应数据类型
     */
    static <D, R> void listen(String topic, CallEventListener<D, R> listener) {
        Dami.bus().listen(topic, listener);
    }

    /**
     * on sendAndSubscribe
     *
     * @param <D> 发送数据类型
     * @param <R> 响应数据类型
     */
    static <D, R> void listen(String topic, StreamEventListener<D, R> listener) {
        Dami.bus().listen(topic, listener);
    }

    /// //////////////
}