Skip to Content
Kotlin⚡ Kotlin CoroutinesFlow - Luồng dữ liệu

Flow - Luồng dữ liệu

Flow là cách Kotlin xử lý luồng dữ liệu bất đồng bộ. Nếu coroutines là để xử lý một giá trị async, Flow là để xử lý nhiều giá trị theo thời gian.

Flow là gì?

// suspend function: Trả về MỘT giá trị suspend fun fetchUser(): User // Flow: Trả về NHIỀU giá trị theo thời gian fun observeUsers(): Flow<User>

Ví dụ thực tế

Use CaseSuspend FunctionFlow
Load usergetUser()-
Real-time updates-observeUser()
Search suggestions-searchFlow
Location updates-locationFlow
Database changes-getAllUsersFlow()

Tạo Flow

1. flow builder

fun simpleFlow(): Flow<Int> = flow { for (i in 1..3) { delay(100) // Suspend emit(i) // Phát ra giá trị } }

2. flowOf() - Từ values

val numbersFlow = flowOf(1, 2, 3, 4, 5)

3. asFlow() - Từ collections

val listFlow = listOf("A", "B", "C").asFlow() val rangeFlow = (1..10).asFlow()

4. Flow từ callback

fun locationFlow(): Flow<Location> = callbackFlow { val callback = object : LocationCallback() { override fun onLocationResult(result: LocationResult) { trySend(result.lastLocation) // Phát ra location } } locationClient.requestLocationUpdates(request, callback, Looper.getMainLooper()) awaitClose { // Cleanup khi flow bị cancel locationClient.removeLocationUpdates(callback) } }

Collect Flow

Flow chỉ chạy khi được collect (cold stream).

Cách collect

fun main() = runBlocking { simpleFlow().collect { value -> println("Received: $value") } } // Output: // Received: 1 // Received: 2 // Received: 3

Các hàm terminal khác

val flow = flowOf(1, 2, 3, 4, 5) // Lấy tất cả thành list val list = flow.toList() // [1, 2, 3, 4, 5] // Lấy giá trị đầu val first = flow.first() // 1 // Lấy giá trị cuối val last = flow.last() // 5 // Đếm val count = flow.count() // 5 // Reduce val sum = flow.reduce { acc, value -> acc + value } // 15

Flow Operators

Intermediate Operators

val flow = (1..10).asFlow() // map - Transform từng giá trị flow.map { it * 2 } // 2, 4, 6, 8, ... // filter - Lọc giá trị flow.filter { it % 2 == 0 } // 2, 4, 6, 8, 10 // take - Lấy n giá trị đầu flow.take(3) // 1, 2, 3 // drop - Bỏ n giá trị đầu flow.drop(3) // 4, 5, 6, 7, 8, 9, 10 // transform - Custom transformation flow.transform { value -> emit(value) emit(value * 10) } // 1, 10, 2, 20, 3, 30, ...

Chaining operators

(1..100).asFlow() .filter { it % 2 == 0 } // Số chẵn .map { it * it } // Bình phương .take(5) // Lấy 5 đầu .collect { println(it) } // 4, 16, 36, 64, 100

flowOn - Chuyển Dispatcher

fun userFlow(): Flow<User> = flow { // Runs on IO val user = api.getUser() emit(user) } .flowOn(Dispatchers.IO) // Upstream chạy trên IO // Collect chạy trên caller's context viewModelScope.launch { userFlow().collect { user -> // Runs on Main (caller's context) updateUI(user) } }

Lưu ý: flowOn chỉ ảnh hưởng upstream (code trước nó), không ảnh hưởng collect.


StateFlow và SharedFlow

StateFlow

StateFlow lưu trữ giá trị hiện tại và emit cho collectors mới.

class CounterViewModel : ViewModel() { // MutableStateFlow với initial value private val _count = MutableStateFlow(0) // Public read-only StateFlow val count: StateFlow<Int> = _count.asStateFlow() fun increment() { _count.value++ } } // Collect trong UI @Composable fun CounterScreen(viewModel: CounterViewModel) { val count by viewModel.count.collectAsState() Text("Count: $count") }

StateFlow vs LiveData

FeatureStateFlowLiveData
NullableKhông null mặc địnhNullable
Initial valueBắt buộcOptional
Lifecycle-awareManualTự động
OperatorsNhiều (map, filter…)Ít
Kotlin-first❌ (Java)

SharedFlow

SharedFlow cho phép multiple emissions mà không cần giá trị ban đầu.

class EventViewModel : ViewModel() { private val _events = MutableSharedFlow<UiEvent>() val events: SharedFlow<UiEvent> = _events.asSharedFlow() fun showToast(message: String) { viewModelScope.launch { _events.emit(UiEvent.ShowToast(message)) } } } sealed class UiEvent { data class ShowToast(val message: String) : UiEvent() object NavigateBack : UiEvent() }

Collect trong Android

Lifecycle-aware collecting

// ❌ SAI - Tiếp tục collect khi app ở background lifecycleScope.launch { viewModel.uiState.collect { state -> updateUI(state) } } // ✅ ĐÚNG - Chỉ collect khi STARTED lifecycleScope.launch { repeatOnLifecycle(Lifecycle.State.STARTED) { viewModel.uiState.collect { state -> updateUI(state) } } }

Trong Compose

@Composable fun UserScreen(viewModel: UserViewModel) { val uiState by viewModel.uiState.collectAsState() // Hoặc với lifecycle-awareness val uiState by viewModel.uiState.collectAsStateWithLifecycle() when (uiState) { is UiState.Loading -> LoadingScreen() is UiState.Success -> UserContent(uiState.data) is UiState.Error -> ErrorScreen(uiState.message) } }

Ví dụ thực tế

Search với debounce

class SearchViewModel : ViewModel() { private val searchQuery = MutableStateFlow("") val searchResults: StateFlow<List<Result>> = searchQuery .debounce(300) // Đợi 300ms không có input mới .filter { it.length >= 2 } // Ít nhất 2 ký tự .distinctUntilChanged() // Không search lại nếu query giống .flatMapLatest { query -> // Cancel search cũ nếu có query mới searchRepository.search(query) } .stateIn( scope = viewModelScope, started = SharingStarted.WhileSubscribed(5000), initialValue = emptyList() ) fun onQueryChanged(query: String) { searchQuery.value = query } }

Database real-time updates

@Dao interface UserDao { @Query("SELECT * FROM users") fun observeAllUsers(): Flow<List<User>> // Room tự emit khi data thay đổi } class UserRepository(private val dao: UserDao) { fun observeUsers(): Flow<List<User>> = dao.observeAllUsers() .flowOn(Dispatchers.IO) } // ViewModel class UserListViewModel(private val repository: UserRepository) : ViewModel() { val users: StateFlow<List<User>> = repository.observeUsers() .stateIn( scope = viewModelScope, started = SharingStarted.WhileSubscribed(5000), initialValue = emptyList() ) }

📝 Tóm tắt

ConceptMô tả
FlowCold stream, chạy khi collect
emit()Phát ra giá trị
collect()Nhận giá trị
map, filterTransform operators
StateFlowHot flow với current value
SharedFlowHot flow cho events
flowOnChuyển dispatcher
stateInConvert Flow → StateFlow

Best Practices

  1. Dùng StateFlow cho UI state
  2. Dùng SharedFlow cho one-time events
  3. flowOn(Dispatchers.IO) cho network/database
  4. collectAsStateWithLifecycle trong Compose
  5. repeatOnLifecycle trong Views

Xem thêm

Last updated on