diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index bc9a7f3..4edbddb 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -53,18 +53,16 @@ pub fn pipeline>( let list: Vec<_> = sources .into_iter() .map(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags))) - .map(|fut| fut.and_then(|feed| feed.index())) - .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err))) + .map(|feed| { + feed.and_then(|f| f.index()) + .map_err(|err| error!("Error: {}", err)) + // join_all stops at the first error so + // we ensure that everything will succeded regardless. + .then(|_| ok::<(), DataError>(())) + }) .collect(); - if list.is_empty() { - return Err(DataError::EmptyFuturesList); - } - - // Thats not really concurrent yet I think. - tokio_core.run(collect_futures(list))?; - - Ok(()) + tokio_core.run(join_all(list)).map(|_| ()) } /// Creates a tokio `reactor::Core`, and a `hyper::Client` and