Coroutine?
코루틴은 코틀린에서 제공하는 비동기 프로그래밍
을 위한 도구
코루틴을 사용하면 비동기 코드를 동기 코드처럼 쉽게 작성
할 수 있음
코루틴은 라이트웨이트 스레드
라고 할 수 있으며, 기존 스레드에 비해 생성 비용이 낮고, 컨텍스트 스위칭 비용이 적어 성능이 우수하다.
왜냐하면 코루틴은 비동기 작업을 쉽게 시작하고, 결과를 기다리며
, 중단할 수 있는
기능을 제공하기 때문.
따라서 코루틴으로 비동기 작업의 생명주기
를 효과적으로 관리할 수 있다.
Flow?
Flow(Cold flow)
플로우는 코틀린에서 제공하는 리액티브 프로그래밍
을 위한 도구.
플로우는 데이터 스트림을 표현하고, 이 스트림을 통해 데이터의 변화를 감지
하고 반응
할 수 있게 해줌
플로우는 cold stream
이라는 특징을 가지고 있어, 구독자가 구독을 시작하기 전까지는 데이터를 생성하지 않음 → 리소스를 효율적으로 사용할 수 있게 해준다.
→ flow를 collect메소드를 통해 구독하기 전까지는 데이터를 생성하지 않는다.
따라서 데이터를 동적으로 관리하고, 사용자 인터페이스를 실시간으로 업데이트하는 데 유용함.
fun fetchUserData(): Flow<UserData> = flow {
val data = queryUserData()
emit(data)
}
// 구독(collect)
fetchUserData().collect { data ->
showUserData(data)
}
SharedFlow(Hot Flow)
모든 collector
들에게 emit
이 전파된다. 그래서 모든 collector
들은 모두 emit value
를 가질 수 있다.
collector
의 유무와 상관없이 독립적으로 존재할 수 있기 때문에 hot flow
라고 불린다.
→ collector가 없어도 emit을 할 수 있다
Flow(Cold Flow)?
collect가 없으면 데이터가 생성되지 않는다.
반대로 일반적인 flow는 cold라고 불리며 각 collector에 의해 개별로 시작된다.
따라서 collector
가 있는 상황에서만 부르지 않으므로 메모리 측면에서는 좋다고 볼 수 없다.
최근 상태값을 공유해야하는 경우
, StateFlow라는 특수 SharedFlow를 사용
참고 flow → hot flow?
일반 cold Flow를 shareIn연산자를 통해 hot flow로 컨버팅해서 사용가능하다.
Complete 조건
특정 트리거가 없으면, 완료되지 않는다. shared flow의 구독자는 flow를 취소할 수 있다. 실행되는 코루틴을
cancel
함으로써 취소가능하다. 또한 Flow.take
나 Flow.takeWhile
을 연산자를 사용해서 완료할 수 있다.
간단한 Shared Flow 예시(event bus)
class EventBus {
private val _events = MutableSharedFlow<Event>() // private mutable shared flow
val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
suspend fun produceEvent(event: Event) {
_events.emit(event) // suspends until all subscribers receive it
}
}
MutableSharedFlow()
생성자를 통해서 사용가능
MutableSharedFlow() 생성자 파라미터, Replay 캐시 & 버퍼
shared flow는 캐시와 버퍼를 통해 새로운 구독자는 emit된 최근 값을 받을 수 있다.
replay
= : 새로운 구독자에게 전달할 replay cache 수.
MutableSharedFlow.resetReplayCache를 사용해서 리셋 가능하다.extraBufferCapacity
= : emit한 데이터를 담을 버퍼의 크기.
추가 버퍼를 생성하여 emit했지만 로직 처리나 네트워크 이슈로 인해 처리속도가 느린 구독자가 해당 emit을 처리하지 못한 경우, emit 한 데이터가 버퍼에 유지되도록 할 수 있다. 이런 역할을 하는 버퍼의 크기onBufferOverflow
= 버퍼가 가득찼을때 수행할 전략.
ex.BufferOverflow.DROP_OLDEST
: 버퍼가 가득찼을 시 오래된 데이터 제거
버퍼를 사용하지 않는 shared flow
- 기본 구현: 매개변수 없이
MutableSharedFlow()
를 사용하여 생성된SharedFlow
는 기본적으로 버퍼가 없음. - 동작 방식: 이러한
SharedFlow
에emit
호출이 이루어지면, 모든 구독자가 발행된 값을 수신할 때까지emit
호출은 일시 중단 - 구독자 없음: 구독자가 없는 경우
emit
호출은 즉시 반환됨 → 즉, 구독자가 없으면 발행된 값은 즉시 손실 tryEmit
호출:tryEmit
는 구독자가 없을 때만 성공하고true
를 반환. 이 경우에도 발행된 값은 바로 손실
예시
kotlinCopy code
val unbufferedFlow = MutableSharedFlow<Int>()
// 구독자가 없으므로, 이 값은 즉시 손실됩니다.
unbufferedFlow.tryEmit(1) // true를 반환하며, 값은 손실됩니다.
// 구독자가 이 값을 수신하기 전까지 emit 호출은 일시 중단됩니다.
unbufferedFlow.emit(2) // 구독자가 없으면 즉시 반환됩니다.
구독자가 있는데, 너무 느리게 처리하는 경우 emit이 지연되는 상황이 오므로 반드시 buffer를 둬야한다.
동시성
shared flow
의 모든 메서드는 thread-safe
하고, 외부 동기화없이
병렬 코루틴으로부터 안전하게 실행 가능하다.
Channel Flow
- 콜드 플로우(Cold Flow):
channelFlow
로 생성된Flow
는 콜드 플로우. 이는Flow
에 종단 연산자(terminal operator)가 적용될 때마다block
이 호출된다는 것을 의미. - 동시성 지원:
channelFlow
는 다른 컨텍스트나 동시에 실행되는 코드에서 요소를 생산할 수 있게 해줌. 이를 통해 복잡한 비동기 또는 멀티스레딩 환경에서 유연하게 데이터를 생성하고 처리할 수 있음. - 스레드 안전성 및 컨텍스트 보존: 제공된
ProducerScope
는 다양한 컨텍스트에서 동시에 안전하게 사용될 수 있음.
예시
channelFlow
를 사용하는 일반적인 예시로는 여러 플로우를 병합하거나, 다른 디스패처를 사용하여 데이터를 발행하는 경우 등이 있음.
// 여러 플로우 병합 예시
fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
launch { collect { send(it) } }
other.collect { send(it) }
}
// 다른 디스패처에서 데이터 발행
fun <T> contextualFlow(): Flow<T> = channelFlow {
launch(Dispatchers.IO) { send(computeIoValue()) }
launch(Dispatchers.Default) { send(computeCpuValue()) }
}
Flow
와 channelFlow
의 차이점
- 동시성 처리:
Flow
는 기본적으로 순차적으로 데이터를 처리하지만,channelFlow
는 동시성을 지원하여 여러 코루틴 컨텍스트에서 데이터를 동시에 발행 가능. - 구현 복잡성:
channelFlow
는 다양한 컨텍스트에서 복잡한 데이터 생성 로직을 구현할 수 있는 반면, 기본Flow
는 보다 단순한 사용 사례에 적합함. - 용도:
channelFlow
는 외부 콜백 기반 API와의 통합, 다중 소스에서의 데이터 병합, 다양한 스레드에서의 데이터 발행 등과 같은 더 복잡한 시나리오에 적합함.
ChannelFlow는 멀티스레딩 환경에서 다양한 데이터 소스로부터 데이터를 수집하고 병합하는 복잡한 상황에 특히 유용하다. 그러나 구현이 기본 Flow보다 복잡할 수 있으므로, 간단한 데이터 스트림에는 기본 Flow를 사용하는 것이 더 적합할 수 있다.
Reference
SharedFlow
SharedFlow A hot Flow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors. Thi
kotlinlang.org
[Coroutine Flow] 1. Flow란 무엇인가?
Coroutine의 Flow는 데이터 스트림이며, 코루틴 상에서 리액티브 프로그래밍 지원 하기 위한 구성요소이다. 이를 이해하기 위해서는 먼저 리액티브(반응형) 프로그래밍이 무엇인지, 그리고 Flow가 리
kotlinworld.com