Database optimizations.

This commit is contained in:
Jordan Petridis 2017-10-23 10:47:54 +03:00
parent 9beea21a4f
commit 0e5d976514
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
6 changed files with 58 additions and 36 deletions

View File

@ -98,7 +98,7 @@ pub fn index_loop(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<()>
pub fn complete_index_from_source( pub fn complete_index_from_source(
req: &mut reqwest::Response, req: &mut reqwest::Response,
source: &Source, source: &Source,
mutex: &Arc<Mutex<SqliteConnection>>, db: &Arc<Mutex<SqliteConnection>>,
) -> Result<()> { ) -> Result<()> {
use std::io::Read; use std::io::Read;
use std::str::FromStr; use std::str::FromStr;
@ -107,57 +107,66 @@ pub fn complete_index_from_source(
req.read_to_string(&mut buf)?; req.read_to_string(&mut buf)?;
let chan = rss::Channel::from_str(&buf)?; let chan = rss::Channel::from_str(&buf)?;
complete_index(mutex, &chan, source)?; complete_index(&db, &chan, source)?;
Ok(()) Ok(())
} }
fn complete_index( fn complete_index(
connection: &Arc<Mutex<SqliteConnection>>, db: &Arc<Mutex<SqliteConnection>>,
chan: &rss::Channel, chan: &rss::Channel,
parent: &Source, parent: &Source,
) -> Result<()> { ) -> Result<()> {
let pd = { let pd = {
let db = connection.lock().unwrap(); let conn = db.lock().unwrap();
index_channel(&db, chan, parent)? index_channel(&conn, chan, parent)?
}; };
index_channel_items(db, chan.items(), &pd);
index_channel_items(connection, chan.items(), &pd);
Ok(()) Ok(())
} }
fn index_channel(db: &SqliteConnection, chan: &rss::Channel, parent: &Source) -> Result<Podcast> { fn index_channel(con: &SqliteConnection, chan: &rss::Channel, parent: &Source) -> Result<Podcast> {
let pd = feedparser::parse_podcast(chan, parent.id()); let pd = feedparser::parse_podcast(chan, parent.id());
// Convert NewPodcast to Podcast // Convert NewPodcast to Podcast
let pd = insert_return_podcast(db, &pd)?; let pd = insert_return_podcast(con, &pd)?;
Ok(pd) Ok(pd)
} }
fn index_channel_items(connection: &Arc<Mutex<SqliteConnection>>, it: &[rss::Item], pd: &Podcast) { fn index_channel_items(db: &Arc<Mutex<SqliteConnection>>, it: &[rss::Item], pd: &Podcast) {
it.par_iter() let episodes: Vec<_> = it.par_iter()
.map(|x| feedparser::parse_episode(x, pd.id())) .map(|x| feedparser::parse_episode(x, pd.id()))
.for_each(|x| { .collect();
let db = connection.lock().unwrap();
let e = index_episode(&db, &x); let conn = db.lock().unwrap();
let e = conn.transaction::<(), Error, _>(|| {
episodes.iter().for_each(|x| {
let e = index_episode(&conn, &x);
if let Err(err) = e { if let Err(err) = e {
error!("Failed to index episode: {:?}.", x); error!("Failed to index episode: {:?}.", x);
error!("Error msg: {}", err); error!("Error msg: {}", err);
}; };
}); });
Ok(())
});
drop(conn);
if let Err(err) = e {
error!("Episodes Transcaction Failed.");
error!("Error msg: {}", err);
};
} }
// Maybe this can be refactored into an Iterator for lazy evaluation. // Maybe this can be refactored into an Iterator for lazy evaluation.
pub fn fetch_feeds(connection: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<Vec<Feed>> { pub fn fetch_feeds(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<Vec<Feed>> {
let tempdb = connection.lock().unwrap(); let mut feeds = {
let mut feeds = dbqueries::get_sources(&tempdb)?; let conn = db.lock().unwrap();
drop(tempdb); dbqueries::get_sources(&conn)?
};
let results: Vec<Feed> = feeds let results: Vec<Feed> = feeds
.par_iter_mut() .par_iter_mut()
.filter_map(|x| { .filter_map(|x| {
let db = connection.lock().unwrap(); let l = refresh_source(db, x, force);
let l = refresh_source(&db, x, force);
if l.is_ok() { if l.is_ok() {
l.ok() l.ok()
} else { } else {
@ -172,7 +181,7 @@ pub fn fetch_feeds(connection: &Arc<Mutex<SqliteConnection>>, force: bool) -> Re
} }
pub fn refresh_source( pub fn refresh_source(
connection: &SqliteConnection, db: &Arc<Mutex<SqliteConnection>>,
feed: &mut Source, feed: &mut Source,
force: bool, force: bool,
) -> Result<Feed> { ) -> Result<Feed> {
@ -212,7 +221,7 @@ pub fn refresh_source(
// _ => (), // _ => (),
// }; // };
feed.update_etag(connection, &req)?; feed.update_etag(db, &req)?;
Ok(Feed(req, feed.clone())) Ok(Feed(req, feed.clone()))
} }

View File

@ -3,6 +3,8 @@ use diesel::SaveChangesDsl;
use SqliteConnection; use SqliteConnection;
use reqwest::header::{ETag, LastModified}; use reqwest::header::{ETag, LastModified};
use std::sync::{Arc, Mutex};
use schema::{episode, podcast, source}; use schema::{episode, podcast, source};
use errors::*; use errors::*;
@ -178,7 +180,11 @@ impl<'a> Source {
/// Extract Etag and LastModifier from req, and update self and the /// Extract Etag and LastModifier from req, and update self and the
/// corresponding db row. /// corresponding db row.
pub fn update_etag(&mut self, con: &SqliteConnection, req: &reqwest::Response) -> Result<()> { pub fn update_etag(
&mut self,
db: &Arc<Mutex<SqliteConnection>>,
req: &reqwest::Response,
) -> Result<()> {
let headers = req.headers(); let headers = req.headers();
// let etag = headers.get_raw("ETag").unwrap(); // let etag = headers.get_raw("ETag").unwrap();
@ -191,7 +197,8 @@ impl<'a> Source {
{ {
self.http_etag = etag.map(|x| x.tag().to_string().to_owned()); self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
self.last_modified = lmod.map(|x| format!("{}", x)); self.last_modified = lmod.map(|x| format!("{}", x));
self.save_changes::<Source>(con)?; let con = db.lock().unwrap();
self.save_changes::<Source>(&*con)?;
} }
Ok(()) Ok(())

View File

@ -73,9 +73,7 @@ pub fn refresh_feed(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack, sourc
let mut source = source.clone(); let mut source = source.clone();
// TODO: add timeout option and error reporting. // TODO: add timeout option and error reporting.
thread::spawn(clone!(db => move || { thread::spawn(clone!(db => move || {
let db_ = db.lock().unwrap(); let foo_ = hammond_data::index_feed::refresh_source(&db, &mut source, false);
let foo_ = hammond_data::index_feed::refresh_source(&db_, &mut source, false);
drop(db_);
if let Ok(x) = foo_ { if let Ok(x) = foo_ {
let Feed(mut req, s) = x; let Feed(mut req, s) = x;

View File

@ -75,6 +75,8 @@ pub fn populate_podcasts_flowbox(
stack.remove(&old); stack.remove(&old);
stack.add_named(&pdw, "pdw"); stack.add_named(&pdw, "pdw");
stack.set_visible_child(&pdw); stack.set_visible_child(&pdw);
// aggresive memory cleanup
// probably not needed
old.destroy(); old.destroy();
println!("Hello World!, child activated"); println!("Hello World!, child activated");
})); }));
@ -167,5 +169,8 @@ pub fn update_podcasts_view(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stac
stack.add_named(&grid, "pd_grid"); stack.add_named(&grid, "pd_grid");
// preserve the visible widget // preserve the visible widget
stack.set_visible_child_name(&vis); stack.set_visible_child_name(&vis);
// aggresive memory cleanup
// probably not needed
old.destroy(); old.destroy();
} }

View File

@ -168,16 +168,16 @@ fn receive() -> glib::Continue {
glib::Continue(false) glib::Continue(false)
} }
pub fn episodes_listbox(connection: &Arc<Mutex<SqliteConnection>>, pd_title: &str) -> gtk::ListBox { pub fn episodes_listbox(db: &Arc<Mutex<SqliteConnection>>, pd_title: &str) -> gtk::ListBox {
// TODO: handle unwraps. // TODO: handle unwraps.
let m = connection.lock().unwrap(); let conn = db.lock().unwrap();
let pd = dbqueries::load_podcast_from_title(&m, pd_title).unwrap(); let pd = dbqueries::load_podcast_from_title(&conn, pd_title).unwrap();
let mut episodes = dbqueries::get_pd_episodes(&m, &pd).unwrap(); let mut episodes = dbqueries::get_pd_episodes(&conn, &pd).unwrap();
drop(m); drop(conn);
let list = gtk::ListBox::new(); let list = gtk::ListBox::new();
episodes.iter_mut().for_each(|ep| { episodes.iter_mut().for_each(|ep| {
let w = epidose_widget(connection, ep, pd_title); let w = epidose_widget(db, ep, pd_title);
list.add(&w) list.add(&w)
}); });

View File

@ -12,7 +12,7 @@ use std::sync::{Arc, Mutex};
use widgets::episode::episodes_listbox; use widgets::episode::episodes_listbox;
pub fn podcast_widget( pub fn podcast_widget(
connection: &Arc<Mutex<SqliteConnection>>, db: &Arc<Mutex<SqliteConnection>>,
title: Option<&str>, title: Option<&str>,
description: Option<&str>, description: Option<&str>,
image: Option<Pixbuf>, image: Option<Pixbuf>,
@ -29,7 +29,7 @@ pub fn podcast_widget(
if let Some(t) = title { if let Some(t) = title {
title_label.set_text(t); title_label.set_text(t);
let listbox = episodes_listbox(connection, t); let listbox = episodes_listbox(db, t);
view.add(&listbox); view.add(&listbox);
} }
@ -84,6 +84,9 @@ pub fn on_flowbox_child_activate(
stack.remove(&old); stack.remove(&old);
stack.add_named(&pdw, "pdw"); stack.add_named(&pdw, "pdw");
stack.set_visible_child(&pdw); stack.set_visible_child(&pdw);
// aggresive memory cleanup
// probably not needed
old.destroy(); old.destroy();
println!("Hello World!, child activated"); println!("Hello World!, child activated");
} }