From 049418c2f504219d1e4621f02f47392713aa1836 Mon Sep 17 00:00:00 2001
From: Jordan Petridis
Date: Tue, 17 Apr 2018 12:05:10 +0300
Subject: [PATCH] Feed: clean up clunky impl of indexing episodes.

---
 Cargo.lock                    |   1 -
 hammond-data/Cargo.toml       |   1 -
 hammond-data/benches/bench.rs |  25 +------
 hammond-data/src/feed.rs      | 122 ++++++++++------------------------
 hammond-data/src/lib.rs       |   1 -
 hammond-data/src/pipeline.rs  |   5 ++
 6 files changed, 44 insertions(+), 111 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 302744a..a6b810c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -683,7 +683,6 @@ dependencies = [
  "html5ever 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
- "itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/hammond-data/Cargo.toml b/hammond-data/Cargo.toml
index 9803326..641ea23 100644
--- a/hammond-data/Cargo.toml
+++ b/hammond-data/Cargo.toml
@@ -8,7 +8,6 @@ workspace = "../"
 ammonia = "1.1.0"
 chrono = "0.4.2"
 derive_builder = "0.5.1"
-itertools = "0.7.8"
 lazy_static = "1.0.0"
 log = "0.4.1"
 rayon = "1.0.1"
diff --git a/hammond-data/benches/bench.rs b/hammond-data/benches/bench.rs
index 8eb4ae2..beb6fc0 100644
--- a/hammond-data/benches/bench.rs
+++ b/hammond-data/benches/bench.rs
@@ -1,3 +1,5 @@
+#![allow(unused)]
+
 #[macro_use]
 extern crate criterion;
 use criterion::Criterion;
@@ -56,22 +58,6 @@ static FEEDS: &[(&[u8], &str)] = &[
     (STARS, STARS_URL),
 ];
 
-// This is broken and I don't know why.
-fn bench_pipeline(c: &mut Criterion) {
-    truncate_db().unwrap();
-    FEEDS.iter().for_each(|&(_, url)| {
-        Source::from_url(url).unwrap();
-    });
-
-    c.bench_function("pipline", move |b| {
-        b.iter(|| {
-            let sources = hammond_data::dbqueries::get_sources().unwrap();
-            pipeline::run(sources, true).unwrap();
-        })
-    });
-    truncate_db().unwrap();
-}
-
 fn bench_index_large_feed(c: &mut Criterion) {
     truncate_db().unwrap();
     let url = "https://www.greaterthancode.com/feed/podcast";
@@ -114,10 +100,5 @@ fn bench_index_small_feed(c: &mut Criterion) {
     truncate_db().unwrap();
 }
 
-criterion_group!(
-    benches,
-    bench_pipeline,
-    bench_index_large_feed,
-    bench_index_small_feed
-);
+criterion_group!(benches, bench_index_large_feed, bench_index_small_feed);
 criterion_main!(benches);
diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs
index d896b8e..7060e10 100644
--- a/hammond-data/src/feed.rs
+++ b/hammond-data/src/feed.rs
@@ -1,17 +1,14 @@
 //! Index Feeds.
 
 use futures::future::*;
-use itertools::{Either, Itertools};
 use rss;
 
 use dbqueries;
 use errors::DataError;
 use models::{Index, IndexState, Update};
-use models::{NewEpisode, NewPodcast, Podcast};
+use models::{NewPodcast, Podcast};
 use pipeline::*;
 
-type InsertUpdate = (Vec<NewEpisode>, Vec<Option<(NewEpisode, i32)>>);
-
 /// Wrapper struct that hold a `Source` id and the `rss::Channel`
 /// that corresponds to the `Source.uri` field.
 #[derive(Debug, Clone, Builder, PartialEq)]
@@ -46,69 +43,45 @@ impl Feed {
         &self,
         pd: &Podcast,
     ) -> Box<Future<Item = (), Error = DataError> + Send> {
-        let fut = self.get_stuff(pd)
-            .and_then(|(insert, update)| {
-                if !insert.is_empty() {
-                    info!("Indexing {} episodes.", insert.len());
-                    if let Err(err) = dbqueries::index_new_episodes(insert.as_slice()) {
-                        error!("Failed batch indexng, Fallign back to individual indexing.");
-                        error!("{}", err);
-                        insert.iter().for_each(|ep| {
-                            if let Err(err) = ep.index() {
-                                error!("Failed to index episode: {:?}.", ep.title());
-                                error!("{}", err);
-                            };
-                        })
-                    }
-                }
-                Ok((insert, update))
-            })
-            .map(|(_, update)| {
-                if !update.is_empty() {
-                    info!("Updating {} episodes.", update.len());
-                    // see get_stuff for more
-                    update
-                        .into_iter()
-                        .filter_map(|x| x)
-                        .for_each(|(ref ep, rowid)| {
-                            if let Err(err) = ep.update(rowid) {
-                                error!("Failed to index episode: {:?}.", ep.title());
-                                error!("{}", err);
-                            };
-                        })
-                }
-            });
-
-        Box::new(fut)
+        Box::new(ok(self.index_stuff(pd)))
     }
 
-    fn get_stuff(
-        &self,
-        pd: &Podcast,
-    ) -> Box<Future<Item = InsertUpdate, Error = DataError> + Send> {
-        let (insert, update): (Vec<_>, Vec<_>) = self.channel
+    fn index_stuff(&self, pd: &Podcast) {
+        let insert: Vec<_> = self.channel
             .items()
-            .into_iter()
-            .map(|item| glue_async(item, pd.id()))
-            // This is sort of ugly but I think it's cheaper than pushing None
-            // to updated and filtering it out later.
-            // Even though we already map_filter in index_channel_items.
-            // I am not sure what the optimizations are on match vs allocating None.
-            .map(|fut| {
-                fut.and_then(|x| match x {
-                    IndexState::NotChanged => Err(DataError::EpisodeNotChanged),
-                    _ => Ok(x),
-                })
-            })
-            .flat_map(|fut| fut.wait())
-            .partition_map(|state| match state {
-                IndexState::Index(e) => Either::Left(e),
-                IndexState::Update(e) => Either::Right(Some(e)),
-                // This should never occur
-                IndexState::NotChanged => Either::Right(None),
-            });
+            .iter()
+            // FIXME: print the error
+            .filter_map(|item| glue(item, pd.id()).ok())
+            .filter_map(|state| match state {
+                IndexState::NotChanged => None,
+                // Update individual rows, and filter them
+                IndexState::Update((ref ep, rowid)) => {
+                    ep.update(rowid)
+                        .map_err(|err| error!("{}", err))
+                        .map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
+                        .ok();
 
-        Box::new(ok((insert, update)))
+                    None
+                },
+                IndexState::Index(s) => Some(s),
+            })
+            // only Index is left, collect them for batch index
+            .collect();
+
+        if !insert.is_empty() {
+            info!("Indexing {} episodes.", insert.len());
+            if let Err(err) = dbqueries::index_new_episodes(insert.as_slice()) {
+                error!("Failed batch indexing, falling back to individual indexing.");
+                error!("{}", err);
+
+                insert.iter().for_each(|ep| {
+                    if let Err(err) = ep.index() {
+                        error!("Failed to index episode: {:?}.", ep.title());
+                        error!("{}", err);
+                    };
+                })
+            }
+        }
     }
 }
@@ -206,27 +179,4 @@ mod tests {
         assert_eq!(dbqueries::get_podcasts().unwrap().len(), 1);
         assert_eq!(dbqueries::get_episodes().unwrap().len(), 43);
     }
-
-    #[test]
-    fn test_feed_get_stuff() {
-        truncate_db().unwrap();
-
-        let path = "tests/feeds/2018-01-20-Intercepted.xml";
-        let feed = get_feed(path, 42);
-        let pd = feed.parse_podcast().to_podcast().unwrap();
-
-        let (insert, update) = feed.get_stuff(&pd).wait().unwrap();
-        assert_eq!(43, insert.len());
-        assert_eq!(0, update.len());
-
-        feed.index().wait().unwrap();
-
-        let path = "tests/feeds/2018-02-03-Intercepted.xml";
-        let feed = get_feed(path, 42);
-        let pd = feed.parse_podcast().to_podcast().unwrap();
-
-        let (insert, update) = feed.get_stuff(&pd).wait().unwrap();
-        assert_eq!(4, insert.len());
-        assert_eq!(43, update.len());
-    }
 }
diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs
index ebb2d2b..2c2ee4f 100644
--- a/hammond-data/src/lib.rs
+++ b/hammond-data/src/lib.rs
@@ -48,7 +48,6 @@ extern crate chrono;
 extern crate futures;
 extern crate hyper;
 extern crate hyper_tls;
-extern crate itertools;
 extern crate native_tls;
 extern crate num_cpus;
 extern crate rayon;
diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs
index f71d3ce..0a007f2 100644
--- a/hammond-data/src/pipeline.rs
+++ b/hammond-data/src/pipeline.rs
@@ -106,6 +106,7 @@ fn determine_ep_state(
     }
 }
 
+#[allow(unused)]
 pub(crate) fn glue_async<'a>(
     item: &'a rss::Item,
     id: i32,
@@ -115,6 +116,10 @@
     )
 }
 
+pub(crate) fn glue(item: &rss::Item, id: i32) -> Result<IndexState<NewEpisode>, DataError> {
+    NewEpisodeMinimal::new(item, id).and_then(move |ep| determine_ep_state(ep, item))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
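
The heart of the change is the single pass in index_stuff: episodes that need an
update are written row by row as a side effect and dropped from the stream, while
new episodes flow through and are collected for one batch insert. A minimal,
self-contained sketch of that filter_map pattern follows; the types below are
simplified stand-ins for the crate's IndexState and NewEpisode, not the real
definitions.

    enum IndexState<T> {
        NotChanged,
        Update((T, i32)),
        Index(T),
    }

    struct NewEpisode {
        title: String,
    }

    impl NewEpisode {
        // Stand-in for the real database row update.
        fn update(&self, _rowid: i32) -> Result<(), String> {
            Ok(())
        }
    }

    fn main() {
        let states = vec![
            IndexState::Index(NewEpisode { title: "new episode".into() }),
            IndexState::NotChanged,
            IndexState::Update((NewEpisode { title: "changed episode".into() }, 7)),
        ];

        // Updates happen inline and yield None; only Index values survive,
        // so the collected Vec can go to a single batch insert.
        let insert: Vec<NewEpisode> = states
            .into_iter()
            .filter_map(|state| match state {
                IndexState::NotChanged => None,
                IndexState::Update((ep, rowid)) => {
                    ep.update(rowid).map_err(|e| eprintln!("{}", e)).ok();
                    None
                }
                IndexState::Index(ep) => Some(ep),
            })
            .collect();

        for ep in &insert {
            println!("would batch-insert: {}", ep.title);
        }
    }

Compared to the removed partition_map, nothing is buffered twice and the
Option wrapper around update entries disappears entirely.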
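
Note that index_channel_items keeps its boxed-future signature even though the
work is now synchronous: the eager result is wrapped with future::ok, so callers
that compose futures stay untouched. A sketch of that trick under the futures 0.1
API the crate uses; do_work and index are hypothetical placeholders.

    extern crate futures; // futures = "0.1"

    use futures::future::{ok, Future};

    // Hypothetical synchronous workload.
    fn do_work() {}

    // Run the work eagerly, then hand back an already-resolved future,
    // mirroring the new index_channel_items body: Box::new(ok(...)).
    fn index() -> Box<Future<Item = (), Error = ()> + Send> {
        Box::new(ok(do_work()))
    }

    fn main() {
        // .wait() resolves immediately because the work already happened.
        index().wait().unwrap();
    }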
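
Finally, the new pipeline::glue is glue_async without the future machinery:
NewEpisodeMinimal::new and determine_ep_state both return Results, so plain
Result::and_then chains them. A tiny sketch of the two shapes side by side,
where parse and classify are hypothetical stand-ins for those two steps.

    extern crate futures; // futures = "0.1"

    use futures::future::{result, Future};

    // Hypothetical stand-ins for NewEpisodeMinimal::new and determine_ep_state.
    fn parse(item: &str) -> Result<i32, String> {
        item.parse::<i32>().map_err(|e| e.to_string())
    }

    fn classify(n: i32) -> Result<String, String> {
        Ok(format!("IndexState({})", n))
    }

    // Old shape: lift the Result into a future, even though every call site
    // ended up calling .wait() on it anyway.
    fn glue_async(item: &str) -> Box<Future<Item = String, Error = String>> {
        Box::new(result(parse(item)).and_then(classify))
    }

    // New shape: the same chain on plain Results, no boxing or executor.
    fn glue(item: &str) -> Result<String, String> {
        parse(item).and_then(classify)
    }

    fn main() {
        assert_eq!(glue_async("42").wait(), glue("42"));
        assert!(glue("not a number").is_err());
    }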