diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 519d879..f416c8c 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -105,7 +105,6 @@ mod tests { use Source; use database::truncate_db; use dbqueries; - use pipeline; use utils::get_feed; use std::fs; @@ -143,27 +142,6 @@ mod tests { ] }; - #[test] - /// Insert feeds and update/index them. - fn test_index_loop() { - truncate_db().unwrap(); - URLS.iter().for_each(|&(_, url)| { - // Index the urls into the source table. - Source::from_url(url).unwrap(); - }); - let sources = dbqueries::get_sources().unwrap(); - pipeline::pipeline(sources, true).unwrap(); - - let sources = dbqueries::get_sources().unwrap(); - // Run again to cover Unique constrains erros. - pipeline::pipeline(sources, true).unwrap(); - - // Assert the index rows equal the controlled results - assert_eq!(dbqueries::get_sources().unwrap().len(), 5); - assert_eq!(dbqueries::get_podcasts().unwrap().len(), 5); - assert_eq!(dbqueries::get_episodes().unwrap().len(), 354); - } - #[test] fn test_complete_index() { truncate_db().unwrap(); diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index 07ff05c..3359e9b 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -6,6 +6,7 @@ use futures_cpupool::CpuPool; // use futures::prelude::*; use hyper::Client; +use hyper::client::HttpConnector; use hyper_tls::HttpsConnector; use tokio_core::reactor::Core; @@ -43,7 +44,32 @@ macro_rules! clone { /// Messy temp diagram: /// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> /// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes. -pub fn pipeline>(sources: S, ignore_etags: bool) -> Result<()> { +/// +/// # Panics +/// If `sources` contains no Items. +pub fn pipeline>( + sources: S, + ignore_etags: bool, + tokio_core: &mut Core, + pool: CpuPool, + client: Client>, +) -> Result<()> { + let list: Vec<_> = sources + .into_iter() + .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags))) + .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))) + .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err))) + .collect(); + + assert!(!list.is_empty()); + // Thats not really concurrent yet I think. + tokio_core.run(collect_futures(list))?; + + Ok(()) +} + +/// Creates a tokio-core, a cpu_pool, and a hyper::Client and runs the pipeline. +pub fn run>(sources: S, ignore_etags: bool) -> Result<()> { let pool = CpuPool::new_num_cpus(); let mut core = Core::new()?; let handle = core.handle(); @@ -52,20 +78,7 @@ pub fn pipeline>(sources: S, ignore_etags: bool) .connector(HttpsConnector::new(4, &handle)?) .build(&handle); - let list: Vec<_> = sources - .into_iter() - .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags))) - .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))) - .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err))) - .collect(); - - // TODO: this could be moved at the start of the function. - if !list.is_empty() { - // Thats not really concurrent yet I think. - core.run(collect_futures(list))?; - } - - Ok(()) + pipeline(sources, ignore_etags, &mut core, pool, client) } fn determine_ep_state(ep: NewEpisodeMinimal, item: &rss::Item) -> Result> { @@ -121,3 +134,61 @@ where }) })) } + +#[cfg(test)] +mod tests { + use super::*; + use Source; + use database::truncate_db; + + // (path, url) tuples. + const URLS: &[(&str, &str)] = { + &[ + ( + "tests/feeds/2018-01-20-Intercepted.xml", + "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\ + com/InterceptedWithJeremyScahill", + ), + ( + "tests/feeds/2018-01-20-LinuxUnplugged.xml", + "https://web.archive.org/web/20180120110314if_/https://feeds.feedburner.\ + com/linuxunplugged", + ), + ( + "tests/feeds/2018-01-20-TheTipOff.xml", + "https://web.archive.org/web/20180120110727if_/https://rss.acast.com/thetipoff", + ), + ( + "tests/feeds/2018-01-20-StealTheStars.xml", + "https://web.archive.org/web/20180120104957if_/https://rss.art19.\ + com/steal-the-stars", + ), + ( + "tests/feeds/2018-01-20-GreaterThanCode.xml", + "https://web.archive.org/web/20180120104741if_/https://www.greaterthancode.\ + com/feed/podcast", + ), + ] + }; + + #[test] + /// Insert feeds and update/index them. + fn test_pipeline() { + truncate_db().unwrap(); + URLS.iter().for_each(|&(_, url)| { + // Index the urls into the source table. + Source::from_url(url).unwrap(); + }); + let sources = dbqueries::get_sources().unwrap(); + run(sources, true).unwrap(); + + let sources = dbqueries::get_sources().unwrap(); + // Run again to cover Unique constrains erros. + run(sources, true).unwrap(); + + // Assert the index rows equal the controlled results + assert_eq!(dbqueries::get_sources().unwrap().len(), 5); + assert_eq!(dbqueries::get_podcasts().unwrap().len(), 5); + assert_eq!(dbqueries::get_episodes().unwrap().len(), 354); + } +} diff --git a/hammond-downloader/src/downloader.rs b/hammond-downloader/src/downloader.rs index 7f520af..4a0e90a 100644 --- a/hammond-downloader/src/downloader.rs +++ b/hammond-downloader/src/downloader.rs @@ -218,7 +218,7 @@ mod tests { use super::*; use hammond_data::Source; use hammond_data::dbqueries; - use hammond_data::pipeline::pipeline; + use hammond_data::pipeline; #[test] // This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit @@ -231,7 +231,7 @@ mod tests { // Copy it's id let sid = source.id(); // Convert Source it into a future Feed and index it - pipeline(vec![source], true).unwrap(); + pipeline::run(vec![source], true).unwrap(); // Get the Podcast let pd = dbqueries::get_podcast_from_source_id(sid).unwrap().into(); diff --git a/hammond-gtk/src/manager.rs b/hammond-gtk/src/manager.rs index fab0be9..e310cb1 100644 --- a/hammond-gtk/src/manager.rs +++ b/hammond-gtk/src/manager.rs @@ -119,7 +119,7 @@ mod tests { use hammond_data::{Episode, Source}; use hammond_data::dbqueries; - use hammond_data::pipeline::pipeline; + use hammond_data::pipeline; use hammond_data::utils::get_download_folder; use std::{thread, time}; @@ -138,7 +138,7 @@ mod tests { let source = Source::from_url(url).unwrap(); // Copy it's id let sid = source.id(); - pipeline(vec![source], true).unwrap(); + pipeline::run(vec![source], true).unwrap(); // Get the Podcast let pd = dbqueries::get_podcast_from_source_id(sid).unwrap(); diff --git a/hammond-gtk/src/utils.rs b/hammond-gtk/src/utils.rs index 9ce150f..67f6d49 100644 --- a/hammond-gtk/src/utils.rs +++ b/hammond-gtk/src/utils.rs @@ -28,13 +28,13 @@ pub fn refresh_feed(headerbar: Arc
, source: Option>, sender: if let Some(s) = source { // feed::index_loop(s); // TODO: determine if it needs to ignore_etags. - if let Err(err) = pipeline::pipeline(s, true) { + if let Err(err) = pipeline::run(s, true) { error!("Error While trying to update the database."); error!("Error msg: {}", err); } } else { let sources = dbqueries::get_sources().unwrap(); - if let Err(err) = pipeline::pipeline(sources, false) { + if let Err(err) = pipeline::run(sources, false) { error!("Error While trying to update the database."); error!("Error msg: {}", err); } @@ -83,7 +83,6 @@ mod tests { use super::*; use hammond_data::Source; use hammond_data::dbqueries; - use hammond_data::pipeline::pipeline; #[test] // This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit @@ -95,7 +94,7 @@ mod tests { let source = Source::from_url(url).unwrap(); // Copy it's id let sid = source.id(); - pipeline(vec![source], true).unwrap(); + pipeline::run(vec![source], true).unwrap(); // Get the Podcast let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();