From bf4f655ed2ac8285dbd940d26643d6107422592e Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Tue, 16 Jan 2018 14:37:51 +0200 Subject: [PATCH] Pipeline: remove submodule hack. --- hammond-data/src/pipeline.rs | 72 +++++++++++++++++------------------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index 2510248..1e0c0cb 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -7,15 +7,46 @@ use hyper_tls::HttpsConnector; use futures::prelude::*; use futures::future::*; -use errors::Error; +use errors::*; use Source; // use Feed; +use std; + +/// The pipline to be run for indexing and updating a Podcast feed that originates from +/// `Source.uri`. +/// +/// Messy temp diagram: +/// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> +/// Convert rss::Channel into Feed -> Index Podcast -> Index Episodes. +pub fn pipeline>(sources: S, ignore_etags: bool) -> Result<()> { + let mut core = Core::new()?; + let handle = core.handle(); + let client = Client::configure() + // FIXME: numcpus instead of 4 + .connector(HttpsConnector::new(4, &handle)?) + .build(&handle); + + let list = sources + .into_iter() + // FIXME: Make proper indexing futures instead of wrapping up existing + // blocking functions + .map(|s| s.into_fututre_feed(&client, ignore_etags).map(|feed| feed.index_future())) + .collect(); + + let f = core.run(collect_futures(list))?; + f.into_iter() + .filter_map(|x| x.err()) + .for_each(|err| error!("Error: {}", err)); + + Ok(()) +} + // Weird magic from #rust irc channel // kudos to remexre fn collect_futures( futures: Vec, -) -> Box>, Error = Error>> +) -> Box>, Error = Error>> where F: 'static + Future, ::Item: 'static, @@ -36,40 +67,3 @@ where }) })) } - -pub use self::dirtyhack::pipeline; - -// Use a submodule ot not polute the collect_futures definition with the errorchain Result. -mod dirtyhack { - use super::*; - use errors::*; - - /// The pipline to be run for indexing and updating a Podcast feed that originates from - /// `Source.uri`. - /// - /// Messy temp diagram: - /// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> - /// Convert rss::Channel into Feed -> Index Podcast -> Index Episodes. - pub fn pipeline>(sources: S, ignore_etags: bool) -> Result<()> { - let mut core = Core::new()?; - let handle = core.handle(); - let client = Client::configure() - // FIXME: numcpus instead of 4 - .connector(HttpsConnector::new(4, &handle)?) - .build(&handle); - - let list = sources - .into_iter() - // FIXME: Make proper indexing futures instead of wrapping up existing - // blocking functions - .map(|s| s.into_fututre_feed(&client, ignore_etags).map(|feed| feed.index_future())) - .collect(); - - let f = core.run(collect_futures(list))?; - f.into_iter() - .filter_map(|x| x.err()) - .for_each(|err| error!("Error: {}", err)); - - Ok(()) - } -}