Skip to Content

Channels trong Kotlin Coroutines

1. Giới thiệu

Channels là primitive cho communication giữa coroutines. Chúng giống như BlockingQueue nhưng có suspending operations thay vì blocking.

2. Basic Channel

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<Int>() // Producer launch { for (x in 1..5) { println("Sending $x") channel.send(x) } channel.close() // Đóng khi xong } // Consumer for (value in channel) { println("Received $value") } println("Done!") } // Output: // Sending 1 // Received 1 // Sending 2 // Received 2 // ...

3. Channel Types

Rendezvous Channel (default)

Không có buffer - sender suspend cho đến khi receiver sẵn sàng:

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<Int>() // Rendezvous (capacity = 0) launch { println("Sending 1...") channel.send(1) // Suspends until received println("Sent 1!") } delay(1000) println("Receiving...") println("Got: ${channel.receive()}") } // Output: // Sending 1... // (waits 1 second) // Receiving... // Sent 1! // Got: 1

Buffered Channel

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<Int>(capacity = 3) // Buffer size 3 launch { for (x in 1..5) { println("Sending $x") channel.send(x) // Won't suspend until buffer full println("Sent $x") } } delay(500) // Let producer fill buffer for (value in channel) { println("Received $value") delay(100) if (value == 5) break } }

Conflated Channel

Chỉ giữ giá trị mới nhất:

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<Int>(Channel.CONFLATED) launch { for (x in 1..5) { println("Sending $x") channel.send(x) } } delay(100) // Let producer send all values println("Received: ${channel.receive()}") // Only gets latest (5) } // Output: // Sending 1 // Sending 2 // Sending 3 // Sending 4 // Sending 5 // Received: 5

Unlimited Channel

import kotlinx.coroutines.channels.* val channel = Channel<Int>(Channel.UNLIMITED) // Never suspends on send

So sánh

TypeCapacitySuspend on sendUse case
Rendezvous0AlwaysSynchronization
BufferedNWhen fullThroughput
Conflated1Never (replaces)Latest value
UnlimitedNeverMemory tradeoff

4. Producers và Consumers

produce builder

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce { for (x in 1..10) { send(x * x) } } // Channel auto-closed when done fun main() = runBlocking { val squares = produceNumbers() squares.consumeEach { println(it) } // 1, 4, 9, 16, 25, 36, 49, 64, 81, 100 }

actor builder

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* sealed class CounterMsg object Increment : CounterMsg() class GetCount(val response: CompletableDeferred<Int>) : CounterMsg() fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 for (msg in channel) { when (msg) { is Increment -> counter++ is GetCount -> msg.response.complete(counter) } } } fun main() = runBlocking { val counter = counterActor() // Increment concurrently repeat(100) { launch { counter.send(Increment) } } // Get final count val response = CompletableDeferred<Int>() counter.send(GetCount(response)) println("Count: ${response.await()}") // 100 counter.close() }

5. Fan-out và Fan-in

Fan-out: Multiple consumers

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = produce { repeat(10) { send(it) delay(100) } } // Multiple consumers repeat(3) { consumerId -> launch { for (msg in channel) { println("Consumer $consumerId received $msg") } } } } // Each message goes to only one consumer

Fan-in: Multiple producers

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<String>() // Multiple producers launch { sendStrings(channel, "A", 200) } launch { sendStrings(channel, "B", 300) } launch { sendStrings(channel, "C", 500) } // Single consumer repeat(9) { println(channel.receive()) } coroutineContext.cancelChildren() } suspend fun sendStrings(channel: SendChannel<String>, prefix: String, delay: Long) { repeat(3) { delay(delay) channel.send("$prefix$it") } } // Output interleaved: A0, B0, A1, C0, B1, A2, B2, C1, C2

6. Select Expression

Chọn từ nhiều channels:

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* fun main() = runBlocking { val channel1 = produce { delay(100) send("from channel 1") } val channel2 = produce { delay(50) send("from channel 2") } // Select first available val result = select<String> { channel1.onReceive { it } channel2.onReceive { it } } println(result) // "from channel 2" (faster) }

Select with timeout

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* fun main() = runBlocking { val channel = Channel<String>() launch { delay(500) channel.send("Hello") } val result = select<String?> { channel.onReceive { it } onTimeout(200) { null } } println(result ?: "Timeout!") // "Timeout!" }

7. Pipeline Pattern

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceNumbers() = produce { var x = 1 while (true) { send(x++) delay(100) } } fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce { for (x in numbers) { send(x * x) } } fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, predicate: (Int) -> Boolean) = produce { for (x in numbers) { if (predicate(x)) send(x) } } fun main() = runBlocking { val numbers = produceNumbers() val squares = square(numbers) val evenSquares = filter(squares) { it % 2 == 0 } repeat(5) { println(evenSquares.receive()) } coroutineContext.cancelChildren() } // Output: 4, 16, 36, 64, 100

8. Broadcast Channel (Deprecated → SharedFlow)

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* // Modern approach: use SharedFlow instead of BroadcastChannel fun main() = runBlocking { val sharedFlow = MutableSharedFlow<Int>() // Multiple subscribers launch { sharedFlow.collect { println("Subscriber 1: $it") } } launch { sharedFlow.collect { println("Subscriber 2: $it") } } delay(100) // Emit to all subscribers repeat(3) { sharedFlow.emit(it) delay(100) } delay(100) coroutineContext.cancelChildren() }

9. Channels vs Flows

FeatureChannelFlow
Hot/ColdHotCold
Multiple collectorsSharedEach gets all values
BackpressureBuilt-in (suspend)Operators
Use caseCommunicationData streams
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* fun main() = runBlocking { // Channel: hot, shared val channel = produce { repeat(3) { send(it) } } launch { channel.consumeEach { println("A: $it") } } launch { channel.consumeEach { println("B: $it") } } // Each value goes to ONE consumer delay(100) // Flow: cold, each collector gets all val flow = flow { repeat(3) { emit(it) } } launch { flow.collect { println("X: $it") } } launch { flow.collect { println("Y: $it") } } // Both X and Y get all values delay(100) }

10. Practical Examples

Rate limiter with Channel

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* class RateLimiter( private val requestsPerSecond: Int ) { private val tickets = Channel<Unit>(requestsPerSecond) init { CoroutineScope(Dispatchers.Default).launch { while (true) { repeat(requestsPerSecond) { tickets.trySend(Unit) } delay(1000) } } } suspend fun acquire() { tickets.receive() } } suspend fun main() { val limiter = RateLimiter(5) // 5 requests per second repeat(20) { i -> limiter.acquire() println("Request $i at ${System.currentTimeMillis()}") } }

Work queue

import kotlinx.coroutines.* import kotlinx.coroutines.channels.* data class Task(val id: Int, val data: String) data class Result(val taskId: Int, val result: String) fun main() = runBlocking { val tasks = Channel<Task>(Channel.UNLIMITED) val results = Channel<Result>(Channel.UNLIMITED) // Workers repeat(3) { workerId -> launch { for (task in tasks) { delay(100) // Simulate work results.send(Result(task.id, "Processed by $workerId: ${task.data}")) } } } // Submit tasks repeat(10) { tasks.send(Task(it, "Data $it")) } tasks.close() // Collect results repeat(10) { println(results.receive()) } }

📝 Tóm tắt

OperationDescription
send(value)Send value (suspends if full)
receive()Receive value (suspends if empty)
close()Close channel
produce { }Create producer coroutine
actor { }Create actor coroutine

Channel types:

  • Rendezvous: Synchronous handoff
  • Buffered: Async with fixed capacity
  • Conflated: Only keep latest
  • Unlimited: Never suspend (memory!)

Khi nào dùng Channel vs Flow:

  • Channel: Communication giữa coroutines
  • Flow: Data streams, transformations
Last updated on