From 0887789f5e653dd92ad397fb39561df6dffcb45c Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Sat, 29 Sep 2018 13:35:45 +0300 Subject: [PATCH] Pipeline: Complete the move to Tokio Runtime --- Cargo.lock | 44 ++++++------------------------ podcasts-data/Cargo.toml | 4 +-- podcasts-data/src/errors.rs | 9 ------ podcasts-data/src/feed.rs | 7 ++--- podcasts-data/src/lib.rs | 4 +-- podcasts-data/src/models/source.rs | 7 ++--- podcasts-data/src/pipeline.rs | 21 ++++++-------- 7 files changed, 26 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 47564a3..15a8a2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -921,7 +921,7 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1524,9 +1524,7 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1783,7 +1781,7 @@ dependencies = [ "serde 1.0.78 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)", "serde_urlencoded 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1849,11 +1847,6 @@ dependencies = [ "antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "scoped-tls" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "scopeguard" version = "0.3.3" @@ -2127,13 +2120,14 @@ dependencies = [ [[package]] name = "tokio" -version = "0.1.8" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-current-thread 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2155,27 +2149,9 @@ dependencies = [ "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-core" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", - "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-current-thread" -version = "0.1.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2655,7 +2631,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum schannel 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "dc1fabf2a7b6483a141426e1afd09ad543520a77ac49bd03c286e7696ccfd77f" "checksum scheduled-thread-pool 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1a2ff3fc5223829be817806c6441279c676e454cc7da608faf03b0ccc09d3889" -"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28" "checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" "checksum security-framework 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "697d3f3c23a618272ead9e1fb259c1411102b31c6af8b93f1d64cca9c3b0e8e0" "checksum security-framework-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab01dfbe5756785b5b4d46e0289e5a18071dfa9a7c2b24213ea00b9ef9b665bf" @@ -2687,10 +2662,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" "checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b" -"checksum tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fbb6a6e9db2702097bfdfddcb09841211ad423b86c75b5ddaca1d62842ac492c" +"checksum tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "6e93c78d23cc61aa245a8acd2c4a79c4d7fa7fb5c3ca90d5737029f043a84895" "checksum tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "881e9645b81c2ce95fcb799ded2c29ffb9f25ef5bef909089a420e5961dd8ccb" -"checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" -"checksum tokio-current-thread 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdfb899688ac16f618076bd09215edbfda0fd5dfecb375b6942636cb31fa8a7" +"checksum tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f90fcd90952f0a496d438a976afba8e5c205fb12123f813d8ab3aa1c8436638c" "checksum tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c117b6cf86bb730aab4834f10df96e4dd586eff2c3c27d3781348da49e255bde" "checksum tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b5cbe4ca6e71cb0b62a66e4e6f53a8c06a6eefe46cc5f665ad6f274c9906f135" "checksum tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8d6cc2de7725863c86ac71b0b9068476fec50834f055a243558ef1655bbd34cb" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index c838490..4cdc10f 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -19,9 +19,7 @@ xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.12.11" http = "0.1.13" -tokio-core = "0.1.17" -tokio-threadpool = "0.1.7" -tokio-executor = "0.1.5" +tokio = "0.1.11" hyper-tls = "0.3.0" native-tls = "0.2.1" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 3947223..bee3044 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -5,7 +5,6 @@ use http; use hyper; use native_tls; use rss; -use tokio_executor; use url; use xml; @@ -50,8 +49,6 @@ pub enum DataError { HyperError(#[cause] hyper::Error), #[fail(display = "ToStr Error: {}", _0)] HttpToStr(#[cause] http::header::ToStrError), - #[fail(display = "Tokio Spawn Error: {}", _0)] - SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -115,12 +112,6 @@ impl From for DataError { } } -impl From for DataError { - fn from(err: tokio_executor::SpawnError) -> Self { - DataError::SpawnError(err) - } -} - impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index a39a8a1..9153823 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -122,7 +122,7 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) { mod tests { use failure::Error; use rss::Channel; - use tokio_core::reactor::Core; + use tokio::{self, prelude::*}; use database::truncate_db; use dbqueries; @@ -176,10 +176,9 @@ mod tests { get_feed(path, s.id()) }).collect(); - let mut core = Core::new()?; // Index the channes - let list: Vec<_> = feeds.into_iter().map(|x| x.index()).collect(); - let _foo = core.run(join_all(list)); + let stream_ = stream::iter_ok(feeds).for_each(|x| x.index()); + tokio::run(stream_.map_err(|_| ())); // Assert the index rows equal the controlled results assert_eq!(dbqueries::get_sources()?.len(), 5); diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index 05fe9b3..6c54454 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -84,9 +84,7 @@ extern crate num_cpus; extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; -extern crate tokio_core; -extern crate tokio_executor; -extern crate tokio_threadpool; +extern crate tokio; extern crate url; extern crate xdg; extern crate xml; diff --git a/podcasts-data/src/models/source.rs b/podcasts-data/src/models/source.rs index 9652629..c7fc4e4 100644 --- a/podcasts-data/src/models/source.rs +++ b/podcasts-data/src/models/source.rs @@ -286,8 +286,8 @@ fn response_to_channel(res: Response) -> impl Future Result<(), Error> { truncate_db()?; - let mut core = Core::new()?; + let mut rt = tokio::runtime::Runtime::new()?; let https = HttpsConnector::new(num_cpus::get())?; let client = Client::builder().build::<_, Body>(https); @@ -304,9 +304,8 @@ mod tests { com/InterceptedWithJeremyScahill"; let source = Source::from_url(url)?; let id = source.id(); - let feed = source.into_feed(client); - let feed = core.run(feed)?; + let feed = rt.block_on(feed)?; let expected = get_feed("tests/feeds/2018-01-20-Intercepted.xml", id); assert_eq!(expected, feed); diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 35ef543..211a1b4 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -1,9 +1,8 @@ // FIXME: //! Docs. -use futures::{lazy, prelude::*, stream::iter_ok}; -use tokio_core::reactor::Core; -use tokio_threadpool::{self, ThreadPool}; +use futures::{lazy, prelude::*, future::ok, stream::FuturesOrdered}; +use tokio; use hyper::client::HttpConnector; use hyper::{Client, Body}; @@ -14,6 +13,8 @@ use num_cpus; use errors::DataError; use Source; +use std::iter::FromIterator; + type HttpsClient = Client>; /// The pipline to be run for indexing and updating a Podcast feed that originates from @@ -25,7 +26,6 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: HttpsClient, - pool: tokio_threadpool::Sender, ) -> impl Future + 'a where S: Stream + 'a, @@ -41,7 +41,7 @@ where }) .for_each(move |feed| { let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); - pool.spawn(fut).map_err(|_| ()) + tokio::spawn(fut) }) } @@ -51,17 +51,14 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { - let pool = ThreadPool::new(); - let sender = pool.sender().clone(); - let mut core = Core::new()?; let https = HttpsConnector::new(num_cpus::get())?; let client = Client::builder().build::<_, Body>(https); - let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, client, sender); - let _ = core.run(p); + let foo = sources.into_iter().map(ok::<_, _>); + let stream = FuturesOrdered::from_iter(foo); + let p = pipeline(stream, client); + tokio::run(p); - pool.shutdown_on_idle().wait().unwrap(); Ok(()) }