1.21 Gigawatts. Remove non-future indexing loop.
Tried to keep a separate futures loop, but it's too confusing having to write a Trait2, function2, etc. version of everything and keep them in sync. The futures path is sort of functional now, so the synchronous version can be removed. It still needs a ton of work to be ready, though, or to even get near the performance of the sync+rayon version.
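The futures path this commit keeps hinges on the `collect_futures` helper in pipeline.rs (see the hunk further down): every `Source` is mapped to a feed future, and the helper joins them into one future that yields each feed's individual `Result`, presumably so one broken feed cannot abort the whole run. The full body of the helper is not shown in the diff, so the sketch below is only a guess at the usual `then(Ok)` + `join_all` trick with futures 0.1; the unit error type and the toy `main` are illustration-only assumptions (the real helper is boxed and uses the crate's `Error` type).

    extern crate futures;

    use futures::future::{join_all, Future};

    // Join a list of futures into one future that resolves to every
    // individual `Result` instead of failing fast on the first error.
    fn collect_futures<F>(
        futures: Vec<F>,
    ) -> Box<Future<Item = Vec<Result<F::Item, F::Error>>, Error = ()>>
    where
        F: Future + 'static,
        F::Item: 'static,
        F::Error: 'static,
    {
        // `then` wraps each outcome in `Ok(..)`, so the joined future
        // itself can no longer fail part-way through.
        Box::new(join_all(
            futures.into_iter().map(|f| f.then(|res| Ok::<_, ()>(res))),
        ))
    }

    fn main() {
        use futures::future::{err, ok};

        // Two toy "feed" futures: one succeeds, one fails.
        let feeds: Vec<Box<Future<Item = u32, Error = &'static str>>> = vec![
            Box::new(ok::<u32, &'static str>(1)),
            Box::new(err::<u32, &'static str>("broken feed")),
        ];

        let results = collect_futures(feeds).wait().unwrap();
        let expected: Vec<Result<u32, &'static str>> = vec![Ok(1), Err("broken feed")];
        assert_eq!(results, expected);
    }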
This commit is contained in:
parent 5d88998180
commit 93372a30d0
Cargo.lock (generated) | 5
@@ -564,10 +564,12 @@ dependencies = [
"glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"hammond-data 0.1.0",
"hyper 0.11.12 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"mime_guess 1.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
@@ -584,6 +586,8 @@ dependencies = [
"hammond-data 0.1.0",
"hammond-downloader 0.1.0",
"humansize 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.12 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"loggerv 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -591,6 +595,7 @@ dependencies = [
"rayon 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
"send-cell 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]

@@ -32,25 +32,6 @@ static URLS: &[(&[u8], &str)] = &[
(LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"),
];

static URLS2: &[&str] = &[
"https://www.pcper.com/rss/podcasts-mp3.rss",
"https://feeds.feedburner.com/InterceptedWithJeremyScahill",
"http://www.badvoltage.org/feed/ogg/",
"https://www.theguardian.com/news/series/the-audio-long-read/podcast.xml",
"http://feeds.feedburner.com/coderradiomp3",
"https://rss.art19.com/steal-the-stars",
"https://feeds.mozilla-podcasts.org/irl",
"http://economicupdate.libsyn.com/rss",
"http://feeds.feedburner.com/linuxunplugged",
"http://ubuntupodcast.org/feed/ogg/",
"http://www.newrustacean.com/feed.xml",
"http://feeds.propublica.org/propublica/podcast",
"https://rss.acast.com/thetipoff",
"http://feeds.soundcloud.com/users/soundcloud:users:277306156/sounds.rss",
"http://revolutionspodcast.libsyn.com/rss/",
"https://www.greaterthancode.com/feed/podcast",
];

fn index_urls() {
let feeds: Vec<_> = URLS.iter()
.map(|&(buff, url)| {
@@ -84,24 +65,10 @@ fn bench_index_unchanged_feeds(b: &mut Bencher) {
});
}

#[bench]
fn bench_get_normal_feeds(b: &mut Bencher) {
// Index first so it will only bench the comparison test case.
truncate_db().unwrap();
URLS2.iter().for_each(|url| {
Source::from_url(url).unwrap();
});

b.iter(|| {
let sources = hammond_data::dbqueries::get_sources().unwrap();
index_loop(sources);
});
}

#[bench]
fn bench_get_future_feeds(b: &mut Bencher) {
truncate_db().unwrap();
URLS2.iter().for_each(|url| {
URLS.iter().for_each(|&(_, url)| {
Source::from_url(url).unwrap();
});

@@ -38,7 +38,7 @@ lazy_static! {
}

/// Get an r2d2 SqliteConnection.
pub fn connection() -> Pool {
pub(crate) fn connection() -> Pool {
POOL.clone()
}

@@ -6,7 +6,8 @@ use diesel::prelude::*;

use database::connection;
use errors::*;
use models::{Episode, EpisodeCleanerQuery, EpisodeWidgetQuery, Podcast, PodcastCoverQuery, Source};
use models::{Episode, EpisodeCleanerQuery, EpisodeMinimal, EpisodeWidgetQuery, Podcast,
PodcastCoverQuery, Source};

pub fn get_sources() -> Result<Vec<Source>> {
use schema::source::dsl::*;
@@ -238,13 +239,29 @@ pub fn get_podcast_from_source_id(sid: i32) -> Result<Podcast> {
.map_err(From::from)
}

pub fn get_episode_from_pk(con: &SqliteConnection, title_: &str, pid: i32) -> QueryResult<Episode> {
pub fn get_episode_from_pk(title_: &str, pid: i32) -> Result<Episode> {
use schema::episode::dsl::*;
let db = connection();
let con = db.get()?;

episode
.filter(title.eq(title_))
.filter(podcast_id.eq(pid))
.get_result::<Episode>(&*con)
.map_err(From::from)
}

pub fn get_episode_minimal_from_pk(title_: &str, pid: i32) -> Result<EpisodeMinimal> {
use schema::episode::dsl::*;
let db = connection();
let con = db.get()?;

episode
.select((rowid, title, uri, epoch, duration, guid, podcast_id))
.filter(title.eq(title_))
.filter(podcast_id.eq(pid))
.get_result::<EpisodeMinimal>(&*con)
.map_err(From::from)
}

pub fn remove_feed(pd: &Podcast) -> Result<()> {

@@ -1,19 +1,13 @@
//! Index Feeds.

use diesel::prelude::*;
use futures::future::*;
use futures::prelude::*;
use rayon::iter::IntoParallelIterator;
// use futures::prelude::*;
use errors::*;
use models::{NewEpisode, NewPodcast, Podcast};
use rayon::prelude::*;
use rss;

use database::connection;
use dbqueries;
use errors::*;
use models::{NewEpisode, NewPodcast, Podcast, Source};

// #[cfg(test)]
// use models::queryables::Episode;
// use models::{IndexState, Source};
// use pipeline::*;

#[derive(Debug)]
/// Wrapper struct that hold a `Source` id and the `rss::Channel`
@@ -24,11 +18,6 @@ pub struct Feed {
}

impl Feed {
/// Constructor that consumes a `Source` and returns the corresponding `Feed` struct.
pub fn from_source(s: &mut Source) -> Result<Feed> {
s.into_feed(false)
}

/// Constructor that consumes a `Source` and a `rss::Channel` returns a `Feed` struct.
pub fn from_channel_source(channel: rss::Channel, source_id: i32) -> Feed {
Feed { channel, source_id }
@@ -40,31 +29,15 @@ impl Feed {
self.index_channel_items(&pd)
}

/// Docs
// FIXME: docs
// FIXME: lifetime stuff
pub fn index_future(self) -> Box<Future<Item = (), Error = Error>> {
let indx = self.parse_podcast_futture()
.and_then(|pd| pd.into_podcast())
.and_then(move |pd| self.index_channel_items(&pd));

Box::new(indx)
}

// TODO: Refactor transcactions and find a way to do it in parallel.
fn index_channel_items(&self, pd: &Podcast) -> Result<()> {
let episodes = self.parse_channel_items(pd);
let db = connection();
let con = db.get()?;

let _ = con.transaction::<(), Error, _>(|| {
episodes.into_iter().for_each(|x| {
if let Err(err) = x.index(&con) {
error!("Failed to index episode: {:?}.", x.title());
error!("Error msg: {}", err);
};
});
Ok(())
episodes.iter().for_each(|x| {
if let Err(err) = x.index() {
error!("Failed to index episode: {:?}.", x.title());
error!("Error msg: {}", err);
};
});
Ok(())
}
@@ -73,6 +46,7 @@ impl Feed {
NewPodcast::new(&self.channel, self.source_id)
}

#[allow(dead_code)]
fn parse_podcast_futture(&self) -> Box<FutureResult<NewPodcast, Error>> {
Box::new(ok(self.parse_podcast()))
}
@@ -86,69 +60,28 @@ impl Feed {

new_episodes
}

// This could also retrurn a FutureResult<Vec<FutureNewEpisode, Error>>, Error> Instead
#[allow(dead_code)]
fn parse_episodes_future(&self, pd: &Podcast) -> Box<Vec<FutureResult<NewEpisode, Error>>> {
let episodes = self.channel
.items()
.par_iter()
.map(|item| result(NewEpisode::new(item, pd.id())))
.collect();

Box::new(episodes)
}

// #[cfg(test)]
// /// This returns only the episodes in the xml feed.
// fn get_episodes(&self) -> Result<Vec<Episode>> {
// let pd = self.get_podcast()?;
// let eps = self.parse_channel_items(&pd);

// let db = connection();
// let con = db.get()?;
// let episodes: Vec<_> = eps.into_iter()
// .filter_map(|ep| ep.into_episode(&con).ok())
// .collect();

// Ok(episodes)
// }
}
// fn parse_channel_items2(
// &self,
// pd: &Podcast,
// ) -> (Vec<IndexState<NewEpisode>>, Vec<IndexState<NewEpisode>>) {
// let items = self.channel.items();
// let (insert, update): (Vec<_>, Vec<_>) = items
// .into_iter()
// .filter_map(|item| glue(item, pd.id()).ok())
// .filter(|&state| state == IndexState::NotChanged)
// .partition(|&state| state == IndexState::Index);

/// Index a "list" of `Source`s.
pub fn index_loop<S: IntoParallelIterator<Item = Source>>(sources: S) {
sources
.into_par_iter()
.filter_map(|mut source| {
let foo = Feed::from_source(&mut source);
if let Err(err) = foo {
error!("Error: {}", err);
None
} else {
foo.ok()
}
})
// Handle the indexing of a `Feed` into the Database.
.for_each(|feed| {
if let Err(err) = feed.index() {
error!("Error While trying to update the database.");
error!("Error msg: {}", err);
}
});

info!("Indexing done.");
}

/// Retrieves all `Sources` from the database and updates/indexes them.
pub fn index_all() -> Result<()> {
let sources = dbqueries::get_sources()?;
index_loop(sources);
Ok(())
}
// (insert, update)
// }
// }

#[cfg(test)]
mod tests {
use Source;
use database::truncate_db;
use dbqueries;
use pipeline;
use std::fs;
use std::io::BufReader;

@@ -169,11 +102,12 @@ mod tests {
// Index the urls into the source table.
Source::from_url(url).unwrap();
});
let sources = dbqueries::get_sources().unwrap();
pipeline::pipeline(sources, true).unwrap();

index_all().unwrap();

let sources = dbqueries::get_sources().unwrap();
// Run again to cover Unique constrains erros.
index_all().unwrap();
pipeline::pipeline(sources, true).unwrap()
}

#[test]

@@ -6,7 +6,8 @@ pub(crate) mod episode;
pub(crate) mod podcast;
pub(crate) mod source;

use diesel::prelude::*;
// use futures::prelude::*;
// use futures::future::*;

pub(crate) use self::new_episode::{NewEpisode, NewEpisodeMinimal};
pub(crate) use self::new_podcast::NewPodcast;
@@ -17,17 +18,20 @@ pub(crate) use self::episode::EpisodeCleanerQuery;
pub use self::podcast::{Podcast, PodcastCoverQuery};
pub use self::source::Source;

use errors::*;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum IndexState<T> {
Index(T),
Update(T),
Update((T, i32)),
NotChanged,
}

pub trait Insert {
fn insert(&self, &SqliteConnection) -> QueryResult<usize>;
fn insert(&self) -> Result<()>;
}

pub trait Update {
fn update(&self, &SqliteConnection, i32) -> QueryResult<usize>;
fn update(&self, i32) -> Result<()>;
}

@@ -1,11 +1,13 @@
use diesel;
use diesel::prelude::*;

use diesel;
use schema::episode;

use ammonia;
use rfc822_sanitizer::parse_from_rfc2822_with_fallback as parse_rfc822;
use rss;

use database::connection;
use dbqueries;
use errors::*;
use models::{Episode, Insert, Update};
@@ -44,43 +46,49 @@ impl From<NewEpisodeMinimal> for NewEpisode {
}

impl Insert for NewEpisode {
fn insert(&self, con: &SqliteConnection) -> QueryResult<usize> {
fn insert(&self) -> Result<()> {
use schema::episode::dsl::*;
diesel::insert_into(episode).values(self).execute(&*con)
let db = connection();
let con = db.get()?;

info!("Indexing {:?}", self.title);
diesel::insert_into(episode)
.values(self)
.execute(&*con)
.map_err(From::from)
.map(|_| ())
}
}

impl Update for NewEpisode {
fn update(&self, con: &SqliteConnection, episode_id: i32) -> QueryResult<usize> {
fn update(&self, episode_id: i32) -> Result<()> {
use schema::episode::dsl::*;
let db = connection();
let con = db.get()?;

info!("Updating {:?}", self.title);
diesel::update(episode.filter(rowid.eq(episode_id)))
.set(self)
.execute(&*con)
.map_err(From::from)
.map(|_| ())
}
}

impl NewEpisode {
#[allow(dead_code)]
/// Parses an `rss::Item` into a `NewEpisode` Struct.
pub(crate) fn new(item: &rss::Item, podcast_id: i32) -> Result<Self> {
NewEpisodeMinimal::new(item, podcast_id).map(|ep| ep.into_new_episode(item))
}

// TODO: Refactor into batch indexes instead.
#[allow(dead_code)]
pub(crate) fn into_episode(self, con: &SqliteConnection) -> Result<Episode> {
self.index(con)?;
Ok(dbqueries::get_episode_from_pk(
con,
&self.title,
self.podcast_id,
)?)
pub(crate) fn into_episode(self) -> Result<Episode> {
self.index()?;
dbqueries::get_episode_from_pk(&self.title, self.podcast_id)
}

pub(crate) fn index(&self, con: &SqliteConnection) -> QueryResult<()> {
let ep = dbqueries::get_episode_from_pk(con, &self.title, self.podcast_id);
pub(crate) fn index(&self) -> Result<()> {
let ep = dbqueries::get_episode_from_pk(&self.title, self.podcast_id);

match ep {
Ok(foo) => {
@@ -92,11 +100,11 @@ impl NewEpisode {
|| foo.uri() != self.uri.as_ref().map(|s| s.as_str())
|| foo.duration() != self.duration
{
self.update(con, foo.rowid())?;
self.update(foo.rowid())?;
}
}
Err(_) => {
self.insert(con)?;
self.insert()?;
}
}
Ok(())

@@ -29,20 +29,31 @@ pub(crate) struct NewPodcast {
}

impl Insert for NewPodcast {
fn insert(&self, con: &SqliteConnection) -> QueryResult<usize> {
fn insert(&self) -> Result<()> {
use schema::podcast::dsl::*;
diesel::insert_into(podcast).values(self).execute(&*con)
let db = connection();
let con = db.get()?;

diesel::insert_into(podcast)
.values(self)
.execute(&*con)
.map(|_| ())
.map_err(From::from)
}
}

impl Update for NewPodcast {
fn update(&self, con: &SqliteConnection, podcast_id: i32) -> QueryResult<usize> {
fn update(&self, podcast_id: i32) -> Result<()> {
use schema::podcast::dsl::*;
let db = connection();
let con = db.get()?;

info!("Updating {}", self.title);
diesel::update(podcast.filter(id.eq(podcast_id)))
.set(self)
.execute(&*con)
.map(|_| ())
.map_err(From::from)
}
}

@@ -87,19 +98,17 @@ impl NewPodcast {
pub(crate) fn index(&self) -> Result<()> {
let pd = dbqueries::get_podcast_from_source_id(self.source_id);

let db = connection();
let con = db.get()?;
match pd {
Ok(foo) => {
if (foo.link() != self.link) || (foo.title() != self.title)
|| (foo.image_uri() != self.image_uri.as_ref().map(|x| x.as_str()))
{
info!("NewEpisode: {:?}\n OldEpisode: {:?}", self, foo);
self.update(&con, foo.id())?;
self.update(foo.id())?;
}
}
Err(_) => {
self.insert(&con)?;
self.insert()?;
}
}
Ok(())

@@ -5,7 +5,8 @@ use diesel::prelude::*;

use database::connection;
use dbqueries;
use models::{Insert, Update};
// use models::{Insert, Update};
use models::Insert;
use models::Source;
use schema::source;

@@ -24,9 +25,14 @@ pub(crate) struct NewSource {
}

impl Insert for NewSource {
fn insert(&self, con: &SqliteConnection) -> QueryResult<usize> {
fn insert(&self) -> Result<()> {
use schema::source::dsl::*;
diesel::insert_into(source).values(self).execute(&*con)
let db = connection();
let con = db.get()?;

// FIXME: Insert or ignore
let _ = diesel::insert_into(source).values(self).execute(&*con);
Ok(())
}
}

@@ -39,19 +45,9 @@ impl NewSource {
}
}

fn index(&self) -> Result<()> {
let db = connection();
let con = db.get()?;

// Throw away the result like `insert or ignore`
// Diesel deos not support `insert or ignore` yet.
let _ = self.insert(&con);
Ok(())
}

// Look out for when tryinto lands into stable.
pub(crate) fn into_source(self) -> Result<Source> {
self.index()?;
self.insert()?;
dbqueries::get_source_from_uri(&self.uri)
}
}

@@ -1,5 +1,4 @@
use diesel::SaveChangesDsl;
use reqwest;
use rss::Channel;

use hyper;
@@ -17,7 +16,6 @@ use feed::Feed;
use models::NewSource;
use schema::source;

use std::io::Read;
use std::str::FromStr;

#[derive(Queryable, Identifiable, AsChangeset, PartialEq)]
@@ -79,26 +77,8 @@ impl Source {

/// Extract Etag and LastModifier from res, and update self and the
/// corresponding db row.
fn update_etag(&mut self, res: &reqwest::Response) -> Result<()> {
let headers = res.headers();

let etag = headers.get::<ETag>();
let lmod = headers.get::<LastModified>();

if self.http_etag() != etag.map(|x| x.tag()) || self.last_modified != lmod.map(|x| {
format!("{}", x)
}) {
self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
self.last_modified = lmod.map(|x| format!("{}", x));
self.save()?;
}

Ok(())
}

/// Extract Etag and LastModifier from res, and update self and the
/// corresponding db row.
fn update_etag2(mut self, res: &Response) -> Result<()> {
// FIXME: With &mut self the closure is of type FnMut instead of FnOnce.
fn update_etag(mut self, res: &Response) -> Result<()> {
let headers = res.headers();

let etag = headers.get::<ETag>();
@@ -123,43 +103,7 @@ impl Source {
///
/// Consumes `self` and Returns the corresponding `Feed` Object.
// TODO: Refactor into TryInto once it lands on stable.
pub fn into_feed(&mut self, ignore_etags: bool) -> Result<Feed> {
use reqwest::header::{EntityTag, Headers, HttpDate, IfModifiedSince, IfNoneMatch};

let mut headers = Headers::new();

if !ignore_etags {
if let Some(foo) = self.http_etag() {
headers.set(IfNoneMatch::Items(vec![
EntityTag::new(true, foo.to_owned()),
]));
}

if let Some(foo) = self.last_modified() {
if let Ok(x) = foo.parse::<HttpDate>() {
headers.set(IfModifiedSince(x));
}
}
}

let client = reqwest::Client::builder().referer(false).build()?;
let mut res = client.get(self.uri()).headers(headers).send()?;

info!("GET to {} , returned: {}", self.uri(), res.status());

self.update_etag(&res)?;
match_status(res.status())?;

let mut buf = String::new();
res.read_to_string(&mut buf)?;
let chan = Channel::from_str(&buf)?;

Ok(Feed::from_channel_source(chan, self.id))
}

// FIXME:
/// Docs
pub fn into_fututre_feed(
pub fn into_feed(
self,
client: &Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
@@ -168,7 +112,7 @@ impl Source {
let feed = request_constructor(&self, client, ignore_etags)
.map_err(From::from)
.and_then(move |res| {
self.update_etag2(&res)?;
self.update_etag(&res)?;
Ok(res)
})
.and_then(|res| -> Result<Response> {
@@ -278,7 +222,7 @@ mod tests {
let url = "http://www.newrustacean.com/feed.xml";
let source = Source::from_url(url).unwrap();

let feed = source.into_fututre_feed(&client, true);
let feed = source.into_feed(&client, true);

assert!(core.run(feed).is_ok());
}

@@ -2,7 +2,6 @@ use rss::Item;

/// Parses an Item Itunes extension and returns it's duration value in seconds.
// FIXME: Rafactor
// TODO: Write tests
#[allow(non_snake_case)]
pub(crate) fn parse_itunes_duration(item: &Item) -> Option<i32> {
let duration = item.itunes_ext().map(|s| s.duration())??;

@@ -2,14 +2,19 @@
//! Docs.

use futures::future::*;
use futures::prelude::*;
// use futures::prelude::*;

use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;

use rss;

use Source;
use dbqueries;
use errors::*;
use models::{IndexState, Insert, NewEpisode, NewEpisodeMinimal, Update};
// use models::new_episode::NewEpisodeMinimal;
// use Feed;

use std;
@@ -32,7 +37,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
.into_iter()
// FIXME: Make proper indexing futures instead of wrapping up existing
// blocking functions
.map(|s| s.into_fututre_feed(&client, ignore_etags).map(|feed| feed.index_future()))
.map(|s| s.into_feed(&client, ignore_etags).and_then(|feed| feed.index()))
.collect();

let f = core.run(collect_futures(list))?;
@@ -43,9 +48,54 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
Ok(())
}

#[allow(dead_code)]
pub(crate) fn determine_episode_state(
ep: NewEpisodeMinimal,
item: &rss::Item,
) -> Result<IndexState<NewEpisode>> {
// determine if feed exists
let exists = dbqueries::episode_exists(ep.title(), ep.podcast_id())?;

if !exists {
return Ok(IndexState::Index(ep.into_new_episode(item)));
} else {
let old = dbqueries::get_episode_minimal_from_pk(ep.title(), ep.podcast_id())?;
let rowid = old.rowid();

if ep != old.into() {
return Ok(IndexState::Update((ep.into_new_episode(item), rowid)));
} else {
return Ok(IndexState::NotChanged);
}
}
}

#[allow(dead_code)]
pub(crate) fn model_state<T: Insert + Update>(state: IndexState<T>) -> Result<()> {
match state {
IndexState::NotChanged => Ok(()),
IndexState::Index(t) => t.insert(),
IndexState::Update((t, rowid)) => t.update(rowid),
}
}

#[allow(dead_code)]
pub(crate) fn model_state_future<T: Insert + Update>(
state: IndexState<T>,
) -> Box<FutureResult<(), Error>> {
Box::new(result(model_state(state)))
}

#[allow(dead_code)]
pub(crate) fn glue(item: &rss::Item, id: i32) -> Result<IndexState<NewEpisode>> {
let e = NewEpisodeMinimal::new(item, id)?;
determine_episode_state(e, &item)
}

// Weird magic from #rust irc channel
// kudos to remexre
fn collect_futures<F>(
/// docs
pub fn collect_futures<F>(
futures: Vec<F>,
) -> Box<Future<Item = Vec<std::result::Result<F::Item, F::Error>>, Error = Error>>
where

@@ -153,7 +153,7 @@ mod tests {

use self::tempdir::TempDir;
use super::*;
use database::{connection, truncate_db};
use database::truncate_db;
use models::new_episode::NewEpisodeBuilder;
use std::fs::File;
use std::io::Write;
@@ -169,14 +169,12 @@ mod tests {
writeln!(tmp_file, "Foooo").unwrap();

// Setup episodes
let db = connection();
let con = db.get().unwrap();
let n1 = NewEpisodeBuilder::default()
.title("foo_bar".to_string())
.podcast_id(0)
.build()
.unwrap()
.into_episode(&con)
.into_episode()
.unwrap();

let n2 = NewEpisodeBuilder::default()
@@ -184,15 +182,14 @@ mod tests {
.podcast_id(1)
.build()
.unwrap()
.into_episode(&con)
.into_episode()
.unwrap();

let mut ep1 = dbqueries::get_episode_from_pk(&con, n1.title(), n1.podcast_id()).unwrap();
let mut ep2 = dbqueries::get_episode_from_pk(&con, n2.title(), n2.podcast_id()).unwrap();
let mut ep1 = dbqueries::get_episode_from_pk(n1.title(), n1.podcast_id()).unwrap();
let mut ep2 = dbqueries::get_episode_from_pk(n2.title(), n2.podcast_id()).unwrap();
ep1.set_local_uri(Some(valid_path.to_str().unwrap()));
ep2.set_local_uri(Some(bad_path.to_str().unwrap()));

drop(con);
ep1.save().unwrap();
ep2.save().unwrap();

@@ -214,24 +211,15 @@ mod tests {

let _tmp_dir = helper_db();
download_checker().unwrap();
let episode = {
let db = connection();
let con = db.get().unwrap();
dbqueries::get_episode_from_pk(&con, "bar_baz", 1).unwrap()
};
let episode = dbqueries::get_episode_from_pk("bar_baz", 1).unwrap();
assert!(episode.local_uri().is_none());
}

#[test]
fn test_download_cleaner() {
let _tmp_dir = helper_db();
let mut episode: EpisodeCleanerQuery = {
let db = connection();
let con = db.get().unwrap();
dbqueries::get_episode_from_pk(&con, "foo_bar", 0)
.unwrap()
.into()
};
let mut episode: EpisodeCleanerQuery =
dbqueries::get_episode_from_pk("foo_bar", 0).unwrap().into();

let valid_path = episode.local_uri().unwrap().to_owned();
delete_local_content(&mut episode).unwrap();
@@ -241,11 +229,7 @@ mod tests {
#[test]
fn test_played_cleaner_expired() {
let _tmp_dir = helper_db();
let mut episode = {
let db = connection();
let con = db.get().unwrap();
dbqueries::get_episode_from_pk(&con, "foo_bar", 0).unwrap()
};
let mut episode = dbqueries::get_episode_from_pk("foo_bar", 0).unwrap();
let now_utc = Utc::now().timestamp() as i32;
// let limit = now_utc - 172_800;
let epoch = now_utc - 200_000;
@@ -261,11 +245,7 @@ mod tests {
#[test]
fn test_played_cleaner_none() {
let _tmp_dir = helper_db();
let mut episode = {
let db = connection();
let con = db.get().unwrap();
dbqueries::get_episode_from_pk(&con, "foo_bar", 0).unwrap()
};
let mut episode = dbqueries::get_episode_from_pk("foo_bar", 0).unwrap();
let now_utc = Utc::now().timestamp() as i32;
// limit = 172_800;
let epoch = now_utc - 20_000;

@@ -15,3 +15,7 @@ glob = "0.2.11"

[dependencies.hammond-data]
path = "../hammond-data"

[dev-dependencies]
tokio-core = "0.1.12"
hyper-tls = "0.1.2"

@@ -219,20 +219,28 @@ mod tests {
use hammond_data::Source;
use hammond_data::dbqueries;

use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;

#[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
// to run it.
#[ignore]
fn test_cache_image() {
let url = "http://www.newrustacean.com/feed.xml";
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());

let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source
let mut source = Source::from_url(url).unwrap();
let source = Source::from_url(url).unwrap();
// Copy it's id
let sid = source.id();

// Convert Source it into a Feed and index it
let feed = source.into_feed(true).unwrap();
// Convert Source it into a future Feed and index it
let future = source.into_feed(&client, true);
let feed = core.run(future).unwrap();
feed.index().unwrap();

// Get the Podcast

@@ -13,3 +13,8 @@ extern crate tempdir;

pub mod downloader;
pub mod errors;

#[cfg(test)]
extern crate hyper_tls;
#[cfg(test)]
extern crate tokio_core;

@@ -30,3 +30,8 @@ path = "../hammond-data"

[dependencies.hammond-downloader]
path = "../hammond-downloader"

[dev-dependencies]
hyper-tls = "0.1.2"
tokio-core = "0.1.12"
hyper = "0.11.12"

@@ -21,6 +21,13 @@ extern crate regex;
extern crate send_cell;
// extern crate rayon;

#[cfg(test)]
extern crate hyper;
#[cfg(test)]
extern crate hyper_tls;
#[cfg(test)]
extern crate tokio_core;

// use rayon::prelude::*;
use log::LogLevel;

@@ -117,8 +117,11 @@ pub fn add(id: i32, directory: &str, sender: Sender<Action>) {
mod tests {
use super::*;

use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;

use hammond_data::{Episode, Source};
use hammond_data::database;
use hammond_data::dbqueries;
use hammond_data::utils::get_download_folder;

@@ -133,25 +136,26 @@ mod tests {
// THIS IS NOT A RELIABLE TEST
// Just quick sanity check
fn test_start_dl() {
let url = "http://www.newrustacean.com/feed.xml";
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());

let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source
let mut source = Source::from_url(url).unwrap();
let source = Source::from_url(url).unwrap();
// Copy it's id
let sid = source.id();

// Convert Source it into a Feed and index it
let feed = source.into_feed(true).unwrap();
// Convert Source it into a future Feed and index it
let future = source.into_feed(&client, true);
let feed = core.run(future).unwrap();
feed.index().unwrap();

// Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();
// Get an episode
let episode: Episode = {
let con = database::connection();
dbqueries::get_episode_from_pk(&*con.get().unwrap(), "e000: Hello, world!", pd.id())
.unwrap()
};
let episode: Episode =
dbqueries::get_episode_from_pk("e000: Hello, world!", pd.id()).unwrap();

let (sender, _rx) = channel();

@@ -84,20 +84,28 @@ mod tests {
use hammond_data::Source;
use hammond_data::dbqueries;

use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;

#[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
// to run it.
#[ignore]
fn test_get_pixbuf_from_path() {
let url = "http://www.newrustacean.com/feed.xml";
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());

let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source
let mut source = Source::from_url(url).unwrap();
let source = Source::from_url(url).unwrap();
// Copy it's id
let sid = source.id();

// Convert Source it into a Feed and index it
let feed = source.into_feed(true).unwrap();
// Convert Source it into a future Feed and index it
let future = source.into_feed(&client, true);
let feed = core.run(future).unwrap();
feed.index().unwrap();

// Get the Podcast