Polish for review
tjgreen42 committed Dec 12, 2024
1 parent 09b355b commit 4c3a0be
Showing 5 changed files with 55 additions and 28 deletions.
23 changes: 20 additions & 3 deletions pgvectorscale/src/access_method/build.rs
@@ -62,8 +62,14 @@ impl<'a, 'b> BuildState<'a, 'b> {
}
}

/// Maximum number of dimensions supported by pgvector's vector type. Also
/// the maximum number of dimensions that can be indexed with diskann.
pub const MAX_DIMENSION: u32 = 16000;

/// Maximum number of dimensions that can be indexed with diskann without
/// using the SBQ storage type.
pub const MAX_DIMENSION_NO_SBQ: u32 = 2000;

#[pg_guard]
pub extern "C" fn ambuild(
heaprel: pg_sys::Relation,
@@ -75,7 +81,7 @@ pub extern "C" fn ambuild(
let opt = TSVIndexOptions::from_relation(&index_relation);

notice!(
"Starting index build with num_neighbors={}, search_list_size={}, max_alpha={}, storage_layout={:?}",
"Starting index build with num_neighbors={}, search_list_size={}, max_alpha={}, storage_layout={:?}.",
opt.get_num_neighbors(),
opt.search_list_size,
opt.max_alpha,
@@ -87,7 +93,7 @@
let distance_type = unsafe {
let fmgr_info = index_getprocinfo(indexrel, 1, DISKANN_DISTANCE_TYPE_PROC);
if fmgr_info.is_null() {
error!("Internal error: no distance type function found for index");
error!("No distance type function found for index");
}
let result = FunctionCall0Coll(fmgr_info, InvalidOid).value() as u16;
DistanceType::from_u16(result)
@@ -102,10 +108,21 @@

if meta_page.get_num_dimensions_to_index() == 0 {
error!("No dimensions to index");
} else if meta_page.get_num_dimensions_to_index() > MAX_DIMENSION {
}

if meta_page.get_num_dimensions_to_index() > MAX_DIMENSION {
error!("Too many dimensions to index (max is {})", MAX_DIMENSION);
}

if meta_page.get_num_dimensions_to_index() > MAX_DIMENSION_NO_SBQ
&& meta_page.get_storage_type() == StorageType::Plain
{
error!(
"Too many dimensions to index without plain storage (max is {}). Use storag_layout=memory_optimized instead.",
MAX_DIMENSION_NO_SBQ
);
}

let ntuples = do_heap_scan(index_info, &heap_relation, &index_relation, meta_page);

let mut result = unsafe { PgBox::<pg_sys::IndexBuildResult>::alloc0() };
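For illustration, here is a minimal free-standing sketch of the validation rules the new constants encode. The StorageType variants are illustrative (only Plain appears in this diff), the error strings mirror the messages above, and the real check reads both values from the index meta page rather than taking them as arguments.

// Sketch only: the actual checks live in ambuild() and report via pgrx's error!.
#[derive(PartialEq)]
enum StorageType {
    Plain,
    MemoryOptimized, // illustrative stand-in for the SBQ storage variant
}

const MAX_DIMENSION: u32 = 16000;
const MAX_DIMENSION_NO_SBQ: u32 = 2000;

fn validate_dimensions(dims: u32, storage: StorageType) -> Result<(), String> {
    if dims == 0 {
        return Err("No dimensions to index".to_string());
    }
    if dims > MAX_DIMENSION {
        return Err(format!("Too many dimensions to index (max is {})", MAX_DIMENSION));
    }
    if dims > MAX_DIMENSION_NO_SBQ && storage == StorageType::Plain {
        return Err(format!(
            "Too many dimensions to index with plain storage (max is {}). Use storage_layout=memory_optimized instead.",
            MAX_DIMENSION_NO_SBQ
        ));
    }
    Ok(())
}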
3 changes: 0 additions & 3 deletions pgvectorscale/src/access_method/options.rs
@@ -25,9 +25,6 @@ pub const NUM_DIMENSIONS_DEFAULT_SENTINEL: u32 = 0;
pub const SBQ_NUM_BITS_PER_DIMENSION_DEFAULT_SENTINEL: u32 = 0;
const DEFAULT_MAX_ALPHA: f64 = 1.2;

pub const MAX_DIMENSIONS_WITHOUT_SBQ: u32 = 2000;
pub const MAX_DIMENSIONS: u32 = 64000;

impl TSVIndexOptions {
//note: this should only be used when building a new index. The options aren't really versioned.
//therefore, we should move all the options to the meta page when building the index (meta pages are properly versioned).
53 changes: 32 additions & 21 deletions pgvectorscale/src/util/chain.rs
@@ -1,3 +1,15 @@
//! This module defines the `ChainTape` data structure, which is used to store large data items that
//! are too big to fit in a single page. See `Tape` for a simpler version that assumes each data
//! item fits in a single page.
//!
//! All page entries begin with a header that contains the block number of the next page in the chain,
//! if applicable. If there is a next page, the entry continues (at position 1) on that page.
//!
//! The implementation supports an append-only sequence of writes via `ChainTapeWriter` and reads
//! via `ChainTapeReader`. The writer returns an `ItemPointer` that can be used to read the data
//! back. Reads are done via an iterator that returns `ReadableBuffer` objects for the segments
//! of the data.
use anyhow::Result;

use pgrx::{
@@ -19,7 +31,7 @@ struct ChainItemHeader {
next: BlockNumber,
}

const ARCHIVED_CHAIN_HEADER_SIZE: usize = std::mem::size_of::<ArchivedChainItemHeader>();
const CHAIN_HEADER_SIZE: usize = std::mem::size_of::<ArchivedChainItemHeader>();

// Empirically-measured slop factor for how much `pg_sys::PageGetFreeSpace` can
// overestimate the free space in a page in our usage patterns.
Expand Down Expand Up @@ -52,7 +64,7 @@ impl<'a, S: StatsNodeWrite> ChainTapeWriter<'a, S> {
let mut current_page = WritablePage::modify(self.index, self.current);

// If there isn't enough space for the header plus some data, start a new page.
if current_page.get_free_space() + PG_SLOP_SIZE < ARCHIVED_CHAIN_HEADER_SIZE + 1 {
if current_page.get_free_space() + PG_SLOP_SIZE < CHAIN_HEADER_SIZE + 1 {
current_page = WritablePage::new(self.index, self.page_type);
self.current = current_page.get_block_number();
}
@@ -61,15 +73,13 @@ impl<'a, S: StatsNodeWrite> ChainTapeWriter<'a, S> {
let mut result: Option<super::ItemPointer> = None;

// Write the data in chunks, creating new pages as needed.
while ARCHIVED_CHAIN_HEADER_SIZE + data.len() + PG_SLOP_SIZE > current_page.get_free_space()
{
while CHAIN_HEADER_SIZE + data.len() + PG_SLOP_SIZE > current_page.get_free_space() {
let next_page = WritablePage::new(self.index, self.page_type);
let header = ChainItemHeader {
next: next_page.get_block_number(),
};
let header_bytes = rkyv::to_bytes::<_, 256>(&header).unwrap();
let data_size =
current_page.get_free_space() - PG_SLOP_SIZE - ARCHIVED_CHAIN_HEADER_SIZE;
let data_size = current_page.get_free_space() - PG_SLOP_SIZE - CHAIN_HEADER_SIZE;
let chunk = &data[..data_size];
let combined = [header_bytes.as_slice(), chunk].concat();
let offset_number = current_page.add_item(combined.as_ref());
@@ -146,11 +156,11 @@ impl<'a, S: StatsNodeRead> Iterator for ChainedItemIterator<'a, S> {
assert!(page.get_type() == self.page_type);
let mut item = page.get_item_unchecked(self.ip.offset);
let slice = item.get_data_slice();
let header_slice = &slice[..ARCHIVED_CHAIN_HEADER_SIZE];
let header_slice = &slice[..CHAIN_HEADER_SIZE];
let header =
rkyv::from_bytes::<ChainItemHeader>(header_slice).expect("failed to read header");
self.ip = ItemPointer::new(header.next, 1);
item.advance(ARCHIVED_CHAIN_HEADER_SIZE);
item.advance(CHAIN_HEADER_SIZE);

Some(item)
}
@@ -193,18 +203,22 @@ mod tests {

let index = make_test_relation();
{
// ChainTape can be used for small items too
let mut tape = ChainTapeWriter::new(&index, PageType::SbqMeans, &mut wstats);
let data = b"hello world";
let ip = tape.write(data).unwrap();
let mut reader = ChainTapeReader::new(&index, PageType::SbqMeans, &mut rstats);

let mut iter = reader.read(ip);
let item = iter.next().unwrap();
assert_eq!(item.get_data_slice(), data);
assert!(iter.next().is_none());
for _ in 0..100 {
let data = b"hello world";
let ip = tape.write(data).unwrap();
let mut reader = ChainTapeReader::new(&index, PageType::SbqMeans, &mut rstats);

let mut iter = reader.read(ip);
let item = iter.next().unwrap();
assert_eq!(item.get_data_slice(), data);
assert!(iter.next().is_none());
}
}

for data_size in BLCKSZ - 100..BLCKSZ + 100 {
// Exhaustively test around the neighborhood of a page size
let mut bigdata = vec![0u8; data_size as usize];
for i in 0..bigdata.len() {
bigdata[i] = (i % 256) as u8;
@@ -224,7 +238,7 @@
}

for data_size in (2 * BLCKSZ - 100)..(2 * BLCKSZ + 100) {
// create data buffer of length > 1 page
// Exhaustively test around the neighborhood of a 2-page size
let mut bigdata = vec![0u8; data_size as usize];
for i in 0..bigdata.len() {
bigdata[i] = (i % 256) as u8;
Expand All @@ -244,7 +258,7 @@ mod tests {
}

for data_size in (3 * BLCKSZ - 100)..(3 * BLCKSZ + 100) {
// create data buffer of length > 1 page
// Exhaustively test around the neighborhood of a 3-page size
let mut bigdata = vec![0u8; data_size as usize];
for i in 0..bigdata.len() {
bigdata[i] = (i % 256) as u8;
@@ -262,8 +276,5 @@
assert_eq!(count, bigdata.len());
}
}

// assert_eq!(wstats.node_writes, 2);
// assert_eq!(rstats.node_reads, 3);
}
}
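For context, a round-trip sketch of the ChainTape API described by the new module comment at the top of chain.rs, reusing helpers visible in the tests above. The stats constructors (InsertStats::new, GreedySearchStats::new) and the #[pg_test] harness are assumptions, not taken from this diff.

#[pg_test]
fn chain_tape_round_trip_sketch() {
    let mut wstats = InsertStats::new();       // assumed constructor
    let mut rstats = GreedySearchStats::new(); // assumed constructor
    let index = make_test_relation();

    // One item larger than a page, so it chains across several pages.
    let bigdata: Vec<u8> = (0..3 * BLCKSZ as usize).map(|i| (i % 256) as u8).collect();
    let mut writer = ChainTapeWriter::new(&index, PageType::SbqMeans, &mut wstats);
    let ip = writer.write(&bigdata).unwrap(); // ItemPointer to the first segment

    // Reads come back as an ordered iterator of page-sized segments.
    let mut reader = ChainTapeReader::new(&index, PageType::SbqMeans, &mut rstats);
    let mut count = 0;
    for segment in reader.read(ip) {
        let slice = segment.get_data_slice();
        assert_eq!(slice, &bigdata[count..count + slice.len()]);
        count += slice.len();
    }
    assert_eq!(count, bigdata.len());
}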
2 changes: 2 additions & 0 deletions pgvectorscale/src/util/page.rs
@@ -52,6 +52,8 @@ impl PageType {
}

/// `ChainTape` supports chaining of pages that might contain large data.
/// This is not supported for all page types. Note that `Tape` requires
/// that the page type not be chained.
pub fn is_chained(self) -> bool {
matches!(self, PageType::SbqMeans)
}
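A hypothetical illustration of the contract this comment describes; the helper names below are invented for the sketch, and the real assertion sites are not shown in this diff.

// Tape rejects chained page types; ChainTape requires them.
fn assert_tape_compatible(page_type: PageType) {
    assert!(!page_type.is_chained(), "Tape requires an unchained page type");
}

fn assert_chain_tape_compatible(page_type: PageType) {
    assert!(page_type.is_chained(), "ChainTape requires a chained page type");
}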
2 changes: 1 addition & 1 deletion pgvectorscale/src/util/tape.rs
@@ -1,4 +1,4 @@
//! `Tape`` provides a simple infinite-tape read/write abstraction over postgres pages.
//! Tape provides a simple infinite-tape-writing abstraction over postgres pages.
use super::page::{PageType, ReadablePage, WritablePage};
use pgrx::{
