Skip to content

Commit

Permalink
Retry get metadata on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
TheIsrael1 committed Oct 18, 2024
1 parent 7861f83 commit b05e6c6
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 108 deletions.
31 changes: 31 additions & 0 deletions packages/core/src/agents/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,34 @@ export function getPineconeConfig({ dir }: { dir: string }) {
const agentBlueprintPath = path.join(agentDirPath, `pinecone.json`);
return getAgentFile(agentBlueprintPath);
}

export const retryFn = async <T>(
operation: () => Promise<T>,
{
maxAttempts = 3,
initialDelay = 1000,
maxDelay = 10000,
factor = 2,
jitter = true,
} = {}
) => {
let delay = initialDelay;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await operation();
} catch (error) {
if (attempt === maxAttempts) {
throw error; // Rethrow the error on the last attempt
}
console.warn(`Attempt ${attempt} failed. Retrying in ${delay}ms...`);
await new Promise((resolve) => setTimeout(resolve, delay));
// Calculate next delay with exponential backoff
delay = Math.min(delay * factor, maxDelay);
// Add jitter if enabled
if (jitter) {
const jitterFactor = 0.5 + Math.random();
delay = Math.floor(delay * jitterFactor);
}
}
}
};
116 changes: 20 additions & 96 deletions packages/core/src/agents/vector-sync.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';
import { embed } from 'ai';
import { pick } from 'lodash';
import { getAgentBlueprint, getPineconeConfig } from './utils';
import { pick, set } from 'lodash';
import { getAgentBlueprint, getPineconeConfig, retryFn } from './utils';
import { Mastra } from '../framework';
import { VectorLayer } from '../vector-access';
import { IntegrationApi } from '../types';
Expand Down Expand Up @@ -38,10 +38,21 @@ export async function executeIndexSync({ event, mastra }: any) {
}

for (const index of indexes) {
const indexMetadata =
await mastra.vectorLayer.getPineconeIndexWithMetadata({
name: index,
});
const getPineconeIndexWithMetadata = async () => {
try {
const res = await mastra.vectorLayer.getPineconeIndexWithMetadata({
name: index,
});
if (!res.length) {
throw new Error('No index metadata found');
}
return res;
} catch (e: any) {
throw new Error(e);
}
};

const indexMetadata = await retryFn<any>(getPineconeIndexWithMetadata);

if (!indexMetadata?.length) {
console.error('No index metadata found for', index);
Expand Down Expand Up @@ -540,101 +551,14 @@ export interface VectorStats {
namespaces: Record<string, { vectorCount: number }>;
}

export const fetchPineconeIndexes = async () => {
try {
const response = await fetch('https://api.pinecone.io/indexes', {
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': 'unstable',
},
cache: 'no-store',
});

const { indexes } = (await response.json()) || {};

return indexes as VectorIndex[];
} catch (err) {
console.log('Error fetching indexes using JS fetch====', err);
}
};

export const fetchPineconedIndexByName = async (name: string) => {
try {
const response = await fetch(`https://api.pinecone.io/indexes/${name}`, {
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': 'unstable',
},
cache: 'no-store',
});

const data = (await response.json()) || {};

return data as VectorIndex;
} catch (err) {
console.log('Error fetching indexes using JS fetch====', err);
}
};

export const fetchPineconeIndexStats = async (host: string) => {
try {
const response = await fetch(`https://${host}/describe_index_stats`, {
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': '2024-07',
},
cache: 'no-store',
});

const data = (await response.json()) || {};

return data as VectorStats;
} catch (err) {
console.log('Error fetching indexes using JS fetch====', err);
}
};

export const fetchPineconeRecordByNamespaceAndId = async ({
namespace,
id,
host,
}: {
namespace: string;
id: string;
host: string;
}) => {
try {
const response = await fetch(
`https://${host}/vectors/fetch?ids=${id}&namespace=${namespace}`,
{
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': 'unstable',
},

cache: 'no-store',
}
);

const data = (await response.json()) || {};

return data as Record<string, any>;
} catch (err) {
console.log('Error fetching indexe', err);
}
};

export const getPineconeIndices = async () => {
const indexes = await fetchPineconeIndexes();
const vectorLayer = new VectorLayer();
const indexes = await vectorLayer.fetchPineconeIndexes();

if (indexes && indexes?.length > 0) {
const indexesWithStats = await Promise.all(
indexes.map(async (index) => {
const stats = await fetchPineconeIndexStats(index.host);
const stats = await vectorLayer.fetchPineconeIndexStats(index.host);
let namespaces: string[] = [];

if (stats?.namespaces) {
Expand Down
79 changes: 67 additions & 12 deletions packages/core/src/vector-access/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ import { openai } from '@ai-sdk/openai';
import { Pinecone } from '@pinecone-database/pinecone';
import { embed } from 'ai';
import { VectorEntityData } from './types';
import {
fetchPineconeIndexStats,
fetchPineconeRecordByNamespaceAndId,
fetchPineconedIndexByName,
} from '../agents';
import { VectorIndex, VectorStats } from '../agents';

export class VectorLayer {
supportedProviders = ['PINECONE'];
Expand Down Expand Up @@ -58,15 +54,76 @@ export class VectorLayer {
return embedding as any;
}

fetchPineconeIndexes = async () => {
try {
const response = await fetch('https://api.pinecone.io/indexes', {
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': 'unstable',
},
cache: 'no-store',
});

const { indexes } = (await response.json()) || {};

return indexes as VectorIndex[];
} catch (err) {
console.log('Error fetching indexes using JS fetch====', err);
}
};

fetchPineconedIndexByName = async (name: string) => {
try {
const response = await fetch(`https://api.pinecone.io/indexes/${name}`, {
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': 'unstable',
},
cache: 'no-store',
});

const data = (await response.json()) || {};

return data as VectorIndex;
} catch (err) {
console.log('Error fetching indexes using JS fetch====', err);
}
};

fetchPineconeIndexStats = async (host: string) => {
try {
const response = await fetch(`https://${host}/describe_index_stats`, {
method: 'GET',
headers: {
'Api-Key': process.env.PINECONE_API_KEY!,
'X-Pinecone-API-Version': '2024-07',
},
cache: 'no-store',
});

const data = (await response.json()) || {};

return data as VectorStats;
} catch (err) {
console.log('Error fetching indexes using JS fetch====', err);
}
};

async getPineconeIndexWithMetadata({ name }: { name: string }) {
try {
if (!name) {
console.log('Index name not passed');
return [];
}
const newIndex = await fetchPineconedIndexByName(name);

const indexQuery = await fetchPineconeIndexStats(newIndex?.host!);
const newIndex = await this.getPineconeIndex({ name });
const indexQuery = await newIndex?.describeIndexStats();

console.log({
newIndex,
});

console.log({
indexQuery,
Expand All @@ -83,11 +140,9 @@ export class VectorLayer {

if (namespaces.length) {
for (const namespace of namespaces) {
const namespaceData = await fetchPineconeRecordByNamespaceAndId({
host: newIndex?.host!,
namespace,
id: name,
});
const namespaceData = await newIndex
?.namespace(namespace)
.fetch([name]);

const metadata = namespaceData?.records?.[name]?.metadata;

Expand Down

0 comments on commit b05e6c6

Please sign in to comment.