Switch to using DataError instead of failure::Error.
parent 435ce05ac7
commit 8ba9f928d6
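
For context, a minimal sketch of the DataError type this commit switches to, assembled from the error-module hunks below. Only the variants visible in the diff are included; the display string on the UrlError variant and the exact derive set are assumptions:

    // Sketch only, assembled from the visible hunks; not the complete enum.
    #[derive(Fail, Debug)]
    pub enum DataError {
        #[fail(display = "URL Error: {}", _0)]
        UrlError(#[cause] url::ParseError),
        #[fail(display = "TLS Error: {}", _0)]
        TLSError(#[cause] native_tls::Error),
        #[fail(display = "IO Error: {}", _0)]
        IOError(io::Error),
        // Stopgap variant for call sites that previously used `bail!`.
        #[fail(display = "WANNABE BAIL ERROR: {}", _0)]
        DiscountBail(String),
    }
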
@@ -8,11 +8,6 @@ use url;
 
 use std::io;
 
-#[allow(dead_code)]
-#[derive(Fail, Debug)]
-#[fail(display = "IO Error: {}", _0)]
-struct IOError(io::Error);
-
 // fadsadfs NOT SYNC
 // #[derive(Fail, Debug)]
 // #[fail(display = "RSS Error: {}", _0)]
@@ -34,6 +29,10 @@ pub enum DataError {
     UrlError(#[cause] url::ParseError),
     #[fail(display = "TLS Error: {}", _0)]
     TLSError(#[cause] native_tls::Error),
+    #[fail(display = "IO Error: {}", _0)]
+    IOError(io::Error),
+    #[fail(display = "WANNABE BAIL ERROR: {}", _0)]
+    DiscountBail(String),
 }
@@ -77,3 +76,9 @@ impl From<native_tls::Error> for DataError {
         DataError::TLSError(err)
     }
 }
+
+impl From<io::Error> for DataError {
+    fn from(err: io::Error) -> Self {
+        DataError::IOError(err)
+    }
+}
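
The From impls above are what keep the migration mechanical: `?` on a Result carrying io::Error, url::ParseError, or native_tls::Error still converts into DataError automatically. A minimal sketch of the pattern, with a hypothetical helper function:

    use std::fs::File;
    use std::io::Read;

    // Hypothetical helper: each `?` converts io::Error into DataError
    // via the From<io::Error> impl added above.
    fn read_feed_dump(path: &str) -> Result<String, DataError> {
        let mut buf = String::new();
        File::open(path)?.read_to_string(&mut buf)?;
        Ok(buf)
    }
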
@@ -1,11 +1,11 @@
 //! Index Feeds.
 
-use failure::Error;
 use futures::future::*;
 use itertools::{Either, Itertools};
 use rss;
 
 use dbqueries;
+use errors::DataError;
 use models::{Index, IndexState, Update};
 use models::{NewEpisode, NewPodcast, Podcast};
 use pipeline::*;
@@ -26,7 +26,7 @@ pub struct Feed {
 
 impl Feed {
     /// Index the contents of the RSS `Feed` into the database.
-    pub fn index(self) -> Box<Future<Item = (), Error = Error> + Send> {
+    pub fn index(self) -> Box<Future<Item = (), Error = DataError> + Send> {
         let fut = self.parse_podcast_async()
             .and_then(|pd| pd.to_podcast())
             .and_then(move |pd| self.index_channel_items(&pd));
@@ -38,11 +38,14 @@ impl Feed {
         NewPodcast::new(&self.channel, self.source_id)
     }
 
-    fn parse_podcast_async(&self) -> Box<Future<Item = NewPodcast, Error = Error> + Send> {
+    fn parse_podcast_async(&self) -> Box<Future<Item = NewPodcast, Error = DataError> + Send> {
         Box::new(ok(self.parse_podcast()))
     }
 
-    fn index_channel_items(&self, pd: &Podcast) -> Box<Future<Item = (), Error = Error> + Send> {
+    fn index_channel_items(
+        &self,
+        pd: &Podcast,
+    ) -> Box<Future<Item = (), Error = DataError> + Send> {
         let fut = self.get_stuff(pd)
             .and_then(|(insert, update)| {
                 if !insert.is_empty() {
@@ -79,7 +82,10 @@ impl Feed {
         Box::new(fut)
     }
 
-    fn get_stuff(&self, pd: &Podcast) -> Box<Future<Item = InsertUpdate, Error = Error> + Send> {
+    fn get_stuff(
+        &self,
+        pd: &Podcast,
+    ) -> Box<Future<Item = InsertUpdate, Error = DataError> + Send> {
         let (insert, update): (Vec<_>, Vec<_>) = self.channel
             .items()
             .into_iter()
@@ -90,7 +96,7 @@ impl Feed {
             // I am not sure what the optimizations are on match vs allocating None.
             .map(|fut| {
                 fut.and_then(|x| match x {
-                    IndexState::NotChanged => bail!("Nothing to do here."),
+                    IndexState::NotChanged => return Err(DataError::DiscountBail(format!("Nothing to do here."))),
                     _ => Ok(x),
                 })
            })
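
With the boxed futures now pinned to Error = DataError instead of the type-erased failure::Error, the combinator chain keeps one concrete error type end to end, and closures can return DataError::DiscountBail directly, as the NotChanged arm above does. A rough sketch of the shape in futures 0.1 style (function names are hypothetical):

    use futures::future::{ok, Future};

    // Hypothetical stand-in for one stage of the feed pipeline.
    fn parse_step() -> Box<Future<Item = u32, Error = DataError> + Send> {
        Box::new(ok(42))
    }

    fn chain() -> Box<Future<Item = (), Error = DataError> + Send> {
        // and_then closures return Result<_, DataError>, so no
        // conversion into a dynamic error type is needed.
        Box::new(parse_step().and_then(|_| Ok(())))
    }
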
@@ -27,7 +27,7 @@ extern crate derive_builder;
 extern crate diesel;
 #[macro_use]
 extern crate diesel_migrations;
-#[macro_use]
+// #[macro_use]
 extern crate failure;
 #[macro_use]
 extern crate failure_derive;
@@ -2,9 +2,9 @@ use chrono::prelude::*;
 use diesel;
 use diesel::SaveChangesDsl;
 use diesel::prelude::*;
-use failure::Error;
 
 use database::connection;
+use errors::DataError;
 use models::{Podcast, Save};
 use schema::episode;
@@ -31,9 +31,9 @@ pub struct Episode {
     podcast_id: i32,
 }
 
-impl Save<Episode> for Episode {
+impl Save<Episode, DataError> for Episode {
     /// Helper method to easily save/"sync" current state of self to the Database.
-    fn save(&self) -> Result<Episode, Error> {
+    fn save(&self) -> Result<Episode, DataError> {
         let db = connection();
         let tempdb = db.get()?;
 
@@ -180,7 +180,7 @@ impl Episode {
     }
 
     /// Sets the `played` value with the current `epoch` timestap and save it.
-    pub fn set_played_now(&mut self) -> Result<(), Error> {
+    pub fn set_played_now(&mut self) -> Result<(), DataError> {
         let epoch = Utc::now().timestamp() as i32;
         self.set_played(Some(epoch));
         self.save().map(|_| ())
@@ -223,9 +223,9 @@ impl From<Episode> for EpisodeWidgetQuery {
     }
 }
 
-impl Save<usize> for EpisodeWidgetQuery {
+impl Save<usize, DataError> for EpisodeWidgetQuery {
     /// Helper method to easily save/"sync" current state of self to the Database.
-    fn save(&self) -> Result<usize, Error> {
+    fn save(&self) -> Result<usize, DataError> {
         use schema::episode::dsl::*;
 
         let db = connection();
@@ -342,7 +342,7 @@ impl EpisodeWidgetQuery {
     }
 
     /// Sets the `played` value with the current `epoch` timestap and save it.
-    pub fn set_played_now(&mut self) -> Result<(), Error> {
+    pub fn set_played_now(&mut self) -> Result<(), DataError> {
         let epoch = Utc::now().timestamp() as i32;
         self.set_played(Some(epoch));
         self.save().map(|_| ())
@@ -361,9 +361,9 @@ pub struct EpisodeCleanerQuery {
     played: Option<i32>,
 }
 
-impl Save<usize> for EpisodeCleanerQuery {
+impl Save<usize, DataError> for EpisodeCleanerQuery {
     /// Helper method to easily save/"sync" current state of self to the Database.
-    fn save(&self) -> Result<usize, Error> {
+    fn save(&self) -> Result<usize, DataError> {
         use schema::episode::dsl::*;
 
         let db = connection();
@@ -23,8 +23,6 @@ pub use self::episode::{Episode, EpisodeMinimal, EpisodeWidgetQuery};
 pub use self::podcast::{Podcast, PodcastCoverQuery};
 pub use self::source::Source;
 
-use failure::Error;
-
 #[derive(Debug, Clone, PartialEq)]
 pub enum IndexState<T> {
     Index(T),
@@ -32,20 +30,21 @@ pub enum IndexState<T> {
     NotChanged,
 }
 
-pub trait Insert {
-    fn insert(&self) -> Result<(), Error>;
+pub trait Insert<T, E> {
+    fn insert(&self) -> Result<T, E>;
 }
 
-pub trait Update {
-    fn update(&self, i32) -> Result<(), Error>;
+pub trait Update<T, E> {
+    fn update(&self, i32) -> Result<T, E>;
 }
 
-pub trait Index: Insert + Update {
-    fn index(&self) -> Result<(), Error>;
+// This might need to change in the future
+pub trait Index<T, E>: Insert<T, E> + Update<T, E> {
+    fn index(&self) -> Result<T, E>;
 }
 
 /// FIXME: DOCS
-pub trait Save<T> {
+pub trait Save<T, E> {
     /// Helper method to easily save/"sync" current state of a diesel model to the Database.
-    fn save(&self) -> Result<T, Error>;
+    fn save(&self) -> Result<T, E>;
 }
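
Since Insert, Update, Index, and Save now take the error type as a second parameter, each implementor spells out both the success and the error type. A sketch of the new shape under these signatures (the Dummy type is hypothetical; the real impls are in the model hunks below):

    struct Dummy;

    impl Save<Dummy, DataError> for Dummy {
        fn save(&self) -> Result<Dummy, DataError> {
            // Real implementations persist to the database here.
            Ok(Dummy)
        }
    }
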
@@ -1,12 +1,12 @@
 use ammonia;
 use diesel;
 use diesel::prelude::*;
-use failure::Error;
 use rfc822_sanitizer::parse_from_rfc2822_with_fallback as parse_rfc822;
 use rss;
 
 use database::connection;
 use dbqueries;
+use errors::DataError;
 use models::{Episode, EpisodeMinimal, Index, Insert, Update};
 use parser;
 use schema::episode;
@@ -43,8 +43,8 @@ impl From<NewEpisodeMinimal> for NewEpisode {
     }
 }
 
-impl Insert for NewEpisode {
-    fn insert(&self) -> Result<(), Error> {
+impl Insert<(), DataError> for NewEpisode {
+    fn insert(&self) -> Result<(), DataError> {
         use schema::episode::dsl::*;
         let db = connection();
         let con = db.get()?;
@@ -58,8 +58,8 @@ impl Insert for NewEpisode {
     }
 }
 
-impl Update for NewEpisode {
-    fn update(&self, episode_id: i32) -> Result<(), Error> {
+impl Update<(), DataError> for NewEpisode {
+    fn update(&self, episode_id: i32) -> Result<(), DataError> {
         use schema::episode::dsl::*;
         let db = connection();
         let con = db.get()?;
@@ -73,9 +73,9 @@ impl Update for NewEpisode {
     }
 }
 
-impl Index for NewEpisode {
+impl Index<(), DataError> for NewEpisode {
     // Does not update the episode description if it's the only thing that has changed.
-    fn index(&self) -> Result<(), Error> {
+    fn index(&self) -> Result<(), DataError> {
         let exists = dbqueries::episode_exists(self.title(), self.podcast_id())?;
 
         if exists {
@@ -113,12 +113,12 @@ impl PartialEq<Episode> for NewEpisode {
 impl NewEpisode {
     /// Parses an `rss::Item` into a `NewEpisode` Struct.
     #[allow(dead_code)]
-    pub(crate) fn new(item: &rss::Item, podcast_id: i32) -> Result<Self, Error> {
+    pub(crate) fn new(item: &rss::Item, podcast_id: i32) -> Result<Self, DataError> {
         NewEpisodeMinimal::new(item, podcast_id).map(|ep| ep.into_new_episode(item))
     }
 
     #[allow(dead_code)]
-    pub(crate) fn to_episode(&self) -> Result<Episode, Error> {
+    pub(crate) fn to_episode(&self) -> Result<Episode, DataError> {
         self.index()?;
         dbqueries::get_episode_from_pk(&self.title, self.podcast_id).map_err(From::from)
     }
@@ -182,9 +182,11 @@ impl PartialEq<EpisodeMinimal> for NewEpisodeMinimal {
 }
 
 impl NewEpisodeMinimal {
-    pub(crate) fn new(item: &rss::Item, parent_id: i32) -> Result<Self, Error> {
+    pub(crate) fn new(item: &rss::Item, parent_id: i32) -> Result<Self, DataError> {
         if item.title().is_none() {
-            bail!("No title specified for the item.")
+            return Err(DataError::DiscountBail(format!(
+                "No title specified for the item."
+            )));
         }
 
         let title = item.title().unwrap().trim().to_owned();
@@ -195,7 +197,9 @@ impl NewEpisodeMinimal {
         } else if item.link().is_some() {
             item.link().map(|s| url_cleaner(s))
         } else {
-            bail!("No url specified for the item.")
+            return Err(DataError::DiscountBail(format!(
+                "No url specified for the item."
+            )));
         };
 
         // Default to rfc2822 represantation of epoch 0.
@@ -2,9 +2,9 @@ use diesel;
 use diesel::prelude::*;
 
 use ammonia;
-use failure::Error;
 use rss;
 
+use errors::DataError;
 use models::{Index, Insert, Update};
 use models::Podcast;
 use schema::podcast;
@@ -27,8 +27,8 @@ pub(crate) struct NewPodcast {
     source_id: i32,
 }
 
-impl Insert for NewPodcast {
-    fn insert(&self) -> Result<(), Error> {
+impl Insert<(), DataError> for NewPodcast {
+    fn insert(&self) -> Result<(), DataError> {
         use schema::podcast::dsl::*;
         let db = connection();
         let con = db.get()?;
@@ -41,8 +41,8 @@ impl Insert for NewPodcast {
     }
 }
 
-impl Update for NewPodcast {
-    fn update(&self, podcast_id: i32) -> Result<(), Error> {
+impl Update<(), DataError> for NewPodcast {
+    fn update(&self, podcast_id: i32) -> Result<(), DataError> {
         use schema::podcast::dsl::*;
         let db = connection();
         let con = db.get()?;
@@ -58,8 +58,8 @@ impl Update for NewPodcast {
 
 // TODO: Maybe return an Enum<Action(Resut)> Instead.
 // It would make unti testing better too.
-impl Index for NewPodcast {
-    fn index(&self) -> Result<(), Error> {
+impl Index<(), DataError> for NewPodcast {
+    fn index(&self) -> Result<(), DataError> {
         let exists = dbqueries::podcast_exists(self.source_id)?;
 
         if exists {
@@ -118,7 +118,7 @@ impl NewPodcast {
     }
 
     // Look out for when tryinto lands into stable.
-    pub(crate) fn to_podcast(&self) -> Result<Podcast, Error> {
+    pub(crate) fn to_podcast(&self) -> Result<Podcast, DataError> {
         self.index()?;
         dbqueries::get_podcast_from_source_id(self.source_id).map_err(From::from)
     }
@@ -2,12 +2,12 @@
 
 use diesel;
 use diesel::prelude::*;
-use failure::Error;
 use url::Url;
 
 use database::connection;
 use dbqueries;
 // use models::{Insert, Update};
+use errors::DataError;
 use models::Source;
 use schema::source;
@@ -32,7 +32,7 @@ impl NewSource {
         }
     }
 
-    pub(crate) fn insert_or_ignore(&self) -> Result<(), Error> {
+    pub(crate) fn insert_or_ignore(&self) -> Result<(), DataError> {
         use schema::source::dsl::*;
         let db = connection();
         let con = db.get()?;
@@ -45,7 +45,7 @@ impl NewSource {
     }
 
     // Look out for when tryinto lands into stable.
-    pub(crate) fn to_source(&self) -> Result<Source, Error> {
+    pub(crate) fn to_source(&self) -> Result<Source, DataError> {
         self.insert_or_ignore()?;
         dbqueries::get_source_from_uri(&self.uri).map_err(From::from)
     }
@@ -1,7 +1,7 @@
 use diesel::SaveChangesDsl;
-use failure::Error;
 
 use database::connection;
+use errors::DataError;
 use models::{Save, Source};
 use schema::podcast;
@@ -23,9 +23,9 @@ pub struct Podcast {
     source_id: i32,
 }
 
-impl Save<Podcast> for Podcast {
+impl Save<Podcast, DataError> for Podcast {
     /// Helper method to easily save/"sync" current state of self to the Database.
-    fn save(&self) -> Result<Podcast, Error> {
+    fn save(&self) -> Result<Podcast, DataError> {
         let db = connection();
         let tempdb = db.get()?;
 
@@ -1,5 +1,4 @@
 use diesel::SaveChangesDsl;
-use failure::Error;
 use rss::Channel;
 use url::Url;
 
@@ -14,6 +13,7 @@ use futures::prelude::*;
 use futures_cpupool::CpuPool;
 
 use database::connection;
+use errors::DataError;
 use feed::{Feed, FeedBuilder};
 use models::{NewSource, Save};
 use schema::source;
@@ -32,9 +32,9 @@ pub struct Source {
     http_etag: Option<String>,
 }
 
-impl Save<Source> for Source {
+impl Save<Source, DataError> for Source {
     /// Helper method to easily save/"sync" current state of self to the Database.
-    fn save(&self) -> Result<Source, Error> {
+    fn save(&self) -> Result<Source, DataError> {
         let db = connection();
         let con = db.get()?;
 
@@ -85,7 +85,7 @@ impl Source {
 
     /// Extract Etag and LastModifier from res, and update self and the
     /// corresponding db row.
-    fn update_etag(&mut self, res: &Response) -> Result<(), Error> {
+    fn update_etag(&mut self, res: &Response) -> Result<(), DataError> {
         let headers = res.headers();
 
         let etag = headers.get::<ETag>().map(|x| x.tag());
@@ -109,29 +109,42 @@ impl Source {
     // 403: Forbidden
     // 408: Timeout
     // 410: Feed deleted
-    fn match_status(mut self, res: Response) -> Result<(Self, Response), Error> {
+    // TODO: Rething this api,
+    fn match_status(mut self, res: Response) -> Result<(Self, Response), DataError> {
         self.update_etag(&res)?;
         let code = res.status();
         match code {
-            StatusCode::NotModified => bail!("304: skipping.."),
+            StatusCode::NotModified => {
+                return Err(DataError::DiscountBail(format!("304: skipping..")))
+            }
             StatusCode::MovedPermanently => {
                 error!("Feed was moved permanently.");
                 self.handle_301(&res)?;
-                bail!("301: Feed was moved permanently.")
+                return Err(DataError::DiscountBail(format!(
+                    "301: Feed was moved permanently."
+                )));
             }
             StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."),
             StatusCode::PermanentRedirect => warn!("308: Permanent Redirect."),
-            StatusCode::Unauthorized => bail!("401: Unauthorized."),
-            StatusCode::Forbidden => bail!("403: Forbidden."),
-            StatusCode::NotFound => bail!("404: Not found."),
-            StatusCode::RequestTimeout => bail!("408: Request Timeout."),
-            StatusCode::Gone => bail!("410: Feed was deleted."),
+            StatusCode::Unauthorized => {
+                return Err(DataError::DiscountBail(format!("401: Unauthorized.")))
+            }
+            StatusCode::Forbidden => {
+                return Err(DataError::DiscountBail(format!("403: Forbidden.")))
+            }
+            StatusCode::NotFound => return Err(DataError::DiscountBail(format!("404: Not found."))),
+            StatusCode::RequestTimeout => {
+                return Err(DataError::DiscountBail(format!("408: Request Timeout.")))
+            }
+            StatusCode::Gone => {
+                return Err(DataError::DiscountBail(format!("410: Feed was deleted.")))
+            }
            _ => info!("HTTP StatusCode: {}", code),
        };
        Ok((self, res))
    }

-    fn handle_301(&mut self, res: &Response) -> Result<(), Error> {
+    fn handle_301(&mut self, res: &Response) -> Result<(), DataError> {
        let headers = res.headers();

        if let Some(url) = headers.get::<Location>() {
@@ -150,7 +163,7 @@ impl Source {
     /// Construct a new `Source` with the given `uri` and index it.
     ///
     /// This only indexes the `Source` struct, not the Podcast Feed.
-    pub fn from_url(uri: &str) -> Result<Source, Error> {
+    pub fn from_url(uri: &str) -> Result<Source, DataError> {
         let url = Url::parse(uri)?;
 
         NewSource::new(&url).to_source()
@@ -169,7 +182,7 @@ impl Source {
         client: &Client<HttpsConnector<HttpConnector>>,
         pool: CpuPool,
         ignore_etags: bool,
-    ) -> Box<Future<Item = Feed, Error = Error>> {
+    ) -> Box<Future<Item = Feed, Error = DataError>> {
         let id = self.id();
         let feed = self.request_constructor(client, ignore_etags)
             .and_then(move |(_, res)| response_to_channel(res, pool))
@@ -190,7 +203,7 @@ impl Source {
         self,
         client: &Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
-    ) -> Box<Future<Item = (Self, Response), Error = Error>> {
+    ) -> Box<Future<Item = (Self, Response), Error = DataError>> {
         // FIXME: remove unwrap somehow
         let uri = Uri::from_str(self.uri()).unwrap();
         let mut req = Request::new(Method::Get, uri);
@@ -221,7 +234,7 @@ impl Source {
 fn response_to_channel(
     res: Response,
     pool: CpuPool,
-) -> Box<Future<Item = Channel, Error = Error> + Send> {
+) -> Box<Future<Item = Channel, Error = DataError> + Send> {
     let chan = res.body()
         .concat2()
         .map(|x| x.into_iter())
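
The repeated return Err(DataError::DiscountBail(format!(...))) blocks in match_status are noticeably noisier than the old bail! one-liners. One possible follow-up (not part of this commit) would be a small constructor to recover the brevity:

    // Hypothetical convenience constructor; not in this commit.
    impl DataError {
        fn discount_bail<S: Into<String>>(msg: S) -> DataError {
            DataError::DiscountBail(msg.into())
        }
    }

    // Which would shrink the match arms back to one line each, e.g.:
    // StatusCode::NotFound => return Err(DataError::discount_bail("404: Not found.")),
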
@@ -10,14 +10,13 @@ use hyper::client::HttpConnector;
 use hyper_tls::HttpsConnector;
 use tokio_core::reactor::Core;
 
-use failure::Error;
 use num_cpus;
 use rss;
 
 use Source;
 use dbqueries;
+use errors::DataError;
 use models::{IndexState, NewEpisode, NewEpisodeMinimal};
 // use Feed;
 
 // use std::sync::{Arc, Mutex};
@@ -50,7 +49,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
     tokio_core: &mut Core,
     pool: &CpuPool,
     client: Client<HttpsConnector<HttpConnector>>,
-) -> Result<(), Error> {
+) -> Result<(), DataError> {
     let list: Vec<_> = sources
         .into_iter()
         .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
@@ -59,7 +58,9 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
         .collect();
 
     if list.is_empty() {
-        bail!("No futures were found to run.");
+        return Err(DataError::DiscountBail(format!(
+            "No futures were found to run."
+        )));
     }
 
     // Thats not really concurrent yet I think.
@@ -69,7 +70,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
 }
 
 /// Creates a tokio `reactor::Core`, a `CpuPool`, and a `hyper::Client` and runs the pipeline.
-pub fn run(sources: Vec<Source>, ignore_etags: bool) -> Result<(), Error> {
+pub fn run(sources: Vec<Source>, ignore_etags: bool) -> Result<(), DataError> {
     if sources.is_empty() {
         return Ok(());
     }
@@ -85,7 +86,7 @@ pub fn run(sources: Vec<Source>, ignore_etags: bool) -> Result<(), Error> {
 }
 
 /// Docs
-pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), Error> {
+pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataError> {
     let pool = CpuPool::new_num_cpus();
     let mut core = Core::new()?;
     let handle = core.handle();
@@ -104,7 +105,7 @@ pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), Error> {
 fn determine_ep_state(
     ep: NewEpisodeMinimal,
     item: &rss::Item,
-) -> Result<IndexState<NewEpisode>, Error> {
+) -> Result<IndexState<NewEpisode>, DataError> {
     // Check if feed exists
     let exists = dbqueries::episode_exists(ep.title(), ep.podcast_id())?;
 
@@ -125,7 +126,7 @@ fn determine_ep_state(
 pub(crate) fn glue_async<'a>(
     item: &'a rss::Item,
     id: i32,
-) -> Box<Future<Item = IndexState<NewEpisode>, Error = Error> + 'a> {
+) -> Box<Future<Item = IndexState<NewEpisode>, Error = DataError> + 'a> {
     Box::new(
         result(NewEpisodeMinimal::new(item, id)).and_then(move |ep| determine_ep_state(ep, item)),
     )
@@ -137,7 +138,7 @@ pub(crate) fn glue_async<'a>(
 #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
 pub fn collect_futures<F>(
     futures: Vec<F>,
-) -> Box<Future<Item = Vec<Result<F::Item, F::Error>>, Error = Error>>
+) -> Box<Future<Item = Vec<Result<F::Item, F::Error>>, Error = DataError>>
 where
     F: 'static + Future,
     <F as Future>::Item: 'static,
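
Note that collect_futures returns Item = Vec<Result<F::Item, F::Error>>: each inner future's outcome is captured as a Result, so one feed failing (including via DiscountBail) does not short-circuit the whole pipeline. A minimal sketch of that pattern in futures 0.1, assuming the same signature:

    use futures::future::{join_all, Future};

    fn collect_all<F>(
        futures: Vec<F>,
    ) -> Box<Future<Item = Vec<Result<F::Item, F::Error>>, Error = DataError>>
    where
        F: 'static + Future,
        F::Item: 'static,
        F::Error: 'static,
    {
        // `then` wraps every outcome, Ok or Err, into an Ok item,
        // so join_all never sees a failure and collects them all.
        let wrapped: Vec<_> = futures
            .into_iter()
            .map(|f| f.then(|res| Ok::<_, DataError>(res)))
            .collect();
        Box::new(join_all(wrapped))
    }
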
@@ -3,12 +3,11 @@
 use chrono::prelude::*;
 use rayon::prelude::*;
 
-use failure::Error;
 use itertools::Itertools;
 use url::{Position, Url};
 
 use dbqueries;
-// use errors::*;
+use errors::DataError;
 use models::{EpisodeCleanerQuery, Podcast, Save};
 use xdg_dirs::DL_DIR;
@@ -16,7 +15,7 @@ use std::fs;
 use std::path::Path;
 
 /// Scan downloaded `episode` entries that might have broken `local_uri`s and set them to `None`.
-fn download_checker() -> Result<(), Error> {
+fn download_checker() -> Result<(), DataError> {
     let mut episodes = dbqueries::get_downloaded_episodes()?;
 
     episodes
@@ -34,7 +33,7 @@ fn download_checker() -> Result<(), Error> {
 }
 
 /// Delete watched `episodes` that have exceded their liftime after played.
-fn played_cleaner() -> Result<(), Error> {
+fn played_cleaner() -> Result<(), DataError> {
     let mut episodes = dbqueries::get_played_cleaner_episodes()?;
 
     let now_utc = Utc::now().timestamp() as i32;
@@ -58,7 +57,7 @@ fn played_cleaner() -> Result<(), Error> {
 }
 
 /// Check `ep.local_uri` field and delete the file it points to.
-fn delete_local_content(ep: &mut EpisodeCleanerQuery) -> Result<(), Error> {
+fn delete_local_content(ep: &mut EpisodeCleanerQuery) -> Result<(), DataError> {
     if ep.local_uri().is_some() {
         let uri = ep.local_uri().unwrap().to_owned();
         if Path::new(&uri).exists() {
@@ -87,7 +86,7 @@ fn delete_local_content(ep: &mut EpisodeCleanerQuery) -> Result<(), Error> {
 ///
 /// Runs a cleaner for played Episode's that are pass the lifetime limit and
 /// scheduled for removal.
-pub fn checkup() -> Result<(), Error> {
+pub fn checkup() -> Result<(), DataError> {
     info!("Running database checks.");
     download_checker()?;
     played_cleaner()?;
@@ -124,7 +123,7 @@ pub fn replace_extra_spaces(s: &str) -> String {
 }
 
 /// Returns the URI of a Podcast Downloads given it's title.
-pub fn get_download_folder(pd_title: &str) -> Result<String, Error> {
+pub fn get_download_folder(pd_title: &str) -> Result<String, DataError> {
     // It might be better to make it a hash of the title or the podcast rowid
     let download_fold = format!("{}/{}", DL_DIR.to_str().unwrap(), pd_title);
 
@@ -138,7 +137,7 @@ pub fn get_download_folder(pd_title: &str) -> Result<String, Error> {
 /// Removes all the entries associated with the given show from the database,
 /// and deletes all of the downloaded content.
 // TODO: Write Tests
-pub fn delete_show(pd: &Podcast) -> Result<(), Error> {
+pub fn delete_show(pd: &Podcast) -> Result<(), DataError> {
     dbqueries::remove_feed(pd)?;
     info!("{} was removed succesfully.", pd.title());
 
@@ -17,6 +17,7 @@ use hammond_data::xdg_dirs::HAMMOND_CACHE;
 use std::result;
 
+use failure::Error;
 
 type Result<T> = result::Result<T, Error>;
 
 // TODO: Replace path that are of type &str with std::path.