diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index 1920c58..1bf0d31 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -21,7 +21,6 @@ #![allow(clippy::unit_arg)] //! Index Feeds. -use futures::future::*; use futures::prelude::*; use futures::stream; use rss; @@ -54,7 +53,7 @@ impl Feed { NewShow::new(&self.channel, self.source_id) } - fn index_channel_items(self, pd: Show) -> impl Future> + Send { + async fn index_channel_items(self, pd: Show) -> Result<(), DataError> { let stream = stream::iter(self.channel.into_items()); // Parse the episodes let episodes = stream.filter_map(move |item| { @@ -67,9 +66,9 @@ impl Feed { } }); // Filter errors, Index updatable episodes, return insertables. - filter_episodes(episodes) - // Batch index insertable episodes. - .and_then(|eps| ok(batch_insert_episodes(&eps))) + let insertable_episodes = filter_episodes(episodes).await?; + batch_insert_episodes(&insertable_episodes); + Ok(()) } } @@ -94,32 +93,31 @@ fn determine_ep_state( } } -fn filter_episodes<'a, S>( - stream: S, -) -> impl Future, DataError>> + Send + 'a +async fn filter_episodes<'a, S>(stream: S) -> Result, DataError> where - S: Stream, DataError>> + Send + 'a, + S: Stream, DataError>>, { stream .try_filter_map(|state| { - async move { - match state { - IndexState::NotChanged => Ok(None), + async { + let result = match state { + IndexState::NotChanged => None, // Update individual rows, and filter them IndexState::Update((ref ep, rowid)) => { ep.update(rowid) .map_err(|err| error!("{}", err)) .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) .ok(); - - Ok(None) + None } - IndexState::Index(s) => Ok(Some(s)), - } + IndexState::Index(s) => Some(s), + }; + Ok(result) } }) // only Index is left, collect them for batch index .try_collect() + .await } fn batch_insert_episodes(episodes: &[NewEpisode]) {