Feed: Add an index_async implementation.

This commit is contained in:
Jordan Petridis 2018-01-18 18:57:44 +02:00
parent 1394366f91
commit 9dc555cad7
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
3 changed files with 68 additions and 7 deletions

View File

@ -4,12 +4,11 @@ use futures::future::*;
use itertools::{Either, Itertools};
use rayon::prelude::*;
use rss;
// use futures::prelude::*;
use dbqueries;
use errors::*;
use models::{IndexState, Update};
use models::{NewPodcast, Podcast};
use models::{NewEpisode, NewPodcast, Podcast};
use pipeline::*;
#[derive(Debug)]
@ -32,17 +31,23 @@ impl Feed {
self.index_channel_items(&pd)
}
#[allow(dead_code)]
/// Index the contents of the RSS `Feed` into the database.
pub fn index_async(self) -> Box<Future<Item = (), Error = Error>> {
let fut = self.parse_podcast_async()
.and_then(|pd| pd.into_podcast())
.and_then(move |pd| self.index_channel_items_async(&pd));
Box::new(fut)
}
fn parse_podcast(&self) -> NewPodcast {
NewPodcast::new(&self.channel, self.source_id)
}
#[allow(dead_code)]
fn parse_podcast_futture(&self) -> Box<FutureResult<NewPodcast, Error>> {
fn parse_podcast_async(&self) -> Box<FutureResult<NewPodcast, Error>> {
Box::new(ok(self.parse_podcast()))
}
#[allow(dead_code)]
fn index_channel_items(&self, pd: &Podcast) -> Result<()> {
let items = self.channel.items();
let (insert, update): (Vec<_>, Vec<_>) = items
@ -70,6 +75,51 @@ impl Feed {
Ok(())
}
fn index_channel_items_async(&self, pd: &Podcast) -> Box<Future<Item = (), Error = Error>> {
let fut = self.get_stuff(pd)
.and_then(|(insert, update)| {
info!("Indexing {} episodes.", insert.len());
dbqueries::index_new_episodes(insert.as_slice())?;
Ok((insert, update))
})
.map(|(_, update)| {
info!("Updating {} episodes.", update.len());
update.iter().for_each(|&(ref ep, rowid)| {
if let Err(err) = ep.update(rowid) {
error!("Failed to index episode: {:?}.", ep.title());
error!("Error msg: {}", err);
};
})
});
Box::new(fut)
}
fn get_stuff(
&self,
pd: &Podcast,
) -> Box<Future<Item = (Vec<NewEpisode>, Vec<(NewEpisode, i32)>), Error = Error>> {
let (insert, update): (Vec<_>, Vec<_>) = self.channel
.items()
.into_iter()
.map(|item| glue_async(item, pd.id()))
.map(|fut| {
fut.and_then(|x| match x {
IndexState::NotChanged => bail!("Nothing to do here."),
_ => Ok(x),
})
})
.flat_map(|fut| fut.wait())
.partition_map(|state| match state {
IndexState::Index(e) => Either::Left(e),
IndexState::Update(e) => Either::Right(e),
// How not to use the unimplemented macro...
IndexState::NotChanged => unimplemented!(),
});
Box::new(ok((insert, update)))
}
}
#[cfg(test)]

View File

@ -69,6 +69,7 @@ impl Index for NewPodcast {
// This is messy
if (self.link() != old.link()) || (self.title() != old.title())
|| (self.image_uri() != old.image_uri())
|| (self.description() != old.description())
{
self.update(old.id())
} else {

View File

@ -37,7 +37,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
.into_iter()
// FIXME: Make proper indexing futures instead of wrapping up existing
// blocking functions
.map(|s| s.into_feed(&client, ignore_etags).and_then(|feed| feed.index()))
.map(|s| s.into_feed(&client, ignore_etags).and_then(|feed| feed.index_async()))
.collect();
let f = core.run(collect_futures(list))?;
@ -73,6 +73,16 @@ pub(crate) fn glue(item: &rss::Item, id: i32) -> Result<IndexState<NewEpisode>>
determine_ep_state(e, &item)
}
#[allow(dead_code)]
pub(crate) fn glue_async<'a>(
item: &'a rss::Item,
id: i32,
) -> Box<Future<Item = IndexState<NewEpisode>, Error = Error> + 'a> {
Box::new(
result(NewEpisodeMinimal::new(item, id)).and_then(move |ep| determine_ep_state(ep, item)),
)
}
// Weird magic from #rust irc channel
// kudos to remexre
/// docs