Pipeline: Print the error right away instead of waiting till all futures complete.

This commit is contained in:
Jordan Petridis 2018-01-23 18:18:56 +02:00
parent e74a2df27f
commit f9096e5fac
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
3 changed files with 7 additions and 9 deletions

View File

@ -211,7 +211,7 @@ impl NewEpisodeMinimal {
let duration = parser::parse_itunes_duration(item); let duration = parser::parse_itunes_duration(item);
Ok(NewEpisodeMinimalBuilder::default() NewEpisodeMinimalBuilder::default()
.title(title) .title(title)
.uri(uri) .uri(uri)
.duration(duration) .duration(duration)
@ -219,7 +219,7 @@ impl NewEpisodeMinimal {
.guid(guid) .guid(guid)
.podcast_id(parent_id) .podcast_id(parent_id)
.build() .build()
.unwrap()) .map_err(From::from)
} }
pub(crate) fn into_new_episode(self, item: &rss::Item) -> NewEpisode { pub(crate) fn into_new_episode(self, item: &rss::Item) -> NewEpisode {

View File

@ -172,12 +172,12 @@ impl Source {
Ok(res) Ok(res)
}) })
.and_then(move |res| response_to_channel(res, pool)) .and_then(move |res| response_to_channel(res, pool))
.map(move |chan| { .and_then(move |chan| {
FeedBuilder::default() FeedBuilder::default()
.channel(chan) .channel(chan)
.source_id(id) .source_id(id)
.build() .build()
.unwrap() .map_err(From::from)
}); });
Box::new(feed) Box::new(feed)

View File

@ -57,13 +57,11 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
.into_iter() .into_iter()
.map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags))) .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
.map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))) .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))))
.map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
.collect(); .collect();
let f = core.run(collect_futures(list))?; // Thats not really concurrent yet I think.
f.into_iter() core.run(collect_futures(list))?;
.filter_map(|x| x.err())
.for_each(|err| error!("Error: {}", err));
Ok(()) Ok(())
} }