hammond-data: Do batch indexing of new_episodes.

Jordan Petridis 2018-01-18 13:47:40 +02:00
parent 93372a30d0
commit 098c5755b0
3 changed files with 56 additions and 23 deletions
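
In short: instead of writing each parsed episode with its own INSERT, the feed indexer now partitions parsed items into new episodes and updates, and hands all the new ones to diesel as a single slice. A minimal sketch of that batch-insert pattern, using a hypothetical `posts` table and `NewPost` struct rather than hammond's real schema (the diff below has the actual code):

    #[macro_use]
    extern crate diesel;

    use diesel::prelude::*;
    use diesel::sqlite::SqliteConnection;

    table! {
        posts (id) {
            id -> Integer,
            title -> Text,
        }
    }

    // Hypothetical insertable row, standing in for hammond's NewEpisode.
    #[derive(Insertable)]
    #[table_name = "posts"]
    struct NewPost {
        title: String,
    }

    fn insert_batch(con: &SqliteConnection, rows: &[NewPost]) -> QueryResult<usize> {
        // One .values() call with a slice writes every row; how diesel
        // batches it under the hood depends on the backend.
        diesel::insert_into(posts::table).values(rows).execute(con)
    }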

@@ -10,7 +10,7 @@ chrono = "0.4.0"
derive_builder = "0.5.1"
dotenv = "0.10.1"
error-chain = "0.11.0"
itertools = "0.7.4"
itertools = "0.7.6"
lazy_static = "1.0.0"
log = "0.3.8"
r2d2 = "0.8.2"

@@ -6,8 +6,8 @@ use diesel::prelude::*;
use database::connection;
use errors::*;
use models::{Episode, EpisodeCleanerQuery, EpisodeMinimal, EpisodeWidgetQuery, Podcast,
PodcastCoverQuery, Source};
use models::{Episode, EpisodeCleanerQuery, EpisodeMinimal, EpisodeWidgetQuery, NewEpisode,
Podcast, PodcastCoverQuery, Source};
pub fn get_sources() -> Result<Vec<Source>> {
    use schema::source::dsl::*;
@@ -322,6 +322,18 @@ pub fn episode_exists(title_: &str, podcast_id_: i32) -> Result<bool> {
        .map_err(From::from)
}

pub(crate) fn index_new_episodes(eps: &[NewEpisode]) -> Result<()> {
    use schema::episode::dsl::*;

    let db = connection();
    let con = db.get()?;

    // Insert the whole slice with one call instead of one query per episode.
    diesel::insert_into(episode)
        .values(eps)
        .execute(&*con)
        .map_err(From::from)
        .map(|_| ())
}
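
A note on the helper above: it borrows a slice, so callers keep the `Vec<NewEpisode>` they collected, and `map(|_| ())` discards the row count returned by `execute` to fit the `Result<()>` return type, while `map_err(From::from)` lifts diesel's error into the crate's `error-chain` error type.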
pub fn update_none_to_played_now(parent: &Podcast) -> Result<usize> {
    use schema::episode::dsl::*;

@@ -1,13 +1,16 @@
//! Index Feeds.
use futures::future::*;
// use futures::prelude::*;
use errors::*;
use models::{NewEpisode, NewPodcast, Podcast};
use itertools::{Either, Itertools};
use rayon::prelude::*;
use rss;
// use models::{IndexState, Source};
// use pipeline::*;
// use futures::prelude::*;
use dbqueries;
use errors::*;
use models::{IndexState, Update};
use models::{NewEpisode, NewPodcast, Podcast};
use pipeline::*;
#[derive(Debug)]
/// Wrapper struct that holds a `Source` id and the `rss::Channel`.
@@ -26,10 +29,11 @@ impl Feed {
    /// Index the contents of the RSS `Feed` into the database.
    pub fn index(&self) -> Result<()> {
        let pd = self.parse_podcast().into_podcast()?;
        self.index_channel_items(&pd)
        self.index_channel_items_sync(&pd)
    }
    // TODO: Refactor transactions and find a way to do it in parallel.
    #[allow(dead_code)]
    fn index_channel_items(&self, pd: &Podcast) -> Result<()> {
        let episodes = self.parse_channel_items(pd);
@@ -42,6 +46,7 @@ impl Feed {
        Ok(())
    }
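
The TODO above is about making this transactional. A speculative sketch of the shape that refactor could take, assuming diesel's `Connection::transaction` and reusing the names from `index_new_episodes`; none of this is in the commit:

    // Speculative: run the batch atomically, so an error in the closure
    // rolls back every row instead of leaving a partially indexed feed.
    con.transaction::<_, diesel::result::Error, _>(|| {
        diesel::insert_into(episode).values(eps).execute(&*con)?;
        Ok(())
    })?;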
    #[allow(dead_code)]
    fn parse_podcast(&self) -> NewPodcast {
        NewPodcast::new(&self.channel, self.source_id)
    }
@@ -51,6 +56,7 @@ impl Feed {
        Box::new(ok(self.parse_podcast()))
    }
    #[allow(dead_code)]
    fn parse_channel_items(&self, pd: &Podcast) -> Vec<NewEpisode> {
        let items = self.channel.items();
        let new_episodes: Vec<_> = items
@@ -60,21 +66,36 @@ impl Feed {
        new_episodes
    }
}
    // fn parse_channel_items2(
    //     &self,
    //     pd: &Podcast,
    // ) -> (Vec<IndexState<NewEpisode>>, Vec<IndexState<NewEpisode>>) {
    //     let items = self.channel.items();
    //     let (insert, update): (Vec<_>, Vec<_>) = items
    //         .into_iter()
    //         .filter_map(|item| glue(item, pd.id()).ok())
    //         .filter(|&state| state == IndexState::NotChanged)
    //         .partition(|&state| state == IndexState::Index);
    //     (insert, update)
    // }
    // }
    #[allow(dead_code)]
    fn index_channel_items_sync(&self, pd: &Podcast) -> Result<()> {
        let items = self.channel.items();
        let (insert, update): (Vec<_>, Vec<_>) = items
            .into_iter()
            .filter_map(|item| glue(item, pd.id()).ok())
            // Skip episodes whose stored state is already up to date.
            .filter(|state| match *state {
                IndexState::NotChanged => false,
                _ => true,
            })
            // Split the remaining episodes into fresh rows and updates.
            .partition_map(|state| match state {
                IndexState::Index(e) => Either::Left(e),
                IndexState::Update(e) => Either::Right(e),
                // Unreachable: `NotChanged` was filtered out just above.
                IndexState::NotChanged => unreachable!(),
            });

        // One batched insert for everything new.
        dbqueries::index_new_episodes(insert.as_slice())?;

        // Updates still run row by row, but in parallel via rayon.
        update.par_iter().for_each(|&(ref ep, rowid)| {
            if let Err(err) = ep.update(rowid) {
                error!("Failed to update episode: {:?}.", ep.title());
                error!("Error msg: {}", err);
            }
        });
        Ok(())
    }
}
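
For readers new to itertools, `partition_map` is what lets the function above split one stream of `IndexState` values into two differently typed collections in a single pass, steering each item with `Either`. A self-contained toy example, not from the codebase:

    extern crate itertools;

    use itertools::{Either, Itertools};

    fn main() {
        // Evens go Left, odds go Right: the same shape as the
        // Index/Update split in index_channel_items_sync.
        let (evens, odds): (Vec<u32>, Vec<u32>) = (0..10).partition_map(|n| {
            if n % 2 == 0 {
                Either::Left(n)
            } else {
                Either::Right(n)
            }
        });
        assert_eq!(evens, vec![0, 2, 4, 6, 8]);
        assert_eq!(odds, vec![1, 3, 5, 7, 9]);
    }

The update half is then drained with rayon's `par_iter().for_each(...)`, which fans the per-row updates out over a thread pool, while the batched insert handles everything new in one call.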
#[cfg(test)]
mod tests {