From e64883eecbb11a02e79c06834cbd283edb4b705f Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 3 Sep 2018 15:21:25 +0300 Subject: [PATCH] Pipeline: reuse the prexisting runtime executor Instead of creating our own threadpool, we should reuse the executor of the tokio::runtime::Runtime that backs the tokio::reactor::Core. --- Cargo.lock | 1 + podcasts-data/Cargo.toml | 1 + podcasts-data/src/lib.rs | 1 + podcasts-data/src/pipeline.rs | 23 +++++++++-------------- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f3e44d..baf5772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,6 +1504,7 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index a96aa67..1ccb927 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -18,6 +18,7 @@ xdg = "2.1.0" xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" +tokio = "0.1.8" tokio-core = "0.1.17" tokio-threadpool = "0.1.6" tokio-executor = "0.1" diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 3654627..7605929 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -82,6 +82,7 @@ extern crate num_cpus; extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; +extern crate tokio; extern crate tokio_core; extern crate tokio_executor; extern crate tokio_threadpool; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 89200fd..0b4beba 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,15 +1,14 @@ // FIXME: //! Docs. -use futures::future::*; use futures::prelude::*; -use futures::stream::*; +use futures::{future::ok, lazy, stream::iter_ok}; use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; +use tokio::runtime::TaskExecutor; use tokio_core::reactor::Core; -use tokio_threadpool::{self, ThreadPool}; use num_cpus; @@ -27,7 +26,7 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: HttpsClient, - pool: tokio_threadpool::Sender, + executor: TaskExecutor, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, @@ -35,9 +34,9 @@ where sources .and_then(move |s| s.into_feed(client.clone())) .and_then(move |feed| { - pool.spawn(lazy(|| { - feed.index().map_err(|err| error!("Error: {}", err)) - })).map_err(From::from) + let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); + executor.spawn(fut); + Ok(()) }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. @@ -52,20 +51,16 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { - let pool = ThreadPool::new(); - let sender = pool.sender().clone(); let mut core = Core::new()?; + let executor = core.runtime().executor(); let handle = core.handle(); let client = Client::configure() .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, client, sender); - core.run(p)?; - - pool.shutdown_on_idle().wait().unwrap(); - Ok(()) + let p = pipeline(stream, client, executor); + core.run(p).map(|_| ()) } #[cfg(test)]