[Kotlin] 스터디 Kotlin Coroutine: Deep Dive 13-14장
13-14장(2부) 에 해당하는 내용입니다.
13장. 코루틴 스코프 만들기
interface CoroutineScope {
val coroutineContext: CoroutineContext
}
CoroutineScope 는 CoroutineContext를 가진 인터페이스로, 이를 구현하여 코루틴 스코프를 만들 수 있습니다.
CoroutineScope을 사용하고자 하는 클래스에서 아래와 같이 직접 구현할 수도 있지만, cancel 등의 함수를 호출하면 더 이상 코루틴을 시작할 수 없다는 문제가 발생하기 때문에, 직접 구현보다는 클래스 내에 CoroutineScope 객체를 가지도록 개발하는 방법을 이용합니다.
class Test: CoroutineScope {
override val coroutineContext: CoroutineContext = Job()
// ...
}
CoroutineScope 팩토리 함수
context를 인자로 받는 아래 CoroutineScope 팩토리 함수를 이용하면 쉽게 코루틴 스코프 생성이 가능합니다.
fun CoroutineScope(
context: CoroutineContext
): CoroutineScope =
ContextScope(
if(context[Job] != null) context
else context + Job()
)
internal class ContextScope(
context: CoroutineContext
): CoroutineScope {
// ...
}
14장. 공유 상태로 인한 문제
동기화의 필요성
서로 다른 스레드에서 동작하는 코루틴들이 동시에 접근하여 상태를 조작하는 경우 공유 상태 문제
가 발생할 수 있습니다.
간단한 예시인 카운팅을 살펴보면
var count = 0
suspend fun increaseCount() {
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { count ++ }
}
}
}
}
- increaseCount 내에서 1000번 launch
- launch 로 생성된 코루틴 내에서 1000번 count 증가
위 코드에서 원하는 결과는 count = 10000000 입니다.
하지만 서로 다른 스레드에서 동시에 count를 증가시키게 되면
- 스레드 A 에서 count = 0 접근
- 스레드 B 에서 count = 0 접근, count = 1로 증가
- 스레드 A 에서 이미 가져온 count = 0 을 1로 증가시키고, count 값을 반영함
이 경우, 최종 count는 1000000 보다 작은 값이 됩니다.
때문에, 공유 상태에 대한 동기화가 필요합니다.
동기화 방법
syncronized
- 값이 변경되는 부분을 syncronized 블록으로 감싸주는 방법
- syncronized 의 인자로 들어가는 lock을 가진 객체만 블록 내부에 접근이 가능
suspend fun increaseCount() {
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) {
syncronized(lock) {
count ++
}
}
}
}
}
}
- 문제점
- syncronized는 스레드를 블로킹한다는 것 → 자원의 낭비
- syncronized 블록 내에서 중단 함수 사용 불가
자바의 전통적인 방식인 syncronized를 사용하여 문제를 해결할 수는 있지만, 좀 더 코루틴에 맞는 블로킹없이 중단하거나 충돌을 피하는 방법을 사용해야 합니다.
Atomic
- AtomicInteger, AtomicBoolean 등 값이 간단한 경우
- 스레드를 블로킹하지 않는 방식
- 멀티 스레딩 환경에서는 성능향상을 위해 CPU 캐시 값을 보게 되는데, Atomic은 캐시 값과 실제로 메모리에 저장된 값을 동기화
- CAS(Compare And Swap) 알고리즘
- 기존 값(Compared Value)과 변경할 값(Exchanged Value)을 전달하고,
- 기존 값(Compared Value)이 현재 메모리가 가지고 있는 값(Destination)과 같다면 값 변경
var count = AtomicInteger()
suspend fun increaseCount() {
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { counter.incrementAndGet() }
}
}
}
}
- 문제점
- 여러 변수를 공유하는 경우에는 적절하지 않음
Single Thread Dispatcher
- limitedParallelism을 이용해 싱글 스레드를 사용하는 디스패처 생성
- 상태를 변경하는 구문들을 withContext(dispatcher)로 래핑하여 하나의 스레드만 값에 접근하도록 제한
val dispatcher = Dispatcher.IO.limitedParallelism(1)
var count = 0
suspend fun increaseCount() {
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) {
withContext(dispatcher) { count ++ }
}
}
}
}
}
Mutex
- lock을 가진 코루틴만 크리티컬 섹션에 접근할 수 있고, 나머지 코루틴은 순차적으로 큐에 들어감
- syncronized와 달리 스레드를 블로킹하지 않고 코루틴을 중단 시키는 방식
- lock, unlock을 직접 호출할 수 있지만, 안전하게 withLock을 사용하자
val mutex = Mutex()
var count = 0
suspend fun increaseCount() {
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) {
mutex.withLock { count ++ }
}
}
}
}
}
// 구현되어 있는 Mutex
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
}
lock(owner)
try {
return action()
} finally {
unlock(owner)
}
}
- 단점
- lock을 두번 사용하면 withLock { withLock {} } 교착 상태에 진입
-
코루틴이 중단된 경우 lock을 해제할 수 없음
→ lock이 필요한 크리티컬 섹션 내에서는 중단 함수 사용을 피하기
- Semaphore
- Mutex가 한번에 하나의 코루틴만 크리티컬 섹션에 접근 가능하다면, Semaphore는 여러 코루틴이 동시 접근 가능
- 실제로 공유 상태 문제를 해결하는 방법은 아님
- Semaphore(1) == Mutex
Actor
- 상태 접근은 단일 스레드로 제한하고, 다른 코루틴에서는 Channel을 통해 상태 변경을 요청
val c = actor {
for (msg in channel) {
when(msg) {
Increase -> count ++
}
}
}
// send messages to the actor
c.send(...)
...
// stop the actor when it is no longer needed
c.close()
Comments