diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a707387..94aaef7 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -32,7 +32,6 @@ flatpak: --env=APP_ID="org.gnome.PodcastsDevel" \ --env=LOCALEDIR="./podcasts-gtk/po" \ --env=VERSION="0.0.0" \ - --env=RUSTFLAGS="--cfg rayon_unstable" \ --env=CARGO_HOME="target/cargo-home" \ --env=CARGO_TARGET_DIR="target_test/" \ app ${MANIFEST_PATH} \ diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a2d431..e8bfdcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed: ### Fixed: +- Fixed a regression where indexing feeds was blocking the `tokio reactor` #88 !70 ### Removed: @@ -199,4 +200,4 @@ not being able to access any file. - Added appdata.xml file ## [0.1.0] - 2017-11-13 -- Initial Release \ No newline at end of file +- Initial Release diff --git a/Cargo.lock b/Cargo.lock index 2fbe73b..3f3e44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1501,11 +1501,12 @@ dependencies = [ "pretty_assertions 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1692,15 +1693,6 @@ dependencies = [ "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rayon-futures" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "redox_syscall" version = "0.1.40" @@ -2671,7 +2663,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum rand_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "edecf0f94da5551fc9b492093e30b041a891657db7940ee221f9d2f66e82eef2" "checksum rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "df7a791f788cb4c516f0e091301a29c2b71ef680db5e644a7d68835c8ae6dbfa" "checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356" -"checksum rayon-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea70dae42033c388536c7e9bdca084a14d75659c12e25fc105e7552464a87c4a" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 6437f89..a96aa67 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -11,7 +11,6 @@ derive_builder = "0.5.1" lazy_static = "1.1.0" log = "0.4.4" rayon = "1.0.2" -rayon-futures = "0.1.0" rfc822_sanitizer = "0.3.3" rss = "1.5.0" url = "1.7.1" @@ -20,6 +19,8 @@ xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" tokio-core = "0.1.17" +tokio-threadpool = "0.1.6" +tokio-executor = "0.1" hyper-tls = "0.1.3" native-tls = "0.1.5" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 64b20c9..5e6e2ec 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError; use hyper; use native_tls; use rss; +use tokio_executor; use url; use xml; @@ -46,6 +47,8 @@ pub enum DataError { R2D2PoolError(#[cause] r2d2::PoolError), #[fail(display = "Hyper Error: {}", _0)] HyperError(#[cause] hyper::Error), + #[fail(display = "Tokio Spawn Error: {}", _0)] + SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -101,6 +104,12 @@ impl From for DataError { } } +impl From for DataError { + fn from(err: tokio_executor::SpawnError) -> Self { + DataError::SpawnError(err) + } +} + impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 45a7444..3654627 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -80,10 +80,11 @@ extern crate hyper_tls; extern crate native_tls; extern crate num_cpus; extern crate rayon; -extern crate rayon_futures; extern crate rfc822_sanitizer; extern crate rss; extern crate tokio_core; +extern crate tokio_executor; +extern crate tokio_threadpool; extern crate url; extern crate xdg; extern crate xml; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 555748c..0716c01 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -9,35 +9,13 @@ use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; use tokio_core::reactor::Core; +use tokio_threadpool::{self, ThreadPool}; use num_cpus; -use rayon; -use rayon_futures::ScopeFutureExt; use errors::DataError; use Source; -// use std::sync::{Arc, Mutex}; - -// http://gtk-rs.org/tuto/closures -#[macro_export] -macro_rules! clone { - (@param _) => ( _ ); - (@param $x:ident) => ( $x ); - ($($n:ident),+ => move || $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move || $body - } - ); - ($($n:ident),+ => move |$($p:tt),+| $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move |$(clone!(@param $p),)+| $body - } - ); -} - type HttpsClient = Client>; /// The pipline to be run for indexing and updating a Podcast feed that originates from @@ -48,14 +26,18 @@ type HttpsClient = Client>; /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. pub fn pipeline<'a, S>( sources: S, - client: &HttpsClient, + client: HttpsClient, + pool: tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, { sources - .and_then(clone!(client => move |s| s.into_feed(client.clone()))) - .and_then(|feed| rayon::scope(|s| s.spawn_future(feed.index()))) + .and_then(move |s| s.into_feed(client.clone())) + .and_then(move |feed| { + let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); + pool.spawn(fut).map_err(From::from) + }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. .map_err(|err| error!("Error: {}", err)) @@ -69,6 +51,8 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { + let pool = ThreadPool::new(); + let sender = pool.sender().clone(); let mut core = Core::new()?; let handle = core.handle(); let client = Client::configure() @@ -76,8 +60,11 @@ where .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, &client); - core.run(p).map(|_| ()) + let p = pipeline(stream, client, sender); + core.run(p)?; + + pool.shutdown_on_idle().wait().unwrap(); + Ok(()) } #[cfg(test)] diff --git a/scripts/cargo.sh b/scripts/cargo.sh index 14cebbc..023001c 100755 --- a/scripts/cargo.sh +++ b/scripts/cargo.sh @@ -1,7 +1,6 @@ #!/bin/sh export CARGO_HOME=$1/target/cargo-home -export RUSTFLAGS="--cfg rayon_unstable" export LOCALEDIR="$3" export APP_ID="$4" export VERSION="$5" diff --git a/scripts/test.sh b/scripts/test.sh index a258a2f..72326ee 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -26,7 +26,6 @@ xvfb-run -a -s "-screen 0 1024x768x24" \ --env=APP_ID="org.gnome.PodcastsDevel" \ --env=LOCALEDIR="./podcasts-gtk/po" \ --env=VERSION="0.0.0" \ - --env=RUSTFLAGS="--cfg rayon_unstable" \ --env=CARGO_HOME="target/cargo-home" \ --env=CARGO_TARGET_DIR="target_test/" \ app ${MANIFEST_PATH} \