[android] Lazy holen von paginierten Objekten mit RxJava



Answers

Question

Ich bin fast an RxJava verkauft, was ein perfekter Begleiter zu Retrofit ist, aber ich kämpfe bei der Migration meines Codes in ein gemeinsames Muster: Um Bandbreite zu sparen, möchte ich (paginierte) Objekte nach Bedarf von meinem Webservice holen , während meine Listview (oder Recyclerview) mit reaktiver Programmierung scrollt.

Mein vorheriger Code war perfekt, aber die reaktive Programmierung scheint es wert zu sein.

Listening / Recyclerview-Scrolling (und andere langweilige Dinge) zu hören, ist nicht das Problem und ein Observable zu bekommen, ist einfach mit Retrofit:

@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);

Ich kann das Muster, das ich in der reaktiven Programmierung verwende, einfach nicht herausfinden.

Der Concat Operator scheint ein guter Ausgangspunkt zu sein, zusammen mit ConnectableObservable , um die Emission zu flatMap und vielleicht flatMap , aber wie?

BEARBEITEN:

Hier ist meine aktuelle (naive) Lösung:

public interface Paged<T> {
    boolean isLoading();
    void cancel();
    void next(int count);
    void next(int count, Scheduler scheduler);
    Observable<List<T>> asObservable();
    boolean hasCompleted();
    int position();
}

Und meine Implementierung mit einem Thema:

public abstract class SimplePaged<T> implements Paged<T> {

    final PublishSubject<List<T>> subject = PublishSubject.create();
    private volatile boolean loading;
    private volatile int offset;
    private Subscription subscription;

    @Override
    public boolean isLoading() {
        return loading;
    }

    @Override
    public synchronized void cancel() {
        if(subscription != null && !subscription.isUnsubscribed())
            subscription.unsubscribe();

        if(!hasCompleted())
            subject.onCompleted();

        subscription = null;
        loading = false;
    }

    @Override
    public void next(int count) {
        next(count, null);
    }

    @Override
    public synchronized void next(int count, Scheduler scheduler) {
        if (isLoading())
            throw new IllegalStateException("you can't call next() before onNext()");

        if(hasCompleted())
            throw new IllegalStateException("you can't call next() after onCompleted()");

        loading = true;

        Observable<List<T>> obs = onNextPage(offset, count).single();

        if(scheduler != null)
            obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!

        subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
    }

    @Override
    public Observable<List<T>> asObservable() {
        return subject.asObservable();
    }

    @Override
    public boolean hasCompleted() {
        return subject.hasCompleted();
    }

    @Override
    public int position() {
        return offset;
    }

    /* Warning: functions below may be called from another thread */
    protected synchronized void onNext(List<T> items) {
        if (items != null)
            offset += items.size();

        loading = false;

        if (items == null || items.size() == 0)
            subject.onCompleted();
        else
            subject.onNext(items);
    }

    protected synchronized void onError(Throwable t) {
        loading = false;
        subject.onError(t);
    }

    protected synchronized void onComplete() {
        loading = false;
    }

    abstract protected Observable<List<T>> onNextPage(int offset, int count);

}



Links