๊พธ์ค€ํ•จ์ด ์ง„๋ฆฌ๋‹ค!!

์–ด์ œ๋ณด๋‹ค ๋ฐœ์ „ํ•œ ์˜ค๋Š˜์ด ๋˜๊ณ ํ”ˆ ๐Ÿง‘๐Ÿปโ€๐Ÿ’ป ์˜ ๋ธ”๋กœ๊ทธ

Tutorial/tutorial

[Kotlin] ์ฝ”ํ‹€๋ฆฐ Coroutine(์ฝ”๋ฃจํ‹ด)์— ๋Œ€ํ•˜์—ฌ (2) - ์ฝ”๋ฃจํ‹ด Channel(์ฑ„๋„)

๋ށ์š” 2022. 10. 11. 22:19

 

์•ˆ๋…•ํ•˜์„ธ์š”

์˜ค๋Š˜์€ ์ €๋ฒˆ ํฌ์ŠคํŒ…์— ์ด์–ด์„œ

์ฝ”๋ฃจํ‹ด Channel์— ๋Œ€ํ•ด ๊ณต๋ถ€ํ•œ ๋‚ด์šฉ์„ ์ •๋ฆฌํ•ด๋ณด๋ ค ํ•ฉ๋‹ˆ๋‹ค.

์ฝ”๋ฃจํ‹ด์˜ ๊ธฐ๋ณธ ์‚ฌ์šฉ๋ฐฉ๋ฒ•์€ ์ด์ „ ํฌ์ŠคํŒ…์—์„œ ํ™•์ธํ•  ์ˆ˜ ์žˆ์œผ๋‹ˆ ํ•จ๊ป˜ ํ™•์ธํ•ด๋ณด์„ธ์š” :)

 

 

[Kotlin] ์ฝ”ํ‹€๋ฆฐ Coroutine(์ฝ”๋ฃจํ‹ด)์— ๋Œ€ํ•˜์—ฌ (1) - CoroutineScope์™€ CoroutineContext ๋ฐ ๊ธฐ๋ณธ ์‚ฌ์šฉ๋ฐฉ๋ฒ•

์•ˆ๋…•ํ•˜์„ธ์š” ์˜ค๋Š˜์€ ์ฝ”ํ‹€๋ฆฐ์˜ Coroutine์— ๋Œ€ํ•˜์—ฌ ํฌ์ŠคํŒ…ํ•ด๋ณด๋ ค ํ•ฉ๋‹ˆ๋‹ค. Coroutine์ด๋ž€? ํ”„๋กœ๊ทธ๋ž˜๋ฐ์—์„œ ํ•˜๋‚˜์˜ ์ž‘์—… ๋‹จ์œ„๋ฅผ Routine์ด๋ผ ํ•˜๋Š”๋ฐ ๊ฐ Routine์ด ํ˜‘์—…(Co)ํ•˜์—ฌ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ณผ์ •์„ ํ•ฉ์น˜๊ฒŒ

devyo-111commit.tistory.com

 

์ฝ”๋ฃจํ‹ด Channel(์ฑ„๋„)์ด๋ž€?

2๊ฐœ ์ด์ƒ์˜ ์ฝ”๋ฃจํ‹ด ์‚ฌ์ด์— ์—ฐ๊ฒฐ๋œ ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž์˜ ๊ด€๊ณ„๋ฅผ ์ •์˜ํ•˜๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•˜๋Š”๋ฐ

์ฆ‰, ์ผ์ข…์˜ ํŒŒ์ดํ”„ ๋ผ์ธ์œผ๋กœ์จ ํ•œ์ชฝ์—์„œ ์ฑ„๋„์„ ์—ด๋ฉด ๋‹ค๋ฅธ ์ชฝ์—์„œ๋Š” ํ•ด๋‹น ๊ฐ’์„ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฐœ๋…์ž…๋‹ˆ๋‹ค.

 

๊ฐ„๋‹จํ•œ ์˜ˆ์ œ๋ฅผ ๋ณด๋ฉฐ ์„ค๋ช… ๋“œ๋ฆฌ๊ฒ ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.math.roundToInt

fun main(){
    runBlocking {
        val channelData = listOf(3.3 , 2.2 , 13.7 , 5.5)
        val channel = Channel<Double>()
        launch(Dispatchers.Default) {
            channelData.forEach{
                println("data : $it")
                channel.send(it * it)
            }
        }

        launch {
            repeat(channelData.size){
                delay(500)
                println("Channel received : ${channel.receive().roundToInt()}") 
                // ๋ณด๋‚ด์ง„ ์ˆœ์„œ๋Œ€๋กœ ๋ฐ˜ํ™˜ ํ•˜๋Š”๊ฒƒ์„ ํ™•์ธ ํ•  ์ˆ˜ ์žˆ๋‹ค.
            }
        }
    }
}

์œ„ ์ฝ”๋“œ๋Š” ๋ฆฌ์ŠคํŠธ์— ์ €์žฅ๋œ ๊ฐ’์„ ์ฑ„๋„์„ ํ†ตํ•ด ์†ก/์ˆ˜์‹ ํ•˜๋Š” ์ฝ”๋“œ๋กœ

์ฒซ ๋ฒˆ์งธ ์ฝ”๋ฃจํ‹ด์€ ๋ฆฌ์ŠคํŠธ์˜ ๊ฐ’์„ ์ œ๊ณฑํ•˜์—ฌ ์ฑ„๋„์— ๋ณด๋‚ด๋Š” ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•˜๊ณ 

๋‘ ๋ฒˆ์งธ ์ฝ”๋ฃจํ‹ด์€ ์ฑ„๋„์—์„œ ์ˆ˜์‹ ๋œ ๊ฐ’์„ ๊บผ๋‚ด์„œ print ํ•˜๋Š” ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด ์•Œ๊ฒ ์ง€๋งŒ ์ฑ„๋„์„ ํ†ตํ•ด ๊ฐ’์„ ์ˆ˜์‹  ์‹œ์—๋Š” Stream ํ˜•๋Œ€๋กœ ์ „์†ก๋˜๊ธฐ ๋•Œ๋ฌธ์— ํ•ญ์ƒ ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ๋ฉ๋‹ˆ๋‹ค.

 

 

Buffered Channel

์ฝ”๋ฃจํ‹ด Channel์„ ์ƒ์„ฑํ•  ๋•Œ ์ƒ์„ฑ์ž์— ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ฃผ๊ฒŒ ๋˜๋ฉด ์ฑ„๋„์— ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ด์„ ์ˆ˜ ์žˆ๋Š” ๋ฒ„ํผ์˜ ์‚ฌ์ด์ฆˆ๋ฅผ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋ฟ๋งŒ ์•„๋‹ˆ๋ผ, BufferOverFlow์— ์ •์˜๋œ ์†์„ฑ์„ ํ†ตํ•ด ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด ์–ด๋–ป๊ฒŒ ๊ฐ’์„ ๋ณด๋‚ผ ๊ฑด์ง€์— ๋Œ€ํ•œ ํ–‰๋™๋“ค๋„ ์ •์˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

( ๊ธฐ๋ณธ์ ์œผ๋กœ Coroutine์€ ๋ฒ„ํผ๊ฐ€ ์ดˆ๊ณผ๋˜๋ฉด ๋จผ์ € ๋ฒ„ํผ์— ๋“ค์–ด์˜จ ์˜ค๋ž˜๋œ ๊ฐ’์€ ๋ฒ„๋ฆฐ๋‹ค. )

 

/** ์ƒ์„ฑ์ž๋ฅผ ํ†ตํ•ด ๋ฒ„ํผsizes๋Š” 5 ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐผ์„๋•Œ๋Š” ์ดํ›„์— ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ฒ„๋ฆฌ๋„๋ก ์„ค์ • **/
val channel = Channel<Int>(5 , BufferOverflow.DROP_LATEST)
์†์„ฑ ์„ค๋ช…
BufferOverflow.DROP_OLDEST
๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐจ๋ฉด ์˜ค๋ž˜๋œ ๊ฐ’(์ด์ „์— ๋‹ด๊ณ ์žˆ๋˜ ๊ฐ’)์„ ๋ฒ„๋ฆฐ๋‹ค.
BufferOverflow.DROP_LATEST
๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐจ๋ฉด ์ƒˆ๋กœ์šด ๊ฐ’(์ƒˆ๋กญ๊ฒŒ ์ „์†ก๋œ ๊ฐ’)์„ ๋ฒ„๋ฆฐ๋‹ค.
BufferOverflow.SUSPEND
๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐจ๋ฉด idle์ƒํƒœ๊ฐ€ ๋˜์—ˆ๋‹ค๊ฐ€ ๋‹ค์‹œ ์ „์†กํ•œ๋‹ค.

 

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val channel = Channel<Int>(5 , BufferOverflow.DROP_LATEST)
        //val channel = Channel<Int>(5 , BufferOverflow.DROP_OLDEST)
        //val channel = Channel<Int>(5 , BufferOverflow.SUSPEND)
        launch {
            (1..50).forEach{
                println("Send : $it")
                channel.send(it)
            }
        }
        delay(2000L) // ์ง€์—ฐ์„ ํ†ตํ•ด ์ธ์œ„์ ์œผ๋กœ ๋ฒ„ํผ๊ฐ€ ์ฐจ๋„๋ก ํ•œ๋‹ค.
        val job = launch {
            val channelIterator = channel.iterator()
            while (channelIterator.hasNext()){
                println("Received : ${channelIterator.next()}")
            }
            channel.close()
        }
        job.join()
        job.cancel()
    }
}

 

์œ„ ์˜ˆ์ œ๋Š” ์ฑ„๋„์— 5๊ฐœ์˜ ๋ฒ„ํผ ์‚ฌ์ด์ฆˆ๋ฅผ ์ •์˜ํ•ด์ฃผ๊ณ  1~10๊นŒ์ง€์˜ ๊ฐ’์„ ์ฑ„๋„์— ๋ณด๋‚ด์ค€ ํ›„

BufferOverFlow ๋ฐœ์ƒ ์‹œ์— ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด ์ตœ๊ทผ์— ์ „์†ก๋œ ๊ฐ’๋“ค์€ ๋ฒ„๋ฆฌ๋„๋ก ํ•˜๋Š” ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค.

 

๊ฐ ์†์„ฑ์„ ๋ณ€๊ฒฝํ•˜๋ฉด ๋‹ค๋ฅธ ๊ฒฐ๊ด๊ฐ’์„ ์–ป์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

BufferOverflow.DROP_LATEST

 

์ฒ˜์Œ์— ์ „์†ก๋œ 5๊ฐœ์˜ ๊ฐ’๋งŒ ์ถœ๋ ฅ

 

BufferOverflow.DROP_OLDEST

 

๋งˆ์ง€๋ง‰์— ์ˆ˜์‹ ๋œ 5๊ฐœ์˜ ๊ฐ’๋งŒ ์ถœ๋ ฅ

 

BufferOverflow.SUSPEND

 

๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐจ๋ฉด idle์ƒํƒœ๊ฐ€ ๋˜์–ด ๊ฐ’์„ ๋น„์šฐ๊ณ  ๋‹ค์‹œ ๊ฐ’์„ ์ˆ˜์‹ ํ•œ๋‹ค.

 

์ด์ƒ์œผ๋กœ ์ฝ”๋ฃจํ‹ด ์ฑ„๋„์— ๋Œ€ํ•œ ํฌ์ŠคํŒ…์„ ๋งˆ์น˜๊ฒ ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ํฌ์ŠคํŒ…์€ ์ฝ”๋ฃจํ‹ด ํŠœํ† ๋ฆฌ์–ผ ๋งˆ์ง€๋ง‰ ํฌ์ŠคํŒ…์œผ๋กœ

์ฝ”๋ฃจํ‹ด Flow์— ๋Œ€ํ•ด์„œ ๊ณต๋ถ€ํ•œ ๋‚ด์šฉ์„ ํฌ์ŠคํŒ…ํ•ด๋ณด๋ ค ํ•ฉ๋‹ˆ๋‹ค.

๊ทธ๋Ÿผ ๋‹ค์Œ ํฌ์ŠคํŒ…์—์„œ ๋ต™๊ฒ ์Šต๋‹ˆ๋‹ค. ๐Ÿ‘‹

 

์˜ค๋Š˜๋„ ์ฆ์ฝ”ํ•˜์„ธ์š” :)