Skip to content

Commit

Permalink
Improve cache error dispatching with delayError
Browse files Browse the repository at this point in the history
  • Loading branch information
tspoke committed Jan 25, 2018
1 parent 79b68f0 commit 49211f8
Showing 1 changed file with 96 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,68 +102,70 @@ public boolean isCache() {

@Override
public Flowable<Optional<List<T>>> getAll(final Filter filter, final SortingMode sortingMode) {
List<Flowable<Optional<List<T>>>> flowables = new ArrayList<>();
Flowable<Optional<List<T>>> flowStorage = dao.getAll(filter, sortingMode);

if (hasSyncedStore()) {
flowStorage = flowStorage
.flatMap(new Function<Optional<List<T>>, Flowable<Optional<List<T>>>>() {
@Override
public Flowable<Optional<List<T>>> apply(Optional<List<T>> items) throws Exception {
final List<T> copy = new ArrayList<>(items.get());
if (filter == null) {
// full replacement, we clean up the Store dao
return syncedStore.deleteAll().map(new Function<Integer, Optional<List<T>>>() {
@Override
public Optional<List<T>> apply(Integer integer) throws Exception {
return Optional.wrap(copy);
}
});
}
return Flowable.just(Optional.wrap(copy));
}
})
.flatMap(new Function<Optional<List<T>>, Flowable<Optional<List<T>>>>() {
@Override
public Flowable<Optional<List<T>>> apply(Optional<List<T>> items) throws Exception {
return syncedStore.insertOrUpdate(items.get());
}
});

if (syncedStore.isCache()) {
return flowStorage.startWith(syncedStore.getAll(filter, sortingMode));
}

flowables.add(syncedStore.getAll(filter, sortingMode));
if (!hasSyncedStore()) {
return flowStorage;
}

final List<Flowable<Optional<List<T>>>> flowables = new ArrayList<>();
flowStorage = flowStorage
.flatMap(new Function<Optional<List<T>>, Flowable<Optional<List<T>>>>() {
@Override
public Flowable<Optional<List<T>>> apply(Optional<List<T>> items) throws Exception {
final List<T> copy = new ArrayList<>(items.get());
if (filter == null) {
// full replacement, we clean up the Store dao
return syncedStore.deleteAll().map(new Function<Integer, Optional<List<T>>>() {
@Override
public Optional<List<T>> apply(Integer integer) throws Exception {
return Optional.wrap(copy);
}
});
}
return Flowable.just(Optional.wrap(copy));
}
})
.flatMap(new Function<Optional<List<T>>, Flowable<Optional<List<T>>>>() {
@Override
public Flowable<Optional<List<T>>> apply(Optional<List<T>> items) throws Exception {
return syncedStore.insertOrUpdate(items.get());
}
});

flowables.add(syncedStore.getAll(filter, sortingMode));
flowables.add(flowStorage);
return Flowable.concat(flowables);

if (syncedStore.isCache()) {
return Flowable.concatDelayError(flowables);
} else {
return Flowable.concat(flowables);
}
}

@Override
public Flowable<Optional<List<T>>> getAll(final List<T> items) {
List<Flowable<Optional<List<T>>>> flowables = new ArrayList<>();
Flowable<Optional<List<T>>> flowStorage = dao.getAll(items);

if (hasSyncedStore()) {
flowStorage = flowStorage
.flatMap(new Function<Optional<List<T>>, Flowable<Optional<List<T>>>>() {
@Override
public Flowable<Optional<List<T>>> apply(Optional<List<T>> items) throws Exception {
return syncedStore.insertOrUpdate(items.get());
}
});
if (!hasSyncedStore()) {
return flowStorage;
}

if (syncedStore.isCache()) {
return flowStorage.startWith(syncedStore.getAll(items));
final List<Flowable<Optional<List<T>>>> flowables = new ArrayList<>();
flowStorage = flowStorage.flatMap(new Function<Optional<List<T>>, Flowable<Optional<List<T>>>>() {
@Override
public Flowable<Optional<List<T>>> apply(Optional<List<T>> items) throws Exception {
return syncedStore.insertOrUpdate(items.get());
}
});
flowables.add(syncedStore.getAll(items));
flowables.add(flowStorage);

flowables.add(syncedStore.getAll(items));
if (syncedStore.isCache()) {
return Flowable.concatDelayError(flowables);
} else {
return Flowable.concat(flowables);
}

flowables.add(flowStorage);
return Flowable.concat(flowables);
}

public final Flowable<Optional<List<T>>> getAll(final Filter filter) {
Expand All @@ -176,50 +178,52 @@ public final Flowable<Optional<List<T>>> getAll() {

@Override
public Flowable<Optional<T>> getOne(final Filter filter, final SortingMode sortingMode) {
List<Flowable<Optional<T>>> flowables = new ArrayList<>();
Flowable<Optional<T>> flowStorage = dao.getOne(filter, sortingMode);

if (hasSyncedStore()) {
flowStorage = flowStorage
.flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
@Override
public Flowable<Optional<T>> apply(Optional<T> item) throws Exception {
return syncedStore.insertOrUpdate(item.get());
}
});
if (!hasSyncedStore()) {
return flowStorage;
}

if (syncedStore.isCache()) {
return flowStorage.startWith(syncedStore.getOne(filter, sortingMode));
final List<Flowable<Optional<T>>> flowables = new ArrayList<>();
flowStorage = flowStorage.flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
@Override
public Flowable<Optional<T>> apply(Optional<T> item) throws Exception {
return syncedStore.insertOrUpdate(item.get());
}
});
flowables.add(syncedStore.getOne(filter, sortingMode));
flowables.add(flowStorage);

flowables.add(syncedStore.getOne(filter, sortingMode));
if (syncedStore.isCache()) {
return Flowable.concatDelayError(flowables);
} else {
return Flowable.concat(flowables);
}

flowables.add(flowStorage);
return Flowable.concat(flowables);
}

@Override
public Flowable<Optional<T>> getOne(final T item) {
List<Flowable<Optional<T>>> flowables = new ArrayList<>();
Flowable<Optional<T>> flowStorage = dao.getOne(item);

if (hasSyncedStore()) {
flowStorage = flowStorage
.flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
@Override
public Flowable<Optional<T>> apply(Optional<T> item) throws Exception {
return syncedStore.insertOrUpdate(item.get());
}
});
if (syncedStore.isCache()) {
return flowStorage.startWith(syncedStore.getOne(item));
}
flowables.add(syncedStore.getOne(item));
if (!hasSyncedStore()) {
return flowStorage;
}

final List<Flowable<Optional<T>>> flowables = new ArrayList<>();
flowStorage = flowStorage.flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
@Override
public Flowable<Optional<T>> apply(Optional<T> item) throws Exception {
return syncedStore.insertOrUpdate(item.get());
}
});
flowables.add(syncedStore.getOne(item));
flowables.add(flowStorage);
return Flowable.concat(flowables);

if (syncedStore.isCache()) {
return Flowable.concatDelayError(flowables);
} else {
return Flowable.concat(flowables);
}
}

public Flowable<Optional<T>> getOne(final Filter filter) {
Expand All @@ -236,25 +240,26 @@ public Flowable<Optional<T>> getOne() {

@Override
public Flowable<Optional<T>> getById(final int id) {
List<Flowable<Optional<T>>> flowables = new ArrayList<>();
Flowable<Optional<T>> flowStorage = dao.getById(id);

if (hasSyncedStore()) {
flowStorage = flowStorage
.flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
@Override
public Flowable<Optional<T>> apply(final Optional<T> item) throws Exception {
return syncedStore.insertOrUpdate(item.get());
}
});
if (syncedStore.isCache()) {
return flowStorage.startWith(syncedStore.getById(id));
}
flowables.add(syncedStore.getById(id));
if (!hasSyncedStore()) {
return flowStorage;
}

final List<Flowable<Optional<T>>> flowables = new ArrayList<>();
flowStorage = flowStorage.flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
@Override
public Flowable<Optional<T>> apply(final Optional<T> item) throws Exception {
return syncedStore.insertOrUpdate(item.get());
}
});
flowables.add(syncedStore.getById(id));
flowables.add(flowStorage);
return Flowable.concat(flowables);

if (syncedStore.isCache()) {
return Flowable.concatDelayError(flowables);
} else {
return Flowable.concat(flowables);
}
}

@Override
Expand Down

0 comments on commit 49211f8

Please sign in to comment.