From 55442529a8952868969af1dc89c152451c198280 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sun, 19 Nov 2017 22:06:10 +0200 Subject: [PATCH] Initial switch to using r2d2. --- Cargo.lock | 38 +++++++++ hammond-data/Cargo.toml | 3 +- hammond-data/benches/bench.rs | 44 ++-------- hammond-data/src/dbqueries.rs | 24 ++++-- hammond-data/src/feed.rs | 109 ++++++++----------------- hammond-data/src/lib.rs | 48 ++++++++++- hammond-data/src/models/insertables.rs | 31 ++++--- hammond-data/src/models/queryables.rs | 15 ++-- hammond-data/src/utils.rs | 12 +++ 9 files changed, 175 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51f2af0..b198188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,11 @@ name = "ansi_term" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "antidote" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "atk-sys" version = "0.4.0" @@ -544,6 +549,8 @@ dependencies = [ "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "r2d2 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", + "r2d2-diesel 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1009,6 +1016,25 @@ name = "quote" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "r2d2" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "scheduled-thread-pool 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "r2d2-diesel" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "diesel 0.16.0 (git+https://github.com/diesel-rs/diesel.git)", + "r2d2 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rand" version = "0.3.18" @@ -1148,6 +1174,14 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "scoped-tls" version = "0.1.0" @@ -1521,6 +1555,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum advapi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e06588080cb19d0acb6739808aafa5f26bfb2ca015b2b6370028b44cf7cb8a9a" "checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699" "checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" +"checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" "checksum atk-sys 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c69658a4e18d5c9575f716e24559645d08a4044d6946c30c2e0025952c84d842" "checksum atty 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "21e50800ec991574876040fff8ee46b136a53e985286fbe6a3bdfe6421b78860" "checksum backtrace 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8709cc7ec06f6f0ae6c2c7e12f6ed41540781f72b488d83734978295ceae182e" @@ -1622,6 +1657,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum precomputed-hash 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" "checksum quick-xml 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)" = "19a3a610544419c527d5f51ae1a6ae3db533e25c117d3eed8fce6434f70c5e95" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" +"checksum r2d2 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "2c8284508b38df440f8f3527395e23c4780b22f74226b270daf58fee38e4bcce" +"checksum r2d2-diesel 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f6b921696a6c45991296d21b52ed973b9fb56f6c47524fda1f99458c2d6c0478" "checksum rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6475140dfd8655aeb72e1fd4b7a1cc1c202be65d71669476e392fe62532b9edd" "checksum rayon 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ed02d09394c94ffbdfdc755ad62a132e94c3224a8354e78a1200ced34df12edf" "checksum rayon-core 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e64b609139d83da75902f88fd6c01820046840a18471e4dfcd5ac7c0f46bea53" @@ -1637,6 +1674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum schannel 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7554288337c1110e34d7a2433518d889374c1de1a45f856b7bcddb03702131fc" +"checksum scheduled-thread-pool 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d9fbe48ead32343b76f544c85953bf260ed39219a8bbbb62cd85f6a00f9644f" "checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d" "checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" "checksum secur32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f412dfa83308d893101dd59c10d6fda8283465976c28c287c5c855bf8d216bc" diff --git a/hammond-data/Cargo.toml b/hammond-data/Cargo.toml index ed09934..211db5f 100644 --- a/hammond-data/Cargo.toml +++ b/hammond-data/Cargo.toml @@ -10,6 +10,8 @@ dotenv = "0.10.1" error-chain = "0.11.0" lazy_static = "0.2.10" log = "0.3.8" +r2d2 = "0.7.4" +r2d2-diesel = "0.16.0" rayon = "0.9.0" reqwest = "0.8.1" rfc822_sanitizer = "0.3.3" @@ -23,7 +25,6 @@ git = "https://github.com/diesel-rs/diesel.git" [dependencies.diesel_codegen] features = ["sqlite"] -# version = "0.16.0" git = "https://github.com/diesel-rs/diesel.git" [dev-dependencies] diff --git a/hammond-data/benches/bench.rs b/hammond-data/benches/bench.rs index f3a7599..5516cbf 100644 --- a/hammond-data/benches/bench.rs +++ b/hammond-data/benches/bench.rs @@ -8,22 +8,14 @@ extern crate rss; extern crate tempdir; extern crate test; -use diesel::prelude::*; use rayon::prelude::*; -use rand::Rng; use test::Bencher; -use hammond_data::utils::run_migration_on; use hammond_data::models::NewSource; use hammond_data::feed::{index, Feed}; -use hammond_data::Database; use std::io::BufReader; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; - -struct TempDB(tempdir::TempDir, PathBuf, SqliteConnection); // Big rss feed const PCPER: &[u8] = include_bytes!("feeds/pcpermp3.xml"); @@ -40,57 +32,35 @@ static URLS: &[(&[u8], &str)] = &[ (LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"), ]; -/// Create and return a Temporary DB. -/// Will be destroed once the returned variable(s) is dropped. -fn get_temp_db() -> TempDB { - let mut rng = rand::thread_rng(); - - let tmp_dir = tempdir::TempDir::new("hammond_unit_test").unwrap(); - let db_path = tmp_dir - .path() - .join(format!("hammonddb_{}.db", rng.gen::())); - - let db = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap(); - run_migration_on(&db).unwrap(); - - TempDB(tmp_dir, db_path, db) -} - -fn index_urls(m: &Database) { +fn index_urls() { URLS.par_iter() .map(|&(buff, url)| { // Create and insert a Source into db - let s = NewSource::new_with_uri(url).into_source(m).unwrap(); + let s = NewSource::new_with_uri(url).into_source().unwrap(); // parse it into a channel let chan = rss::Channel::read_from(BufReader::new(buff)).unwrap(); - Feed::new_from_channel_source(chan, s) + Feed::from_channel_source(chan, s) }) .for_each(|feed| { - index(m, &mut [feed]); + index(&mut [feed]); }); } #[bench] fn bench_index_feeds(b: &mut Bencher) { - let TempDB(_tmp_dir, _db_path, db) = get_temp_db(); - let m = Arc::new(Mutex::new(db)); - b.iter(|| { - index_urls(&Arc::clone(&m)); + index_urls(); }); } #[bench] fn bench_index_unchanged_feeds(b: &mut Bencher) { - let TempDB(_tmp_dir, _db_path, db) = get_temp_db(); - let m = Arc::new(Mutex::new(db)); - // Index first so it will only bench the comparison test case. - index_urls(&Arc::clone(&m)); + index_urls(); b.iter(|| { for _ in 0..10 { - index_urls(&Arc::clone(&m)); + index_urls(); } }); } diff --git a/hammond-data/src/dbqueries.rs b/hammond-data/src/dbqueries.rs index 679f5b8..18701b9 100644 --- a/hammond-data/src/dbqueries.rs +++ b/hammond-data/src/dbqueries.rs @@ -8,22 +8,29 @@ use chrono::prelude::*; /// Random db querries helper functions. /// Probably needs cleanup. -pub fn get_sources(con: &SqliteConnection) -> QueryResult> { +use POOL; + +pub fn get_sources() -> QueryResult> { use schema::source::dsl::*; - source.load::(con) + let con = POOL.get().unwrap(); + let s = source.load::(&*con); + // s.iter().for_each(|x| println!("{:#?}", x)); + s } -pub fn get_podcasts(con: &SqliteConnection) -> QueryResult> { +pub fn get_podcasts() -> QueryResult> { use schema::podcast::dsl::*; - podcast.load::(con) + let con = POOL.get().unwrap(); + podcast.load::(&*con) } -pub fn get_episodes(con: &SqliteConnection) -> QueryResult> { +pub fn get_episodes() -> QueryResult> { use schema::episode::dsl::*; - episode.order(epoch.desc()).load::(con) + let con = POOL.get().unwrap(); + episode.order(epoch.desc()).load::(&*con) } pub fn get_downloaded_episodes(con: &SqliteConnection) -> QueryResult> { @@ -104,10 +111,11 @@ pub fn get_pd_episodes_limit( .load::(con) } -pub fn get_source_from_uri(con: &SqliteConnection, uri_: &str) -> QueryResult { +pub fn get_source_from_uri(uri_: &str) -> QueryResult { use schema::source::dsl::*; - source.filter(uri.eq(uri_)).get_result::(con) + let con = POOL.get().unwrap(); + source.filter(uri.eq(uri_)).get_result::(&*con) } pub fn get_podcast_from_title(con: &SqliteConnection, title_: &str) -> QueryResult { diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 66abdce..093a63c 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -6,13 +6,11 @@ use rss; use dbqueries; use parser; -use Database; +use POOL; use models::{Podcast, Source}; use errors::*; -use std::sync::Arc; - #[derive(Debug)] pub struct Feed { @@ -21,42 +19,42 @@ pub struct Feed { } impl Feed { - pub fn new_from_source(db: &Database, s: Source) -> Result { - s.refresh(db) + pub fn from_source(s: Source) -> Result { + s.refresh() } - pub fn new_from_channel_source(chan: rss::Channel, s: Source) -> Feed { + pub fn from_channel_source(chan: rss::Channel, s: Source) -> Feed { Feed { channel: chan, source: s, } } - fn index(&self, db: &Database) -> Result<()> { - let pd = self.index_channel(db)?; + fn index(&self) -> Result<()> { + let pd = self.index_channel()?; - self.index_channel_items(db, &pd)?; + self.index_channel_items(&pd)?; Ok(()) } - fn index_channel(&self, db: &Database) -> Result { + fn index_channel(&self) -> Result { let pd = parser::new_podcast(&self.channel, *self.source.id()); // Convert NewPodcast to Podcast - pd.into_podcast(db) + pd.into_podcast() } // TODO: Refactor transcactions and find a way to do it in parallel. - fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> { + fn index_channel_items(&self, pd: &Podcast) -> Result<()> { let items = self.channel.items(); let episodes: Vec<_> = items .into_par_iter() .map(|item| parser::new_episode(item, *pd.id())) .collect(); - let tempdb = db.lock().unwrap(); + let tempdb = POOL.clone().get().unwrap(); let _ = tempdb.transaction::<(), Error, _>(|| { episodes.into_iter().for_each(|x| { - let e = x.index(&tempdb); + let e = x.index(&*tempdb); if let Err(err) = e { error!("Failed to index episode: {:?}.", x); error!("Error msg: {}", err); @@ -68,17 +66,17 @@ impl Feed { } } -pub fn index_all(db: &Database) -> Result<()> { - let mut f = fetch_all(db)?; +pub fn index_all() -> Result<()> { + let mut f = fetch_all()?; - index(db, &mut f); + index(&mut f); info!("Indexing done."); Ok(()) } -pub fn index(db: &Database, feeds: &mut [Feed]) { +pub fn index(feeds: &mut [Feed]) { feeds.into_par_iter().for_each(|f| { - let e = f.index(&Arc::clone(db)); + let e = f.index(); if e.is_err() { error!("Error While trying to update the database."); error!("Error msg: {}", e.unwrap_err()); @@ -86,22 +84,19 @@ pub fn index(db: &Database, feeds: &mut [Feed]) { }); } -pub fn fetch_all(db: &Database) -> Result> { - let feeds = { - let conn = db.lock().unwrap(); - dbqueries::get_sources(&conn)? - }; +pub fn fetch_all() -> Result> { + let feeds = dbqueries::get_sources()?; - let results = fetch(db, feeds); + let results = fetch(feeds); Ok(results) } -pub fn fetch(db: &Database, feeds: Vec) -> Vec { +pub fn fetch(feeds: Vec) -> Vec { let results: Vec<_> = feeds .into_par_iter() .filter_map(|x| { let uri = x.uri().to_owned(); - let l = Feed::new_from_source(&Arc::clone(db), x); + let l = Feed::from_source(x); if l.is_ok() { l.ok() } else { @@ -118,46 +113,17 @@ pub fn fetch(db: &Database, feeds: Vec) -> Vec { #[cfg(test)] mod tests { - extern crate rand; - extern crate tempdir; - - use diesel::prelude::*; use rss; - use self::rand::Rng; use models::NewSource; - use utils::run_migration_on; - use std::io::BufReader; - use std::path::PathBuf; use std::fs; - use std::sync::Mutex; + use std::io::BufReader; use super::*; - struct TempDB(tempdir::TempDir, PathBuf, SqliteConnection); - - /// Create and return a Temporary DB. - /// Will be destroed once the returned variable(s) is dropped. - fn get_temp_db() -> TempDB { - let mut rng = rand::thread_rng(); - - let tmp_dir = tempdir::TempDir::new("hammond_unit_test").unwrap(); - let db_path = tmp_dir - .path() - .join(format!("hammonddb_{}.db", rng.gen::())); - - let db = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap(); - run_migration_on(&db).unwrap(); - - TempDB(tmp_dir, db_path, db) - } - #[test] /// Insert feeds and update/index them. fn test_index_loop() { - let TempDB(_tmp_dir, _db_path, db) = get_temp_db(); - let db = Arc::new(Mutex::new(db)); - let inpt = vec![ "https://request-for-explanation.github.io/podcast/rss.xml", "https://feeds.feedburner.com/InterceptedWithJeremyScahill", @@ -166,23 +132,17 @@ mod tests { ]; inpt.iter().for_each(|feed| { - NewSource::new_with_uri(feed) - .into_source(&db.clone()) - .unwrap(); + NewSource::new_with_uri(feed).into_source().unwrap(); }); - index_all(&db).unwrap(); + index_all().unwrap(); // Run again to cover Unique constrains erros. - index_all(&db).unwrap(); + index_all().unwrap(); } #[test] fn test_complete_index() { - let TempDB(_tmp_dir, _db_path, db) = get_temp_db(); - // complete_index runs in parallel so it requires a mutex as argument. - let m = Arc::new(Mutex::new(db)); - // vec of (path, url) tuples. let urls = vec![ ( @@ -195,7 +155,7 @@ mod tests { ), ( "tests/feeds/TheBreakthrough.xml", - "http://feeds.feedburner.com/propublica/podcast", + "http://feeds.propublica.org/propublica/podcast", ), ( "tests/feeds/R4Explanation.xml", @@ -206,25 +166,22 @@ mod tests { let mut feeds: Vec<_> = urls.iter() .map(|&(path, url)| { // Create and insert a Source into db - let s = NewSource::new_with_uri(url) - .into_source(&m.clone()) - .unwrap(); + let s = NewSource::new_with_uri(url).into_source().unwrap(); // open the xml file let feed = fs::File::open(path).unwrap(); // parse it into a channel let chan = rss::Channel::read_from(BufReader::new(feed)).unwrap(); - Feed::new_from_channel_source(chan, s) + Feed::from_channel_source(chan, s) }) .collect(); // Index the channels - index(&m, &mut feeds); + index(&mut feeds); // Assert the index rows equal the controlled results - let tempdb = m.lock().unwrap(); - assert_eq!(dbqueries::get_sources(&tempdb).unwrap().len(), 4); - assert_eq!(dbqueries::get_podcasts(&tempdb).unwrap().len(), 4); - assert_eq!(dbqueries::get_episodes(&tempdb).unwrap().len(), 274); + assert_eq!(dbqueries::get_sources().unwrap().len(), 4); + assert_eq!(dbqueries::get_podcasts().unwrap().len(), 4); + assert_eq!(dbqueries::get_episodes().unwrap().len(), 274); } } diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs index 17bebf0..20206a1 100644 --- a/hammond-data/src/lib.rs +++ b/hammond-data/src/lib.rs @@ -16,6 +16,8 @@ extern crate diesel; extern crate diesel_codegen; extern crate chrono; +extern crate r2d2; +extern crate r2d2_diesel; extern crate rayon; extern crate reqwest; extern crate rfc822_sanitizer; @@ -55,11 +57,49 @@ lazy_static!{ HAMMOND_XDG.create_cache_directory(HAMMOND_XDG.get_cache_home()).unwrap() }; - static ref DB_PATH: PathBuf = { - HAMMOND_XDG.place_data_file("hammond.db").unwrap() - }; - pub static ref DL_DIR: PathBuf = { HAMMOND_XDG.create_data_directory("Downloads").unwrap() }; + + pub static ref DB_PATH: PathBuf = HAMMOND_XDG.place_data_file("hammond.db").unwrap(); +} + +#[cfg(not(test))] +lazy_static! { + pub static ref POOL: utils::Pool = utils::init_pool(DB_PATH.to_str().unwrap()); +} + +#[cfg(test)] +lazy_static! { + static ref TEMPDB: TempDB = get_temp_db(); + + pub static ref POOL: &'static utils::Pool = &TEMPDB.2; +} + +#[cfg(test)] +struct TempDB(tempdir::TempDir, PathBuf, utils::Pool); + +#[cfg(test)] +extern crate rand; +#[cfg(test)] +extern crate tempdir; +#[cfg(test)] +use rand::Rng; + +#[cfg(test)] +/// Create and return a Temporary DB. +/// Will be destroed once the returned variable(s) is dropped. +fn get_temp_db() -> TempDB { + let mut rng = rand::thread_rng(); + + let tmp_dir = tempdir::TempDir::new("hammond_unit_test").unwrap(); + let db_path = tmp_dir + .path() + .join(format!("hammonddb_{}.db", rng.gen::())); + + let pool = utils::init_pool(db_path.to_str().unwrap()); + let db = pool.get().unwrap(); + utils::run_migration_on(&db).unwrap(); + + TempDB(tmp_dir, db_path, pool) } diff --git a/hammond-data/src/models/insertables.rs b/hammond-data/src/models/insertables.rs index 65e2598..507b12e 100644 --- a/hammond-data/src/models/insertables.rs +++ b/hammond-data/src/models/insertables.rs @@ -3,7 +3,7 @@ use diesel; use schema::{episode, podcast, source}; use models::{Podcast, Source}; -use Database; +use POOL; use errors::*; use dbqueries; @@ -26,21 +26,20 @@ impl<'a> NewSource<'a> { } } - fn index(&self, db: &Database) { + fn index(&self) { use schema::source::dsl::*; - let tempdb = db.lock().unwrap(); + let tempdb = POOL.clone().get().unwrap(); // Throw away the result like `insert or ignore` // Diesel deos not support `insert or ignore` yet. let _ = diesel::insert_into(source).values(self).execute(&*tempdb); } // Look out for when tryinto lands into stable. - pub fn into_source(self, db: &Database) -> QueryResult { - self.index(db); + pub fn into_source(self) -> QueryResult { + self.index(); - let tempdb = db.lock().unwrap(); - dbqueries::get_source_from_uri(&tempdb, self.uri) + dbqueries::get_source_from_uri(self.uri) } } @@ -104,28 +103,28 @@ pub struct NewPodcast { impl NewPodcast { // Look out for when tryinto lands into stable. - pub fn into_podcast(self, db: &Database) -> Result { - self.index(db)?; - let tempdb = db.lock().unwrap(); - Ok(dbqueries::get_podcast_from_title(&tempdb, &self.title)?) + pub fn into_podcast(self) -> Result { + self.index()?; + let tempdb = POOL.clone().get().unwrap(); + Ok(dbqueries::get_podcast_from_title(&*tempdb, &self.title)?) } - fn index(&self, db: &Database) -> QueryResult<()> { + fn index(&self) -> QueryResult<()> { use schema::podcast::dsl::*; let pd = { - let tempdb = db.lock().unwrap(); - dbqueries::get_podcast_from_title(&tempdb, &self.title) + let tempdb = POOL.clone().get().unwrap(); + dbqueries::get_podcast_from_title(&*tempdb, &self.title) }; match pd { Ok(foo) => if foo.link() != self.link { - let tempdb = db.lock().unwrap(); + let tempdb = POOL.clone().get().unwrap(); diesel::replace_into(podcast) .values(self) .execute(&*tempdb)?; }, Err(_) => { - let tempdb = db.lock().unwrap(); + let tempdb = POOL.clone().get().unwrap(); diesel::insert_into(podcast).values(self).execute(&*tempdb)?; } } diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs index be31df6..a9eb1b0 100644 --- a/hammond-data/src/models/queryables.rs +++ b/hammond-data/src/models/queryables.rs @@ -11,6 +11,7 @@ use errors::*; use models::insertables::NewPodcast; use Database; +use POOL; use std::io::Read; use std::str::FromStr; @@ -265,7 +266,7 @@ impl<'a> Source { /// Extract Etag and LastModifier from req, and update self and the /// corresponding db row. - fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> { + fn update_etag(&mut self, req: &reqwest::Response) -> Result<()> { let headers = req.headers(); // let etag = headers.get_raw("ETag").unwrap(); @@ -278,18 +279,18 @@ impl<'a> Source { { self.http_etag = etag.map(|x| x.tag().to_string().to_owned()); self.last_modified = lmod.map(|x| format!("{}", x)); - self.save(db)?; + self.save()?; } Ok(()) } - pub fn save(&self, db: &Database) -> QueryResult { - let tempdb = db.lock().unwrap(); + pub fn save(&self) -> QueryResult { + let tempdb = POOL.clone().get().unwrap(); self.save_changes::(&*tempdb) } - pub fn refresh(mut self, db: &Database) -> Result { + pub fn refresh(mut self) -> Result { use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified}; let mut headers = Headers::new(); @@ -322,12 +323,12 @@ impl<'a> Source { // _ => (), // }; - self.update_etag(db, &req)?; + self.update_etag(&req)?; let mut buf = String::new(); req.read_to_string(&mut buf)?; let chan = Channel::from_str(&buf)?; - Ok(Feed::new_from_channel_source(chan, self)) + Ok(Feed::from_channel_source(chan, self)) } } diff --git a/hammond-data/src/utils.rs b/hammond-data/src/utils.rs index 572cdf8..112ee11 100644 --- a/hammond-data/src/utils.rs +++ b/hammond-data/src/utils.rs @@ -2,6 +2,10 @@ use rayon::prelude::*; use diesel::prelude::*; use chrono::prelude::*; +use r2d2; +use diesel::sqlite::SqliteConnection; +use r2d2_diesel::ConnectionManager; + use errors::*; use dbqueries; use Database; @@ -15,11 +19,19 @@ use DB_PATH; embed_migrations!("migrations/"); +pub type Pool = r2d2::Pool>; + pub fn init() -> Result<()> { let conn = establish_connection(); run_migration_on(&conn) } +pub fn init_pool(db_path: &str) -> Pool { + let config = r2d2::Config::default(); + let manager = ConnectionManager::::new(db_path); + r2d2::Pool::new(config, manager).expect("Failed to create pool.") +} + pub fn run_migration_on(connection: &SqliteConnection) -> Result<()> { info!("Running DB Migrations..."); embedded_migrations::run(connection)?;