diff --git a/hammond-data/src/dbqueries.rs b/hammond-data/src/dbqueries.rs
index 6e86d13..ae98314 100644
--- a/hammond-data/src/dbqueries.rs
+++ b/hammond-data/src/dbqueries.rs
@@ -12,28 +12,32 @@ use connection;
 pub fn get_sources() -> QueryResult<Vec<Source>> {
     use schema::source::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     source.load::<Source>(&*con)
 }
 
 pub fn get_podcasts() -> QueryResult<Vec<Podcast>> {
     use schema::podcast::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     podcast.load::<Podcast>(&*con)
 }
 
 pub fn get_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode.order(epoch.desc()).load::<Episode>(&*con)
 }
 
 pub fn get_downloaded_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode
         .filter(local_uri.is_not_null())
         .load::<Episode>(&*con)
@@ -42,21 +46,24 @@ pub fn get_downloaded_episodes() -> QueryResult<Vec<Episode>> {
 
 pub fn get_played_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode.filter(played.is_not_null()).load::<Episode>(&*con)
 }
 
 pub fn get_episode_from_id(ep_id: i32) -> QueryResult<Episode> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode.filter(id.eq(ep_id)).get_result::<Episode>(&*con)
 }
 
 pub fn get_episode_local_uri_from_id(ep_id: i32) -> QueryResult<Option<String>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode
         .filter(id.eq(ep_id))
@@ -67,7 +74,8 @@ pub fn get_episode_local_uri_from_id(ep_id: i32) -> QueryResult<Option<String>>
 
 pub fn get_episodes_with_limit(limit: u32) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode
         .order(epoch.desc())
@@ -78,14 +86,16 @@ pub fn get_episodes_with_limit(limit: u32) -> QueryResult<Vec<Episode>> {
 
 pub fn get_podcast_from_id(pid: i32) -> QueryResult<Podcast> {
     use schema::podcast::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     podcast.filter(id.eq(pid)).get_result::<Podcast>(&*con)
 }
 
 pub fn get_pd_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     Episode::belonging_to(parent)
         .order(epoch.desc())
@@ -95,7 +105,8 @@ pub fn get_pd_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
 
 pub fn get_pd_unplayed_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     Episode::belonging_to(parent)
         .filter(played.is_null())
@@ -106,7 +117,8 @@ pub fn get_pd_unplayed_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
 
 pub fn get_pd_episodes_limit(parent: &Podcast, limit: u32) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     Episode::belonging_to(parent)
         .order(epoch.desc())
@@ -117,14 +129,16 @@ pub fn get_pd_episodes_limit(parent: &Podcast, limit: u32) -> QueryResult<Vec<Episode>> {
 pub fn get_source_from_uri(uri_: &str) -> QueryResult<Source> {
     use schema::source::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     source.filter(uri.eq(uri_)).get_result::<Source>(&*con)
 }
 
 pub fn get_podcast_from_title(title_: &str) -> QueryResult<Podcast> {
     use schema::podcast::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     podcast
         .filter(title.eq(title_))
         .get_result::<Podcast>(&*con)
@@ -133,45 +147,47 @@ pub fn get_podcast_from_title(title_: &str) -> QueryResult<Podcast> {
 
 pub fn get_episode_from_uri(uri_: &str) -> QueryResult<Episode> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     episode.filter(uri.eq(uri_)).get_result::<Episode>(&*con)
 }
 
-pub fn remove_feed(pd: &Podcast) -> QueryResult<usize> {
-    let con = connection().get().unwrap();
-
-    con.transaction(|| -> QueryResult<usize> {
-        delete_source(pd.source_id())?;
-        delete_podcast(*pd.id())?;
-        delete_podcast_episodes(*pd.id())
-    })
+pub fn remove_feed(pd: &Podcast) -> QueryResult<()> {
+    delete_source(pd.source_id())?;
+    delete_podcast(*pd.id())?;
+    delete_podcast_episodes(*pd.id())?;
+    Ok(())
 }
 
 pub fn delete_source(source_id: i32) -> QueryResult<usize> {
     use schema::source::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     diesel::delete(source.filter(id.eq(source_id))).execute(&*con)
 }
 
 pub fn delete_podcast(podcast_id: i32) -> QueryResult<usize> {
     use schema::podcast::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     diesel::delete(podcast.filter(id.eq(podcast_id))).execute(&*con)
 }
 
 pub fn delete_podcast_episodes(parent_id: i32) -> QueryResult<usize> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     diesel::delete(episode.filter(podcast_id.eq(parent_id))).execute(&*con)
 }
 
 pub fn update_none_to_played_now(parent: &Podcast) -> QueryResult<usize> {
     use schema::episode::dsl::*;
-    let con = connection().get().unwrap();
+    let db = connection();
+    let con = db.lock().unwrap();
 
     let epoch_now = Utc::now().timestamp() as i32;
     con.transaction(|| -> QueryResult<usize> {
diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs
index 05bdb0b..1ac5b5d 100644
--- a/hammond-data/src/feed.rs
+++ b/hammond-data/src/feed.rs
@@ -1,17 +1,14 @@
 use rayon::prelude::*;
 
 use diesel::Identifiable;
-use diesel::prelude::*;
 use rss;
 
 use dbqueries;
 use parser;
-use connection;
 
 use models::{Podcast, Source};
 use errors::*;
 
-
 #[derive(Debug)]
 pub struct Feed {
     channel: rss::Channel,
@@ -51,16 +48,12 @@ impl Feed {
             .map(|item| parser::new_episode(item, *pd.id()))
             .collect();
 
-        let tempdb = connection().get().unwrap();
-        let _ = tempdb.transaction::<(), Error, _>(|| {
-            episodes.into_iter().for_each(|x| {
-                let e = x.index(&*tempdb);
-                if let Err(err) = e {
-                    error!("Failed to index episode: {:?}.", x);
-                    error!("Error msg: {}", err);
-                };
-            });
-            Ok(())
+        episodes.into_iter().for_each(|x| {
+            let e = x.index();
+            if let Err(err) = e {
+                error!("Failed to index episode: {:?}.", x);
+                error!("Error msg: {}", err);
+            };
         });
         Ok(())
     }
@@ -70,7 +63,6 @@ pub fn index_all() -> Result<()> {
 
     let mut f = fetch_all()?;
 
     index(&mut f);
-    info!("Indexing done.");
     Ok(())
 }
@@ -82,6 +74,7 @@ pub fn index(feeds: &mut [Feed]) {
             error!("Error msg: {}", e.unwrap_err());
         };
     });
+    info!("Indexing done.");
 }
 
 pub fn fetch_all() -> Result<Vec<Feed>> {
diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs
index 776df89..48100f7 100644
--- a/hammond-data/src/lib.rs
+++ b/hammond-data/src/lib.rs
@@ -32,13 +32,16 @@ pub mod errors;
 mod parser;
 mod schema;
 
-use r2d2_diesel::ConnectionManager;
-use diesel::SqliteConnection;
+// use r2d2_diesel::ConnectionManager;
+// use diesel::SqliteConnection;
+use diesel::prelude::*;
 
 use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
 // use std::time::Duration;
 
-type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
+// type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
+type Database = Arc<Mutex<SqliteConnection>>;
 
 lazy_static!{
     #[allow(dead_code)]
@@ -62,7 +65,9 @@ lazy_static!{
         HAMMOND_XDG.create_data_directory("Downloads").unwrap()
     };
 
-    static ref POOL: Pool = init_pool(DB_PATH.to_str().unwrap());
+    // static ref POOL: Pool = init_pool(DB_PATH.to_str().unwrap());
+
+    static ref DB: Arc<Mutex<SqliteConnection>> = Arc::new(Mutex::new(establish_connection()));
 }
 
 #[cfg(not(test))]
@@ -82,24 +87,33 @@ lazy_static! {
     static ref DB_PATH: PathBuf = TEMPDIR.path().join("hammond.db");
 }
 
-pub fn connection() -> Pool {
-    POOL.clone()
+pub fn connection() -> Database {
+    // POOL.clone()
+    Arc::clone(&DB)
 }
 
-fn init_pool(db_path: &str) -> Pool {
-    let config = r2d2::Config::builder()
-        // .pool_size(60)
-        // .min_idle(Some(60))
-        // .connection_timeout(Duration::from_secs(60))
-        .build();
-    let manager = ConnectionManager::<SqliteConnection>::new(db_path);
-    let pool = r2d2::Pool::new(config, manager).expect("Failed to create pool.");
-    info!("Database pool initialized.");
+// fn init_pool(db_path: &str) -> Pool {
+//     let config = r2d2::Config::builder()
+//         // .pool_size(60)
+//         // .min_idle(Some(60))
+//         // .connection_timeout(Duration::from_secs(60))
+//         .build();
+//     let manager = ConnectionManager::<SqliteConnection>::new(db_path);
+//     let pool = r2d2::Pool::new(config, manager).expect("Failed to create pool.");
+//     info!("Database pool initialized.");
 
-    {
-        let db = pool.clone().get().unwrap();
-        utils::run_migration_on(&*db).unwrap();
-    }
+//     {
+//         let db = pool.clone().get().unwrap();
+//         utils::run_migration_on(&*db).unwrap();
+//     }
 
-    pool
+//     pool
+// }
+
+pub fn establish_connection() -> SqliteConnection {
+    let database_url = DB_PATH.to_str().unwrap();
+    let db = SqliteConnection::establish(database_url)
+        .expect(&format!("Error connecting to {}", database_url));
+    utils::run_migration_on(&db).unwrap();
+    db
 }
diff --git a/hammond-data/src/models/insertables.rs b/hammond-data/src/models/insertables.rs
index 0bbfe10..a988ed5 100644
--- a/hammond-data/src/models/insertables.rs
+++ b/hammond-data/src/models/insertables.rs
@@ -29,7 +29,8 @@ impl<'a> NewSource<'a> {
     fn index(&self) {
         use schema::source::dsl::*;
-        let tempdb = connection().get().unwrap();
+        let db = connection();
+        let tempdb = db.lock().unwrap();
 
         // Throw away the result like `insert or ignore`
         // Diesel deos not support `insert or ignore` yet.
         let _ = diesel::insert_into(source).values(self).execute(&*tempdb);
@@ -61,27 +62,25 @@
     // TODO: Currently using diesel from master git.
     // Watch out for v0.99.0 beta and change the toml.
     // TODO: Refactor into batch indexes instead.
-    pub fn index(&self, con: &SqliteConnection) -> QueryResult<()> {
+    pub fn index(&self) -> QueryResult<()> {
         use schema::episode::dsl::*;
 
-        let ep = {
-            // let tempdb = db.lock().unwrap();
-            // dbqueries::get_episode_from_uri(&tempdb, self.uri.unwrap())
-            dbqueries::get_episode_from_uri(self.uri.unwrap())
-        };
+        let ep = dbqueries::get_episode_from_uri(self.uri.unwrap());
+        let db = connection();
+        let con = db.lock().unwrap();
 
         match ep {
             Ok(foo) => if foo.title() != self.title
                 || foo.published_date() != self.published_date.as_ref().map(|x| x.as_str())
             {
                 // let tempdb = db.lock().unwrap();
-                diesel::replace_into(episode).values(self).execute(con)?;
-                // .execute(&*tempdb)?;
+                diesel::replace_into(episode).values(self).execute(&*con)?;
+                // .execute(&tempdb)?;
             },
             Err(_) => {
                 // let tempdb = db.lock().unwrap();
-                diesel::insert_into(episode).values(self).execute(con)?;
-                // .execute(&*tempdb)?;
+                diesel::insert_into(episode).values(self).execute(&*con)?;
+                // .execute(&tempdb)?;
             }
         }
         Ok(())
@@ -112,13 +111,17 @@
         match pd {
             Ok(foo) => if foo.link() != self.link {
-                let tempdb = connection().get().unwrap();
+                let db = connection();
+                let tempdb = db.lock().unwrap();
+
                 diesel::replace_into(podcast)
                     .values(self)
                     .execute(&*tempdb)?;
             },
             Err(_) => {
-                let tempdb = connection().get().unwrap();
+                let db = connection();
+                let tempdb = db.lock().unwrap();
+
                 diesel::insert_into(podcast).values(self).execute(&*tempdb)?;
             }
         }
 
         Ok(())
diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs
index 14502f2..838817b 100644
--- a/hammond-data/src/models/queryables.rs
+++ b/hammond-data/src/models/queryables.rs
@@ -129,7 +129,9 @@ impl Episode {
     }
 
     pub fn save(&self) -> QueryResult<Episode> {
-        let tempdb = connection().get().unwrap();
+        let db = connection();
+        let tempdb = db.lock().unwrap();
+
         self.save_changes::<Episode>(&*tempdb)
     }
 }
@@ -226,7 +228,9 @@ impl Podcast {
     }
 
     pub fn save(&self) -> QueryResult<Podcast> {
-        let tempdb = connection().get().unwrap();
+        let db = connection();
+        let tempdb = db.lock().unwrap();
+
         self.save_changes::<Podcast>(&*tempdb)
     }
 }
@@ -285,7 +289,9 @@ impl<'a> Source {
     }
 
     pub fn save(&self) -> QueryResult<Source> {
-        let tempdb = connection().get().unwrap();
+        let db = connection();
+        let tempdb = db.lock().unwrap();
+
         self.save_changes::<Source>(&*tempdb)
     }
 
diff --git a/hammond-data/src/utils.rs b/hammond-data/src/utils.rs
index 6b3c362..4cd75de 100644
--- a/hammond-data/src/utils.rs
+++ b/hammond-data/src/utils.rs
@@ -9,13 +9,14 @@ use models::Episode;
 
 use std::path::Path;
 use std::fs;
+use std::io;
 
 embed_migrations!("migrations/");
 
 pub fn run_migration_on(connection: &SqliteConnection) -> Result<()> {
     info!("Running DB Migrations...");
-    embedded_migrations::run(connection)?;
-    // embedded_migrations::run_with_output(connection, &mut std::io::stdout())
+    // embedded_migrations::run(connection)?;
+    embedded_migrations::run_with_output(connection, &mut io::stdout())?;
     Ok(())
 }
 
diff --git a/hammond-gtk/src/utils.rs b/hammond-gtk/src/utils.rs
index 62d93ee..b00bfd1 100644
--- a/hammond-gtk/src/utils.rs
+++ b/hammond-gtk/src/utils.rs
@@ -44,7 +44,6 @@ pub fn refresh_feed(stack: &gtk::Stack, source: Option<Vec<Source>>, delay: Option<u64>) {
 
         if let Ok(mut x) = feeds {
             feed::index(&mut x);
-            info!("Indexing done.");
 
             sender.send(true).expect("Couldn't send data to channel");;
             glib::idle_add(refresh_podcasts_view);