Skip to content
This repository has been archived by the owner on Feb 23, 2022. It is now read-only.

Commit

Permalink
Supplier function for initial state (#21)
Browse files Browse the repository at this point in the history
* updated android gradle plugin and kotlin 1.3.0

* Added supplier function for initial state

* Tests that supplier runs on subscribeOn scheudler #14

* update travis to build tools 28.0.3

* Formatting tests
  • Loading branch information
sockeqwe authored Nov 5, 2018
1 parent dfdabee commit 733cf63
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ android:
- platform-tools
- tools
- android-28
- build-tools-28.0.2
- build-tools-28.0.3
- extra-android-support
- extra-android-m2repository
licenses:
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
apply from: 'dependencies.gradle'

buildscript {
ext.kotlin_version = '1.2.71'
ext.kotlin_version = '1.3.0'
repositories {
google()
mavenCentral()
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:3.2.0'
classpath 'com.android.tools.build:gradle:3.2.1'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
//classpath 'com.facebook.testing.screenshot:plugin:0.7.0'
classpath 'com.karumi:shot:2.0.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,56 @@ import io.reactivex.subjects.Subject
* side effect ([Throwable] has been thrown) then the ReduxStore reaches onError() as well and
* according to the reactive stream specs the store cannot recover the error state.
*
* @param initialState The initial state. This one will be emitted directly in onSubscribe()
* @param initialStateSupplier A function that computes the initial state. The computation is
* done lazily once an observer subscribes and it is done on the [io.reactivex.Scheduler] that
* you have specified in subscibeOn(). The computed initial state will be emitted directly
* in onSubscribe()
* @param sideEffects The sideEffects. See [SideEffect]
* @param reducer The reducer. See [Reducer].
* @param S The type of the State
* @param A The type of the Actions
*/
fun <S: Any, A: Any> Observable<A>.reduxStore(
initialState: S,
fun <S : Any, A : Any> Observable<A>.reduxStore(
initialStateSupplier: () -> S,
sideEffects: Iterable<SideEffect<S, A>>,
reducer: Reducer<S, A>
): Observable<S> {
return RxJavaPlugins.onAssembly(
ObservableReduxStore(
initialState = initialState,
initialStateSupplier = initialStateSupplier,
upstreamActionsObservable = this,
reducer = reducer,
sideEffects = sideEffects
)
)
}

/**
* Just a convenience method to use a fixed value as initial state (rather than a supplier function).
* However, under the hood it creates a fixed supplier function that captures this fixed value.
*
* @see reduxStore
* @param initialState The initial state. The initial state is emitted directly in on onSubscribe().
* @param sideEffects The SideEffects. See [SideEffect].
* @param reducer The reducer. See [Reducer].
*/
fun <S : Any, A : Any> Observable<A>.reduxStore(
initialState: S,
sideEffects: Iterable<SideEffect<S, A>>,
reducer: Reducer<S, A>
): Observable<S> = reduxStore(
initialStateSupplier = { initialState },
sideEffects = sideEffects,
reducer = reducer
)

/**
* Just a convenience method to use varags for arbitarry many sideeffects instead a list of SideEffects.
* See [reduxStore] documentation.
*
* @see reduxStore
*/
fun <S: Any, A: Any> Observable<A>.reduxStore(
fun <S : Any, A : Any> Observable<A>.reduxStore(
initialState: S,
vararg sideEffects: SideEffect<S, A>,
reducer: Reducer<S, A>
Expand All @@ -59,18 +81,35 @@ fun <S: Any, A: Any> Observable<A>.reduxStore(
reducer = reducer
)

/**
* Just a convenience method to use varags for arbitarry many sideeffects instead a list of SideEffects.
* See [reduxStore] documentation.
*
* @see reduxStore
*/
fun <S : Any, A : Any> Observable<A>.reduxStore(
initialStateSupplier: () -> S,
vararg sideEffects: SideEffect<S, A>,
reducer: Reducer<S, A>
): Observable<S> = reduxStore(
initialStateSupplier = initialStateSupplier,
sideEffects = sideEffects.toList(),
reducer = reducer
)

/**
* Use [Observable.reduxStore] to create an instance of this kind of Observable.
*
* @param S The type of the State
* @param A The type of the Actions
* @see [Observable.reduxStore]
*/
private class ObservableReduxStore<S: Any, A: Any>(
private class ObservableReduxStore<S : Any, A : Any>(
/**
* The initial state. This one will be emitted directly in onSubscribe()
* The initial state. This one will be emitted directly in onSubscribe().
* The supplier is runs on the Scheduler that has been specified in .subscribeOn(MyScheduler).
*/
private val initialState: S,
private val initialStateSupplier: () -> S,
/**
* The upstream that emits Actions (i.e. actions triggered by an User through User Interface)
*/
Expand All @@ -96,7 +135,7 @@ private class ObservableReduxStore<S: Any, A: Any>(
val storeObserver = ReduxStoreObserver(
actualObserver = serializedObserver,
internalDisposables = disposables,
initialState = initialState,
initialState = initialStateSupplier(),
reducer = reducer
)

Expand Down Expand Up @@ -138,7 +177,7 @@ private class ObservableReduxStore<S: Any, A: Any>(
/**
* Simple observer for internal reduxStore
*/
private class ReduxStoreObserver<S: Any, A: Any>(
private class ReduxStoreObserver<S : Any, A : Any>(
private val actualObserver: Observer<in S>,
private val internalDisposables: CompositeDisposable,
initialState: S,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import org.junit.Assert
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger

class ObservableReduxTest {

Expand Down Expand Up @@ -129,7 +130,6 @@ class ObservableReduxTest {
actions: Observable<String>,
accessor: StateAccessor<String>
): Observable<String> = actions.flatMap {
println("Doing something with $it")
Observable.empty<String>()
}

Expand Down Expand Up @@ -158,12 +158,61 @@ class ObservableReduxTest {
val testException = Exception("test")

Observable
.just("Action1")
.reduxStore("Initial", sideEffects = emptyList()) { _, _ ->
throw testException
}
.test()
.assertError(ReducerException::class.java)
.assertErrorMessage("Exception was thrown by reducer, state = 'Initial', action = 'Action1'")
.just("Action1")
.reduxStore("Initial", sideEffects = emptyList()) { _, _ ->
throw testException
}
.test()
.assertError(ReducerException::class.java)
.assertErrorMessage("Exception was thrown by reducer, state = 'Initial', action = 'Action1'")
}

@Test
fun `subscribing new observer calls initial state supplier on subscribeOn scheduler`() {
val ioSchedulerNamePrefix = "Thread[RxCachedThreadScheduler-"
val initialStateCount = AtomicInteger()
val initialStateSupplierCallingThread = Array<Thread?>(2) { null }
val initialState = "initial State"
val action1 = "Action 1"
val testThread = Thread.currentThread()

val observable: Observable<String> = Observable.fromCallable { action1 }
.observeOn(Schedulers.newThread())
.reduxStore(initialStateSupplier = {
val index = initialStateCount.getAndIncrement()
initialStateSupplierCallingThread[index] = Thread.currentThread()
initialState
},
sideEffects = emptyList(),
reducer = { _, action -> action }
).subscribeOn(Schedulers.io())
.take(2)


val observer1 = observable.test()
val observer2 = observable.test()

observer1.awaitTerminalEvent()
observer2.awaitTerminalEvent()

observer1.assertValues(initialState, action1)
observer2.assertValues(initialState, action1)

observer1.assertNoErrors()
observer2.assertNoErrors()

Assert.assertEquals(2, initialStateCount.get())
Assert.assertNotSame(testThread, initialStateSupplierCallingThread[0])
Assert.assertNotSame(testThread, initialStateSupplierCallingThread[1])
Assert.assertTrue(
initialStateSupplierCallingThread[0]
.toString()
.startsWith(ioSchedulerNamePrefix)
)
Assert.assertTrue(
initialStateSupplierCallingThread[1]
.toString()
.startsWith(ioSchedulerNamePrefix)
)
}
}

0 comments on commit 733cf63

Please sign in to comment.