Feed: clean up clunky impl of indexing episodes.
This commit is contained in:
parent
7c03266d16
commit
049418c2f5
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -683,7 +683,6 @@ dependencies = [
|
||||
"html5ever 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"hyper-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
||||
@ -8,7 +8,6 @@ workspace = "../"
|
||||
ammonia = "1.1.0"
|
||||
chrono = "0.4.2"
|
||||
derive_builder = "0.5.1"
|
||||
itertools = "0.7.8"
|
||||
lazy_static = "1.0.0"
|
||||
log = "0.4.1"
|
||||
rayon = "1.0.1"
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
#![allow(unused)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate criterion;
|
||||
use criterion::Criterion;
|
||||
@ -56,22 +58,6 @@ static FEEDS: &[(&[u8], &str)] = &[
|
||||
(STARS, STARS_URL),
|
||||
];
|
||||
|
||||
// This is broken and I don't know why.
|
||||
fn bench_pipeline(c: &mut Criterion) {
|
||||
truncate_db().unwrap();
|
||||
FEEDS.iter().for_each(|&(_, url)| {
|
||||
Source::from_url(url).unwrap();
|
||||
});
|
||||
|
||||
c.bench_function("pipline", move |b| {
|
||||
b.iter(|| {
|
||||
let sources = hammond_data::dbqueries::get_sources().unwrap();
|
||||
pipeline::run(sources, true).unwrap();
|
||||
})
|
||||
});
|
||||
truncate_db().unwrap();
|
||||
}
|
||||
|
||||
fn bench_index_large_feed(c: &mut Criterion) {
|
||||
truncate_db().unwrap();
|
||||
let url = "https://www.greaterthancode.com/feed/podcast";
|
||||
@ -114,10 +100,5 @@ fn bench_index_small_feed(c: &mut Criterion) {
|
||||
truncate_db().unwrap();
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_pipeline,
|
||||
bench_index_large_feed,
|
||||
bench_index_small_feed
|
||||
);
|
||||
criterion_group!(benches, bench_index_large_feed, bench_index_small_feed);
|
||||
criterion_main!(benches);
|
||||
|
||||
@ -1,17 +1,14 @@
|
||||
//! Index Feeds.
|
||||
|
||||
use futures::future::*;
|
||||
use itertools::{Either, Itertools};
|
||||
use rss;
|
||||
|
||||
use dbqueries;
|
||||
use errors::DataError;
|
||||
use models::{Index, IndexState, Update};
|
||||
use models::{NewEpisode, NewPodcast, Podcast};
|
||||
use models::{NewPodcast, Podcast};
|
||||
use pipeline::*;
|
||||
|
||||
type InsertUpdate = (Vec<NewEpisode>, Vec<Option<(NewEpisode, i32)>>);
|
||||
|
||||
/// Wrapper struct that holds a `Source` id and the `rss::Channel`
|
||||
/// that corresponds to the `Source.uri` field.
|
||||
#[derive(Debug, Clone, Builder, PartialEq)]
|
||||
@ -46,69 +43,45 @@ impl Feed {
|
||||
&self,
|
||||
pd: &Podcast,
|
||||
) -> Box<Future<Item = (), Error = DataError> + Send> {
|
||||
let fut = self.get_stuff(pd)
|
||||
.and_then(|(insert, update)| {
|
||||
if !insert.is_empty() {
|
||||
info!("Indexing {} episodes.", insert.len());
|
||||
if let Err(err) = dbqueries::index_new_episodes(insert.as_slice()) {
|
||||
error!("Failed batch indexng, Fallign back to individual indexing.");
|
||||
error!("{}", err);
|
||||
insert.iter().for_each(|ep| {
|
||||
if let Err(err) = ep.index() {
|
||||
error!("Failed to index episode: {:?}.", ep.title());
|
||||
error!("{}", err);
|
||||
};
|
||||
})
|
||||
}
|
||||
}
|
||||
Ok((insert, update))
|
||||
})
|
||||
.map(|(_, update)| {
|
||||
if !update.is_empty() {
|
||||
info!("Updating {} episodes.", update.len());
|
||||
// see get_stuff for more
|
||||
update
|
||||
.into_iter()
|
||||
.filter_map(|x| x)
|
||||
.for_each(|(ref ep, rowid)| {
|
||||
if let Err(err) = ep.update(rowid) {
|
||||
error!("Failed to index episode: {:?}.", ep.title());
|
||||
error!("{}", err);
|
||||
};
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
Box::new(fut)
|
||||
Box::new(ok(self.index_stuff(pd)))
|
||||
}
|
||||
|
||||
fn get_stuff(
|
||||
&self,
|
||||
pd: &Podcast,
|
||||
) -> Box<Future<Item = InsertUpdate, Error = DataError> + Send> {
|
||||
let (insert, update): (Vec<_>, Vec<_>) = self.channel
|
||||
fn index_stuff(&self, pd: &Podcast) {
|
||||
let insert: Vec<_> = self.channel
|
||||
.items()
|
||||
.into_iter()
|
||||
.map(|item| glue_async(item, pd.id()))
|
||||
// This is sort of ugly but I think it's cheaper than pushing None
|
||||
// to updated and filtering it out later.
|
||||
// Even though we already map_filter in index_channel_items.
|
||||
// I am not sure what the optimizations are on match vs allocating None.
|
||||
.map(|fut| {
|
||||
fut.and_then(|x| match x {
|
||||
IndexState::NotChanged => Err(DataError::EpisodeNotChanged),
|
||||
_ => Ok(x),
|
||||
})
|
||||
})
|
||||
.flat_map(|fut| fut.wait())
|
||||
.partition_map(|state| match state {
|
||||
IndexState::Index(e) => Either::Left(e),
|
||||
IndexState::Update(e) => Either::Right(Some(e)),
|
||||
// This should never occur
|
||||
IndexState::NotChanged => Either::Right(None),
|
||||
});
|
||||
.iter()
|
||||
// FIXME: print the error
|
||||
.filter_map(|item| glue(item, pd.id()).ok())
|
||||
.filter_map(|state| match state {
|
||||
IndexState::NotChanged => None,
|
||||
// Update individual rows, and filter them
|
||||
IndexState::Update((ref ep, rowid)) => {
|
||||
ep.update(rowid)
|
||||
.map_err(|err| error!("{}", err))
|
||||
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
|
||||
.ok();
|
||||
|
||||
Box::new(ok((insert, update)))
|
||||
None
|
||||
},
|
||||
IndexState::Index(s) => Some(s),
|
||||
})
|
||||
// only Index is left, collect them for batch index
|
||||
.collect();
|
||||
|
||||
if !insert.is_empty() {
|
||||
info!("Indexing {} episodes.", insert.len());
|
||||
if let Err(err) = dbqueries::index_new_episodes(insert.as_slice()) {
|
||||
error!("Failed batch indexng, Fallign back to individual indexing.");
|
||||
error!("{}", err);
|
||||
|
||||
insert.iter().for_each(|ep| {
|
||||
if let Err(err) = ep.index() {
|
||||
error!("Failed to index episode: {:?}.", ep.title());
|
||||
error!("{}", err);
|
||||
};
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,27 +179,4 @@ mod tests {
|
||||
assert_eq!(dbqueries::get_podcasts().unwrap().len(), 1);
|
||||
assert_eq!(dbqueries::get_episodes().unwrap().len(), 43);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_feed_get_stuff() {
|
||||
truncate_db().unwrap();
|
||||
|
||||
let path = "tests/feeds/2018-01-20-Intercepted.xml";
|
||||
let feed = get_feed(path, 42);
|
||||
let pd = feed.parse_podcast().to_podcast().unwrap();
|
||||
|
||||
let (insert, update) = feed.get_stuff(&pd).wait().unwrap();
|
||||
assert_eq!(43, insert.len());
|
||||
assert_eq!(0, update.len());
|
||||
|
||||
feed.index().wait().unwrap();
|
||||
|
||||
let path = "tests/feeds/2018-02-03-Intercepted.xml";
|
||||
let feed = get_feed(path, 42);
|
||||
let pd = feed.parse_podcast().to_podcast().unwrap();
|
||||
|
||||
let (insert, update) = feed.get_stuff(&pd).wait().unwrap();
|
||||
assert_eq!(4, insert.len());
|
||||
assert_eq!(43, update.len());
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,7 +48,6 @@ extern crate chrono;
|
||||
extern crate futures;
|
||||
extern crate hyper;
|
||||
extern crate hyper_tls;
|
||||
extern crate itertools;
|
||||
extern crate native_tls;
|
||||
extern crate num_cpus;
|
||||
extern crate rayon;
|
||||
|
||||
@ -106,6 +106,7 @@ fn determine_ep_state(
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn glue_async<'a>(
|
||||
item: &'a rss::Item,
|
||||
id: i32,
|
||||
@ -115,6 +116,10 @@ pub(crate) fn glue_async<'a>(
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn glue(item: &rss::Item, id: i32) -> Result<IndexState<NewEpisode>, DataError> {
|
||||
NewEpisodeMinimal::new(item, id).and_then(move |ep| determine_ep_state(ep, item))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user