From 098c5755b05da1470426cbd3148e28961d37e6bf Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Thu, 18 Jan 2018 13:47:40 +0200 Subject: [PATCH] hammond-data: Do batch indexing of new_episodes. --- hammond-data/Cargo.toml | 2 +- hammond-data/src/dbqueries.rs | 16 +++++++-- hammond-data/src/feed.rs | 61 +++++++++++++++++++++++------------ 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/hammond-data/Cargo.toml b/hammond-data/Cargo.toml index 13b6037..f360ede 100644 --- a/hammond-data/Cargo.toml +++ b/hammond-data/Cargo.toml @@ -10,7 +10,7 @@ chrono = "0.4.0" derive_builder = "0.5.1" dotenv = "0.10.1" error-chain = "0.11.0" -itertools = "0.7.4" +itertools = "0.7.6" lazy_static = "1.0.0" log = "0.3.8" r2d2 = "0.8.2" diff --git a/hammond-data/src/dbqueries.rs b/hammond-data/src/dbqueries.rs index 1493ce0..019f218 100644 --- a/hammond-data/src/dbqueries.rs +++ b/hammond-data/src/dbqueries.rs @@ -6,8 +6,8 @@ use diesel::prelude::*; use database::connection; use errors::*; -use models::{Episode, EpisodeCleanerQuery, EpisodeMinimal, EpisodeWidgetQuery, Podcast, - PodcastCoverQuery, Source}; +use models::{Episode, EpisodeCleanerQuery, EpisodeMinimal, EpisodeWidgetQuery, NewEpisode, + Podcast, PodcastCoverQuery, Source}; pub fn get_sources() -> Result> { use schema::source::dsl::*; @@ -322,6 +322,18 @@ pub fn episode_exists(title_: &str, podcast_id_: i32) -> Result { .map_err(From::from) } +pub(crate) fn index_new_episodes(eps: &[NewEpisode]) -> Result<()> { + use schema::episode::dsl::*; + let db = connection(); + let con = db.get()?; + + diesel::insert_into(episode) + .values(eps) + .execute(&*con) + .map_err(From::from) + .map(|_| ()) +} + pub fn update_none_to_played_now(parent: &Podcast) -> Result { use schema::episode::dsl::*; diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 63050ca..515130e 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -1,13 +1,16 @@ //! Index Feeds. 
use futures::future::*; -// use futures::prelude::*; -use errors::*; -use models::{NewEpisode, NewPodcast, Podcast}; +use itertools::{Either, Itertools}; use rayon::prelude::*; use rss; -// use models::{IndexState, Source}; -// use pipeline::*; +// use futures::prelude::*; + +use dbqueries; +use errors::*; +use models::{IndexState, Update}; +use models::{NewEpisode, NewPodcast, Podcast}; +use pipeline::*; #[derive(Debug)] /// Wrapper struct that hold a `Source` id and the `rss::Channel` @@ -26,10 +29,11 @@ impl Feed { /// Index the contents of the RSS `Feed` into the database. pub fn index(&self) -> Result<()> { let pd = self.parse_podcast().into_podcast()?; - self.index_channel_items(&pd) + self.index_channel_items_sync(&pd) } // TODO: Refactor transcactions and find a way to do it in parallel. + #[allow(dead_code)] fn index_channel_items(&self, pd: &Podcast) -> Result<()> { let episodes = self.parse_channel_items(pd); @@ -42,6 +46,7 @@ impl Feed { Ok(()) } + #[allow(dead_code)] fn parse_podcast(&self) -> NewPodcast { NewPodcast::new(&self.channel, self.source_id) } @@ -51,6 +56,7 @@ impl Feed { Box::new(ok(self.parse_podcast())) } + #[allow(dead_code)] fn parse_channel_items(&self, pd: &Podcast) -> Vec { let items = self.channel.items(); let new_episodes: Vec<_> = items @@ -60,21 +66,36 @@ impl Feed { new_episodes } -} -// fn parse_channel_items2( -// &self, -// pd: &Podcast, -// ) -> (Vec>, Vec>) { -// let items = self.channel.items(); -// let (insert, update): (Vec<_>, Vec<_>) = items -// .into_iter() -// .filter_map(|item| glue(item, pd.id()).ok()) -// .filter(|&state| state == IndexState::NotChanged) -// .partition(|&state| state == IndexState::Index); -// (insert, update) -// } -// } + #[allow(dead_code)] + fn index_channel_items_sync(&self, pd: &Podcast) -> Result<()> { + let items = self.channel.items(); + let (insert, update): (Vec<_>, Vec<_>) = items + .into_iter() + .filter_map(|item| glue(item, pd.id()).ok()) + .filter(|state| match state { + 
&IndexState::NotChanged => false, + _ => true, + }) + .partition_map(|state| match state { + IndexState::Index(e) => Either::Left(e), + IndexState::Update(e) => Either::Right(e), + // Unreachable: NotChanged variants were filtered out above. + IndexState::NotChanged => unimplemented!(), + }); + + dbqueries::index_new_episodes(insert.as_slice())?; + + update.par_iter().for_each(|&(ref ep, rowid)| { + if let Err(err) = ep.update(rowid) { + error!("Failed to index episode: {:?}.", ep.title()); + error!("Error msg: {}", err); + }; + }); + + Ok(()) + } +} #[cfg(test)] mod tests {