From 8095919bbe6790943da8e9aac584f68ab199f307 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Tue, 23 Jan 2018 10:12:32 +0200 Subject: [PATCH] This sort of works? --- Cargo.lock | 1 + hammond-data/Cargo.toml | 1 + hammond-data/benches/bench.rs | 2 +- hammond-data/src/feed.rs | 1 + hammond-data/src/lib.rs | 2 +- hammond-data/src/pipeline.rs | 23 ++++++++++++++++++++--- 6 files changed, 25 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a707898..f18ec05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,6 +618,7 @@ dependencies = [ "dotenv 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.14 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/hammond-data/Cargo.toml b/hammond-data/Cargo.toml index da46a51..617a887 100644 --- a/hammond-data/Cargo.toml +++ b/hammond-data/Cargo.toml @@ -24,6 +24,7 @@ hyper = "0.11.14" tokio-core = "0.1.12" hyper-tls = "0.1.2" native-tls = "0.1.5" +futures-cpupool = "0.1.8" [dependencies.diesel] features = ["sqlite", "r2d2"] diff --git a/hammond-data/benches/bench.rs b/hammond-data/benches/bench.rs index a01eaf0..19bcee1 100644 --- a/hammond-data/benches/bench.rs +++ b/hammond-data/benches/bench.rs @@ -59,7 +59,7 @@ static URLS: &[(&[u8], &str)] = &[ ), ]; -fn index_urls() -> Vec>> { +fn index_urls() -> Vec + Send>> { let feeds: Vec<_> = URLS.iter() .map(|&(buff, url)| { // Create and insert a Source into db diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs index 5d16b20..8d16a26 100644 --- a/hammond-data/src/feed.rs +++ b/hammond-data/src/feed.rs @@ -53,6 +53,7 @@ impl Feed { }) .map(|(_, update)| { if !update.is_empty() { + info!("Updating {} episodes.", update.len()); // see get_stuff for more update .into_iter() diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs index 97b4454..aeeda15 100644 --- a/hammond-data/src/lib.rs +++ b/hammond-data/src/lib.rs @@ -39,7 +39,7 @@ extern crate log; extern crate ammonia; extern crate chrono; extern crate futures; -// extern crate futures_cpupool; +extern crate futures_cpupool; extern crate hyper; extern crate hyper_tls; extern crate itertools; diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index d7b2993..694b9dd 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -2,7 +2,7 @@ //! Docs. use futures::future::*; -// use futures_cpupool::CpuPool; +use futures_cpupool::CpuPool; // use futures::prelude::*; use hyper::Client; @@ -20,6 +20,23 @@ use models::{IndexState, NewEpisode, NewEpisodeMinimal}; use std; // use std::sync::{Arc, Mutex}; +macro_rules! clone { + (@param _) => ( _ ); + (@param $x:ident) => ( $x ); + ($($n:ident),+ => move || $body:expr) => ( + { + $( let $n = $n.clone(); )+ + move || $body + } + ); + ($($n:ident),+ => move |$($p:tt),+| $body:expr) => ( + { + $( let $n = $n.clone(); )+ + move |$(clone!(@param $p),)+| $body + } + ); +} + /// The pipline to be run for indexing and updating a Podcast feed that originates from /// `Source.uri`. /// @@ -27,7 +44,7 @@ use std; /// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> /// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes. pub fn pipeline>(sources: S, ignore_etags: bool) -> Result<()> { - // let _pool = CpuPool::new_num_cpus(); + let _pool = CpuPool::new_num_cpus(); let mut core = Core::new()?; let handle = core.handle(); @@ -39,7 +56,7 @@ pub fn pipeline>(sources: S, ignore_etags: bool) let list = sources .into_iter() .map(|s| s.into_feed(&client, ignore_etags)) - .map(|fut| fut.and_then(|feed| feed.index())) + .map(|fut| fut.and_then(clone!(_pool => move |feed| _pool.clone().spawn(feed.index())))) .collect(); let f = core.run(collect_futures(list))?;