Sinks란?

public static void main(String[] args) {
    int tasks = 6;

    Flux.create(
            (FluxSink<String> sink) -> 
                    IntStream.range(1, tasks).forEach(n -> sink.next(doTask(n)))
            )
            .subscribeOn(Schedulers.boundedElastic())
            .doOnNext(n -> log.info("# created data: {}", n))
            .publishOn(Schedulers.parallel())
            .map(result -> result + " processed")
            .doOnNext(n -> log.info("# mapped data: {}", n))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# consumed data: {}", data));
}

private static String doTask(int taskNumber) {
    return "task " + taskNumber + " result";
}
public static void main(String[] args) {
    int tasks = 6;

    Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<String> fluxView = unicastSink.asFlux();
    IntStream
            .range(1, tasks)
            .forEach(n -> new Thread(() -> {
                unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
                log.info("# emitted data: task {}", n);
            }).start());
    
    fluxView
            .publishOn(Schedulers.parallel())
            .map(result -> result + " processed")
            .doOnNext(n -> log.info("# mapped data: {}", n))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# consumed data: {}", data));
}

private static String doTask(int taskNumber) {
    return "task " + taskNumber + " result";
}

Sinks.One

Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();

sinkOne.emitValue("Hello Reactor", Sinks.EmitFailureHandler.FAIL_FAST);

mono.subscribe(data -> log.info("Subscriber1: {}", data));
mono.subscribe(data -> log.info("Subscriber2: {}", data));
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();

sinkOne.emitValue("Hello Reactor", Sinks.EmitFailureHandler.FAIL_FAST);

sinkOne.emitValue("Hi Reactor", Sinks.EmitFailureHandler.FAIL_FAST);

mono.subscribe(data -> log.info("Subscriber1: {}", data));
mono.subscribe(data -> log.info("Subscriber2: {}", data));

Sinks.Many

Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();

unicastSink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
unicastSink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);

fluxView.subscribe(data -> log.info("Subscriber1: {}", data));

unicastSink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);

fluxView.subscribe(data -> log.info("Subscriber2: {}", data));