From 1036176e5155ae7ce146706e6811538ac6e6b5b5 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sat, 14 Apr 2018 07:41:50 +0300 Subject: [PATCH] pipeline: Make sure that the futures will be run. Use .then() combinator to override the result and return Ok(()) even if the task fails. That allows us to use join_all instead of the custom written collect_futures function. --- hammond-data/src/pipeline.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index bc9a7f3..4edbddb 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -53,18 +53,16 @@ pub fn pipeline>( let list: Vec<_> = sources .into_iter() .map(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags))) - .map(|fut| fut.and_then(|feed| feed.index())) - .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err))) + .map(|feed| { + feed.and_then(|f| f.index()) + .map_err(|err| error!("Error: {}", err)) + // join_all stops at the first error so + // we ensure that everything will succeded regardless. + .then(|_| ok::<(), DataError>(())) + }) .collect(); - if list.is_empty() { - return Err(DataError::EmptyFuturesList); - } - - // Thats not really concurrent yet I think. - tokio_core.run(collect_futures(list))?; - - Ok(()) + tokio_core.run(join_all(list)).map(|_| ()) } /// Creates a tokio `reactor::Core`, and a `hyper::Client` and