Sync working
parent
e56ba37e7e
commit
3129c521b4
|
@ -10,7 +10,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use actix_web::{web, HttpRequest, HttpResponse, Responder};
|
use actix_web::{web, HttpRequest, HttpResponse, Responder};
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use futures::StreamExt;
|
use rss::Item;
|
||||||
use scraper::{Html, Selector};
|
use scraper::{Html, Selector};
|
||||||
use serde_derive::Deserialize;
|
use serde_derive::Deserialize;
|
||||||
|
|
||||||
|
@ -19,67 +19,61 @@ pub struct JsonUser {
|
||||||
user_id: String,
|
user_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn create_feed_item(item: Item, feed: &Feed) {
|
||||||
|
let title = item.title.unwrap();
|
||||||
|
let frag = Html::parse_fragment(&item.content.unwrap());
|
||||||
|
let mut content = "".to_string();
|
||||||
|
let frag_clone = frag.clone();
|
||||||
|
frag.tree.into_iter().for_each(|node| {
|
||||||
|
let selector_img = Selector::parse("img").unwrap();
|
||||||
|
|
||||||
|
for element in frag_clone.select(&selector_img) {
|
||||||
|
if !content.starts_with("<img") {
|
||||||
|
content.push_str(&element.html());
|
||||||
|
content.push_str("<br>")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let scraper::node::Node::Text(text) = node {
|
||||||
|
content.push_str(&text.text);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut connection: diesel::PgConnection = establish_connection();
|
||||||
|
let new_feed_item = NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone());
|
||||||
|
let insert_result = diesel::insert_into(feed_item::table)
|
||||||
|
.values(&new_feed_item)
|
||||||
|
.execute(&mut connection);
|
||||||
|
|
||||||
|
log::info!("{:?}", insert_result);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn sync(_req: HttpRequest, data: web::Json<JsonUser>) -> impl Responder {
|
pub async fn sync(_req: HttpRequest, data: web::Json<JsonUser>) -> impl Responder {
|
||||||
let mut connection: diesel::PgConnection = establish_connection();
|
let mut connection: diesel::PgConnection = establish_connection();
|
||||||
|
|
||||||
let req_user_id = data.user_id.parse::<i32>().unwrap();
|
let req_user_id = data.user_id.parse::<i32>().unwrap();
|
||||||
|
|
||||||
let feed: Vec<Feed> = feed::table
|
let feeds: Vec<Feed> = feed::table
|
||||||
.filter(user_id.eq(req_user_id))
|
.filter(user_id.eq(req_user_id))
|
||||||
.load::<Feed>(&mut connection)
|
.load::<Feed>(&mut connection)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("Found {} feeds to sync.", feed.len());
|
log::info!("Found {} feeds to sync.", feeds.len());
|
||||||
|
|
||||||
// Create an asynchronous stream of Feed items
|
for feed in feeds {
|
||||||
let feed_stream = futures::stream::iter(feed.clone().into_iter()).map(|feed| {
|
log::info!("Try to get url: {}", feed.url);
|
||||||
// Asynchronously fetch the feed_list for each feed
|
let result = feeds::get_feed(&feed.url).await;
|
||||||
log::info!("processing feed: {:?}", feed);
|
|
||||||
async move {
|
|
||||||
log::info!("start moved");
|
|
||||||
let feed_list: rss::Channel = feeds::get_feed(&feed.url).await.unwrap();
|
|
||||||
log::info!("{:?}", feed_list);
|
|
||||||
|
|
||||||
feed_list.into_items().into_iter().for_each(|item| {
|
match result {
|
||||||
let title = item.title.unwrap();
|
Ok(channel) => {
|
||||||
let frag = Html::parse_fragment(&item.content.unwrap());
|
for item in channel.into_items() {
|
||||||
let mut content = "".to_string();
|
create_feed_item(item, &feed)
|
||||||
let frag_clone = frag.clone();
|
}
|
||||||
frag.tree.into_iter().for_each(|node| {
|
}
|
||||||
let selector_img = Selector::parse("img").unwrap();
|
Err(e) => log::error!("Could not get channel {}. Error: {}", feed.url, e),
|
||||||
|
|
||||||
for element in frag_clone.select(&selector_img) {
|
|
||||||
if !content.starts_with("<img") {
|
|
||||||
content.push_str(&element.html());
|
|
||||||
content.push_str("<br>")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let scraper::node::Node::Text(text) = node {
|
|
||||||
content.push_str(&text.text);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut connection: diesel::PgConnection = establish_connection();
|
|
||||||
let new_feed_item =
|
|
||||||
NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone());
|
|
||||||
let insert_result = diesel::insert_into(feed_item::table)
|
|
||||||
.values(&new_feed_item)
|
|
||||||
.execute(&mut connection);
|
|
||||||
|
|
||||||
log::info!("{:?}", insert_result);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
// Execute the asynchronous stream
|
|
||||||
let result = tokio::spawn(feed_stream.for_each(|_| async {})).await;
|
|
||||||
|
|
||||||
if result.is_err() {
|
|
||||||
log::error!("{:?}", result);
|
|
||||||
HttpResponse::InternalServerError()
|
|
||||||
} else {
|
|
||||||
HttpResponse::Ok()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HttpResponse::Ok()
|
||||||
}
|
}
|
||||||
// pub async fn sync(req: HttpRequest) -> impl Responder {
|
// pub async fn sync(req: HttpRequest) -> impl Responder {
|
||||||
// let request = req.clone();
|
// let request = req.clone();
|
||||||
|
|
Loading…
Reference in New Issue