diff --git a/src/cli/graph.ts b/src/cli/graph.ts index 5d54871..c3abfb8 100644 --- a/src/cli/graph.ts +++ b/src/cli/graph.ts @@ -16,7 +16,7 @@ export async function settleGraph(graph: UnsettledUserGraph) { image: await user.profile.image .then((image: Blob | null) => { if (!image) { - console.error(`Failed to download profile picture. (User: ${user.profile.username})`) + console.error(`Failed to download profile picture. (${user.profile.username})`) return null; } diff --git a/src/index.ts b/src/index.ts index 19addf7..1a7336a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ import * as prompt from '@inquirer/prompts'; import {ExitPromptError} from '@inquirer/prompts'; -import {FollowerFetcherEvent, FollowerFetcherEventTypes, getFollowerGraph} from "./instagram/follower"; +import {FollowerFetcherEvent, FollowerFetcherEventTypes, fetchFollowerGraph} from "./instagram/follower"; import SessionData from "./instagram/session-data"; import {UnsettledUser, UnsettledUserGraph, UserGraph} from "./instagram/user"; import {PathOrFileDescriptor, writeFileSync} from "node:fs"; @@ -78,27 +78,30 @@ async function streamGraph(root: UnsettledUser, filename: string, stream: Readab graph = value.graph - const identifier = `(User: ${value.user.profile.username})` + const identifier = `(${value.user.profile.username})` + const time = `[${new Date().toISOString()}]` if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER) { - console.log(`Reached the maximum amount of followers to include. Currently included are ${value.amount}. ${identifier}`) + console.log(`${time} Reached the maximum amount of followers to include. Currently included are ${value.amount}. ${identifier}`) } else if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING) { - console.log(`Reached the maximum amount of followed users to include. Currently included are ${value.amount}. ${identifier}`) + console.log(`${time} Reached the maximum amount of followed users to include. Currently included are ${value.amount}. ${identifier}`) } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_BATCH) { - console.log(`Reached follower batch limit. Resuming after ${value.delay} milliseconds. ${identifier}`) + console.log(`${time} Reached follower batch limit. Resuming after ${value.delay} milliseconds. ${identifier}`) await updatesSaveFiles(value.graph) } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_DAILY) { - console.log(`Reached follower daily limit. Resuming after ${value.delay} milliseconds. ${identifier}`) + console.log(`${time} Reached follower daily limit. Resuming after ${value.delay} milliseconds. ${identifier}`) await updatesSaveFiles(value.graph) } else if (value.type === FollowerFetcherEventTypes.UPDATE) { const total = Object.entries(value.graph).length - const followers = value.added.followers.length; + const followers = value.added.followers.ids.length; const users = value.added.users.length + const targetUsername = value.added.followers.target.profile.username; + console.log( - `Added ${followers > 0 ? followers : 'no'} follower${followers > 1 ? 's' : ''} to ${value.user.profile.username}. ` + + `${time} Added ${followers > 0 ? followers : 'no'} follower${followers > 1 ? 's' : ''} to ${targetUsername}. ` + `Discovered ${users > 0 ? users : 'no'} new user${users > 1 ? 's' : ''}. ` + - `Total user count: ${total}, completely queried users ${value.added.progress.done}.` + `Total user count: ${total}, completely queried users ${value.added.progress.done}. ${identifier}` ) } } @@ -136,7 +139,7 @@ try { const includeFollowing = await prompt.confirm({message: "Include following?", default: true}) - const stream = getFollowerGraph({ + const stream = fetchFollowerGraph({ includeFollowing, root, session, @@ -152,6 +155,10 @@ try { }, parallelTasks: 20, delay: { + images: { + upper: 5000, + lower: 500 + }, pages: { upper: 40000, lower: 20000 @@ -170,6 +177,8 @@ try { }) const {graph: unsettledGraph, cancellation} = await streamGraph(root, filename, stream) + + console.log('Waiting for profile pictures to be downloaded.') const graph = await settleGraph(unsettledGraph) const fileWriters = Promise.allSettled([ @@ -185,7 +194,7 @@ try { await Promise.all([ fileWriters.then(() => { console.info( - "The may process still needs to wait on the rate limiting timeouts to exit cleanly. " + + "The process may still needs to wait on the rate limiting timeouts to exit cleanly. " + "Killing it should not cause any data lose." ) }), diff --git a/src/instagram/follower.ts b/src/instagram/follower.ts index 7daff35..68c7508 100644 --- a/src/instagram/follower.ts +++ b/src/instagram/follower.ts @@ -1,5 +1,5 @@ import SessionData, {sessionToCookie} from "./session-data"; -import {RandomDelayLimit, Limits} from "./limits"; +import {Limits, RandomDelayLimit} from "./limits"; import {downloadProfilePicture, UnsettledUser, UnsettledUserGraph} from "./user"; import {ReadableStream} from "node:stream/web"; import {hasJsonBody} from "./request"; @@ -9,7 +9,10 @@ export enum FollowerFetcherEventTypes { } export interface FollowerFetcherAddition { - followers: number[], + followers: { + target: UnsettledUser, + ids: number[] + }, users: UnsettledUser[], progress: { done: number @@ -37,17 +40,14 @@ function randomDelay(limit: RandomDelayLimit) { } -async function rateLimiter({graph, user, phase, taskCount, limits, controller}: { +async function rateLimiter({graph, user, phase, limits, controller}: { graph: UnsettledUserGraph, user: UnsettledUser, phase: number, - taskCount: number limits: Limits, controller: ReadableStreamDefaultController }) { - const phaseProgression = Math.floor( - Object.entries(graph).length / (limits.rate.batch.size - taskCount * 25) - ) + const phaseProgression = Math.floor(Object.entries(graph).length / limits.rate.batch.size) if (phase < phaseProgression) { if (phaseProgression > limits.rate.batch.count) { @@ -71,23 +71,39 @@ async function rateLimiter({graph, user, phase, taskCount, limits, controller}: }) await delay.delay - return phase + return phaseProgression } } - // delay between retrieving the next follower page await randomDelay(limits.rate.delay.pages).delay return phase } -function addFollowerToGraph({graph, followers, done, target, controller}: { +interface Task { + job: () => Promise, + user: UnsettledUser, + noWait?: boolean, + previousResults?: TaskResult[], + stop?: boolean + direction: FollowerDirection +} + +interface TaskResult { + additionalUsers: UnsettledUser[], + additionalFollowers: { + target: UnsettledUser, + ids: number[] + }, + completedUsers: number[], + graph: UnsettledUserGraph +} + +function addFollowerToGraph({graph, followers, target}: { graph: UnsettledUserGraph, followers: UnsettledUser[], - done: Set, target: number, - controller: ReadableStreamDefaultController -},) { +},): TaskResult { const followerIds = new Set(graph[target].followerIds) const additionalFollowers = followers .map(follower => follower.id) @@ -97,55 +113,49 @@ function addFollowerToGraph({graph, followers, done, target, controller}: { const additionalUsers = followers.filter(follower => graph[follower.id] === undefined) additionalUsers.forEach(user => graph[user.id] = user) - additionalUsers.filter(follower => follower.private) + const done = additionalUsers.filter(follower => follower.private) .map(follower => follower.id) - .forEach(id => done.add(id)) - - controller.enqueue({ - type: FollowerFetcherEventTypes.UPDATE, - user: graph[target], - added: { - followers: additionalFollowers, - users: additionalUsers, - progress: { - done: done.size - } - }, - graph - }) + + return { + additionalFollowers: {ids: additionalFollowers, target: graph[target]}, + additionalUsers, + completedUsers: done, + graph: {...graph} + } } -function addFollowingToGraph({graph, following, done, task, controller}: { +function addFollowingToGraph({graph, following, target}: { graph: UnsettledUserGraph, following: UnsettledUser[], - done: Set, - task: number, - controller: ReadableStreamDefaultController -},) { - following.filter(following => graph[following.id] !== undefined).forEach(user => addFollowerToGraph({ - graph, - followers: [graph[task]], - done, - controller, - target: user.id + target: number, +}): TaskResult[] { + if (!graph[target].followingCount) graph[target].followingCount = 0 + graph[target].followingCount += following.length + + const results: TaskResult[] = following + .filter(following => graph[following.id] !== undefined) + .map(user => addFollowerToGraph({ + graph, + followers: [graph[target]], + target: user.id + })) + + return results.concat(following.filter(following => graph[following.id] === undefined).map(user => { + graph[user.id] = {...user, followerIds: [target]}; + + return { + completedUsers: results.reduce((done: number[], result) => done.concat(result.completedUsers), []), + additionalUsers: [user], + additionalFollowers: { + target: user, + ids: [target] + }, + graph: {...graph} + } })) - - following.filter(following => graph[following.id] === undefined).forEach(user => { - graph[user.id] = { - ...user, - followerIds: [task] - }; - - controller.enqueue({ - graph: {...graph}, - type: FollowerFetcherEventTypes.UPDATE, - user, - added: {users: [user], progress: {done: done.size}, followers: [task]} - }) - }) } -export function getFollowerGraph({root, session, limits, includeFollowing}: { +export function fetchFollowerGraph({root, session, limits, includeFollowing}: { root: UnsettledUser, session: SessionData, includeFollowing: boolean, @@ -153,18 +163,17 @@ export function getFollowerGraph({root, session, limits, includeFollowing}: { }): ReadableStream { const graph: UnsettledUserGraph = {[root.id]: root} - let controller: ReadableStreamDefaultController - return new ReadableStream({ - start: async (c: ReadableStreamDefaultController) => { - controller = c - + start: async (controller: ReadableStreamDefaultController) => { if (root.private) { controller.enqueue({ type: FollowerFetcherEventTypes.UPDATE, user: root, added: { - followers: [], + followers: { + target: root, + ids: [] + }, users: [root], progress: { done: 1 @@ -177,12 +186,7 @@ export function getFollowerGraph({root, session, limits, includeFollowing}: { return } - try { - await createFollowerGraph({limits, graph, session, controller, includeFollowing}); - } catch (e) { - controller.error(e) - return - } + await createFollowerGraph({limits, graph, session, controller, includeFollowing}); controller.close(); }, @@ -196,6 +200,43 @@ function excess(current: number, limit: number, addition: any[]) { return addition.slice(addition.length - (current - limit)) } +async function taskRunner(graph: UnsettledUserGraph, task: Task, limits: Limits): Promise { + const result = await task.job() + const user = graph[task.user.id] + + let additions: TaskResult[] = [] + + if (result.direction === FollowerDirection.FOLLOWER) { + additions = [addFollowerToGraph({graph, followers: result.page, target: task.user.id})] + + if (!limits.depth.followers || user.followerIds.length <= limits.depth.followers) { + return {user, job: result.next, previousResults: additions, direction: result.direction} + } + } else if (result.direction === FollowerDirection.FOLLOWING) { + additions = addFollowingToGraph({graph, following: result.page, target: task.user.id}) + + if (!limits.depth.followers || (user.followingCount ?? 0) <= limits.depth.followers) { + return {user, job: result.next, previousResults: additions, direction: result.direction} + } + } + + const followers = result.direction === FollowerDirection.FOLLOWER; + const amount = followers ? user.followerIds.length : user.followingCount + + return { + job: null, + stop: true, + direction: result.direction, + user, + previousResults: [...additions, { + completedUsers: excess(amount, limits.depth.followers, result.page), + additionalUsers: [], + additionalFollowers: {ids: [], target: task.user}, + graph: {...graph} + }] + } +} + async function createFollowerGraph({controller, limits, graph, session, includeFollowing}: { controller: ReadableStreamDefaultController, graph: UnsettledUserGraph, @@ -207,120 +248,86 @@ async function createFollowerGraph({controller, limits, graph, session, includeF let phase = 0 for (let gen = 0; gen <= limits.depth.generations && !graph.canceled; ++gen) { - const open = Object.values(graph) + // create tasks for each uncompleted user, and put new jobs at the end of the queue, creating a more + // meaning full breath first algorithm + const taskQueue: Task[] = Object.values(graph) .filter(user => !done.has(user.id)) - .map(user => user.id) - - if (open.length < 1 || graph.canceled) break; // no open task, skip remaining generations - - while (open.length > 0 && !graph.canceled) { - const taskCount = Math.min(Math.floor(limits.rate.batch.size / 100), limits.rate.parallelTasks) - const tasks = open.splice(0, taskCount < 1 ? 1 : taskCount).map(async task => { - graph[task].followerIds = graph[task].followerIds ?? [] - - const followers = async () => { - let nextPage = undefined - - while (nextPage !== null && !graph.canceled) { - const newPhase = gen === 0 ? 0 : await rateLimiter({ - graph, - user: graph[task], - phase, - limits: limits, - taskCount: taskCount, - controller, - }) - - const followers = await fetchFollowers({ - session, - targetUser: graph[task], - nextPage, - limits, - direction: FollowerDirection.FOLLOWER - }) - - addFollowerToGraph({graph, followers: followers.page, done, target: task, controller}) - - nextPage = followers.nextPage - phase = newPhase - - const userFollowerCount = graph[task].followerIds.length; - if (limits.depth.followers > 0 && userFollowerCount >= limits.depth.followers) { - excess(userFollowerCount, limits.depth.followers, followers.page) - .forEach(user => done.add(user.id)) - - controller.enqueue({ - type: FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER, - user: graph[task], - graph, - amount: userFollowerCount - }) - break; - } - } + .reduce((tasks, user): Task[] => { + tasks.push({ + job: () => fetchFollowers({session, user, limits, direction: FollowerDirection.FOLLOWER}), + user, + noWait: true, + }) + + if (includeFollowing) { + tasks.push({ + job: () => fetchFollowers({session, user, limits, direction: FollowerDirection.FOLLOWING}), + user, + noWait: true + }) } - const following = async () => { - let nextPage = undefined - let followingCount = 0 - - while (nextPage !== null && !graph.canceled) { - const newPhase = gen === 0 ? 0 : await rateLimiter({ - graph, - user: graph[task], - phase, - taskCount: taskCount, - limits, - controller - }) - - const following = await fetchFollowers({ - session, - targetUser: graph[task], - nextPage, - limits, - direction: FollowerDirection.FOLLOWING - }) - - addFollowingToGraph({ - graph, - following: following.page, - done, - controller, - task: graph[task].id - }) - - followingCount += following.page.length - phase = newPhase - - if (limits.depth.followers > 0 && followingCount >= limits.depth.followers) { - excess(followingCount, limits.depth.followers, following.page) - .forEach(user => done.add(user.id)) - - controller.enqueue({ - type: FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING, - user: graph[task], - graph: {...graph}, - amount: followingCount - }) - break; - } + return tasks + }, []) - nextPage = following.nextPage; - } - } + if (taskQueue.length < 1) break; // no open task, skip remaining generations - try { - await Promise.all([followers(), (includeFollowing ? following() : Promise.resolve())]) - } catch (e) { - controller.error(e) - } + // Users per response: followers = 25, following = 200 + const maxParallel = Math.min( + Math.floor(limits.rate.batch.size / (25 + (includeFollowing ? 200 : 0))), + limits.rate.parallelTasks + ) - done.add(task); - }); + const runners = new Array(Math.max(maxParallel, 1)).fill(async () => { + while (taskQueue.length > 0 && !graph.canceled) { + const task = taskQueue.pop() + const followers = task.direction === FollowerDirection.FOLLOWER; - await Promise.all(tasks) - } + if (!task.noWait) phase = await rateLimiter({ + graph, + user: task.user, + phase, + limits, + controller + }) + + const next = await taskRunner(graph, task, limits) + + next.previousResults.forEach(result => { + result.completedUsers.forEach((id) => done.add(id)) + + controller.enqueue({ + type: FollowerFetcherEventTypes.UPDATE, + user: task.user, + graph: {...result.graph}, + added: { + followers: result.additionalFollowers, + users: result.additionalUsers, + progress: { + done: done.size + } + } + }) + }) + + if (next.stop) { + const amount = followers ? graph[task.user.id].followerIds.length : graph[task.user.id].followingCount + + controller.enqueue({ + type: followers ? FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER : FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING, + user: task.user, + graph: {...graph}, + amount + }) + } else if (next.job) { + taskQueue.push(next) + } else { + done.add(task.user.id) + } + } + }).map(runner => runner()) + + await Promise.all(runners).catch((e) => controller.error(e)) } return graph @@ -330,13 +337,19 @@ enum FollowerDirection { FOLLOWER, FOLLOWING } -async function fetchFollowers({session, targetUser, nextPage, direction, limits}: { - session: SessionData, targetUser: UnsettledUser, nextPage?: string, direction: FollowerDirection, limits: Limits -}): Promise<{ page: UnsettledUser[], nextPage: string }> { - const query = nextPage ? `?max_id=${nextPage}` : ''; +type FollowerPage = { page: UnsettledUser[], next: null | (() => Promise), direction: FollowerDirection } + +async function fetchFollowers({session, user: target, page, direction, limits}: { + session: SessionData, + user: UnsettledUser, + page?: undefined | string | null, + direction: FollowerDirection, + limits: Limits +}): Promise { + const query = page ? `?max_id=${page}` : ''; const directionPath = direction === FollowerDirection.FOLLOWING ? 'following' : 'followers' - const response = await fetch(`https://www.instagram.com/api/v1/friendships/${targetUser.id}/${directionPath}/${query}`, { + const response = await fetch(`https://www.instagram.com/api/v1/friendships/${target.id}/${directionPath}/${query}`, { headers: { "Sec-Fetch-Site": "same-origin", "X-IG-App-ID": "936619743392459", @@ -362,7 +375,7 @@ async function fetchFollowers({session, targetUser, nextPage, direction, limits} } } - const page = (await response.json()) as { + const result = (await response.json()) as { users: { id: string, full_name: string, @@ -374,21 +387,25 @@ async function fetchFollowers({session, targetUser, nextPage, direction, limits} } return { - page: page.users.map((user) => { + direction, + page: result.users.map((user) => { return { id: parseInt(user.id, 10), profile: { username: user.username, name: user.full_name, - image: randomDelay({ - lower: 0, - upper: limits.rate.delay.pages.upper - }).delay.then(() => downloadProfilePicture(user.profile_pic_url)) + image: randomDelay(limits.rate.delay.images).delay.then(() => downloadProfilePicture(user.profile_pic_url)) }, public: !user.is_private, - private: user.is_private && targetUser.id != session.user.id + private: user.is_private && target.id != session.user.id } }), - nextPage: page.next_max_id ?? null + next: result.next_max_id ? () => fetchFollowers({ + session, + user: target, + page: result.next_max_id, + direction, + limits + }) : null } } diff --git a/src/instagram/limits.ts b/src/instagram/limits.ts index d27e7c6..89d4911 100644 --- a/src/instagram/limits.ts +++ b/src/instagram/limits.ts @@ -15,6 +15,7 @@ export interface Limits { } parallelTasks: number delay: { + images: RandomDelayLimit, daily: RandomDelayLimit, batches: RandomDelayLimit, pages: RandomDelayLimit diff --git a/src/instagram/user.ts b/src/instagram/user.ts index 96366ea..02bb82a 100644 --- a/src/instagram/user.ts +++ b/src/instagram/user.ts @@ -8,6 +8,7 @@ export interface User { image: string }, followerIds?: number[], + followingCount?: number private?: boolean, public: boolean, personal?: boolean @@ -21,6 +22,7 @@ export interface UnsettledUser { image: Promise | null, } followerIds?: number[], + followingCount?: number private?: boolean, public: boolean, personal?: boolean diff --git a/src/visualization/index.ts b/src/visualization/index.ts index 406e7a1..f58e3bd 100644 --- a/src/visualization/index.ts +++ b/src/visualization/index.ts @@ -25,7 +25,7 @@ window.addEventListener("DOMContentLoaded", async () => { toolbar.addEventListener("remove-highlighting", () => visualization.removeHighlights()); toolbar.addEventListener("reset-positioning", () => visualization.resetPositioning()) - toolbar.addEventListener("search-user", function (event: CustomEvent) { + toolbar.addEventListener("search-user", async function (event: CustomEvent) { const matchingUser = users.find((user: User) => user.profile.username === event.detail); if (!matchingUser) return toolbar.setSearchError(`No user found with the exact username: ${event.detail}`)