Made a type allias for Arc<Mutex<SqliteConnection>.

This commit is contained in:
Jordan Petridis 2017-10-23 12:29:04 +03:00
parent 0e5d976514
commit 8a313c145b
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
9 changed files with 63 additions and 102 deletions

View File

@ -5,7 +5,6 @@ use diesel;
use rss;
use reqwest;
use rayon::prelude::*;
use std::sync::{Arc, Mutex};
use schema;
use dbqueries;
@ -13,9 +12,13 @@ use models::*;
use errors::*;
use feedparser;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub struct Feed(pub reqwest::Response, pub Source);
pub type Database = Arc<Mutex<SqliteConnection>>;
fn index_source(con: &SqliteConnection, foo: &NewSource) -> Result<()> {
match dbqueries::load_source_from_uri(con, foo.uri) {
Ok(_) => Ok(()),
@ -80,8 +83,8 @@ fn insert_return_episode(con: &SqliteConnection, ep: &NewEpisode) -> Result<Epis
Ok(dbqueries::load_episode_from_uri(con, ep.uri.unwrap())?)
}
pub fn index_loop(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<()> {
let mut f = fetch_feeds(db, force)?;
pub fn index_loop(db: &Database) -> Result<()> {
let mut f = fetch_feeds(db)?;
f.par_iter_mut()
.for_each(|&mut Feed(ref mut req, ref source)| {
@ -98,7 +101,7 @@ pub fn index_loop(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<()>
pub fn complete_index_from_source(
req: &mut reqwest::Response,
source: &Source,
db: &Arc<Mutex<SqliteConnection>>,
db: &Database,
) -> Result<()> {
use std::io::Read;
use std::str::FromStr;
@ -107,16 +110,12 @@ pub fn complete_index_from_source(
req.read_to_string(&mut buf)?;
let chan = rss::Channel::from_str(&buf)?;
complete_index(&db, &chan, source)?;
complete_index(db, &chan, source)?;
Ok(())
}
fn complete_index(
db: &Arc<Mutex<SqliteConnection>>,
chan: &rss::Channel,
parent: &Source,
) -> Result<()> {
fn complete_index(db: &Database, chan: &rss::Channel, parent: &Source) -> Result<()> {
let pd = {
let conn = db.lock().unwrap();
index_channel(&conn, chan, parent)?
@ -132,7 +131,7 @@ fn index_channel(con: &SqliteConnection, chan: &rss::Channel, parent: &Source) -
Ok(pd)
}
fn index_channel_items(db: &Arc<Mutex<SqliteConnection>>, it: &[rss::Item], pd: &Podcast) {
fn index_channel_items(db: &Database, it: &[rss::Item], pd: &Podcast) {
let episodes: Vec<_> = it.par_iter()
.map(|x| feedparser::parse_episode(x, pd.id()))
.collect();
@ -140,7 +139,7 @@ fn index_channel_items(db: &Arc<Mutex<SqliteConnection>>, it: &[rss::Item], pd:
let conn = db.lock().unwrap();
let e = conn.transaction::<(), Error, _>(|| {
episodes.iter().for_each(|x| {
let e = index_episode(&conn, &x);
let e = index_episode(&conn, x);
if let Err(err) = e {
error!("Failed to index episode: {:?}.", x);
error!("Error msg: {}", err);
@ -157,7 +156,7 @@ fn index_channel_items(db: &Arc<Mutex<SqliteConnection>>, it: &[rss::Item], pd:
}
// Maybe this can be refactored into an Iterator for lazy evaluation.
pub fn fetch_feeds(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<Vec<Feed>> {
pub fn fetch_feeds(db: &Database) -> Result<Vec<Feed>> {
let mut feeds = {
let conn = db.lock().unwrap();
dbqueries::get_sources(&conn)?
@ -166,7 +165,7 @@ pub fn fetch_feeds(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<Vec
let results: Vec<Feed> = feeds
.par_iter_mut()
.filter_map(|x| {
let l = refresh_source(db, x, force);
let l = refresh_source(db, x);
if l.is_ok() {
l.ok()
} else {
@ -180,34 +179,26 @@ pub fn fetch_feeds(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<Vec
Ok(results)
}
pub fn refresh_source(
db: &Arc<Mutex<SqliteConnection>>,
feed: &mut Source,
force: bool,
) -> Result<Feed> {
pub fn refresh_source(db: &Database, feed: &mut Source) -> Result<Feed> {
use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified};
let client = reqwest::Client::new();
let req = if force {
client.get(feed.uri()).send()?
} else {
let mut headers = Headers::new();
let mut headers = Headers::new();
if let Some(foo) = feed.http_etag() {
headers.set(ETag(EntityTag::new(true, foo.to_owned())));
if let Some(foo) = feed.http_etag() {
headers.set(ETag(EntityTag::new(true, foo.to_owned())));
}
if let Some(foo) = feed.last_modified() {
if let Ok(x) = foo.parse::<HttpDate>() {
headers.set(LastModified(x));
}
}
if let Some(foo) = feed.last_modified() {
if let Ok(x) = foo.parse::<HttpDate>() {
headers.set(LastModified(x));
}
}
// FIXME: I have fucked up somewhere here.
// Getting back 200 codes even though I supposedly sent etags.
// info!("Headers: {:?}", headers);
client.get(feed.uri()).headers(headers).send()?
};
// FIXME: I have fucked up somewhere here.
// Getting back 200 codes even though I supposedly sent etags.
// info!("Headers: {:?}", headers);
let req = client.get(feed.uri()).headers(headers).send()?;
info!("GET to {} , returned: {}", feed.uri(), req.status());
@ -277,10 +268,10 @@ mod tests {
index_source(&tempdb, &NewSource::new_with_uri(feed)).unwrap()
});
index_loop(&db, true).unwrap();
index_loop(&db).unwrap();
// Run again to cover Unique constrains erros.
index_loop(&db, true).unwrap();
index_loop(&db).unwrap();
}
#[test]

View File

@ -25,11 +25,10 @@ extern crate xdg;
pub mod dbqueries;
pub mod models;
pub mod schema;
pub mod index_feed;
pub mod feedparser;
pub mod errors;
mod feedparser;
mod schema;
use diesel::prelude::*;
use errors::*;

View File

@ -1,11 +1,9 @@
use reqwest;
use diesel::SaveChangesDsl;
use SqliteConnection;
use reqwest::header::{ETag, LastModified};
use std::sync::{Arc, Mutex};
use schema::{episode, podcast, source};
use index_feed::Database;
use errors::*;
#[derive(Queryable, Identifiable, AsChangeset, Associations)]
@ -180,11 +178,7 @@ impl<'a> Source {
/// Extract Etag and LastModifier from req, and update self and the
/// corresponding db row.
pub fn update_etag(
&mut self,
db: &Arc<Mutex<SqliteConnection>>,
req: &reqwest::Response,
) -> Result<()> {
pub fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> {
let headers = req.headers();
// let etag = headers.get_raw("ETag").unwrap();

View File

@ -5,9 +5,9 @@ use diesel::prelude::*;
use std::fs::{rename, DirBuilder, File};
use std::io::{BufWriter, Read, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use errors::*;
use hammond_data::index_feed::Database;
use hammond_data::dbqueries;
use hammond_data::models::Episode;
use hammond_data::{DL_DIR, HAMMOND_CACHE};
@ -58,7 +58,7 @@ pub fn download_to(target: &str, url: &str) -> Result<()> {
// Initial messy prototype, queries load alot of not needed stuff.
// TODO: Refactor
pub fn latest_dl(connection: &Arc<Mutex<SqliteConnection>>, limit: u32) -> Result<()> {
pub fn latest_dl(connection: &Database, limit: u32) -> Result<()> {
let pds = {
let tempdb = connection.lock().unwrap();
dbqueries::get_podcasts(&tempdb)?
@ -104,11 +104,7 @@ pub fn get_dl_folder(pd_title: &str) -> Result<String> {
}
// TODO: Refactor
pub fn get_episode(
connection: &Arc<Mutex<SqliteConnection>>,
ep: &mut Episode,
dl_folder: &str,
) -> Result<()> {
pub fn get_episode(connection: &Database, ep: &mut Episode, dl_folder: &str) -> Result<()> {
// Check if its alrdy downloaded
if ep.local_uri().is_some() {
if Path::new(ep.local_uri().unwrap()).exists() {

View File

@ -1,12 +1,10 @@
use gtk;
use gtk::prelude::*;
use diesel::prelude::SqliteConnection;
use index_feed;
use hammond_data::index_feed::Database;
use utils;
use std::sync::{Arc, Mutex};
// http://gtk-rs.org/tuto/closures
macro_rules! clone {
(@param _) => ( _ );
@ -25,7 +23,7 @@ macro_rules! clone {
);
}
pub fn get_headerbar(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) -> gtk::HeaderBar {
pub fn get_headerbar(db: &Database, stack: &gtk::Stack) -> gtk::HeaderBar {
let builder = include_str!("../gtk/headerbar.ui");
let builder = gtk::Builder::new_from_string(builder);
@ -66,7 +64,7 @@ pub fn get_headerbar(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) -> g
header
}
fn on_add_bttn_clicked(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack, url: &str) {
fn on_add_bttn_clicked(db: &Database, stack: &gtk::Stack, url: &str) {
let source = {
let tempdb = db.lock().unwrap();
index_feed::insert_return_source(&tempdb, url)

View File

@ -1,16 +1,14 @@
use glib;
use gtk;
// use gtk::prelude::*;
use hammond_data;
use hammond_data::index_feed::Feed;
use hammond_data::models::Source;
use diesel::prelude::SqliteConnection;
use hammond_data::index_feed::Database;
use std::thread;
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver};
use views::podcasts_view;
@ -35,11 +33,11 @@ macro_rules! clone {
// Create a thread local storage that will store the arguments to be transfered.
thread_local!(
static GLOBAL: RefCell<Option<(Arc<Mutex<SqliteConnection>>,
static GLOBAL: RefCell<Option<(Database,
gtk::Stack,
Receiver<bool>)>> = RefCell::new(None));
pub fn refresh_db(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
pub fn refresh_db(db: &Database, stack: &gtk::Stack) {
// Create a async channel.
let (sender, receiver) = channel();
@ -51,7 +49,7 @@ pub fn refresh_db(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
// The implementation of how this is done is probably terrible but it works!.
// TODO: add timeout option and error reporting.
thread::spawn(clone!(db => move || {
let t = hammond_data::index_feed::index_loop(&db, false);
let t = hammond_data::index_feed::index_loop(&db);
if t.is_err() {
error!("Error While trying to update the database.");
error!("Error msg: {}", t.unwrap_err());
@ -63,7 +61,7 @@ pub fn refresh_db(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
}));
}
pub fn refresh_feed(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack, source: &mut Source) {
pub fn refresh_feed(db: &Database, stack: &gtk::Stack, source: &mut Source) {
let (sender, receiver) = channel();
GLOBAL.with(clone!(db, stack => move |global| {
@ -73,7 +71,7 @@ pub fn refresh_feed(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack, sourc
let mut source = source.clone();
// TODO: add timeout option and error reporting.
thread::spawn(clone!(db => move || {
let foo_ = hammond_data::index_feed::refresh_source(&db, &mut source, false);
let foo_ = hammond_data::index_feed::refresh_source(&db, &mut source);
if let Ok(x) = foo_ {
let Feed(mut req, s) = x;

View File

@ -1,10 +1,8 @@
use gtk;
use gtk::prelude::*;
use diesel::prelude::SqliteConnection;
use hammond_data::dbqueries;
use std::sync::{Arc, Mutex};
use hammond_data::index_feed::Database;
use widgets::podcast::*;
@ -28,11 +26,7 @@ macro_rules! clone {
// NOT IN USE.
// TRYING OUT STORELESS ATM.
pub fn populate_podcasts_flowbox(
db: &Arc<Mutex<SqliteConnection>>,
stack: &gtk::Stack,
flowbox: &gtk::FlowBox,
) {
pub fn populate_podcasts_flowbox(db: &Database, stack: &gtk::Stack, flowbox: &gtk::FlowBox) {
let tempdb = db.lock().unwrap();
let pd_model = podcast_liststore(&tempdb);
drop(tempdb);
@ -99,11 +93,7 @@ fn show_empty_view(stack: &gtk::Stack) {
info!("Empty view.");
}
pub fn populate_flowbox_no_store(
db: &Arc<Mutex<SqliteConnection>>,
stack: &gtk::Stack,
flowbox: &gtk::FlowBox,
) {
pub fn populate_flowbox_no_store(db: &Database, stack: &gtk::Stack, flowbox: &gtk::FlowBox) {
let podcasts = {
let db = db.lock().unwrap();
dbqueries::get_podcasts(&db)
@ -126,12 +116,12 @@ pub fn populate_flowbox_no_store(
}
}
fn setup_podcast_widget(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
fn setup_podcast_widget(db: &Database, stack: &gtk::Stack) {
let pd_widget = podcast_widget(db, None, None, None);
stack.add_named(&pd_widget, "pdw");
}
fn setup_podcasts_grid(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
fn setup_podcasts_grid(db: &Database, stack: &gtk::Stack) {
let builder = include_str!("../../gtk/podcasts_view.ui");
let builder = gtk::Builder::new_from_string(builder);
let grid: gtk::Grid = builder.get_object("grid").unwrap();
@ -146,14 +136,14 @@ fn setup_podcasts_grid(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
populate_flowbox_no_store(db, stack, &flowbox);
}
pub fn setup_stack(db: &Arc<Mutex<SqliteConnection>>) -> gtk::Stack {
pub fn setup_stack(db: &Database) -> gtk::Stack {
let stack = gtk::Stack::new();
setup_podcast_widget(db, &stack);
setup_podcasts_grid(db, &stack);
stack
}
pub fn update_podcasts_view(db: &Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack) {
pub fn update_podcasts_view(db: &Database, stack: &gtk::Stack) {
let builder = include_str!("../../gtk/podcasts_view.ui");
let builder = gtk::Builder::new_from_string(builder);
let grid: gtk::Grid = builder.get_object("grid").unwrap();

View File

@ -1,14 +1,14 @@
use open;
use diesel::prelude::SqliteConnection;
use hammond_data::dbqueries;
use hammond_data::models::Episode;
use hammond_downloader::downloader;
use hammond_data::index_feed::Database;
use dissolve::strip_html_tags;
use std::thread;
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver};
use std::path::Path;
@ -43,11 +43,7 @@ thread_local!(
Receiver<bool>))>> = RefCell::new(None));
// TODO: REFACTOR AND MODULATE ME.
fn epidose_widget(
db: &Arc<Mutex<SqliteConnection>>,
episode: &mut Episode,
pd_title: &str,
) -> gtk::Box {
fn epidose_widget(db: &Database, episode: &mut Episode, pd_title: &str) -> gtk::Box {
// This is just a prototype and will be reworked probably.
let builder = include_str!("../../gtk/episode_widget.ui");
let builder = gtk::Builder::new_from_string(builder);
@ -103,7 +99,7 @@ fn epidose_widget(
// TODO: show notification when dl is finished and block play_bttn till then.
fn on_dl_clicked(
db: &Arc<Mutex<SqliteConnection>>,
db: &Database,
pd_title: &str,
ep: &mut Episode,
dl_bttn: &gtk::Button,
@ -131,7 +127,7 @@ fn on_dl_clicked(
}));
}
fn on_play_bttn_clicked(db: &Arc<Mutex<SqliteConnection>>, episode_id: i32) {
fn on_play_bttn_clicked(db: &Database, episode_id: i32) {
let local_uri = {
let tempdb = db.lock().unwrap();
dbqueries::get_episode_local_uri(&tempdb, episode_id).unwrap()
@ -168,7 +164,7 @@ fn receive() -> glib::Continue {
glib::Continue(false)
}
pub fn episodes_listbox(db: &Arc<Mutex<SqliteConnection>>, pd_title: &str) -> gtk::ListBox {
pub fn episodes_listbox(db: &Database, pd_title: &str) -> gtk::ListBox {
// TODO: handle unwraps.
let conn = db.lock().unwrap();
let pd = dbqueries::load_podcast_from_title(&conn, pd_title).unwrap();

View File

@ -6,13 +6,12 @@ use diesel::prelude::SqliteConnection;
use hammond_data::dbqueries;
use hammond_data::models::Podcast;
use hammond_downloader::downloader;
use std::sync::{Arc, Mutex};
use hammond_data::index_feed::Database;
use widgets::episode::episodes_listbox;
pub fn podcast_widget(
db: &Arc<Mutex<SqliteConnection>>,
db: &Database,
title: Option<&str>,
description: Option<&str>,
image: Option<Pixbuf>,
@ -73,7 +72,7 @@ pub fn create_flowbox_child(title: &str, cover: Option<Pixbuf>) -> gtk::FlowBoxC
}
pub fn on_flowbox_child_activate(
db: &Arc<Mutex<SqliteConnection>>,
db: &Database,
stack: &gtk::Stack,
parent: &Podcast,
pixbuf: Option<Pixbuf>,
@ -118,7 +117,7 @@ pub fn podcast_liststore(connection: &SqliteConnection) -> gtk::ListStore {
podcast_model
}
// pub fn update_podcast_widget(db: &&Arc<Mutex<SqliteConnection>>, stack: &gtk::Stack, pd:
// pub fn update_podcast_widget(db: &&Database, stack: &gtk::Stack, pd:
// &Podcast){
// let old = stack.get_child_by_name("pdw").unwrap();
// let pdw = pd_widget_from_diesel_model(&db.clone(), pd, &stack.clone());
@ -129,7 +128,7 @@ pub fn podcast_liststore(connection: &SqliteConnection) -> gtk::ListStore {
// stack.set_visible_child_name(&vis);
// }
// pub fn pd_widget_from_diesel_model(db: &Arc<Mutex<SqliteConnection>>, pd: &Podcast) -> gtk::Box {
// pub fn pd_widget_from_diesel_model(db: &Database, pd: &Podcast) -> gtk::Box {
// let img = get_pixbuf_from_path(pd.image_uri(), pd.title());
// podcast_widget(db, Some(pd.title()), Some(pd.description()), img)
// }