From 62029f6164c57013683f3a19b9c276857bc5fdfd Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sat, 29 Sep 2018 12:03:00 +0300 Subject: [PATCH] Pipeline: Minor refactor Use the proper Stream API to return a future to run on the executor. Previously I was using a workaround to convert the Stream into a future and run it to completion in the Executor, since I was not aware of a better API. --- podcasts-data/src/pipeline.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 8355041..35ef543 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,7 +1,7 @@ // FIXME: //! Docs. -use futures::{future::ok, lazy, prelude::*, stream::iter_ok}; +use futures::{lazy, prelude::*, stream::iter_ok}; use tokio_core::reactor::Core; use tokio_threadpool::{self, ThreadPool}; @@ -26,26 +26,23 @@ pub fn pipeline<'a, S>( sources: S, client: HttpsClient, pool: tokio_threadpool::Sender, -) -> impl Future, Error = DataError> + 'a +) -> impl Future + 'a where S: Stream + 'a, { sources .and_then(move |s| s.into_feed(client.clone())) - .and_then(move |feed| { - let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); - pool.spawn(fut).map_err(From::from) - }) - // the stream will stop at the first error so - // we ensure that everything will succeded regardless. .map_err(|err| { match err { // Avoid spamming the stderr when its not an eactual error DataError::FeedNotModified(_) => (), _ => error!("Error: {}", err), } - }).then(|_| ok::<(), DataError>(())) - .collect() + }) + .for_each(move |feed| { + let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); + pool.spawn(fut).map_err(|_| ()) + }) } /// Creates a tokio `reactor::Core`, and a `hyper::Client` and @@ -62,7 +59,7 @@ where let stream = iter_ok::<_, DataError>(sources); let p = pipeline(stream, client, sender); - core.run(p)?; + let _ = core.run(p); pool.shutdown_on_idle().wait().unwrap(); Ok(())