Skip to content

Commit

Permalink
Merge pull request #103 from guardian/giant-integration
Browse files Browse the repository at this point in the history
enabling transcription integration for giant
  • Loading branch information
marjisound authored Oct 9, 2024
2 parents 0dca7ef + a2c4450 commit 34928a7
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 57 deletions.
22 changes: 17 additions & 5 deletions packages/backend-common/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { findParameter, getParameters } from './configHelpers';
import { Parameter, SSM } from '@aws-sdk/client-ssm';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
import { logger } from '@guardian/transcription-service-backend-common';
import { DestinationService } from '@guardian/transcription-service-common';
export interface TranscriptionConfig {
auth: {
clientId: string;
Expand All @@ -16,9 +17,7 @@ export interface TranscriptionConfig {
emailNotificationFromAddress: string;
sourceMediaBucket: string;
transcriptionOutputBucket: string;
destinationQueueUrls: {
transcriptionService: string;
};
destinationQueueUrls: DestinationQueueUrls;
tableName: string;
};
aws: {
Expand All @@ -27,6 +26,11 @@ export interface TranscriptionConfig {
};
}

/**
 * Output queue URL for every destination service, keyed by the
 * `DestinationService` enum member the URL belongs to.
 */
type DestinationQueueUrls = Record<DestinationService, string>;

/**
 * Chooses the credentials source handed to AWS clients.
 *
 * @param onAws - whether the process is running on AWS.
 * @returns `undefined` when on AWS; otherwise a provider built from the
 *   `investigations` profile.
 */
const credentialProvider = (onAws: boolean) => {
  if (onAws) {
    return undefined;
  }
  return defaultProvider({ profile: 'investigations' });
};

Expand Down Expand Up @@ -76,11 +80,18 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
stage === 'DEV'
? undefined
: findParameter(parameters, paramPath, 'deadLetterQueueUrl');
const destinationTopic = findParameter(

const destinationQueue = findParameter(
parameters,
paramPath,
'destinationQueueUrls/transcriptionService',
);

const giantDestinationQueue = findParameter(
parameters,
paramPath,
'destinationQueueUrls/giant',
);
// AWS clients take an optional 'endpoint' property that is only needed by localstack - on code/prod you don't need
// to set it. Here we infer the endpoint (http://localhost:4566) from the sqs url
const localstackEndpoint =
Expand Down Expand Up @@ -132,7 +143,8 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
sourceMediaBucket,
emailNotificationFromAddress,
destinationQueueUrls: {
transcriptionService: destinationTopic,
[DestinationService.TranscriptionService]: destinationQueue,
[DestinationService.Giant]: giantDestinationQueue,
},
tableName,
transcriptionOutputBucket,
Expand Down
2 changes: 1 addition & 1 deletion packages/backend-common/src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export const parseTranscriptJobMessage = (
return job.data;
}
logger.error(
`Failed to parse message ${message.MessageId}, contents: ${message.Body}`,
`Failed to parse message ${message.MessageId}, contents: ${message.Body}, errors: ${JSON.stringify(job.error.errors, null, 2)}`,
);
return undefined;
};
Expand Down
14 changes: 13 additions & 1 deletion packages/cdk/lib/transcription-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ export class TranscriptionService extends GuStack {
},
);

const giantTranscriptionOutputQueueArn = new GuStringParameter(
this,
'GiantTranscriptionOutputQueueArn',
{
fromSSM: true,
default: `/${props.stage}/investigations/GiantTranscriptionOutputQueueArn`,
},
).valueAsString;

const ssmPrefix = `arn:aws:ssm:${props.env.region}:${this.account}:parameter`;
const ssmPath = `${this.stage}/${this.stack}/${APP_NAME}`;
const domainName =
Expand Down Expand Up @@ -281,7 +290,10 @@ export class TranscriptionService extends GuStack {
}),
new GuAllowPolicy(this, 'WriteToDestinationTopic', {
actions: ['sqs:SendMessage'],
resources: [transcriptionOutputQueue.queueArn],
resources: [
transcriptionOutputQueue.queueArn,
giantTranscriptionOutputQueueArn,
],
}),
new GuAllowPolicy(this, 'WriteToELK', {
actions: [
Expand Down
5 changes: 5 additions & 0 deletions packages/common/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const zodLanguageCode = z.enum(getKeys(languageCodeToLanguage));

/**
 * Services that can receive the output of a transcription job.
 * Each member's string value equals its name.
 */
export enum DestinationService {
TranscriptionService = 'TranscriptionService',
Giant = 'Giant',
}

const SignedUrl = z.object({
Expand Down Expand Up @@ -46,6 +47,8 @@ export const TranscriptionJob = z.object({
outputBucketUrls: OutputBucketUrls,
languageCode: zodLanguageCode,
translate: z.boolean(),
// we can get rid of this when we switch to using a zip
translationOutputBucketUrls: z.optional(OutputBucketUrls),
});

export type TranscriptionJob = z.infer<typeof TranscriptionJob>;
Expand All @@ -61,6 +64,8 @@ export const TranscriptionOutputSuccess = TranscriptionOutputBase.extend({
status: z.literal('SUCCESS'),
languageCode: z.string(),
outputBucketKeys: OutputBucketKeys,
// we can get rid of this when we switch to using a zip
translationOutputBucketKeys: z.optional(OutputBucketKeys),
});

export const TranscriptionOutputFailure = TranscriptionOutputBase.extend({
Expand Down
51 changes: 42 additions & 9 deletions packages/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
getASGClient,
} from '@guardian/transcription-service-backend-common';
import {
DestinationService,
OutputBucketKeys,
TranscriptionJob,
TranscriptionOutputFailure,
Expand All @@ -24,6 +25,7 @@ import {
getTranscriptionText,
convertToWav,
getOrCreateContainer,
WhisperBaseParams,
} from './transcribe';
import path from 'path';

Expand Down Expand Up @@ -226,12 +228,21 @@ const pollTranscriptionQueue = async (
}
await publishTranscriptionOutputFailure(
sqsClient,
config.app.destinationQueueUrls.transcriptionService,
config.app.destinationQueueUrls[job.transcriptDestinationService],
job,
);
return;
}

// Giant doesn't know the language of files uploaded to it, so for Giant files we first run language detection
// then based on the output, either run transcription or run transcription and translation, and return the output
// of both to the user. This is different from the transcription-service, where transcription and translation are
// two separate jobs
const combineTranscribeAndTranslate =
job.transcriptDestinationService === DestinationService.Giant &&
job.translate;
const extraTranslationTimMultiplier = combineTranscribeAndTranslate ? 2 : 1;

if (ffmpegResult.duration && ffmpegResult.duration !== 0) {
// Transcription time is usually slightly longer than file duration.
// Update visibility timeout to 2x the file duration plus 10 minutes for the model to load
// (the whole timeout is doubled when transcription and translation are combined).
Expand All @@ -241,18 +252,23 @@ const pollTranscriptionQueue = async (
sqsClient,
config.app.taskQueueUrl,
receiptHandle,
ffmpegResult.duration * 2 + 600,
(ffmpegResult.duration * 2 + 600) * extraTranslationTimMultiplier,
);
}

const transcriptResult = await getTranscriptionText(
const whisperBaseParams: WhisperBaseParams = {
containerId,
ffmpegResult.wavPath,
fileToTranscribe,
wavPath: ffmpegResult.wavPath,
file: fileToTranscribe,
numberOfThreads,
config.app.stage === 'PROD' ? 'medium' : 'tiny',
model: config.app.stage === 'PROD' ? 'medium' : 'tiny',
};

const transcriptResult = await getTranscriptionText(
whisperBaseParams,
job.languageCode,
job.translate,
combineTranscribeAndTranslate,
);

// if we've received an interrupt signal we don't want to perform a half-finished transcript upload/publish as
Expand All @@ -272,6 +288,17 @@ const pollTranscriptionQueue = async (
transcriptResult.transcripts,
);

if (
combineTranscribeAndTranslate &&
transcriptResult.transcriptTranslations &&
job.translationOutputBucketUrls
) {
await uploadAllTranscriptsToS3(
job.translationOutputBucketUrls,
transcriptResult.transcriptTranslations,
);
}

const outputBucketKeys: OutputBucketKeys = {
srt: outputBucketUrls.srt.key,
json: outputBucketUrls.json.key,
Expand All @@ -288,17 +315,23 @@ const pollTranscriptionQueue = async (
userEmail: job.userEmail,
originalFilename: job.originalFilename,
outputBucketKeys,
translationOutputBucketKeys: job.translationOutputBucketUrls &&
transcriptResult.transcriptTranslations && {
srt: job.translationOutputBucketUrls.srt.key,
json: job.translationOutputBucketUrls.json.key,
text: job.translationOutputBucketUrls.text.key,
},
isTranslation: job.translate,
};

await publishTranscriptionOutput(
sqsClient,
config.app.destinationQueueUrls.transcriptionService,
config.app.destinationQueueUrls[job.transcriptDestinationService],
transcriptionOutput,
);

logger.info(
'Worker successfully transcribed the file and sent notification to sns',
`Worker successfully transcribed the file and sent notification to ${job.transcriptDestinationService} output queue`,
{
id: transcriptionOutput.id,
filename: transcriptionOutput.originalFilename,
Expand Down Expand Up @@ -333,7 +366,7 @@ const pollTranscriptionQueue = async (
if (receiveCount >= MAX_RECEIVE_COUNT) {
await publishTranscriptionOutputFailure(
sqsClient,
config.app.destinationQueueUrls.transcriptionService,
config.app.destinationQueueUrls[job.transcriptDestinationService],
job,
);
}
Expand Down
Loading

0 comments on commit 34928a7

Please sign in to comment.