์๋ ํ์ธ์
์ค๋์ ์ ๋ฒ ํฌ์คํ ์ ์ด์ด์
์ฝ๋ฃจํด Channel์ ๋ํด ๊ณต๋ถํ ๋ด์ฉ์ ์ ๋ฆฌํด๋ณด๋ ค ํฉ๋๋ค.

์ฝ๋ฃจํด์ ๊ธฐ๋ณธ ์ฌ์ฉ๋ฐฉ๋ฒ์ ์ด์ ํฌ์คํ ์์ ํ์ธํ ์ ์์ผ๋ ํจ๊ป ํ์ธํด๋ณด์ธ์ :)
[Kotlin] ์ฝํ๋ฆฐ Coroutine(์ฝ๋ฃจํด)์ ๋ํ์ฌ (1) - CoroutineScope์ CoroutineContext ๋ฐ ๊ธฐ๋ณธ ์ฌ์ฉ๋ฐฉ๋ฒ
์๋ ํ์ธ์ ์ค๋์ ์ฝํ๋ฆฐ์ Coroutine์ ๋ํ์ฌ ํฌ์คํ ํด๋ณด๋ ค ํฉ๋๋ค. Coroutine์ด๋? ํ๋ก๊ทธ๋๋ฐ์์ ํ๋์ ์์ ๋จ์๋ฅผ Routine์ด๋ผ ํ๋๋ฐ ๊ฐ Routine์ด ํ์ (Co)ํ์ฌ ์์ ์ ์ฒ๋ฆฌํ๋ ๊ณผ์ ์ ํฉ์น๊ฒ
devyo-111commit.tistory.com
์ฝ๋ฃจํด Channel(์ฑ๋)์ด๋?
2๊ฐ ์ด์์ ์ฝ๋ฃจํด ์ฌ์ด์ ์ฐ๊ฒฐ๋ ์์ฐ์์ ์๋น์์ ๊ด๊ณ๋ฅผ ์ ์ํ๋ ๊ฒ์ ์๋ฏธํ๋๋ฐ
์ฆ, ์ผ์ข ์ ํ์ดํ ๋ผ์ธ์ผ๋ก์จ ํ์ชฝ์์ ์ฑ๋์ ์ด๋ฉด ๋ค๋ฅธ ์ชฝ์์๋ ํด๋น ๊ฐ์ ๋ฐ์ ์ฒ๋ฆฌํ๋ ๊ฐ๋ ์ ๋๋ค.
๊ฐ๋จํ ์์ ๋ฅผ ๋ณด๋ฉฐ ์ค๋ช ๋๋ฆฌ๊ฒ ์ต๋๋ค.
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.math.roundToInt
fun main(){
runBlocking {
val channelData = listOf(3.3 , 2.2 , 13.7 , 5.5)
val channel = Channel<Double>()
launch(Dispatchers.Default) {
channelData.forEach{
println("data : $it")
channel.send(it * it)
}
}
launch {
repeat(channelData.size){
delay(500)
println("Channel received : ${channel.receive().roundToInt()}")
// ๋ณด๋ด์ง ์์๋๋ก ๋ฐํ ํ๋๊ฒ์ ํ์ธ ํ ์ ์๋ค.
}
}
}
}
์ ์ฝ๋๋ ๋ฆฌ์คํธ์ ์ ์ฅ๋ ๊ฐ์ ์ฑ๋์ ํตํด ์ก/์์ ํ๋ ์ฝ๋๋ก
์ฒซ ๋ฒ์งธ ์ฝ๋ฃจํด์ ๋ฆฌ์คํธ์ ๊ฐ์ ์ ๊ณฑํ์ฌ ์ฑ๋์ ๋ณด๋ด๋ ์ญํ ์ ์ํํ๊ณ
๋ ๋ฒ์งธ ์ฝ๋ฃจํด์ ์ฑ๋์์ ์์ ๋ ๊ฐ์ ๊บผ๋ด์ print ํ๋ ์ญํ ์ ์ํํฉ๋๋ค.
๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด ์๊ฒ ์ง๋ง ์ฑ๋์ ํตํด ๊ฐ์ ์์ ์์๋ Stream ํ๋๋ก ์ ์ก๋๊ธฐ ๋๋ฌธ์ ํญ์ ์์๊ฐ ๋ณด์ฅ๋ฉ๋๋ค.
Buffered Channel
์ฝ๋ฃจํด Channel์ ์์ฑํ ๋ ์์ฑ์์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ฃผ๊ฒ ๋๋ฉด ์ฑ๋์ ๋ฐ์ดํฐ๋ฅผ ๋ด์ ์ ์๋ ๋ฒํผ์ ์ฌ์ด์ฆ๋ฅผ ์ง์ ํ ์ ์์ต๋๋ค.
๋ฟ๋ง ์๋๋ผ, BufferOverFlow์ ์ ์๋ ์์ฑ์ ํตํด ๋ฒํผ๊ฐ ๊ฐ๋ ์ฐจ๋ฉด ์ด๋ป๊ฒ ๊ฐ์ ๋ณด๋ผ ๊ฑด์ง์ ๋ํ ํ๋๋ค๋ ์ ์ํ ์ ์์ต๋๋ค.
( ๊ธฐ๋ณธ์ ์ผ๋ก Coroutine์ ๋ฒํผ๊ฐ ์ด๊ณผ๋๋ฉด ๋จผ์ ๋ฒํผ์ ๋ค์ด์จ ์ค๋๋ ๊ฐ์ ๋ฒ๋ฆฐ๋ค. )
/** ์์ฑ์๋ฅผ ํตํด ๋ฒํผsizes๋ 5 ๋ฒํผ๊ฐ ๊ฐ๋์ฐผ์๋๋ ์ดํ์ ๋ค์ด์ค๋ ๋ฐ์ดํฐ๋ฅผ ๋ฒ๋ฆฌ๋๋ก ์ค์ **/
val channel = Channel<Int>(5 , BufferOverflow.DROP_LATEST)
์์ฑ | ์ค๋ช |
BufferOverflow.DROP_OLDEST
|
๋ฒํผ๊ฐ ๊ฐ๋์ฐจ๋ฉด ์ค๋๋ ๊ฐ(์ด์ ์ ๋ด๊ณ ์๋ ๊ฐ)์ ๋ฒ๋ฆฐ๋ค. |
BufferOverflow.DROP_LATEST
|
๋ฒํผ๊ฐ ๊ฐ๋์ฐจ๋ฉด ์๋ก์ด ๊ฐ(์๋กญ๊ฒ ์ ์ก๋ ๊ฐ)์ ๋ฒ๋ฆฐ๋ค. |
BufferOverflow.SUSPEND
|
๋ฒํผ๊ฐ ๊ฐ๋์ฐจ๋ฉด idle์ํ๊ฐ ๋์๋ค๊ฐ ๋ค์ ์ ์กํ๋ค. |
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() {
runBlocking {
val channel = Channel<Int>(5 , BufferOverflow.DROP_LATEST)
//val channel = Channel<Int>(5 , BufferOverflow.DROP_OLDEST)
//val channel = Channel<Int>(5 , BufferOverflow.SUSPEND)
launch {
(1..50).forEach{
println("Send : $it")
channel.send(it)
}
}
delay(2000L) // ์ง์ฐ์ ํตํด ์ธ์์ ์ผ๋ก ๋ฒํผ๊ฐ ์ฐจ๋๋ก ํ๋ค.
val job = launch {
val channelIterator = channel.iterator()
while (channelIterator.hasNext()){
println("Received : ${channelIterator.next()}")
}
channel.close()
}
job.join()
job.cancel()
}
}
์ ์์ ๋ ์ฑ๋์ 5๊ฐ์ ๋ฒํผ ์ฌ์ด์ฆ๋ฅผ ์ ์ํด์ฃผ๊ณ 1~10๊น์ง์ ๊ฐ์ ์ฑ๋์ ๋ณด๋ด์ค ํ
BufferOverFlow ๋ฐ์ ์์ ๋ฒํผ๊ฐ ๊ฐ๋ ์ฐจ๋ฉด ์ต๊ทผ์ ์ ์ก๋ ๊ฐ๋ค์ ๋ฒ๋ฆฌ๋๋ก ํ๋ ์ฝ๋์ ๋๋ค.
๊ฐ ์์ฑ์ ๋ณ๊ฒฝํ๋ฉด ๋ค๋ฅธ ๊ฒฐ๊ด๊ฐ์ ์ป์ ์ ์์ต๋๋ค.
BufferOverflow.DROP_LATEST
BufferOverflow.DROP_OLDEST
BufferOverflow.SUSPEND
์ด์์ผ๋ก ์ฝ๋ฃจํด ์ฑ๋์ ๋ํ ํฌ์คํ ์ ๋ง์น๊ฒ ์ต๋๋ค.
๋ค์ ํฌ์คํ ์ ์ฝ๋ฃจํด ํํ ๋ฆฌ์ผ ๋ง์ง๋ง ํฌ์คํ ์ผ๋ก
์ฝ๋ฃจํด Flow์ ๋ํด์ ๊ณต๋ถํ ๋ด์ฉ์ ํฌ์คํ ํด๋ณด๋ ค ํฉ๋๋ค.
๊ทธ๋ผ ๋ค์ ํฌ์คํ ์์ ๋ต๊ฒ ์ต๋๋ค. ๐
์ค๋๋ ์ฆ์ฝํ์ธ์ :)