Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

Commit

Permalink
Add floawable support
Browse files Browse the repository at this point in the history
  • Loading branch information
zchu committed Oct 11, 2017
1 parent 1cfe9d1 commit 5d64047
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 63 deletions.
11 changes: 1 addition & 10 deletions rxcache/src/main/java/com/zchu/rxcache/LruMemoryCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.zchu.rxcache.utils.MemorySizeOf;
import com.zchu.rxcache.utils.Occupy;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;

/**
Expand Down Expand Up @@ -78,14 +76,7 @@ private long countSize(Object value) {
if (value instanceof Bitmap) {
LogUtils.debug("Bitmap");
size = MemorySizeOf.sizeOf((Bitmap) value);
} else if (value instanceof Serializable) {
LogUtils.debug("Serializable");
try {
size = MemorySizeOf.sizeOf((Serializable) value);
} catch (IOException e) {
size = occupy.occupyof(value);
}
} else {
}else {
size = occupy.occupyof(value);
}
LogUtils.debug("size=" + size + " value=" + value);
Expand Down
25 changes: 15 additions & 10 deletions rxcache/src/main/java/com/zchu/rxcache/RxCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import com.zchu.rxcache.data.CacheResult;
import com.zchu.rxcache.diskconverter.IDiskConverter;
import com.zchu.rxcache.diskconverter.SerializableDiskConverter;
import com.zchu.rxcache.stategy.IStrategy;
import com.zchu.rxcache.stategy.IFlowableStrategy;
import com.zchu.rxcache.stategy.IObservableStrategy;
import com.zchu.rxcache.utils.LogUtils;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -49,12 +50,16 @@ private RxCache(Builder builder) {

}

/**
*
* notice: Deprecated! Use {@link #transformObservable(String, Type, IObservableStrategy)} ()} replace.
*/
@Deprecated
public <T> ObservableTransformer<T, CacheResult<T>> transformer(String key, Type type, IStrategy strategy) {
public <T> ObservableTransformer<T, CacheResult<T>> transformer(String key, Type type, IObservableStrategy strategy) {
return transformObservable(key, type, strategy);
}

public <T> ObservableTransformer<T, CacheResult<T>> transformObservable(final String key, final Type type, final IStrategy strategy) {
public <T> ObservableTransformer<T, CacheResult<T>> transformObservable(final String key, final Type type, final IObservableStrategy strategy) {
return new ObservableTransformer<T, CacheResult<T>>() {
@Override
public ObservableSource<CacheResult<T>> apply(Observable<T> tObservable) {
Expand All @@ -63,7 +68,7 @@ public ObservableSource<CacheResult<T>> apply(Observable<T> tObservable) {
};
}

public <T> FlowableTransformer<T, CacheResult<T>> transformFlowable(final String key, final Type type, final IStrategy strategy) {
public <T> FlowableTransformer<T, CacheResult<T>> transformFlowable(final String key, final Type type, final IFlowableStrategy strategy) {
return new FlowableTransformer<T, CacheResult<T>>() {
@Override
public Publisher<CacheResult<T>> apply(Flowable<T> flowable) {
Expand Down Expand Up @@ -92,11 +97,11 @@ public void subscribe(ObservableEmitter<T> observableEmitter) throws Exception {
/**
* 读取
*/
public <T> Flowable<T> load2(String key, Type type) {
return load2(key, type, BackpressureStrategy.BUFFER);
public <T> Flowable<T> load2Flowable(String key, Type type) {
return load2Flowable(key, type, BackpressureStrategy.LATEST);
}

public <T> Flowable<T> load2(final String key, final Type type, BackpressureStrategy backpressureStrategy) {
public <T> Flowable<T> load2Flowable(final String key, final Type type, BackpressureStrategy backpressureStrategy) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> flowableEmitter) throws Exception {
Expand Down Expand Up @@ -128,14 +133,14 @@ public void subscribe(ObservableEmitter<Boolean> observableEmitter) throws Excep
/**
* 保存
*/
public <T> Flowable<Boolean> save2(final String key, final T value, final CacheTarget target) {
return save2(key, value, target, BackpressureStrategy.BUFFER);
public <T> Flowable<Boolean> save2Flowable(final String key, final T value, final CacheTarget target) {
return save2Flowable(key, value, target, BackpressureStrategy.LATEST);
}

/**
* 保存
*/
public <T> Flowable<Boolean> save2(final String key, final T value, final CacheTarget target, BackpressureStrategy strategy) {
public <T> Flowable<Boolean> save2Flowable(final String key, final T value, final CacheTarget target, BackpressureStrategy strategy) {
return Flowable.create(new FlowableOnSubscribe<Boolean>() {
@Override
public void subscribe(FlowableEmitter<Boolean> flowableEmitter) throws Exception {
Expand Down
4 changes: 2 additions & 2 deletions rxcache/src/main/java/com/zchu/rxcache/RxCacheHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public CacheResult<T> apply(@NonNull Throwable throwable) throws Exception {

public static <T> Flowable<CacheResult<T>> loadCacheFlowable(final RxCache rxCache, final String key, Type type, final boolean needEmpty) {
Flowable<CacheResult<T>> flowable = rxCache
.<T>load2(key, type)
.<T>load2Flowable(key, type)
.flatMap(new Function<T, Publisher<CacheResult<T>>>() {
@Override
public Publisher<CacheResult<T>> apply(@NonNull T t) throws Exception {
Expand Down Expand Up @@ -212,7 +212,7 @@ public Publisher<? extends CacheResult<T>> apply(@NonNull Throwable throwable) t

public static <T> Flowable<CacheResult<T>> saveCacheSyncFlowable(RxCache rxCache, final String key, final T t, CacheTarget target) {
return rxCache
.save2(key, t, target)
.save2Flowable(key, t, target)
.map(new Function<Boolean, CacheResult<T>>() {
@Override
public CacheResult<T> apply(@NonNull Boolean aBoolean) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

/**
* 作者: 赵成柱 on 2016/9/12 0012.
* notice: Deprecated!
*/
@Deprecated
public abstract class BaseStrategy implements IStrategy {

@Deprecated
protected <T> Observable<CacheResult<T>> loadCache(final RxCache rxCache, final String key, Type type, final boolean needEmpty) {
Observable<CacheResult<T>> observable = rxCache
.<T>load(key, type)
Expand All @@ -45,6 +48,7 @@ public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable throw
return observable;
}

@Deprecated
protected <T> Observable<CacheResult<T>> loadRemote(final RxCache rxCache, final String key, Observable<T> source, final CacheTarget target, final boolean needEmpty) {
Observable<CacheResult<T>> observable = source
.map(new Function<T, CacheResult<T>>() {
Expand Down Expand Up @@ -85,7 +89,7 @@ public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable throw
return observable;
}


@Deprecated
protected <T> Observable<CacheResult<T>> loadRemoteSync(final RxCache rxCache, final String key, Observable<T> source, final CacheTarget target, final boolean needEmpty) {
Observable<CacheResult<T>> observable = source
.flatMap(new Function<T, ObservableSource<CacheResult<T>>>() {
Expand All @@ -106,6 +110,7 @@ public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable throw

}

@Deprecated
protected <T> Observable<CacheResult<T>> saveCacheSync(RxCache rxCache, final String key, final T t, CacheTarget target) {
return rxCache.save(key, t, target)
.map(new Function<Boolean, CacheResult<T>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* 先缓存,后网络
* 作者: 赵成柱 on 2016/9/12 0012.
*/
public final class CacheAndRemoteStrategy extends BaseStrategy {
public final class CacheAndRemoteStrategy implements IStrategy {
private boolean isSync;

public CacheAndRemoteStrategy() {
Expand All @@ -31,12 +31,12 @@ public CacheAndRemoteStrategy(boolean isSync) {

@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
Observable<CacheResult<T>> cache = loadCache(rxCache, key, type,true);
Observable<CacheResult<T>> cache = RxCacheHelper.loadCache(rxCache, key, type, true);
Observable<CacheResult<T>> remote;
if (isSync) {
remote = loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
remote = RxCacheHelper.loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
} else {
remote = loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
remote = RxCacheHelper.loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
}
return Observable.concat(cache, remote)
.filter(new Predicate<CacheResult<T>>() {
Expand All @@ -49,12 +49,12 @@ public boolean test(@NonNull CacheResult<T> result) throws Exception {

@Override
public <T> Publisher<CacheResult<T>> flow(RxCache rxCache, String key, Flowable<T> source, Type type) {
Flowable<CacheResult<T>> cache = RxCacheHelper.loadCacheFlowable(rxCache, key, type,true);
Flowable<CacheResult<T>> cache = RxCacheHelper.loadCacheFlowable(rxCache, key, type, true);
Flowable<CacheResult<T>> remote;
if (isSync) {
remote = RxCacheHelper.loadRemoteSyncFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
remote = RxCacheHelper.loadRemoteSyncFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
} else {
remote =RxCacheHelper.loadRemoteFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
remote = RxCacheHelper.loadRemoteFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
}
return Flowable.concat(cache, remote)
.filter(new Predicate<CacheResult<T>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* 优先缓存
* 作者: 赵成柱 on 2016/9/12 0012.
*/
public final class FirstCacheStrategy extends BaseStrategy {
public final class FirstCacheStrategy implements IStrategy {
private boolean isSync;

public FirstCacheStrategy() {
Expand All @@ -29,12 +29,12 @@ public FirstCacheStrategy(boolean isSync) {

@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
Observable<CacheResult<T>> cache = loadCache(rxCache, key, type,true);
Observable<CacheResult<T>> cache = RxCacheHelper.loadCache(rxCache, key, type,true);
Observable<CacheResult<T>> remote;
if (isSync) {
remote = loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
remote = RxCacheHelper.loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
} else {
remote = loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
remote = RxCacheHelper.loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk,false);
}
return cache.switchIfEmpty(remote);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* 优先网络
* 作者: 赵成柱 on 2016/9/12 0012.
*/
public final class FirstRemoteStrategy extends BaseStrategy {
public final class FirstRemoteStrategy implements IStrategy {
private boolean isSync;

public FirstRemoteStrategy() {
Expand All @@ -30,15 +30,15 @@ public FirstRemoteStrategy(boolean isSync) {

@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
Observable<CacheResult<T>> cache = loadCache(rxCache, key, type, true);
Observable<CacheResult<T>> cache = RxCacheHelper.loadCache(rxCache, key, type, true);
Observable<CacheResult<T>> remote;
if (isSync) {
remote = loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
remote = RxCacheHelper.loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
} else {
remote = loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
remote = RxCacheHelper.loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
}
return Observable
.concatDelayError(Arrays.asList(cache,remote))
.concatDelayError(Arrays.asList(remote,cache))
.take(1);
}

Expand All @@ -49,11 +49,10 @@ public <T> Publisher<CacheResult<T>> flow(RxCache rxCache, String key, Flowable<
if (isSync) {
remote = RxCacheHelper.loadRemoteSyncFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
} else {

remote =RxCacheHelper.loadRemoteFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
}
return Flowable
.concatDelayError(Arrays.asList(cache,remote))
.concatDelayError(Arrays.asList(remote,cache))
.take(1);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.zchu.rxcache.stategy;


import com.zchu.rxcache.RxCache;
import com.zchu.rxcache.data.CacheResult;

import org.reactivestreams.Publisher;

import java.lang.reflect.Type;

import io.reactivex.Flowable;


/**
* author : zchu
* date : 2017/10/11
* desc :
*/
public interface IFlowableStrategy {

<T> Publisher<CacheResult<T>> flow(RxCache rxCache, String key, Flowable<T> source, Type type);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.zchu.rxcache.stategy;


import com.zchu.rxcache.RxCache;
import com.zchu.rxcache.data.CacheResult;

import java.lang.reflect.Type;

import io.reactivex.Observable;


/**
* author : zchu
* date : 2017/10/11
* desc :
*/
public interface IObservableStrategy {

<T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type);
}
16 changes: 1 addition & 15 deletions rxcache/src/main/java/com/zchu/rxcache/stategy/IStrategy.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
package com.zchu.rxcache.stategy;


import com.zchu.rxcache.RxCache;
import com.zchu.rxcache.data.CacheResult;

import org.reactivestreams.Publisher;

import java.lang.reflect.Type;

import io.reactivex.Flowable;
import io.reactivex.Observable;


/**
* 作者: 赵成柱 on 2016/9/12 0012.
*/
public interface IStrategy {

<T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type);
public interface IStrategy extends IObservableStrategy,IFlowableStrategy {

<T> Publisher<CacheResult<T>> flow(RxCache rxCache, String key, Flowable<T> source, Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* 仅加载网络,不缓存
*/

public final class NoneStrategy implements IStrategy {
public final class NoneStrategy implements IStrategy {

private NoneStrategy() {
}
Expand All @@ -28,7 +28,6 @@ private NoneStrategy() {

@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, final String key, Observable<T> source, Type type) {

return source.map(new Function<T, CacheResult<T>>() {
@Override
public CacheResult<T> apply(@NonNull T t) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* 仅加载缓存
* 作者: 赵成柱 on 2016/9/12 0012.
*/
public final class OnlyCacheStrategy extends BaseStrategy {
public final class OnlyCacheStrategy implements IStrategy {

@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* 仅加载网络,但数据依然会被缓存
* 作者: 赵成柱 on 2016/9/12 0012.
*/
class OnlyRemoteStrategy extends BaseStrategy {
class OnlyRemoteStrategy implements IStrategy {
private boolean isSync;

public OnlyRemoteStrategy() {
Expand All @@ -31,9 +31,9 @@ public OnlyRemoteStrategy(boolean isSync) {
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
if (isSync) {
return loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
return RxCacheHelper.loadRemoteSync(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
} else {
return loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
return RxCacheHelper.loadRemote(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
}
}

Expand Down
2 changes: 1 addition & 1 deletion sample/src/main/java/com/zchu/sample/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public List<GankBean.ResultsBean> apply(@NonNull GankBean gankBean) throws Excep
return gankBean.getResults();
}
})
.compose(rxCache.<List<GankBean.ResultsBean>>transformer("custom_key", new TypeToken<List<GankBean.ResultsBean>>() {
.compose(rxCache.<List<GankBean.ResultsBean>>transformObservable("custom_key", new TypeToken<List<GankBean.ResultsBean>>() {
}.getType(), strategy))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
Expand Down

0 comments on commit 5d64047

Please sign in to comment.