Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,20 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
flow { emitAll(realFetcher(key)) }.map {
when (it) {
is FetcherResult.Data -> {
StoreReadResponse.Data(
it.value,
origin = StoreReadResponseOrigin.Fetcher(it.origin),
) as StoreReadResponse<Network>
try {
val network = it.value
val local = converter.fromNetworkToLocal(network)
sourceOfTruth?.write(key, local)
StoreReadResponse.Data(
network,
origin = StoreReadResponseOrigin.Fetcher(it.origin),
) as StoreReadResponse<Network>
} catch (exception: Throwable) {
StoreReadResponse.Error.Exception(
exception,
origin = StoreReadResponseOrigin.Fetcher(),
)
}
}

is FetcherResult.Error.Message ->
Expand All @@ -106,17 +116,15 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
StoreReadResponseOrigin.Fetcher()
emit(StoreReadResponse.NoNewData(origin))
},
/**
* When enabled, downstream collectors are never closed, instead, they are kept active to
* receive values dispatched by fetchers created after them. This makes [FetcherController]
* act like a [SourceOfTruth] in the lack of a [SourceOfTruth] provided by the developer.
*/
// When enabled, downstream collectors are never closed.
// Instead, they are kept active to receive values dispatched by fetchers created after them.
// This makes FetcherController act like a SourceOfTruth in the lack of a SourceOfTruth provided by the developer.
piggybackingDownstream = true,
onEach = { response ->
response.dataOrNull()?.let { network: Network ->
val local: Local = converter.fromNetworkToLocal(network)
sourceOfTruth?.write(key, local)
}
onEach = { _ ->
// Exceptions thrown here propagate to the actor and close downstream channels silently.
// This caused store.stream() and store.get() to hang indefinitely (see #660).
// Consequently, we are intentionally performing no work here.
// Conversion and SOT writes now happen in the source flow above.
},
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.mobilenativefoundation.store.store5.util.asSourceOfTruth
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertIs

@FlowPreview
@ExperimentalCoroutinesApi
Expand Down Expand Up @@ -1094,4 +1095,40 @@ class FlowStoreTests {
)

private fun <Key : Any, Output : Any> StoreBuilder<Key, Output>.buildWithTestScope() = scope(testScope).build()

@Test
fun stream_givenConverterThrows_thenEmitsError() =
testScope.runTest {
// Given
val exception = IllegalStateException("Converter failed")
val persister = InMemoryPersister<Int, String>()

val pipeline =
StoreBuilder.from(
fetcher = Fetcher.of { _: Int -> "network" },
sourceOfTruth = persister.asSourceOfTruth(),
converter =
object : Converter<String, String, String> {
override fun fromNetworkToLocal(network: String): String {
throw exception
}

override fun fromOutputToLocal(output: String): String = output
},
).buildWithTestScope()

// When + Then
pipeline.stream(StoreReadRequest.fresh(1)).test {
assertEquals(
Loading(
origin = StoreReadResponseOrigin.Fetcher(),
),
awaitItem(),
)

val errorResponse = awaitItem()
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
assertEquals(exception.message, errorResponse.error.message)
}
}
}
Loading