Pipeline: Minor refactor

Use the proper Stream API to return a future to run on the
executor. Previously I was using a workaround to convert the
Stream into a future and run it to completion in the Executor,
since I was not aware of a better API.
This commit is contained in:
Jordan Petridis 2018-09-29 12:03:00 +03:00
parent ba986847d6
commit 62029f6164
No known key found for this signature in database
GPG Key ID: E8523968931763BE

View File

@ -1,7 +1,7 @@
// FIXME:
//! Docs.
use futures::{future::ok, lazy, prelude::*, stream::iter_ok};
use futures::{lazy, prelude::*, stream::iter_ok};
use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
@ -26,26 +26,23 @@ pub fn pipeline<'a, S>(
sources: S,
client: HttpsClient,
pool: tokio_threadpool::Sender,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
) -> impl Future<Item = (), Error = ()> + 'a
where
S: Stream<Item = Source, Error = DataError> + 'a,
{
sources
.and_then(move |s| s.into_feed(client.clone()))
.and_then(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
pool.spawn(fut).map_err(From::from)
})
// the stream will stop at the first error so
// we ensure that everything will succeded regardless.
.map_err(|err| {
match err {
// Avoid spamming the stderr when its not an eactual error
DataError::FeedNotModified(_) => (),
_ => error!("Error: {}", err),
}
}).then(|_| ok::<(), DataError>(()))
.collect()
})
.for_each(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
pool.spawn(fut).map_err(|_| ())
})
}
/// Creates a tokio `reactor::Core`, and a `hyper::Client` and
@ -62,7 +59,7 @@ where
let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, client, sender);
core.run(p)?;
let _ = core.run(p);
pool.shutdown_on_idle().wait().unwrap();
Ok(())