Pipeline: Split the pipeline from the cpu-pool and the tokio-core.

Jordan Petridis 2018-01-26 13:34:04 +02:00
parent b370af35d4
commit 2912bad110
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
5 changed files with 93 additions and 45 deletions
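In short: the executor pieces (the tokio reactor, the CPU pool, and the HTTP client) move out of `pipeline` and become arguments, while a new `run` helper keeps the old one-call behaviour. A sketch of the before/after surface, taken from the hunks below:

// Before: pipeline() built its own Core, CpuPool, and Client internally.
pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()>;

// After: the executor pieces are passed in; run() is a convenience wrapper
// that constructs them and delegates to pipeline().
pub fn pipeline<S: IntoIterator<Item = Source>>(
    sources: S,
    ignore_etags: bool,
    tokio_core: &mut Core,
    pool: CpuPool,
    client: Client<HttpsConnector<HttpConnector>>,
) -> Result<()>;

pub fn run<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()>;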

View File

@@ -105,7 +105,6 @@ mod tests {
use Source;
use database::truncate_db;
use dbqueries;
use pipeline;
use utils::get_feed;
use std::fs;
@@ -143,27 +142,6 @@ mod tests {
]
};
#[test]
/// Insert feeds and update/index them.
fn test_index_loop() {
truncate_db().unwrap();
URLS.iter().for_each(|&(_, url)| {
// Index the URLs into the source table.
Source::from_url(url).unwrap();
});
let sources = dbqueries::get_sources().unwrap();
pipeline::pipeline(sources, true).unwrap();
let sources = dbqueries::get_sources().unwrap();
// Run again to cover unique constraint errors.
pipeline::pipeline(sources, true).unwrap();
// Assert the index rows equal the controlled results
assert_eq!(dbqueries::get_sources().unwrap().len(), 5);
assert_eq!(dbqueries::get_podcasts().unwrap().len(), 5);
assert_eq!(dbqueries::get_episodes().unwrap().len(), 354);
}
#[test]
fn test_complete_index() {
truncate_db().unwrap();

View File

@@ -6,6 +6,7 @@ use futures_cpupool::CpuPool;
// use futures::prelude::*;
use hyper::Client;
use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
@@ -43,7 +44,32 @@ macro_rules! clone {
/// Messy temp diagram:
/// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss ->
/// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes.
pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()> {
///
/// # Panics
/// If `sources` contains no Items.
pub fn pipeline<S: IntoIterator<Item = Source>>(
sources: S,
ignore_etags: bool,
tokio_core: &mut Core,
pool: CpuPool,
client: Client<HttpsConnector<HttpConnector>>,
) -> Result<()> {
let list: Vec<_> = sources
.into_iter()
.map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
.map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))))
.map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
.collect();
assert!(!list.is_empty());
// That's not really concurrent yet, I think.
tokio_core.run(collect_futures(list))?;
Ok(())
}
/// Creates a tokio `Core`, a `CpuPool`, and a `hyper::Client`, and runs the pipeline.
pub fn run<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()> {
let pool = CpuPool::new_num_cpus();
let mut core = Core::new()?;
let handle = core.handle();
@@ -52,20 +78,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
.connector(HttpsConnector::new(4, &handle)?)
.build(&handle);
let list: Vec<_> = sources
.into_iter()
.map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
.map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))))
.map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
.collect();
// TODO: this could be moved to the start of the function.
if !list.is_empty() {
// That's not really concurrent yet, I think.
core.run(collect_futures(list))?;
}
Ok(())
pipeline(sources, ignore_etags, &mut core, pool, client)
}
fn determine_ep_state(ep: NewEpisodeMinimal, item: &rss::Item) -> Result<IndexState<NewEpisode>> {
@@ -121,3 +134,61 @@ where
})
}))
}
#[cfg(test)]
mod tests {
use super::*;
use Source;
use database::truncate_db;
// (path, url) tuples.
const URLS: &[(&str, &str)] = {
&[
(
"tests/feeds/2018-01-20-Intercepted.xml",
"https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\
com/InterceptedWithJeremyScahill",
),
(
"tests/feeds/2018-01-20-LinuxUnplugged.xml",
"https://web.archive.org/web/20180120110314if_/https://feeds.feedburner.\
com/linuxunplugged",
),
(
"tests/feeds/2018-01-20-TheTipOff.xml",
"https://web.archive.org/web/20180120110727if_/https://rss.acast.com/thetipoff",
),
(
"tests/feeds/2018-01-20-StealTheStars.xml",
"https://web.archive.org/web/20180120104957if_/https://rss.art19.\
com/steal-the-stars",
),
(
"tests/feeds/2018-01-20-GreaterThanCode.xml",
"https://web.archive.org/web/20180120104741if_/https://www.greaterthancode.\
com/feed/podcast",
),
]
};
#[test]
/// Insert feeds and update/index them.
fn test_pipeline() {
truncate_db().unwrap();
URLS.iter().for_each(|&(_, url)| {
// Index the URLs into the source table.
Source::from_url(url).unwrap();
});
let sources = dbqueries::get_sources().unwrap();
run(sources, true).unwrap();
let sources = dbqueries::get_sources().unwrap();
// Run again to cover unique constraint errors.
run(sources, true).unwrap();
// Assert the index rows equal the controlled results
assert_eq!(dbqueries::get_sources().unwrap().len(), 5);
assert_eq!(dbqueries::get_podcasts().unwrap().len(), 5);
assert_eq!(dbqueries::get_episodes().unwrap().len(), 354);
}
}
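With the split, a caller that already owns a reactor can push several batches of sources through `pipeline` without a new `Core` being spun up on every call, which is what the old function did. A minimal sketch (the setup mirrors `run` above; the repeated-update loop is hypothetical):

let pool = CpuPool::new_num_cpus();
let mut core = Core::new()?;
let handle = core.handle();
let client = Client::configure()
    .connector(HttpsConnector::new(4, &handle)?)
    .build(&handle);

// Reuse the same reactor, pool, and client across repeated updates.
// Note that `pipeline` asserts its input is non-empty (see `# Panics`).
for _ in 0..2 {
    let sources = dbqueries::get_sources()?;
    pipeline(sources, true, &mut core, pool.clone(), client.clone())?;
}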

View File

@@ -218,7 +218,7 @@ mod tests {
use super::*;
use hammond_data::Source;
use hammond_data::dbqueries;
use hammond_data::pipeline::pipeline;
use hammond_data::pipeline;
#[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
@@ -231,7 +231,7 @@ mod tests {
// Copy its id
let sid = source.id();
// Convert the Source into a future Feed and index it
pipeline(vec![source], true).unwrap();
pipeline::run(vec![source], true).unwrap();
// Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap().into();

View File

@@ -119,7 +119,7 @@ mod tests {
use hammond_data::{Episode, Source};
use hammond_data::dbqueries;
use hammond_data::pipeline::pipeline;
use hammond_data::pipeline;
use hammond_data::utils::get_download_folder;
use std::{thread, time};
@@ -138,7 +138,7 @@ mod tests {
let source = Source::from_url(url).unwrap();
// Copy its id
let sid = source.id();
pipeline(vec![source], true).unwrap();
pipeline::run(vec![source], true).unwrap();
// Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();

View File

@@ -28,13 +28,13 @@ pub fn refresh_feed(headerbar: Arc<Header>, source: Option<Vec<Source>>, sender:
if let Some(s) = source {
// feed::index_loop(s);
// TODO: determine if it needs to ignore_etags.
if let Err(err) = pipeline::pipeline(s, true) {
if let Err(err) = pipeline::run(s, true) {
error!("Error While trying to update the database.");
error!("Error msg: {}", err);
}
} else {
let sources = dbqueries::get_sources().unwrap();
if let Err(err) = pipeline::pipeline(sources, false) {
if let Err(err) = pipeline::run(sources, false) {
error!("Error While trying to update the database.");
error!("Error msg: {}", err);
}
@@ -83,7 +83,6 @@ mod tests {
use super::*;
use hammond_data::Source;
use hammond_data::dbqueries;
use hammond_data::pipeline::pipeline;
#[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
@@ -95,7 +94,7 @@ mod tests {
let source = Source::from_url(url).unwrap();
// Copy its id
let sid = source.id();
pipeline(vec![source], true).unwrap();
pipeline::run(vec![source], true).unwrap();
// Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();
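One caveat at the call sites above: `pipeline` now asserts that it received at least one `Source`, and `run` forwards its input unchecked, so a caller that can legitimately see zero sources may want to guard first. A hypothetical variant of the `refresh_feed` branch:

let sources = dbqueries::get_sources().unwrap();
// `pipeline` panics on empty input (see its `# Panics` doc above),
// so skip the update when there is nothing to refresh.
if !sources.is_empty() {
    if let Err(err) = pipeline::run(sources, false) {
        error!("Error while trying to update the database.");
        error!("Error msg: {}", err);
    }
}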