Switched to using an r2d2 connection pool instead of a shared databaseConnection object.

Jordan Petridis 2017-11-23 21:09:50 +02:00
parent aa7c493e81
commit 5570fdd118
7 changed files with 99 additions and 116 deletions

View File

@@ -44,7 +44,7 @@ test:stable:
   script:
     - rustc --version && cargo --version
     - cargo build --all
-    - cargo test --all --verbose
+    - cargo test -j 1 --all --verbose

 test:nightly:
   # Nightly
@@ -52,8 +52,8 @@ test:nightly:
   image: "rustlang/rust:nightly"
   script:
     - rustc --version && cargo --version
-    - cargo build --all
-    - cargo test --all --verbose
+    - cargo build
+    - cargo test -j 1 --verbose
 #    - cargo bench

 # Configure and run rustfmt on nightly

View File

@@ -1,26 +1,23 @@
-// use r2d2_diesel::ConnectionManager;
-// use diesel::SqliteConnection;
+use r2d2_diesel::ConnectionManager;
 use diesel::prelude::*;
+use r2d2;

 use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::io;
-// use std::time::Duration;
+use std::time::Duration;

 use errors::*;

 #[cfg(not(test))]
 use xdg_;

-// type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
-type Database = Arc<Mutex<SqliteConnection>>;
+type Pool = Arc<r2d2::Pool<ConnectionManager<SqliteConnection>>>;

 embed_migrations!("migrations/");

 lazy_static!{
-    // static ref POOL: Pool = init_pool(DB_PATH.to_str().unwrap());
-    static ref DB: Arc<Mutex<SqliteConnection>> = Arc::new(Mutex::new(establish_connection()));
+    static ref POOL: Pool = init_pool(DB_PATH.to_str().unwrap());
 }

 #[cfg(not(test))]
@@ -40,35 +37,26 @@ lazy_static! {
     static ref DB_PATH: PathBuf = TEMPDIR.path().join("hammond.db");
 }

-pub fn connection() -> Database {
-    // POOL.clone()
-    Arc::clone(&DB)
+pub fn connection() -> Pool {
+    // Arc::clone(&DB)
+    Arc::clone(&POOL)
 }

-// fn init_pool(db_path: &str) -> Pool {
-//     let config = r2d2::Config::builder()
-//         // .pool_size(60)
-//         // .min_idle(Some(60))
-//         // .connection_timeout(Duration::from_secs(60))
-//         .build();
-//     let manager = ConnectionManager::<SqliteConnection>::new(db_path);
-//     let pool = r2d2::Pool::new(config, manager).expect("Failed to create pool.");
-//     info!("Database pool initialized.");
-//     {
-//         let db = pool.clone().get().unwrap();
-//         utils::run_migration_on(&*db).unwrap();
-//     }
-//     pool
-// }
-
-pub fn establish_connection() -> SqliteConnection {
-    let database_url = DB_PATH.to_str().unwrap();
-    let db = SqliteConnection::establish(database_url)
-        .expect(&format!("Error connecting to {}", database_url));
-    run_migration_on(&db).unwrap();
-    db
+fn init_pool(db_path: &str) -> Pool {
+    let config = r2d2::Config::builder()
+        .pool_size(1)
+        .connection_timeout(Duration::from_secs(60))
+        .build();
+    let manager = ConnectionManager::<SqliteConnection>::new(db_path);
+    let pool = Arc::new(r2d2::Pool::new(config, manager).expect("Failed to create pool."));
+    info!("Database pool initialized.");
+
+    {
+        let db = Arc::clone(&pool).get().unwrap();
+        run_migration_on(&*db).unwrap();
+    }
+
+    pool
 }

 pub fn run_migration_on(connection: &SqliteConnection) -> Result<()> {
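
For readers unfamiliar with r2d2, the flow this file now sets up is: build the pool once (behind lazy_static! in the real file), share it as an Arc via connection(), and have each caller check a connection out with get(). Below is a minimal self-contained sketch of that flow, assuming the same r2d2 0.7-era API the diff uses (Config::builder(), Pool::new(config, manager)); the ":memory:" URL is purely for illustration, the real file points at a hammond.db path.

extern crate diesel;
extern crate r2d2;
extern crate r2d2_diesel;

use diesel::prelude::*;
use r2d2_diesel::ConnectionManager;
use std::sync::Arc;
use std::time::Duration;

type Pool = Arc<r2d2::Pool<ConnectionManager<SqliteConnection>>>;

fn init_pool(db_path: &str) -> Pool {
    // pool_size(1) serializes all access to the SQLite file,
    // sidestepping locking errors from concurrent writers.
    let config = r2d2::Config::builder()
        .pool_size(1)
        .connection_timeout(Duration::from_secs(60))
        .build();
    let manager = ConnectionManager::<SqliteConnection>::new(db_path);
    Arc::new(r2d2::Pool::new(config, manager).expect("Failed to create pool."))
}

fn main() {
    // ":memory:" keeps the sketch self-contained.
    let pool = init_pool(":memory:");

    // Cloning the Arc is cheap; this is exactly what connection() returns.
    let db = Arc::clone(&pool);

    // get() blocks until a connection is free or the timeout elapses.
    let con = db.get().unwrap();

    // The pooled connection derefs to SqliteConnection, hence the `&*con`
    // in the Diesel calls throughout this commit.
    con.execute("SELECT 1").unwrap();
}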

View File

@@ -12,7 +12,7 @@ pub fn get_sources() -> QueryResult<Vec<Source>> {
     use schema::source::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     source.load::<Source>(&*con)
 }
@@ -20,7 +20,7 @@ pub fn get_podcasts() -> QueryResult<Vec<Podcast>> {
     use schema::podcast::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     podcast.load::<Podcast>(&*con)
 }
@@ -28,7 +28,7 @@ pub fn get_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     episode.order(epoch.desc()).load::<Episode>(&*con)
 }
@@ -36,7 +36,7 @@ pub fn get_downloaded_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     episode
         .filter(local_uri.is_not_null())
         .load::<Episode>(&*con)
@@ -46,7 +46,7 @@ pub fn get_played_episodes() -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     episode.filter(played.is_not_null()).load::<Episode>(&*con)
 }
@@ -54,7 +54,7 @@ pub fn get_episode_from_id(ep_id: i32) -> QueryResult<Episode> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     episode.filter(id.eq(ep_id)).get_result::<Episode>(&*con)
 }
@@ -62,7 +62,7 @@ pub fn get_episode_local_uri_from_id(ep_id: i32) -> QueryResult<Option<String>>
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     episode
         .filter(id.eq(ep_id))
@@ -74,7 +74,7 @@ pub fn get_episodes_with_limit(limit: u32) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     episode
         .order(epoch.desc())
@@ -86,7 +86,7 @@ pub fn get_podcast_from_id(pid: i32) -> QueryResult<Podcast> {
     use schema::podcast::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     podcast.filter(id.eq(pid)).get_result::<Podcast>(&*con)
 }
@@ -94,7 +94,7 @@ pub fn get_pd_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     Episode::belonging_to(parent)
         .order(epoch.desc())
@@ -105,7 +105,7 @@ pub fn get_pd_unplayed_episodes(parent: &Podcast) -> QueryResult<Vec<Episode>> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     Episode::belonging_to(parent)
         .filter(played.is_null())
@@ -117,7 +117,7 @@ pub fn get_pd_episodes_limit(parent: &Podcast, limit: u32) -> QueryResult<Vec<Ep
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     Episode::belonging_to(parent)
         .order(epoch.desc())
@@ -129,7 +129,7 @@ pub fn get_source_from_uri(uri_: &str) -> QueryResult<Source> {
     use schema::source::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     source.filter(uri.eq(uri_)).get_result::<Source>(&*con)
 }
@@ -137,48 +137,43 @@ pub fn get_podcast_from_title(title_: &str) -> QueryResult<Podcast> {
     use schema::podcast::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     podcast
         .filter(title.eq(title_))
         .get_result::<Podcast>(&*con)
 }

-pub fn get_episode_from_uri(uri_: &str) -> QueryResult<Episode> {
+pub fn get_episode_from_uri(con: &SqliteConnection, uri_: &str) -> QueryResult<Episode> {
     use schema::episode::dsl::*;

-    let db = connection();
-    let con = db.lock().unwrap();
-
     episode.filter(uri.eq(uri_)).get_result::<Episode>(&*con)
 }

 pub fn remove_feed(pd: &Podcast) -> QueryResult<()> {
-    delete_source(pd.source_id())?;
-    delete_podcast(*pd.id())?;
-    delete_podcast_episodes(*pd.id())?;
+    let db = connection();
+    let con = db.get().unwrap();
+
+    delete_source(&con, pd.source_id())?;
+    delete_podcast(&con, *pd.id())?;
+    delete_podcast_episodes(&con, *pd.id())?;
     Ok(())
 }

-pub fn delete_source(source_id: i32) -> QueryResult<usize> {
+pub fn delete_source(con: &SqliteConnection, source_id: i32) -> QueryResult<usize> {
     use schema::source::dsl::*;

-    let db = connection();
-    let con = db.lock().unwrap();
-
     diesel::delete(source.filter(id.eq(source_id))).execute(&*con)
 }

-pub fn delete_podcast(podcast_id: i32) -> QueryResult<usize> {
+pub fn delete_podcast(con: &SqliteConnection, podcast_id: i32) -> QueryResult<usize> {
     use schema::podcast::dsl::*;

-    let db = connection();
-    let con = db.lock().unwrap();
-
     diesel::delete(podcast.filter(id.eq(podcast_id))).execute(&*con)
 }

-pub fn delete_podcast_episodes(parent_id: i32) -> QueryResult<usize> {
+pub fn delete_podcast_episodes(con: &SqliteConnection, parent_id: i32) -> QueryResult<usize> {
     use schema::episode::dsl::*;

-    let db = connection();
-    let con = db.lock().unwrap();
-
     diesel::delete(episode.filter(podcast_id.eq(parent_id))).execute(&*con)
 }
@@ -186,7 +181,7 @@ pub fn update_none_to_played_now(parent: &Podcast) -> QueryResult<usize> {
     use schema::episode::dsl::*;

     let db = connection();
-    let con = db.lock().unwrap();
+    let con = db.get().unwrap();

     let epoch_now = Utc::now().timestamp() as i32;
     con.transaction(|| -> QueryResult<usize> {
@@ -196,44 +191,32 @@ pub fn update_none_to_played_now(parent: &Podcast) -> QueryResult<usize> {
     })
 }

-pub fn insert_new_source(s: &NewSource) -> QueryResult<usize> {
+pub fn insert_new_source(con: &SqliteConnection, s: &NewSource) -> QueryResult<usize> {
     use schema::source::dsl::*;

-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::insert_into(source).values(s).execute(&*tempdb)
+    diesel::insert_into(source).values(s).execute(&*con)
 }

-pub fn insert_new_podcast(pd: &NewPodcast) -> QueryResult<usize> {
+pub fn insert_new_podcast(con: &SqliteConnection, pd: &NewPodcast) -> QueryResult<usize> {
     use schema::podcast::dsl::*;

-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::insert_into(podcast).values(pd).execute(&*tempdb)
+    diesel::insert_into(podcast).values(pd).execute(&*con)
 }

-pub fn insert_new_episode(ep: &NewEpisode) -> QueryResult<usize> {
+pub fn insert_new_episode(con: &SqliteConnection, ep: &NewEpisode) -> QueryResult<usize> {
     use schema::episode::dsl::*;

-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::insert_into(episode).values(ep).execute(&*tempdb)
+    diesel::insert_into(episode).values(ep).execute(&*con)
 }

-pub fn replace_podcast(pd: &NewPodcast) -> QueryResult<usize> {
+pub fn replace_podcast(con: &SqliteConnection, pd: &NewPodcast) -> QueryResult<usize> {
     use schema::podcast::dsl::*;

-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::replace_into(podcast).values(pd).execute(&*tempdb)
+    diesel::replace_into(podcast).values(pd).execute(&*con)
 }

-pub fn replace_episode(ep: &NewEpisode) -> QueryResult<usize> {
+pub fn replace_episode(con: &SqliteConnection, ep: &NewEpisode) -> QueryResult<usize> {
     use schema::episode::dsl::*;

-    let db = connection();
-    let tempdb = db.lock().unwrap();
-
-    diesel::replace_into(episode).values(ep).execute(&*tempdb)
+    diesel::replace_into(episode).values(ep).execute(&*con)
 }
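
Net effect of this file: the read helpers still check a connection out internally, while every write helper now takes an explicit &SqliteConnection, so one checkout can serve a whole batch of writes. A hedged sketch of the new calling convention; reindex_source is a hypothetical caller, not part of the commit:

use diesel::prelude::*;

use database::connection;
use dbqueries;
use errors::*;
use models::NewSource;

// Hypothetical caller: check a connection out of the pool once,
// then thread it through several write helpers from this commit.
fn reindex_source(s: &NewSource, old_id: i32) -> Result<()> {
    let db = connection();
    let con = db.get().unwrap();

    dbqueries::delete_source(&con, old_id)?;
    dbqueries::insert_new_source(&con, s)?;
    Ok(())
}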

View File

@@ -6,6 +6,7 @@ use dbqueries;
 use parser;
 use models::{Episode, NewEpisode, NewPodcast, Podcast, Source};
+use database::connection;
 use errors::*;

 #[derive(Debug)]
@@ -33,18 +34,20 @@ impl Feed {
         Ok(())
     }

-    #[allow(dead_code)]
-    fn index_channel(&self) -> Result<()> {
-        self.parse_channel().index()?;
-        Ok(())
-    }
+    // #[allow(dead_code)]
+    // fn index_channel(&self) -> Result<()> {
+    //     self.parse_channel().index()?;
+    //     Ok(())
+    // }

     // TODO: Refactor transcactions and find a way to do it in parallel.
     fn index_channel_items(&self, pd: &Podcast) -> Result<()> {
         let episodes = self.parse_channel_items(pd);
+        let db = connection();
+        let con = db.get().unwrap();

         episodes.into_iter().for_each(|x| {
-            let e = x.index();
+            let e = x.index(&con);
             if let Err(err) = e {
                 error!("Failed to index episode: {:?}.", x);
                 error!("Error msg: {}", err);
@@ -74,10 +77,13 @@ impl Feed {
     #[allow(dead_code)]
     fn get_episodes(&self) -> Result<Vec<Episode>> {
         let pd = self.get_podcast()?;
+        let eps = self.parse_channel_items(&pd);

-        let episodes: Vec<_> = self.parse_channel_items(&pd)
-            .into_par_iter()
-            .filter_map(|ep| ep.into_episode().ok())
+        let db = connection();
+        let con = db.get().unwrap();
+        // TODO: Make it parallel
+        let episodes: Vec<_> = eps.into_iter()
+            .filter_map(|ep| ep.into_episode(&con).ok())
             .collect();

         Ok(episodes)
@@ -135,7 +141,7 @@ pub fn fetch(feeds: Vec<Source>) -> Vec<Feed> {
 mod tests {
     use rss;

-    use models::NewSource;
+    use models::Source;
     use std::fs;
     use std::io::BufReader;
@@ -152,8 +158,9 @@ mod tests {
             "http://feeds.feedburner.com/linuxunplugged",
         ];

-        inpt.iter().for_each(|feed| {
-            NewSource::new_with_uri(feed).into_source().unwrap();
+        inpt.iter().for_each(|url| {
+            // Index the urls into the source table.
+            Source::from_url(url).unwrap();
         });

         index_all().unwrap();
@@ -187,7 +194,7 @@ mod tests {
         let mut feeds: Vec<_> = urls.iter()
             .map(|&(path, url)| {
                 // Create and insert a Source into db
-                let s = NewSource::new_with_uri(url).into_source().unwrap();
+                let s = Source::from_url(url).unwrap();
                 // open the xml file
                 let feed = fs::File::open(path).unwrap();
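
The test changes above lean on the new Source::from_url constructor, which rolls the old NewSource::new_with_uri(url).into_source() two-step into one call that indexes the url into the source table and returns the stored row. A short hedged sketch of that setup pattern; add_test_feed is a hypothetical helper:

use models::Source;

// Hypothetical test helper mirroring the setup above: from_url inserts
// the feed url into the source table and returns the resulting Source.
fn add_test_feed() -> Source {
    Source::from_url("http://feeds.feedburner.com/linuxunplugged").unwrap()
}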

View File

@@ -6,6 +6,7 @@ use utils::url_cleaner;
 use errors::*;
 use dbqueries;
+use database::connection;

 #[derive(Insertable)]
 #[table_name = "source"]
@@ -27,9 +28,12 @@ impl NewSource {
     }

     fn index(&self) {
+        let db = connection();
+        let con = db.get().unwrap();
+
         // Throw away the result like `insert or ignore`
         // Diesel deos not support `insert or ignore` yet.
-        let _ = dbqueries::insert_new_source(self);
+        let _ = dbqueries::insert_new_source(&con, self);
     }

     // Look out for when tryinto lands into stable.
@@ -58,13 +62,13 @@ impl NewEpisode {
     // TODO: Currently using diesel from master git.
     // Watch out for v0.99.0 beta and change the toml.
     // TODO: Refactor into batch indexes instead.
-    pub fn into_episode(self) -> Result<Episode> {
-        self.index()?;
-        Ok(dbqueries::get_episode_from_uri(&self.uri.unwrap())?)
+    pub fn into_episode(self, con: &SqliteConnection) -> Result<Episode> {
+        self.index(con)?;
+        Ok(dbqueries::get_episode_from_uri(con, &self.uri.unwrap())?)
     }

-    pub fn index(&self) -> QueryResult<()> {
-        let ep = dbqueries::get_episode_from_uri(&self.uri.clone().unwrap());
+    pub fn index(&self, con: &SqliteConnection) -> QueryResult<()> {
+        let ep = dbqueries::get_episode_from_uri(con, &self.uri.clone().unwrap());

         match ep {
             Ok(foo) => {
@@ -75,11 +79,11 @@ impl NewEpisode {
                 if foo.title() != self.title.as_ref().map(|x| x.as_str())
                     || foo.published_date() != self.published_date.as_ref().map(|x| x.as_str())
                 {
-                    dbqueries::replace_episode(self)?;
+                    dbqueries::replace_episode(con, self)?;
                 }
             }
             Err(_) => {
-                dbqueries::insert_new_episode(self)?;
+                dbqueries::insert_new_episode(con, self)?;
             }
         }
         Ok(())
@@ -107,6 +111,8 @@ impl NewPodcast {
     pub fn index(&self) -> QueryResult<()> {
         let pd = dbqueries::get_podcast_from_title(&self.title);
+        let db = connection();
+        let con = db.get().unwrap();

         match pd {
             Ok(foo) => {
                 if foo.source_id() != self.source_id {
@@ -114,11 +120,11 @@ impl NewPodcast {
                 };

                 if foo.link() != self.link {
-                    dbqueries::replace_podcast(self)?;
+                    dbqueries::replace_podcast(&con, self)?;
                 }
             }
             Err(_) => {
-                dbqueries::insert_new_podcast(self)?;
+                dbqueries::insert_new_podcast(&con, self)?;
             }
         }
         Ok(())
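
Since NewEpisode::index() and into_episode() now borrow a connection, callers are expected to check one out of the pool and pass it through; into_episode() runs the insert-or-replace logic above and then reads the stored row back. A minimal hedged sketch, where store_episode is a hypothetical caller:

use diesel::prelude::*;

use database::connection;
use errors::*;
use models::{Episode, NewEpisode};

// Hypothetical one-off upsert for a NewEpisode parsed from a feed.
fn store_episode(ep: NewEpisode) -> Result<Episode> {
    let db = connection();
    let con = db.get().unwrap();

    // Inserts the episode, or replaces it if the title/date changed,
    // then returns the row as stored in the database.
    ep.into_episode(&con)
}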

View File

@@ -133,7 +133,7 @@ impl Episode {
     pub fn save(&self) -> QueryResult<Episode> {
         let db = connection();
-        let tempdb = db.lock().unwrap();
+        let tempdb = db.get().unwrap();

         self.save_changes::<Episode>(&*tempdb)
     }
@@ -215,7 +215,7 @@ impl Podcast {
     pub fn save(&self) -> QueryResult<Podcast> {
         let db = connection();
-        let tempdb = db.lock().unwrap();
+        let tempdb = db.get().unwrap();

         self.save_changes::<Podcast>(&*tempdb)
     }
@@ -276,7 +276,7 @@ impl<'a> Source {
     pub fn save(&self) -> QueryResult<Source> {
         let db = connection();
-        let tempdb = db.lock().unwrap();
+        let tempdb = db.get().unwrap();

         self.save_changes::<Source>(&*tempdb)
     }

View File

@@ -7,7 +7,6 @@ wrap_comments = true
 error_on_line_overflow = true
 error_on_line_overflow_comments = true
 tab_spaces = 4
-fn_call_width = 60
 newline_style = "Unix"
 fn_call_style = "Block"
 report_todo = "Never"