Skip to content

Commit

Permalink
fix:level db testing
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Ribó Labrador <elribonazo@gmail.com>
  • Loading branch information
elribonazo committed Jun 15, 2024
1 parent fbcb786 commit 967dc2c
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
},
{
"command": "npm run test",
"cwd": "${workspaceFolder}/packages/indexdb",
"cwd": "${workspaceFolder}/packages/leveldb",
"name": "TEST",
"request": "launch",
"type": "node-terminal",
Expand Down
10 changes: 10 additions & 0 deletions packages/indexdb/tests/setup.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import "fake-indexeddb/auto";
import { TextEncoder, TextDecoder } from "util";

import { addRxPlugin } from "rxdb";
import { RxDBDevModePlugin } from "rxdb/plugins/dev-mode";
import nodeCrypto from "crypto";

if (process.env.NODE_ENV === "debug") {
addRxPlugin(RxDBDevModePlugin);
}

Object.defineProperty(globalThis, "crypto", {
value: {
getRandomValues: (arr) => nodeCrypto.getRandomValues(arr),
subtle: nodeCrypto.subtle
},
});

Object.assign(global, { TextDecoder, TextEncoder });
2 changes: 1 addition & 1 deletion packages/inmemory/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"clean-packages": "rm -rf node_modules && rm -rf build",
"build": "rm -rf build && npx rollup -c rollup/rollup.mjs",
"coverage": "npx vitest run --coverage && npx istanbul-badges-readme",
"testw": "NODE_ENV=debug vitest --run tests/*.test.ts",
"test": "NODE_ENV=debug vitest --run tests/*.test.ts",
"test:watch": "NODE_ENV=debug vitest tests/*.test.ts",
"test:debug": "NODE_ENV=debug vitest tests/*.test.ts --inspect-brk --pool threads --poolOptions.threads.singleThread"
},
Expand Down
1 change: 1 addition & 0 deletions packages/inmemory/tests/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ if (process.env.NODE_ENV === "debug") {
Object.defineProperty(globalThis, "crypto", {
value: {
getRandomValues: (arr) => nodeCrypto.getRandomValues(arr),
subtle: nodeCrypto.subtle
},
});

Expand Down
6 changes: 3 additions & 3 deletions packages/leveldb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
"prepublishOnly": "npm run build",
"clean-packages": "rm -rf node_modules && rm -rf build",
"build": "rm -rf build && npx rollup -c rollup/rollup.mjs",
"coverage": "npx vitest run --coverage && npx istanbul-badges-readme",
"test2": "NODE_ENV=debug vitest --run tests/*.test.ts",
"test:watch": "NODE_ENV=debug vitest tests/*.test.ts",
"coverage": "npx vitest run --coverage --pool threads --poolOptions.threads.singleThread",
"test": "NODE_ENV=debug vitest --run tests/*.test.ts --pool threads --poolOptions.threads.singleThread",
"test:watch": "NODE_ENV=debug vitest tests/*.test.ts --pool threads --poolOptions.threads.singleThread",
"test:debug": "NODE_ENV=debug vitest tests/*.test.ts --inspect-brk --pool threads --poolOptions.threads.singleThread"
},
"repository": {
Expand Down
141 changes: 89 additions & 52 deletions packages/leveldb/src/leveldb/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
getPrimaryFieldOfPrimaryKey,
getQueryMatcher,
getSortComparator,
getStartIndexStringFromLowerBound,
getStartIndexStringFromUpperBound,
now,
} from 'rxdb'

Expand All @@ -38,7 +40,11 @@ import type {
LevelDBSettings,
RxStorageLevelDBType,
} from './types'
import { fixTxPipe } from '@pluto-encrypted/shared'
import { boundGE, boundGT, boundLE, boundLT, compareDocsWithIndex } from '@pluto-encrypted/shared'

export function getIndexName(index: string[]): string {
return index.join(',');
}

export class RxStorageIntanceLevelDB<RxDocType> implements RxStorageInstance<
RxDocType,
Expand Down Expand Up @@ -145,65 +151,94 @@ export class RxStorageIntanceLevelDB<RxDocType> implements RxStorageInstance<
}

async query(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>> {
const { queryPlan, query } = preparedQuery
const selector = query.selector
const selectorKeys = Object.keys(selector)
const skip = query.skip ? query.skip : 0
const limit = query.limit ? query.limit : Infinity
const skipPlusLimit = skip + limit
const queryMatcher: QueryMatcher<RxDocumentData<RxDocType>> = getQueryMatcher(
const queryPlan = preparedQuery.queryPlan;
const query = preparedQuery.query;

const skip = query.skip ? query.skip : 0;
const limit = query.limit ? query.limit : Infinity;
const skipPlusLimit = skip + limit;

let queryMatcher: QueryMatcher<RxDocumentData<RxDocType>> | false = false;
if (!queryPlan.selectorSatisfiedByIndex) {
queryMatcher = getQueryMatcher(
this.schema,
preparedQuery.query
);
}

const queryPlanFields: string[] = queryPlan.index;
const mustManuallyResort = !queryPlan.sortSatisfiedByIndex;
const index: string[] | undefined = queryPlanFields;
const lowerBound: any[] = queryPlan.startKeys;
const lowerBoundString = getStartIndexStringFromLowerBound(
this.schema,
query
)
index,
lowerBound
);

const queryPlanFields: string[] = queryPlan.index
const indexes: string[] = []
if (queryPlanFields.length === 1) {
indexes.push(fixTxPipe(queryPlanFields[0]!))
} else {
indexes.push(...queryPlanFields.map(field => fixTxPipe(field)))
}
let upperBound: any[] = queryPlan.endKeys;

const shouldAddCompoundIndexes = this.schema.indexes?.find((index) => {
if (typeof index === 'string') {
return indexes.find((index2) => index2 === index)
} else {
return index.find((subIndex) => {
return subIndex === index.find((indexValue) => indexValue === subIndex)
})
const upperBoundString = getStartIndexStringFromUpperBound(
this.schema,
index,
upperBound
);
const indexName = getIndexName(index);
const docsWithIndex = await this.internals.getIndex(indexName)

let indexOfLower = (queryPlan.inclusiveStart ? boundGE : boundGT)(
docsWithIndex,
{
indexString: lowerBoundString
} as any,
compareDocsWithIndex
);

const indexOfUpper = (queryPlan.inclusiveEnd ? boundLE : boundLT)(
docsWithIndex,
{
indexString: upperBoundString
} as any,
compareDocsWithIndex
);


let rows: RxDocumentData<RxDocType>[] = [];
let done = false;
while (!done) {
const currentRow = docsWithIndex[indexOfLower];
if (
!currentRow ||
indexOfLower > indexOfUpper
) {
break;
}
})

if (shouldAddCompoundIndexes) {
indexes.splice(0, indexes.length)
indexes.push(this.collectionName)
if (typeof shouldAddCompoundIndexes === 'string') {
indexes.push(shouldAddCompoundIndexes)
} else {
indexes.push(...shouldAddCompoundIndexes)
const [currentDoc] = await this.findDocumentsById([currentRow], false)

if (currentDoc && (!queryMatcher || queryMatcher(currentDoc))) {
rows.push(currentDoc);
}
} else {
indexes.unshift(this.collectionName)
}

const indexName: string = `[${indexes.join('+')}]`
const docsWithIndex = await this.internals.getIndex(indexName)
const documents: Array<RxDocumentData<RxDocType>> = await this.internals.bulkGet(docsWithIndex)
let filteredDocuments = documents.filter((document) => {
if (selectorKeys.length <= 0) {
return true
} else {
return queryMatcher(document)
if (
(rows.length >= skipPlusLimit && !mustManuallyResort)
) {
done = true;
}
})

const sortComparator = getSortComparator(this.schema, preparedQuery.query)
filteredDocuments = filteredDocuments.sort(sortComparator)
indexOfLower++;
}

filteredDocuments = filteredDocuments.slice(skip, skipPlusLimit)
return {
documents: filteredDocuments
if (mustManuallyResort) {
const sortComparator = getSortComparator(this.schema, preparedQuery.query);
rows = rows.sort(sortComparator);
}

// apply skip and limit boundaries.
rows = rows.slice(skip, skipPlusLimit);
return Promise.resolve({
documents: rows
});
}

async count(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageCountResult> {
Expand Down Expand Up @@ -244,7 +279,9 @@ export class RxStorageIntanceLevelDB<RxDocType> implements RxStorageInstance<
}

async remove(): Promise<void> {
await Promise.resolve()
this.internals.removed = true;
this.internals.clear();
await this.close();
}

conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
Expand Down Expand Up @@ -306,15 +343,15 @@ export class RxStorageIntanceLevelDB<RxDocType> implements RxStorageInstance<
// const writeRow = bulkInsertDocs[i]!
// const docId = writeRow.document[primaryPath]
// await this.internals.bulkPut([writeRow.document], this.collectionName, this.schema)
// ret.success.push(writeRow.document)
// ret.success[docId as any] = writeRow.document
// }

// const bulkUpdateDocs = categorized.bulkUpdateDocs
// for (let i = 0; i < bulkUpdateDocs.length; ++i) {
// const writeRow = bulkUpdateDocs[i]!
// const docId = writeRow.document[primaryPath]
// await this.internals.bulkPut([writeRow.document], this.collectionName, this.schema)
// ret.success.push(writeRow.document)
// ret.success[docId as any] = writeRow.document
// }

// if (categorized.eventBulk.events.length > 0) {
Expand Down
90 changes: 63 additions & 27 deletions packages/leveldb/src/leveldb/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class LevelDBInternal<RxDocType> implements LevelDBStorageInternals<RxDoc

await pull(
pullLevel.read(db),
pull.filter(row => !Array.isArray(row.value)),
pull.filter(row => row && !Array.isArray(row.value)),
pull.map(row => {
docsInDbMap.set(row.key, row.value)
return row
Expand Down Expand Up @@ -163,40 +163,76 @@ export class LevelDBInternal<RxDocType> implements LevelDBStorageInternals<RxDoc
}

async clear() {
const db = await this.getInstance()
return db.clear()
const db = await this.getInstance();
return new Promise<void>((resolve, reject) => {
db.clear({}, (err) => {
if (err) {
return reject(err)
}
return resolve()
})
})
}

async close() {
return this.db.close()
const db = await this.getInstance();
return new Promise<void>((resolve, reject) => {
db.close((err) => {
if (err) {
return reject(err)
}
return resolve()
})
})
}

private getField(item: RxDocumentData<RxDocType>, fieldName: string) {
const splitFieldName = fieldName.split(".");
let value
while (splitFieldName.length) {
const [name] = splitFieldName.splice(0, 1);
if (name && item[name] !== undefined) {
value = item[name]
} else if (name && value && value[name] !== undefined) {
value = value[name]
}
}
return value;
}

private encapsulateIndex(item: RxDocumentData<RxDocType>, collectionName: string, requiredIndexes: string[]) {
return `[${collectionName}+${requiredIndexes.map((fieldName) => this.getField(item, fieldName)).join("+")}]`
}

async bulkPut(items: Array<RxDocumentData<RxDocType>>, collectionName: string, schema: Readonly<RxJsonSchema<RxDocumentData<RxDocType>>>) {
const primaryKeyKey = typeof schema.primaryKey === 'string' ? schema.primaryKey : schema.primaryKey.key
const saferIndexList = safeIndexList(schema)

for (const item of items) {
const shouldDelete = item._deleted
const id = getPrivateKeyValue(item, schema)
if (shouldDelete) {
for (const requiredIndexes of saferIndexList) {
const requiredIndex = `[${collectionName}+${requiredIndexes.join('+')}]`
await this.removeFromIndex(requiredIndex, id)
}
await this.removeFromIndex(`[${collectionName}+${primaryKeyKey}]`, id)
await this.removeFromIndex('[all]', id)
await this.delete(id)
this.documents.delete(id)
} else {
for (const requiredIndexes of saferIndexList) {
const requiredIndex = `[${collectionName}+${requiredIndexes.join('+')}]`
await this.updateIndex(requiredIndex, id)
try {
const primaryKeyKey = typeof schema.primaryKey === 'string' ? schema.primaryKey : schema.primaryKey.key
const saferIndexList = safeIndexList(schema)
for (const item of items) {
const shouldDelete = item._deleted
const id = getPrivateKeyValue(item, schema)
if (shouldDelete) {
for (const requiredIndexes of saferIndexList) {
const requiredIndex = this.encapsulateIndex(item, collectionName, requiredIndexes)
await this.removeFromIndex(requiredIndex, id)
}
await this.removeFromIndex(`[${collectionName}+${primaryKeyKey}]`, id)
await this.removeFromIndex('[all]', id)
await this.delete(id)
this.documents.delete(id)
} else {
for (const requiredIndexes of saferIndexList) {
const requiredIndex = this.encapsulateIndex(item, collectionName, requiredIndexes)
await this.updateIndex(requiredIndex, id)
}
await this.updateIndex(`[${collectionName}+${primaryKeyKey}]`, id)
await this.updateIndex('[all]', id)
await this.set(id, item)
this.documents.set(id, item)
}
await this.updateIndex(`[${collectionName}+${primaryKeyKey}]`, id)
await this.updateIndex('[all]', id)
await this.set(id, item)
this.documents.set(id, item)
}
} catch (err) {
console.log(err);
}
}
}
1 change: 1 addition & 0 deletions packages/leveldb/tests/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ if (process.env.NODE_ENV === "debug") {
Object.defineProperty(globalThis, "crypto", {
value: {
getRandomValues: (arr) => nodeCrypto.getRandomValues(arr),
subtle: nodeCrypto.subtle
},
});

Expand Down
1 change: 0 additions & 1 deletion packages/leveldb/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ export default defineConfig({
'src/**/*',
],
},

}
})
4 changes: 2 additions & 2 deletions packages/test-suite/src/helper/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ export function testCorrectQueries<RxDocType>(
afterEach(async () => {
if (storageInstance) {
await storageInstance.cleanup(Infinity)
storageInstance = undefined
await storageInstance.close()
}
})

Expand All @@ -270,7 +270,7 @@ export function testCorrectQueries<RxDocType>(
schema,
options: {},
multiInstance: false,
devMode: true
devMode: false
});

const rawDocsData = input.data.map(row => {
Expand Down
Loading

0 comments on commit 967dc2c

Please sign in to comment.