Playing around with rayon.

This commit is contained in:
Jordan Petridis 2017-09-26 10:21:37 +03:00
parent e1cc4f0d9f
commit a0064fcb4f
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
5 changed files with 58 additions and 19 deletions
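
The heart of the change, distilled: episode parsing moves onto rayon's parallel iterators while the single SqliteConnection hides behind an Arc<Mutex<_>> so every worker funnels its writes through one lock. A minimal standalone sketch of that shape (a Vec stands in for the connection; the names are illustrative, not the crate's):

    use rayon::prelude::*;
    use std::sync::{Arc, Mutex};

    fn main() {
        // Stand-in for the SqliteConnection: one shared handle, many workers.
        let db = Arc::new(Mutex::new(Vec::new()));
        let items: Vec<i32> = (0..10).collect();

        // Parse in parallel, then push each result through the lock,
        // mirroring index_loop()'s par_iter() + baz() structure below.
        items.par_iter().for_each(|x| {
            let parsed = x * 2; // stands in for feedparser::parse_episode()
            db.lock().unwrap().push(parsed);
        });

        println!("{:?}", db.lock().unwrap());
    }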

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
 authors = ["Jordan Petridis <jordanpetridis@protonmail.com>"]
 
 [dependencies]
+rayon = "0.8.2"
 error-chain = "0.11.0"
 structopt = "0.1.0"
 structopt-derive = "0.1.0"

View File

@ -55,7 +55,7 @@ pub fn parse_episode<'a>(item: &'a Item, parent_id: i32) -> Result<models::NewEp
// FIXME: Figure out the format sequence of rfc822. // FIXME: Figure out the format sequence of rfc822.
// This is the closest I got it, // This is the closest I got it,
// its also a direct copy of the sequence of rfc2822. // its also a direct copy of the sequence of rfc2822.
let date = DateTime::parse_from_str(&foo, "%a, %e %b %Y %H:%M:%S %z"); let date = DateTime::parse_from_str(&foo, "%a, %e %B %Y %H:%M:%S %z");
match date { match date {
Ok(bar) => bar.timestamp() as i32, Ok(bar) => bar.timestamp() as i32,
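
A side note on the FIXME above: chrono also ships a dedicated RFC 2822 parser, DateTime::parse_from_rfc2822, which may make the hand-maintained format string unnecessary for well-formed RSS pubDate values. A minimal sketch:

    use chrono::DateTime;

    fn main() {
        // RSS pubDate fields are RFC 822/2822 dates, which chrono
        // can parse directly without a custom format string.
        let date = DateTime::parse_from_rfc2822("Tue, 26 Sep 2017 10:21:37 +0300");
        match date {
            Ok(d) => println!("{}", d.timestamp()),
            Err(e) => println!("parse failed: {}", e),
        }
    }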

View File

@@ -2,12 +2,14 @@ use diesel::prelude::*;
 use diesel;
 use rss;
 use reqwest;
+use rayon::prelude::*;
+use std::sync::{Arc, Mutex};
 
 use schema;
 use dbqueries;
 use feedparser;
 use errors::*;
-use models::{Episode, NewSource, Podcast, Source};
+use models::{Episode, NewEpisode, NewSource, Podcast, Source};
 
 pub fn foo() {
     let inpt = vec![
@@ -28,7 +30,7 @@ pub fn foo() {
         }
     }
 
-    index_loop(&db).unwrap();
+    index_loop(db).unwrap();
 }
 
 fn insert_source(con: &SqliteConnection, url: &str) -> Result<Source> {
@@ -70,8 +72,8 @@ fn index_podcast(
     Ok(dbqueries::load_podcast(con, &pd.title)?)
 }
 
-fn index_episode(con: &SqliteConnection, item: &rss::Item, parent: &Podcast) -> Result<Episode> {
-    let ep = feedparser::parse_episode(item, parent.id())?;
+fn index_episode(con: &SqliteConnection, ep: &NewEpisode) -> Result<Episode> {
+    // let ep = feedparser::parse_episode(item, parent.id())?;
 
     match dbqueries::load_episode(con, &ep.uri.unwrap()) {
         Ok(mut foo) => if foo.title() != ep.title || foo.published_date() != ep.published_date {
@@ -84,41 +86,59 @@ fn index_episode(con: &SqliteConnection, item: &rss::Item, parent: &Podcast) ->
             foo.save_changes::<Episode>(con)?;
         },
         Err(_) => {
-            diesel::insert(&ep)
-                .into(schema::episode::table)
-                .execute(con)?;
+            diesel::insert(ep).into(schema::episode::table).execute(con)?;
         }
     }
 
     Ok(dbqueries::load_episode(con, &ep.uri.unwrap())?)
 }
 
-pub fn index_loop(db: &SqliteConnection) -> Result<()> {
+pub fn index_loop(db: SqliteConnection) -> Result<()> {
     use std::io::Read;
     use std::str::FromStr;
 
-    let mut f = fetch_feeds(db)?;
+    let mut f = fetch_feeds(&db)?;
+    let bar = Arc::new(Mutex::new(db));
 
     for &mut (ref mut req, ref source) in f.iter_mut() {
         let mut buf = String::new();
         req.read_to_string(&mut buf)?;
         let chan = rss::Channel::from_str(&buf)?;
-        let pd = index_podcast(db, &chan, source)?;
-        let _: Vec<_> = chan.items()
-            .iter()
-            .map(|x| index_episode(db, &x, &pd))
-            .collect();
-        info!("{:#?}", pd);
+        let mut pd = Podcast::new();
+        {
+            let fakedb = bar.lock().unwrap();
+            pd = index_podcast(&fakedb, &chan, source)?;
+        }
+        let foo: Vec<_> = chan.items()
+            .par_iter()
+            .map(|x| feedparser::parse_episode(&x, pd.id()).unwrap())
+            .collect();
+        // info!("{:#?}", pd);
+        info!("{:#?}", foo);
+        let _: Vec<_> = foo.par_iter()
+            .map(|x| {
+                let z = bar.clone();
+                baz(z, x)
+            })
+            .collect();
         // info!("{:#?}", episodes);
         // info!("{:?}", chan);
     }
     Ok(())
 }
 
-// TODO: make it into an iterator that yields reqwest::response
+fn baz(arc: Arc<Mutex<SqliteConnection>>, ep: &NewEpisode) -> Result<()> {
+    let db = arc.lock().unwrap();
+    index_episode(&db, ep)?;
+    Ok(())
+}
+
+// TODO: refactor into an Iterator
 // TODO: After fixing etag/lmod, add sent_etag:bool arg and logic to bypass it.
 pub fn fetch_feeds(connection: &SqliteConnection) -> Result<Vec<(reqwest::Response, Source)>> {
     use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified};
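
A hedged note on the .unwrap() inside the parallel map in the hunk above: a single malformed feed item panics a rayon worker thread. One alternative shape, sketched with illustrative stand-in names rather than the crate's real functions, is to drop failures with filter_map:

    use rayon::prelude::*;

    // Hypothetical stand-in for feedparser::parse_episode().
    fn parse(x: &str) -> Result<i32, std::num::ParseIntError> {
        x.parse()
    }

    fn main() {
        let raw = vec!["1", "2", "nope", "4"];
        // filter_map keeps the Ok results and skips the Err ones,
        // so one bad item cannot take down the whole batch.
        let parsed: Vec<i32> = raw.par_iter().filter_map(|x| parse(x).ok()).collect();
        println!("{:?}", parsed); // [1, 2, 4]
    }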
@@ -140,12 +160,16 @@ pub fn fetch_feeds(connection: &SqliteConnection) -> Result<Vec<(reqwest::Respon
         }
         info!("{:?}", headers);
 
-        // FIXME: I have fucked up something here.
+        // FIXME: I have fucked up somewhere here.
         // Getting back 200 codes even though I supposedly sent etags.
         let req = client.get(feed.uri())?.headers(headers).send()?;
         info!("{}", req.status());
 
         // TODO match on more stuff
+        // 301: Permanent redirect of the url
+        // 302: Temporary redirect of the url
+        // 304: Up to date Feed, checked with the Etag
+        // 410: Feed deleted
        match req.status() {
            reqwest::StatusCode::NotModified => {
                continue;
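
The four codes in the new TODO comments could become arms of the existing match; a hypothetical sketch, using the same reqwest::StatusCode enum the NotModified arm already matches on (only NotModified is handled in the real code so far):

    use reqwest::StatusCode;

    // Illustrative helper mapping each TODO code to its intended handling.
    fn describe(status: StatusCode) -> &'static str {
        match status {
            StatusCode::MovedPermanently => "301: permanent redirect, update the stored url",
            StatusCode::Found => "302: temporary redirect, keep the stored url",
            StatusCode::NotModified => "304: feed up to date, etag matched",
            StatusCode::Gone => "410: feed deleted upstream, disable the source",
            _ => "unhandled",
        }
    }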

View File

@@ -21,6 +21,7 @@ extern crate diesel_codegen;
 extern crate chrono;
 extern crate hyper;
+extern crate rayon;
 extern crate reqwest;
 extern crate rss;
 extern crate time;
@@ -43,6 +44,7 @@ pub mod errors {
     use time;
     use diesel::migrations::RunMigrationsError;
     use diesel::result;
+    // use std::sync;
 
     error_chain! {
         foreign_links {
@@ -55,6 +57,7 @@ pub mod errors {
             ChronoError(chrono::ParseError);
             DurationError(time::OutOfRangeError);
             HyperError(hyper::error::Error);
+            // MutexPoison(sync::PoisonError);
         }
     }
 }
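
On the commented-out MutexPoison link: std::sync::PoisonError is generic over the guard it wraps, and error_chain's foreign_links need a concrete error type, so it cannot be listed as-is. A sketch of mapping it by hand instead (illustrative plumbing, not the crate's real error type):

    use std::sync::{Mutex, MutexGuard};

    // Convert the poison error to a plain String so the caller can
    // wrap it in whatever error type it likes.
    fn lock_db<T>(m: &Mutex<T>) -> Result<MutexGuard<T>, String> {
        m.lock().map_err(|e| e.to_string())
    }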
@@ -75,11 +78,11 @@ lazy_static!{
         HAMMOND_XDG.create_data_directory(HAMMOND_XDG.get_data_home()).unwrap()
     };
 
-    static ref HAMMOND_CONFIG: PathBuf = {
+    static ref _HAMMOND_CONFIG: PathBuf = {
         HAMMOND_XDG.create_config_directory(HAMMOND_XDG.get_config_home()).unwrap()
     };
 
-    static ref HAMMOND_CACHE: PathBuf = {
+    static ref _HAMMOND_CACHE: PathBuf = {
         HAMMOND_XDG.create_cache_directory(HAMMOND_XDG.get_cache_home()).unwrap()
     };

View File

@@ -108,6 +108,17 @@ pub struct Podcast {
 }
 
 impl Podcast {
+    pub fn new() -> Podcast {
+        Podcast {
+            id: 0,
+            title: String::new(),
+            link: String::new(),
+            description: String::new(),
+            image_uri: None,
+            source_id: 0,
+        }
+    }
+
     pub fn id(&self) -> i32 {
         self.id
     }
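
A follow-up thought on Podcast::new(): since it builds an all-empty value, it could equally back a Default impl, the convention clippy usually asks for; a sketch assuming the same field set:

    impl Default for Podcast {
        fn default() -> Podcast {
            Podcast::new()
        }
    }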