diff --git a/Cargo.lock b/Cargo.lock
index 54ecb37..edaa3e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -656,7 +656,6 @@ dependencies = [
  "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "failure_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.11.24 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/hammond-data/Cargo.toml b/hammond-data/Cargo.toml
index 51d87ed..a46116c 100644
--- a/hammond-data/Cargo.toml
+++ b/hammond-data/Cargo.toml
@@ -21,7 +21,6 @@ hyper = "0.11.24"
 tokio-core = "0.1.16"
 hyper-tls = "0.1.3"
 native-tls = "0.1.5"
-futures-cpupool = "0.1.8"
 num_cpus = "1.8.0"
 failure = "0.1.1"
 failure_derive = "0.1.1"
diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs
index 7d1d9e3..cae80a3 100644
--- a/hammond-data/src/lib.rs
+++ b/hammond-data/src/lib.rs
@@ -42,7 +42,6 @@ extern crate log;
 extern crate ammonia;
 extern crate chrono;
 extern crate futures;
-extern crate futures_cpupool;
 extern crate hyper;
 extern crate hyper_tls;
 extern crate itertools;
diff --git a/hammond-data/src/models/source.rs b/hammond-data/src/models/source.rs
index fe6e87c..62b57f3 100644
--- a/hammond-data/src/models/source.rs
+++ b/hammond-data/src/models/source.rs
@@ -11,7 +11,6 @@ use hyper_tls::HttpsConnector;
 
 // use futures::future::ok;
 use futures::prelude::*;
-use futures_cpupool::CpuPool;
 
 use database::connection;
 use errors::DataError;
@@ -179,12 +178,11 @@ impl Source {
     pub fn into_feed(
         self,
         client: &Client<HttpsConnector<HttpConnector>>,
-        pool: CpuPool,
         ignore_etags: bool,
     ) -> Box<Future<Item = Feed, Error = DataError>> {
         let id = self.id();
         let feed = self.request_constructor(client, ignore_etags)
-            .and_then(move |(_, res)| response_to_channel(res, pool))
+            .and_then(move |(_, res)| response_to_channel(res))
             .and_then(move |chan| {
                 FeedBuilder::default()
                     .channel(chan)
@@ -237,10 +235,7 @@ impl Source {
 }
 
 #[allow(needless_pass_by_value)]
-fn response_to_channel(
-    res: Response,
-    pool: CpuPool,
-) -> Box<Future<Item = Channel, Error = DataError> + Send> {
+fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = DataError> + Send> {
     let chan = res.body()
         .concat2()
         .map(|x| x.into_iter())
@@ -249,8 +244,7 @@ fn response_to_channel(
         .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned())
         .and_then(|buf| Channel::from_str(&buf).map_err(From::from));
 
-    let cpu_chan = pool.spawn(chan);
-    Box::new(cpu_chan)
+    Box::new(chan)
 }
 
 #[cfg(test)]
diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs
index 22dab65..c1015ca 100644
--- a/hammond-data/src/pipeline.rs
+++ b/hammond-data/src/pipeline.rs
@@ -2,7 +2,6 @@
 //! Docs.
 
 use futures::future::*;
-use futures_cpupool::CpuPool;
 // use futures::prelude::*;
 use hyper::Client;
 
@@ -20,23 +19,6 @@ use models::{IndexState, NewEpisode, NewEpisodeMinimal};
 
 // use std::sync::{Arc, Mutex};
 
-macro_rules! clone {
-    (@param _) => ( _ );
-    (@param $x:ident) => ( $x );
-    ($($n:ident),+ => move || $body:expr) => (
-        {
-            $( let $n = $n.clone(); )+
-            move || $body
-        }
-    );
-    ($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
-        {
-            $( let $n = $n.clone(); )+
-            move |$(clone!(@param $p),)+| $body
-        }
-    );
-}
-
 /// The pipline to be run for indexing and updating a Podcast feed that originates from
 /// `Source.uri`.
 ///
@@ -47,13 +29,12 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
     sources: S,
     ignore_etags: bool,
     tokio_core: &mut Core,
-    pool: &CpuPool,
     client: Client<HttpsConnector<HttpConnector>>,
 ) -> Result<(), DataError> {
     let list: Vec<_> = sources
         .into_iter()
-        .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
-        .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))))
+        .map(move |s| s.into_feed(&client, ignore_etags))
+        .map(|fut| fut.and_then(|feed| feed.index()))
         .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
         .collect();
@@ -67,26 +48,24 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
     Ok(())
 }
 
-/// Creates a tokio `reactor::Core`, a `CpuPool`, and a `hyper::Client` and
+/// Creates a tokio `reactor::Core`, and a `hyper::Client` and
 /// runs the pipeline.
 pub fn run(sources: Vec<Source>, ignore_etags: bool) -> Result<(), DataError> {
     if sources.is_empty() {
         return Ok(());
     }
 
-    let pool = CpuPool::new_num_cpus();
     let mut core = Core::new()?;
     let handle = core.handle();
 
     let client = Client::configure()
         .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
         .build(&handle);
 
-    pipeline(sources, ignore_etags, &mut core, &pool, client)
+    pipeline(sources, ignore_etags, &mut core, client)
 }
 
 /// Docs
 pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataError> {
-    let pool = CpuPool::new_num_cpus();
     let mut core = Core::new()?;
     let handle = core.handle();
 
@@ -94,8 +73,8 @@ pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataErro
         .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
         .build(&handle);
 
-    let work = s.into_feed(&client, pool.clone(), ignore_etags)
-        .and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))
+    let work = s.into_feed(&client, ignore_etags)
+        .and_then(move |feed| feed.index())
        .map(|_| ());
 
     core.run(work)