Pipeline: Convert the sources iterator into a Stream and return a Future

`futures::stream::iter_ok` is so conviniet, why had None told me
about it before?
This commit is contained in:
Jordan Petridis 2018-04-18 01:40:06 +03:00
parent 049418c2f5
commit 835078a84c
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
2 changed files with 20 additions and 31 deletions

View File

@ -379,29 +379,17 @@ pub fn update_none_to_played_now(parent: &Podcast) -> Result<usize, DataError> {
mod tests { mod tests {
use super::*; use super::*;
use database::*; use database::*;
use pipeline::*; use pipeline;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use num_cpus;
#[test] #[test]
fn test_update_none_to_played_now() { fn test_update_none_to_played_now() {
truncate_db().unwrap(); truncate_db().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::configure()
.connector(HttpsConnector::new(num_cpus::get(), &handle).unwrap())
.build(&handle);
let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\ let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\
com/InterceptedWithJeremyScahill"; com/InterceptedWithJeremyScahill";
let source = Source::from_url(url).unwrap(); let source = Source::from_url(url).unwrap();
let id = source.id(); let id = source.id();
pipeline(vec![source], true, &mut core, client).unwrap(); pipeline::run(vec![source], true).unwrap();
let pd = get_podcast_from_source_id(id).unwrap(); let pd = get_podcast_from_source_id(id).unwrap();
let eps_num = get_pd_unplayed_episodes(&pd).unwrap().len(); let eps_num = get_pd_unplayed_episodes(&pd).unwrap().len();

View File

@ -1,8 +1,11 @@
// FIXME: // FIXME:
//! Docs. //! Docs.
#![allow(unused)]
use futures::future::*; use futures::future::*;
// use futures::prelude::*; use futures::prelude::*;
use futures::stream::*;
use hyper::client::HttpConnector; use hyper::client::HttpConnector;
use hyper::Client; use hyper::Client;
@ -46,28 +49,25 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// Messy temp diagram: /// Messy temp diagram:
/// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` -> /// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` ->
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
pub fn pipeline<S>( pub fn pipeline<'a, S>(
sources: S, sources: S,
ignore_etags: bool, ignore_etags: bool,
tokio_core: &mut Core,
client: HttpsClient, client: HttpsClient,
) -> Result<(), DataError> ) -> Box<Future<Item = Vec<()>, Error = DataError> + 'a>
where where
S: IntoIterator<Item = Source>, S: IntoIterator<Item = Source> + 'a,
{ {
let list: Vec<_> = sources let stream = iter_ok::<_, DataError>(sources);
.into_iter() let pipeline = stream
.map(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags))) .and_then(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags)))
.map(|feed| { .and_then(|feed| feed.index())
feed.and_then(|f| f.index()) .map_err(|err| error!("Error: {}", err))
.map_err(|err| error!("Error: {}", err)) // the stream will stop at the first error so
// join_all stops at the first error so // we ensure that everything will succeded regardless.
// we ensure that everything will succeded regardless. .then(|_| ok::<(), DataError>(()))
.then(|_| ok::<(), DataError>(()))
})
.collect(); .collect();
tokio_core.run(join_all(list)).map(|_| ()) Box::new(pipeline)
} }
/// Creates a tokio `reactor::Core`, and a `hyper::Client` and /// Creates a tokio `reactor::Core`, and a `hyper::Client` and
@ -82,7 +82,8 @@ where
.connector(HttpsConnector::new(num_cpus::get(), &handle)?) .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
.build(&handle); .build(&handle);
pipeline(sources, ignore_etags, &mut core, client) let p = pipeline(sources, ignore_etags, client);
core.run(p).map(|_| ())
} }
fn determine_ep_state( fn determine_ep_state(