diff --git a/Cargo.lock b/Cargo.lock index 60563a5..3f3e44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,8 +1504,9 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index a856189..a96aa67 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -18,8 +18,9 @@ xdg = "2.1.0" xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" -tokio = "0.1.8" tokio-core = "0.1.17" +tokio-threadpool = "0.1.6" +tokio-executor = "0.1" hyper-tls = "0.1.3" native-tls = "0.1.5" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 64b20c9..5e6e2ec 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError; use hyper; use native_tls; use rss; +use tokio_executor; use url; use xml; @@ -46,6 +47,8 @@ pub enum DataError { R2D2PoolError(#[cause] r2d2::PoolError), #[fail(display = "Hyper Error: {}", _0)] HyperError(#[cause] hyper::Error), + #[fail(display = "Tokio Spawn Error: {}", _0)] + SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -101,6 +104,12 @@ impl From for DataError { } } +impl From for DataError { + fn from(err: tokio_executor::SpawnError) -> Self { + DataError::SpawnError(err) + } +} + impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 4fffa7c..3654627 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -82,8 +82,9 @@ extern crate num_cpus; extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; -extern crate tokio; extern crate tokio_core; +extern crate tokio_executor; +extern crate tokio_threadpool; extern crate url; extern crate xdg; extern crate xml; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 0b4beba..89200fd 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,14 +1,15 @@ // FIXME: //! Docs. +use futures::future::*; use futures::prelude::*; -use futures::{future::ok, lazy, stream::iter_ok}; +use futures::stream::*; use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; -use tokio::runtime::TaskExecutor; use tokio_core::reactor::Core; +use tokio_threadpool::{self, ThreadPool}; use num_cpus; @@ -26,7 +27,7 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: HttpsClient, - executor: TaskExecutor, + pool: tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, @@ -34,9 +35,9 @@ where sources .and_then(move |s| s.into_feed(client.clone())) .and_then(move |feed| { - let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); - executor.spawn(fut); - Ok(()) + pool.spawn(lazy(|| { + feed.index().map_err(|err| error!("Error: {}", err)) + })).map_err(From::from) }) // the stream will stop at the first error so // we ensure that everything will succeded regardless. @@ -51,16 +52,20 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { + let pool = ThreadPool::new(); + let sender = pool.sender().clone(); let mut core = Core::new()?; - let executor = core.runtime().executor(); let handle = core.handle(); let client = Client::configure() .connector(HttpsConnector::new(num_cpus::get(), &handle)?) .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, client, executor); - core.run(p).map(|_| ()) + let p = pipeline(stream, client, sender); + core.run(p)?; + + pool.shutdown_on_idle().wait().unwrap(); + Ok(()) } #[cfg(test)]