diff --git a/Cargo.toml b/Cargo.toml index 32f7478..612ddd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" authors = ["Jordan Petridis "] [dependencies] +rayon = "0.8.2" error-chain = "0.11.0" structopt = "0.1.0" structopt-derive = "0.1.0" diff --git a/src/feedparser.rs b/src/feedparser.rs index 038ec85..bd9039a 100644 --- a/src/feedparser.rs +++ b/src/feedparser.rs @@ -55,7 +55,7 @@ pub fn parse_episode<'a>(item: &'a Item, parent_id: i32) -> Result bar.timestamp() as i32, diff --git a/src/index_feed.rs b/src/index_feed.rs index 631467f..19ecf15 100644 --- a/src/index_feed.rs +++ b/src/index_feed.rs @@ -2,12 +2,14 @@ use diesel::prelude::*; use diesel; use rss; use reqwest; +use rayon::prelude::*; +use std::sync::{Arc, Mutex}; use schema; use dbqueries; use feedparser; use errors::*; -use models::{Episode, NewSource, Podcast, Source}; +use models::{Episode, NewEpisode, NewSource, Podcast, Source}; pub fn foo() { let inpt = vec![ @@ -28,7 +30,7 @@ pub fn foo() { } } - index_loop(&db).unwrap(); + index_loop(db).unwrap(); } fn insert_source(con: &SqliteConnection, url: &str) -> Result { @@ -70,8 +72,8 @@ fn index_podcast( Ok(dbqueries::load_podcast(con, &pd.title)?) } -fn index_episode(con: &SqliteConnection, item: &rss::Item, parent: &Podcast) -> Result { - let ep = feedparser::parse_episode(item, parent.id())?; +fn index_episode(con: &SqliteConnection, ep: &NewEpisode) -> Result { + // let ep = feedparser::parse_episode(item, parent.id())?; match dbqueries::load_episode(con, &ep.uri.unwrap()) { Ok(mut foo) => if foo.title() != ep.title || foo.published_date() != ep.published_date { @@ -84,41 +86,59 @@ fn index_episode(con: &SqliteConnection, item: &rss::Item, parent: &Podcast) -> foo.save_changes::(con)?; }, Err(_) => { - diesel::insert(&ep) - .into(schema::episode::table) - .execute(con)?; + diesel::insert(ep).into(schema::episode::table).execute(con)?; } } Ok(dbqueries::load_episode(con, &ep.uri.unwrap())?) } -pub fn index_loop(db: &SqliteConnection) -> Result<()> { +pub fn index_loop(db: SqliteConnection) -> Result<()> { use std::io::Read; use std::str::FromStr; - let mut f = fetch_feeds(db)?; + let mut f = fetch_feeds(&db)?; + let bar = Arc::new(Mutex::new(db)); for &mut (ref mut req, ref source) in f.iter_mut() { let mut buf = String::new(); req.read_to_string(&mut buf)?; let chan = rss::Channel::from_str(&buf)?; - let pd = index_podcast(db, &chan, source)?; + let mut pd = Podcast::new(); - let _: Vec<_> = chan.items() - .iter() - .map(|x| index_episode(db, &x, &pd)) + { + let fakedb = bar.lock().unwrap(); + pd = index_podcast(&fakedb, &chan, source)?; + } + + let foo: Vec<_> = chan.items() + .par_iter() + .map(|x| feedparser::parse_episode(&x, pd.id()).unwrap()) + .collect(); + + // info!("{:#?}", pd); + info!("{:#?}", foo); + let _: Vec<_> = foo.par_iter() + .map(|x| { + let z = bar.clone(); + baz(z, x) + }) .collect(); - info!("{:#?}", pd); // info!("{:#?}", episodes); // info!("{:?}", chan); } Ok(()) } -// TODO: make it into an iterator that yields reqwest::response +fn baz(arc: Arc>, ep: &NewEpisode) -> Result<()> { + let db = arc.lock().unwrap(); + index_episode(&db, ep)?; + Ok(()) +} + +// TODO: refactor into an Iterator // TODO: After fixing etag/lmod, add sent_etag:bool arg and logic to bypass it. pub fn fetch_feeds(connection: &SqliteConnection) -> Result> { use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified}; @@ -140,12 +160,16 @@ pub fn fetch_feeds(connection: &SqliteConnection) -> Result { continue; diff --git a/src/lib.rs b/src/lib.rs index 3068b6b..104d1ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ extern crate diesel_codegen; extern crate chrono; extern crate hyper; +extern crate rayon; extern crate reqwest; extern crate rss; extern crate time; @@ -43,6 +44,7 @@ pub mod errors { use time; use diesel::migrations::RunMigrationsError; use diesel::result; + // use std::sync; error_chain! { foreign_links { @@ -55,6 +57,7 @@ pub mod errors { ChronoError(chrono::ParseError); DurationError(time::OutOfRangeError); HyperError(hyper::error::Error); + // MutexPoison(sync::PoisonError); } } } @@ -75,11 +78,11 @@ lazy_static!{ HAMMOND_XDG.create_data_directory(HAMMOND_XDG.get_data_home()).unwrap() }; - static ref HAMMOND_CONFIG: PathBuf = { + static ref _HAMMOND_CONFIG: PathBuf = { HAMMOND_XDG.create_config_directory(HAMMOND_XDG.get_config_home()).unwrap() }; - static ref HAMMOND_CACHE: PathBuf = { + static ref _HAMMOND_CACHE: PathBuf = { HAMMOND_XDG.create_cache_directory(HAMMOND_XDG.get_cache_home()).unwrap() }; diff --git a/src/models.rs b/src/models.rs index ccb8564..3f594bd 100644 --- a/src/models.rs +++ b/src/models.rs @@ -108,6 +108,17 @@ pub struct Podcast { } impl Podcast { + pub fn new() -> Podcast { + Podcast { + id: 0, + title: String::new(), + link: String::new(), + description: String::new(), + image_uri: None, + source_id: 0, + } + } + pub fn id(&self) -> i32 { self.id }