From 5570fdd1189f63b3536e4fac4bad55cb83243246 Mon Sep 17 00:00:00 2001
From: Jordan Petridis
Date: Thu, 23 Nov 2017 21:09:50 +0200
Subject: [PATCH] Switched to using r2d2 instead of a databaseConnection
 object.

---
 .gitlab-ci.yml                         |  6 +-
 hammond-data/src/database.rs           | 56 +++++++----------
 hammond-data/src/dbqueries.rs          | 87 +++++++++++---------------
 hammond-data/src/feed.rs               | 33 ++++++----
 hammond-data/src/models/insertables.rs | 26 +++++---
 hammond-data/src/models/queryables.rs  |  6 +-
 rustfmt.toml                           |  1 -
 7 files changed, 99 insertions(+), 116 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 49deba7..f8401fd 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -44,7 +44,7 @@ test:stable:
   script:
   - rustc --version && cargo --version
   - cargo build --all
-  - cargo test --all --verbose
+  - cargo test -j 1 --all --verbose
 
 test:nightly:
   # Nightly
@@ -52,8 +52,8 @@ test:nightly:
   image: "rustlang/rust:nightly"
   script:
   - rustc --version && cargo --version
-  - cargo build --all
-  - cargo test --all --verbose
+  - cargo build
+  - cargo test -j 1 --verbose
 #  - cargo bench
 
 # Configure and run rustfmt on nightly
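[Editor's note: illustrative sketch, not part of the patch. The new `-j 1` flag keeps the test run serialized: after this change every test goes through one on-disk SQLite database behind a single-connection r2d2 pool (see database.rs below), so concurrently running test processes would race on the file and starve the pool. The test below is hypothetical, and the `hammond_data::database::connection` path is assumed; within one test binary, libtest's own thread parallelism can also be tamed with `cargo test -- --test-threads=1`.]

    // A hypothetical integration test: it holds the pool's only connection
    // while it runs, which is why the CI now serializes the test run.
    #[test]
    fn indexes_a_source() {
        let db = hammond_data::database::connection();
        let con = db.get().unwrap(); // checks out the single pooled connection
        // ... insert fixtures and assert on query results ...
    }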
diff --git a/hammond-data/src/database.rs b/hammond-data/src/database.rs
index 163d18c..23a1087 100644
--- a/hammond-data/src/database.rs
+++ b/hammond-data/src/database.rs
@@ -1,26 +1,23 @@
-// use r2d2_diesel::ConnectionManager;
-// use diesel::SqliteConnection;
+use r2d2_diesel::ConnectionManager;
 use diesel::prelude::*;
+use r2d2;
 
 use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::io;
-// use std::time::Duration;
+use std::time::Duration;
 
 use errors::*;
 
 #[cfg(not(test))]
 use xdg_;
 
-// type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
-type Database = Arc<Mutex<SqliteConnection>>;
+type Pool = Arc<r2d2::Pool<ConnectionManager<SqliteConnection>>>;
 
 embed_migrations!("migrations/");
 
 lazy_static!{
-    // static ref POOL: Pool = init_pool(DB_PATH.to_str().unwrap());
-
-    static ref DB: Arc<Mutex<SqliteConnection>> = Arc::new(Mutex::new(establish_connection()));
+    static ref POOL: Pool = init_pool(DB_PATH.to_str().unwrap());
 }
 
 #[cfg(not(test))]
@@ -40,35 +37,26 @@ lazy_static!{
     static ref DB_PATH: PathBuf = TEMPDIR.path().join("hammond.db");
 }
 
-pub fn connection() -> Database {
-    // POOL.clone()
-    Arc::clone(&DB)
+pub fn connection() -> Pool {
+    // Arc::clone(&DB)
+    Arc::clone(&POOL)
 }
 
-// fn init_pool(db_path: &str) -> Pool {
-//     let config = r2d2::Config::builder()
-//         // .pool_size(60)
-//         // .min_idle(Some(60))
-//         // .connection_timeout(Duration::from_secs(60))
-//         .build();
-//     let manager = ConnectionManager::<SqliteConnection>::new(db_path);
-//     let pool = r2d2::Pool::new(config, manager).expect("Failed to create pool.");
-//     info!("Database pool initialized.");
+fn init_pool(db_path: &str) -> Pool {
+    let config = r2d2::Config::builder()
+        .pool_size(1)
+        .connection_timeout(Duration::from_secs(60))
+        .build();
+    let manager = ConnectionManager::<SqliteConnection>::new(db_path);
+    let pool = Arc::new(r2d2::Pool::new(config, manager).expect("Failed to create pool."));
+    info!("Database pool initialized.");
 
-//     {
-//         let db = pool.clone().get().unwrap();
-//         utils::run_migration_on(&*db).unwrap();
-//     }
+    {
+        let db = Arc::clone(&pool).get().unwrap();
+        run_migration_on(&*db).unwrap();
+    }
 
-//     pool
-// }
-
-pub fn establish_connection() -> SqliteConnection {
-    let database_url = DB_PATH.to_str().unwrap();
-    let db = SqliteConnection::establish(database_url)
-        .expect(&format!("Error connecting to {}", database_url));
-    run_migration_on(&db).unwrap();
-    db
+    pool
 }
 
 pub fn run_migration_on(connection: &SqliteConnection) -> Result<()> {
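[Editor's note: illustrative sketch, not part of the patch, of how a call site consumes the pool defined above. `connection()` clones an Arc of the lazily initialized global POOL; `get()` checks the single connection out, blocking for up to the configured 60-second connection_timeout before returning an Err. The `podcast_count` helper is hypothetical.]

    use diesel::prelude::*;

    fn podcast_count() -> QueryResult<i64> {
        use schema::podcast::dsl::*;

        let db = connection();       // cheap: just bumps the Arc refcount
        let con = db.get().unwrap(); // r2d2 checkout; returned to the pool on drop
        podcast.count().get_result(&*con)
    }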
diff --git a/hammond-data/src/dbqueries.rs b/hammond-data/src/dbqueries.rs
index 02c45b3..2cec230 100644
--- a/hammond-data/src/dbqueries.rs
+++ b/hammond-data/src/dbqueries.rs
@@ -12,7 +12,7 @@ pub fn get_sources() -> QueryResult<Vec<Source>> {
     use schema::source::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     source.load::<Source>(&*con)
 }
@@ -20,7 +20,7 @@ pub fn get_podcasts() -> QueryResult<Vec<Podcast>> {
     use schema::podcast::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     podcast.load::<Podcast>(&*con)
 }
@@ -28,7 +28,7 @@ pub fn get_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     episode.order(epoch.desc()).load::<Episode>(&*con)
 }
@@ -36,7 +36,7 @@ pub fn get_downloaded_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     episode
         .filter(local_uri.is_not_null())
         .load::<Episode>(&*con)
@@ -46,7 +46,7 @@ pub fn get_played_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     episode.filter(played.is_not_null()).load::<Episode>(&*con)
 }
@@ -54,7 +54,7 @@ pub fn get_episode_from_id(ep_id: i32) -> QueryResult<Episode> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     episode.filter(id.eq(ep_id)).get_result::<Episode>(&*con)
 }
@@ -62,7 +62,7 @@ pub fn get_episode_local_uri_from_id(ep_id: i32) -> QueryResult<Option<String>>
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     episode
         .filter(id.eq(ep_id))
@@ -74,7 +74,7 @@ pub fn get_episodes_with_limit(limit: u32) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     episode
         .order(epoch.desc())
         .limit(i64::from(limit))
@@ -86,7 +86,7 @@ pub fn get_podcast_from_id(pid: i32) -> QueryResult<Podcast> {
     use schema::podcast::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     podcast.filter(id.eq(pid)).get_result::<Podcast>(&*con)
 }
@@ -94,7 +94,7 @@ pub fn get_pd_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     Episode::belonging_to(parent)
         .order(epoch.desc())
@@ -105,7 +105,7 @@ pub fn get_pd_unplayed_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     Episode::belonging_to(parent)
         .filter(played.is_null())
@@ -117,7 +117,7 @@ pub fn get_pd_episodes_limit(parent: &Podcast, limit: u32) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     Episode::belonging_to(parent)
         .order(epoch.desc())
@@ -129,7 +129,7 @@ pub fn get_source_from_uri(uri_: &str) -> QueryResult<Source> {
     use schema::source::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     source.filter(uri.eq(uri_)).get_result::<Source>(&*con)
 }
@@ -137,48 +137,43 @@ pub fn get_podcast_from_title(title_: &str) -> QueryResult<Podcast> {
     use schema::podcast::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     podcast
         .filter(title.eq(title_))
         .get_result::<Podcast>(&*con)
 }
 
-pub fn get_episode_from_uri(uri_: &str) -> QueryResult<Episode> {
+pub fn get_episode_from_uri(con: &SqliteConnection, uri_: &str) -> QueryResult<Episode> {
     use schema::episode::dsl::*;
 
-    let db = connection();
-    let con = db.lock().unwrap();
     episode.filter(uri.eq(uri_)).get_result::<Episode>(&*con)
 }
 
 pub fn remove_feed(pd: &Podcast) -> QueryResult<()> {
-    delete_source(pd.source_id())?;
-    delete_podcast(*pd.id())?;
-    delete_podcast_episodes(*pd.id())?;
+    let db = connection();
+    let con = db.get().unwrap();
+
+    delete_source(&con, pd.source_id())?;
+    delete_podcast(&con, *pd.id())?;
+    delete_podcast_episodes(&con, *pd.id())?;
     Ok(())
 }
 
-pub fn delete_source(source_id: i32) -> QueryResult<usize> {
+pub fn delete_source(con: &SqliteConnection, source_id: i32) -> QueryResult<usize> {
     use schema::source::dsl::*;
 
-    let db = connection();
-    let con = db.lock().unwrap();
     diesel::delete(source.filter(id.eq(source_id))).execute(&*con)
 }
 
-pub fn delete_podcast(podcast_id: i32) -> QueryResult<usize> {
+pub fn delete_podcast(con: &SqliteConnection, podcast_id: i32) -> QueryResult<usize> {
     use schema::podcast::dsl::*;
 
-    let db = connection();
-    let con = db.lock().unwrap();
    diesel::delete(podcast.filter(id.eq(podcast_id))).execute(&*con)
 }
 
-pub fn delete_podcast_episodes(parent_id: i32) -> QueryResult<usize> {
+pub fn delete_podcast_episodes(con: &SqliteConnection, parent_id: i32) -> QueryResult<usize> {
     use schema::episode::dsl::*;
 
-    let db = connection();
-    let con = db.lock().unwrap();
     diesel::delete(episode.filter(podcast_id.eq(parent_id))).execute(&*con)
 }
@@ -186,7 +181,7 @@ pub fn update_none_to_played_now(parent: &Podcast) -> QueryResult<usize> {
     use schema::episode::dsl::*;
 
     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();
 
     let epoch_now = Utc::now().timestamp() as i32;
     con.transaction(|| -> QueryResult<usize> {
@@ -196,44 +191,32 @@ pub fn update_none_to_played_now(parent: &Podcast) -> QueryResult<usize> {
     })
 }
 
-pub fn insert_new_source(s: &NewSource) -> QueryResult<usize> {
+pub fn insert_new_source(con: &SqliteConnection, s: &NewSource) -> QueryResult<usize> {
     use schema::source::dsl::*;
 
-    let db = connection();
-    let tempdb = db.lock().unwrap();
-    diesel::insert_into(source).values(s).execute(&*tempdb)
+    diesel::insert_into(source).values(s).execute(&*con)
 }
 
-pub fn insert_new_podcast(pd: &NewPodcast) -> QueryResult<usize> {
+pub fn insert_new_podcast(con: &SqliteConnection, pd: &NewPodcast) -> QueryResult<usize> {
     use schema::podcast::dsl::*;
 
-    let db = connection();
-    let tempdb = db.lock().unwrap();
-    diesel::insert_into(podcast).values(pd).execute(&*tempdb)
+    diesel::insert_into(podcast).values(pd).execute(&*con)
 }
 
-pub fn insert_new_episode(ep: &NewEpisode) -> QueryResult<usize> {
+pub fn insert_new_episode(con: &SqliteConnection, ep: &NewEpisode) -> QueryResult<usize> {
     use schema::episode::dsl::*;
 
-    let db = connection();
-    let tempdb = db.lock().unwrap();
-    diesel::insert_into(episode).values(ep).execute(&*tempdb)
+    diesel::insert_into(episode).values(ep).execute(&*con)
 }
 
-pub fn replace_podcast(pd: &NewPodcast) -> QueryResult<usize> {
+pub fn replace_podcast(con: &SqliteConnection, pd: &NewPodcast) -> QueryResult<usize> {
     use schema::podcast::dsl::*;
 
-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::replace_into(podcast).values(pd).execute(&*tempdb)
+    diesel::replace_into(podcast).values(pd).execute(&*con)
 }
 
-pub fn replace_episode(ep: &NewEpisode) -> QueryResult<usize> {
+pub fn replace_episode(con: &SqliteConnection, ep: &NewEpisode) -> QueryResult<usize> {
     use schema::episode::dsl::*;
 
-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::replace_into(episode).values(ep).execute(&*tempdb)
+    diesel::replace_into(episode).values(ep).execute(&*con)
 }
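[Editor's note: illustrative sketch, not part of the patch. The file now has two calling conventions: read helpers still grab a pooled connection themselves, while the insert/replace/delete helpers borrow a `&SqliteConnection`, so a caller can reuse one checkout for a whole batch, exactly as `remove_feed` does above. `index_batch` is a hypothetical caller.]

    fn index_batch(batch: &[NewEpisode]) -> QueryResult<()> {
        let db = connection();
        let con = db.get().unwrap(); // one pool checkout for the whole loop

        for ep in batch {
            // &con deref-coerces from the pooled wrapper to &SqliteConnection.
            insert_new_episode(&con, ep)?;
        }
        Ok(())
    }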
diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs
index 157a009..fac2966 100644
--- a/hammond-data/src/feed.rs
+++ b/hammond-data/src/feed.rs
@@ -6,6 +6,7 @@ use dbqueries;
 use parser;
 
 use models::{Episode, NewEpisode, NewPodcast, Podcast, Source};
+use database::connection;
 use errors::*;
 
 #[derive(Debug)]
@@ -33,18 +34,20 @@ impl Feed {
         Ok(())
     }
 
-    #[allow(dead_code)]
-    fn index_channel(&self) -> Result<()> {
-        self.parse_channel().index()?;
-        Ok(())
-    }
+    // #[allow(dead_code)]
+    // fn index_channel(&self) -> Result<()> {
+    //     self.parse_channel().index()?;
+    //     Ok(())
+    // }
 
     // TODO: Refactor transactions and find a way to do it in parallel.
     fn index_channel_items(&self, pd: &Podcast) -> Result<()> {
         let episodes = self.parse_channel_items(pd);
+        let db = connection();
+        let con = db.get().unwrap();
 
         episodes.into_iter().for_each(|x| {
-            let e = x.index();
+            let e = x.index(&con);
             if let Err(err) = e {
                 error!("Failed to index episode: {:?}.", x);
                 error!("Error msg: {}", err);
@@ -74,10 +77,13 @@ impl Feed {
     #[allow(dead_code)]
     fn get_episodes(&self) -> Result<Vec<Episode>> {
         let pd = self.get_podcast()?;
+        let eps = self.parse_channel_items(&pd);
 
-        let episodes: Vec<_> = self.parse_channel_items(&pd)
-            .into_par_iter()
-            .filter_map(|ep| ep.into_episode().ok())
+        let db = connection();
+        let con = db.get().unwrap();
+        // TODO: Make it parallel
+        let episodes: Vec<_> = eps.into_iter()
+            .filter_map(|ep| ep.into_episode(&con).ok())
             .collect();
 
         Ok(episodes)
@@ -135,7 +141,7 @@ pub fn fetch(feeds: Vec<Source>) -> Vec<Feed> {
 mod tests {
     use rss;
 
-    use models::NewSource;
+    use models::Source;
 
     use std::fs;
     use std::io::BufReader;
@@ -152,8 +158,9 @@ mod tests {
            "http://feeds.feedburner.com/linuxunplugged",
        ];
 
-        inpt.iter().for_each(|feed| {
-            NewSource::new_with_uri(feed).into_source().unwrap();
+        inpt.iter().for_each(|url| {
+            // Index the urls into the source table.
+            Source::from_url(url).unwrap();
        });
 
        index_all().unwrap();
@@ -187,7 +194,7 @@ mod tests {
        let mut feeds: Vec<_> = urls.iter()
            .map(|&(path, url)| {
                // Create and insert a Source into db
-                let s = NewSource::new_with_uri(url).into_source().unwrap();
+                let s = Source::from_url(url).unwrap();
 
                // open the xml file
                let feed = fs::File::open(path).unwrap();
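[Editor's note: illustrative sketch, not part of the patch, of one direction the "TODO: Make it parallel" comment above could take. An r2d2 pool can be shared across threads, so each rayon task could check out its own connection instead of sharing one borrowed `&SqliteConnection`. With the pool_size(1) configured in this patch the tasks would still serialize on the lone connection (or hit the 60-second timeout), so this only pays off once the pool grows. `get_episodes_parallel` is a hypothetical name.]

    use rayon::prelude::*;

    fn get_episodes_parallel(eps: Vec<NewEpisode>) -> Vec<Episode> {
        let db = connection();
        eps.into_par_iter()
            .filter_map(|ep| {
                let con = db.get().ok()?; // per-task pool checkout
                ep.into_episode(&con).ok()
            })
            .collect()
    }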
diff --git a/hammond-data/src/models/insertables.rs b/hammond-data/src/models/insertables.rs
index fbf2bd6..a804a13 100644
--- a/hammond-data/src/models/insertables.rs
+++ b/hammond-data/src/models/insertables.rs
@@ -6,6 +6,7 @@ use utils::url_cleaner;
 use errors::*;
 
 use dbqueries;
+use database::connection;
 
 #[derive(Insertable)]
 #[table_name = "source"]
@@ -27,9 +28,12 @@ impl NewSource {
     }
 
     fn index(&self) {
+        let db = connection();
+        let con = db.get().unwrap();
+
         // Throw away the result like `insert or ignore`
         // Diesel does not support `insert or ignore` yet.
-        let _ = dbqueries::insert_new_source(self);
+        let _ = dbqueries::insert_new_source(&con, self);
     }
 
     // Look out for when TryInto lands in stable.
@@ -58,13 +62,13 @@ impl NewEpisode {
     // TODO: Currently using diesel from master git.
     // Watch out for v0.99.0 beta and change the toml.
     // TODO: Refactor into batch indexes instead.
-    pub fn into_episode(self) -> Result<Episode> {
-        self.index()?;
-        Ok(dbqueries::get_episode_from_uri(&self.uri.unwrap())?)
+    pub fn into_episode(self, con: &SqliteConnection) -> Result<Episode> {
+        self.index(con)?;
+        Ok(dbqueries::get_episode_from_uri(con, &self.uri.unwrap())?)
     }
 
-    pub fn index(&self) -> QueryResult<()> {
-        let ep = dbqueries::get_episode_from_uri(&self.uri.clone().unwrap());
+    pub fn index(&self, con: &SqliteConnection) -> QueryResult<()> {
+        let ep = dbqueries::get_episode_from_uri(con, &self.uri.clone().unwrap());
 
         match ep {
             Ok(foo) => {
@@ -75,11 +79,11 @@ impl NewEpisode {
                 if foo.title() != self.title.as_ref().map(|x| x.as_str())
                     || foo.published_date() != self.published_date.as_ref().map(|x| x.as_str())
                 {
-                    dbqueries::replace_episode(self)?;
+                    dbqueries::replace_episode(con, self)?;
                 }
             }
             Err(_) => {
-                dbqueries::insert_new_episode(self)?;
+                dbqueries::insert_new_episode(con, self)?;
             }
         }
         Ok(())
@@ -107,6 +111,8 @@ impl NewPodcast {
     pub fn index(&self) -> QueryResult<()> {
         let pd = dbqueries::get_podcast_from_title(&self.title);
+        let db = connection();
+        let con = db.get().unwrap();
 
         match pd {
             Ok(foo) => {
                 if foo.source_id() != self.source_id {
@@ -114,11 +120,11 @@ impl NewPodcast {
                 };
 
                 if foo.link() != self.link {
-                    dbqueries::replace_podcast(self)?;
+                    dbqueries::replace_podcast(&con, self)?;
                 }
             }
             Err(_) => {
-                dbqueries::insert_new_podcast(self)?;
+                dbqueries::insert_new_podcast(&con, self)?;
             }
         }
         Ok(())
diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs
index 6719f19..90124e7 100644
--- a/hammond-data/src/models/queryables.rs
+++ b/hammond-data/src/models/queryables.rs
@@ -133,7 +133,7 @@ impl Episode {
 
     pub fn save(&self) -> QueryResult<Episode> {
         let db = connection();
-        let tempdb = db.lock().unwrap();
+        let tempdb = db.get().unwrap();
 
         self.save_changes::<Episode>(&*tempdb)
     }
@@ -215,7 +215,7 @@ impl Podcast {
 
     pub fn save(&self) -> QueryResult<Podcast> {
         let db = connection();
-        let tempdb = db.lock().unwrap();
+        let tempdb = db.get().unwrap();
 
         self.save_changes::<Podcast>(&*tempdb)
     }
@@ -276,7 +276,7 @@ impl<'a> Source {
 
     pub fn save(&self) -> QueryResult<Source> {
         let db = connection();
-        let tempdb = db.lock().unwrap();
+        let tempdb = db.get().unwrap();
 
         self.save_changes::<Source>(&*tempdb)
     }
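[Editor's note: illustrative sketch, not part of the patch. It condenses the select-then-write upsert that NewEpisode::index() above hand-rolls because this Diesel version has no `insert or ignore`: look the row up first, replace it only when a tracked field changed, insert it when it is missing. `upsert_episode` is a hypothetical name.]

    fn upsert_episode(con: &SqliteConnection, ep: &NewEpisode) -> QueryResult<()> {
        match dbqueries::get_episode_from_uri(con, ep.uri.as_ref().unwrap()) {
            Ok(old) => {
                // Row exists: rewrite it only when a tracked field drifted.
                if old.title() != ep.title.as_ref().map(|x| x.as_str()) {
                    dbqueries::replace_episode(con, ep)?;
                }
            }
            Err(_) => {
                // Row missing: plain insert.
                dbqueries::insert_new_episode(con, ep)?;
            }
        }
        Ok(())
    }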
diff --git a/rustfmt.toml b/rustfmt.toml
index 3f0ae26..63e38c1 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -7,7 +7,6 @@ wrap_comments = true
 error_on_line_overflow = true
 error_on_line_overflow_comments = true
 tab_spaces = 4
-fn_call_width = 60
 newline_style = "Unix"
 fn_call_style = "Block"
 report_todo = "Never"