Merge pull request 'Move ingest into main project' (#277) from 225-move-ingest-into-main-project into main
Some checks failed
Push Workflows / rustfmt (push) Successful in 8s
Push Workflows / mdbook (push) Successful in 14s
Push Workflows / leptos-test (push) Has started running
Push Workflows / docker-build (push) Has been cancelled
Push Workflows / docs (push) Has started running
Push Workflows / nix-build (push) Has been cancelled
Push Workflows / mdbook-server (push) Has been cancelled
Push Workflows / test (push) Has been cancelled
Push Workflows / build (push) Has been cancelled
Push Workflows / clippy (push) Has been cancelled

Reviewed-on: #277
This commit was merged in pull request #277.
This commit is contained in:
2025-10-27 17:17:58 +00:00
12 changed files with 672 additions and 0 deletions

70
Cargo.lock generated
View File

@@ -203,6 +203,26 @@ dependencies = [
"syn",
]
[[package]]
name = "audiotags"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44e797ce0164cf599c71f2c3849b56301d96a3dc033544588e875686b050ed39"
dependencies = [
"audiotags-macro",
"id3",
"metaflac",
"mp4ameta",
"readme-rustdocifier",
"thiserror 1.0.63",
]
[[package]]
name = "audiotags-macro"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaa9b2312fc01f7291f3b7b0f52ed08b1c0177c96a2e696ab55695cc4d06889"
[[package]]
name = "autocfg"
version = "1.2.0"
@@ -1258,6 +1278,12 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.12.1"
@@ -1755,6 +1781,17 @@ dependencies = [
"syn",
]
[[package]]
name = "id3"
version = "1.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aadb14a5ba1a0d58ecd4a29bfc9b8f1d119eee24aa01a62c1ec93eb9630a1d86"
dependencies = [
"bitflags 2.5.0",
"byteorder",
"flate2",
]
[[package]]
name = "ident_case"
version = "1.0.1"
@@ -2175,6 +2212,7 @@ name = "libretunes"
version = "0.1.0"
dependencies = [
"async-trait",
"audiotags",
"axum",
"axum-login",
"cfg-if",
@@ -2312,6 +2350,16 @@ version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
[[package]]
name = "metaflac"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdf25a3451319c52a4a56d956475fbbb763bfb8420e2187d802485cb0fd8d965"
dependencies = [
"byteorder",
"hex",
]
[[package]]
name = "migrations_internals"
version = "2.1.0"
@@ -2376,6 +2424,22 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "mp4ameta"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb23d62e8eb5299a3f79657c70ea9269eac8f6239a76952689bcd06a74057e81"
dependencies = [
"lazy_static",
"mp4ameta_proc",
]
[[package]]
name = "mp4ameta_proc"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07dcca13d1740c0a665f77104803360da0bdb3323ecce2e93fa2c959a6d52806"
[[package]]
name = "multer"
version = "3.1.0"
@@ -2851,6 +2915,12 @@ dependencies = [
"syn",
]
[[package]]
name = "readme-rustdocifier"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08ad765b21a08b1a8e5cdce052719188a23772bcbefb3c439f0baaf62c56ceac"
[[package]]
name = "redis-protocol"
version = "6.0.0"

View File

@@ -65,6 +65,7 @@ libretunes_macro = { git = "https://git.libretunes.xyz/LibreTunes/LibreTunes-Mac
rand = { version = "0.9.1", optional = true }
clap = { version = "4.5.39", features = ["derive", "env"], optional = true }
tokio-tungstenite = { version = "0.26.2", optional = true }
audiotags = { version = "0.5.0", default-features = false, optional = true }
url = { version = "2.5.7", optional = true }
[features]
@@ -96,6 +97,7 @@ ssr = [
"flexi_logger",
"leptos-use/ssr",
"image-convert",
"dep:audiotags",
"rand",
"dep:url",
"dep:clap",

143
src/ingest/create.rs Normal file
View File

@@ -0,0 +1,143 @@
use crate::prelude::*;
use image_convert::ImageResource;
use std::fs;
use std::path::Path;
pub fn create_artist(
mut artist: backend::NewArtist,
image: &Option<ImageResource>,
db_conn: &mut PgPooledConn,
image_base_path: &Path,
) -> BackendResult<backend::Artist> {
let image_save_path = image
.as_ref()
.map(|image| save_image(image, image_base_path))
.transpose()
.context("Error saving artist image")?;
artist.image_path = image_save_path;
let new_artist = diesel::insert_into(artists::table)
.values(&artist)
.get_result(db_conn)
.context(format!(
"Error inserting new artist \"{}\" into database",
artist.name
))?;
Ok(new_artist)
}
pub fn create_album(
mut album: backend::NewAlbum,
artists: Vec<i32>,
image: &Option<ImageResource>,
db_conn: &mut PgPooledConn,
image_base_path: &Path,
) -> BackendResult<backend::Album> {
let image_save_path = image
.as_ref()
.map(|image| save_image(image, image_base_path))
.transpose()
.context("Error saving album image")?;
album.image_path = image_save_path;
let new_album = db_conn
.transaction(|db_conn| {
let new_album: backend::Album = diesel::insert_into(albums::table)
.values(&album)
.get_result(db_conn)
.context(format!(
"Error inserting new album \"{}\" into database",
album.title
))?;
for artist_id in artists {
diesel::insert_into(album_artists::table)
.values((
album_artists::album_id.eq(new_album.id),
album_artists::artist_id.eq(artist_id),
))
.execute(db_conn)
.context(format!("Error adding artist {artist_id} to album"))?;
}
Ok::<backend::Album, BackendError>(new_album)
})
.context(format!(
"Error running database transaction for album \"{}\"",
album.title
))?;
Ok(new_album)
}
pub fn create_song(
mut song: backend::NewSong,
artists: Vec<i32>,
image: Option<ImageResource>,
db_conn: &mut PgPooledConn,
image_base_path: &Path,
) -> BackendResult<backend::Song> {
let image_save_path = image
.as_ref()
.map(|image| save_image(image, image_base_path))
.transpose()
.context("Error saving song image")?;
song.image_path = image_save_path;
let new_song = db_conn
.transaction(|db_conn| {
let new_song: backend::Song = diesel::insert_into(songs::table)
.values(&song)
.get_result(db_conn)
.context("Error inserting new song \"{}\" into database")?;
for artist_id in artists {
diesel::insert_into(song_artists::table)
.values((
song_artists::song_id.eq(new_song.id),
song_artists::artist_id.eq(artist_id),
))
.execute(db_conn)
.context(format!("Error adding artist {artist_id} to song"))?;
}
Ok::<backend::Song, BackendError>(new_song)
})
.context(format!(
"Error running database transaction for song \"{}\"",
song.title
))?;
Ok(new_song)
}
pub fn save_image(image: &ImageResource, image_base_path: &Path) -> BackendResult<LocalPath> {
let relative_path = AssetType::Image.new_path("webp");
let full_path = image_base_path.join(relative_path.clone().path());
let parent_path = full_path
.parent()
.ok_or(BackendError::InternalError(format!(
"Unable to get parent of path \"{}\"",
full_path.display()
)))?;
fs::create_dir_all(parent_path).context(format!(
"Failed to create parent directories for new user image at \"{}\"",
parent_path.display()
))?;
let mut image_target = ImageResource::from_path(&full_path);
image_convert::to_webp(&mut image_target, image, &image_convert::WEBPConfig::new())
.map_err(|e| InputError::InvalidInput(format!("{e}")))
.context("Error saving image as webp")?;
Ok(relative_path)
}

48
src/ingest/find.rs Normal file
View File

@@ -0,0 +1,48 @@
use crate::prelude::*;
pub fn find_artist(
name: String,
db_conn: &mut PgPooledConn,
) -> BackendResult<Option<backend::Artist>> {
artists::table
.filter(artists::name.eq(&name))
.first(db_conn)
.optional()
.context(format!("Error finding artist \"{name}\""))
}
pub fn find_album(
title: String,
db_conn: &mut PgPooledConn,
) -> BackendResult<Option<backend::Album>> {
albums::table
.filter(albums::title.eq(&title))
.first(db_conn)
.optional()
.context(format!("Error finding album \"{title}\""))
}
pub fn find_song(
song: backend::NewSong,
db_conn: &mut PgPooledConn,
) -> BackendResult<Option<backend::Song>> {
songs::table
.filter(songs::title.eq(&song.title))
.filter(songs::album_id.eq(&song.album_id))
.first(db_conn)
.optional()
.context(format!("Error finding song \"{}\"", song.title))
}
pub fn find_song_from_file(path: LocalPath, db_conn: &mut PgPooledConn) -> BackendResult<bool> {
use diesel::dsl::{exists, select};
select(exists(
songs::table.filter(songs::storage_path.eq(path.clone())),
))
.get_result(db_conn)
.context(format!(
"Error checking if file \"{}\" exists in the song database",
path.path().display()
))
}

4
src/ingest/mod.rs Normal file
View File

@@ -0,0 +1,4 @@
pub mod create;
pub mod find;
pub mod scan;
pub mod task;

327
src/ingest/scan.rs Normal file
View File

@@ -0,0 +1,327 @@
use crate::ingest::create::*;
use crate::ingest::find::*;
use crate::prelude::*;
use image_convert::ImageResource;
use std::path::{Path, PathBuf};
pub async fn full_scan(state: &BackendState) {
info!("Ingest running...");
let ingest_path = state.config.audio_path.clone();
if let Err(e) = process_dir(ingest_path, state) {
error!("Error scanning for audio ingest: {e}");
}
}
pub fn process_dir(path: PathBuf, state: &BackendState) -> BackendResult<()> {
debug!("Scanning directory {} for audio files...", path.display());
let contents = path.read_dir().context("Error reading directory")?;
for dir_entry in contents {
let dir_entry = match dir_entry {
Ok(dir_entry) => dir_entry,
Err(e) => {
warn!(
"Error getting directory entry of {}: {e}, skipping...",
path.display()
);
continue;
}
};
let entry_type = match dir_entry.file_type() {
Ok(entry_type) => entry_type,
Err(e) => {
warn!(
"Error getting directory entry type of file {}: {e}, skipping...",
dir_entry.path().display()
);
continue;
}
};
if entry_type.is_dir() {
if let Err(e) = process_dir(dir_entry.path(), state) {
warn!(
"Failed to process directory {}: {e}, skipping...",
dir_entry.path().display()
);
}
} else if entry_type.is_symlink() {
if let Err(e) = process_link(dir_entry.path(), state) {
warn!(
"Failed to process symlink {}: {e}, skipping...",
dir_entry.path().display()
);
}
} else if entry_type.is_file() {
if let Err(e) = process_file(dir_entry.path(), state) {
warn!(
"Failed to process file {}: {e}, skipping...",
dir_entry.path().display()
);
}
} else {
unreachable!("One of is_file, is_dir, or is_symlink must be true")
}
}
Ok(())
}
pub fn process_link(path: PathBuf, state: &BackendState) -> BackendResult<()> {
debug!("Processing symlink {}...", path.display());
let destination = path.read_link().context("Failed to follow symlink")?;
if destination.is_file() {
process_file(destination.clone(), state).context(format!(
"Failed to process file {} pointed to by symlink {}",
destination.display(),
path.display()
))
} else if destination.is_dir() {
process_dir(destination.clone(), state).context(format!(
"Failed to process directory {} pointed to by symlink {}",
destination.display(),
path.display()
))
} else if destination.is_symlink() {
process_link(destination.clone(), state).context(format!(
"Failed to process symlink {} pointed to by symlink {}",
destination.display(),
path.display()
))
} else {
unreachable!("One of is_file, is_dir, or is_symlink must be true")
}
}
pub fn process_file(path: PathBuf, state: &BackendState) -> BackendResult<()> {
debug!("Processing file {}...", path.display());
let stripped_path = LocalPath::from_file_path(path.clone(), AssetType::Audio, state)
.context("Error stripping file path")?;
let mut db_conn = state.get_db_conn()?;
let song_ingested = find_song_from_file(stripped_path.clone(), &mut db_conn)
.context("Error checking if song file already ingested")?;
// Check if path exists
if song_ingested {
return Ok(());
}
// Read exif data
let config = audiotags::config::Config {
sep_artist: "\0",
parse_multiple_artists: true,
};
let tag = audiotags::Tag::new()
.with_config(config)
.read_from_path(&path)
.context(format!(
"Error reading audio tags for file {}",
path.display()
))?;
let title = tag.title().ok_or(InputError::MissingField(format!(
"No title tag in file {}",
path.display()
)))?;
let artists = tag.artists().unwrap_or(vec![]);
let album_name = tag.album().map(|a| a.title);
let album_artists = tag.album_artists();
let track = tag.track_number().map(|track| track as i32);
let album_cover = tag.album_cover();
let release_date = tag
.date()
.map(|ts| {
NaiveDate::from_ymd_opt(
ts.year,
ts.month.unwrap_or(1) as u32,
ts.day.unwrap_or(1) as u32,
)
.ok_or(BackendError::InternalError(format!(
"Error creating NaiveDate from metadata timestamp {ts:?}"
)))
})
.transpose()?;
let duration = tag.duration().map(|d| Ok(d as u64)).unwrap_or_else(|| {
let file = std::fs::File::open(&path).context(format!(
"Error opening audio file {} to read duration",
path.display()
))?;
crate::util::audio::extract_metadata(file)
.map(|(_codec, duration)| duration)
.context(format!(
"Error extracting duration from audio file {}",
path.display()
))
})?;
let duration = i32::try_from(duration)
.map_err(|_| {
BackendError::InternalError(format!("u64 {duration} can't be represented as i32"))
})
.context("Error converting song duration")?;
let image_base_path = state.get_asset_path(&AssetType::Image);
db_conn
.transaction(|db_conn| {
let song_artist_ids = artists
.into_iter()
.map(|artist_name| {
find_or_create_artist(artist_name.to_string(), &None, db_conn, &image_base_path)
.map(|artist| artist.id)
})
.collect::<BackendResult<Vec<i32>>>()
.context(format!("Error with artists for new song {title}"))?;
let album_cover_resource = album_cover
.map(|album_cover| {
let cursor = std::io::Cursor::new(album_cover.data);
image_convert::ImageResource::from_reader(cursor)
})
.transpose()
.context("Error setting up reader for album cover")?;
let song_album = album_name
.map(|album_name| {
find_or_create_album(
album_name.to_string(),
&album_cover_resource,
album_artists.unwrap_or(vec![]),
db_conn,
&image_base_path,
)
.context(format!("Error finding/creating album for new song {title}"))
})
.transpose()
.context(format!("Error with album for new song {title}"))?;
let proto_song = backend::NewSong {
title: title.to_string(),
album_id: song_album.map(|album| album.id),
track,
duration,
release_date,
storage_path: stripped_path,
image_path: None,
};
let found_song =
find_song(proto_song.clone(), db_conn).context("Error finding song for ingest")?;
if let Some(song) = found_song {
info!(
"Found song {} instead of ingesting {}",
song.title,
path.display()
);
return Ok(());
}
create_song(
proto_song.clone(),
song_artist_ids,
None,
db_conn,
&image_base_path,
)
.context(format!("Error creating ingested song {}", proto_song.title))?;
Ok::<(), BackendError>(())
})
.context(format!(
"Error running database transaction to ingest song from file {}",
path.display()
))
}
fn find_or_create_artist(
artist_name: String,
artist_image: &Option<ImageResource>,
db_conn: &mut PgPooledConn,
image_base_path: &Path,
) -> BackendResult<backend::Artist> {
db_conn
.transaction(|db_conn| {
find_artist(artist_name.clone(), db_conn)
.context(format!("Error trying to find artist {artist_name}"))?
.map_or_else(
|| {
let new_artist = backend::NewArtist {
name: artist_name.to_string(),
image_path: None,
};
create_artist(new_artist, artist_image, db_conn, image_base_path)
.context(format!("Error creating artist {artist_name}"))
},
Result::Ok,
)
})
.context(format!(
"Error running database transaction to find or create artist {artist_name}"
))
}
fn find_or_create_album(
album_name: String,
album_image: &Option<ImageResource>,
artists: Vec<&str>,
db_conn: &mut PgPooledConn,
image_base_path: &Path,
) -> BackendResult<backend::Album> {
db_conn
.transaction(|db_conn| {
find_album(album_name.clone(), db_conn)
.context(format!("Error trying to find album {album_name}"))?
.map_or_else(
|| {
let new_album = backend::NewAlbum {
title: album_name.to_string(),
release_date: None,
image_path: None,
};
let artists: Vec<i32> = artists
.into_iter()
.map(|artist_name| {
find_or_create_artist(
artist_name.to_string(),
&None,
db_conn,
image_base_path,
)
.map(|artist| artist.id)
})
.collect::<BackendResult<Vec<i32>>>()
.context(format!(
"Error finding/creating artists for album {album_name}"
))?;
create_album(new_album, artists, album_image, db_conn, image_base_path)
.context("Error creating artist")
},
Result::Ok,
)
})
.context(format!(
"Error running database transaction to find or create album {album_name}"
))
}

25
src/ingest/task.rs Normal file
View File

@@ -0,0 +1,25 @@
use crate::ingest::scan::full_scan;
use crate::prelude::*;
use tokio::task::{spawn, JoinHandle};
use tokio::time::{interval_at, Duration, Instant};
pub const INITIAL_SCAN_DELAY: Duration = Duration::from_secs(10);
pub const SCAN_INTERVAL: Duration = Duration::from_hours(1);
/// Start the ingest task
/// Waits an initial delay for startup to complete, then runs a full ingest
/// scan on a regular interval
pub async fn start_task(state: BackendState) -> JoinHandle<!> {
info!("Starting ingest task...");
let start_time = Instant::now() + INITIAL_SCAN_DELAY;
let mut scan_interval = interval_at(start_time, SCAN_INTERVAL);
spawn(async move {
loop {
scan_interval.tick().await;
full_scan(&state).await;
}
})
}

View File

@@ -26,6 +26,8 @@
)]
#![allow(clippy::unused_unit, clippy::unit_arg, clippy::type_complexity)]
#![recursion_limit = "256"]
#![feature(duration_constructors)]
#![feature(never_type)]
#![feature(adt_const_params)]
pub mod api;
@@ -42,6 +44,7 @@ use cfg_if::cfg_if;
cfg_if! {
if #[cfg(feature = "ssr")] {
pub mod schema;
pub mod ingest;
}
}

View File

@@ -55,6 +55,11 @@ async fn main() {
libretunes::util::database::migrate(&mut db_conn);
drop(db_conn); // Close the connection after migrations
// Create a task to periodically run ingest
if !state.config.disable_ingest {
let _ingest_task = libretunes::ingest::task::start_task(state.clone()).await;
}
debug!("Setting up session store...");
let session_store = RedisStore::new(state.get_redis_conn());
let session_layer = SessionManagerLayer::new(session_store);

View File

@@ -26,6 +26,10 @@ pub struct Config {
#[clap(long, env)]
/// The URL for the Redis connection.
pub redis_url: String,
#[clap(long, env, default_value = "false")]
/// Disable ingest
pub disable_ingest: bool,
}
#[derive(clap::Args, Clone)]

View File

@@ -106,6 +106,14 @@ impl BackendError {
BackendError::new(BackendErrorType::Internal(message.into()))
}
/// Creates a new `BackendError` for an audiotag error
#[cfg(feature = "ssr")]
#[track_caller]
#[allow(non_snake_case)]
pub fn AudiotagError(error: audiotags::error::Error) -> Self {
BackendError::new(BackendErrorType::Audiotag(error.to_string()))
}
/// Adds a context message to the error
#[track_caller]
#[allow(non_snake_case)]
@@ -193,6 +201,14 @@ impl From<ServerFnErrorErr> for BackendError {
}
}
#[cfg(feature = "ssr")]
impl From<audiotags::error::Error> for BackendError {
#[track_caller]
fn from(err: audiotags::error::Error) -> Self {
BackendError::AudiotagError(err)
}
}
pub trait Contextualize<R> {
/// Add context to the `Result` if it is an `Err`.
#[track_caller]
@@ -246,6 +262,9 @@ enum BackendErrorType {
#[error("Internal server error: {0}")]
Internal(String),
#[error("Audio tag error: {0}")]
Audiotag(String),
}
impl FromServerFnError for BackendError {

View File

@@ -110,6 +110,28 @@ impl LocalPath {
Ok(WebPath::new(url_path.path()))
}
#[cfg(feature = "ssr")]
pub fn from_file_path<P: Into<PathBuf>>(
path: P,
asset_type: AssetType,
state: &BackendState,
) -> BackendResult<Self> {
let root_path = state.get_asset_path(&asset_type);
let path = path.into();
path.strip_prefix(root_path.clone())
.map_err(|e| {
BackendError::InternalError(format!(
"Error stripping base path {} from path {}: {}",
root_path.display(),
path.display(),
e
))
})
.map(LocalPath::new)
}
#[cfg(feature = "ssr")]
/// Convert a server-side image path to a web path,
/// or fall back to a placeholder if it fails or the `path` is `None`,