Channels trong Kotlin Coroutines
1. Giới thiệu
Channels là primitive cho communication giữa coroutines. Chúng giống như BlockingQueue nhưng có suspending operations thay vì blocking.
2. Basic Channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
// Producer
launch {
for (x in 1..5) {
println("Sending $x")
channel.send(x)
}
channel.close() // Đóng khi xong
}
// Consumer
for (value in channel) {
println("Received $value")
}
println("Done!")
}
// Output:
// Sending 1
// Received 1
// Sending 2
// Received 2
// ...3. Channel Types
Rendezvous Channel (default)
Không có buffer - sender suspend cho đến khi receiver sẵn sàng:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>() // Rendezvous (capacity = 0)
launch {
println("Sending 1...")
channel.send(1) // Suspends until received
println("Sent 1!")
}
delay(1000)
println("Receiving...")
println("Got: ${channel.receive()}")
}
// Output:
// Sending 1...
// (waits 1 second)
// Receiving...
// Sent 1!
// Got: 1Buffered Channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 3) // Buffer size 3
launch {
for (x in 1..5) {
println("Sending $x")
channel.send(x) // Won't suspend until buffer full
println("Sent $x")
}
}
delay(500) // Let producer fill buffer
for (value in channel) {
println("Received $value")
delay(100)
if (value == 5) break
}
}Conflated Channel
Chỉ giữ giá trị mới nhất:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (x in 1..5) {
println("Sending $x")
channel.send(x)
}
}
delay(100) // Let producer send all values
println("Received: ${channel.receive()}") // Only gets latest (5)
}
// Output:
// Sending 1
// Sending 2
// Sending 3
// Sending 4
// Sending 5
// Received: 5Unlimited Channel
import kotlinx.coroutines.channels.*
val channel = Channel<Int>(Channel.UNLIMITED) // Never suspends on sendSo sánh
| Type | Capacity | Suspend on send | Use case |
|---|---|---|---|
| Rendezvous | 0 | Always | Synchronization |
| Buffered | N | When full | Throughput |
| Conflated | 1 | Never (replaces) | Latest value |
| Unlimited | ∞ | Never | Memory tradeoff |
4. Producers và Consumers
produce builder
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce {
for (x in 1..10) {
send(x * x)
}
} // Channel auto-closed when done
fun main() = runBlocking {
val squares = produceNumbers()
squares.consumeEach { println(it) }
// 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
}actor builder
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
sealed class CounterMsg
object Increment : CounterMsg()
class GetCount(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0
for (msg in channel) {
when (msg) {
is Increment -> counter++
is GetCount -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking {
val counter = counterActor()
// Increment concurrently
repeat(100) {
launch {
counter.send(Increment)
}
}
// Get final count
val response = CompletableDeferred<Int>()
counter.send(GetCount(response))
println("Count: ${response.await()}") // 100
counter.close()
}5. Fan-out và Fan-in
Fan-out: Multiple consumers
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = produce {
repeat(10) {
send(it)
delay(100)
}
}
// Multiple consumers
repeat(3) { consumerId ->
launch {
for (msg in channel) {
println("Consumer $consumerId received $msg")
}
}
}
}
// Each message goes to only one consumerFan-in: Multiple producers
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>()
// Multiple producers
launch { sendStrings(channel, "A", 200) }
launch { sendStrings(channel, "B", 300) }
launch { sendStrings(channel, "C", 500) }
// Single consumer
repeat(9) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
suspend fun sendStrings(channel: SendChannel<String>, prefix: String, delay: Long) {
repeat(3) {
delay(delay)
channel.send("$prefix$it")
}
}
// Output interleaved: A0, B0, A1, C0, B1, A2, B2, C1, C26. Select Expression
Chọn từ nhiều channels:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
fun main() = runBlocking {
val channel1 = produce {
delay(100)
send("from channel 1")
}
val channel2 = produce {
delay(50)
send("from channel 2")
}
// Select first available
val result = select<String> {
channel1.onReceive { it }
channel2.onReceive { it }
}
println(result) // "from channel 2" (faster)
}Select with timeout
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
fun main() = runBlocking {
val channel = Channel<String>()
launch {
delay(500)
channel.send("Hello")
}
val result = select<String?> {
channel.onReceive { it }
onTimeout(200) { null }
}
println(result ?: "Timeout!") // "Timeout!"
}7. Pipeline Pattern
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceNumbers() = produce {
var x = 1
while (true) {
send(x++)
delay(100)
}
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce {
for (x in numbers) {
send(x * x)
}
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, predicate: (Int) -> Boolean) = produce {
for (x in numbers) {
if (predicate(x)) send(x)
}
}
fun main() = runBlocking {
val numbers = produceNumbers()
val squares = square(numbers)
val evenSquares = filter(squares) { it % 2 == 0 }
repeat(5) {
println(evenSquares.receive())
}
coroutineContext.cancelChildren()
}
// Output: 4, 16, 36, 64, 1008. Broadcast Channel (Deprecated → SharedFlow)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Modern approach: use SharedFlow instead of BroadcastChannel
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
// Multiple subscribers
launch {
sharedFlow.collect { println("Subscriber 1: $it") }
}
launch {
sharedFlow.collect { println("Subscriber 2: $it") }
}
delay(100)
// Emit to all subscribers
repeat(3) {
sharedFlow.emit(it)
delay(100)
}
delay(100)
coroutineContext.cancelChildren()
}9. Channels vs Flows
| Feature | Channel | Flow |
|---|---|---|
| Hot/Cold | Hot | Cold |
| Multiple collectors | Shared | Each gets all values |
| Backpressure | Built-in (suspend) | Operators |
| Use case | Communication | Data streams |
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// Channel: hot, shared
val channel = produce {
repeat(3) { send(it) }
}
launch { channel.consumeEach { println("A: $it") } }
launch { channel.consumeEach { println("B: $it") } }
// Each value goes to ONE consumer
delay(100)
// Flow: cold, each collector gets all
val flow = flow {
repeat(3) { emit(it) }
}
launch { flow.collect { println("X: $it") } }
launch { flow.collect { println("Y: $it") } }
// Both X and Y get all values
delay(100)
}10. Practical Examples
Rate limiter with Channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
class RateLimiter(
private val requestsPerSecond: Int
) {
private val tickets = Channel<Unit>(requestsPerSecond)
init {
CoroutineScope(Dispatchers.Default).launch {
while (true) {
repeat(requestsPerSecond) {
tickets.trySend(Unit)
}
delay(1000)
}
}
}
suspend fun acquire() {
tickets.receive()
}
}
suspend fun main() {
val limiter = RateLimiter(5) // 5 requests per second
repeat(20) { i ->
limiter.acquire()
println("Request $i at ${System.currentTimeMillis()}")
}
}Work queue
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
data class Task(val id: Int, val data: String)
data class Result(val taskId: Int, val result: String)
fun main() = runBlocking {
val tasks = Channel<Task>(Channel.UNLIMITED)
val results = Channel<Result>(Channel.UNLIMITED)
// Workers
repeat(3) { workerId ->
launch {
for (task in tasks) {
delay(100) // Simulate work
results.send(Result(task.id, "Processed by $workerId: ${task.data}"))
}
}
}
// Submit tasks
repeat(10) {
tasks.send(Task(it, "Data $it"))
}
tasks.close()
// Collect results
repeat(10) {
println(results.receive())
}
}📝 Tóm tắt
| Operation | Description |
|---|---|
send(value) | Send value (suspends if full) |
receive() | Receive value (suspends if empty) |
close() | Close channel |
produce { } | Create producer coroutine |
actor { } | Create actor coroutine |
Channel types:
- Rendezvous: Synchronous handoff
- Buffered: Async with fixed capacity
- Conflated: Only keep latest
- Unlimited: Never suspend (memory!)
Khi nào dùng Channel vs Flow:
- Channel: Communication giữa coroutines
- Flow: Data streams, transformations
Last updated on