From 7e4588450c22d1634a6328abc068bc6308ac6c04 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Sat, 4 Jun 2016 13:37:35 +0200 Subject: [PATCH 1/8] * rx_add and rx_delete --- Example/RxRealm.xcodeproj/project.pbxproj | 40 ++- Example/RxRealm/ViewController.swift | 50 +-- Example/RxRealm_Tests/RxRealmTests.swift | 167 ---------- Example/RxRealm_Tests/RxRealmWriteSinks.swift | 310 ++++++++++++++++++ Example/RxRealm_Tests/TestModels.swift | 20 +- Pod/Classes/RealmObserver.swift | 33 +- Pod/Classes/RxRealm.swift | 109 +++++- RxRealm.podspec | 2 +- 8 files changed, 498 insertions(+), 233 deletions(-) create mode 100644 Example/RxRealm_Tests/RxRealmWriteSinks.swift diff --git a/Example/RxRealm.xcodeproj/project.pbxproj b/Example/RxRealm.xcodeproj/project.pbxproj index 7a1d5e1..e770f5c 100644 --- a/Example/RxRealm.xcodeproj/project.pbxproj +++ b/Example/RxRealm.xcodeproj/project.pbxproj @@ -14,6 +14,7 @@ 607FACDB1AFB9204008FA782 /* Main.storyboard in Resources */ = {isa = PBXBuildFile; fileRef = 607FACD91AFB9204008FA782 /* Main.storyboard */; }; 607FACDD1AFB9204008FA782 /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 607FACDC1AFB9204008FA782 /* Images.xcassets */; }; 607FACE01AFB9204008FA782 /* LaunchScreen.xib in Resources */ = {isa = PBXBuildFile; fileRef = 607FACDE1AFB9204008FA782 /* LaunchScreen.xib */; }; + 9C3182371D02BAA90003F1EB /* RxRealmWriteSinks.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9C3182361D02BAA90003F1EB /* RxRealmWriteSinks.swift */; }; 9CBA07791CD4A5FA00ABF96E /* RxRealmResultsTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9CBA07781CD4A5FA00ABF96E /* RxRealmResultsTests.swift */; }; 9CBA077C1CD4A68000ABF96E /* TestModels.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9CBA077A1CD4A62300ABF96E /* TestModels.swift */; }; 9CEB7A4A1CC834340077C44D /* RxRealmTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9CEB7A491CC834340077C44D /* RxRealmTests.swift */; }; @@ -44,6 +45,7 @@ 607FACDF1AFB9204008FA782 /* Base */ = {isa = PBXFileReference; lastKnownFileType = file.xib; name = Base; path = Base.lproj/LaunchScreen.xib; sourceTree = ""; }; 69375A1E91DF8BF5A1D8E7AE /* Pods_RxRealm_Tests.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_RxRealm_Tests.framework; sourceTree = BUILT_PRODUCTS_DIR; }; 74D9A8127363946A03ABDCFB /* RxRealm.podspec */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text; name = RxRealm.podspec; path = ../RxRealm.podspec; sourceTree = ""; }; + 9C3182361D02BAA90003F1EB /* RxRealmWriteSinks.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxRealmWriteSinks.swift; sourceTree = ""; }; 9CBA07781CD4A5FA00ABF96E /* RxRealmResultsTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxRealmResultsTests.swift; sourceTree = ""; }; 9CBA077A1CD4A62300ABF96E /* TestModels.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TestModels.swift; sourceTree = ""; }; 9CEB7A471CC834340077C44D /* RxRealm_Tests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = RxRealm_Tests.xctest; sourceTree = BUILT_PRODUCTS_DIR; }; @@ -156,6 +158,7 @@ isa = PBXGroup; children = ( 9CBA077A1CD4A62300ABF96E /* TestModels.swift */, + 9C3182361D02BAA90003F1EB /* RxRealmWriteSinks.swift */, 9CEB7A491CC834340077C44D /* RxRealmTests.swift */, 9CBA07781CD4A5FA00ABF96E /* RxRealmResultsTests.swift */, 9CEC5B351CD692A600B43868 /* RxRealmListTests.swift */, @@ -174,12 +177,12 @@ isa = PBXNativeTarget; buildConfigurationList = 607FACEF1AFB9204008FA782 /* Build configuration list for PBXNativeTarget "RxRealm_Example" */; buildPhases = ( - 327D603D4690007E8B638B0E /* 📦 Check Pods Manifest.lock */, + 327D603D4690007E8B638B0E /* [CP] Check Pods Manifest.lock */, 607FACCC1AFB9204008FA782 /* Sources */, 607FACCD1AFB9204008FA782 /* Frameworks */, 607FACCE1AFB9204008FA782 /* Resources */, - 7986DE64790C01CA0573C129 /* 📦 Embed Pods Frameworks */, - 70C190BAB54D5CB1641CCE04 /* 📦 Copy Pods Resources */, + 7986DE64790C01CA0573C129 /* [CP] Embed Pods Frameworks */, + 70C190BAB54D5CB1641CCE04 /* [CP] Copy Pods Resources */, ); buildRules = ( ); @@ -194,12 +197,12 @@ isa = PBXNativeTarget; buildConfigurationList = 9CEB7A501CC834340077C44D /* Build configuration list for PBXNativeTarget "RxRealm_Tests" */; buildPhases = ( - 47EF88764B0DDD36B74C66E8 /* 📦 Check Pods Manifest.lock */, + 47EF88764B0DDD36B74C66E8 /* [CP] Check Pods Manifest.lock */, 9CEB7A431CC834340077C44D /* Sources */, 9CEB7A441CC834340077C44D /* Frameworks */, 9CEB7A451CC834340077C44D /* Resources */, - 051741DAB4D8631AD7E12258 /* 📦 Embed Pods Frameworks */, - 87796072080EEF71421C3ED1 /* 📦 Copy Pods Resources */, + 051741DAB4D8631AD7E12258 /* [CP] Embed Pods Frameworks */, + 87796072080EEF71421C3ED1 /* [CP] Copy Pods Resources */, ); buildRules = ( ); @@ -269,14 +272,14 @@ /* End PBXResourcesBuildPhase section */ /* Begin PBXShellScriptBuildPhase section */ - 051741DAB4D8631AD7E12258 /* 📦 Embed Pods Frameworks */ = { + 051741DAB4D8631AD7E12258 /* [CP] Embed Pods Frameworks */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( ); inputPaths = ( ); - name = "📦 Embed Pods Frameworks"; + name = "[CP] Embed Pods Frameworks"; outputPaths = ( ); runOnlyForDeploymentPostprocessing = 0; @@ -284,14 +287,14 @@ shellScript = "\"${SRCROOT}/Pods/Target Support Files/Pods-RxRealm_Tests/Pods-RxRealm_Tests-frameworks.sh\"\n"; showEnvVarsInLog = 0; }; - 327D603D4690007E8B638B0E /* 📦 Check Pods Manifest.lock */ = { + 327D603D4690007E8B638B0E /* [CP] Check Pods Manifest.lock */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( ); inputPaths = ( ); - name = "📦 Check Pods Manifest.lock"; + name = "[CP] Check Pods Manifest.lock"; outputPaths = ( ); runOnlyForDeploymentPostprocessing = 0; @@ -299,14 +302,14 @@ shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [[ $? != 0 ]] ; then\n cat << EOM\nerror: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\nEOM\n exit 1\nfi\n"; showEnvVarsInLog = 0; }; - 47EF88764B0DDD36B74C66E8 /* 📦 Check Pods Manifest.lock */ = { + 47EF88764B0DDD36B74C66E8 /* [CP] Check Pods Manifest.lock */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( ); inputPaths = ( ); - name = "📦 Check Pods Manifest.lock"; + name = "[CP] Check Pods Manifest.lock"; outputPaths = ( ); runOnlyForDeploymentPostprocessing = 0; @@ -314,14 +317,14 @@ shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [[ $? != 0 ]] ; then\n cat << EOM\nerror: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\nEOM\n exit 1\nfi\n"; showEnvVarsInLog = 0; }; - 70C190BAB54D5CB1641CCE04 /* 📦 Copy Pods Resources */ = { + 70C190BAB54D5CB1641CCE04 /* [CP] Copy Pods Resources */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( ); inputPaths = ( ); - name = "📦 Copy Pods Resources"; + name = "[CP] Copy Pods Resources"; outputPaths = ( ); runOnlyForDeploymentPostprocessing = 0; @@ -329,14 +332,14 @@ shellScript = "\"${SRCROOT}/Pods/Target Support Files/Pods-RxRealm_Example/Pods-RxRealm_Example-resources.sh\"\n"; showEnvVarsInLog = 0; }; - 7986DE64790C01CA0573C129 /* 📦 Embed Pods Frameworks */ = { + 7986DE64790C01CA0573C129 /* [CP] Embed Pods Frameworks */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( ); inputPaths = ( ); - name = "📦 Embed Pods Frameworks"; + name = "[CP] Embed Pods Frameworks"; outputPaths = ( ); runOnlyForDeploymentPostprocessing = 0; @@ -344,14 +347,14 @@ shellScript = "\"${SRCROOT}/Pods/Target Support Files/Pods-RxRealm_Example/Pods-RxRealm_Example-frameworks.sh\"\n"; showEnvVarsInLog = 0; }; - 87796072080EEF71421C3ED1 /* 📦 Copy Pods Resources */ = { + 87796072080EEF71421C3ED1 /* [CP] Copy Pods Resources */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( ); inputPaths = ( ); - name = "📦 Copy Pods Resources"; + name = "[CP] Copy Pods Resources"; outputPaths = ( ); runOnlyForDeploymentPostprocessing = 0; @@ -376,6 +379,7 @@ buildActionMask = 2147483647; files = ( 9CEB7A4A1CC834340077C44D /* RxRealmTests.swift in Sources */, + 9C3182371D02BAA90003F1EB /* RxRealmWriteSinks.swift in Sources */, 9CFB8DC21CF22BA4004BC090 /* RxRealmRealmTests.swift in Sources */, 9CBA07791CD4A5FA00ABF96E /* RxRealmResultsTests.swift in Sources */, 9CEC5B381CD6942E00B43868 /* RxRealmLinkingObjectsTests.swift in Sources */, diff --git a/Example/RxRealm/ViewController.swift b/Example/RxRealm/ViewController.swift index 69b5862..e5797ca 100644 --- a/Example/RxRealm/ViewController.swift +++ b/Example/RxRealm/ViewController.swift @@ -24,36 +24,36 @@ class ViewController: UIViewController { override func viewDidLoad() { super.viewDidLoad() + /* + Observable> - wrap Results as observable + */ + realm.objects(Lap).asObservable() + .map {laps in "\(laps.count) laps"} + .subscribeNext {[unowned self]text in + self.title = text + } + .addDisposableTo(bag) - - //Observable> - let lapCount = realm.objects(Lap).asObservable().map {laps in "\(laps.count) laps"} - lapCount.subscribeNext {[unowned self]text in - self.title = text - }.addDisposableTo(bag) - - //Observable> - let laps = realm.objects(Lap).sorted("time", ascending: false).asObservableArray() - - laps - .bindTo(tableView.rx_itemsWithCellIdentifier("Cell", cellType: UITableViewCell.self)) {row, element, cell in - cell.textLabel!.text = formatter.stringFromDate(NSDate(timeIntervalSinceReferenceDate: element.time)) - }.addDisposableTo(bag) - - + /* + Observable> - convert Results to Array and wrap as observable + */ + realm.objects(Lap).sorted("time", ascending: false).asObservableArray() + .map {array in array.prefix(5) } + .bindTo(tableView.rx_itemsWithCellIdentifier("Cell", cellType: UITableViewCell.self)) {row, element, cell in + cell.textLabel!.text = formatter.stringFromDate(NSDate(timeIntervalSinceReferenceDate: element.time)) + }.addDisposableTo(bag) + + /* + Use bindable sinks to add objects + */ addOneItemButton.rx_tap - .map { - return Lap() - } - .bindTo(realm.rx_add()) + .map { Lap() } + .bindTo(Realm.rx_add()) .addDisposableTo(bag) addTwoItemsButton.rx_tap - .map { - return [Lap(), Lap()] - } - .bindTo(realm.rx_add()) + .map { [Lap(), Lap()] } + .bindTo(Realm.rx_add()) .addDisposableTo(bag) - } } \ No newline at end of file diff --git a/Example/RxRealm_Tests/RxRealmTests.swift b/Example/RxRealm_Tests/RxRealmTests.swift index 7aef5ef..9810087 100644 --- a/Example/RxRealm_Tests/RxRealmTests.swift +++ b/Example/RxRealm_Tests/RxRealmTests.swift @@ -6,7 +6,6 @@ import XCTest -import Pods_RxRealm_Tests import RxSwift import RealmSwift import RxRealm @@ -243,170 +242,4 @@ class RxRealm_Tests: XCTestCase { } } - func testRxAddObject() { - let expectation = expectationWithDescription("Message1") - let realm = realmInMemory(#function) - let bag = DisposeBag() - let events = [ - next(0, Message("1")), - completed(0) - ] - - let rx_add: AnyObserver = realm.rx_add() - let scheduler = TestScheduler(initialClock: 0) - let observer = scheduler.createObserver(Array.self) - let observable = scheduler.createHotObservable(events).asObservable() - let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) - - - messages$.subscribe(observer) - .addDisposableTo(bag) - - messages$.subscribeNext { - switch $0.count { - case 1: - expectation.fulfill() - default: - break - } - }.addDisposableTo(bag) - - observable - .subscribe(rx_add) - .addDisposableTo(bag) - - scheduler.start() - - waitForExpectationsWithTimeout(0.1, handler: nil) - - - XCTAssertEqual(observer.events.count, 1) - XCTAssertEqual(observer.events[0].time, 0) - XCTAssertTrue(observer.events[0].value.element!.equalTo([Message("1")])) - } - - func testRxAddObjects() { - let expectation = expectationWithDescription("Message1") - let realm = realmInMemory(#function) - let bag = DisposeBag() - let events = [ - next(0, [Message("1"), Message("2")]), - completed(0) - ] - - let rx_add: AnyObserver<[Message]> = realm.rx_add() - let scheduler = TestScheduler(initialClock: 0) - let observer = scheduler.createObserver(Array.self) - let observable = scheduler.createHotObservable(events).asObservable() - let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) - - observable.subscribe(rx_add) - .addDisposableTo(bag) - - messages$.subscribe(observer) - .addDisposableTo(bag) - - messages$.subscribeNext { - switch $0.count { - case 2: - expectation.fulfill() - default: - break - } - }.addDisposableTo(bag) - - scheduler.start() - - waitForExpectationsWithTimeout(0.1, handler: nil) - - XCTAssertEqual(observer.events.count, 1) - XCTAssertEqual(observer.events[0].time, 0) - XCTAssertTrue(observer.events[0].value.element!.equalTo([Message("1"), Message("2")])) - } - - func testRxDeleteItem() { - let expectation = expectationWithDescription("Message1") - let realm = realmInMemory(#function) - let element = Message("1") - let scheduler = TestScheduler(initialClock: 0) - let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) - let rx_delete: AnyObserver = realm.rx_delete() - - try! realm.write { - realm.add(element) - } - let bag = DisposeBag() - let events = [ - next(0, element), - completed(0) - ] - let observer = scheduler.createObserver(Array.self) - let observable = scheduler.createHotObservable(events).asObservable() - - observable.subscribe(rx_delete) - .addDisposableTo(bag) - - messages$.subscribe(observer) - .addDisposableTo(bag) - - messages$.subscribeNext { - switch $0.count { - case 0: - expectation.fulfill() - default: - break - } - }.addDisposableTo(bag) - - scheduler.start() - - waitForExpectationsWithTimeout(0.1, handler: nil) - - XCTAssertEqual(observer.events.count, 1) - XCTAssertEqual(observer.events[0].time, 0) - XCTAssertEqual(observer.events[0].value.element!, [Message]()) - } - - func testRxDeleteItems() { - let expectation = expectationWithDescription("Message1") - let realm = realmInMemory(#function) - let elements = [Message("1"), Message("1")] - let scheduler = TestScheduler(initialClock: 0) - let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) - let rx_delete: AnyObserver<[Message]> = realm.rx_delete() - - try! realm.write { - realm.add(elements) - } - let bag = DisposeBag() - let events = [ - next(0, elements), - completed(0) - ] - let observer = scheduler.createObserver(Array.self) - let observable = scheduler.createHotObservable(events).asObservable() - - observable.subscribe(rx_delete) - .addDisposableTo(bag) - - messages$.subscribe(observer) - .addDisposableTo(bag) - - messages$.subscribeNext { - switch $0.count { - case 0: - expectation.fulfill() - default: - break - } - }.addDisposableTo(bag) - - scheduler.start() - - waitForExpectationsWithTimeout(0.1, handler: nil) - - XCTAssertEqual(observer.events.count, 1) - XCTAssertEqual(observer.events[0].time, 0) - XCTAssertTrue(observer.events[0].value.element!.isEmpty) - } } diff --git a/Example/RxRealm_Tests/RxRealmWriteSinks.swift b/Example/RxRealm_Tests/RxRealmWriteSinks.swift new file mode 100644 index 0000000..88e47e5 --- /dev/null +++ b/Example/RxRealm_Tests/RxRealmWriteSinks.swift @@ -0,0 +1,310 @@ +// +// RxRealmWriteSinks.swift +// RxRealm +// +// Created by Marin Todorov on 6/4/16. +// Copyright © 2016 CocoaPods. All rights reserved. +// + +import XCTest + +import RxSwift +import RxCocoa +import RealmSwift +import RxRealm +import RxTests + +class RxRealmWriteSinks: XCTestCase { + private func realmInMemoryConfiguration(name: String) -> Realm.Configuration { + var conf = Realm.Configuration() + conf.inMemoryIdentifier = name + return conf + } + + private func realmInMemory(name: String) -> Realm { + var conf = Realm.Configuration() + conf.inMemoryIdentifier = name + return try! Realm(configuration: conf) + } + + func testRxAddObject() { + let expectation = expectationWithDescription("Message1") + let realm = realmInMemory(#function) + let bag = DisposeBag() + let events = [ + next(0, Message("1")), + completed(0) + ] + + let rx_add: AnyObserver = realm.rx_add() + let scheduler = TestScheduler(initialClock: 0) + let observer = scheduler.createObserver(Array.self) + let observable = scheduler.createHotObservable(events).asObservable() + let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) + + + messages$.subscribe(observer) + .addDisposableTo(bag) + + messages$.subscribeNext { + switch $0.count { + case 1: + expectation.fulfill() + default: + break + } + }.addDisposableTo(bag) + + observable + .subscribe(rx_add) + .addDisposableTo(bag) + + scheduler.start() + + waitForExpectationsWithTimeout(0.1, handler: nil) + + + XCTAssertEqual(observer.events.count, 1) + XCTAssertEqual(observer.events[0].time, 0) + XCTAssertTrue(observer.events[0].value.element!.equalTo([Message("1")])) + } + + func testRxAddObjects() { + let expectation = expectationWithDescription("Message1") + let realm = realmInMemory(#function) + let bag = DisposeBag() + let events = [ + next(0, [Message("1"), Message("2")]), + completed(0) + ] + + let rx_add: AnyObserver<[Message]> = realm.rx_add() + let scheduler = TestScheduler(initialClock: 0) + let observer = scheduler.createObserver(Array.self) + let observable = scheduler.createHotObservable(events).asObservable() + let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) + + observable.subscribe(rx_add) + .addDisposableTo(bag) + + messages$.subscribe(observer) + .addDisposableTo(bag) + + messages$.subscribeNext { + switch $0.count { + case 2: + expectation.fulfill() + default: + break + } + }.addDisposableTo(bag) + + scheduler.start() + + waitForExpectationsWithTimeout(0.1, handler: nil) + + XCTAssertEqual(observer.events.count, 1) + XCTAssertEqual(observer.events[0].time, 0) + XCTAssertTrue(observer.events[0].value.element!.equalTo([Message("1"), Message("2")])) + } + + func testRxAddUpdateObjects() { + let expectation = expectationWithDescription("Message1") + let realm = realmInMemory(#function) + let bag = DisposeBag() + let events = [ + next(0, [UniqueObject(1), UniqueObject(2)]), + next(1, [UniqueObject(1), UniqueObject(3)]), + completed(2) + ] + + let rx_add: AnyObserver<[UniqueObject]> = realm.rx_add(update: true) + let scheduler = TestScheduler(initialClock: 0) + let observer = scheduler.createObserver(Array.self) + let observable = scheduler.createHotObservable(events).asObservable() + let messages$ = realm.objects(UniqueObject).asObservableArray().shareReplay(1) + + observable.subscribe(rx_add) + .addDisposableTo(bag) + + messages$.subscribe(observer) + .addDisposableTo(bag) + + messages$.subscribeNext { + switch $0.count { + case 3: + expectation.fulfill() + default: + break + } + }.addDisposableTo(bag) + + scheduler.start() + + waitForExpectationsWithTimeout(5, handler: {error in + //check that UniqueObject with id == 1 was overwritten + XCTAssertTrue(observer.events.last!.value.element!.count == 3) + XCTAssertTrue(observer.events.last!.value.element![0] == UniqueObject(1)) + XCTAssertTrue(observer.events.last!.value.element![1] == UniqueObject(2)) + XCTAssertTrue(observer.events.last!.value.element![2] == UniqueObject(3)) + }) + + } + + + func testRxDeleteItem() { + let expectation = expectationWithDescription("Message1") + let realm = realmInMemory(#function) + let element = Message("1") + let scheduler = TestScheduler(initialClock: 0) + let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) + let rx_delete: AnyObserver = Realm.rx_delete() + + try! realm.write { + realm.add(element) + } + let bag = DisposeBag() + let events = [ + next(0, element), + completed(0) + ] + let observer = scheduler.createObserver(Array.self) + let observable = scheduler.createHotObservable(events).asObservable() + + observable.subscribe(rx_delete) + .addDisposableTo(bag) + + messages$.subscribe(observer) + .addDisposableTo(bag) + + messages$.subscribeNext { + switch $0.count { + case 0: + expectation.fulfill() + default: + break + } + }.addDisposableTo(bag) + + scheduler.start() + + waitForExpectationsWithTimeout(0.1, handler: nil) + + XCTAssertEqual(observer.events.count, 1) + XCTAssertEqual(observer.events[0].time, 0) + XCTAssertEqual(observer.events[0].value.element!, [Message]()) + } + + func testRxDeleteItems() { + let expectation = expectationWithDescription("Message1") + let realm = realmInMemory(#function) + let elements = [Message("1"), Message("1")] + let scheduler = TestScheduler(initialClock: 0) + let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) + let rx_delete: AnyObserver<[Message]> = Realm.rx_delete() + + try! realm.write { + realm.add(elements) + } + let bag = DisposeBag() + let events = [ + next(0, elements), + completed(0) + ] + let observer = scheduler.createObserver(Array.self) + let observable = scheduler.createHotObservable(events).asObservable() + + observable.subscribe(rx_delete) + .addDisposableTo(bag) + + messages$.subscribe(observer) + .addDisposableTo(bag) + + messages$.subscribeNext { + switch $0.count { + case 0: + expectation.fulfill() + default: + break + } + }.addDisposableTo(bag) + + scheduler.start() + + waitForExpectationsWithTimeout(0.1, handler: nil) + + XCTAssertEqual(observer.events.count, 1) + XCTAssertEqual(observer.events[0].time, 0) + XCTAssertTrue(observer.events[0].value.element!.isEmpty) + } + + func testRxAddObjectsInBackground() { + let expectation = expectationWithDescription("Message1") + var conf = Realm.Configuration() + conf.deleteRealmIfMigrationNeeded = true + + let realm = try! Realm(configuration: conf) + try! realm.write { + realm.deleteAll() + } + + let bag = DisposeBag() + + let scheduler = TestScheduler(initialClock: 0) + let observer = scheduler.createObserver(Array) + + let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) + + messages$ + .filter {$0.count == 6} + .subscribeNext {_ in expectation.fulfill() } + .addDisposableTo(bag) + + messages$ + .subscribe(observer) + .addDisposableTo(bag) + + // subscribe/write on current thread + [Message("1")].toObservable() + .subscribe( realm.rx_add() ) + .addDisposableTo(bag) + + delayInBackground(0.1, closure: { + // subscribe/write on background thread + let realm = try! Realm(configuration: conf) + [Message("2")].toObservable() + .subscribe(realm.rx_add() ) + .addDisposableTo(bag) + }) + + // subscribe on current/write on main + [Message("3")].toObservable() + .observeOn(MainScheduler.instance) + .subscribe( Realm.rx_add(conf) ) + .addDisposableTo(bag) + + // subscribe on current/write on background + [Message("4")].toObservable() + .observeOn( ConcurrentDispatchQueueScheduler( + queue: dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0))) + .subscribe( Realm.rx_add(conf) ) + .addDisposableTo(bag) + + // subscribe on current/write on background + [[Message("5"), Message("6")]].toObservable() + .observeOn( ConcurrentDispatchQueueScheduler( + queue: dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0))) + .subscribe( Realm.rx_add(conf) ) + .addDisposableTo(bag) + + scheduler.start() + + waitForExpectationsWithTimeout(5.0, handler: {error in + let finalResult = observer.events.last!.value.element! + XCTAssertTrue(finalResult.count == 6) + XCTAssertTrue((try! Realm()).objects(Message).sorted("text") + .reduce("", combine: { acc, el in acc + el.text + }) == "123456" /*😈*/) + }) + } +} \ No newline at end of file diff --git a/Example/RxRealm_Tests/TestModels.swift b/Example/RxRealm_Tests/TestModels.swift index ff93cb4..94c4b38 100644 --- a/Example/RxRealm_Tests/TestModels.swift +++ b/Example/RxRealm_Tests/TestModels.swift @@ -52,4 +52,22 @@ class User: Object { func ==(lhs: User, rhs: User) -> Bool { return lhs.name == rhs.name -} \ No newline at end of file +} + +//MARK: UniqueObject +class UniqueObject: Object { + dynamic var id = 0 + + convenience init(_ id: Int) { + self.init() + self.id = id + } + + override class func primaryKey() -> String? { + return "id" + } +} + +func ==(lhs: UniqueObject, rhs: UniqueObject) -> Bool { + return lhs.id == rhs.id +} diff --git a/Pod/Classes/RealmObserver.swift b/Pod/Classes/RealmObserver.swift index 7f8a35c..c07857d 100644 --- a/Pod/Classes/RealmObserver.swift +++ b/Pod/Classes/RealmObserver.swift @@ -11,30 +11,45 @@ import RxSwift import RealmSwift /** - `RealmObserver` doesn't retain target realm object and in case owned realm object is released, element isn't bound. - + `RealmObserver` retains target realm object until it receives a .Completed or .Error event. */ class RealmObserver: ObserverType { - weak var realm: Realm? + var realm: Realm? + var configuration: Realm.Configuration? + let binding: (Realm, E) -> Void init(realm: Realm, binding: (Realm, E) -> Void) { self.realm = realm self.binding = binding } + + init(configuration: Realm.Configuration, binding: (Realm, E) -> Void) { + self.configuration = configuration + self.binding = binding + } + /** Binds next element realm. */ func on(event: Event) { switch event { case .Next(let element): - if let realm = realm { - binding(realm, element) + //this will "cache" the realm on this thread, until completed/errored + if let configuration = configuration where realm == nil { + realm = try! Realm() + } + + guard let realm = realm else { + fatalError("No realm in RealmObserver at time of a .Next event") } + + binding(realm, element) + case .Error(let error): - print("Binding error to Realm: \(error)") + realm = nil case .Completed: - break + realm = nil } } /** @@ -45,4 +60,8 @@ class RealmObserver: ObserverType { func asObserver() -> AnyObserver { return AnyObserver(eventHandler: on) } + + deinit { + realm = nil + } } diff --git a/Pod/Classes/RxRealm.swift b/Pod/Classes/RxRealm.swift index d2d9361..bef9ef8 100644 --- a/Pod/Classes/RxRealm.swift +++ b/Pod/Classes/RxRealm.swift @@ -172,41 +172,122 @@ public extension Realm { } } } +} + +public extension Realm { + /** + Returns bindable sink wich adds object sequence to a Realm + - param: configuration (by default uses `Realm.Configuration.defaultConfiguration`) + to use to get a Realm for the write operations + - param: update - if set to `true` it will override existing objects with matching primary key + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to + */ + public static func rx_add( + configuration: Realm.Configuration = Realm.Configuration.defaultConfiguration, + update: Bool = false) -> AnyObserver { + + return RealmObserver(configuration: configuration) {realm, elements in + try! realm.write { + realm.add(elements, update: update) + } + }.asObserver() + } /** - Returns bindable sink wich adds objects in sequence to Realm + Returns bindable sink wich adds an object to a Realm + - param: configuration (by default uses `Realm.Configuration.defaultConfiguration`) + to use to get a Realm for the write operations + - param: update - if set to `true` it will override existing objects with matching primary key + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to */ - public func rx_add() -> AnyObserver { - return RealmObserver(realm: self, binding: { (realm, element) in + public static func rx_add( + configuration: Realm.Configuration = Realm.Configuration.defaultConfiguration, + update: Bool = false) -> AnyObserver { + + return RealmObserver(configuration: configuration) {realm, element in try! realm.write { - realm.add(element) + realm.add(element, update: update) } - }).asObserver() + }.asObserver() + } + + /** + Returns bindable sink wich adds object sequence to the current Realm + - param: update - if set to `true` it will override existing objects with matching primary key + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to + */ + public func rx_add(update update: Bool = false) -> AnyObserver { + return RealmObserver(realm: self) {realm, element in + try! realm.write { + realm.add(element, update: update) + } + }.asObserver() } /** - Returns bindable sink wich adds object to Realm + Returns bindable sink wich adds an object to Realm + - param: update - if set to `true` it will override existing objects with matching primary key + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to */ - public func rx_add() -> AnyObserver { - return RealmObserver(realm: self, binding: { (realm, element) in + public func rx_add(update update: Bool = false) -> AnyObserver { + return RealmObserver(realm: self) {realm, element in try! realm.write { - realm.add(element) + realm.add(element, update: update) } - }).asObserver() + }.asObserver() } + /** - Returns bindable sink wich deletes objects in sequence from Realm + Returns bindable sink wich deletes objects in sequence from Realm. + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to */ - public func rx_delete() -> AnyObserver { - return RealmObserver(realm: self, binding: { (realm, elements) in + public static func rx_delete() -> AnyObserver { + return AnyObserver {event in + + guard let elements = event.element, + var generator = elements.generate() as S.Generator?, + let first = generator.next(), + let realm = first.realm else { + + return + } + try! realm.write { realm.delete(elements) } - }).asObserver() + } } + /** Returns bindable sink wich deletes object from Realm + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to + */ + public static func rx_delete() -> AnyObserver { + return AnyObserver {event in + + guard let element = event.element, + let realm = element.realm else { + return + } + + try! realm.write { + realm.delete(element) + } + } + } + + /** + Returns bindable sink wich deletes objects in sequence from Realm. + - returns: `AnyObserver`, which you can use to subscribe an `Observable` to */ + public func rx_delete() -> AnyObserver { + return RealmObserver(realm: self, binding: { (realm, elements) in + try! realm.write { + realm.delete(elements) + } + }).asObserver() + } + public func rx_delete() -> AnyObserver { return RealmObserver(realm: self, binding: { (realm, elements) in try! realm.write { diff --git a/RxRealm.podspec b/RxRealm.podspec index 0568c08..d69bf5e 100644 --- a/RxRealm.podspec +++ b/RxRealm.podspec @@ -1,7 +1,7 @@ Pod::Spec.new do |s| s.name = "RxRealm" - s.version = "0.1.5" + s.version = "0.1.7" s.summary = "An Rx wrapper of Realm's collection type" s.description = <<-DESC From ab537916c483a0b85d42e345c3e65bf6d8f00476 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Sat, 4 Jun 2016 13:52:23 +0200 Subject: [PATCH 2/8] * update the Readme --- README.md | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 34a379a..3395c6f 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,11 @@ ## Usage -This library is a very thin wrapper around the reactive collection types that __RealmSwift__ provides. +This library is a thin wrapper around __RealmSwift__. -The extension adds to `Results`, `List`, `LinkingObjects` and `AnyRealmCollection` these methods: +### Observing collections + +RxRealm adds to `Results`, `List`, `LinkingObjects` and `AnyRealmCollection` these methods: #### asObservable() `asObservable()` - emits every time the collection changes: @@ -59,7 +61,60 @@ realm.objects(Lap).asObservableChangeset() `asObservableArrayChangeset()` combines the result of `asObservableArray()` and `asObservableChangeset()` returning an `Observable, RealmChangeset?>`. -#### Example app +### Write transactions + +#### rx_add() + +__write to existing realm reference)__ You can add newly created objects to a realm that you already have initialized: + +```swift +let realm = try! Realm() +[Message("hello"), Message("world")].toObservable() + .subscribe(realm.rx_add()) +``` + +Be careful, this will retain your realm until the `Observable` completes or errors out. + +__write to the default realm)__ You can leave it to RxRealm to grab the default Realm on any thread your subscribe and write objects to it: + +```swift +[Message("hello"), Message("world")].toObservable() + .observeOn( ..you can switch threads if you want ) + .subscribe(Realm.rx_add()) +``` + +__write to a specific realm)__ If you want to switch threads and don't use the default realm, provide a `Realm.Configuration`: + +```swift +var conf = Realm.Configuration() +... custom configuration settings ... + +[Message("hello"), Message("world")].toObservable() + .observeOn( ..you can switch threads if you want ) + .subscribe(Realm.rx_add(conf)) +``` + +#### rx_delete() + +__delete from existing realm reference)__ Delete objects from existing realm reference: + +```swift +let realm = try! Realm() +realm.objects(Messages).asObservable() + .subscribe(realm.rx_delete()) +``` + +Be careful, this will retain your realm until the `Observable` completes or errors out. + +__delete automatically from objects' realm)__ You can leave it to RxRealm to grab the Realm from the first object and use it: + +```swift +someCollectionOfPersistedObjects.toObservable() + .subscribe(Realm.rx_delete()) +``` + + +## Example app To run the example project, clone the repo, and run `pod install` from the Example directory first. The app uses RxSwift, RxCocoa using RealmSwift, RxRealm to observe Results from Realm. @@ -95,12 +150,10 @@ Run `carthage update` to build the framework and drag the built `RxRealm.framewo #### As Source -You can grab the __RxRealm.swift__ file from this repo and include it in your project. +You can grab the files in `Pod/Classes` from this repo and include it in your project. ## TODO -* Carthage -* Add `asObservable()` to the Realm class * Test add platforms and add compatibility for the pod ## License From 39018701a0b95e6162dc3d81f8166f4a4727d301 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Sat, 4 Jun 2016 19:52:03 +0200 Subject: [PATCH 3/8] * more explanations for asserts --- Example/RxRealm_Tests/RxRealmWriteSinks.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Example/RxRealm_Tests/RxRealmWriteSinks.swift b/Example/RxRealm_Tests/RxRealmWriteSinks.swift index 88e47e5..dff06eb 100644 --- a/Example/RxRealm_Tests/RxRealmWriteSinks.swift +++ b/Example/RxRealm_Tests/RxRealmWriteSinks.swift @@ -239,7 +239,7 @@ class RxRealmWriteSinks: XCTestCase { } func testRxAddObjectsInBackground() { - let expectation = expectationWithDescription("Message1") + let expectation = expectationWithDescription("All writes successful") var conf = Realm.Configuration() conf.deleteRealmIfMigrationNeeded = true @@ -301,10 +301,10 @@ class RxRealmWriteSinks: XCTestCase { waitForExpectationsWithTimeout(5.0, handler: {error in let finalResult = observer.events.last!.value.element! - XCTAssertTrue(finalResult.count == 6) + XCTAssertTrue(finalResult.count == 6, "The final amount of objects in realm are not correct") XCTAssertTrue((try! Realm()).objects(Message).sorted("text") .reduce("", combine: { acc, el in acc + el.text - }) == "123456" /*😈*/) + }) == "123456" /*😈*/, "The final list of objects is not the one expected") }) } } \ No newline at end of file From 0720eeb8b84fa6aa69186f40f56e61d2f5350e42 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Sun, 5 Jun 2016 08:16:41 +0200 Subject: [PATCH 4/8] * fixes a typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3395c6f..5a1bf74 100644 --- a/README.md +++ b/README.md @@ -150,7 +150,7 @@ Run `carthage update` to build the framework and drag the built `RxRealm.framewo #### As Source -You can grab the files in `Pod/Classes` from this repo and include it in your project. +You can grab the files in `Pod/Classes` from this repo and include them in your project. ## TODO From 75c95406547861ec17fd6052681bd2bcc963ae63 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Sun, 5 Jun 2016 11:49:52 +0200 Subject: [PATCH 5/8] * remove rococo --- Example/RxRealm_Tests/RxRealmWriteSinks.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Example/RxRealm_Tests/RxRealmWriteSinks.swift b/Example/RxRealm_Tests/RxRealmWriteSinks.swift index dff06eb..53dd08e 100644 --- a/Example/RxRealm_Tests/RxRealmWriteSinks.swift +++ b/Example/RxRealm_Tests/RxRealmWriteSinks.swift @@ -9,7 +9,6 @@ import XCTest import RxSwift -import RxCocoa import RealmSwift import RxRealm import RxTests From def44795f493b58a55245803a47a4ea53201bfc0 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Mon, 13 Jun 2016 10:01:13 +0200 Subject: [PATCH 6/8] * doh! fixed RealmObserver + Configuration --- Example/RxRealm_Tests/RxRealmWriteSinks.swift | 35 +++++++++---------- Pod/Classes/RealmObserver.swift | 11 +++--- .../xcschemes/RxRealm-OSX.xcscheme | 6 ++-- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/Example/RxRealm_Tests/RxRealmWriteSinks.swift b/Example/RxRealm_Tests/RxRealmWriteSinks.swift index 53dd08e..90b9c28 100644 --- a/Example/RxRealm_Tests/RxRealmWriteSinks.swift +++ b/Example/RxRealm_Tests/RxRealmWriteSinks.swift @@ -237,31 +237,30 @@ class RxRealmWriteSinks: XCTestCase { XCTAssertTrue(observer.events[0].value.element!.isEmpty) } - func testRxAddObjectsInBackground() { - let expectation = expectationWithDescription("All writes successful") - var conf = Realm.Configuration() - conf.deleteRealmIfMigrationNeeded = true + func testRxAddObjectsInBg() { + let expectation = expectationWithDescription("All writes completed") - let realm = try! Realm(configuration: conf) - try! realm.write { - realm.deleteAll() - } + let realm = realmInMemory(#function) + var conf = Realm.Configuration() + conf.inMemoryIdentifier = #function let bag = DisposeBag() - + let scheduler = TestScheduler(initialClock: 0) - let observer = scheduler.createObserver(Array) + let observer = scheduler.createObserver(Results) - let messages$ = realm.objects(Message).asObservableArray().shareReplay(1) + let messages$ = realm.objects(Message).asObservable().shareReplay(1) messages$ + .subscribe(observer).addDisposableTo(bag) + + messages$ + .doOnNext {e in print("el: \(e)")} .filter {$0.count == 6} .subscribeNext {_ in expectation.fulfill() } .addDisposableTo(bag) - messages$ - .subscribe(observer) - .addDisposableTo(bag) + scheduler.start() // subscribe/write on current thread [Message("1")].toObservable() @@ -282,7 +281,6 @@ class RxRealmWriteSinks: XCTestCase { .subscribe( Realm.rx_add(conf) ) .addDisposableTo(bag) - // subscribe on current/write on background [Message("4")].toObservable() .observeOn( ConcurrentDispatchQueueScheduler( queue: dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0))) @@ -295,15 +293,14 @@ class RxRealmWriteSinks: XCTestCase { queue: dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0))) .subscribe( Realm.rx_add(conf) ) .addDisposableTo(bag) - - scheduler.start() waitForExpectationsWithTimeout(5.0, handler: {error in + XCTAssertNil(error) let finalResult = observer.events.last!.value.element! XCTAssertTrue(finalResult.count == 6, "The final amount of objects in realm are not correct") - XCTAssertTrue((try! Realm()).objects(Message).sorted("text") + XCTAssertTrue((try! Realm(configuration: conf)).objects(Message).sorted("text") .reduce("", combine: { acc, el in acc + el.text - }) == "123456" /*😈*/, "The final list of objects is not the one expected") + }) == "123456" /*😈*/, "The final list of objects is not the one expected") }) } } \ No newline at end of file diff --git a/Pod/Classes/RealmObserver.swift b/Pod/Classes/RealmObserver.swift index c07857d..fdd8d15 100644 --- a/Pod/Classes/RealmObserver.swift +++ b/Pod/Classes/RealmObserver.swift @@ -11,7 +11,8 @@ import RxSwift import RealmSwift /** - `RealmObserver` retains target realm object until it receives a .Completed or .Error event. + `RealmObserver` retains target realm object until it receives a .Completed or .Error event + or the observer is being deinitialized */ class RealmObserver: ObserverType { var realm: Realm? @@ -30,14 +31,14 @@ class RealmObserver: ObserverType { } /** - Binds next element realm. + Binds next element */ func on(event: Event) { switch event { case .Next(let element): //this will "cache" the realm on this thread, until completed/errored if let configuration = configuration where realm == nil { - realm = try! Realm() + realm = try! Realm(configuration: configuration) } guard let realm = realm else { @@ -53,9 +54,9 @@ class RealmObserver: ObserverType { } } /** - Erases type of observer. + Erases the type of observer - - returns: type erased observer. + - returns: AnyObserver, type erased observer */ func asObserver() -> AnyObserver { return AnyObserver(eventHandler: on) diff --git a/RxRealm.xcodeproj/xcshareddata/xcschemes/RxRealm-OSX.xcscheme b/RxRealm.xcodeproj/xcshareddata/xcschemes/RxRealm-OSX.xcscheme index dd9f748..8c60d3e 100644 --- a/RxRealm.xcodeproj/xcshareddata/xcschemes/RxRealm-OSX.xcscheme +++ b/RxRealm.xcodeproj/xcshareddata/xcschemes/RxRealm-OSX.xcscheme @@ -15,7 +15,7 @@ @@ -46,7 +46,7 @@ @@ -64,7 +64,7 @@ From fa05ac470b7bdb6a5e705d51210d8868243ca744 Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Mon, 13 Jun 2016 10:33:01 +0200 Subject: [PATCH 7/8] * added a test for creating a realm on the subscribed bg thread --- Example/RxRealm_Tests/RxRealmWriteSinks.swift | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/Example/RxRealm_Tests/RxRealmWriteSinks.swift b/Example/RxRealm_Tests/RxRealmWriteSinks.swift index 90b9c28..5e57938 100644 --- a/Example/RxRealm_Tests/RxRealmWriteSinks.swift +++ b/Example/RxRealm_Tests/RxRealmWriteSinks.swift @@ -255,8 +255,7 @@ class RxRealmWriteSinks: XCTestCase { .subscribe(observer).addDisposableTo(bag) messages$ - .doOnNext {e in print("el: \(e)")} - .filter {$0.count == 6} + .filter {$0.count == 8} .subscribeNext {_ in expectation.fulfill() } .addDisposableTo(bag) @@ -294,13 +293,26 @@ class RxRealmWriteSinks: XCTestCase { .subscribe( Realm.rx_add(conf) ) .addDisposableTo(bag) + // subscribe on current/write on a realm in background + [[Message("7"), Message("8")]].toObservable() + .observeOn( ConcurrentDispatchQueueScheduler( + queue: dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0))) + .subscribeNext {messages in + let realm = try! Realm(configuration: conf) + try! realm.write { + realm.add(messages) + } + } + .addDisposableTo(bag) + + waitForExpectationsWithTimeout(5.0, handler: {error in XCTAssertNil(error) let finalResult = observer.events.last!.value.element! - XCTAssertTrue(finalResult.count == 6, "The final amount of objects in realm are not correct") + XCTAssertTrue(finalResult.count == 8, "The final amount of objects in realm are not correct") XCTAssertTrue((try! Realm(configuration: conf)).objects(Message).sorted("text") .reduce("", combine: { acc, el in acc + el.text - }) == "123456" /*😈*/, "The final list of objects is not the one expected") + }) == "12345678" /*😈*/, "The final list of objects is not the one expected") }) } } \ No newline at end of file From 67875a8e8101108027432a0b465d2043166a59bd Mon Sep 17 00:00:00 2001 From: Marin Todorov Date: Mon, 13 Jun 2016 10:40:36 +0200 Subject: [PATCH 8/8] * added 1 example to readme --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index 5a1bf74..970f72f 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,19 @@ var conf = Realm.Configuration() .subscribe(Realm.rx_add(conf)) ``` +If you want to create yourself the Realm on a different thread than the subscription you can do that too (allows you to error handle): + +```swift +[Message("hello"), Message("world")].toObservable() + .observeOn( ..you can switch threads if you want ) + .subscribeNext {messages in + let realm = try! Realm() + try! realm.write { + realm.add(messages) + } + } +``` + #### rx_delete() __delete from existing realm reference)__ Delete objects from existing realm reference: