library paging android Lazy holen von paginierten Objekten mit RxJava


0 Answers

android paging tutorial

Ich bin fast an RxJava verkauft, was ein perfekter Begleiter zu Retrofit ist, aber ich kämpfe bei der Migration meines Codes in ein gemeinsames Muster: Um Bandbreite zu sparen, möchte ich (paginierte) Objekte nach Bedarf von meinem Webservice holen , während meine Listview (oder Recyclerview) mit reaktiver Programmierung scrollt.

Mein vorheriger Code war perfekt, aber die reaktive Programmierung scheint es wert zu sein.

Listening / Recyclerview-Scrolling (und andere langweilige Dinge) zu hören, ist nicht das Problem und ein Observable zu bekommen, ist einfach mit Retrofit:

@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);

Ich kann das Muster, das ich in der reaktiven Programmierung verwende, einfach nicht herausfinden.

Der Concat Operator scheint ein guter Ausgangspunkt zu sein, zusammen mit ConnectableObservable , um die Emission zu flatMap und vielleicht flatMap , aber wie?

BEARBEITEN:

Hier ist meine aktuelle (naive) Lösung:

public interface Paged<T> {
    boolean isLoading();
    void cancel();
    void next(int count);
    void next(int count, Scheduler scheduler);
    Observable<List<T>> asObservable();
    boolean hasCompleted();
    int position();
}

Und meine Implementierung mit einem Thema:

public abstract class SimplePaged<T> implements Paged<T> {

    final PublishSubject<List<T>> subject = PublishSubject.create();
    private volatile boolean loading;
    private volatile int offset;
    private Subscription subscription;

    @Override
    public boolean isLoading() {
        return loading;
    }

    @Override
    public synchronized void cancel() {
        if(subscription != null && !subscription.isUnsubscribed())
            subscription.unsubscribe();

        if(!hasCompleted())
            subject.onCompleted();

        subscription = null;
        loading = false;
    }

    @Override
    public void next(int count) {
        next(count, null);
    }

    @Override
    public synchronized void next(int count, Scheduler scheduler) {
        if (isLoading())
            throw new IllegalStateException("you can't call next() before onNext()");

        if(hasCompleted())
            throw new IllegalStateException("you can't call next() after onCompleted()");

        loading = true;

        Observable<List<T>> obs = onNextPage(offset, count).single();

        if(scheduler != null)
            obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!

        subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
    }

    @Override
    public Observable<List<T>> asObservable() {
        return subject.asObservable();
    }

    @Override
    public boolean hasCompleted() {
        return subject.hasCompleted();
    }

    @Override
    public int position() {
        return offset;
    }

    /* Warning: functions below may be called from another thread */
    protected synchronized void onNext(List<T> items) {
        if (items != null)
            offset += items.size();

        loading = false;

        if (items == null || items.size() == 0)
            subject.onCompleted();
        else
            subject.onNext(items);
    }

    protected synchronized void onError(Throwable t) {
        loading = false;
        subject.onError(t);
    }

    protected synchronized void onComplete() {
        loading = false;
    }

    abstract protected Observable<List<T>> onNextPage(int offset, int count);

}


Related