From 3c7ba8c9d9a0f1b48541e1ba0e70f6a5b89f8aac Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Wed, 18 Apr 2018 02:49:21 +0300 Subject: [PATCH] Feed: Convert index_channel_items to a Future/Steam impl. --- Cargo.lock | 6 ++--- Cargo.toml | 2 ++ hammond-data/src/feed.rs | 50 +++++++++++++++++------------------- hammond-data/src/pipeline.rs | 10 -------- 4 files changed, 29 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a6b810c..9357162 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -691,7 +691,7 @@ dependencies = [ "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "rss 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rss 1.4.0 (git+https://github.com/alatiera/rss)", "string_cache 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "string_cache_codegen 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1493,7 +1493,7 @@ dependencies = [ [[package]] name = "rss" version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/alatiera/rss#ca25285cb44d82c8dadfefe05f248ea9a14c996c" dependencies = [ "derive_builder 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2271,7 +2271,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum remove_dir_all 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dfc5b3ce5d5ea144bb04ebd093a9e14e9765bcfec866aecda9b6dec43b3d1e24" "checksum reqwest 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)" = "241faa9a8ca28a03cbbb9815a5d085f271d4c0168a19181f106aa93240c22ddb" "checksum rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "680e8305c1e0cdf836dc4bec5424e045f278c975a3cac36d1ca01c4695f9d815" -"checksum rss 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dfaaee0d75c76ae197cc2671dc8c531314006e48790c725a9bf0b227d91d19e8" +"checksum rss 1.4.0 (git+https://github.com/alatiera/rss)" = "" "checksum rustc-demangle 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "11fb43a206a04116ffd7cfcf9bcb941f8eb6cc7ff667272246b0a1c74259a3cb" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum schannel 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "fbaffce35eb61c5b00846e73128b0cd62717e7c0ec46abbec132370d013975b4" diff --git a/Cargo.toml b/Cargo.toml index d1deebf..850ef0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,5 @@ members = [ [profile.release] debug = false +[patch.crates-io] +rss = { git = "https://github.com/alatiera/rss" } \ No newline at end of file diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 7060e10..4201678 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -1,6 +1,8 @@ //! Index Feeds. use futures::future::*; +use futures::prelude::*; +use futures::stream; use rss; use dbqueries; @@ -26,7 +28,7 @@ impl Feed { pub fn index(self) -> Box + Send> { let fut = self.parse_podcast_async() .and_then(|pd| pd.to_podcast()) - .and_then(move |pd| self.index_channel_items(&pd)); + .and_then(move |pd| self.index_channel_items(pd)); Box::new(fut) } @@ -39,19 +41,11 @@ impl Feed { Box::new(ok(self.parse_podcast())) } - fn index_channel_items( - &self, - pd: &Podcast, - ) -> Box + Send> { - Box::new(ok(self.index_stuff(pd))) - } - - fn index_stuff(&self, pd: &Podcast) { - let insert: Vec<_> = self.channel - .items() - .iter() + fn index_channel_items(self, pd: Podcast) -> Box + Send> { + let stream = stream::iter_ok::<_, DataError>(self.channel.items_owened()); + let insert = stream // FIXME: print the error - .filter_map(|item| glue(item, pd.id()).ok()) + .filter_map(move |item| glue(&item, pd.id()).ok()) .filter_map(|state| match state { IndexState::NotChanged => None, // Update individual rows, and filter them @@ -68,20 +62,24 @@ impl Feed { // only Index is left, collect them for batch index .collect(); - if !insert.is_empty() { - info!("Indexing {} episodes.", insert.len()); - if let Err(err) = dbqueries::index_new_episodes(insert.as_slice()) { - error!("Failed batch indexng, Fallign back to individual indexing."); - error!("{}", err); + let idx = insert.map(|vec| { + if !vec.is_empty() { + info!("Indexing {} episodes.", vec.len()); + if let Err(err) = dbqueries::index_new_episodes(vec.as_slice()) { + error!("Failed batch indexng, Fallign back to individual indexing."); + error!("{}", err); - insert.iter().for_each(|ep| { - if let Err(err) = ep.index() { - error!("Failed to index episode: {:?}.", ep.title()); - error!("{}", err); - }; - }) + vec.iter().for_each(|ep| { + if let Err(err) = ep.index() { + error!("Failed to index episode: {:?}.", ep.title()); + error!("{}", err); + }; + }) + } } - } + }); + + Box::new(idx) } } @@ -175,7 +173,7 @@ mod tests { let feed = get_feed(path, 42); let pd = feed.parse_podcast().to_podcast().unwrap(); - feed.index_channel_items(&pd).wait().unwrap(); + feed.index_channel_items(pd).wait().unwrap(); assert_eq!(dbqueries::get_podcasts().unwrap().len(), 1); assert_eq!(dbqueries::get_episodes().unwrap().len(), 43); } diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index b91b6c6..2428898 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -107,16 +107,6 @@ fn determine_ep_state( } } -#[allow(unused)] -pub(crate) fn glue_async<'a>( - item: &'a rss::Item, - id: i32, -) -> Box, Error = DataError> + 'a> { - Box::new( - result(NewEpisodeMinimal::new(item, id)).and_then(move |ep| determine_ep_state(ep, item)), - ) -} - pub(crate) fn glue(item: &rss::Item, id: i32) -> Result, DataError> { NewEpisodeMinimal::new(item, id).and_then(move |ep| determine_ep_state(ep, item)) }