publishOn() : Operator 체인에서 Downstream Operator의 실행을 위한 스레드를 지정한다
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("filter: {}", data))
.map(data -> data * 10)
.doOnNext(data -> log.info("map: {}", data))
.subscribe(data -> log.info("subscribe: {}", data));
publishOn() 에서 Scheduler를 지정하게 되면 지정된 Scheduler가 스레드를 하나 생성해서 Downstream, 즉 하위 Operator Chain의 로직들을 수행하도록 한다subscribeOn() : 최상위 Upstream Publisher의 실행을 위한 스레드를 지정한다. 즉, 원본 데이터 소스를 Emit 하기 위한 Scheduler 를 지정한다
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("map: {}", data))
.subscribe(data -> log.info("subscribe: {}", data));
fromArray 이다parallel() : Downstream 에 대한 데이터 처리를 병렬로 분할 처리하기 위한 스레드를 지정한다

Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15})
.parallel()
.subscribe(data -> log.info("parallel: {}", data));
// Console
14:12:45.529 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 1
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 3
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 5
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 7
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 9
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 11
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 13
14:12:45.530 [main] INFO com.example.springreactive.section07.ParallelExample01 -- parallel: 15
main 스레드에서 처리 한 것을 확인할 수 있다parallel() Operator 만 추가했다고 해서 병렬로 처리하는 것이 아니기 때문Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15})
.parallel()
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("parallel: {}", data));
// Console
14:11:39.461 [parallel-7] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 13
14:11:39.461 [parallel-3] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 5
14:11:39.461 [parallel-4] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 7
14:11:39.461 [parallel-6] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 11
14:11:39.461 [parallel-2] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 3
14:11:39.461 [parallel-1] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 1
14:11:39.461 [parallel-8] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 15
14:11:39.461 [parallel-5] INFO com.example.springreactive.section07.ParallelExample02 -- parallel: 9
parallel 스레드에서 병렬로 작업을 처리하는 것을 Console 에서 확인할 수 있다Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("parallel: {}", data));
// Console
14:15:46.675 [parallel-2] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 3
14:15:46.675 [parallel-1] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 1
14:15:46.675 [parallel-3] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 5
14:15:46.676 [parallel-2] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 11
14:15:46.676 [parallel-2] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 19
14:15:46.675 [parallel-4] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 7
14:15:46.676 [parallel-1] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 9
14:15:46.676 [parallel-3] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 13
14:15:46.677 [parallel-1] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 17
14:15:46.677 [parallel-4] INFO com.example.springreactive.section07.ParallelExample03 -- parallel: 15
parallel() Operator 에 숫자값을 대입 해 주었다