Skip to content

Commit

Permalink
Fixes loophole where a failed send would cause a journey to stall (#434)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Apr 6, 2024
1 parent cd1e255 commit 968ccdd
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ exports.up = async function(knex) {
.defaultTo('0')
})

await knex.raw('UPDATE campaign_sends SET reference_id = user_step_id, reference_type = "journal"')
await knex.raw('UPDATE campaign_sends SET reference_id = user_step_id, reference_type = "journey"')

await knex.schema.alterTable('campaign_sends', function(table) {
table.unique(['user_id', 'campaign_id', 'reference_id'])
Expand Down
11 changes: 8 additions & 3 deletions apps/platform/src/journey/JourneyStep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Rule from '../rules/Rule'
import { check } from '../rules/RuleEngine'
import App from '../app'
import { RRule } from 'rrule'
import { CampaignSend } from '../campaigns/Campaign'
import { CampaignSend, CampaignSendReferenceType } from '../campaigns/Campaign'
import { createEvent } from '../users/UserEventRepository'
import { JourneyState } from './JourneyState'

Expand Down Expand Up @@ -268,21 +268,26 @@ export class JourneyAction extends JourneyStep {
state.job(async () => {
let send_id = await getCampaignSend(campaign.id, state.user.id, `${userStep.id}`).then(s => s?.id)

const reference = {
reference_id: `${userStep.id}`,
reference_type: 'journey' as CampaignSendReferenceType,
}

if (!send_id) {
send_id = await CampaignSend.insert({
campaign_id: campaign.id,
user_id: state.user.id,
state: 'pending',
send_at: new Date(),
reference_id: `${userStep.id}`,
...reference,
})
}

return sendCampaignJob({
campaign,
user: state.user,
send_id,
reference_id: `${userStep.id}`,
...reference,
})
})
}
Expand Down
51 changes: 32 additions & 19 deletions apps/platform/src/providers/MessageTriggerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,33 +145,28 @@ export const throttleSend = async (channel: Channel, points = 1): Promise<RateLi
)
}

export const notifyJourney = async (reference_id: string, response?: any) => {

const referenceId = parseInt(reference_id, 10)
// save response into user step
if (response) {
await JourneyUserStep.update(q => q.where('id', referenceId), {
data: {
response,
},
})
}

// trigger processing of this journey entrance
await JourneyProcessJob.from({ entrance_id: referenceId }).queue()
}

export const failSend = async ({ campaign, user, context }: MessageTriggerHydrated<TemplateType>, error: Error, shouldNotify = (_: any) => true) => {

// Update send record
await updateSendState({
campaign,
user,
reference_id: context.reference_id,
state: 'failed',
})

// Create an event on the failure
await createEvent(user, {
name: campaign.eventName('failed'),
data: { ...context, error },
}, true, ({ result, ...data }) => data)

// If this send is part of a journey, notify the journey
if (context.reference_id && context.reference_type === 'journey') {
await notifyJourney(context.reference_id)
}

// Notify of the error if it's a critical one
if (shouldNotify(error)) App.main.error.notify(error)
}

Expand All @@ -185,13 +180,31 @@ export const finalizeSend = async (data: MessageTriggerHydrated<TemplateType>, r
reference_id: context.reference_id,
})

// Create an event on the user about the email
// Create an event on the user about the send
await createEvent(user, {
name: campaign.eventName('sent'),
data: { ...context, result },
}, true, ({ result, ...data }) => data)

if (context.reference_id) {
await notifyJourney(context.reference_id)
// If this send is part of a journey, notify the journey
if (context.reference_id && context.reference_type === 'journey') {
await notifyJourney(context.reference_id, campaign.channel === 'webhook' ? result : undefined)
}
}

export const notifyJourney = async (reference_id: string, response?: any) => {

const referenceId = parseInt(reference_id, 10)

// Save response into user step
if (response) {
await JourneyUserStep.update(q => q.where('id', referenceId), {
data: {
response,
},
})
}

// Trigger processing of this journey entrance
await JourneyProcessJob.from({ entrance_id: referenceId }).queue()
}
10 changes: 0 additions & 10 deletions apps/platform/src/providers/webhook/WebhookJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { WebhookTemplate } from '../../render/Template'
import { updateSendState } from '../../campaigns/CampaignService'
import { failSend, finalizeSend, loadSendJob, messageLock, prepareSend } from '../MessageTriggerService'
import { loadWebhookChannel } from '.'
import { JourneyUserStep } from '../../journey/JourneyStep'
import { releaseLock } from '../../core/Lock'

export default class WebhookJob extends Job {
Expand Down Expand Up @@ -39,15 +38,6 @@ export default class WebhookJob extends Job {
try {
const result = await channel.send(template, data)
await finalizeSend(data, result)

if (result.response
&& trigger.reference_id
&& trigger.reference_type === 'journey'
) {
await JourneyUserStep.update(q => q.where('id', trigger.reference_id), {
data: { response: result.response },
})
}
} catch (error: any) {
await failSend(data, error)
} finally {
Expand Down
1 change: 1 addition & 0 deletions apps/ui/public/locales/en.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"abort_campaign": "Abort Campaign",
"aborted": "Aborted",
"action": "Action",
"add_admin": "Add Admin",
"add_admin_description": "Add a new admin to this organization. Admins have full access to all projects and settings, members can only access projects they are a part of.",
"add_list": "Add List",
Expand Down
1 change: 1 addition & 0 deletions apps/ui/public/locales/es.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"abort_campaign": "Cancelar campaña",
"aborted": "Cancellado",
"action": "Acción",
"add_admin": "Añadir Admin",
"add_admin_description": "Añade un nuevo administrador a esta organización. Los administradores tienen acceso completo a todos los proyectos y ajustes, los miembros sólo pueden acceder a los proyectos de los que forman parte.",
"add_list": "Añadir lista",
Expand Down

0 comments on commit 968ccdd

Please sign in to comment.