Pipeline: Do not terminate the stream upon errors

Stream::for_each terminated the stream upon the first error. This
was causing feeds to not update if any one returned a non-200ish
result. To work around this, we create a succesfull result for
every entry regardless at the end.

While we are at it, aslo switch from FuturesOrdered stream
to FuturesUnordered. There is no reason to use Ordered, this was
a typo initially.
This commit is contained in:
Jordan Petridis 2018-10-04 18:48:11 +03:00
parent 357d99ac7c
commit df302ad517
No known key found for this signature in database
GPG Key ID: E8523968931763BE

View File

@ -1,7 +1,7 @@
// FIXME: // FIXME:
//! Docs. //! Docs.
use futures::{lazy, prelude::*, future::ok, stream::FuturesOrdered}; use futures::{lazy, prelude::*, future::ok, stream::FuturesUnordered};
use tokio; use tokio;
use hyper::client::HttpConnector; use hyper::client::HttpConnector;
@ -28,7 +28,7 @@ pub fn pipeline<'a, S>(
client: HttpsClient, client: HttpsClient,
) -> impl Future<Item = (), Error = ()> + 'a ) -> impl Future<Item = (), Error = ()> + 'a
where where
S: Stream<Item = Source, Error = DataError> + 'a, S: Stream<Item = Source, Error = DataError> + Send + 'a,
{ {
sources sources
.and_then(move |s| s.into_feed(client.clone())) .and_then(move |s| s.into_feed(client.clone()))
@ -39,10 +39,16 @@ where
_ => error!("Error: {}", err), _ => error!("Error: {}", err),
} }
}) })
.for_each(move |feed| { .and_then(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
tokio::spawn(fut) tokio::spawn(fut);
Ok(())
}) })
// For each terminates the stream at the first error so we make sure
// we pass good values regardless
.then(move |_| ok(()))
// Convert the stream into a Future to later execute as a tokio task
.for_each(move |_| ok(()))
} }
/// Creates a tokio `reactor::Core`, and a `hyper::Client` and /// Creates a tokio `reactor::Core`, and a `hyper::Client` and
@ -55,7 +61,7 @@ where
let client = Client::builder().build::<_, Body>(https); let client = Client::builder().build::<_, Body>(https);
let foo = sources.into_iter().map(ok::<_, _>); let foo = sources.into_iter().map(ok::<_, _>);
let stream = FuturesOrdered::from_iter(foo); let stream = FuturesUnordered::from_iter(foo);
let p = pipeline(stream, client); let p = pipeline(stream, client);
tokio::run(p); tokio::run(p);