Rayon is so nice.
This commit is contained in:
parent
ea15e6aa63
commit
e61044aebb
@ -98,18 +98,19 @@ pub fn index_loop(db: SqliteConnection) -> Result<()> {
|
|||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
let mut f = fetch_feeds(&db)?;
|
|
||||||
let bar = Arc::new(Mutex::new(db));
|
let bar = Arc::new(Mutex::new(db));
|
||||||
|
|
||||||
|
let mut f = fetch_feeds(bar.clone())?;
|
||||||
|
|
||||||
for &mut (ref mut req, ref source) in f.iter_mut() {
|
for &mut (ref mut req, ref source) in f.iter_mut() {
|
||||||
let mut buf = String::new();
|
let mut buf = String::new();
|
||||||
req.read_to_string(&mut buf)?;
|
req.read_to_string(&mut buf)?;
|
||||||
let chan = rss::Channel::from_str(&buf)?;
|
let chan = rss::Channel::from_str(&buf)?;
|
||||||
let pd = feedparser::parse_podcast(&chan, source.id())?;
|
let pd = feedparser::parse_podcast(&chan, source.id())?;
|
||||||
|
|
||||||
let fakedb = bar.lock().unwrap();
|
let tempdb = bar.lock().unwrap();
|
||||||
let pd = insert_return_podcast(&fakedb, &pd)?;
|
let pd = insert_return_podcast(&tempdb, &pd)?;
|
||||||
drop(fakedb);
|
drop(tempdb);
|
||||||
|
|
||||||
let foo: Vec<_> = chan.items()
|
let foo: Vec<_> = chan.items()
|
||||||
.par_iter()
|
.par_iter()
|
||||||
@ -134,12 +135,20 @@ pub fn index_loop(db: SqliteConnection) -> Result<()> {
|
|||||||
|
|
||||||
// TODO: maybe refactor into an Iterator for lazy evaluation.
|
// TODO: maybe refactor into an Iterator for lazy evaluation.
|
||||||
// TODO: After fixing etag/lmod, add sent_etag:bool arg and logic to bypass it.
|
// TODO: After fixing etag/lmod, add sent_etag:bool arg and logic to bypass it.
|
||||||
pub fn fetch_feeds(connection: &SqliteConnection) -> Result<Vec<(reqwest::Response, Source)>> {
|
pub fn fetch_feeds(
|
||||||
let mut feeds = dbqueries::get_sources(connection)?;
|
connection: Arc<Mutex<SqliteConnection>>,
|
||||||
|
) -> Result<Vec<(reqwest::Response, Source)>> {
|
||||||
|
let tempdb = connection.lock().unwrap();
|
||||||
|
let mut feeds = dbqueries::get_sources(&tempdb)?;
|
||||||
|
drop(tempdb);
|
||||||
|
|
||||||
let results: Vec<(reqwest::Response, Source)> = feeds
|
let results: Vec<_> = feeds
|
||||||
.iter_mut()
|
.par_iter_mut()
|
||||||
.map(|x| refresh_source(connection, x).unwrap())
|
.map(|x| {
|
||||||
|
let dbmutex = connection.clone();
|
||||||
|
let db = dbmutex.lock().unwrap();
|
||||||
|
refresh_source(&db, x).unwrap()
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
@ -155,14 +164,14 @@ fn refresh_source(
|
|||||||
let mut headers = Headers::new();
|
let mut headers = Headers::new();
|
||||||
|
|
||||||
if let Some(foo) = feed.http_etag() {
|
if let Some(foo) = feed.http_etag() {
|
||||||
headers.set(ETag(EntityTag::new(false, foo.to_owned())));
|
headers.set(ETag(EntityTag::new(true, foo.to_owned())));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(foo) = feed.last_modified() {
|
if let Some(foo) = feed.last_modified() {
|
||||||
headers.set(LastModified(foo.parse::<HttpDate>()?));
|
headers.set(LastModified(foo.parse::<HttpDate>()?));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("{:?}", headers);
|
info!("Headers: {:?}", headers);
|
||||||
// FIXME: I have fucked up somewhere here.
|
// FIXME: I have fucked up somewhere here.
|
||||||
// Getting back 200 codes even though I supposedly sent etags.
|
// Getting back 200 codes even though I supposedly sent etags.
|
||||||
let req = client.get(feed.uri())?.headers(headers).send()?;
|
let req = client.get(feed.uri())?.headers(headers).send()?;
|
||||||
|
|||||||
@ -180,19 +180,17 @@ impl<'a> Source {
|
|||||||
/// corresponding db row.
|
/// corresponding db row.
|
||||||
pub fn update_etag(&mut self, con: &SqliteConnection, req: &reqwest::Response) -> Result<()> {
|
pub fn update_etag(&mut self, con: &SqliteConnection, req: &reqwest::Response) -> Result<()> {
|
||||||
let headers = req.headers();
|
let headers = req.headers();
|
||||||
debug!("{:#?}", headers);
|
|
||||||
|
|
||||||
// let etag = headers.get_raw("ETag").unwrap();
|
// let etag = headers.get_raw("ETag").unwrap();
|
||||||
let etag = headers.get::<ETag>();
|
let etag = headers.get::<ETag>();
|
||||||
let lmod = headers.get::<LastModified>();
|
let lmod = headers.get::<LastModified>();
|
||||||
|
|
||||||
|
// FIXME: This dsnt work most of the time apparently
|
||||||
if self.http_etag() != etag.map(|x| x.tag())
|
if self.http_etag() != etag.map(|x| x.tag())
|
||||||
|| self.last_modified != lmod.map(|x| format!("{}", x))
|
|| self.last_modified != lmod.map(|x| format!("{}", x))
|
||||||
{
|
{
|
||||||
self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
|
self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
|
||||||
self.last_modified = lmod.map(|x| format!("{}", x));
|
self.last_modified = lmod.map(|x| format!("{}", x));
|
||||||
info!("Self etag: {:?}", self.http_etag);
|
|
||||||
info!("Self last_mod: {:?}", self.last_modified);
|
|
||||||
self.save_changes::<Source>(con)?;
|
self.save_changes::<Source>(con)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user