Minor cleanup of the mess.

This commit is contained in:
Jordan Petridis 2018-01-14 08:27:50 +02:00
parent e63a366fdc
commit 2f7a22355f
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
5 changed files with 75 additions and 60 deletions

View File

@ -19,7 +19,8 @@
unused_parens, while_true)]
#![deny(missing_debug_implementations, missing_docs, trivial_casts, trivial_numeric_casts)]
// FIXME: uncomment
// unused_extern_crates, unused)]
// #![deny(unused_extern_crates, unused)]
// #![feature(conservative_impl_trait)]
#[macro_use]
@ -59,15 +60,15 @@ extern crate xdg;
#[allow(missing_docs)]
pub mod dbqueries;
pub mod utils;
pub mod feed;
#[allow(missing_docs)]
pub mod errors;
pub mod utils;
pub mod feed;
pub mod database;
pub mod pipeline;
pub(crate) mod models;
mod parser;
mod schema;
pub mod pipeline;
pub use models::queryables::{Episode, EpisodeWidgetQuery, Podcast, PodcastCoverQuery, Source};

View File

@ -39,7 +39,7 @@ impl Insert for NewSource {
}
impl NewSource {
pub(crate) fn new_with_uri(uri: &str) -> NewSource {
pub(crate) fn new(uri: &str) -> NewSource {
NewSource {
uri: uri.trim().to_string(),
last_modified: None,

View File

@ -4,16 +4,12 @@ use diesel;
use reqwest;
use diesel::SaveChangesDsl;
use reqwest::header::{ETag, LastModified};
use rss::Channel;
use hyper;
use hyper::Client;
use hyper::client::HttpConnector;
use hyper::Method;
use hyper::Uri;
use hyper::{Client, Method, Request, Response, StatusCode, Uri};
use hyper::header::{ETag, EntityTag, HttpDate, IfModifiedSince, IfNoneMatch, LastModified};
use hyper_tls::HttpsConnector;
// use hyper::header::{ETag, LastModified};
use futures::prelude::*;
// use futures::future::ok;
@ -639,10 +635,10 @@ impl Source {
Ok(self.save_changes::<Source>(&*tempdb)?)
}
/// Extract Etag and LastModifier from req, and update self and the
/// Extract Etag and LastModifier from res, and update self and the
/// corresponding db row.
fn update_etag(&mut self, req: &reqwest::Response) -> Result<()> {
let headers = req.headers();
fn update_etag(&mut self, res: &reqwest::Response) -> Result<()> {
let headers = res.headers();
let etag = headers.get::<ETag>();
let lmod = headers.get::<LastModified>();
@ -658,9 +654,10 @@ impl Source {
Ok(())
}
/// Docs
pub fn update_etag2(mut self, req: &hyper::Response) -> Result<()> {
let headers = req.headers();
/// Extract Etag and LastModifier from res, and update self and the
/// corresponding db row.
fn update_etag2(mut self, res: &Response) -> Result<()> {
let headers = res.headers();
let etag = headers.get::<ETag>();
let lmod = headers.get::<LastModified>();
@ -686,7 +683,6 @@ impl Source {
// TODO: Refactor into TryInto once it lands on stable.
pub fn into_feed(&mut self, ignore_etags: bool) -> Result<Feed> {
use reqwest::header::{EntityTag, Headers, HttpDate, IfModifiedSince, IfNoneMatch};
use reqwest::StatusCode;
let mut headers = Headers::new();
@ -705,44 +701,20 @@ impl Source {
}
let client = reqwest::Client::builder().referer(false).build()?;
let mut req = client.get(self.uri()).headers(headers).send()?;
let mut res = client.get(self.uri()).headers(headers).send()?;
info!("GET to {} , returned: {}", self.uri(), req.status());
info!("GET to {} , returned: {}", self.uri(), res.status());
self.update_etag(&req)?;
// TODO match on more stuff
// 301: Moved Permanently
// 304: Up to date Feed, checked with the Etag
// 307: Temporary redirect of the url
// 308: Permanent redirect of the url
// 401: Unathorized
// 403: Forbidden
// 408: Timeout
// 410: Feed deleted
match req.status() {
StatusCode::NotModified => bail!("304: skipping.."),
StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."),
// TODO: Change the source uri to the new one
StatusCode::MovedPermanently | StatusCode::PermanentRedirect => {
warn!("Feed was moved permanently.")
}
StatusCode::Unauthorized => bail!("401: Unauthorized."),
StatusCode::Forbidden => bail!("403: Forbidden."),
StatusCode::NotFound => bail!("404: Not found."),
StatusCode::RequestTimeout => bail!("408: Request Timeout."),
StatusCode::Gone => bail!("410: Feed was deleted."),
_ => (),
};
self.update_etag(&res)?;
match_status(res.status())?;
let mut buf = String::new();
req.read_to_string(&mut buf)?;
res.read_to_string(&mut buf)?;
let chan = Channel::from_str(&buf)?;
Ok(Feed::from_channel_source(chan, self.id))
}
#[allow(dead_code)]
/// Docs
pub fn into_fututre_feed(
self,
@ -750,6 +722,8 @@ impl Source {
ignore_etags: bool,
) -> Box<Future<Item = Feed, Error = Error>> {
let id = self.id();
// TODO: make URI future
// TODO: make a status match future
let feed = request_constructor(&self, client, ignore_etags)
.map(move |res| {
if let Err(err) = self.update_etag2(&res) {
@ -766,21 +740,23 @@ impl Source {
}
/// Construct a new `Source` with the given `uri` and index it.
///
/// This only indexes the `Source` struct, not the Podcast Feed.
pub fn from_url(uri: &str) -> Result<Source> {
NewSource::new_with_uri(uri).into_source()
NewSource::new(uri).into_source()
}
}
// TODO: make ignore_etags an Enum for better ergonomics.
// #bools_are_just_2variant_enmus
fn request_constructor(
s: &Source,
client: &Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
) -> Box<Future<Item = hyper::Response, Error = Error>> {
use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch};
) -> Box<Future<Item = Response, Error = Error>> {
// FIXME: remove unwrap
let uri = Uri::from_str(&s.uri()).unwrap();
let mut req = hyper::Request::new(Method::Get, uri);
let mut req = Request::new(Method::Get, uri);
if !ignore_etags {
if let Some(foo) = s.http_etag() {
@ -800,7 +776,7 @@ fn request_constructor(
Box::new(work)
}
fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error = Error>> {
fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = Error>> {
let chan = res.body()
.concat2()
.map(|x| x.into_iter())
@ -814,6 +790,33 @@ fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error
Box::new(chan)
}
// TODO match on more stuff
// 301: Moved Permanently
// 304: Up to date Feed, checked with the Etag
// 307: Temporary redirect of the url
// 308: Permanent redirect of the url
// 401: Unathorized
// 403: Forbidden
// 408: Timeout
// 410: Feed deleted
fn match_status(code: StatusCode) -> Result<()> {
match code {
StatusCode::NotModified => bail!("304: skipping.."),
StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."),
// TODO: Change the source uri to the new one
StatusCode::MovedPermanently | StatusCode::PermanentRedirect => {
warn!("Feed was moved permanently.")
}
StatusCode::Unauthorized => bail!("401: Unauthorized."),
StatusCode::Forbidden => bail!("403: Forbidden."),
StatusCode::NotFound => bail!("404: Not found."),
StatusCode::RequestTimeout => bail!("408: Request Timeout."),
StatusCode::Gone => bail!("410: Feed was deleted."),
_ => (),
};
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,13 +1,13 @@
//! Docs.
use futures::future::*;
use errors::Error;
use Source;
use tokio_core::reactor::Core;
use hyper::Client;
use hyper_tls::HttpsConnector;
// use futures::future::*;
use futures::prelude::*;
use futures::future::*;
// Weird magic from #rust irc channel
// kudos to remexre
@ -42,8 +42,13 @@ mod dirtyhack {
use super::*;
use errors::*;
/// Docs
pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S) -> Result<()> {
/// The pipline to be run for indexing and updating a Podcast feed that originates from
/// `Source.uri`.
///
/// Messy temp diagram:
/// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss ->
/// Convert rss::Channel into Feed -> Index Podcast -> Index Episodes.
pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()> {
let mut core = Core::new()?;
let handle = core.handle();
let client = Client::configure()
@ -53,7 +58,9 @@ mod dirtyhack {
let list = sources
.into_iter()
.map(|s| s.into_fututre_feed(&client, false).map(|feed| feed.index()))
// FIXME: Make proper indexing futures instead of wrapping up existing
// blocking functions
.map(|s| s.into_fututre_feed(&client, ignore_etags).map(|feed| feed.index()))
.collect();
let f = core.run(collect_futures(list))?;

View File

@ -19,18 +19,22 @@ use app::Action;
/// If `source` is None, Fetches all the `Source` entries in the database and updates them.
/// When It's done,it queues up a `RefreshViews` action.
pub fn refresh_feed(headerbar: Arc<Header>, source: Option<Vec<Source>>, sender: Sender<Action>) {
// TODO: make it an application channel action.
// I missed it before apparently.
headerbar.show_update_notification();
thread::spawn(move || {
// FIXME: This is messy at best.
if let Some(s) = source {
// feed::index_loop(s);
if let Err(err) = pipeline::pipeline(s) {
// TODO: determine if it needs to ignore_etags.
if let Err(err) = pipeline::pipeline(s, true) {
error!("Error While trying to update the database.");
error!("Error msg: {}", err);
}
} else {
let sources = dbqueries::get_sources().unwrap();
if let Err(err) = pipeline::pipeline(sources) {
if let Err(err) = pipeline::pipeline(sources, false) {
error!("Error While trying to update the database.");
error!("Error msg: {}", err);
}