diff --git a/src/reader/sync.rs b/src/reader/sync.rs
index 616c865..729dc99 100644
--- a/src/reader/sync.rs
+++ b/src/reader/sync.rs
@@ -10,7 +10,7 @@ use crate::{
};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use diesel::prelude::*;
-use futures::StreamExt;
+use rss::Item;
use scraper::{Html, Selector};
use serde_derive::Deserialize;
@@ -19,67 +19,61 @@ pub struct JsonUser {
user_id: String,
}
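+
+/// Parses an RSS item's HTML content into plain text (keeping the first
+/// inline image) and inserts it as a new feed_item row for the given feed.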
+fn create_feed_item(item: Item, feed: &Feed) {
+ let title = item.title.unwrap();
+ let frag = Html::parse_fragment(&item.content.unwrap());
+ let mut content = "".to_string();
+ let frag_clone = frag.clone();
+ frag.tree.into_iter().for_each(|node| {
+ let selector_img = Selector::parse("img").unwrap();
+
+        for element in frag_clone.select(&selector_img) {
+            // Keep only the first image from the item's HTML content.
+            if !content.starts_with("<img") {
+                content.push_str(&element.html());
+            }
+        }
+ if let scraper::node::Node::Text(text) = node {
+ content.push_str(&text.text);
+ }
+ });
+
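+    // Persist the parsed item and log the insert result.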
+ let mut connection: diesel::PgConnection = establish_connection();
+ let new_feed_item = NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone());
+ let insert_result = diesel::insert_into(feed_item::table)
+ .values(&new_feed_item)
+ .execute(&mut connection);
+
+ log::info!("{:?}", insert_result);
+}
+
 pub async fn sync(_req: HttpRequest, data: web::Json<JsonUser>) -> impl Responder {
let mut connection: diesel::PgConnection = establish_connection();
     let req_user_id = data.user_id.parse::<i32>().unwrap();
-    let feed: Vec<Feed> = feed::table
+    let feeds: Vec<Feed> = feed::table
.filter(user_id.eq(req_user_id))
         .load::<Feed>(&mut connection)
.unwrap();
- log::info!("Found {} feeds to sync.", feed.len());
+ log::info!("Found {} feeds to sync.", feeds.len());
- // Create an asynchronous stream of Feed items
- let feed_stream = futures::stream::iter(feed.clone().into_iter()).map(|feed| {
- // Asynchronously fetch the feed_list for each feed
- log::info!("processing feed: {:?}", feed);
- async move {
- log::info!("start moved");
- let feed_list: rss::Channel = feeds::get_feed(&feed.url).await.unwrap();
- log::info!("{:?}", feed_list);
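+    // Fetch and store each feed sequentially instead of via an async stream.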
+ for feed in feeds {
+        log::info!("Trying to fetch URL: {}", feed.url);
+ let result = feeds::get_feed(&feed.url).await;
- feed_list.into_items().into_iter().for_each(|item| {
- let title = item.title.unwrap();
- let frag = Html::parse_fragment(&item.content.unwrap());
- let mut content = "".to_string();
- let frag_clone = frag.clone();
- frag.tree.into_iter().for_each(|node| {
- let selector_img = Selector::parse("img").unwrap();
-
-                for element in frag_clone.select(&selector_img) {
-                    if !content.starts_with("<img") {
-                        content.push_str(&element.html());
-                    }
-                }
- if let scraper::node::Node::Text(text) = node {
- content.push_str(&text.text);
- }
- });
-
- let mut connection: diesel::PgConnection = establish_connection();
- let new_feed_item =
- NewFeedItem::new(feed.id, content.clone(), title.clone(), feed.url.clone());
- let insert_result = diesel::insert_into(feed_item::table)
- .values(&new_feed_item)
- .execute(&mut connection);
-
- log::info!("{:?}", insert_result);
- });
+ match result {
+ Ok(channel) => {
+ for item in channel.into_items() {
+                        create_feed_item(item, &feed);
+ }
+ }
+ Err(e) => log::error!("Could not get channel {}. Error: {}", feed.url, e),
}
- });
-
- // Execute the asynchronous stream
- let result = tokio::spawn(feed_stream.for_each(|_| async {})).await;
-
- if result.is_err() {
- log::error!("{:?}", result);
- HttpResponse::InternalServerError()
- } else {
- HttpResponse::Ok()
}
+
+ HttpResponse::Ok()
}
// pub async fn sync(req: HttpRequest) -> impl Responder {
// let request = req.clone();