Skip to content

Commit

Permalink
Init commit
Browse files Browse the repository at this point in the history
  • Loading branch information
o-tsaruk committed May 23, 2023
0 parents commit 75895db
Show file tree
Hide file tree
Showing 113 changed files with 8,545 additions and 0 deletions.
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use flake
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[workspace]
members = ["crates/*"]

[workspace.package]
version = "0.1.0"
20 changes: 20 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2023 Brushfam
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the"Software"),
// to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 changes: 21 additions & 0 deletions crates/builder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "builder"
version.workspace = true
edition = "2021"
publish = false

[dependencies]
anyhow = "1.0.71"
bollard = "0.14.0"
clap = { version = "4.2.7", features = ["derive"] }
derive_more = { version = "0.99.17", default-features = false, features = ["display", "error", "from"] }
futures-util = "0.3.28"
itertools = "0.10.5"
tar = "0.4.38"
tempfile = "3.5.0"
tracing = "0.1.37"
tokio = { version = "1.28.1", features = ["rt-multi-thread", "macros", "process", "sync"] }
tokio-stream = "0.1.14"

common = { path = "../common", features = ["logging", "s3"] }
db = { path = "../db" }
14 changes: 14 additions & 0 deletions crates/builder/src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(about, version)]
pub(crate) struct Cli {
#[command(subcommand)]
pub command: Command,
}

#[derive(Subcommand)]
pub(crate) enum Command {
/// Start processing new build sessions.
Serve,
}
3 changes: 3 additions & 0 deletions crates/builder/src/commands.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod serve;

pub use serve::serve;
51 changes: 51 additions & 0 deletions crates/builder/src/commands/serve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::sync::Arc;

use bollard::{errors::Error, Docker};
use common::config;
use db::{DatabaseConnection, DbErr};
use derive_more::{Display, Error, From};
use futures_util::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::mpsc;
use tracing::{info, instrument};

use crate::{log_collector, process::worker};

#[derive(Display, Debug, From, Error)]
pub enum ServeError {
DbErr(DbErr),
}

#[instrument(skip_all, err)]
pub async fn serve(
builder_config: config::Builder,
storage_config: config::Storage,
database: DatabaseConnection,
) -> Result<(), Error> {
let builder_config = Arc::new(builder_config);
let storage_config = Arc::new(storage_config);
let docker = Arc::new(Docker::connect_with_socket_defaults()?);
let database = Arc::new(database);

info!("spawning log collector");
let (sender, receiver) = mpsc::unbounded_channel();
tokio::spawn(log_collector::collect_logs(database.clone(), receiver));

info!("started build session processing");

(0..builder_config.worker_count)
.map(|_| {
tokio::spawn(worker::spawn(
builder_config.clone(),
storage_config.clone(),
docker.clone(),
database.clone(),
sender.clone(),
))
.map(|_| ())
})
.collect::<FuturesUnordered<_>>()
.collect::<()>()
.await;

Ok(())
}
29 changes: 29 additions & 0 deletions crates/builder/src/log_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::Arc;

use db::{log, ActiveModelTrait, DatabaseConnection};
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::error;

pub(crate) struct LogEntry {
pub(crate) build_session_id: i64,
pub(crate) text: String,
}

pub(crate) async fn collect_logs(
db: Arc<DatabaseConnection>,
mut receiver: UnboundedReceiver<LogEntry>,
) {
while let Some(log_entry) = receiver.recv().await {
let insert = log::ActiveModel {
build_session_id: db::ActiveValue::Set(log_entry.build_session_id),
text: db::ActiveValue::Set(log_entry.text),
..Default::default()
}
.insert(&*db)
.await;

if let Err(e) = insert {
error!(%e, "unable to insert log entry")
}
}
}
30 changes: 30 additions & 0 deletions crates/builder/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
mod cli;
mod commands;
mod log_collector;
mod process;

use clap::Parser;
use cli::{Cli, Command};
use common::{config::Config, logging};
use db::Database;
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let config = Config::new()?;

logging::init(&config);

let Some(builder_config) = config.builder else {
return Err(anyhow::Error::msg("unable to load builder config"));
};

info!("connecting to database");
let database = Database::connect(&config.database.url).await?;

match Cli::parse().command {
Command::Serve => commands::serve(builder_config, config.storage, database).await?,
}

Ok(())
}
3 changes: 3 additions & 0 deletions crates/builder/src/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod container;
pub(crate) mod volume;
pub(crate) mod worker;
208 changes: 208 additions & 0 deletions crates/builder/src/process/container.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use std::{
collections::HashMap,
fmt,
io::{self, Cursor, Read, Write},
};

use bollard::{
container::{
AttachContainerOptions, Config, CreateContainerOptions, DownloadFromContainerOptions,
LogOutput, RemoveContainerOptions,
},
errors::Error,
service::MountTypeEnum,
service::{
ContainerWaitResponse, HostConfig, Mount, MountVolumeOptions,
MountVolumeOptionsDriverConfig,
},
Docker,
};
use common::config;
use derive_more::{Display, Error, From};
use futures_util::{Stream, TryStreamExt};

use crate::process::volume::{Volume, VolumeError};

#[derive(Debug, Display, Error, From)]
pub enum ContainerRemoveError {
Docker(Error),
Volume(VolumeError),
}

#[derive(Debug, Display, Error, From)]
pub enum DownloadFromContainerError {
Docker(Error),
Io(io::Error),

#[display(fmt = "file size limit exceeded")]
FileSizeLimitExceeded,

#[display(fmt = "file not found")]
FileNotFound,
}

pub struct Container {
id: String,
volume: Volume,
}

impl Container {
pub async fn new<U: fmt::Display>(
config: &config::Builder,
client: &Docker,
volume: Volume,
build_session_token: &str,
rust_version: &str,
cargo_contract_version: &str,
source_code_url: U,
) -> Result<Self, Error> {
let host_config = HostConfig {
cap_add: Some(vec![String::from("DAC_OVERRIDE")]),
cap_drop: Some(vec![String::from("ALL")]),
memory: Some(config.memory_limit),
memory_swap: Some(config.memory_swap_limit),
mounts: Some(vec![Mount {
target: Some(String::from("/root")),
typ: Some(MountTypeEnum::VOLUME),
volume_options: Some(MountVolumeOptions {
driver_config: Some(MountVolumeOptionsDriverConfig {
name: Some(String::from("local")),
options: Some(HashMap::from([
(String::from("device"), volume.device().to_string()),
(String::from("type"), String::from("ext4")),
])),
}),
..Default::default()
}),
..Default::default()
}]),
pids_limit: Some(768),
security_opt: Some(vec![String::from("no-new-privileges")]),
..Default::default()
};

let container = client
.create_container(
Some(CreateContainerOptions {
name: build_session_token,
..Default::default()
}),
Config {
image: Some("ink-builder"),
env: Some(vec![
&format!("SOURCE_CODE_URL={source_code_url}"),
&format!("CARGO_CONTRACT_VERSION={cargo_contract_version}"),
&format!("RUST_VERSION={rust_version}"),
&format!("BUILD_SESSION_TOKEN={build_session_token}"),
]),
host_config: Some(host_config),
attach_stdout: Some(true),
attach_stderr: Some(true),
..Default::default()
},
)
.await?;

client
.start_container::<String>(&container.id, None)
.await?;

Ok(Self {
id: container.id,
volume,
})
}

pub async fn logs(
&self,
client: &Docker,
) -> Result<impl Stream<Item = Result<LogOutput, Error>>, Error> {
let raw = client
.attach_container::<String>(
&self.id,
Some(AttachContainerOptions {
stdout: Some(true),
stderr: Some(true),
stream: Some(true),
logs: Some(true),
..Default::default()
}),
)
.await?;

Ok(raw.output)
}

pub async fn wasm_file<'a>(
&self,
client: &Docker,
buf: &'a mut [u8],
) -> Result<&'a [u8], DownloadFromContainerError> {
self.download_from_container_to_buf(client, "/root/artifacts/ink/main.wasm", buf)
.await
}

pub async fn metadata_file<'a>(
&self,
client: &Docker,
buf: &'a mut [u8],
) -> Result<&'a [u8], DownloadFromContainerError> {
self.download_from_container_to_buf(client, "/root/artifacts/ink/main.json", buf)
.await
}

pub fn events(
&self,
client: &Docker,
) -> impl Stream<Item = Result<ContainerWaitResponse, Error>> {
client.wait_container::<String>(&self.id, None)
}

pub async fn remove(self, client: &Docker) -> Result<(), ContainerRemoveError> {
client
.remove_container(
&self.id,
Some(RemoveContainerOptions {
v: true,
force: true,
..Default::default()
}),
)
.await?;

self.volume.close().await?;

Ok(())
}

async fn download_from_container_to_buf<'a>(
&self,
client: &Docker,
path: &str,
buf: &'a mut [u8],
) -> Result<&'a [u8], DownloadFromContainerError> {
let mut cursor = Cursor::new(buf);

let mut stream =
client.download_from_container(&self.id, Some(DownloadFromContainerOptions { path }));

while let Some(chunk) = stream.try_next().await? {
cursor
.write(&chunk)
.map_err(|_| DownloadFromContainerError::FileSizeLimitExceeded)?;
}

let position = cursor.position() as usize;

// Re-use the same buffer to store both archived and unarchived files.
let (archive, file_buf) = cursor.into_inner().split_at_mut(position);

let file_size = tar::Archive::new(&*archive)
.entries()?
.next()
.ok_or(DownloadFromContainerError::FileNotFound)??
.read(file_buf)?;

Ok(&file_buf[..file_size])
}
}
Loading

0 comments on commit 75895db

Please sign in to comment.