Feed: Split index_channel_items method.

This commit is contained in:
Jordan Petridis 2018-04-18 08:20:26 +03:00
parent 771999c603
commit f811a9c8f4
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6

View File

@ -42,45 +42,19 @@ impl Feed {
fn index_channel_items(self, pd: Podcast) -> Box<Future<Item = (), Error = DataError> + Send> { fn index_channel_items(self, pd: Podcast) -> Box<Future<Item = (), Error = DataError> + Send> {
let stream = stream::iter_ok::<_, DataError>(self.channel.items_owned()); let stream = stream::iter_ok::<_, DataError>(self.channel.items_owned());
let insert = stream
.filter_map(move |item| {
glue(&item, pd.id())
.map_err(|err| error!("Failed to parse an episode: {}", err))
.ok()
})
.filter_map(|state| match state {
IndexState::NotChanged => None,
// Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => {
ep.update(rowid)
.map_err(|err| error!("{}", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
None // Parse the episodes
}, let episodes = stream.filter_map(move |item| {
IndexState::Index(s) => Some(s), glue(&item, pd.id())
}) .map_err(|err| error!("Failed to parse an episode: {}", err))
// only Index is left, collect them for batch index .ok()
.collect();
let idx = insert.map(|vec| {
if !vec.is_empty() {
info!("Indexing {} episodes.", vec.len());
if let Err(err) = dbqueries::index_new_episodes(vec.as_slice()) {
error!("Failed batch indexng, Fallign back to individual indexing.");
error!("{}", err);
vec.iter().for_each(|ep| {
if let Err(err) = ep.index() {
error!("Failed to index episode: {:?}.", ep.title());
error!("{}", err);
};
})
}
}
}); });
// Filter errors, Index updatable episodes, return insertables.
let insertables = filter_episodes(episodes);
// Batch index insertable episodes.
let idx = insertables.and_then(|eps| ok(batch_insert_episodes(eps)));
Box::new(idx) Box::new(idx)
} }
} }
@ -110,6 +84,52 @@ fn determine_ep_state(
} }
} }
fn filter_episodes<'a, S>(
stream: S,
) -> Box<Future<Item = Vec<NewEpisode>, Error = DataError> + Send + 'a>
where
S: Stream<Item = IndexState<NewEpisode>, Error = DataError> + Send + 'a,
{
let list = stream.filter_map(|state| match state {
IndexState::NotChanged => None,
// Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => {
ep.update(rowid)
.map_err(|err| error!("{}", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
None
},
IndexState::Index(s) => Some(s),
})
// only Index is left, collect them for batch index
.collect();
Box::new(list)
}
fn batch_insert_episodes(episodes: Vec<NewEpisode>) {
if episodes.is_empty() {
return;
};
info!("Indexing {} episodes.", episodes.len());
dbqueries::index_new_episodes(episodes.as_slice())
.map_err(|err| {
error!("Failed batch indexng: {}", err);
info!("Fallign back to individual indexing.");
})
.unwrap_or_else(|_| {
episodes.iter().for_each(|ep| {
ep.index()
.map_err(|err| error!("Error: {}.", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
});
})
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use rss::Channel; use rss::Channel;