diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 2395779..44ef022 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -4,12 +4,11 @@ use futures::future::*; use itertools::{Either, Itertools}; use rayon::prelude::*; use rss; -// use futures::prelude::*; use dbqueries; use errors::*; use models::{IndexState, Update}; -use models::{NewPodcast, Podcast}; +use models::{NewEpisode, NewPodcast, Podcast}; use pipeline::*; #[derive(Debug)] @@ -32,17 +31,23 @@ impl Feed { self.index_channel_items(&pd) } - #[allow(dead_code)] + /// Index the contents of the RSS `Feed` into the database. + pub fn index_async(self) -> Box> { + let fut = self.parse_podcast_async() + .and_then(|pd| pd.into_podcast()) + .and_then(move |pd| self.index_channel_items_async(&pd)); + + Box::new(fut) + } + fn parse_podcast(&self) -> NewPodcast { NewPodcast::new(&self.channel, self.source_id) } - #[allow(dead_code)] - fn parse_podcast_futture(&self) -> Box> { + fn parse_podcast_async(&self) -> Box> { Box::new(ok(self.parse_podcast())) } - #[allow(dead_code)] fn index_channel_items(&self, pd: &Podcast) -> Result<()> { let items = self.channel.items(); let (insert, update): (Vec<_>, Vec<_>) = items @@ -70,6 +75,51 @@ impl Feed { Ok(()) } + + fn index_channel_items_async(&self, pd: &Podcast) -> Box> { + let fut = self.get_stuff(pd) + .and_then(|(insert, update)| { + info!("Indexing {} episodes.", insert.len()); + dbqueries::index_new_episodes(insert.as_slice())?; + Ok((insert, update)) + }) + .map(|(_, update)| { + info!("Updating {} episodes.", update.len()); + update.iter().for_each(|&(ref ep, rowid)| { + if let Err(err) = ep.update(rowid) { + error!("Failed to index episode: {:?}.", ep.title()); + error!("Error msg: {}", err); + }; + }) + }); + + Box::new(fut) + } + + fn get_stuff( + &self, + pd: &Podcast, + ) -> Box, Vec<(NewEpisode, i32)>), Error = Error>> { + let (insert, update): (Vec<_>, Vec<_>) = self.channel + .items() + .into_iter() + .map(|item| glue_async(item, pd.id())) + .map(|fut| { + fut.and_then(|x| match x { + IndexState::NotChanged => bail!("Nothing to do here."), + _ => Ok(x), + }) + }) + .flat_map(|fut| fut.wait()) + .partition_map(|state| match state { + IndexState::Index(e) => Either::Left(e), + IndexState::Update(e) => Either::Right(e), + // How not to use the unimplemented macro... + IndexState::NotChanged => unimplemented!(), + }); + + Box::new(ok((insert, update))) + } } #[cfg(test)] diff --git a/hammond-data/src/models/new_podcast.rs b/hammond-data/src/models/new_podcast.rs index 7168aa3..c44402d 100644 --- a/hammond-data/src/models/new_podcast.rs +++ b/hammond-data/src/models/new_podcast.rs @@ -69,6 +69,7 @@ impl Index for NewPodcast { // This is messy if (self.link() != old.link()) || (self.title() != old.title()) || (self.image_uri() != old.image_uri()) + || (self.description() != old.description()) { self.update(old.id()) } else { diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index cc8bfd3..c2064e3 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -37,7 +37,7 @@ pub fn pipeline>(sources: S, ignore_etags: bool) .into_iter() // FIXME: Make proper indexing futures instead of wrapping up existing // blocking functions - .map(|s| s.into_feed(&client, ignore_etags).and_then(|feed| feed.index())) + .map(|s| s.into_feed(&client, ignore_etags).and_then(|feed| feed.index_async())) .collect(); let f = core.run(collect_futures(list))?; @@ -73,6 +73,16 @@ pub(crate) fn glue(item: &rss::Item, id: i32) -> Result> determine_ep_state(e, &item) } +#[allow(dead_code)] +pub(crate) fn glue_async<'a>( + item: &'a rss::Item, + id: i32, +) -> Box, Error = Error> + 'a> { + Box::new( + result(NewEpisodeMinimal::new(item, id)).and_then(move |ep| determine_ep_state(ep, item)), + ) +} + // Weird magic from #rust irc channel // kudos to remexre /// docs