Operators 종류
새로운 Sequence 생성 | 기존 Sequence 작업 | Sequence내부동작 확인 | Sequence데이터 필터링 | 에러 핸들링 |
Creating | Transforming | Peeking | Filtering | Handling errors |
|
|
|
|
|
* 이후 테스트는 맨 아래에 있는 샘플데이터 클래스 두 개를 사전에 작성후 테스트 진행
Creating
1. fromStream() : Java의 Stream을 입력으로 전달받아 emit.
* 예시코드 : Stream의 인자를 emit해서 더함
public class exampleCode1 {
public static void main(String[] args) {
Flux
.fromStream(Stream.of(2, 3, 4, 5, 6)) // (1)
.reduce((a, b) -> a + b) // (2)
.subscribe(System.out::println);
}
}
2. fromIterable() : Java의 Iterable을 입력으로 전달받아 emit. List, Map, Set 등 컬렉션을 파라미터로 전달
* 예시코드 : sampleData내 리스트를 emit. 조건에 맞는 데이터를 순차적으로 출력
@Slf4j
public class FromIterableExample01 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.coffeeList)
.subscribe(coffee -> log.info("{} : {}", coffee.getKorName(), coffee.getPrice()));
}
}
3. create() : 프로그래밍 방식으로 Signal(이벤트)를 발생시킴. 한 번에 여러 건의 데이터를 비동기적으로 emit 가능
* 일반적으로 Publisher가 데이터를 emit할 경우, onNext Signal 이벤트를 전송한다라고 표현함
* 예제코드
1) create로 FluxSink라는 람다 표현식을 받음(파라미터는 람다 파라미터) (발주따오는 애)
2) onRequest() :subscriber가 요청하면 작업 시작시킴 (작업반장)
3) .next() : 들어오는 리소스(List resource) emit (방출담당)
4) .complete() : 리소스를 모두 emit했으므로, Sequence 종료시킴 (시마이 담당)
5) .onDispose() : Sequence가 닫히기 직전에 호출되어 후처리 작업함 (연장업무 담당)
@Slf4j
public class CreateExample {
private static List<Integer> source = Arrays.asList(1,2,3,4,5,8,7,18,69);
public static void main(String[] args) {
Flux
.create((FluxSink<Integer> sink)-> {
sink.onRequest(n -> {
for (int i = 0; i < source.size(); i++) {
sink.next(source.get(i));
}
sink.complete();
});
sink.onDispose(() -> log.info("# clean up"));
}).subscribe(data->log.info("# onNext: {}",data));
}
}
Transforming
1. flatMap() : 내부로 들어오는 데이터 한 건당 새로운 Sequence 생성
* Inner Sequence : flatMap() 내부에서 정의하는 Sequence
* 단, 쓰레드 생성 및 처리에 순서는 보장되지 않음
* 예제 코드 : 구구단
1) 외부 쓰레드 하나당 데이터는 dan으로 지정. 2부터 8카운트까지 데이터 emit
2) 내부 쓰레드 하나당 데이터는 num으로 지정. 1부터 9카운트까지 데이터 emit
3) dan으로 emit 한 데이터 * num으로 emit한 데이터를 log로 출력
@Slf4j
public class FlatMapExample01 {
public static void main(String[] args) throws InterruptedException{
Flux
.range(2, 8)
.flatMap(dan -> Flux
.range(1,9)
.publishOn(Schedulers.parallel())
.map(num -> dan + " x " + num + " = " + dan * num))
.subscribe(log::info);
Thread.sleep(100L);
}
}
2. concat() : 입력으로 전달하는 Publisher의 Sequence를 연결해서 차례대로 데이터 emit (Sequence를 순서대로 통합)
* 예제 코드 1 : 두 개의 Flux Sequence를 concat()을 통해 순서대로 붙여서 하나의 Sequence로 동작함
@Slf4j
public class ConcatExample01 {
public static void main(String[] args) {
Flux
.concat(Flux.just("Monday", "Tuesday", "Wednesday", "Thursday", "Friday"),
Flux.just("Saturday", "Sunday"))
.subscribe(log::info);
}
}
* 예제 코드 2 : 샘플데이터에서 각각의 쓰레드를 열어 데이터를 불러와서 > 하나의 쓰레드로 합친 뒤 > 합산
@Slf4j
public class ConcatExample02 {
public static void main(String[] args) {
Flux
.concat(Flux.fromIterable(SampleData.salesOfCafeA),
Flux.fromIterable(SampleData.salesOfCafeB),
Flux.fromIterable(SampleData.salesOfCafeC))
.reduce((a,b) -> a+b)
.subscribe(data -> log.info("# total sales: {}", data));
}
}
3. zip() : 각각의 Publisher Sequence에서 emit된 데이터를 하나씩 전달받아 새로운 데이터를 만든 후 결합.
* 예제 코드
1) .interval() : 파라미터로 지정한 시간 간격으로 0부터 1씩 증가하는 숫자 emit
2) .take() : 파라미터로 지정한 숫자만큼만 데이터 emit
3) .zip() : 앞에거 먼저 끝나도 뒤에거 차례 안끝났으면 기다렸다가 같이 처리하고 다시 일함
@Slf4j
public class ZipExample01 {
public static void main(String[] args) throws InterruptedException {
Flux<Long> source1 = Flux.interval(Duration.ofMillis(200L)).take(4);
Flux<Long> source2 = Flux.interval(Duration.ofMillis(400L)).take(6);
Flux
.zip(source1, source2, (data1, data2) -> data1 + data2)
.subscribe(data -> log.info("# onNext: {}",data));
Thread.sleep(3000L);
}
}
interval (sec) |
0.0 | 0.2 | 0.4 | 0.6 | 0.8 | 1.0 | 1.2 | 1.4 | 1.6 | 1.8 | 2.0 | 2.2 | 2.4 | ... |
source1 | 0 | 1 | 2 | 3 (stop) |
... | |||||||||
source2 | 0 | 1 | 2 | 3 | 4 | 5 (stop) |
... | |||||||
zip | 0 | 2 | 4 | 6 (Stop) |
Peeking
1. doOnNext() : 데이터 emit시 트리거 되어 side-effect를 추가할 수 있음. 리턴값 없음. 주로 로깅에 사용.
* 함수형 프로그래밍에서의 side-effect : 어떤 동작을 실행하되 리턴값이 없는 것을 의미
* 예시 코드 : 로그를 통해 데이터에 대한 유효성 검증을 진행
@Slf4j
public class DoOnNextExample01 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.coffeeList)
.doOnNext(coffee -> validateCoffee(coffee))
.subscribe(data -> log.info("{} : {}", data.getKorName(), data.getPrice()));
}
// 유효성 검증용 void 메서드
private static void validateCoffee(Coffee coffee) {
if(coffee == null) {
throw new RuntimeException("Not found coffee");
}
}
}
2. log() : Publisher에서 발생하는 Signal을 로그로 출력
* 예제 코드 : Stream에 대한 log출력
public class LogExample01 {
public static void main(String[] args) {
Flux
.fromStream(Stream.of(2,3,4,5,6))
.log()
.reduce((a,b)-> a+b)
.log()
.subscribe(System.out::println);
}
}
Error
1. error() : 의도적으로 onError Signal을 발생시킬 때 사용
* 예제 코드 : 유효하지 않은 커피 객체(null)을 전달받아 onError Signal 발생 > 에러객체를 전달받아 에러 메세지 출력
@Slf4j
public class ErrorExample01 {
public static void main(String[] args) {
Mono.justOrEmpty(findVerifiedCoffee())
.switchIfEmpty(Mono.error(new RuntimeException("Not found coffee")))
.subscribe(
data-> log.info("{} : {}", data.getKorName(), data.getPrice()),
error-> log.error("# onError: {}", error.getMessage())
);
}
private static Coffee findVerifiedCoffee() {
// DB에서 Coffee 정보를 조회하는 로직 작성
return null;
}
}
2. timeout() : 주어진 시간동안 emit되는 데이터가 없으면 onError Signal 발생시킴
3. retry() : Sequence상에서 에러가 발생하면, 주어진 숫자만큼 재구독해서 Sequence 다시 시작
* 예제코드 : 주석 참조
@Slf4j
public class TimeoutRetryExample01 {
public static void main(String[] args) throws InterruptedException {
getCoffees()
.collect(Collectors.toSet()) // 커피 목록을 세트로 받아옴
.subscribe(bookSet -> bookSet.stream()
.forEach(data -> log.info("{} : {}", data.getKorName(), data.getPrice())));
Thread.sleep(12000); // 12초뒤 모든 스레드 종료
}
private static Flux<Coffee> getCoffees () {
final int [] count = {0};
return Flux
.fromIterable(SampleData.coffeeList)
.delayElements(Duration.ofMillis(500)) // 0.5초 대기시간
.map(coffee -> {
try { // 기본 실행 부분
count[0]++; // count변수를 두고,
if(count[0] ==3 ) { // count가 3이 쌓일때까지 해보고
Thread.sleep(2000); // 안되면 스레드 2초뒤 종료
}
}
catch (InterruptedException e) { // 위에서 안되면 예외가 발생한 것으로 간주
}
return coffee; // 어쨋거나 coffee객체는 반환
})
.timeout(Duration.ofSeconds(2)) // 2초 대기 했다가
.retry(1) // 1번 더 재도전
.doOnNext(coffee -> log.info("# getCoffees > doOnNext: {}, {}", coffee.getKorName(), coffee.getPrice()));
}
}
* 로그 gif : 설정한 시간의 delay를 두고 로그가 뜨며, retry를 통한 새로운 sequence로 delay가 생김.
이후 12초 후 쓰레드 종료
'Codestates [Back-end] > 데일리 로그 [TIL]' 카테고리의 다른 글
22.10.17 SpringBoot - JRE를 이용한 WAR 실행 파일 빌드 및 실행 (0) | 2022.10.17 |
---|---|
22.10.14 Spring WebFlux - Spring WebFlux (0) | 2022.10.14 |
22.10.12 Spring WebFlux - 스케줄러 (0) | 2022.10.13 |
22.10.12 Spring Webflux - ProjectReactor / 마블 다이어그램 (0) | 2022.10.12 |
22.10.12 Spring WebFlux - 기본 코드 구조 / MVC vs WebFlux (0) | 2022.10.12 |