From f811a9c8f46d3161457947cd3b534c577e187224 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Wed, 18 Apr 2018 08:20:26 +0300 Subject: [PATCH] Feed: Split index_channel_items method. --- hammond-data/src/feed.rs | 92 ++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 36 deletions(-) diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 078c79e..249076a 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -42,45 +42,19 @@ impl Feed { fn index_channel_items(self, pd: Podcast) -> Box + Send> { let stream = stream::iter_ok::<_, DataError>(self.channel.items_owned()); - let insert = stream - .filter_map(move |item| { - glue(&item, pd.id()) - .map_err(|err| error!("Failed to parse an episode: {}", err)) - .ok() - }) - .filter_map(|state| match state { - IndexState::NotChanged => None, - // Update individual rows, and filter them - IndexState::Update((ref ep, rowid)) => { - ep.update(rowid) - .map_err(|err| error!("{}", err)) - .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) - .ok(); - None - }, - IndexState::Index(s) => Some(s), - }) - // only Index is left, collect them for batch index - .collect(); - - let idx = insert.map(|vec| { - if !vec.is_empty() { - info!("Indexing {} episodes.", vec.len()); - if let Err(err) = dbqueries::index_new_episodes(vec.as_slice()) { - error!("Failed batch indexng, Fallign back to individual indexing."); - error!("{}", err); - - vec.iter().for_each(|ep| { - if let Err(err) = ep.index() { - error!("Failed to index episode: {:?}.", ep.title()); - error!("{}", err); - }; - }) - } - } + // Parse the episodes + let episodes = stream.filter_map(move |item| { + glue(&item, pd.id()) + .map_err(|err| error!("Failed to parse an episode: {}", err)) + .ok() }); + // Filter errors, Index updatable episodes, return insertables. + let insertables = filter_episodes(episodes); + // Batch index insertable episodes. + let idx = insertables.and_then(|eps| ok(batch_insert_episodes(eps))); + Box::new(idx) } } @@ -110,6 +84,52 @@ fn determine_ep_state( } } +fn filter_episodes<'a, S>( + stream: S, +) -> Box, Error = DataError> + Send + 'a> +where + S: Stream, Error = DataError> + Send + 'a, +{ + let list = stream.filter_map(|state| match state { + IndexState::NotChanged => None, + // Update individual rows, and filter them + IndexState::Update((ref ep, rowid)) => { + ep.update(rowid) + .map_err(|err| error!("{}", err)) + .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) + .ok(); + + None + }, + IndexState::Index(s) => Some(s), + }) + // only Index is left, collect them for batch index + .collect(); + + Box::new(list) +} + +fn batch_insert_episodes(episodes: Vec) { + if episodes.is_empty() { + return; + }; + + info!("Indexing {} episodes.", episodes.len()); + dbqueries::index_new_episodes(episodes.as_slice()) + .map_err(|err| { + error!("Failed batch indexng: {}", err); + info!("Fallign back to individual indexing."); + }) + .unwrap_or_else(|_| { + episodes.iter().for_each(|ep| { + ep.index() + .map_err(|err| error!("Error: {}.", err)) + .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) + .ok(); + }); + }) +} + #[cfg(test)] mod tests { use rss::Channel;