diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 211a1b4..dac255e 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,7 +1,7 @@ // FIXME: //! Docs. -use futures::{lazy, prelude::*, future::ok, stream::FuturesOrdered}; +use futures::{lazy, prelude::*, future::ok, stream::FuturesUnordered}; use tokio; use hyper::client::HttpConnector; @@ -28,7 +28,7 @@ pub fn pipeline<'a, S>( client: HttpsClient, ) -> impl Future + 'a where - S: Stream + 'a, + S: Stream + Send + 'a, { sources .and_then(move |s| s.into_feed(client.clone())) @@ -39,10 +39,16 @@ where _ => error!("Error: {}", err), } }) - .for_each(move |feed| { + .and_then(move |feed| { let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); - tokio::spawn(fut) + tokio::spawn(fut); + Ok(()) }) + // For each terminates the stream at the first error so we make sure + // we pass good values regardless + .then(move |_| ok(())) + // Convert the stream into a Future to later execute as a tokio task + .for_each(move |_| ok(())) } /// Creates a tokio `reactor::Core`, and a `hyper::Client` and @@ -55,7 +61,7 @@ where let client = Client::builder().build::<_, Body>(https); let foo = sources.into_iter().map(ok::<_, _>); - let stream = FuturesOrdered::from_iter(foo); + let stream = FuturesUnordered::from_iter(foo); let p = pipeline(stream, client); tokio::run(p);