diff --git a/hammond-data/src/dbqueries.rs b/hammond-data/src/dbqueries.rs index 008f7cf..6e8bb74 100644 --- a/hammond-data/src/dbqueries.rs +++ b/hammond-data/src/dbqueries.rs @@ -379,29 +379,17 @@ pub fn update_none_to_played_now(parent: &Podcast) -> Result { mod tests { use super::*; use database::*; - use pipeline::*; - - use hyper::Client; - use hyper_tls::HttpsConnector; - use tokio_core::reactor::Core; - - use num_cpus; + use pipeline; #[test] fn test_update_none_to_played_now() { truncate_db().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let client = Client::configure() - .connector(HttpsConnector::new(num_cpus::get(), &handle).unwrap()) - .build(&handle); - let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\ com/InterceptedWithJeremyScahill"; let source = Source::from_url(url).unwrap(); let id = source.id(); - pipeline(vec![source], true, &mut core, client).unwrap(); + pipeline::run(vec![source], true).unwrap(); let pd = get_podcast_from_source_id(id).unwrap(); let eps_num = get_pd_unplayed_episodes(&pd).unwrap().len(); diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index 0a007f2..b91b6c6 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -1,8 +1,11 @@ // FIXME: //! Docs. +#![allow(unused)] + use futures::future::*; -// use futures::prelude::*; +use futures::prelude::*; +use futures::stream::*; use hyper::client::HttpConnector; use hyper::Client; @@ -46,28 +49,25 @@ type HttpsClient = Client>; /// Messy temp diagram: /// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` -> /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. 
-pub fn pipeline( +pub fn pipeline<'a, S>( sources: S, ignore_etags: bool, - tokio_core: &mut Core, client: HttpsClient, -) -> Result<(), DataError> +) -> Box<Future<Item = (), Error = DataError> + 'a> where - S: IntoIterator<Item = Source>, + S: IntoIterator<Item = Source> + 'a, { - let list: Vec<_> = sources - .into_iter() - .map(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags))) - .map(|feed| { - feed.and_then(|f| f.index()) - .map_err(|err| error!("Error: {}", err)) - // join_all stops at the first error so - // we ensure that everything will succeded regardless. - .then(|_| ok::<(), DataError>(())) - }) + let stream = iter_ok::<_, DataError>(sources); + let pipeline = stream + .and_then(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags))) + .and_then(|feed| feed.index()) + .map_err(|err| error!("Error: {}", err)) + // the stream will stop at the first error so + // we ensure that everything will succeed regardless. + .then(|_| ok::<(), DataError>(())) + .collect(); - tokio_core.run(join_all(list)).map(|_| ()) + Box::new(pipeline) } /// Creates a tokio `reactor::Core`, and a `hyper::Client` and @@ -82,7 +82,8 @@ where .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); - pipeline(sources, ignore_etags, &mut core, client) + let p = pipeline(sources, ignore_etags, client); + core.run(p).map(|_| ()) } fn determine_ep_state(