diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9348a3f58..02c8436e4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -54,6 +54,16 @@ jobs: run: | bazel test --test_tag_filters=-lint //... + - name: Cleanup space + run: | + df -h + sudo apt-get autoremove -y + sudo apt-get clean + docker images prune -a + sudo rm -rf /usr/local/share/powershell + sudo rm -rf /opt/hostedtoolcache + df -h + - name: Build all artifacts run: | bazel build //... diff --git a/backend/components/schema-registry-manager/Dockerfile b/backend/components/schema-registry-manager/Dockerfile new file mode 100644 index 000000000..e06995486 --- /dev/null +++ b/backend/components/schema-registry-manager/Dockerfile @@ -0,0 +1,19 @@ +FROM node:18 + +WORKDIR /app + +COPY ./src/package*.json ./ +COPY ./src/tsconfig*.json ./ + +RUN npm install +RUN npm install typescript -g + +COPY ./src/app.ts ./ +COPY ./src/types.ts ./ +COPY ./src/providers/karapace.ts ./providers/ + +RUN tsc + +EXPOSE 3000 + +CMD [ "node", "app.js" ] diff --git a/backend/components/schema-registry-manager/Makefile b/backend/components/schema-registry-manager/Makefile new file mode 100644 index 000000000..3aa78401e --- /dev/null +++ b/backend/components/schema-registry-manager/Makefile @@ -0,0 +1,6 @@ +build: + docker build -t schema-registry-manager . + +release: build + docker tag schema-registry-manager ghcr.io/airyhq/backend/schema-registry-manager:release + docker push ghcr.io/airyhq/backend/schema-registry-manager:release diff --git a/backend/components/schema-registry-manager/helm/BUILD b/backend/components/schema-registry-manager/helm/BUILD new file mode 100644 index 000000000..8d6495211 --- /dev/null +++ b/backend/components/schema-registry-manager/helm/BUILD @@ -0,0 +1,3 @@ +load("//tools/build:helm.bzl", "helm_ruleset_core_version") + +helm_ruleset_core_version() \ No newline at end of file diff --git a/backend/components/schema-registry-manager/helm/Chart.yaml b/backend/components/schema-registry-manager/helm/Chart.yaml new file mode 100644 index 000000000..7205b553c --- /dev/null +++ b/backend/components/schema-registry-manager/helm/Chart.yaml @@ -0,0 +1,5 @@ +apiVersion: v2 +appVersion: "1.0" +description: Schema registry component to manage different providers +name: schema-registry-manager +version: 1.0 diff --git a/backend/components/schema-registry-manager/helm/templates/configmap.yaml b/backend/components/schema-registry-manager/helm/templates/configmap.yaml new file mode 100644 index 000000000..05de4d589 --- /dev/null +++ b/backend/components/schema-registry-manager/helm/templates/configmap.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.component }} + labels: + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: "{{ .Values.component }}" + annotations: + core.airy.co/enabled: "{{ .Values.enabled }}" diff --git a/backend/components/schema-registry-manager/helm/templates/deployment.yaml b/backend/components/schema-registry-manager/helm/templates/deployment.yaml new file mode 100644 index 000000000..b90214517 --- /dev/null +++ b/backend/components/schema-registry-manager/helm/templates/deployment.yaml @@ -0,0 +1,48 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.component }} + labels: + app: {{ .Values.component }} + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: {{ .Values.component }} +spec: + replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }} + selector: + matchLabels: + app: {{ .Values.component }} + strategy: + rollingUpdate: + maxSurge: 1 + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: {{ .Values.component }} + spec: + containers: + - name: app + image: "ghcr.io/airyhq/{{ .Values.image }}:{{ .Values.imageTag }}" + imagePullPolicy: Always + envFrom: + - configMapRef: + name: security + - configMapRef: + name: kafka-config + - configMapRef: + name: {{ .Values.component }} + env: + - name: KAFKA_TOPIC_NAME + value: {{ .Values.kafka.topic }} + livenessProbe: + httpGet: + path: /actuator/health + port: {{ .Values.port }} + httpHeaders: + - name: Health-Check + value: health-check + initialDelaySeconds: 43200 + periodSeconds: 10 + failureThreshold: 3 diff --git a/backend/components/schema-registry-manager/helm/templates/service.yaml b/backend/components/schema-registry-manager/helm/templates/service.yaml new file mode 100644 index 000000000..4d636e8b2 --- /dev/null +++ b/backend/components/schema-registry-manager/helm/templates/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.component }} + labels: + app: {{ .Values.component }} +spec: + type: ClusterIP + clusterIP: None + ports: + - name: {{ .Values.component }} + port: 80 + targetPort: {{ .Values.port }} + selector: + app: {{ .Values.component }} diff --git a/backend/components/schema-registry-manager/helm/values.yaml b/backend/components/schema-registry-manager/helm/values.yaml new file mode 100644 index 000000000..200ef3ca3 --- /dev/null +++ b/backend/components/schema-registry-manager/helm/values.yaml @@ -0,0 +1,9 @@ +component: schema-registry-manager +mandatory: false +enabled: false +image: backend/schema-registry-manager +imageTag: release +port: 3000 +resources: +kafka: + topic: application.communication.messages \ No newline at end of file diff --git a/backend/components/schema-registry-manager/src/app.ts b/backend/components/schema-registry-manager/src/app.ts new file mode 100644 index 000000000..fb7f384e3 --- /dev/null +++ b/backend/components/schema-registry-manager/src/app.ts @@ -0,0 +1,263 @@ +import dotenv from 'dotenv'; +import express, {Express, Request as ExpressRequest, Response as ExpressResponse} from 'express'; +import http from 'http'; +import cors from 'cors'; + +import {SchemaProvider} from './types'; +import { + checkCompatibilityOfNewSchema, + createSchema, + deleteSchema, + getLastMessage, + getSchemaInfo, + getSchemaVersions, + getSchemas, + updateSchema, +} from './providers/karapace'; + +dotenv.config(); + +const app: Express = express(); +const port = process.env.PORT || 3000; +const bodyParser = require('body-parser'); +const currentProvider: SchemaProvider = SchemaProvider.karapace; + +// Middleware +app.use(bodyParser.json()); + +// CORS options +const corsOptions = { + origin: 'http://localhost:8080', +}; + +// Use cors middleware with the specified options +app.use(cors(corsOptions)); + +app.get('/schemas.provider', (req: ExpressRequest, res: ExpressResponse) => { + res.status(200).send(currentProvider); +}); + +app.get('/schemas.list', (req: ExpressRequest, res: ExpressResponse) => { + switch (currentProvider) { + case SchemaProvider.karapace: + getSchemas(req.get('host') as string) + .then((response: string[]) => { + res.send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.get('/schemas.versions', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + getSchemaVersions(req.get('host') as string, req.query.topicName as string) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.get('/schemas.info', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + + let version = 'latest'; + if (req.query.version) { + version = req.query.version as string; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + getSchemaInfo(req.get('host') as string, req.query.topicName as string, version) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.post('/schemas.update', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + if (!req.body.schema) { + res.status(400).send('Missing schema'); + return; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + updateSchema(req.get('host') as string, req.query.topicName as string, req.body.schema as string) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.post('/schemas.create', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + if (!req.body.schema) { + res.status(400).send('Missing schema'); + return; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + createSchema(req.get('host') as string, req.query.topicName as string, req.body.schema as string) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.post('/schemas.compatibility', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + if (!req.query.version) { + res.status(400).send('Missing version'); + return; + } + if (!req.body.schema) { + res.status(400).send('Missing schema'); + return; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + checkCompatibilityOfNewSchema( + req.get('host') as string, + req.query.topicName as string, + req.body.schema as string, + req.query.version as string + ) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.post('/schemas.delete', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + deleteSchema(req.get('host') as string, req.query.topicName as string) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +app.get('/schemas.lastMessage', (req: ExpressRequest, res: ExpressResponse) => { + if (!req.query.topicName) { + res.status(400).send('Missing topicName'); + return; + } + + switch (currentProvider) { + case SchemaProvider.karapace: + getLastMessage(req.get('host') as string, req.query.topicName as string) + .then((response: any) => { + res.status(200).send(response); + }) + .catch((e: any) => { + res.status(500).send(e); + }); + break; + default: + res.status(404).send('Provider Not Found'); + break; + } +}); + +async function startHealthcheck() { + const server = http.createServer((req: any, res: any) => { + if (req.url === '/actuator/health' && req.method === 'GET') { + const response = {status: 'UP'}; + const jsonResponse = JSON.stringify(response); + + res.writeHead(200, {'Content-Type': 'application/json'}); + res.end(jsonResponse); + } else { + res.writeHead(404, {'Content-Type': 'text/plain'}); + res.end('Not Found'); + } + }); + + server.listen(80, () => { + console.log('Health-check started'); + }); +} + +async function main() { + startHealthcheck(); + app.listen(port, () => { + console.log(`Server is running on http://localhost:${port}`); + }); +} + +main().catch(console.error); diff --git a/backend/components/schema-registry-manager/src/package.json b/backend/components/schema-registry-manager/src/package.json new file mode 100644 index 000000000..970c2ea5f --- /dev/null +++ b/backend/components/schema-registry-manager/src/package.json @@ -0,0 +1,17 @@ +{ + "dependencies": { + "@types/express": "^4.17.21", + "@types/node": "^20.10.3", + "cors": "^2.8.5", + "dotenv": "^16.4.2", + "express": "^4.18.2", + "node-fetch": "^2.6.1" + }, + "devDependencies": { + "@types/cors": "^2.8.17", + "@types/express": "^4.17.21", + "@types/node": "^20.10.3", + "ts-node": "^10.9.2", + "typescript": "^5.3.3" + } +} diff --git a/backend/components/schema-registry-manager/src/providers/karapace.ts b/backend/components/schema-registry-manager/src/providers/karapace.ts new file mode 100644 index 000000000..487aa31a8 --- /dev/null +++ b/backend/components/schema-registry-manager/src/providers/karapace.ts @@ -0,0 +1,121 @@ +export async function getSchemas(host: string) { + return getData(host, 'subjects').then(response => { + return response; + }); +} + +export async function getSchemaVersions(host: string, topicName: string) { + return getData(host, `subjects/${topicName}/versions`).then(response => { + if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) { + return Promise.reject('404 Not Found'); + } + return response; + }); +} + +export async function getSchemaInfo(host: string, topicName: string, version: string) { + return getData(host, `subjects/${topicName}/versions/${version}`).then(response => { + if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) { + return Promise.reject('404 Not Found'); + } + return response; + }); +} + +export async function updateSchema(host: string, topicName: string, schema: string) { + const body = { + schema: JSON.stringify({...JSON.parse(schema)}), + }; + return postData(host, `subjects/${topicName}/versions`, body).then(response => { + if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) { + return Promise.reject('404 Not Found'); + } + if (response.id) return response; + if (response.message) return Promise.reject(response.message); + return Promise.reject('Unknown Error'); + }); +} + +export async function createSchema(host: string, topicName: string, schema: string) { + const body = { + schema: JSON.stringify({...JSON.parse(schema)}), + }; + return postData(host, `subjects/${topicName}/versions`, body) + .then(response => { + if (response.id) return response; + if (response.message) return Promise.reject(response.message); + return Promise.reject('Unknown Error'); + }) + .catch(e => { + return Promise.reject(e); + }); +} + +export async function checkCompatibilityOfNewSchema(host: string, topicName: string, schema: string, version: string) { + const body = { + schema: JSON.stringify({...JSON.parse(schema)}), + }; + + return postData(host, `compatibility/subjects/${topicName}/versions/${version}`, body) + .then(response => { + if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) { + return Promise.reject('404 Not Found'); + } + if (response.is_compatible !== undefined) { + if (response.is_compatible === true) { + return response; + } + return Promise.reject('Schema Not Compatible'); + } + if (response.message) return Promise.reject(response.message); + return Promise.reject('Unknown Error'); + }) + .catch(e => { + return Promise.reject(e); + }); +} + +export async function deleteSchema(host: string, topicName: string) { + return deleteData(host, `subjects/${topicName}`).then(response => { + if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) { + return Promise.reject('404 Not Found'); + } + return response; + }); +} + +export async function getLastMessage(host: string, topicName: string) { + const body = { + ksql: `PRINT '${topicName}' FROM BEGINNING LIMIT 1;`, + streamsProperties: {}, + }; + return postData(host, 'query', body).then(response => { + return response; + }); +} + +async function getData(host: string, url: string) { + const response = await fetch('https://' + host + '/' + url, { + method: 'GET', + }); + return response.json(); +} + +async function deleteData(host: string, url: string) { + const response = await fetch('https://' + host + '/' + url, { + method: 'DELETE', + }); + return response.json(); +} + +async function postData(host: string, url: string, body: any) { + const response = await fetch('https://' + host + '/' + url, { + method: 'POST', + headers: { + 'Content-Type': 'application/vnd.schemaregistry.v1+json', + }, + body: JSON.stringify(body), + }); + + return response.json(); +} diff --git a/backend/components/schema-registry-manager/src/tsconfig.json b/backend/components/schema-registry-manager/src/tsconfig.json new file mode 100644 index 000000000..8975f6604 --- /dev/null +++ b/backend/components/schema-registry-manager/src/tsconfig.json @@ -0,0 +1,10 @@ +{ + "compilerOptions": { + "target": "ES2016", + "module": "commonjs", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true + } +} \ No newline at end of file diff --git a/backend/components/schema-registry-manager/src/types.ts b/backend/components/schema-registry-manager/src/types.ts new file mode 100644 index 000000000..53776abb6 --- /dev/null +++ b/backend/components/schema-registry-manager/src/types.ts @@ -0,0 +1,4 @@ +export enum SchemaProvider { + karapace = 'karapace', + confluentCloud = 'confluent-cloud', +}