V2 to V1 兼容转换参考
下面仅为参考,并不能完全兼容。比如:
- v1 的 sendAndSubscribe 也可以发送数据被 lpc 接收并返回。v2 不支持。
- v2 的 call 支持等待或回调(可以获取 CompletableFuture);v1 的 sendAndRequest 则只能等待。
v2 只使用 call 作为 lpc 的基础(对应 v1 的是 sendAndRequest),相对更专业些。v2 对泛型的使用,是比较漂亮的。
1、to V1 兼容转换应用示例
final String topic = "demo.hello";
// for send
DamiBusV1.<String>listen(topic, event -> {
System.out.println("Received data: " + event.getPayload());
});
DamiBusV1.send(topic, "hi");
// for sendAndRequest
DamiBusV1.<String, String>listen(topic, (event, data, sink) -> {
System.out.println("Received data: " + data);
sink.complete("hello!"); //sink = CompletableFuture
});
DamiBusV1.<String, String>sendAndRequest(topic, "hi"); //(支持 lpc 发送)
// for sendAndSubscribe
DamiBusV1.<String, String>listen(topic, (event, att, data, sink) -> {
System.out.println("Received data: " + data);
sink.onNext("hello!"); //sink = Subscriber
sink.onNext("miss you!"); //sink = Subscriber
sink.onComplete();
});
DamiBusV1.<String, String>sendAndSubscribe(topic, "hi", item -> {
System.out.println("Callback data: " + item);
});
2、to V1 兼容转换参考
当 “监听侧” 异步处理时。 v1 的接口 sendAndRequest、sendAndSubscribe 是无法传递异常的。如无必要,不建议使用 v1 的接口(或者只做个中间临时过渡)。另外,下面的 listen 为了方便使用,是各有小区别的(需要注意下)。
泛型可以结合需求,进一步限制范围。
package lab;
import org.noear.dami2.Dami;
import org.noear.dami2.bus.EventListener;
import org.noear.dami2.bus.receivable.CallEventListener;
import org.noear.dami2.bus.receivable.StreamEventListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;
public interface DamiBusV1 {
static Logger logger = LoggerFactory.getLogger(DamiBusV1.class);
/**
* 发送
*
* @param <P> 荷载类型
*/
static <P> void send(String topic, P payload) {
Dami.bus().send(topic, payload);
}
/**
* 发送并要求一次答复(支持 lpc 发送)
*
* @param <D> 发送数据类型
* @param <R> 响应数据类型
*
*/
static <D, R> R sendAndRequest(String topic, D data) throws InterruptedException, ExecutionException {
return Dami.bus().<D, R>call(topic, data).get();
}
/**
* 发送并要求一次答复(支持 lpc 发送)
*
* @param <D> 发送数据类型
* @param <R> 响应数据类型
* @param fallback 应急处理(如果没有订阅)
*
*/
static <D, R> R sendAndRequest(String topic, D data, Supplier<R> fallback) throws InterruptedException, ExecutionException {
if (fallback == null) {
return Dami.bus().<D, R>call(topic, data, r -> {
r.complete(fallback.get());
}).get();
} else {
return Dami.bus().<D, R>call(topic, data, r -> {
r.complete(fallback.get());
}).get();
}
}
/**
* 发送并要求多次答复(响应式流)(不支持 lpc)
*
* @param <D> 发送数据类型
* @param <R> 响应数据类型
* @param callback 回调
*
*/
static <D, R> void sendAndSubscribe(String topic, D data, Consumer<R> callback) {
sendAndSubscribe(topic, data, callback, null);
}
/**
* 发送并要求多次答复(响应式流)(不支持 lpc)
*
* @param <D> 发送数据类型
* @param <R> 响应数据类型
* @param callback 回调
* @param fallback 应急处理(如果没有订阅)
*/
static <D, R> void sendAndSubscribe(String topic, D data, Consumer<R> callback, Supplier<R> fallback) {
Subscriber<R> subscriber = new Subscriber<R>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(R r) {
callback.accept(r);
if (subscription != null) {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override
public void onComplete() {
}
};
if (fallback == null) {
Dami.bus().<D, R>stream(topic, data).subscribe(subscriber);
} else {
Dami.bus().<D, R>stream(topic, data, r -> {
r.onNext(fallback.get());
r.onComplete();
}).subscribe(subscriber);
}
}
/**
* on send
*
* @param <P> 荷载类型
*/
static <P> void listen(String topic, EventListener<P> listener) {
Dami.bus().listen(topic, listener);
}
/**
* on sendAndRequest
*
* @param <D> 发送数据类型
* @param <R> 响应数据类型
*/
static <D, R> void listen(String topic, CallEventListener<D, R> listener) {
Dami.bus().listen(topic, listener);
}
/**
* on sendAndSubscribe
*
* @param <D> 发送数据类型
* @param <R> 响应数据类型
*/
static <D, R> void listen(String topic, StreamEventListener<D, R> listener) {
Dami.bus().listen(topic, listener);
}
/// //////////////
}