From 3129c521b4b2ed168690f25dff8154ad3d3a4560 Mon Sep 17 00:00:00 2001 From: mace Date: Wed, 11 Oct 2023 19:04:26 +0200 Subject: [PATCH] Sync working --- src/reader/sync.rs | 92 ++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 49 deletions(-) diff --git a/src/reader/sync.rs b/src/reader/sync.rs index 616c865..729dc99 100644 --- a/src/reader/sync.rs +++ b/src/reader/sync.rs @@ -10,7 +10,7 @@ use crate::{ }; use actix_web::{web, HttpRequest, HttpResponse, Responder}; use diesel::prelude::*; -use futures::StreamExt; +use rss::Item; use scraper::{Html, Selector}; use serde_derive::Deserialize; @@ -19,67 +19,61 @@ pub struct JsonUser { user_id: String, } +fn create_feed_item(item: Item, feed: &Feed) { + let title = item.title.unwrap(); + let frag = Html::parse_fragment(&item.content.unwrap()); + let mut content = "".to_string(); + let frag_clone = frag.clone(); + frag.tree.into_iter().for_each(|node| { + let selector_img = Selector::parse("img").unwrap(); + + for element in frag_clone.select(&selector_img) { + if !content.starts_with("") + } + } + if let scraper::node::Node::Text(text) = node { + content.push_str(&text.text); + } + }); + + let mut connection: diesel::PgConnection = establish_connection(); + let new_feed_item = NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone()); + let insert_result = diesel::insert_into(feed_item::table) + .values(&new_feed_item) + .execute(&mut connection); + + log::info!("{:?}", insert_result); +} + pub async fn sync(_req: HttpRequest, data: web::Json) -> impl Responder { let mut connection: diesel::PgConnection = establish_connection(); let req_user_id = data.user_id.parse::().unwrap(); - let feed: Vec = feed::table + let feeds: Vec = feed::table .filter(user_id.eq(req_user_id)) .load::(&mut connection) .unwrap(); - log::info!("Found {} feeds to sync.", feed.len()); + log::info!("Found {} feeds to sync.", feeds.len()); - // Create an asynchronous stream of Feed items - let feed_stream = futures::stream::iter(feed.clone().into_iter()).map(|feed| { - // Asynchronously fetch the feed_list for each feed - log::info!("processing feed: {:?}", feed); - async move { - log::info!("start moved"); - let feed_list: rss::Channel = feeds::get_feed(&feed.url).await.unwrap(); - log::info!("{:?}", feed_list); + for feed in feeds { + log::info!("Try to get url: {}", feed.url); + let result = feeds::get_feed(&feed.url).await; - feed_list.into_items().into_iter().for_each(|item| { - let title = item.title.unwrap(); - let frag = Html::parse_fragment(&item.content.unwrap()); - let mut content = "".to_string(); - let frag_clone = frag.clone(); - frag.tree.into_iter().for_each(|node| { - let selector_img = Selector::parse("img").unwrap(); - - for element in frag_clone.select(&selector_img) { - if !content.starts_with("") - } - } - if let scraper::node::Node::Text(text) = node { - content.push_str(&text.text); - } - }); - - let mut connection: diesel::PgConnection = establish_connection(); - let new_feed_item = - NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone()); - let insert_result = diesel::insert_into(feed_item::table) - .values(&new_feed_item) - .execute(&mut connection); - - log::info!("{:?}", insert_result); - }); + match result { + Ok(channel) => { + for item in channel.into_items() { + create_feed_item(item, &feed) + } + } + Err(e) => log::error!("Could not get channel {}. Error: {}", feed.url, e), } - }); - - // Execute the asynchronous stream - let result = tokio::spawn(feed_stream.for_each(|_| async {})).await; - - if result.is_err() { - log::error!("{:?}", result); - HttpResponse::InternalServerError() - } else { - HttpResponse::Ok() } + + HttpResponse::Ok() } // pub async fn sync(req: HttpRequest) -> impl Responder { // let request = req.clone();