From 9ebe1b95dc87b97d293a4a0120204899d3ba23f2 Mon Sep 17 00:00:00 2001 From: Aleks Todorov Date: Fri, 1 Mar 2024 13:08:40 +0000 Subject: [PATCH] [WIP] Video uploads --- backend-rs/.env.toml | 1 + backend-rs/Cargo.lock | 63 +++++ backend-rs/Cargo.toml | 13 ++ backend-rs/entity/src/id.rs | 6 + backend-rs/entity/src/video.rs | 4 +- backend-rs/fs-file/src/lib.rs | 10 +- backend-rs/streamfox/Cargo.toml | 6 +- backend-rs/streamfox/src/codec.rs | 215 ++++++++++++++++++ backend-rs/streamfox/src/config.rs | 1 + .../streamfox/src/controllers/errors.rs | 47 +++- backend-rs/streamfox/src/controllers/video.rs | 117 +++++++++- backend-rs/streamfox/src/main.rs | 18 ++ backend-rs/streamfox/src/models/video.rs | 27 ++- 13 files changed, 519 insertions(+), 9 deletions(-) create mode 100644 backend-rs/streamfox/src/codec.rs diff --git a/backend-rs/.env.toml b/backend-rs/.env.toml index 62dd418..8805763 100644 --- a/backend-rs/.env.toml +++ b/backend-rs/.env.toml @@ -1,5 +1,6 @@ [config.app] config_root = "streamfox_home_local/config" +data_root = "streamfox_home_local/data" host = [0, 0, 0, 0] port = 8601 # Half a year, in seconds diff --git a/backend-rs/Cargo.lock b/backend-rs/Cargo.lock index b4b5d34..7b57aa3 100644 --- a/backend-rs/Cargo.lock +++ b/backend-rs/Cargo.lock @@ -207,6 +207,7 @@ dependencies = [ "bytes", "cookie", "futures-util", + "headers", "http", "http-body", "http-body-util", @@ -218,6 +219,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-range" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c30398a7f716ebdd7f3c8a4f7a7a6df48a30e002007fd57b2a7a00fac864bd" +dependencies = [ + "axum", + "axum-extra", + "bytes", + "futures", + "http-body", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -882,6 +898,30 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.4.1" @@ -2023,6 +2063,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-this-or-that" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634c5a3cb041e56cc2964386151c67d520f845445789da3bd46bfb1c94f5e3bb" +dependencies = [ + "serde", +] + [[package]] name = "serde-toml-merge" version = "0.3.4" @@ -2145,6 +2194,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -2453,6 +2511,7 @@ version = "0.0.0" dependencies = [ "axum", "axum-extra", + "axum-range", "base64", "bcrypt", "bs58", @@ -2462,6 +2521,7 @@ dependencies = [ "entity", "fs", "fs-file", + "futures", "jsonwebtoken", "lazy_static", "migration", @@ -2470,11 +2530,13 @@ dependencies = [ "sea-orm", "sea-orm-migration", "serde", + "serde-this-or-that", "serde_json", "serde_with", "thiserror", "time", "tokio", + "tokio-util", "toml-env", "tower-http", "tracing", @@ -2657,6 +2719,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/backend-rs/Cargo.toml b/backend-rs/Cargo.toml index cd49393..5392742 100644 --- a/backend-rs/Cargo.toml +++ b/backend-rs/Cargo.toml @@ -1,3 +1,16 @@ [workspace] resolver = "2" members = ["streamfox", "migration", "entity", "fs", "fs-file"] + +[profile.release] +opt-level = 3 +debug = "full" +# split-debuginfo = '...' # Platform-specific. +strip = "none" +debug-assertions = false +overflow-checks = false +lto = "fat" +panic = "unwind" +incremental = true +codegen-units = 16 +rpath = false diff --git a/backend-rs/entity/src/id.rs b/backend-rs/entity/src/id.rs index e7b60f9..20ebef3 100644 --- a/backend-rs/entity/src/id.rs +++ b/backend-rs/entity/src/id.rs @@ -49,6 +49,12 @@ impl Display for Id { } } +impl From for String { + fn from(value: Id) -> Self { + value.to_string() + } +} + impl FromStr for Id { type Err = bs58::decode::Error; diff --git a/backend-rs/entity/src/video.rs b/backend-rs/entity/src/video.rs index 0341444..1471b5e 100644 --- a/backend-rs/entity/src/video.rs +++ b/backend-rs/entity/src/video.rs @@ -2,7 +2,9 @@ use crate::id::Id; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, EnumIter, DeriveActiveEnum, Serialize, Deserialize, +)] #[sea_orm(rs_type = "i16", db_type = "SmallInteger")] #[serde(untagged)] pub enum Status { diff --git a/backend-rs/fs-file/src/lib.rs b/backend-rs/fs-file/src/lib.rs index 6dc008d..85821aa 100644 --- a/backend-rs/fs-file/src/lib.rs +++ b/backend-rs/fs-file/src/lib.rs @@ -1,5 +1,5 @@ use std::path::{Path, PathBuf}; -use tokio::fs::{self, File as TokioFile}; +use tokio::fs::{self, File as TokioFile, OpenOptions}; #[derive(Debug)] pub struct File { @@ -24,4 +24,12 @@ impl File { TokioFile::create(&self.path).await } + + pub async fn open(&self) -> std::io::Result { + if self.path.exists() { + OpenOptions::new().append(true).open(&self.path).await + } else { + self.create().await + } + } } diff --git a/backend-rs/streamfox/Cargo.toml b/backend-rs/streamfox/Cargo.toml index e571ba0..e42a861 100644 --- a/backend-rs/streamfox/Cargo.toml +++ b/backend-rs/streamfox/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] axum = "0.7.4" axum-extra = { version = "0.9.2", features = ["cookie"] } +axum-range = { version = "0.4.0", default-features = false } base64 = "0.21.7" bcrypt = "0.15.0" bs58 = { version = "0.5.0", default-features = false } @@ -15,6 +16,7 @@ convert_case = "0.6.0" entity = { version = "0.0.0", path = "../entity", default-features = false } fs = { version = "0.0.0", path = "../fs", default-features = false } fs-file = { version = "0.0.0", path = "../fs-file", default-features = false } +futures = { version = "0.3.30", default-features = false } jsonwebtoken = { version = "9.2.0", default-features = false } lazy_static = "1.4.0" migration = { version = "0.0.0", path = "../migration", default-features = false } @@ -23,11 +25,13 @@ ring = "0.17.7" sea-orm = { version = "0.12.14", features = ["runtime-tokio-rustls", "sqlx-postgres", "debug-print"] } sea-orm-migration = { version = "0.12.14", default-features = false } serde = { version = "1.0.196", features = ["derive"] } +serde-this-or-that = { version = "0.4.2", default-features = false } serde_json = "1.0.113" serde_with = { version = "3.6.0", features = ["chrono_0_4"] } thiserror = "1.0.56" time = "0.3.34" -tokio = { version = "1.36.0", features = ["rt-multi-thread"] } +tokio = { version = "1.36.0", features = ["rt-multi-thread", "process"] } +tokio-util = { version = "0.7.10", default-features = false, features = ["io"] } toml-env = "1.1.1" tower-http = { version = "0.5.1", features = ["trace"] } tracing = "0.1.40" diff --git a/backend-rs/streamfox/src/codec.rs b/backend-rs/streamfox/src/codec.rs new file mode 100644 index 0000000..f946043 --- /dev/null +++ b/backend-rs/streamfox/src/codec.rs @@ -0,0 +1,215 @@ +use crate::MainFs; +use chrono::Duration; +use lazy_static::lazy_static; +use serde::Deserialize; +use serde_this_or_that::as_f64; +use std::collections::HashMap; +use std::num::ParseFloatError; +use std::string::FromUtf8Error; +use thiserror::Error; +use tokio::process::Command; + +lazy_static! { + static ref FORMAT_TO_MIME: HashMap<&'static str, &'static str> = HashMap::from([ + ("3dostr", "application/vnd.pg.format"), + ("3g2", "video/3gpp2"), + ("3gp", "video/3gpp"), + ("4xm", "audio/x-adpcm"), + ("a64", "application/octet-stream"), + ("aa", "application/octet-stream"), + ("aac", "audio/aac"), + ("ac3", "audio/x-ac3"), + ("acm", "application/octet-stream"), + ("adts", "audio/aac"), + ("aiff", "audio/aiff"), + ("amr", "audio/amr"), + ("apng", "image/png"), + ("asf", "video/x-ms-asf"), + ("asf_stream", "video/x-ms-asf"), + ("ass", "text/x-ass"), + ("au", "audio/basic"), + ("avi", "video/x-msvideo"), + ("avm2", "application/x-shockwave-flash"), + ("bin", "application/octet-stream"), + ("bit", "audio/bit"), + ("caf", "audio/x-caf"), + ("dts", "audio/x-dca"), + ("dvd", "video/mpeg"), + ("eac3", "audio/x-eac3"), + ("f4v", "application/f4v"), + ("flac", "audio/x-flac"), + ("flv", "video/x-flv"), + ("g722", "audio/G722"), + ("g723_1", "audio/g723"), + ("gif", "image/gif"), + ("gsm", "audio/x-gsm"), + ("h261", "video/x-h261"), + ("h263", "video/x-h263"), + ("hls", "application/x-mpegURL"), + ("hls,applehttp", "application/x-mpegURL"), + ("ico", "image/vnd.microsoft.icon"), + ("ilbc", "audio/iLBC"), + ("ipod", "video/mp4"), + ("ismv", "video/mp4"), + ("jacosub", "text/x-jacosub"), + ("jpeg_pipe", "image/jpeg"), + ("jpegls_pipe", "image/jpeg"), + ("latm", "audio/MP4A-LATM"), + ("live_flv", "video/x-flv"), + ("m4v", "video/x-m4v"), + ("matroska", "video/x-matroska"), + ("matroska,webm", "video/webm"), + ("microdvd", "text/x-microdvd"), + ("mjpeg", "video/x-mjpeg"), + ("mjpeg_2000", "video/x-mjpeg"), + ("mmf", "application/vnd.smaf"), + ("mov,mp4,m4a,3gp,3g2,mj2", "video/mp4"), + ("mp2", "audio/mpeg"), + ("mp3", "audio/mpeg"), + ("mp4", "video/mp4"), + ("mpeg", "video/mpeg"), + ("mpeg1video", "video/mpeg"), + ("mpeg2video", "video/mpeg"), + ("mpegts", "video/MP2T"), + ("mpegtsraw", "video/MP2T"), + ("mpegvideo", "video/mpeg"), + ("mpjpeg", "multipart/x-mixed-replace"), + ("mxf", "application/mxf"), + ("mxf_d10", "application/mxf"), + ("mxf_opatom", "application/mxf"), + ("nut", "video/x-nut"), + ("oga", "audio/ogg"), + ("ogg", "application/ogg"), + ("ogv", "video/ogg"), + ("oma", "audio/x-oma"), + ("opus", "audio/ogg"), + ("rm", "application/vnd.rn-realmedia"), + ("singlejpeg", "image/jpeg"), + ("smjpeg", "image/jpeg"), + ("spx", "audio/ogg"), + ("srt", "application/x-subrip"), + ("sup", "application/x-pgs"), + ("svcd", "video/mpeg"), + ("swf", "application/x-shockwave-flash"), + ("tta", "audio/x-tta"), + ("vcd", "video/mpeg"), + ("vob", "video/mpeg"), + ("voc", "audio/x-voc"), + ("wav", "audio/x-wav"), + ("webm", "video/webm"), + ("webm_chunk", "video/webm"), + ("webm_dash_manifest", "application/xml"), + ("webp", "image/webp"), + ("webvtt", "text/vtt"), + ("wv", "audio/x-wavpack"), + ]); +} + +#[derive(Deserialize)] +struct ProbeOutput<'a> { + #[serde(borrow)] + format: Format<'a>, +} + +#[derive(Deserialize)] +struct Format<'a> { + #[serde(borrow, rename = "format_name")] + name: &'a str, + #[serde(deserialize_with = "as_f64")] + duration: f64, +} + +pub struct Probe { + pub mime_type: String, + pub duration: Duration, +} + +#[derive(Error, Debug)] +pub enum ProbeError { + #[error("could not run ffprobe: `{0}`")] + Command(#[from] std::io::Error), + #[error("could not parse ffprobe error output as UTF-8: `{0}`")] + ParseErrorStream(#[from] FromUtf8Error), + #[error("ffprobe failed to run: `{0}`")] + FfProbeFailed(String), + #[error("could not parse probe output: `{0}`")] + ParseJson(#[from] serde_json::Error), + #[error("could not parse duration float: `{0}`")] + ParseFloat(#[from] ParseFloatError), + #[error("format `{0}` is not supported")] + UnsupportedFormat(String), +} + +pub async fn probe(fs: &MainFs) -> Result { + let output = Command::new("ffprobe") + .arg("-loglevel") + .arg("error") + .arg("-show_format") + .arg("-print_format") + .arg("json") + .arg(fs.video_stream().path().as_os_str()) + .output() + .await?; + + if !output.status.success() { + return Err(ProbeError::FfProbeFailed(String::from_utf8(output.stderr)?)); + } + + let probe: ProbeOutput = serde_json::from_slice(&output.stdout)?; + let format = probe.format; + + if let Some(mime_type) = FORMAT_TO_MIME.get(format.name) { + Ok(Probe { + mime_type: (*mime_type).into(), + duration: Duration::seconds(format.duration.ceil() as i64), + }) + } else { + Err(ProbeError::UnsupportedFormat(format.name.into())) + } +} + +#[derive(Error, Debug)] +pub enum GenerateThumbnailError { + #[error("could not run ffmpeg: `{0}`")] + Command(#[from] std::io::Error), + #[error("could not parse ffmpeg error output as UTF-8: `{0}`")] + ParseErrorStream(#[from] FromUtf8Error), + #[error("ffmpeg failed to run: `{0}`")] + FfMpegFailed(String), +} + +pub async fn generate_thumbnail(fs: &MainFs, probe: &Probe) -> Result<(), GenerateThumbnailError> { + const SEEK_FRACTION: f64 = 0.2; + + let output = Command::new("ffmpeg") + .arg("-loglevel") + .arg("error") + .arg("-y") + .arg("-ss") + .arg( + (SEEK_FRACTION * (probe.duration.num_seconds() as f64)) + .floor() + .to_string(), + ) + .arg("-i") + .arg(fs.video_stream().path().as_os_str()) + .arg("-vframes") + .arg("1") + .arg("-q:v") + .arg("2") + .arg("-vf") + .arg("scale=416:234:force_original_aspect_ratio=decrease,pad=416:234:-1:-1:color=black") + .arg("-f") + .arg("mjpeg") + .arg(fs.video_thumbnail().path().as_os_str()) + .output() + .await?; + + if !output.status.success() { + return Err(GenerateThumbnailError::FfMpegFailed(String::from_utf8( + output.stderr, + )?)); + } + + Ok(()) +} diff --git a/backend-rs/streamfox/src/config.rs b/backend-rs/streamfox/src/config.rs index 85a1578..01ed300 100644 --- a/backend-rs/streamfox/src/config.rs +++ b/backend-rs/streamfox/src/config.rs @@ -21,6 +21,7 @@ pub struct Config { #[derive(Serialize, Deserialize)] pub struct App { pub config_root: String, + pub data_root: String, pub host: Ipv4Array, pub port: Port, #[serde_as(as = "DurationSeconds")] diff --git a/backend-rs/streamfox/src/controllers/errors.rs b/backend-rs/streamfox/src/controllers/errors.rs index 780fbf3..2074fa0 100644 --- a/backend-rs/streamfox/src/controllers/errors.rs +++ b/backend-rs/streamfox/src/controllers/errors.rs @@ -1,6 +1,7 @@ +use crate::codec::{GenerateThumbnailError, ProbeError}; use crate::controllers::user::AuthError; use crate::models::user; -use axum::extract::rejection::JsonRejection; +use axum::extract::rejection::{JsonRejection, PathRejection}; use axum::extract::{FromRequest, Request}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -54,8 +55,15 @@ impl IntoResponse for Errors { } } +// #[derive(EnumString)] +pub enum Object { + Video, +} + #[derive(Error, Debug)] pub enum HandlerError { + #[error("Could not parse IDs in URL path.")] + ParseUrlPath(#[from] PathRejection), #[error(transparent)] JsonRejection(#[from] JsonRejection), #[error(transparent)] @@ -68,14 +76,34 @@ pub enum HandlerError { InvalidCredentials, #[error("User cannot be logged into.")] ImpossibleLogin, + #[error("Video overwriting is not allowed once upload has completed.")] + OverwriteVideo, + #[error("Content-Range header must specify the content start, end, and size.")] + InvalidRange, + #[error("Content-Range size does not match body size.")] + BodyRangeNoMatch, + #[error("Content-Range header must specify the same total size for every request.")] + InconsistentRange, + #[error("Could not probe video details.")] + ProbeVideo(#[from] ProbeError), + #[error("Could not generate video thumbnail.")] + GenerateThumbnail(#[from] GenerateThumbnailError), #[error("No user was logged in but a user is required: {0}.")] UserRequired(#[from] AuthError), + #[error("You do not own this object.")] + NotAnOwner, + + #[error("Could not find object.")] + ObjectNotFound, + #[error("Could not convert between integers.")] ConvertIntegers(#[from] TryFromIntError), #[error("Database transaction failed.")] Database(#[from] DbErr), + #[error("Input / output operation failed.")] + Io(#[from] std::io::Error), #[error("Could not create user.")] CreateUser(#[from] user::CreateError), #[error("Could not validate credentials.")] @@ -126,7 +154,10 @@ fn format_error_messages(field: &str, errors: ValidationErrorsKind) -> Vec Response { + tracing::debug!("Encountered {:?}", &self); + match self { + HandlerError::ParseUrlPath(_) => self.into_generic(StatusCode::BAD_REQUEST), HandlerError::JsonRejection(_) => self.into_generic(StatusCode::BAD_REQUEST), HandlerError::Validation(validation_errors) => (StatusCode::BAD_REQUEST, { let mut errors = FAILED_VALIDATION.clone(); @@ -139,9 +170,19 @@ impl IntoResponse for HandlerError { HandlerError::EmailTaken => self.failed_validation(StatusCode::BAD_REQUEST, "emailAddress"), HandlerError::InvalidCredentials => self.into_generic(StatusCode::BAD_REQUEST), HandlerError::ImpossibleLogin => self.into_generic(StatusCode::BAD_REQUEST), + HandlerError::OverwriteVideo => self.into_generic(StatusCode::BAD_REQUEST), + HandlerError::InvalidRange => self.into_generic(StatusCode::BAD_REQUEST), + HandlerError::BodyRangeNoMatch => self.into_generic(StatusCode::BAD_REQUEST), + HandlerError::InconsistentRange => self.into_generic(StatusCode::BAD_REQUEST), + HandlerError::ProbeVideo(_) => self.into_generic(StatusCode::BAD_REQUEST), + HandlerError::GenerateThumbnail(_) => self.into_generic(StatusCode::BAD_REQUEST), HandlerError::UserRequired(_) => self.into_generic(StatusCode::UNAUTHORIZED), + HandlerError::NotAnOwner => self.into_generic(StatusCode::FORBIDDEN), + + HandlerError::ObjectNotFound => self.into_generic(StatusCode::NOT_FOUND), + HandlerError::ConvertIntegers(ref inner) => { error!("Convert integers: {}", inner); self.into_generic(StatusCode::INTERNAL_SERVER_ERROR) @@ -150,6 +191,10 @@ impl IntoResponse for HandlerError { error!("Database: {}", inner); self.into_generic(StatusCode::INTERNAL_SERVER_ERROR) } + HandlerError::Io(ref inner) => { + error!("IO failed: {}", inner); + self.into_generic(StatusCode::INTERNAL_SERVER_ERROR) + } HandlerError::CreateUser(ref inner) => { error!("Create user: {}", inner); self.into_generic(StatusCode::INTERNAL_SERVER_ERROR) diff --git a/backend-rs/streamfox/src/controllers/video.rs b/backend-rs/streamfox/src/controllers/video.rs index 8ea6b90..a9f4880 100644 --- a/backend-rs/streamfox/src/controllers/video.rs +++ b/backend-rs/streamfox/src/controllers/video.rs @@ -1,16 +1,27 @@ -use crate::controllers::errors::HandlerError; +use crate::controllers::errors::{HandlerError, Object}; use crate::controllers::user::UserResponse; use crate::models::user::User; use crate::models::video; -use crate::AppState; -use axum::extract::State; +use crate::{codec, AppState, MainFsVar}; +use axum::body::{Body, HttpBody}; +use axum::extract::{Path, Request, State}; use axum::http::StatusCode; +use axum::middleware::Next; +use axum::response::Response; use axum::{Extension, Json}; +use axum_extra::headers::ContentRange; +use axum_extra::TypedHeader; +use cascade::cascade; use chrono::{DateTime, Utc}; use entity::id::Id; -use entity::video::Visibility; +use entity::video::{ActiveModel, Status, Visibility}; +use futures::TryStreamExt; +use sea_orm::{ActiveModelBehavior, ActiveValue}; use serde::Serialize; +use std::io::SeekFrom; use std::sync::Arc; +use tokio::io::{self, AsyncSeekExt, AsyncWriteExt, BufWriter}; +use tokio_util::io::StreamReader; #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -76,3 +87,101 @@ pub async fn create_video( }), )) } + +pub async fn extract( + State(state): State, + Path(video_id): Path, + mut request: Request, + next: Next, +) -> Result { + let video = video::find(&state.connection, video_id) + .await? + .ok_or(HandlerError::ObjectNotFound)?; + + request.extensions_mut().insert(Arc::new(video)); + + Ok(next.run(request).await) +} + +pub async fn require_owner( + Extension(user): Extension>, + Extension(video): Extension>, + request: Request, + next: Next, +) -> Result { + if video.0.creator_id == user.id { + Ok(next.run(request).await) + } else { + Err(HandlerError::NotAnOwner) + } +} + +pub async fn upload_video( + State(state): State, + Extension(video): Extension>, + TypedHeader(range): TypedHeader, + body: Body, +) -> Result { + let video = &video.0; + + if video.status > Status::Uploading { + return Err(HandlerError::OverwriteVideo); + } + + let data_stream = body.into_data_stream(); + + let (start, end) = range.bytes_range().ok_or(HandlerError::InvalidRange)?; + let current_len = end - start + 1; + let total_len = range.bytes_len().ok_or(HandlerError::InvalidRange)?; + + if (current_len < data_stream.size_hint().lower()) + || data_stream + .size_hint() + .upper() + .is_some_and(|real_len| current_len != real_len) + { + return Err(HandlerError::BodyRangeNoMatch); + } + + let mut active = ActiveModel::new(); + active.id = ActiveValue::set(video.id); + + if start == 0 { + active.status = ActiveValue::set(Status::Uploading); + active.size_bytes = ActiveValue::set(total_len.try_into()?); + } else if total_len != (video.size_bytes as u64) { + return Err(HandlerError::InconsistentRange); + } + + let fs = cascade! { + state.fs.clone(); + ..set(MainFsVar::VideoId, video.id); + }; + + { + let mut file = fs.video_stream().open().await?; + file.seek(SeekFrom::Start(start)).await?; + + let mut reader = + StreamReader::new(data_stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err))); + let mut writer = BufWriter::new(file); + + io::copy(&mut reader, &mut writer).await?; + writer.flush().await?; + } + + if (end + 1) < total_len { + video::update(&state.connection, active).await?; + return Ok(StatusCode::ACCEPTED); + } + + let probe = codec::probe(&fs).await?; + codec::generate_thumbnail(&fs, &probe).await?; + + active.mime_type = ActiveValue::set(probe.mime_type); + active.duration_secs = ActiveValue::set(probe.duration.num_seconds().try_into()?); + active.status = ActiveValue::set(Status::Complete); + video::update(&state.connection, active).await?; + + Ok(StatusCode::NO_CONTENT) +} diff --git a/backend-rs/streamfox/src/main.rs b/backend-rs/streamfox/src/main.rs index 97c702d..84f57e3 100644 --- a/backend-rs/streamfox/src/main.rs +++ b/backend-rs/streamfox/src/main.rs @@ -1,3 +1,4 @@ +mod codec; mod config; mod controllers; mod models; @@ -27,6 +28,11 @@ filesystem!( r" streamfox_default_password #streamfox_default_password + + videos + + stream #video_stream + thumbnail #video_thumbnail " ); @@ -52,6 +58,7 @@ type AppState = Arc; struct State { config: Config, + fs: MainFs, connection: DatabaseConnection, snowflakes: Snowflakes, } @@ -67,6 +74,7 @@ async fn main() -> Result<(), AppError> { let fs = cascade! { MainFs::new(); ..set(MainFsVar::ConfigRoot, config.app.config_root.clone()); + ..set(MainFsVar::DataRoot, config.app.data_root.clone()); }; tracing_subscriber::fmt() @@ -93,6 +101,7 @@ async fn main() -> Result<(), AppError> { let state = Arc::new(State { config, + fs, connection, snowflakes: Snowflakes { user_snowflake: SnowflakeGenerator::new(0), @@ -107,6 +116,15 @@ async fn main() -> Result<(), AppError> { let authenticated = Router::new() .route("/user", routing::get(user::get_user)) .route("/videos", routing::post(video::create_video)) + .route( + "/videos/:video_id/stream", + routing::put(video::upload_video) + .layer(middleware::from_fn(video::require_owner)) + .layer(middleware::from_fn_with_state( + Arc::clone(&state), + video::extract, + )), + ) .layer(middleware::from_fn_with_state( Arc::clone(&state), user::extract, diff --git a/backend-rs/streamfox/src/models/video.rs b/backend-rs/streamfox/src/models/video.rs index 2d58596..f143dcd 100644 --- a/backend-rs/streamfox/src/models/video.rs +++ b/backend-rs/streamfox/src/models/video.rs @@ -3,7 +3,7 @@ use crate::models::user::User; use crate::Snowflakes; use chrono::Local; use entity::id::Id; -use entity::video::{Status, Visibility}; +use entity::video::{ActiveModel, Status, Visibility}; use entity::{user, video, view}; use sea_orm::prelude::DateTimeWithTimeZone; use sea_orm::sea_query::Expr; @@ -101,3 +101,28 @@ pub async fn create( .into(), ) } + +pub async fn update(connection: &DatabaseConnection, mut video: ActiveModel) -> Result<(), DbErr> { + video.updated_at = ActiveValue::set(Local::now().fixed_offset()); + video.update(connection).await?; + Ok(()) +} + +pub async fn find( + connection: &DatabaseConnection, + id: Id, +) -> Result, DbErr> { + video::Entity::find_by_id(id) + .find_also_related(user::Entity) + .one(connection) + .await? + .map(|(video, user)| { + Ok(( + video, + user + .ok_or_else(|| DbErr::Custom("video creator was empty".into()))? + .into(), + )) + }) + .transpose() +}