Update table feed items

This commit is contained in:
2023-10-08 18:13:58 +02:00
parent ec35f66a88
commit e56ba37e7e
12 changed files with 136 additions and 14 deletions
+1
View File
@@ -5,5 +5,6 @@ use rss::Channel;
pub async fn get_feed(feed: &str) -> Result<Channel, Box<dyn Error>> {
let content = reqwest::get(feed).await?.bytes().await?;
let channel = Channel::read_from(&content[..])?;
log::info!("{:?}", channel);
Ok(channel)
}
+6 -6
View File
@@ -7,9 +7,9 @@ pub struct Feed {
pub title: String,
pub items: Vec<Article>,
}
impl Feed {
pub fn new(title: String, items: Vec<Article>) -> Feed {
Feed { title, items }
}
}
//
// impl Feed {
// pub fn new(title: String, items: Vec<Article>) -> Feed {
// Feed { title, items }
// }
// }
+49 -7
View File
@@ -1,12 +1,17 @@
use super::feeds;
use crate::models::feed::rss_feed::Feed;
use crate::models::feed_item::new_feed_item::NewFeedItem;
use crate::{
database::establish_connection,
models::feed::rss_feed::Feed,
schema::feed::{self, user_id},
schema::{
feed::{self, user_id},
feed_item,
},
};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use diesel::prelude::*;
use futures::StreamExt;
use scraper::{Html, Selector};
use serde_derive::Deserialize;
#[derive(Deserialize)]
@@ -18,26 +23,63 @@ pub async fn sync(_req: HttpRequest, data: web::Json<JsonUser>) -> impl Responde
let mut connection: diesel::PgConnection = establish_connection();
let req_user_id = data.user_id.parse::<i32>().unwrap();
log::info!("{:?}", req_user_id);
let feed: Vec<Feed> = feed::table
.filter(user_id.eq(req_user_id))
.load::<Feed>(&mut connection)
.unwrap();
log::info!("Found {} feeds to sync.", feed.len());
// Create an asynchronous stream of Feed items
let feed_stream = futures::stream::iter(feed.clone().into_iter()).map(|feed| {
// Asynchronously fetch the feed_list for each feed
log::info!("processing feed: {:?}", feed);
async move {
let _feed_list = feeds::get_feed(&feed.url).await.unwrap();
// Process feed_list here
log::info!("start moved");
let feed_list: rss::Channel = feeds::get_feed(&feed.url).await.unwrap();
log::info!("{:?}", feed_list);
feed_list.into_items().into_iter().for_each(|item| {
let title = item.title.unwrap();
let frag = Html::parse_fragment(&item.content.unwrap());
let mut content = "".to_string();
let frag_clone = frag.clone();
frag.tree.into_iter().for_each(|node| {
let selector_img = Selector::parse("img").unwrap();
for element in frag_clone.select(&selector_img) {
if !content.starts_with("<img") {
content.push_str(&element.html());
content.push_str("<br>")
}
}
if let scraper::node::Node::Text(text) = node {
content.push_str(&text.text);
}
});
let mut connection: diesel::PgConnection = establish_connection();
let new_feed_item =
NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone());
let insert_result = diesel::insert_into(feed_item::table)
.values(&new_feed_item)
.execute(&mut connection);
log::info!("{:?}", insert_result);
});
}
});
// Execute the asynchronous stream
tokio::spawn(feed_stream.for_each(|_| async {}));
let result = tokio::spawn(feed_stream.for_each(|_| async {})).await;
HttpResponse::Ok()
if result.is_err() {
log::error!("{:?}", result);
HttpResponse::InternalServerError()
} else {
HttpResponse::Ok()
}
}
// pub async fn sync(req: HttpRequest) -> impl Responder {
// let request = req.clone();