diff --git a/hammond-data/benches/bench.rs b/hammond-data/benches/bench.rs index be88041..b47d286 100644 --- a/hammond-data/benches/bench.rs +++ b/hammond-data/benches/bench.rs @@ -15,7 +15,7 @@ use rand::Rng; use test::Bencher; use hammond_data::run_migration_on; -use hammond_data::index_feed::{complete_index, insert_return_source, Database}; +use hammond_data::index_feed::{index_feeds, insert_return_source, Database, Feed}; // use std::io::BufRead; use std::path::PathBuf; @@ -63,9 +63,10 @@ fn index_urls(m: &Database) { }; // parse it into a channel let chan = rss::Channel::read_from(buff).unwrap(); + let feed = Feed(chan, s); // Index the channel - complete_index(m, &chan, &s).unwrap(); + index_feeds(m, &mut [feed]); }); } diff --git a/hammond-data/src/index_feed.rs b/hammond-data/src/index_feed.rs index b8e03a5..00d701a 100644 --- a/hammond-data/src/index_feed.rs +++ b/hammond-data/src/index_feed.rs @@ -1,7 +1,6 @@ use diesel::prelude::*; use diesel; use rss; -use reqwest; use rayon::prelude::*; use dbqueries; @@ -11,12 +10,51 @@ use feedparser; use std::sync::{Arc, Mutex}; -#[derive(Debug)] -pub struct Feed(pub reqwest::Response, pub Source); - pub type Database = Arc>; -fn index_source(con: &SqliteConnection, foo: &NewSource) { +#[derive(Debug)] +pub struct Feed(pub rss::Channel, pub Source); + +impl Feed { + fn index(&self, db: &Database) -> Result<()> { + let tempdb = db.lock().unwrap(); + let pd = self.index_channel(&tempdb)?; + drop(tempdb); + + self.index_channel_items(db, &pd)?; + Ok(()) + } + + fn index_channel(&self, con: &SqliteConnection) -> Result { + let pd = feedparser::parse_podcast(&self.0, self.1.id()); + // Convert NewPodcast to Podcast + insert_return_podcast(con, &pd) + } + + fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> { + let it = self.0.items(); + let episodes: Vec<_> = it.par_iter() + .map(|x| feedparser::parse_episode(x, pd.id())) + .collect(); + + let conn = db.lock().unwrap(); + let e = conn.transaction::<(), Error, _>(|| { + episodes.iter().for_each(|x| { + let e = index_episode(&conn, x); + if let Err(err) = e { + error!("Failed to index episode: {:?}.", x); + error!("Error msg: {}", err); + }; + }); + Ok(()) + }); + drop(conn); + + e + } +} + +pub fn index_source(con: &SqliteConnection, foo: &NewSource) { use schema::source::dsl::*; // Throw away the result like `insert or ignore` @@ -78,96 +116,41 @@ fn insert_return_podcast(con: &SqliteConnection, pd: &NewPodcast) -> Result Result<()> { let mut f = fetch_all_feeds(db)?; - index_feed(db, &mut f); + index_feeds(db, &mut f); info!("Indexing done."); Ok(()) } -pub fn index_feed(db: &Database, f: &mut [Feed]) { - f.par_iter_mut() - .for_each(|&mut Feed(ref mut req, ref source)| { - let e = complete_index_from_source(req, source, db); - if e.is_err() { - error!("Error While trying to update the database."); - error!("Error msg: {}", e.unwrap_err()); - }; - }); -} - -pub fn complete_index_from_source( - req: &mut reqwest::Response, - source: &Source, - db: &Database, -) -> Result<()> { - use std::io::Read; - use std::str::FromStr; - - let mut buf = String::new(); - req.read_to_string(&mut buf)?; - let chan = rss::Channel::from_str(&buf)?; - - complete_index(db, &chan, source) -} - -pub fn complete_index(db: &Database, chan: &rss::Channel, parent: &Source) -> Result<()> { - let pd = { - let conn = db.lock().unwrap(); - index_channel(&conn, chan, parent)? - }; - index_channel_items(db, chan.items(), &pd); - Ok(()) -} - -fn index_channel(con: &SqliteConnection, chan: &rss::Channel, parent: &Source) -> Result { - let pd = feedparser::parse_podcast(chan, parent.id()); - // Convert NewPodcast to Podcast - insert_return_podcast(con, &pd) -} - -fn index_channel_items(db: &Database, it: &[rss::Item], pd: &Podcast) { - let episodes: Vec<_> = it.par_iter() - .map(|x| feedparser::parse_episode(x, pd.id())) - .collect(); - - let conn = db.lock().unwrap(); - let e = conn.transaction::<(), Error, _>(|| { - episodes.iter().for_each(|x| { - let e = index_episode(&conn, x); - if let Err(err) = e { - error!("Failed to index episode: {:?}.", x); - error!("Error msg: {}", err); - }; - }); - Ok(()) +pub fn index_feeds(db: &Database, f: &mut [Feed]) { + f.into_par_iter().for_each(|x| { + let e = x.index(db); + if e.is_err() { + error!("Error While trying to update the database."); + error!("Error msg: {}", e.unwrap_err()); + }; }); - drop(conn); - - if let Err(err) = e { - error!("Episodes Transcaction Failed."); - error!("Error msg: {}", err); - }; } -// Maybe this can be refactored into an Iterator for lazy evaluation. pub fn fetch_all_feeds(db: &Database) -> Result> { - let mut feeds = { + let feeds = { let conn = db.lock().unwrap(); dbqueries::get_sources(&conn)? }; - let results = fetch_feeds(db, &mut feeds); + let results = fetch_feeds(db, feeds); Ok(results) } -pub fn fetch_feeds(db: &Database, feeds: &mut [Source]) -> Vec { - let results: Vec = feeds - .par_iter_mut() +pub fn fetch_feeds(db: &Database, feeds: Vec) -> Vec { + let results: Vec<_> = feeds + .into_par_iter() .filter_map(|x| { - let l = refresh_source(db, x); + let uri = x.uri().to_owned(); + let l = x.refresh(db); if l.is_ok() { l.ok() } else { - error!("Error While trying to fetch from source: {}.", x.uri()); + error!("Error While trying to fetch from source: {}.", uri); error!("Error msg: {}", l.unwrap_err()); None } @@ -177,43 +160,6 @@ pub fn fetch_feeds(db: &Database, feeds: &mut [Source]) -> Vec { results } -pub fn refresh_source(db: &Database, feed: &mut Source) -> Result { - use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified}; - - let mut headers = Headers::new(); - - if let Some(foo) = feed.http_etag() { - headers.set(ETag(EntityTag::new(true, foo.to_owned()))); - } - - if let Some(foo) = feed.last_modified() { - if let Ok(x) = foo.parse::() { - headers.set(LastModified(x)); - } - } - - // FIXME: I have fucked up somewhere here. - // Getting back 200 codes even though I supposedly sent etags. - // info!("Headers: {:?}", headers); - let client = reqwest::Client::builder().referer(false).build()?; - let req = client.get(feed.uri()).headers(headers).send()?; - - info!("GET to {} , returned: {}", feed.uri(), req.status()); - - // TODO match on more stuff - // 301: Permanent redirect of the url - // 302: Temporary redirect of the url - // 304: Up to date Feed, checked with the Etag - // 410: Feed deleted - // match req.status() { - // reqwest::StatusCode::NotModified => (), - // _ => (), - // }; - - feed.update_etag(db, &req)?; - Ok(Feed(req, feed.clone())) -} - #[cfg(test)] mod tests { @@ -308,9 +254,10 @@ mod tests { let feed = fs::File::open(path).unwrap(); // parse it into a channel let chan = rss::Channel::read_from(BufReader::new(feed)).unwrap(); + let feed = Feed(chan, s); // Index the channel - complete_index(&m, &chan, &s).unwrap(); + index_feeds(&m, &mut [feed]); }); // Assert the index rows equal the controlled results diff --git a/hammond-data/src/models/insertables.rs b/hammond-data/src/models/insertables.rs index 64f8c0e..ec90134 100644 --- a/hammond-data/src/models/insertables.rs +++ b/hammond-data/src/models/insertables.rs @@ -1,12 +1,19 @@ +use diesel::prelude::*; + use schema::{episode, podcast, source}; +use models::Source; +use index_feed::Database; +use index_feed; +use dbqueries; + #[derive(Insertable)] #[table_name = "source"] #[derive(Debug, Clone)] pub struct NewSource<'a> { pub uri: &'a str, - pub last_modified: Option<&'a str>, - pub http_etag: Option<&'a str>, + last_modified: Option<&'a str>, + http_etag: Option<&'a str>, } impl<'a> NewSource<'a> { @@ -17,6 +24,12 @@ impl<'a> NewSource<'a> { http_etag: None, } } + + pub fn into_source(self, db: &Database) -> QueryResult { + let tempdb = db.lock().unwrap(); + index_feed::index_source(&tempdb, &self); + dbqueries::get_source_from_uri(&tempdb, self.uri) + } } #[derive(Insertable)] diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs index 15b0aa2..0699f63 100644 --- a/hammond-data/src/models/queryables.rs +++ b/hammond-data/src/models/queryables.rs @@ -3,13 +3,17 @@ use reqwest; use diesel::SaveChangesDsl; use diesel::result::QueryResult; use reqwest::header::{ETag, LastModified}; +use rss::Channel; use schema::{episode, podcast, source}; -use index_feed::Database; +use index_feed::{Database, Feed}; use errors::*; use models::insertables::NewPodcast; +use std::io::Read; +use std::str::FromStr; + #[derive(Queryable, Identifiable, AsChangeset, Associations)] #[table_name = "episode"] #[changeset_options(treat_none_as_null = "true")] @@ -272,7 +276,7 @@ impl<'a> Source { /// Extract Etag and LastModifier from req, and update self and the /// corresponding db row. - pub fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> { + fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> { let headers = req.headers(); // let etag = headers.get_raw("ETag").unwrap(); @@ -295,4 +299,46 @@ impl<'a> Source { let tempdb = db.lock().unwrap(); self.save_changes::(&*tempdb) } + + pub fn refresh(mut self, db: &Database) -> Result { + use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified}; + + let mut headers = Headers::new(); + + if let Some(foo) = self.http_etag() { + headers.set(ETag(EntityTag::new(true, foo.to_owned()))); + } + + if let Some(foo) = self.last_modified() { + if let Ok(x) = foo.parse::() { + headers.set(LastModified(x)); + } + } + + // FIXME: I have fucked up somewhere here. + // Getting back 200 codes even though I supposedly sent etags. + // info!("Headers: {:?}", headers); + let client = reqwest::Client::builder().referer(false).build()?; + let mut req = client.get(self.uri()).headers(headers).send()?; + + info!("GET to {} , returned: {}", self.uri(), req.status()); + + // TODO match on more stuff + // 301: Permanent redirect of the url + // 302: Temporary redirect of the url + // 304: Up to date Feed, checked with the Etag + // 410: Feed deleted + // match req.status() { + // reqwest::StatusCode::NotModified => (), + // _ => (), + // }; + + self.update_etag(db, &req)?; + + let mut buf = String::new(); + req.read_to_string(&mut buf)?; + let chan = Channel::from_str(&buf)?; + + Ok(Feed(chan, self)) + } } diff --git a/hammond-gtk/src/utils.rs b/hammond-gtk/src/utils.rs index be3616a..0bff632 100644 --- a/hammond-gtk/src/utils.rs +++ b/hammond-gtk/src/utils.rs @@ -42,14 +42,14 @@ pub fn refresh_feed( let feeds = { if let Some(mut vec) = source { - Ok(index_feed::fetch_feeds(&db, &mut vec)) + Ok(index_feed::fetch_feeds(&db, vec)) } else { index_feed::fetch_all_feeds(&db) } }; if let Ok(mut x) = feeds { - index_feed::index_feed(&db, &mut x); + index_feed::index_feeds(&db, &mut x); info!("Indexing done."); sender.send(true).expect("Couldn't send data to channel");;