diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 6048027..e4efdb5 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -16,27 +16,6 @@ use num_cpus; use errors::DataError; use Source; -// use std::sync::{Arc, Mutex}; - -// http://gtk-rs.org/tuto/closures -#[macro_export] -macro_rules! clone { - (@param _) => ( _ ); - (@param $x:ident) => ( $x ); - ($($n:ident),+ => move || $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move || $body - } - ); - ($($n:ident),+ => move |$($p:tt),+| $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move |$(clone!(@param $p),)+| $body - } - ); -} - type HttpsClient = Client>; /// The pipline to be run for indexing and updating a Podcast feed that originates from @@ -47,19 +26,19 @@ type HttpsClient = Client>; /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. pub fn pipeline<'a, S>( sources: S, - client: &HttpsClient, - pool: &tokio_threadpool::Sender, + client: HttpsClient, + pool: tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, { sources - .and_then(clone!(client => move |s| s.into_feed(client.clone()))) - .and_then(clone!(pool => move |feed| { + .and_then(move |s| s.into_feed(client.clone())) + .and_then(move |feed| { pool.spawn(lazy(|| { feed.index().map_err(|err| error!("Error: {}", err)) })).map_err(From::from) - })) + }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. .map_err(|err| error!("Error: {}", err)) @@ -82,7 +61,7 @@ where .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, &client, &sender); + let p = pipeline(stream, client, sender.clone()); core.run(p).map(|_| ()) }