From 8fe1b70000cc7e80399b8e70733cfea84905b1db Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sun, 2 Sep 2018 21:20:42 +0300 Subject: [PATCH 1/8] Pipeline: Remove dependancy on rayon_futures This requires a RUSTFLAG to be set before hand for rayon to build. This brakes a lot of tools like rls and clippy by default and require special configs for itnegration. Additionally, rayon_futures is still 0.1 and not much work seem to have gone into it. Ideally it should be replased with the tokio runtime/threadpool. --- .gitlab-ci.yml | 1 - Cargo.lock | 11 ----------- podcasts-data/Cargo.toml | 1 - podcasts-data/src/lib.rs | 1 - podcasts-data/src/pipeline.rs | 4 +--- scripts/cargo.sh | 1 - scripts/test.sh | 1 - 7 files changed, 1 insertion(+), 19 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a707387..94aaef7 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -32,7 +32,6 @@ flatpak: --env=APP_ID="org.gnome.PodcastsDevel" \ --env=LOCALEDIR="./podcasts-gtk/po" \ --env=VERSION="0.0.0" \ - --env=RUSTFLAGS="--cfg rayon_unstable" \ --env=CARGO_HOME="target/cargo-home" \ --env=CARGO_TARGET_DIR="target_test/" \ app ${MANIFEST_PATH} \ diff --git a/Cargo.lock b/Cargo.lock index 2fbe73b..0c3b1e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1501,7 +1501,6 @@ dependencies = [ "pretty_assertions 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1692,15 +1691,6 @@ dependencies = [ "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rayon-futures" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "redox_syscall" version = "0.1.40" @@ -2671,7 +2661,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum rand_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "edecf0f94da5551fc9b492093e30b041a891657db7940ee221f9d2f66e82eef2" "checksum rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "df7a791f788cb4c516f0e091301a29c2b71ef680db5e644a7d68835c8ae6dbfa" "checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356" -"checksum rayon-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea70dae42033c388536c7e9bdca084a14d75659c12e25fc105e7552464a87c4a" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 6437f89..845df5c 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -11,7 +11,6 @@ derive_builder = "0.5.1" lazy_static = "1.1.0" log = "0.4.4" rayon = "1.0.2" -rayon-futures = "0.1.0" rfc822_sanitizer = "0.3.3" rss = "1.5.0" url = "1.7.1" diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 45a7444..b0f4a48 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -80,7 +80,6 @@ extern crate hyper_tls; extern crate native_tls; extern crate num_cpus; extern crate rayon; -extern crate rayon_futures; extern crate rfc822_sanitizer; extern crate rss; extern crate tokio_core; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 555748c..98e224e 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -11,8 +11,6 @@ use hyper_tls::HttpsConnector; use tokio_core::reactor::Core; use num_cpus; -use rayon; -use rayon_futures::ScopeFutureExt; use errors::DataError; use Source; @@ -55,7 +53,7 @@ where { sources .and_then(clone!(client => move |s| s.into_feed(client.clone()))) - .and_then(|feed| rayon::scope(|s| s.spawn_future(feed.index()))) + .and_then(|feed| feed.index()) // the stream will stop at the first error so // we ensure that everything will succeded regardless. .map_err(|err| error!("Error: {}", err)) diff --git a/scripts/cargo.sh b/scripts/cargo.sh index 14cebbc..023001c 100755 --- a/scripts/cargo.sh +++ b/scripts/cargo.sh @@ -1,7 +1,6 @@ #!/bin/sh export CARGO_HOME=$1/target/cargo-home -export RUSTFLAGS="--cfg rayon_unstable" export LOCALEDIR="$3" export APP_ID="$4" export VERSION="$5" diff --git a/scripts/test.sh b/scripts/test.sh index a258a2f..72326ee 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -26,7 +26,6 @@ xvfb-run -a -s "-screen 0 1024x768x24" \ --env=APP_ID="org.gnome.PodcastsDevel" \ --env=LOCALEDIR="./podcasts-gtk/po" \ --env=VERSION="0.0.0" \ - --env=RUSTFLAGS="--cfg rayon_unstable" \ --env=CARGO_HOME="target/cargo-home" \ --env=CARGO_TARGET_DIR="target_test/" \ app ${MANIFEST_PATH} \ From f3760271801756754a1bcba78e3067d39afc10d5 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sun, 2 Sep 2018 23:11:03 +0300 Subject: [PATCH 2/8] Pipeline: Use tokio threadpool to index feeds --- Cargo.lock | 2 ++ podcasts-data/Cargo.toml | 2 ++ podcasts-data/src/errors.rs | 9 +++++++++ podcasts-data/src/lib.rs | 2 ++ podcasts-data/src/pipeline.rs | 19 ++++++++++++++++--- 5 files changed, 31 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c3b1e1..3f3e44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1505,6 +1505,8 @@ dependencies = [ "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 845df5c..a96aa67 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -19,6 +19,8 @@ xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" tokio-core = "0.1.17" +tokio-threadpool = "0.1.6" +tokio-executor = "0.1" hyper-tls = "0.1.3" native-tls = "0.1.5" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 64b20c9..5e6e2ec 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError; use hyper; use native_tls; use rss; +use tokio_executor; use url; use xml; @@ -46,6 +47,8 @@ pub enum DataError { R2D2PoolError(#[cause] r2d2::PoolError), #[fail(display = "Hyper Error: {}", _0)] HyperError(#[cause] hyper::Error), + #[fail(display = "Tokio Spawn Error: {}", _0)] + SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -101,6 +104,12 @@ impl From for DataError { } } +impl From for DataError { + fn from(err: tokio_executor::SpawnError) -> Self { + DataError::SpawnError(err) + } +} + impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index b0f4a48..3654627 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -83,6 +83,8 @@ extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; extern crate tokio_core; +extern crate tokio_executor; +extern crate tokio_threadpool; extern crate url; extern crate xdg; extern crate xml; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 98e224e..34ce165 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -9,6 +9,7 @@ use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; use tokio_core::reactor::Core; +use tokio_threadpool::{self, ThreadPool}; use num_cpus; @@ -47,13 +48,18 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: &HttpsClient, + pool: &tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, { sources .and_then(clone!(client => move |s| s.into_feed(client.clone()))) - .and_then(|feed| feed.index()) + .and_then(clone!(pool => move |feed| { + pool.spawn(lazy(|| { + feed.index().map_err(|err| error!("Error: {}", err)) + })).map_err(From::from) + })) // the stream will stop at the first error so // we ensure that everything will succeded regardless. .map_err(|err| error!("Error: {}", err)) @@ -67,6 +73,7 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { + let pool = ThreadPool::new(); let mut core = Core::new()?; let handle = core.handle(); let client = Client::configure() @@ -74,8 +81,14 @@ where .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, &client); - core.run(p).map(|_| ()) + let p = { + let sender = pool.sender(); + pipeline(stream, &client, &sender) + }; + core.run(p)?; + + pool.shutdown_on_idle().wait().unwrap(); + Ok(()) } #[cfg(test)] From 68136941faefedf0ad24d55b0ee55360a6939cb9 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sun, 2 Sep 2018 23:43:41 +0300 Subject: [PATCH 3/8] Pipeline: Remove use of clone! macro --- podcasts-data/src/pipeline.rs | 37 +++++++---------------------------- 1 file changed, 7 insertions(+), 30 deletions(-) diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 34ce165..89200fd 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -16,27 +16,6 @@ use num_cpus; use errors::DataError; use Source; -// use std::sync::{Arc, Mutex}; - -// http://gtk-rs.org/tuto/closures -#[macro_export] -macro_rules! clone { - (@param _) => ( _ ); - (@param $x:ident) => ( $x ); - ($($n:ident),+ => move || $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move || $body - } - ); - ($($n:ident),+ => move |$($p:tt),+| $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move |$(clone!(@param $p),)+| $body - } - ); -} - type HttpsClient = Client>; /// The pipline to be run for indexing and updating a Podcast feed that originates from @@ -47,19 +26,19 @@ type HttpsClient = Client>; /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. pub fn pipeline<'a, S>( sources: S, - client: &HttpsClient, - pool: &tokio_threadpool::Sender, + client: HttpsClient, + pool: tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, { sources - .and_then(clone!(client => move |s| s.into_feed(client.clone()))) - .and_then(clone!(pool => move |feed| { + .and_then(move |s| s.into_feed(client.clone())) + .and_then(move |feed| { pool.spawn(lazy(|| { feed.index().map_err(|err| error!("Error: {}", err)) })).map_err(From::from) - })) + }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. .map_err(|err| error!("Error: {}", err)) @@ -74,6 +53,7 @@ where S: IntoIterator, { let pool = ThreadPool::new(); + let sender = pool.sender().clone(); let mut core = Core::new()?; let handle = core.handle(); let client = Client::configure() @@ -81,10 +61,7 @@ where .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = { - let sender = pool.sender(); - pipeline(stream, &client, &sender) - }; + let p = pipeline(stream, client, sender); core.run(p)?; pool.shutdown_on_idle().wait().unwrap(); From e64883eecbb11a02e79c06834cbd283edb4b705f Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 3 Sep 2018 15:21:25 +0300 Subject: [PATCH 4/8] Pipeline: reuse the prexisting runtime executor Instead of creating our own threadpool, we should reuse the executor of the tokio::runtime::Runtime that backs the tokio::reactor::Core. --- Cargo.lock | 1 + podcasts-data/Cargo.toml | 1 + podcasts-data/src/lib.rs | 1 + podcasts-data/src/pipeline.rs | 23 +++++++++-------------- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f3e44d..baf5772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,6 +1504,7 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index a96aa67..1ccb927 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -18,6 +18,7 @@ xdg = "2.1.0" xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" +tokio = "0.1.8" tokio-core = "0.1.17" tokio-threadpool = "0.1.6" tokio-executor = "0.1" diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 3654627..7605929 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -82,6 +82,7 @@ extern crate num_cpus; extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; +extern crate tokio; extern crate tokio_core; extern crate tokio_executor; extern crate tokio_threadpool; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 89200fd..0b4beba 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,15 +1,14 @@ // FIXME: //! Docs. -use futures::future::*; use futures::prelude::*; -use futures::stream::*; +use futures::{future::ok, lazy, stream::iter_ok}; use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; +use tokio::runtime::TaskExecutor; use tokio_core::reactor::Core; -use tokio_threadpool::{self, ThreadPool}; use num_cpus; @@ -27,7 +26,7 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: HttpsClient, - pool: tokio_threadpool::Sender, + executor: TaskExecutor, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, @@ -35,9 +34,9 @@ where sources .and_then(move |s| s.into_feed(client.clone())) .and_then(move |feed| { - pool.spawn(lazy(|| { - feed.index().map_err(|err| error!("Error: {}", err)) - })).map_err(From::from) + let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); + executor.spawn(fut); + Ok(()) }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. @@ -52,20 +51,16 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { - let pool = ThreadPool::new(); - let sender = pool.sender().clone(); let mut core = Core::new()?; + let executor = core.runtime().executor(); let handle = core.handle(); let client = Client::configure() .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, client, sender); - core.run(p)?; - - pool.shutdown_on_idle().wait().unwrap(); - Ok(()) + let p = pipeline(stream, client, executor); + core.run(p).map(|_| ()) } #[cfg(test)] From 40dd2d69230ad1ccdb26a222dd8c1dc2ef6a1c08 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 3 Sep 2018 15:39:02 +0300 Subject: [PATCH 5/8] DataError: Remove unused error variant This was added due to Threadpool::spawn returning errors, but its no longer used. --- Cargo.lock | 2 -- podcasts-data/Cargo.toml | 2 -- podcasts-data/src/errors.rs | 9 --------- podcasts-data/src/lib.rs | 2 -- 4 files changed, 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index baf5772..60563a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1506,8 +1506,6 @@ dependencies = [ "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 1ccb927..a856189 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -20,8 +20,6 @@ futures = "0.1.23" hyper = "0.11.27" tokio = "0.1.8" tokio-core = "0.1.17" -tokio-threadpool = "0.1.6" -tokio-executor = "0.1" hyper-tls = "0.1.3" native-tls = "0.1.5" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 5e6e2ec..64b20c9 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -4,7 +4,6 @@ use diesel_migrations::RunMigrationsError; use hyper; use native_tls; use rss; -use tokio_executor; use url; use xml; @@ -47,8 +46,6 @@ pub enum DataError { R2D2PoolError(#[cause] r2d2::PoolError), #[fail(display = "Hyper Error: {}", _0)] HyperError(#[cause] hyper::Error), - #[fail(display = "Tokio Spawn Error: {}", _0)] - SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -104,12 +101,6 @@ impl From for DataError { } } -impl From for DataError { - fn from(err: tokio_executor::SpawnError) -> Self { - DataError::SpawnError(err) - } -} - impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 7605929..4fffa7c 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -84,8 +84,6 @@ extern crate rfc822_sanitizer; extern crate rss; extern crate tokio; extern crate tokio_core; -extern crate tokio_executor; -extern crate tokio_threadpool; extern crate url; extern crate xdg; extern crate xml; From 3c52131ab58e55e15accc875673cc99b5acebafd Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 3 Sep 2018 15:51:33 +0300 Subject: [PATCH 6/8] Update Changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a2d431..e8bfdcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed: ### Fixed: +- Fixed a regression where indexing feeds was blocking the `tokio reactor` #88 !70 ### Removed: @@ -199,4 +200,4 @@ not being able to access any file. - Added appdata.xml file ## [0.1.0] - 2017-11-13 -- Initial Release \ No newline at end of file +- Initial Release From 3019fcf63045be02e4f8270fae48f95b81008dde Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 3 Sep 2018 19:49:51 +0300 Subject: [PATCH 7/8] Pipeline: Use a custom tokio threadpool This reverts commit e64883eecbb11a02e79c06834cbd283edb4b705f and 40dd2d69230ad1ccdb26a222dd8c1dc2ef6a1c08 Seems like core.run() returns once its done even if there are still tasks in the Runtime underneath. A way to solve that would be to call the shutdown_on_idle method. We need ownership of the threadpool in order to invoke `shutdown_on_idle` method but core.runtime only returns a referrence so we need to create our own threadpool. --- Cargo.lock | 3 ++- podcasts-data/Cargo.toml | 3 ++- podcasts-data/src/errors.rs | 9 +++++++++ podcasts-data/src/lib.rs | 3 ++- podcasts-data/src/pipeline.rs | 23 ++++++++++++++--------- 5 files changed, 29 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60563a5..3f3e44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,8 +1504,9 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index a856189..a96aa67 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -18,8 +18,9 @@ xdg = "2.1.0" xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" -tokio = "0.1.8" tokio-core = "0.1.17" +tokio-threadpool = "0.1.6" +tokio-executor = "0.1" hyper-tls = "0.1.3" native-tls = "0.1.5" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 64b20c9..5e6e2ec 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError; use hyper; use native_tls; use rss; +use tokio_executor; use url; use xml; @@ -46,6 +47,8 @@ pub enum DataError { R2D2PoolError(#[cause] r2d2::PoolError), #[fail(display = "Hyper Error: {}", _0)] HyperError(#[cause] hyper::Error), + #[fail(display = "Tokio Spawn Error: {}", _0)] + SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -101,6 +104,12 @@ impl From for DataError { } } +impl From for DataError { + fn from(err: tokio_executor::SpawnError) -> Self { + DataError::SpawnError(err) + } +} + impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 4fffa7c..3654627 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -82,8 +82,9 @@ extern crate num_cpus; extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; -extern crate tokio; extern crate tokio_core; +extern crate tokio_executor; +extern crate tokio_threadpool; extern crate url; extern crate xdg; extern crate xml; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 0b4beba..89200fd 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,14 +1,15 @@ // FIXME: //! Docs. +use futures::future::*; use futures::prelude::*; -use futures::{future::ok, lazy, stream::iter_ok}; +use futures::stream::*; use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; -use tokio::runtime::TaskExecutor; use tokio_core::reactor::Core; +use tokio_threadpool::{self, ThreadPool}; use num_cpus; @@ -26,7 +27,7 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: HttpsClient, - executor: TaskExecutor, + pool: tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, @@ -34,9 +35,9 @@ where sources .and_then(move |s| s.into_feed(client.clone())) .and_then(move |feed| { - let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); - executor.spawn(fut); - Ok(()) + pool.spawn(lazy(|| { + feed.index().map_err(|err| error!("Error: {}", err)) + })).map_err(From::from) }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. @@ -51,16 +52,20 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { + let pool = ThreadPool::new(); + let sender = pool.sender().clone(); let mut core = Core::new()?; - let executor = core.runtime().executor(); let handle = core.handle(); let client = Client::configure() .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, client, executor); - core.run(p).map(|_| ()) + let p = pipeline(stream, client, sender); + core.run(p)?; + + pool.shutdown_on_idle().wait().unwrap(); + Ok(()) } #[cfg(test)] From 59ad90b98931e3475a1083e71094c2525caa01d7 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 3 Sep 2018 20:30:41 +0300 Subject: [PATCH 8/8] Pipeline: Minor formatting improvment --- podcasts-data/src/pipeline.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 89200fd..0716c01 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -35,9 +35,8 @@ where sources .and_then(move |s| s.into_feed(client.clone())) .and_then(move |feed| { - pool.spawn(lazy(|| { - feed.index().map_err(|err| error!("Error: {}", err)) - })).map_err(From::from) + let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); + pool.spawn(fut).map_err(From::from) }) // the stream will stop at the first error so // we ensure that everything will succeded regardless.