From 14a90e7138a7f8764b511141e047d1316fa2d028 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Fri, 6 Apr 2018 18:18:03 +0300 Subject: [PATCH] Remove Futures_Cpupool. The performance boost is not good enough to justify the code complexity it add and the memory overhead of yeat another threadpool. We will start refactoring the whole pipeline implemantation and might transition to either rayon-futures or tokio-runtime. --- Cargo.lock | 1 - hammond-data/Cargo.toml | 1 - hammond-data/src/lib.rs | 1 - hammond-data/src/models/source.rs | 12 +++-------- hammond-data/src/pipeline.rs | 33 ++++++------------------------- 5 files changed, 9 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54ecb37..edaa3e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -656,7 +656,6 @@ dependencies = [ "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.24 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/hammond-data/Cargo.toml b/hammond-data/Cargo.toml index 51d87ed..a46116c 100644 --- a/hammond-data/Cargo.toml +++ b/hammond-data/Cargo.toml @@ -21,7 +21,6 @@ hyper = "0.11.24" tokio-core = "0.1.16" hyper-tls = "0.1.3" native-tls = "0.1.5" -futures-cpupool = "0.1.8" num_cpus = "1.8.0" failure = "0.1.1" failure_derive = "0.1.1" diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs index 7d1d9e3..cae80a3 100644 --- a/hammond-data/src/lib.rs +++ b/hammond-data/src/lib.rs @@ -42,7 +42,6 @@ extern crate log; extern crate ammonia; extern crate chrono; extern crate futures; -extern crate futures_cpupool; extern crate hyper; extern crate hyper_tls; extern crate itertools; diff --git a/hammond-data/src/models/source.rs b/hammond-data/src/models/source.rs index fe6e87c..62b57f3 100644 --- a/hammond-data/src/models/source.rs +++ b/hammond-data/src/models/source.rs @@ -11,7 +11,6 @@ use hyper_tls::HttpsConnector; // use futures::future::ok; use futures::prelude::*; -use futures_cpupool::CpuPool; use database::connection; use errors::DataError; @@ -179,12 +178,11 @@ impl Source { pub fn into_feed( self, client: &Client>, - pool: CpuPool, ignore_etags: bool, ) -> Box> { let id = self.id(); let feed = self.request_constructor(client, ignore_etags) - .and_then(move |(_, res)| response_to_channel(res, pool)) + .and_then(move |(_, res)| response_to_channel(res)) .and_then(move |chan| { FeedBuilder::default() .channel(chan) @@ -237,10 +235,7 @@ impl Source { } #[allow(needless_pass_by_value)] -fn response_to_channel( - res: Response, - pool: CpuPool, -) -> Box + Send> { +fn response_to_channel(res: Response) -> Box + Send> { let chan = res.body() .concat2() .map(|x| x.into_iter()) @@ -249,8 +244,7 @@ fn response_to_channel( .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned()) .and_then(|buf| Channel::from_str(&buf).map_err(From::from)); - let cpu_chan = pool.spawn(chan); - Box::new(cpu_chan) + Box::new(chan) } #[cfg(test)] diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index 22dab65..c1015ca 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -2,7 +2,6 @@ //! Docs. use futures::future::*; -use futures_cpupool::CpuPool; // use futures::prelude::*; use hyper::Client; @@ -20,23 +19,6 @@ use models::{IndexState, NewEpisode, NewEpisodeMinimal}; // use std::sync::{Arc, Mutex}; -macro_rules! clone { - (@param _) => ( _ ); - (@param $x:ident) => ( $x ); - ($($n:ident),+ => move || $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move || $body - } - ); - ($($n:ident),+ => move |$($p:tt),+| $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move |$(clone!(@param $p),)+| $body - } - ); -} - /// The pipline to be run for indexing and updating a Podcast feed that originates from /// `Source.uri`. /// @@ -47,13 +29,12 @@ pub fn pipeline>( sources: S, ignore_etags: bool, tokio_core: &mut Core, - pool: &CpuPool, client: Client>, ) -> Result<(), DataError> { let list: Vec<_> = sources .into_iter() - .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags))) - .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))) + .map(move |s| s.into_feed(&client, ignore_etags)) + .map(|fut| fut.and_then(|feed| feed.index())) .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err))) .collect(); @@ -67,26 +48,24 @@ pub fn pipeline>( Ok(()) } -/// Creates a tokio `reactor::Core`, a `CpuPool`, and a `hyper::Client` and +/// Creates a tokio `reactor::Core`, and a `hyper::Client` and /// runs the pipeline. pub fn run(sources: Vec, ignore_etags: bool) -> Result<(), DataError> { if sources.is_empty() { return Ok(()); } - let pool = CpuPool::new_num_cpus(); let mut core = Core::new()?; let handle = core.handle(); let client = Client::configure() .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); - pipeline(sources, ignore_etags, &mut core, &pool, client) + pipeline(sources, ignore_etags, &mut core, client) } /// Docs pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataError> { - let pool = CpuPool::new_num_cpus(); let mut core = Core::new()?; let handle = core.handle(); @@ -94,8 +73,8 @@ pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataErro .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); - let work = s.into_feed(&client, pool.clone(), ignore_etags) - .and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))) + let work = s.into_feed(&client, ignore_etags) + .and_then(move |feed| feed.index()) .map(|_| ()); core.run(work)