Sinks란?
- Reactive Streams에서 발생하는 Signal을 프로그래밍적으로 Push할 수 있는 기능을 가지고 있는 Publisher의 일종이다
- Thread-Safe 하지 않을 수 있는 Processor보다 더 나은 대안이 된다
- Sinks는 Thread-Safe하게 Signal을 발생시킨다
- Sinks는
Sinks.Many 또는 Sinks.One Interface를 사용해서 Thread-Safe하게 Signal을 발생시킨다
public static void main(String[] args) {
int tasks = 6;
Flux.create(
(FluxSink<String> sink) ->
IntStream.range(1, tasks).forEach(n -> sink.next(doTask(n)))
)
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(n -> log.info("# created data: {}", n))
.publishOn(Schedulers.parallel())
.map(result -> result + " processed")
.doOnNext(n -> log.info("# mapped data: {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# consumed data: {}", data));
}
private static String doTask(int taskNumber) {
return "task " + taskNumber + " result";
}
public static void main(String[] args) {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> new Thread(() -> {
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted data: task {}", n);
}).start());
fluxView
.publishOn(Schedulers.parallel())
.map(result -> result + " processed")
.doOnNext(n -> log.info("# mapped data: {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# consumed data: {}", data));
}
private static String doTask(int taskNumber) {
return "task " + taskNumber + " result";
}
Sinks.One
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", Sinks.EmitFailureHandler.FAIL_FAST);
mono.subscribe(data -> log.info("Subscriber1: {}", data));
mono.subscribe(data -> log.info("Subscriber2: {}", data));
- Emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", Sinks.EmitFailureHandler.FAIL_FAST);
sinkOne.emitValue("Hi Reactor", Sinks.EmitFailureHandler.FAIL_FAST);
mono.subscribe(data -> log.info("Subscriber1: {}", data));
mono.subscribe(data -> log.info("Subscriber2: {}", data));
- 위 코드와 달리
sinkOne 에 하나의 Value를 더 emit하였다. 두번째 Emit한 값은 Drop되고 Hello Reactor 만 출력된다
Sinks.Many
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
unicastSink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("Subscriber1: {}", data));
unicastSink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("Subscriber2: {}", data));
unicast() 를 사용해서 단 하나의 Subscriber 에게만 데이터를 Emit 할 수 있다
- 따라서 위 코드는 두번 구독 되었으므로 Exception 이 발생한다