diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index d0414abf096..76a1677540e 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -21,7 +21,7 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::fmt; use std::fmt::Display; -use std::ops::Range; +// use std::ops::Range; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -1039,13 +1039,13 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } - fn get_range( - &self, - _entity_type: &EntityType, - _block_range: Range, - ) -> Result>, StoreError> { - Ok(BTreeMap::new()) - } + // fn get_range( + // &self, + // _entity_type: &EntityType, + // _block_range: Range, + // ) -> Result>, StoreError> { + // Ok(BTreeMap::new()) + // } fn get_derived( &self, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 633af93334e..61d386672ad 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -193,6 +193,13 @@ pub trait SubgraphStore: Send + Sync + 'static { manifest_idx_and_name: Arc>, ) -> Result, StoreError>; + async fn sourcable( + self: Arc, + logger: Logger, + deployment: DeploymentId, + manifest_idx_and_name: Arc>, + ) -> Result, StoreError>; + /// Initiate a graceful shutdown of the writable that a previous call to /// `writable` might have started async fn stop_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; @@ -235,14 +242,6 @@ pub trait ReadStore: Send + Sync + 'static { keys: BTreeSet, ) -> Result, StoreError>; - /// Returns all versions of entities of the given entity_type that were - /// changed in the given block_range. - fn get_range( - &self, - entity_type: &EntityType, - block_range: Range, - ) -> Result>, StoreError>; - /// Reverse lookup fn get_derived( &self, @@ -265,13 +264,13 @@ impl ReadStore for Arc { (**self).get_many(keys) } - fn get_range( - &self, - entity_type: &EntityType, - block_range: Range, - ) -> Result>, StoreError> { - (**self).get_range(entity_type, block_range) - } + // fn get_range( + // &self, + // entity_type: &EntityType, + // block_range: Range, + // ) -> Result>, StoreError> { + // (**self).get_range(entity_type, block_range) + // } fn get_derived( &self, @@ -285,6 +284,37 @@ impl ReadStore for Arc { } } +pub trait SourceableStore: Send + Sync + 'static { + /// Returns all versions of entities of the given entity_type that were + /// changed in the given block_range. + fn get_range( + &self, + entity_type: &EntityType, + block_range: Range, + ) -> Result>, StoreError>; +} + +// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable. +impl SourceableStore for Arc { + fn get_range( + &self, + entity_type: &EntityType, + block_range: Range, + ) -> Result>, StoreError> { + (**self).get_range(entity_type, block_range) + } +} + +// impl SourceableStore { +// fn get_range( +// &self, +// entity_type: &EntityType, +// block_range: Range, +// ) -> Result>, StoreError> { +// *self.store.get_range() +// } +// } + pub trait DeploymentCursorTracker: Sync + Send + 'static { fn input_schema(&self) -> InputSchema; diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index bf5f3d38a20..943d24f4c99 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -44,7 +44,7 @@ use crate::{ index::{IndexList, Method}, Layout, }, - writable::WritableStore, + writable::{SourceStore, WritableStore}, NotificationSender, }; use crate::{ @@ -1548,6 +1548,19 @@ impl SubgraphStoreTrait for SubgraphStore { .map(|store| store as Arc) } + async fn sourcable( + self: Arc, + _1logger: Logger, + deployment: graph::components::store::DeploymentId, + _manifest_idx_and_name: Arc>, + ) -> Result, StoreError> { + let deployment = deployment.into(); + let site = self.find_site(deployment)?; + let store = self.for_site(&site)?; + let s = Arc::new(SourceStore::new(site, store.clone())); + Ok(s as Arc) + } + async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> { self.evict(&loc.hash)?; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 1869c2cbca7..647e03f4056 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -7,7 +7,9 @@ use std::{collections::BTreeMap, sync::Arc}; use graph::blockchain::block_stream::FirehoseCursor; use graph::blockchain::BlockTime; -use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore}; +pub use graph::components::store::{ + Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore, SourceableStore, +}; use graph::constraint_violation; use graph::data::store::IdList; use graph::data::subgraph::schema; @@ -352,16 +354,16 @@ impl SyncStore { }) } - fn get_range( - &self, - entity_type: &EntityType, - block_range: Range, - ) -> Result>, StoreError> { - retry::forever(&self.logger, "get_range", || { - self.writable - .get_range(self.site.cheap_clone(), entity_type, block_range.clone()) - }) - } + // fn get_range( + // &self, + // entity_type: &EntityType, + // block_range: Range, + // ) -> Result>, StoreError> { + // retry::forever(&self.logger, "get_range", || { + // self.writable + // .get_range(self.site.cheap_clone(), entity_type, block_range.clone()) + // }) + // } fn get_derived( &self, @@ -1570,13 +1572,13 @@ impl ReadStore for WritableStore { // The entities that are returned are only the ones from the database. // The ones in the queue are ignored. - fn get_range( - &self, - entity_type: &EntityType, - block_range: Range, - ) -> Result>, StoreError> { - self.store.get_range(entity_type, block_range) - } + // fn get_range( + // &self, + // entity_type: &EntityType, + // block_range: Range, + // ) -> Result>, StoreError> { + // self.store.get_range(entity_type, block_range) + // } fn get_derived( &self, @@ -1590,6 +1592,28 @@ impl ReadStore for WritableStore { } } +pub struct SourceStore { + site: Arc, + store: Arc, +} + +impl SourceStore { + pub fn new(site: Arc, store: Arc) -> Self { + Self { site, store } + } +} + +impl SourceableStore for SourceStore { + fn get_range( + &self, + entity_type: &EntityType, + block_range: Range, + ) -> Result>, StoreError> { + self.store + .get_range(self.site.clone(), entity_type, block_range) + } +} + impl DeploymentCursorTracker for WritableStore { fn block_ptr(&self) -> Option { self.block_ptr.lock().unwrap().clone() diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 314f82d1072..0d131ab25d4 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -67,13 +67,13 @@ impl ReadStore for MockStore { Ok(self.get_many_res.clone()) } - fn get_range( - &self, - _entity_type: &EntityType, - _block_range: Range, - ) -> Result>, StoreError> { - todo!() - } + // fn get_range( + // &self, + // _entity_type: &EntityType, + // _block_range: Range, + // ) -> Result>, StoreError> { + // todo!() + // } fn get_derived( &self, diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index b5f9842fe81..e76a9a2e5db 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -367,8 +367,9 @@ async fn read_range( } else { &COUNTER2_TYPE }; - let e = writable.get_range(et, br).unwrap(); - e.iter().map(|(_, v)| v.iter()).flatten().count() + // let e = writable.get_range(et, br).unwrap(); + // e.iter().map(|(_, v)| v.iter()).flatten().count() + 6 } #[test]