Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@

import rx.Observable;
import rx.Scheduler;
import rx.Single;
import rx.functions.Func1;

public class RequestHelper {

@Nonnull
public static <T> Observable<T> request(@Nonnull CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao,
@Nonnull final Scheduler networkScheduler,
@Nonnull final Func1<String, Observable<T>> request) {
public static <T> Single<T> request(@Nonnull CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao,
@Nonnull final Scheduler networkScheduler,
@Nonnull final Func1<String, Single<T>> request) {
return loggedInUserDao.authTokenObservable(false)
.flatMap(new Func1<String, Observable<T>>() {
.flatMap(new Func1<String, Single<T>>() {
@Override
public Observable<T> call(String s) {
public Single<T> call(String s) {
return request.call(s)
.subscribeOn(networkScheduler)
.unsubscribeOn(networkScheduler);
.subscribeOn(networkScheduler);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Single;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -122,14 +123,16 @@ private OperatorMergeNextToken<PostsIdsResponse, Object> loadMorePostsIds(@Nonnu
@Override
public Observable<PostsIdsResponse> call(@Nullable final PostsIdsResponse previous) {
if (previous == null) {
return createRequest(null);
return createRequest(null)
.toObservable();
} else {
final String nextToken = previous.nextToken();
if (nextToken == null) {
return Observable.never();
}
return createRequest(nextToken)
.map(joinWithPreviousResponse(previous));
.map(joinWithPreviousResponse(previous))
.toObservable();
}

}
Expand All @@ -148,11 +151,11 @@ public PostsIdsResponse call(PostsIdsResponse moreData) {
}

@Nonnull
private Observable<PostsIdsResponse> createRequest(@Nullable final String nextToken) {
private Single<PostsIdsResponse> createRequest(@Nullable final String nextToken) {
return RequestHelper.request(loggedInUserDao, networkScheduler,
new Func1<String, Observable<PostsIdsResponse>>() {
new Func1<String, Single<PostsIdsResponse>>() {
@Override
public Observable<PostsIdsResponse> call(String authorization) {
public Single<PostsIdsResponse> call(String authorization) {
return postsService.listPostsIds(authorization, nextToken);
}
});
Expand All @@ -169,14 +172,15 @@ private OperatorMergeNextToken<PostsResponse, Object> loadMorePosts(@Nonnull fin
@Override
public Observable<PostsResponse> call(@Nullable final PostsResponse previous) {
if (previous == null) {
return createRequest(null);
return createRequest(null).toObservable();
} else {
final String nextToken = previous.nextToken();
if (nextToken == null) {
return Observable.never();
}
return createRequest(nextToken)
.map(joinWithPreviousResponse(previous));
.map(joinWithPreviousResponse(previous))
.toObservable();
}

}
Expand All @@ -195,11 +199,11 @@ public PostsResponse call(PostsResponse moreData) {
}

@Nonnull
private Observable<PostsResponse> createRequest(@Nullable final String nextToken) {
private Single<PostsResponse> createRequest(@Nullable final String nextToken) {
return RequestHelper.request(loggedInUserDao, networkScheduler,
new Func1<String, Observable<PostsResponse>>() {
new Func1<String, Single<PostsResponse>>() {
@Override
public Observable<PostsResponse> call(String authorization) {
public Single<PostsResponse> call(String authorization) {
return postsService.listPosts(authorization, nextToken);
}
});
Expand Down Expand Up @@ -233,34 +237,35 @@ public Observer<Object> refreshObserver() {
}

@Nonnull
public Observable<ResponseOrError<PostWithBody>> postRequestObserver(@Nonnull final AddPost post) {
public Single<ResponseOrError<PostWithBody>> postRequestObserver(@Nonnull final AddPost post) {
return currentLoggedInUserDao.currentLoggedInUserObservable()
.first()
.compose(ResponseOrError.flatMap(new Func1<CurrentLoggedInUserDao.LoggedInUserDao, Observable<ResponseOrError<PostWithBody>>>() {
.toSingle()
.compose(ResponseOrError.singleFlatMap(new Func1<CurrentLoggedInUserDao.LoggedInUserDao, Single<ResponseOrError<PostWithBody>>>() {
@Override
public Observable<ResponseOrError<PostWithBody>> call(CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao) {
public Single<ResponseOrError<PostWithBody>> call(CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao) {
return postRequestObserver(loggedInUserDao, post);
}
}));
}


@Nonnull
private Observable<ResponseOrError<PostWithBody>> postRequestObserver(@Nonnull CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao, @Nonnull final AddPost post) {
private Single<ResponseOrError<PostWithBody>> postRequestObserver(@Nonnull CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao, @Nonnull final AddPost post) {
return RequestHelper.request(loggedInUserDao, networkScheduler,
new Func1<String, Observable<PostWithBody>>() {
new Func1<String, Single<PostWithBody>>() {
@Override
public Observable<PostWithBody> call(String authorization) {
public Single<PostWithBody> call(String authorization) {
return postsService.createPost(authorization, post);
}
})
.doOnNext(new Action1<PostWithBody>() {
.doOnSuccess(new Action1<PostWithBody>() {
@Override
public void call(PostWithBody postWithBody) {
refreshSubject.doOnNext(null);
}
})
.compose(ResponseOrError.<PostWithBody>toResponseOrErrorObservable());
.compose(ResponseOrError.<PostWithBody>toResponseOrErrorSingle());
}

public class PostDao {
Expand All @@ -275,13 +280,14 @@ public class PostDao {
@Override
public Observable<ResponseOrError<PostWithBody>> call(CurrentLoggedInUserDao.LoggedInUserDao loggedInUserDao) {
return RequestHelper.request(loggedInUserDao, networkScheduler,
new Func1<String, Observable<PostWithBody>>() {
new Func1<String, Single<PostWithBody>>() {
@Override
public Observable<PostWithBody> call(String authorization) {
public Single<PostWithBody> call(String authorization) {
return postsService.getPost(authorization, id);
}
})
.compose(ResponseOrError.<PostWithBody>toResponseOrErrorObservable())
.compose(ResponseOrError.<PostWithBody>toResponseOrErrorSingle())
.toObservable()
.compose(MoreOperators.<PostWithBody>repeatOnError(networkScheduler))
.compose(MoreOperators.<ResponseOrError<PostWithBody>>refresh(refreshSubject))
.compose(MoreOperators.<ResponseOrError<PostWithBody>>cacheWithTimeout(networkScheduler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@
import retrofit2.http.POST;
import retrofit2.http.Path;
import retrofit2.http.Query;
import rx.Observable;
import rx.Single;

public interface PostsService {

@GET("v1/posts?limit=50&fields=next_token%2Cposts(id%2Cname)&prettyPrint=false")
@Nonnull
Observable<PostsResponse> listPosts(@Header("Authorization") String authorization, @Query("next_token") @Nullable String nextToken);
Single<PostsResponse> listPosts(@Header("Authorization") String authorization, @Query("next_token") @Nullable String nextToken);

@GET("v1/posts_ids?limit=50&prettyPrint=false")
@Nonnull
Observable<PostsIdsResponse> listPostsIds(@Header("Authorization") String authorization, @Query("next_token") @Nullable String nextToken);
Single<PostsIdsResponse> listPostsIds(@Header("Authorization") String authorization, @Query("next_token") @Nullable String nextToken);

@GET("v1/posts/{postId}?prettyPrint=false")
@Nonnull
Observable<PostWithBody> getPost(@Header("Authorization") String authorization, @Path("postId") @Nonnull String id);
Single<PostWithBody> getPost(@Header("Authorization") String authorization, @Path("postId") @Nonnull String id);

@POST("v1/posts")
@Nonnull
Observable<PostWithBody> createPost(@Header("Authorization") String authorization, @Body @Nonnull AddPost addPost);
Single<PostWithBody> createPost(@Header("Authorization") String authorization, @Body @Nonnull AddPost addPost);

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ public void call() {
showProgress.onNext(true);
}
})
.doOnNext(new Action1<ResponseOrError<PostWithBody>>() {
.doOnUnsubscribe(new Action0() {
@Override
public void call(ResponseOrError<PostWithBody> postWithBodyResponseOrError) {
public void call() {
showProgress.onNext(false);
}
});
})
.toObservable();
}
})
.publish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import javax.annotation.Nonnull;

import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
Expand All @@ -57,23 +59,22 @@ public String userId() {

@Nonnull
@Override
public Observable<String> authTokenObservable(final boolean forceRefresh) {
return Observable.create(new Observable.OnSubscribe<String>() {
public Single<String> authTokenObservable(final boolean forceRefresh) {
return Single.create(new Single.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
public void call(final SingleSubscriber<? super String> singleSubscriber) {
final Task<GetTokenResult> token = firebaseUser.getToken(forceRefresh);
final OnCompleteListener<GetTokenResult> listener = new OnCompleteListener<GetTokenResult>() {
@Override
public void onComplete(@NonNull Task<GetTokenResult> task) {
if (subscriber.isUnsubscribed()) {
if (singleSubscriber.isUnsubscribed()) {
return;
}
final Exception exception = task.getException();
if (exception != null) {
subscriber.onError(exception);
singleSubscriber.onError(exception);
} else {
subscriber.onNext(task.getResult().getToken());
subscriber.onCompleted();
singleSubscriber.onSuccess(task.getResult().getToken());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nonnull;

import rx.Observable;
import rx.Single;

/**
* Dao that contain currently logged in user
Expand Down Expand Up @@ -71,7 +72,7 @@ interface LoggedInUserDao {
* @return observable with auth token or error
*/
@Nonnull
Observable<String> authTokenObservable(boolean forceRefresh);
Single<String> authTokenObservable(boolean forceRefresh);

}

Expand Down
86 changes: 86 additions & 0 deletions rx-extensions/src/main/java/com/appunite/rx/ResponseOrError.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.annotation.Nullable;

import rx.Observable;
import rx.Single;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Func3;
Expand Down Expand Up @@ -218,6 +219,26 @@ public Observable<ResponseOrError<T>> call(final Observable<T> observable) {
};
}

/**
* Convert {@link Single} that can throw error to {@link Observable<ResponseOrError>}
*
* If source Observable returns obj, result observable will return ResponseOrError.fromData(obj)
* If source Observable throws err, result observable will return ResponseOrError.fromError(err)
*
* @param <T> type of data
* @return observable
*/
@Nonnull
public static <T> Single.Transformer<T, ResponseOrError<T>> toResponseOrErrorSingle() {
return new Single.Transformer<T, ResponseOrError<T>>() {

@Override
public Single<ResponseOrError<T>> call(final Single<T> single) {
return toResponseOrErrorSingle(single);
}
};
}

@Nonnull
private static <T> Observable<ResponseOrError<T>> toResponseOrErrorObservable(@Nonnull Observable<T> observable) {
return observable
Expand All @@ -236,6 +257,24 @@ public Observable<? extends ResponseOrError<T>> call(final Throwable throwable)

}

@Nonnull
private static <T> Single<ResponseOrError<T>> toResponseOrErrorSingle(@Nonnull Single<T> single) {
return single
.map(new Func1<T, ResponseOrError<T>>() {
@Override
public ResponseOrError<T> call(final T t) {
return ResponseOrError.fromData(t);
}
})
.onErrorResumeNext(new Func1<Throwable, Single<? extends ResponseOrError<T>>>() {
@Override
public Single<? extends ResponseOrError<T>> call(final Throwable throwable) {
return Single.just(ResponseOrError.<T>fromError(throwable));
}
});

}

/**
* Map only success response ignoring error
*
Expand Down Expand Up @@ -312,6 +351,36 @@ public Observable<ResponseOrError<K>> call(final Observable<ResponseOrError<T>>
};
}

/**
* Flat map only success response ignoring error
*
* <pre>
* {@code
* Single<ResponseOrError<Boolean>> output =
* Single.just(ResponseOrError.fromData("text")
* .compose(ResponseOrError.map(new Func<String, Single<Boolean>) {
* Single<Boolean> call(String in) {
* return Single.just(in != null);
* }
* });
* }
* </pre>
*
* @param func that maps data of ResponseOrError to Single
* @param <T> type of source object
* @param <K> type of destination object
* @return single
*/
@Nonnull
public static <T, K> Single.Transformer<ResponseOrError<T>, ResponseOrError<K>> singleFlatMap(@Nonnull final Func1<T, Single<ResponseOrError<K>>> func) {
return new Single.Transformer<ResponseOrError<T>, ResponseOrError<K>>() {
@Override
public Single<ResponseOrError<K>> call(final Single<ResponseOrError<T>> observableObservable) {
return flatMap(observableObservable, func);
}
};
}

@Nonnull
private static <T, K> Observable<ResponseOrError<K>> flatMap(@Nonnull final Observable<ResponseOrError<T>> observable,
@Nonnull final Func1<T, Observable<ResponseOrError<K>>> func) {
Expand All @@ -329,6 +398,23 @@ public Observable<ResponseOrError<K>> call(final ResponseOrError<T> response) {
});
}

@Nonnull
private static <T, K> Single<ResponseOrError<K>> flatMap(@Nonnull final Single<ResponseOrError<T>> single,
@Nonnull final Func1<T, Single<ResponseOrError<K>>> func) {
checkNotNull(single);
checkNotNull(func);
return single.flatMap(new Func1<ResponseOrError<T>, Single<ResponseOrError<K>>>() {
@Override
public Single<ResponseOrError<K>> call(final ResponseOrError<T> response) {
if (response.isError()) {
return Single.just(ResponseOrError.<K>fromError(response.error()));
} else {
return func.call(response.data());
}
}
});
}

/**
* Switch map only success response ignoring error
*
Expand Down