Skip to content

Commit

Permalink
Use the fetch queues for any horizon-related request (#1125)
Browse files Browse the repository at this point in the history
  • Loading branch information
andywer authored Aug 3, 2020
1 parent ba01ebe commit b4abb47
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 31 deletions.
31 changes: 31 additions & 0 deletions src/Generic/lib/observables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Observable, Subscription, SubscriptionObserver, unsubscribe } from "observable-fns"

type AsyncObservableInitializer<T> = (
observer: SubscriptionObserver<T>
) => Promise<Subscription<T> | (() => void) | void>

export function observableFromAsyncFactory<T>(init: AsyncObservableInitializer<T>): Observable<T> {
return new Observable<T>(observer => {
let downstreamUnsubscribe: Subscription<T> | (() => void) | void
let receivedUnsubscribe = false

init(observer).then(
returned => {
downstreamUnsubscribe = returned

if (receivedUnsubscribe) {
unsubscribe(downstreamUnsubscribe)
}
},
error => {
observer.error(error)
}
)

const upstreamUnsubscribe = () => {
receivedUnsubscribe = true
unsubscribe(downstreamUnsubscribe)
}
return upstreamUnsubscribe
})
}
72 changes: 41 additions & 31 deletions src/Workers/net-worker/stellar-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import qs from "qs"
import { Asset, Horizon, Networks, ServerApi, Transaction } from "stellar-sdk"
import pkg from "../../../package.json"
import { Cancellation, CustomError } from "~Generic/lib/errors"
import { observableFromAsyncFactory } from "~Generic/lib/observables"
import { parseAssetID } from "~Generic/lib/stellar"
import { max } from "~Generic/lib/strings"
import { createReconnectingSSE } from "../lib/event-source"
import { parseJSONResponse } from "../lib/rest"
import { resetSubscriptions, subscribeToUpdatesAndPoll } from "../lib/subscription"
import { ServiceID } from "./errors"
import {
accountDataUpdates,
offerUpdates,
Expand All @@ -19,9 +23,6 @@ import {
OptimisticAccountUpdate,
OptimisticOfferUpdate
} from "./optimistic-updates/index"
import { parseJSONResponse } from "../lib/rest"
import { resetSubscriptions, subscribeToUpdatesAndPoll } from "../lib/subscription"
import { ServiceID } from "./errors"

export interface CollectionPage<T> {
_embedded: {
Expand Down Expand Up @@ -137,10 +138,13 @@ export function resetAllSubscriptions() {
}

export async function submitTransaction(horizonURL: string, txEnvelopeXdr: string, network: Networks) {
const fetchQueue = getFetchQueue(horizonURL)
const url = new URL(`/transactions`, horizonURL)

const response = await fetch(String(url) + "?" + qs.stringify({ tx: txEnvelopeXdr }), {
method: "POST"
const response = await fetchQueue.add(() => {
return fetch(String(url) + "?" + qs.stringify({ tx: txEnvelopeXdr }), {
method: "POST"
})
})

if (response.status === 200) {
Expand Down Expand Up @@ -206,6 +210,7 @@ async function waitForAccountData(horizonURL: string, accountID: string, shouldC
}

function subscribeToAccountEffectsUncached(horizonURL: string, accountID: string) {
const fetchQueue = getFetchQueue(horizonURL)
const serviceID = getServiceID(horizonURL)

let latestCursor: string | undefined
Expand Down Expand Up @@ -254,23 +259,25 @@ function subscribeToAccountEffectsUncached(horizonURL: string, accountID: string
}

return multicast(
new Observable<ServerApi.EffectRecord>(observer => {
return createReconnectingSSE(createURL, {
onMessage(message) {
const effect: ServerApi.EffectRecord = JSON.parse(message.data)

// Don't update latestCursor cursor here – if we do it too early it might cause
// shouldApplyUpdate() to return false, since it compares the new effect with itself
observer.next(effect)

if (effect.type === "account_removed" && effect.account === accountID) {
observer.complete()
observableFromAsyncFactory<ServerApi.EffectRecord>(async observer => {
return fetchQueue.add(() =>
createReconnectingSSE(createURL, {
onMessage(message) {
const effect: ServerApi.EffectRecord = JSON.parse(message.data)

// Don't update latestCursor cursor here – if we do it too early it might cause
// shouldApplyUpdate() to return false, since it compares the new effect with itself
observer.next(effect)

if (effect.type === "account_removed" && effect.account === accountID) {
observer.complete()
}
},
onUnexpectedError(error) {
observer.error(error)
}
},
onUnexpectedError(error) {
observer.error(error)
}
})
})
)
})
)
}
Expand Down Expand Up @@ -544,6 +551,7 @@ function subscribeToOrderbookUncached(horizonURL: string, sellingAsset: string,
const fetchUpdate = () => fetchOrderbookRecord(horizonURL, sellingAsset, buyingAsset)

let latestKnownSnapshot = ""
const fetchQueue = getFetchQueue(horizonURL)
const serviceID = getServiceID(horizonURL)

// TODO: Optimize - Make UpdateT = ValueT & { [$snapshot]: string }
Expand All @@ -565,16 +573,18 @@ function subscribeToOrderbookUncached(horizonURL: string, sellingAsset: string,
return snapshot !== latestKnownSnapshot
},
subscribeToUpdates() {
return new Observable<ServerApi.OrderbookRecord>(observer => {
return createReconnectingSSE(createURL, {
onMessage(message) {
const record: ServerApi.OrderbookRecord = JSON.parse(message.data)
observer.next(record)
},
onUnexpectedError(error) {
observer.error(error)
}
})
return observableFromAsyncFactory<ServerApi.OrderbookRecord>(observer => {
return fetchQueue.add(() =>
createReconnectingSSE(createURL, {
onMessage(message) {
const record: ServerApi.OrderbookRecord = JSON.parse(message.data)
observer.next(record)
},
onUnexpectedError(error) {
observer.error(error)
}
})
)
})
}
},
Expand Down

0 comments on commit b4abb47

Please sign in to comment.