From 933ba62f3947aa40e851b421db87b77c6dd2e1e0 Mon Sep 17 00:00:00 2001 From: Julian Hofer Date: Tue, 25 Feb 2020 20:22:59 +0100 Subject: [PATCH 1/5] Upgrade to Futures 0.3 --- Cargo.lock | 318 ++++++++++++++++++++--------- podcasts-data/Cargo.toml | 14 +- podcasts-data/src/feed.rs | 62 +++--- podcasts-data/src/models/source.rs | 82 ++++---- podcasts-data/src/opml.rs | 3 +- podcasts-data/src/pipeline.rs | 41 ++-- 6 files changed, 318 insertions(+), 202 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da97852..a586c06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,11 @@ dependencies = [ "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "c2-chacha" version = "0.2.3" @@ -569,16 +574,31 @@ version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "futures-channel" -version = "0.3.1" +name = "futures" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-executor 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-channel" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "futures-core" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -592,22 +612,22 @@ dependencies = [ [[package]] name = "futures-executor" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "futures-io" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "futures-macro" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -616,19 +636,28 @@ dependencies = [ "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-sink" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures-task" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "futures-util" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-macro 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)", "proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -728,10 +757,10 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "gio-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "glib 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "glib-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -757,11 +786,11 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-executor 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "glib-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "gobject-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -799,9 +828,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "glib 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "glib-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "gobject-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -971,6 +1000,24 @@ dependencies = [ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "h2" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hermit-abi" version = "0.1.6" @@ -1022,6 +1069,16 @@ dependencies = [ "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "http" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "http-body" version = "0.1.0" @@ -1033,6 +1090,15 @@ dependencies = [ "tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "http-body" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "httparse" version = "1.3.4" @@ -1072,6 +1138,29 @@ dependencies = [ "want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hyper" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hyper-tls" version = "0.3.2" @@ -1084,6 +1173,18 @@ dependencies = [ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hyper-tls" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1353,16 +1454,6 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "mio-uds" -version = "0.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "miow" version = "0.2.1" @@ -1637,6 +1728,29 @@ dependencies = [ "siphasher 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "pin-project" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "pin-project-internal 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pin-project-lite" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "pin-utils" version = "0.1.0-alpha.4" @@ -1659,10 +1773,10 @@ dependencies = [ "diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)", - "hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper-tls 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1674,7 +1788,7 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2378,18 +2492,28 @@ dependencies = [ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-sync 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2402,16 +2526,6 @@ dependencies = [ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-codec" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-current-thread" version = "0.1.6" @@ -2430,16 +2544,6 @@ dependencies = [ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-fs" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-io" version = "0.1.12" @@ -2518,36 +2622,32 @@ dependencies = [ ] [[package]] -name = "tokio-udp" -version = "0.1.5" +name = "tokio-tls" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] -name = "tokio-uds" -version = "0.2.5" +name = "tokio-util" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", - "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tower-service" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "try-lock" version = "0.2.2" @@ -2676,6 +2776,15 @@ dependencies = [ "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -2755,6 +2864,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5" "checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" "checksum c2-chacha 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "214238caa1bf3a496ec3392968969cab8549f96ff30652c9e56885329315f6bb" "checksum cairo-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0b528aca2ef1026235d0122495dbaee0b09479f77c51f6df8d9bb9cb1c6d6f87" "checksum cairo-sys-rs 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ff65ba02cac715be836f63429ab00a767d48336efc5497c5637afb53b4f14d63" @@ -2804,14 +2914,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futf 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7c9c1ce3fa9336301af935ab852c437817d14cd33690446569392e65170aac3b" "checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" -"checksum futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fcae98ca17d102fd8a3603727b9259fcf7fa4239b603d2142926189bc8999b86" -"checksum futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866" +"checksum futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" +"checksum futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" +"checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" -"checksum futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e274736563f686a837a0568b478bdabfeaec2dca794b5649b04e2fe1627c231" -"checksum futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff" -"checksum futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" -"checksum futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9" -"checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" +"checksum futures-executor 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f674f3e1bcb15b37284a90cedf55afdba482ab061c407a9c0ebbd0f3109741ba" +"checksum futures-io 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" +"checksum futures-macro 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7" +"checksum futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6" +"checksum futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" +"checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" "checksum gdk 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2739c12374f83bad563ee839c2b3ea5c60391465a254fd4a54b6e3e9648dc61f" "checksum gdk-pixbuf 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e248220c46b329b097d4b158d2717f8c688f16dd76d0399ace82b3e98062bdd7" "checksum gdk-pixbuf-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d8991b060a9e9161bafd09bf4a202e6fd404f5b4dd1a08d53a1e84256fb34ab0" @@ -2836,16 +2948,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum gtk 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7cd1d646cc9a2cb795f33b538779a3f22e71dc172f2aba08a41e84a2f72c0dec" "checksum gtk-sys 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53def660c7b48b00b510c81ef2d2fbd3c570f1527081d8d7947f471513e1a4c1" "checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462" +"checksum h2 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9433d71e471c1736fd5a61b671fc0b148d7a2992f666c958d03cd8feb3b88d1" "checksum hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772" "checksum html2text 0.1.8 (git+https://github.com/jugglerchris/rust-html2text)" = "" "checksum html5ever 0.24.1 (registry+https://github.com/rust-lang/crates.io-index)" = "025483b0a1e4577bb28578318c886ee5f817dda6eb62473269349044406644cb" "checksum html5ever-atoms 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e4a291981feff7291514f8219d5cd2c740d0c042d75cff248a7c00a025f9d40" "checksum http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "d6ccf5ede3a895d8856620237b2f02972c1bbc78d2965ad7fe8838d4a0ed41f0" +"checksum http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b708cc7f06493459026f53b9a61a7a121a5d1ec6238dee58ea4941132b30156b" "checksum http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d" +"checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" "checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" "checksum humansize 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6cab2627acfc432780848602f3f558f7e9dd427352224b0d9324025796d2a5e" "checksum hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)" = "9dbe6ed1438e1f8ad955a4701e9a944938e9519f6888d12d8558b645e247d5f6" +"checksum hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fa1c527bbc634be72aa7ba31e4e4def9bbb020f5416916279b7c705cd838893e" "checksum hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f" +"checksum hyper-tls 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3adcd308402b9553630734e9c36b77a7e48b3821251ca2493e8cd596763aafaa" "checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" @@ -2877,7 +2994,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum mime_guess 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1a0ed03949aef72dbdf3116a383d7b38b4768e6f960528cd6a6044aa9ed68599" "checksum miniz_oxide 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6f3f74f726ae935c3f514300cc6773a0c9492abc5e972d42ba0c0ebb88757625" "checksum mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "302dec22bcf6bae6dfb69c647187f4b4d0fb6f535521f7bc022430ce8e12008f" -"checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum mpris-player 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d90fee39ea8cf6dbae3ffb144d6db7feca90f6c86229fc458039e0d4bca5bf02" "checksum muldiv 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0419348c027fa7be448d2ae7ea0e4e04c2334c31dc4e74ab29f00a2a7ca69204" @@ -2907,6 +3023,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e" "checksum phf_generator 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662" "checksum phf_shared 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0" +"checksum pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7804a463a8d9572f13453c516a5faea534a2403d7ced2f0c7e100eeff072772c" +"checksum pin-project-internal 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f" +"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" "checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b" @@ -2980,19 +3099,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" +"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" "checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" -"checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" "checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" "checksum tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab" -"checksum tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe6dc22b08d6993916647d108a1a7d15b9cd29c4f4496c62b92c45b5041b7af" "checksum tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" "checksum tokio-reactor 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "6732fe6b53c8d11178dcb77ac6d9682af27fc6d4cb87789449152e5377377146" "checksum tokio-sync 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "d06554cce1ae4a50f42fba8023918afa931413aded705b560e29600ccf7c6d76" "checksum tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" "checksum tokio-threadpool 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c32ffea4827978e9aa392d2f743d973c1dfa3730a2ed3f22ce1e6984da848c" "checksum tokio-timer 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "1739638e364e558128461fc1ad84d997702c8e31c2e6b18fb99842268199e827" -"checksum tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b" -"checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" +"checksum tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bde02a3a5291395f59b06ec6945a3077602fac2b07eeeaf0dee2122f3619828" +"checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" +"checksum tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum try_from 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "283d3b89e1368717881a9d51dad843cc435380d8109c9e47d38780a324698d8b" "checksum ucd-util 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5ccdc2daea7cf8bc50cd8710d170a9d816678e54943829c5082bb1594312cf8e" @@ -3012,6 +3131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6395efa4784b027708f7451087e647ec73cc74f5d9bc2e418404248d679a230" +"checksum want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" "checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index e750587..7063127 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -12,15 +12,15 @@ lazy_static = "1.4.0" log = "0.4.8" rayon = "1.2.0" rfc822_sanitizer = "0.3.3" -rss = "1.8.0" -url = "2.1.0" +rss = "1.9.0" +url = "2.1.1" xdg = "2.2.0" xml-rs = "0.8.0" -futures = "0.1.29" -hyper = "0.12.35" -http = "0.1.19" -tokio = "0.1.22" -hyper-tls = "0.3.2" +futures = "0.3.4" +hyper = "0.13.2" +http = "0.2.0" +tokio = "0.2.11" +hyper-tls = "0.4.1" native-tls = "0.2.3" num_cpus = "1.10.1" failure = "0.1.6" diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index c36095b..d04e887 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -45,26 +45,28 @@ pub struct Feed { impl Feed { /// Index the contents of the RSS `Feed` into the database. - pub fn index(self) -> impl Future + Send { - ok(self.parse_podcast()) - .and_then(|pd| pd.to_podcast()) - .and_then(move |pd| self.index_channel_items(pd)) + pub async fn index(self) -> Result<(), DataError>{ + let show = self.parse_podcast().to_podcast()?; + self.index_channel_items(show).await } fn parse_podcast(&self) -> NewShow { NewShow::new(&self.channel, self.source_id) } - fn index_channel_items(self, pd: Show) -> impl Future + Send { - let stream = stream::iter_ok::<_, DataError>(self.channel.into_items()); - + fn index_channel_items(self, pd: Show) -> impl Future> + Send { + let stream = stream::iter(self.channel.into_items()); // Parse the episodes let episodes = stream.filter_map(move |item| { - NewEpisodeMinimal::new(&item, pd.id()) - .and_then(move |ep| determine_ep_state(ep, &item)) - .map_err(|err| error!("Failed to parse an episode: {}", err)) - .ok() - }); + let ret = NewEpisodeMinimal::new(&item, pd.id()) + .and_then(move |ep| determine_ep_state(ep, &item)); + if ret.is_ok() { + future::ready(Some(ret)) + } + else { + future::ready(None) + } + }); // Filter errors, Index updatable episodes, return insertables. filter_episodes(episodes) @@ -96,26 +98,28 @@ fn determine_ep_state( fn filter_episodes<'a, S>( stream: S, -) -> impl Future, Error = DataError> + Send + 'a +) -> impl Future, DataError>> + Send + 'a where - S: Stream, Error = DataError> + Send + 'a, + S: Stream, DataError>> + Send + 'a, { stream - .filter_map(|state| match state { - IndexState::NotChanged => None, - // Update individual rows, and filter them - IndexState::Update((ref ep, rowid)) => { - ep.update(rowid) - .map_err(|err| error!("{}", err)) - .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) - .ok(); + .try_filter_map(|state| async move { + match state { + IndexState::NotChanged => Ok(None), + // Update individual rows, and filter them + IndexState::Update((ref ep, rowid)) => { + ep.update(rowid) + .map_err(|err| error!("{}", err)) + .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) + .ok(); - None + Ok(None) + } + IndexState::Index(s) => Ok(Some(s)), } - IndexState::Index(s) => Some(s), }) // only Index is left, collect them for batch index - .collect() + .try_collect() } fn batch_insert_episodes(episodes: &[NewEpisode]) { @@ -144,6 +148,7 @@ mod tests { use failure::Error; use rss::Channel; use tokio::{self, prelude::*}; + use futures::executor::block_on; use crate::database::truncate_db; use crate::dbqueries; @@ -199,8 +204,9 @@ mod tests { .collect(); // Index the channes - let stream_ = stream::iter_ok(feeds).for_each(|x| x.index()); - tokio::run(stream_.map_err(|_| ())); + let stream_ = stream::iter(feeds).for_each(|x| x.index().map(|x| x.unwrap())); + let mut rt = tokio::runtime::Runtime::new()?; + rt.block_on(stream_); // Assert the index rows equal the controlled results assert_eq!(dbqueries::get_sources()?.len(), 5); @@ -232,7 +238,7 @@ mod tests { let feed = get_feed(path, 42); let pd = feed.parse_podcast().to_podcast()?; - feed.index_channel_items(pd).wait()?; + block_on(feed.index_channel_items(pd))?; assert_eq!(dbqueries::get_podcasts()?.len(), 1); assert_eq!(dbqueries::get_episodes()?.len(), 43); Ok(()) diff --git a/podcasts-data/src/models/source.rs b/podcasts-data/src/models/source.rs index 509be15..7a09693 100644 --- a/podcasts-data/src/models/source.rs +++ b/podcasts-data/src/models/source.rs @@ -32,7 +32,7 @@ use http::header::{ }; use http::{Request, Response, StatusCode, Uri}; // use futures::future::ok; -use futures::future::{loop_fn, Future, Loop}; +use futures::future::Future; use futures::prelude::*; use base64::{encode_config, URL_SAFE}; @@ -239,41 +239,44 @@ impl Source { /// /// Consumes `self` and Returns the corresponding `Feed` Object. // Refactor into TryInto once it lands on stable. - pub fn into_feed( + pub async fn into_feed( self, client: Client>, - ) -> impl Future { + ) -> Result { + let id = self.id(); - let response = loop_fn(self, move |source| { - source - .request_constructor(&client.clone()) - .then(|res| match res { - Ok(response) => Ok(Loop::Break(response)), - Err(err) => match err { - DataError::FeedRedirect(s) => { - info!("Following redirect..."); - Ok(Loop::Continue(s)) - } - e => Err(e), - }, - }) - }); - response - .and_then(response_to_channel) - .and_then(move |chan| { - FeedBuilder::default() - .channel(chan) - .source_id(id) - .build() - .map_err(From::from) - }) + let resp = self.get_response(&client).await?; + let chan = response_to_channel(resp).await?; + + FeedBuilder::default() + .channel(chan) + .source_id(id) + .build() + .map_err(From::from) } - fn request_constructor( + async fn get_response(self, client: &Client>) -> Result, DataError> { + let mut source = self; + loop + { + match source.request_constructor(&client.clone()).await { + Ok(response) => return Ok(response), + Err(err) => match err { + DataError::FeedRedirect(s) => { + info!("Following redirect..."); + source = s; + } + e => return Err(e), + }, + } + }; + } + + async fn request_constructor( self, client: &Client>, - ) -> impl Future, Error = DataError> { + ) -> Result, DataError> { // FIXME: remove unwrap somehow let uri = Uri::from_str(self.uri()).unwrap(); let mut req = Request::get(uri).body(Body::empty()).unwrap(); @@ -304,23 +307,18 @@ impl Source { .insert(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap()); } - client - .request(req) - .map_err(From::from) - .and_then(move |res| self.match_status(res)) + let res = client.request(req).await?; + //.map_err(From::from) + self.match_status(res) } } -fn response_to_channel( +async fn response_to_channel( res: Response, -) -> impl Future + Send { - res.into_body() - .concat2() - .map(|x| x.into_iter()) - .map_err(From::from) - .map(|iter| iter.collect::>()) - .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned()) - .and_then(|buf| Channel::from_str(&buf).map_err(From::from)) +) -> Result { + let chunk = hyper::body::to_bytes(res.into_body()).await?; + let buf = String::from_utf8_lossy(&chunk).into_owned(); + Channel::from_str(&buf).map_err(From::from) } #[cfg(test)] @@ -338,7 +336,7 @@ mod tests { truncate_db()?; let mut rt = tokio::runtime::Runtime::new()?; - let https = HttpsConnector::new(num_cpus::get())?; + let https = HttpsConnector::new(); let client = Client::builder().build::<_, Body>(https); let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\ diff --git a/podcasts-data/src/opml.rs b/podcasts-data/src/opml.rs index 70430b3..fadbab8 100644 --- a/podcasts-data/src/opml.rs +++ b/podcasts-data/src/opml.rs @@ -211,6 +211,7 @@ mod tests { use chrono::Local; use failure::Error; use futures::Future; + use futures::executor::block_on; use crate::database::{truncate_db, TEMPDIR}; use crate::utils::get_feed; @@ -318,7 +319,7 @@ mod tests { // Create and insert a Source into db let s = Source::from_url(url).unwrap(); let feed = get_feed(path, s.id()); - feed.index().wait().unwrap(); + block_on(feed.index()).unwrap(); }); let mut map: HashSet = HashSet::new(); diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index c3a0271..0dbe575 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -20,7 +20,7 @@ // FIXME: //! Docs. -use futures::{future::ok, lazy, prelude::*, stream::FuturesUnordered}; +use futures::{future::ok, future::lazy, prelude::*, stream::FuturesUnordered}; use tokio; use hyper::client::HttpConnector; @@ -42,29 +42,18 @@ type HttpsClient = Client>; /// Messy temp diagram: /// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` -> /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. -pub fn pipeline<'a, S>(sources: S, client: HttpsClient) -> impl Future + 'a +pub async fn pipeline<'a, S>(mut sources: S, client: HttpsClient) where - S: Stream + Send + 'a, + S: Stream> + Send + 'a + std::marker::Unpin { - sources - .and_then(move |s| s.into_feed(client.clone())) - .map_err(|err| { - match err { - // Avoid spamming the stderr when its not an eactual error - DataError::FeedNotModified(_) => (), - _ => error!("Error: {}", err), - } - }) - .and_then(move |feed| { - let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); - tokio::spawn(fut); - Ok(()) - }) - // For each terminates the stream at the first error so we make sure - // we pass good values regardless - .then(move |_| ok(())) - // Convert the stream into a Future to later execute as a tokio task - .for_each(move |_| ok(())) + while let Some(source_result) = sources.next().await { + if let Ok(source) = source_result { + if let Ok(feed) = source.into_feed(client.clone()).await { + let fut = lazy(|_| feed.index().map_err(|err| error!("Error: {}", err))); + tokio::spawn(fut); + }; + } + } } /// Creates a tokio `reactor::Core`, and a `hyper::Client` and @@ -73,13 +62,14 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { - let https = HttpsConnector::new(num_cpus::get())?; + let https = HttpsConnector::new(); let client = Client::builder().build::<_, Body>(https); let foo = sources.into_iter().map(ok::<_, _>); let stream = FuturesUnordered::from_iter(foo); let p = pipeline(stream, client); - tokio::run(p); + let mut rt = tokio::runtime::Runtime::new()?; + rt.block_on(p); Ok(()) } @@ -110,6 +100,7 @@ mod tests { let bad_url = "https://gitlab.gnome.org/World/podcasts.atom"; // if a stream returns error/None it stops // bad we want to parse all feeds regardless if one fails + //TODO_JH: Remove comment Source::from_url(bad_url)?; URLS.iter().for_each(|url| { @@ -121,7 +112,7 @@ mod tests { run(sources)?; let sources = dbqueries::get_sources()?; - // Run again to cover Unique constrains erros. + // Run again to cover Unique constrains errors. run(sources)?; // Assert the index rows equal the controlled results From e830589e389318c167a3f08aa4d5fe50b8c6ea6f Mon Sep 17 00:00:00 2001 From: Julian Hofer Date: Sun, 1 Mar 2020 11:15:28 +0100 Subject: [PATCH 2/5] Remove unused imports --- podcasts-data/src/feed.rs | 2 +- podcasts-data/src/models/new_episode.rs | 4 ---- podcasts-data/src/models/source.rs | 4 ---- podcasts-data/src/opml.rs | 1 - podcasts-data/src/pipeline.rs | 14 ++++++++------ 5 files changed, 9 insertions(+), 16 deletions(-) diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index d04e887..d4cffc9 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -147,7 +147,7 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) { mod tests { use failure::Error; use rss::Channel; - use tokio::{self, prelude::*}; + use tokio; use futures::executor::block_on; use crate::database::truncate_db; diff --git a/podcasts-data/src/models/new_episode.rs b/podcasts-data/src/models/new_episode.rs index 6a077ac..896fc1f 100644 --- a/podcasts-data/src/models/new_episode.rs +++ b/podcasts-data/src/models/new_episode.rs @@ -578,10 +578,6 @@ mod tests { let ep = EXPECTED_MINIMAL_INTERCEPTED_1 .clone() .into_new_episode(&item); - println!( - "EPISODE: {:#?}\nEXPECTED: {:#?}", - ep, *EXPECTED_INTERCEPTED_1 - ); assert_eq!(ep, *EXPECTED_INTERCEPTED_1); let item = channel.items().iter().nth(15).unwrap(); diff --git a/podcasts-data/src/models/source.rs b/podcasts-data/src/models/source.rs index 7a09693..a1c29fa 100644 --- a/podcasts-data/src/models/source.rs +++ b/podcasts-data/src/models/source.rs @@ -31,9 +31,6 @@ use http::header::{ USER_AGENT as USER_AGENT_HEADER, }; use http::{Request, Response, StatusCode, Uri}; -// use futures::future::ok; -use futures::future::Future; -use futures::prelude::*; use base64::{encode_config, URL_SAFE}; @@ -325,7 +322,6 @@ async fn response_to_channel( mod tests { use super::*; use failure::Error; - use num_cpus; use tokio; use crate::database::truncate_db; diff --git a/podcasts-data/src/opml.rs b/podcasts-data/src/opml.rs index fadbab8..a8a6c36 100644 --- a/podcasts-data/src/opml.rs +++ b/podcasts-data/src/opml.rs @@ -210,7 +210,6 @@ mod tests { use super::*; use chrono::Local; use failure::Error; - use futures::Future; use futures::executor::block_on; use crate::database::{truncate_db, TEMPDIR}; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 0dbe575..81d7d46 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -27,8 +27,6 @@ use hyper::client::HttpConnector; use hyper::{Body, Client}; use hyper_tls::HttpsConnector; -use num_cpus; - use crate::errors::DataError; use crate::Source; @@ -48,9 +46,14 @@ where { while let Some(source_result) = sources.next().await { if let Ok(source) = source_result { - if let Ok(feed) = source.into_feed(client.clone()).await { - let fut = lazy(|_| feed.index().map_err(|err| error!("Error: {}", err))); - tokio::spawn(fut); + match source.into_feed(client.clone()).await { + Ok(feed) => { + let fut = lazy(|_| feed.index().map_err(|err| error!("Error: {}", err))); + tokio::spawn(fut); + }, + // Avoid spamming the stderr when it's not an actual error + Err(DataError::FeedNotModified(_)) => (), + Err(err) => error!("Error: {}", err), }; } } @@ -100,7 +103,6 @@ mod tests { let bad_url = "https://gitlab.gnome.org/World/podcasts.atom"; // if a stream returns error/None it stops // bad we want to parse all feeds regardless if one fails - //TODO_JH: Remove comment Source::from_url(bad_url)?; URLS.iter().for_each(|url| { From 636e2aefdef19de5e8de7e5f3e18f55e20c68b24 Mon Sep 17 00:00:00 2001 From: Julian Hofer Date: Sun, 1 Mar 2020 18:55:05 +0100 Subject: [PATCH 3/5] Add tokio features and remove lazy keyword --- Cargo.lock | 17 +++++++------ podcasts-data/Cargo.toml | 5 +++- podcasts-data/src/feed.rs | 40 +++++++++++++++--------------- podcasts-data/src/models/source.rs | 25 +++++++++---------- podcasts-data/src/pipeline.rs | 10 ++++---- 5 files changed, 50 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a586c06..c4d3dc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1014,7 +1014,7 @@ dependencies = [ "indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1156,7 +1156,7 @@ dependencies = [ "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1181,7 +1181,7 @@ dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1788,7 +1788,7 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2503,7 +2503,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.11" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2512,6 +2512,7 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2627,7 +2628,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2640,7 +2641,7 @@ dependencies = [ "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3099,7 +3100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" -"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" +"checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616" "checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" "checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" "checksum tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 7063127..7e282b2 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -19,7 +19,6 @@ xml-rs = "0.8.0" futures = "0.3.4" hyper = "0.13.2" http = "0.2.0" -tokio = "0.2.11" hyper-tls = "0.4.1" native-tls = "0.2.3" num_cpus = "1.10.1" @@ -35,6 +34,10 @@ version = "1.4.3" features = ["sqlite"] version = "1.4.0" +[dependencies.tokio] +features = ["rt-core", "rt-threaded"] +version = "0.2.13" + [dev-dependencies] rand = "0.7.2" tempdir = "0.3.7" diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index d4cffc9..1920c58 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -45,7 +45,7 @@ pub struct Feed { impl Feed { /// Index the contents of the RSS `Feed` into the database. - pub async fn index(self) -> Result<(), DataError>{ + pub async fn index(self) -> Result<(), DataError> { let show = self.parse_podcast().to_podcast()?; self.index_channel_items(show).await } @@ -58,16 +58,14 @@ impl Feed { let stream = stream::iter(self.channel.into_items()); // Parse the episodes let episodes = stream.filter_map(move |item| { - let ret = NewEpisodeMinimal::new(&item, pd.id()) + let ret = NewEpisodeMinimal::new(&item, pd.id()) .and_then(move |ep| determine_ep_state(ep, &item)); if ret.is_ok() { future::ready(Some(ret)) - } - else { + } else { future::ready(None) - } - }); - + } + }); // Filter errors, Index updatable episodes, return insertables. filter_episodes(episodes) // Batch index insertable episodes. @@ -103,23 +101,25 @@ where S: Stream, DataError>> + Send + 'a, { stream - .try_filter_map(|state| async move { - match state { - IndexState::NotChanged => Ok(None), - // Update individual rows, and filter them - IndexState::Update((ref ep, rowid)) => { - ep.update(rowid) - .map_err(|err| error!("{}", err)) - .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) - .ok(); + .try_filter_map(|state| { + async move { + match state { + IndexState::NotChanged => Ok(None), + // Update individual rows, and filter them + IndexState::Update((ref ep, rowid)) => { + ep.update(rowid) + .map_err(|err| error!("{}", err)) + .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) + .ok(); - Ok(None) + Ok(None) + } + IndexState::Index(s) => Ok(Some(s)), } - IndexState::Index(s) => Ok(Some(s)), } }) // only Index is left, collect them for batch index - .try_collect() + .try_collect() } fn batch_insert_episodes(episodes: &[NewEpisode]) { @@ -146,9 +146,9 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) { #[cfg(test)] mod tests { use failure::Error; + use futures::executor::block_on; use rss::Channel; use tokio; - use futures::executor::block_on; use crate::database::truncate_db; use crate::dbqueries; diff --git a/podcasts-data/src/models/source.rs b/podcasts-data/src/models/source.rs index a1c29fa..63d9ed4 100644 --- a/podcasts-data/src/models/source.rs +++ b/podcasts-data/src/models/source.rs @@ -159,7 +159,7 @@ impl Source { let code = res.status(); if code.is_success() { - // If request is succesful save the etag + // If request is successful save the etag self = self.update_etag(&res)? } else { match code.as_u16() { @@ -189,7 +189,7 @@ impl Source { return Err(DataError::FeedRedirect(self)); } 401 => return Err(self.make_err("401: Unauthorized.", code)), - 403 => return Err(self.make_err("403: Forbidden.", code)), + 403 => return Err(self.make_err("403: Forbidden.", code)), 404 => return Err(self.make_err("404: Not found.", code)), 408 => return Err(self.make_err("408: Request Timeout.", code)), 410 => return Err(self.make_err("410: Feed was deleted..", code)), @@ -240,23 +240,24 @@ impl Source { self, client: Client>, ) -> Result { - let id = self.id(); let resp = self.get_response(&client).await?; let chan = response_to_channel(resp).await?; - + FeedBuilder::default() .channel(chan) .source_id(id) .build() - .map_err(From::from) + .map_err(From::from) } - async fn get_response(self, client: &Client>) -> Result, DataError> { + async fn get_response( + self, + client: &Client>, + ) -> Result, DataError> { let mut source = self; - loop - { + loop { match source.request_constructor(&client.clone()).await { Ok(response) => return Ok(response), Err(err) => match err { @@ -267,7 +268,7 @@ impl Source { e => return Err(e), }, } - }; + } } async fn request_constructor( @@ -305,14 +306,12 @@ impl Source { } let res = client.request(req).await?; - //.map_err(From::from) + //.map_err(From::from) self.match_status(res) } } -async fn response_to_channel( - res: Response, -) -> Result { +async fn response_to_channel(res: Response) -> Result { let chunk = hyper::body::to_bytes(res.into_body()).await?; let buf = String::from_utf8_lossy(&chunk).into_owned(); Channel::from_str(&buf).map_err(From::from) diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 81d7d46..9129ea3 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -20,7 +20,7 @@ // FIXME: //! Docs. -use futures::{future::ok, future::lazy, prelude::*, stream::FuturesUnordered}; +use futures::{future::ok, prelude::*, stream::FuturesUnordered}; use tokio; use hyper::client::HttpConnector; @@ -42,18 +42,18 @@ type HttpsClient = Client>; /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. pub async fn pipeline<'a, S>(mut sources: S, client: HttpsClient) where - S: Stream> + Send + 'a + std::marker::Unpin + S: Stream> + Send + 'a + std::marker::Unpin, { while let Some(source_result) = sources.next().await { if let Ok(source) = source_result { match source.into_feed(client.clone()).await { Ok(feed) => { - let fut = lazy(|_| feed.index().map_err(|err| error!("Error: {}", err))); + let fut = feed.index().map_err(|err| error!("Error: {}", err)); tokio::spawn(fut); - }, + } // Avoid spamming the stderr when it's not an actual error Err(DataError::FeedNotModified(_)) => (), - Err(err) => error!("Error: {}", err), + Err(err) => error!("Error: {}", err), }; } } From 429356a2172d0c3ff31f11c0e551537da19ae488 Mon Sep 17 00:00:00 2001 From: Julian Hofer Date: Mon, 2 Mar 2020 16:12:28 +0100 Subject: [PATCH 4/5] Use tokio main-macro --- Cargo.lock | 12 ++++++++++++ podcasts-data/Cargo.toml | 2 +- podcasts-data/src/pipeline.rs | 5 ++--- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4d3dc9..646fa62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2515,6 +2515,7 @@ dependencies = [ "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2555,6 +2556,16 @@ dependencies = [ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-reactor" version = "0.1.11" @@ -3105,6 +3116,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" "checksum tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab" "checksum tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" +"checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" "checksum tokio-reactor 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "6732fe6b53c8d11178dcb77ac6d9682af27fc6d4cb87789449152e5377377146" "checksum tokio-sync 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "d06554cce1ae4a50f42fba8023918afa931413aded705b560e29600ccf7c6d76" "checksum tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 7e282b2..252896d 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -35,7 +35,7 @@ features = ["sqlite"] version = "1.4.0" [dependencies.tokio] -features = ["rt-core", "rt-threaded"] +features = ["rt-core", "rt-threaded", "macros"] version = "0.2.13" [dev-dependencies] diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 9129ea3..a6d853d 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -40,6 +40,7 @@ type HttpsClient = Client>; /// Messy temp diagram: /// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` -> /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. +#[tokio::main] pub async fn pipeline<'a, S>(mut sources: S, client: HttpsClient) where S: Stream> + Send + 'a + std::marker::Unpin, @@ -70,9 +71,7 @@ where let foo = sources.into_iter().map(ok::<_, _>); let stream = FuturesUnordered::from_iter(foo); - let p = pipeline(stream, client); - let mut rt = tokio::runtime::Runtime::new()?; - rt.block_on(p); + pipeline(stream, client); Ok(()) } From f9d577f596b3affabaa62fdbbf7d0175473be385 Mon Sep 17 00:00:00 2001 From: Julian Hofer Date: Sat, 14 Mar 2020 22:48:47 +0100 Subject: [PATCH 5/5] Convert more functions to "async fn" --- podcasts-data/src/feed.rs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index 1920c58..1bf0d31 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -21,7 +21,6 @@ #![allow(clippy::unit_arg)] //! Index Feeds. -use futures::future::*; use futures::prelude::*; use futures::stream; use rss; @@ -54,7 +53,7 @@ impl Feed { NewShow::new(&self.channel, self.source_id) } - fn index_channel_items(self, pd: Show) -> impl Future> + Send { + async fn index_channel_items(self, pd: Show) -> Result<(), DataError> { let stream = stream::iter(self.channel.into_items()); // Parse the episodes let episodes = stream.filter_map(move |item| { @@ -67,9 +66,9 @@ impl Feed { } }); // Filter errors, Index updatable episodes, return insertables. - filter_episodes(episodes) - // Batch index insertable episodes. - .and_then(|eps| ok(batch_insert_episodes(&eps))) + let insertable_episodes = filter_episodes(episodes).await?; + batch_insert_episodes(&insertable_episodes); + Ok(()) } } @@ -94,32 +93,31 @@ fn determine_ep_state( } } -fn filter_episodes<'a, S>( - stream: S, -) -> impl Future, DataError>> + Send + 'a +async fn filter_episodes<'a, S>(stream: S) -> Result, DataError> where - S: Stream, DataError>> + Send + 'a, + S: Stream, DataError>>, { stream .try_filter_map(|state| { - async move { - match state { - IndexState::NotChanged => Ok(None), + async { + let result = match state { + IndexState::NotChanged => None, // Update individual rows, and filter them IndexState::Update((ref ep, rowid)) => { ep.update(rowid) .map_err(|err| error!("{}", err)) .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) .ok(); - - Ok(None) + None } - IndexState::Index(s) => Ok(Some(s)), - } + IndexState::Index(s) => Some(s), + }; + Ok(result) } }) // only Index is left, collect them for batch index .try_collect() + .await } fn batch_insert_episodes(episodes: &[NewEpisode]) {