Merge branch 'alatiera/future-stuff' into 'next'

Replace rayon_futures with Tokio Runtime

See merge request World/podcasts!70
This commit is contained in:
Merge Bot 2018-09-03 20:24:55 +00:00
commit d025693ed4
9 changed files with 32 additions and 45 deletions

View File

@ -32,7 +32,6 @@ flatpak:
--env=APP_ID="org.gnome.PodcastsDevel" \
--env=LOCALEDIR="./podcasts-gtk/po" \
--env=VERSION="0.0.0" \
--env=RUSTFLAGS="--cfg rayon_unstable" \
--env=CARGO_HOME="target/cargo-home" \
--env=CARGO_TARGET_DIR="target_test/" \
app ${MANIFEST_PATH} \

View File

@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Changed:
### Fixed:
- Fixed a regression where indexing feeds was blocking the `tokio reactor` #88 !70
### Removed:

13
Cargo.lock generated
View File

@ -1501,11 +1501,12 @@ dependencies = [
"pretty_assertions 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1692,15 +1693,6 @@ dependencies = [
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rayon-futures"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "redox_syscall"
version = "0.1.40"
@ -2671,7 +2663,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum rand_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "edecf0f94da5551fc9b492093e30b041a891657db7940ee221f9d2f66e82eef2"
"checksum rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "df7a791f788cb4c516f0e091301a29c2b71ef680db5e644a7d68835c8ae6dbfa"
"checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356"
"checksum rayon-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea70dae42033c388536c7e9bdca084a14d75659c12e25fc105e7552464a87c4a"
"checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1"
"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
"checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384"

View File

@ -11,7 +11,6 @@ derive_builder = "0.5.1"
lazy_static = "1.1.0"
log = "0.4.4"
rayon = "1.0.2"
rayon-futures = "0.1.0"
rfc822_sanitizer = "0.3.3"
rss = "1.5.0"
url = "1.7.1"
@ -20,6 +19,8 @@ xml-rs = "0.8.0"
futures = "0.1.23"
hyper = "0.11.27"
tokio-core = "0.1.17"
tokio-threadpool = "0.1.6"
tokio-executor = "0.1"
hyper-tls = "0.1.3"
native-tls = "0.1.5"
num_cpus = "1.8.0"

View File

@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError;
use hyper;
use native_tls;
use rss;
use tokio_executor;
use url;
use xml;
@ -46,6 +47,8 @@ pub enum DataError {
R2D2PoolError(#[cause] r2d2::PoolError),
#[fail(display = "Hyper Error: {}", _0)]
HyperError(#[cause] hyper::Error),
#[fail(display = "Tokio Spawn Error: {}", _0)]
SpawnError(#[cause] tokio_executor::SpawnError),
#[fail(display = "Failed to parse a url: {}", _0)]
UrlError(#[cause] url::ParseError),
#[fail(display = "TLS Error: {}", _0)]
@ -101,6 +104,12 @@ impl From<hyper::Error> for DataError {
}
}
impl From<tokio_executor::SpawnError> for DataError {
fn from(err: tokio_executor::SpawnError) -> Self {
DataError::SpawnError(err)
}
}
impl From<url::ParseError> for DataError {
fn from(err: url::ParseError) -> Self {
DataError::UrlError(err)

View File

@ -80,10 +80,11 @@ extern crate hyper_tls;
extern crate native_tls;
extern crate num_cpus;
extern crate rayon;
extern crate rayon_futures;
extern crate rfc822_sanitizer;
extern crate rss;
extern crate tokio_core;
extern crate tokio_executor;
extern crate tokio_threadpool;
extern crate url;
extern crate xdg;
extern crate xml;

View File

@ -9,35 +9,13 @@ use hyper::client::HttpConnector;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
use num_cpus;
use rayon;
use rayon_futures::ScopeFutureExt;
use errors::DataError;
use Source;
// use std::sync::{Arc, Mutex};
// http://gtk-rs.org/tuto/closures
#[macro_export]
macro_rules! clone {
(@param _) => ( _ );
(@param $x:ident) => ( $x );
($($n:ident),+ => move || $body:expr) => (
{
$( let $n = $n.clone(); )+
move || $body
}
);
($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
{
$( let $n = $n.clone(); )+
move |$(clone!(@param $p),)+| $body
}
);
}
type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// The pipline to be run for indexing and updating a Podcast feed that originates from
@ -48,14 +26,18 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
pub fn pipeline<'a, S>(
sources: S,
client: &HttpsClient,
client: HttpsClient,
pool: tokio_threadpool::Sender,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where
S: Stream<Item = Source, Error = DataError> + 'a,
{
sources
.and_then(clone!(client => move |s| s.into_feed(client.clone())))
.and_then(|feed| rayon::scope(|s| s.spawn_future(feed.index())))
.and_then(move |s| s.into_feed(client.clone()))
.and_then(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
pool.spawn(fut).map_err(From::from)
})
// the stream will stop at the first error so
// we ensure that everything will succeded regardless.
.map_err(|err| error!("Error: {}", err))
@ -69,6 +51,8 @@ pub fn run<S>(sources: S) -> Result<(), DataError>
where
S: IntoIterator<Item = Source>,
{
let pool = ThreadPool::new();
let sender = pool.sender().clone();
let mut core = Core::new()?;
let handle = core.handle();
let client = Client::configure()
@ -76,8 +60,11 @@ where
.build(&handle);
let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, &client);
core.run(p).map(|_| ())
let p = pipeline(stream, client, sender);
core.run(p)?;
pool.shutdown_on_idle().wait().unwrap();
Ok(())
}
#[cfg(test)]

View File

@ -1,7 +1,6 @@
#!/bin/sh
export CARGO_HOME=$1/target/cargo-home
export RUSTFLAGS="--cfg rayon_unstable"
export LOCALEDIR="$3"
export APP_ID="$4"
export VERSION="$5"

View File

@ -26,7 +26,6 @@ xvfb-run -a -s "-screen 0 1024x768x24" \
--env=APP_ID="org.gnome.PodcastsDevel" \
--env=LOCALEDIR="./podcasts-gtk/po" \
--env=VERSION="0.0.0" \
--env=RUSTFLAGS="--cfg rayon_unstable" \
--env=CARGO_HOME="target/cargo-home" \
--env=CARGO_TARGET_DIR="target_test/" \
app ${MANIFEST_PATH} \