Merge branch 'alatiera/rever-futures-upgrade' into 'master'

Revert 096197cf81

See merge request World/podcasts!144
Commit 355cf9a36c by Jordan Petridis, 2020-06-21 15:06:57 +00:00
7 changed files with 1308 additions and 1459 deletions

Cargo.lock (generated): 2531 changed lines; file diff suppressed because it is too large.


@@ -16,11 +16,12 @@ rss = "1.9.0"
url = "2.1.1"
xdg = "2.2.0"
xml-rs = "0.8.3"
futures = "0.3.5"
hyper = "0.13.6"
http = "0.2.1"
hyper-tls = "0.4.1"
native-tls = "0.2.4"
futures = "0.1.29"
hyper = "0.12.35"
http = "0.1.19"
tokio = "0.1.22"
hyper-tls = "0.3.2"
native-tls = "0.2.3"
num_cpus = "1.13.0"
failure = "0.1.8"
failure_derive = "0.1.8"
@@ -34,10 +35,6 @@ version = "1.4.5"
features = ["sqlite"]
version = "1.4.0"
[dependencies.tokio]
features = ["rt-core", "rt-threaded", "macros"]
version = "0.2.21"
[dev-dependencies]
rand = "0.7.2"
tempdir = "0.3.7"
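
The dependency swap above drives every other change in this diff: futures 0.3, hyper 0.13, and the tokio 0.2 runtime give way to futures 0.1, hyper 0.12, and tokio 0.1, so each async fn below becomes a combinator chain returning impl Future<Item = _, Error = _>. A minimal sketch of the two styles, using a hypothetical double_it function that is not part of this crate (the first variant assumes futures 0.1 is in scope as futures):

use futures::future::{ok, Future};   // futures 0.1

fn double_it_01(n: u32) -> impl Future<Item = u32, Error = String> {
    // futures 0.1: success and error types live on the Future trait itself.
    ok::<u32, String>(n).map(|x| x * 2)
}

async fn double_it_03(n: u32) -> Result<u32, String> {
    // futures 0.3 / async-await: a plain Result, resolved with .await.
    Ok(n * 2)
}

fn main() {
    assert_eq!(double_it_01(4).wait(), Ok(8));
    // double_it_03 would instead be awaited inside an async context.
}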


@@ -21,6 +21,7 @@
#![allow(clippy::unit_arg)]
//! Index Feeds.
use futures::future::*;
use futures::prelude::*;
use futures::stream;
use rss;
@@ -44,31 +45,31 @@ pub struct Feed {
impl Feed {
/// Index the contents of the RSS `Feed` into the database.
pub async fn index(self) -> Result<(), DataError> {
let show = self.parse_podcast().to_podcast()?;
self.index_channel_items(show).await
pub fn index(self) -> impl Future<Item = (), Error = DataError> + Send {
ok(self.parse_podcast())
.and_then(|pd| pd.to_podcast())
.and_then(move |pd| self.index_channel_items(pd))
}
fn parse_podcast(&self) -> NewShow {
NewShow::new(&self.channel, self.source_id)
}
async fn index_channel_items(self, pd: Show) -> Result<(), DataError> {
let stream = stream::iter(self.channel.into_items());
fn index_channel_items(self, pd: Show) -> impl Future<Item = (), Error = DataError> + Send {
let stream = stream::iter_ok::<_, DataError>(self.channel.into_items());
// Parse the episodes
let episodes = stream.filter_map(move |item| {
let ret = NewEpisodeMinimal::new(&item, pd.id())
.and_then(move |ep| determine_ep_state(ep, &item));
if ret.is_ok() {
future::ready(Some(ret))
} else {
future::ready(None)
}
NewEpisodeMinimal::new(&item, pd.id())
.and_then(move |ep| determine_ep_state(ep, &item))
.map_err(|err| error!("Failed to parse an episode: {}", err))
.ok()
});
// Filter errors, Index updatable episodes, return insertables.
let insertable_episodes = filter_episodes(episodes).await?;
batch_insert_episodes(&insertable_episodes);
Ok(())
filter_episodes(episodes)
// Batch index insertable episodes.
.and_then(|eps| ok(batch_insert_episodes(&eps)))
}
}
@@ -93,31 +94,28 @@ fn determine_ep_state(
}
}
async fn filter_episodes<'a, S>(stream: S) -> Result<Vec<NewEpisode>, DataError>
fn filter_episodes<'a, S>(
stream: S,
) -> impl Future<Item = Vec<NewEpisode>, Error = DataError> + Send + 'a
where
S: Stream<Item = Result<IndexState<NewEpisode>, DataError>>,
S: Stream<Item = IndexState<NewEpisode>, Error = DataError> + Send + 'a,
{
stream
.try_filter_map(|state| {
async {
let result = match state {
IndexState::NotChanged => None,
// Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => {
ep.update(rowid)
.map_err(|err| error!("{}", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
None
}
IndexState::Index(s) => Some(s),
};
Ok(result)
.filter_map(|state| match state {
IndexState::NotChanged => None,
// Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => {
ep.update(rowid)
.map_err(|err| error!("{}", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
None
}
IndexState::Index(s) => Some(s),
})
// only Index is left, collect them for batch index
.try_collect()
.await
.collect()
}
fn batch_insert_episodes(episodes: &[NewEpisode]) {
@@ -144,9 +142,8 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) {
#[cfg(test)]
mod tests {
use failure::Error;
use futures::executor::block_on;
use rss::Channel;
use tokio;
use tokio::{self, prelude::*};
use crate::database::truncate_db;
use crate::dbqueries;
@@ -202,9 +199,8 @@ mod tests {
.collect();
// Index the channels
let stream_ = stream::iter(feeds).for_each(|x| x.index().map(|x| x.unwrap()));
let mut rt = tokio::runtime::Runtime::new()?;
rt.block_on(stream_);
let stream_ = stream::iter_ok(feeds).for_each(|x| x.index());
tokio::run(stream_.map_err(|_| ()));
// Assert the index rows equal the controlled results
assert_eq!(dbqueries::get_sources()?.len(), 5);
@@ -236,7 +232,7 @@
let feed = get_feed(path, 42);
let pd = feed.parse_podcast().to_podcast()?;
block_on(feed.index_channel_items(pd))?;
feed.index_channel_items(pd).wait()?;
assert_eq!(dbqueries::get_podcasts()?.len(), 1);
assert_eq!(dbqueries::get_episodes()?.len(), 43);
Ok(())
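
Here the episode indexing moves from async/await back to futures 0.1 stream combinators: stream::iter_ok replaces stream::iter, the error type rides on the Stream trait, and filter_map plus collect stand in for try_filter_map plus try_collect. A small self-contained sketch of that shape, with throwaway integers instead of episodes (assuming futures 0.1):

use futures::future::Future;
use futures::stream::{self, Stream};   // futures 0.1

fn keep_even() -> impl Future<Item = Vec<u32>, Error = ()> {
    // Mirror index_channel_items: build a stream, drop what we do not want,
    // and collect the survivors into one future.
    stream::iter_ok::<_, ()>(vec![1u32, 2, 3, 4])
        .filter_map(|n| if n % 2 == 0 { Some(n) } else { None })
        .collect()
}

fn main() {
    assert_eq!(keep_even().wait(), Ok(vec![2, 4]));
}

The resulting future is then driven by whichever executor the caller picks, tokio::run in the test above or Future::wait in the OPML test further down.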


@@ -578,6 +578,10 @@ mod tests {
let ep = EXPECTED_MINIMAL_INTERCEPTED_1
.clone()
.into_new_episode(&item);
println!(
"EPISODE: {:#?}\nEXPECTED: {:#?}",
ep, *EXPECTED_INTERCEPTED_1
);
assert_eq!(ep, *EXPECTED_INTERCEPTED_1);
let item = channel.items().iter().nth(15).unwrap();


@@ -31,6 +31,9 @@ use http::header::{
USER_AGENT as USER_AGENT_HEADER,
};
use http::{Request, Response, StatusCode, Uri};
// use futures::future::ok;
use futures::future::{loop_fn, Future, Loop};
use futures::prelude::*;
use base64::{encode_config, URL_SAFE};
@@ -159,7 +162,7 @@ impl Source {
let code = res.status();
if code.is_success() {
// If request is successful save the etag
// If request is succesful save the etag
self = self.update_etag(&res)?
} else {
match code.as_u16() {
@@ -189,7 +192,7 @@ impl Source {
return Err(DataError::FeedRedirect(self));
}
401 => return Err(self.make_err("401: Unauthorized.", code)),
403 => return Err(self.make_err("403: Forbidden.", code)),
403 => return Err(self.make_err("403: Forbidden.", code)),
404 => return Err(self.make_err("404: Not found.", code)),
408 => return Err(self.make_err("408: Request Timeout.", code)),
410 => return Err(self.make_err("410: Feed was deleted..", code)),
@@ -236,45 +239,41 @@ impl Source {
///
/// Consumes `self` and Returns the corresponding `Feed` Object.
// Refactor into TryInto once it lands on stable.
pub async fn into_feed(
pub fn into_feed(
self,
client: Client<HttpsConnector<HttpConnector>>,
) -> Result<Feed, DataError> {
) -> impl Future<Item = Feed, Error = DataError> {
let id = self.id();
let response = loop_fn(self, move |source| {
source
.request_constructor(&client.clone())
.then(|res| match res {
Ok(response) => Ok(Loop::Break(response)),
Err(err) => match err {
DataError::FeedRedirect(s) => {
info!("Following redirect...");
Ok(Loop::Continue(s))
}
e => Err(e),
},
})
});
let resp = self.get_response(&client).await?;
let chan = response_to_channel(resp).await?;
FeedBuilder::default()
.channel(chan)
.source_id(id)
.build()
.map_err(From::from)
response
.and_then(response_to_channel)
.and_then(move |chan| {
FeedBuilder::default()
.channel(chan)
.source_id(id)
.build()
.map_err(From::from)
})
}
async fn get_response(
fn request_constructor(
self,
client: &Client<HttpsConnector<HttpConnector>>,
) -> Result<Response<Body>, DataError> {
let mut source = self;
loop {
match source.request_constructor(&client.clone()).await {
Ok(response) => return Ok(response),
Err(err) => match err {
DataError::FeedRedirect(s) => {
info!("Following redirect...");
source = s;
}
e => return Err(e),
},
}
}
}
async fn request_constructor(
self,
client: &Client<HttpsConnector<HttpConnector>>,
) -> Result<Response<Body>, DataError> {
) -> impl Future<Item = Response<Body>, Error = DataError> {
// FIXME: remove unwrap somehow
let uri = Uri::from_str(self.uri()).unwrap();
let mut req = Request::get(uri).body(Body::empty()).unwrap();
@@ -305,22 +304,30 @@ impl Source {
.insert(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap());
}
let res = client.request(req).await?;
//.map_err(From::from)
self.match_status(res)
client
.request(req)
.map_err(From::from)
.and_then(move |res| self.match_status(res))
}
}
async fn response_to_channel(res: Response<Body>) -> Result<Channel, DataError> {
let chunk = hyper::body::to_bytes(res.into_body()).await?;
let buf = String::from_utf8_lossy(&chunk).into_owned();
Channel::from_str(&buf).map_err(From::from)
fn response_to_channel(
res: Response<Body>,
) -> impl Future<Item = Channel, Error = DataError> + Send {
res.into_body()
.concat2()
.map(|x| x.into_iter())
.map_err(From::from)
.map(|iter| iter.collect::<Vec<u8>>())
.map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned())
.and_then(|buf| Channel::from_str(&buf).map_err(From::from))
}
#[cfg(test)]
mod tests {
use super::*;
use failure::Error;
use num_cpus;
use tokio;
use crate::database::truncate_db;
@@ -331,7 +338,7 @@ mod tests {
truncate_db()?;
let mut rt = tokio::runtime::Runtime::new()?;
let https = HttpsConnector::new();
let https = HttpsConnector::new(num_cpus::get())?;
let client = Client::builder().build::<_, Body>(https);
let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\
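
The redirect handling in into_feed is the most structural change in this file: an async loop with early returns becomes futures 0.1's loop_fn, which threads the Source through each iteration and resolves once the closure yields Loop::Break. A hedged sketch of the same pattern with a toy countdown state rather than an HTTP request (assuming futures 0.1):

use futures::future::{loop_fn, Future, Loop};   // futures 0.1

fn countdown(start: u32) -> impl Future<Item = u32, Error = ()> {
    loop_fn(start, |remaining| -> Result<Loop<u32, u32>, ()> {
        if remaining == 0 {
            // Break resolves the whole loop_fn future with this value.
            Ok(Loop::Break(remaining))
        } else {
            // Continue feeds the new state into the next iteration.
            Ok(Loop::Continue(remaining - 1))
        }
    })
}

fn main() {
    assert_eq!(countdown(3).wait(), Ok(0));
}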


@@ -210,7 +210,7 @@ mod tests {
use super::*;
use chrono::Local;
use failure::Error;
use futures::executor::block_on;
use futures::Future;
use crate::database::{truncate_db, TEMPDIR};
use crate::utils::get_feed;
@@ -318,7 +318,7 @@ mod tests {
// Create and insert a Source into db
let s = Source::from_url(url).unwrap();
let feed = get_feed(path, s.id());
block_on(feed.index()).unwrap();
feed.index().wait().unwrap();
});
let mut map: HashSet<Opml> = HashSet::new();
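
With no futures::executor::block_on available on 0.1, this test drives the future with Future::wait, which blocks the calling thread until the future resolves; that is fine in a synchronous test but can deadlock inside a running single-threaded event loop. A minimal illustration with a throwaway future rather than feed.index() (assuming futures 0.1):

use futures::future::{ok, Future};   // futures 0.1

fn main() {
    // .wait() runs the future to completion on the calling thread and
    // hands back an ordinary Result, just like the test above.
    let answer = ok::<u32, ()>(41).map(|n| n + 1).wait();
    assert_eq!(answer, Ok(42));
}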


@@ -20,13 +20,15 @@
// FIXME:
//! Docs.
use futures::{future::ok, prelude::*, stream::FuturesUnordered};
use futures::{future::ok, lazy, prelude::*, stream::FuturesUnordered};
use tokio;
use hyper::client::HttpConnector;
use hyper::{Body, Client};
use hyper_tls::HttpsConnector;
use num_cpus;
use crate::errors::DataError;
use crate::Source;
@@ -40,24 +42,29 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// Messy temp diagram:
/// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` ->
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
#[tokio::main]
pub async fn pipeline<'a, S>(mut sources: S, client: HttpsClient)
pub fn pipeline<'a, S>(sources: S, client: HttpsClient) -> impl Future<Item = (), Error = ()> + 'a
where
S: Stream<Item = Result<Source, DataError>> + Send + 'a + std::marker::Unpin,
S: Stream<Item = Source, Error = DataError> + Send + 'a,
{
while let Some(source_result) = sources.next().await {
if let Ok(source) = source_result {
match source.into_feed(client.clone()).await {
Ok(feed) => {
let fut = feed.index().map_err(|err| error!("Error: {}", err));
tokio::spawn(fut);
}
// Avoid spamming the stderr when it's not an actual error
Err(DataError::FeedNotModified(_)) => (),
Err(err) => error!("Error: {}", err),
};
}
}
sources
.and_then(move |s| s.into_feed(client.clone()))
.map_err(|err| {
match err {
// Avoid spamming the stderr when its not an eactual error
DataError::FeedNotModified(_) => (),
_ => error!("Error: {}", err),
}
})
.and_then(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err)));
tokio::spawn(fut);
Ok(())
})
// For each terminates the stream at the first error so we make sure
// we pass good values regardless
.then(move |_| ok(()))
// Convert the stream into a Future to later execute as a tokio task
.for_each(move |_| ok(()))
}
/// Creates a tokio `reactor::Core`, and a `hyper::Client` and
@@ -66,12 +73,13 @@ pub fn run<S>(sources: S) -> Result<(), DataError>
where
S: IntoIterator<Item = Source>,
{
let https = HttpsConnector::new();
let https = HttpsConnector::new(num_cpus::get())?;
let client = Client::builder().build::<_, Body>(https);
let foo = sources.into_iter().map(ok::<_, _>);
let stream = FuturesUnordered::from_iter(foo);
pipeline(stream, client);
let p = pipeline(stream, client);
tokio::run(p);
Ok(())
}
@@ -113,7 +121,7 @@ mod tests {
run(sources)?;
let sources = dbqueries::get_sources()?;
// Run again to cover Unique constrains errors.
// Run again to cover Unique constrains erros.
run(sources)?;
// Assert the index rows equal the controlled results
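
The reverted pipeline assembles one combined future out of the source stream and hands it to tokio::run, instead of iterating with while let Some(..) = sources.next().await and spawning per-feed tasks on the 0.2 runtime. A stripped-down sketch of that shape, with plain integers standing in for sources and mirroring the FuturesUnordered setup in run above (assuming futures 0.1 and tokio 0.1):

use std::iter::FromIterator;

use futures::future::ok;
use futures::prelude::*;
use futures::stream::FuturesUnordered;   // futures 0.1

fn main() {
    // Wrap each item in an already-resolved future, then fan them into a
    // stream, the same move run() makes with its sources.
    let ready = vec![1u32, 2, 3].into_iter().map(ok::<u32, ()>);
    let stream = FuturesUnordered::from_iter(ready);

    // and_then applies a fallible step per item; for_each folds the stream
    // into a single Future<Item = (), Error = ()> that tokio::run can execute.
    let pipeline = stream
        .and_then(|n| ok::<u32, ()>(n * 2))
        .for_each(|n| {
            println!("processed {}", n);
            ok::<(), ()>(())
        });

    tokio::run(pipeline);
}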