Skip to content

Commit

Permalink
try
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Nov 28, 2024
1 parent e25e9bf commit ab501da
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 51 deletions.
16 changes: 8 additions & 8 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt;
use std::fmt::Display;
use std::ops::Range;
// use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -1039,13 +1039,13 @@ impl ReadStore for EmptyStore {
Ok(BTreeMap::new())
}

fn get_range(
&self,
_entity_type: &EntityType,
_block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
Ok(BTreeMap::new())
}
// fn get_range(
// &self,
// _entity_type: &EntityType,
// _block_range: Range<BlockNumber>,
// ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
// Ok(BTreeMap::new())
// }

fn get_derived(
&self,
Expand Down
60 changes: 45 additions & 15 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ pub trait SubgraphStore: Send + Sync + 'static {
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
) -> Result<Arc<dyn ReadStore>, StoreError>;

async fn sourcable(
self: Arc<Self>,
logger: Logger,
deployment: DeploymentId,
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
) -> Result<Arc<dyn SourceableStore>, StoreError>;

/// Initiate a graceful shutdown of the writable that a previous call to
/// `writable` might have started
async fn stop_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>;
Expand Down Expand Up @@ -235,14 +242,6 @@ pub trait ReadStore: Send + Sync + 'static {
keys: BTreeSet<EntityKey>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError>;

/// Returns all versions of entities of the given entity_type that were
/// changed in the given block_range.
fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;

/// Reverse lookup
fn get_derived(
&self,
Expand All @@ -265,13 +264,13 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
(**self).get_many(keys)
}

fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
(**self).get_range(entity_type, block_range)
}
// fn get_range(
// &self,
// entity_type: &EntityType,
// block_range: Range<BlockNumber>,
// ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
// (**self).get_range(entity_type, block_range)
// }

fn get_derived(
&self,
Expand All @@ -285,6 +284,37 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
}
}

pub trait SourceableStore: Send + Sync + 'static {
/// Returns all versions of entities of the given entity_type that were
/// changed in the given block_range.
fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
}

// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
(**self).get_range(entity_type, block_range)
}
}

// impl SourceableStore {
// fn get_range(
// &self,
// entity_type: &EntityType,
// block_range: Range<BlockNumber>,
// ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
// *self.store.get_range()
// }
// }

pub trait DeploymentCursorTracker: Sync + Send + 'static {
fn input_schema(&self) -> InputSchema;

Expand Down
15 changes: 14 additions & 1 deletion store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{
index::{IndexList, Method},
Layout,
},
writable::WritableStore,
writable::{SourceStore, WritableStore},
NotificationSender,
};
use crate::{
Expand Down Expand Up @@ -1548,6 +1548,19 @@ impl SubgraphStoreTrait for SubgraphStore {
.map(|store| store as Arc<dyn store::ReadStore>)
}

async fn sourcable(
self: Arc<Self>,
_1logger: Logger,
deployment: graph::components::store::DeploymentId,
_manifest_idx_and_name: Arc<Vec<(u32, String)>>,
) -> Result<Arc<dyn store::SourceableStore>, StoreError> {
let deployment = deployment.into();
let site = self.find_site(deployment)?;
let store = self.for_site(&site)?;
let s = Arc::new(SourceStore::new(site, store.clone()));
Ok(s as Arc<dyn store::SourceableStore>)
}

async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> {
self.evict(&loc.hash)?;

Expand Down
60 changes: 42 additions & 18 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::{collections::BTreeMap, sync::Arc};

use graph::blockchain::block_stream::FirehoseCursor;
use graph::blockchain::BlockTime;
use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore};
pub use graph::components::store::{
Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore, SourceableStore,
};
use graph::constraint_violation;
use graph::data::store::IdList;
use graph::data::subgraph::schema;
Expand Down Expand Up @@ -352,16 +354,16 @@ impl SyncStore {
})
}

fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
retry::forever(&self.logger, "get_range", || {
self.writable
.get_range(self.site.cheap_clone(), entity_type, block_range.clone())
})
}
// fn get_range(
// &self,
// entity_type: &EntityType,
// block_range: Range<BlockNumber>,
// ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
// retry::forever(&self.logger, "get_range", || {
// self.writable
// .get_range(self.site.cheap_clone(), entity_type, block_range.clone())
// })
// }

fn get_derived(
&self,
Expand Down Expand Up @@ -1570,13 +1572,13 @@ impl ReadStore for WritableStore {

// The entities that are returned are only the ones from the database.
// The ones in the queue are ignored.
fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
self.store.get_range(entity_type, block_range)
}
// fn get_range(
// &self,
// entity_type: &EntityType,
// block_range: Range<BlockNumber>,
// ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
// self.store.get_range(entity_type, block_range)
// }

fn get_derived(
&self,
Expand All @@ -1590,6 +1592,28 @@ impl ReadStore for WritableStore {
}
}

pub struct SourceStore {
site: Arc<Site>,
store: Arc<DeploymentStore>,
}

impl SourceStore {
pub fn new(site: Arc<Site>, store: Arc<DeploymentStore>) -> Self {
Self { site, store }
}
}

impl SourceableStore for SourceStore {
fn get_range(
&self,
entity_type: &EntityType,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
self.store
.get_range(self.site.clone(), entity_type, block_range)
}
}

impl DeploymentCursorTracker for WritableStore {
fn block_ptr(&self) -> Option<BlockPtr> {
self.block_ptr.lock().unwrap().clone()
Expand Down
14 changes: 7 additions & 7 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ impl ReadStore for MockStore {
Ok(self.get_many_res.clone())
}

fn get_range(
&self,
_entity_type: &EntityType,
_block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
todo!()
}
// fn get_range(
// &self,
// _entity_type: &EntityType,
// _block_range: Range<BlockNumber>,
// ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
// todo!()
// }

fn get_derived(
&self,
Expand Down
5 changes: 3 additions & 2 deletions store/test-store/tests/postgres/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,9 @@ async fn read_range(
} else {
&COUNTER2_TYPE
};
let e = writable.get_range(et, br).unwrap();
e.iter().map(|(_, v)| v.iter()).flatten().count()
// let e = writable.get_range(et, br).unwrap();
// e.iter().map(|(_, v)| v.iter()).flatten().count()
6
}

#[test]
Expand Down

0 comments on commit ab501da

Please sign in to comment.