Update utils/instaloader_utils.py
Browse files- utils/instaloader_utils.py +17 -15
utils/instaloader_utils.py
CHANGED
@@ -1,13 +1,12 @@
|
|
1 |
import instaloader
|
2 |
-
from typing import List, Dict
|
3 |
-
import
|
|
|
4 |
|
5 |
# Initialize Instaloader
|
6 |
L = instaloader.Instaloader()
|
7 |
|
8 |
-
|
9 |
-
|
10 |
-
async def fetch_user_posts(username: str, max_posts: int = 20):
|
11 |
try:
|
12 |
print(f"Fetching posts for user: {username}")
|
13 |
profile = instaloader.Profile.from_username(L.context, username)
|
@@ -17,7 +16,7 @@ async def fetch_user_posts(username: str, max_posts: int = 20):
|
|
17 |
if count >= max_posts:
|
18 |
break
|
19 |
posts.append({
|
20 |
-
"caption": post.caption,
|
21 |
"likes": post.likes,
|
22 |
"comments": post.comments,
|
23 |
"date": post.date_utc.isoformat(),
|
@@ -25,12 +24,13 @@ async def fetch_user_posts(username: str, max_posts: int = 20):
|
|
25 |
})
|
26 |
|
27 |
if not posts:
|
28 |
-
|
29 |
-
|
30 |
-
|
|
|
31 |
|
32 |
except instaloader.exceptions.ProfileNotExistsException:
|
33 |
-
raise HTTPException(status_code=404, detail="Profile does not exist.")
|
34 |
except instaloader.exceptions.ConnectionException:
|
35 |
raise HTTPException(status_code=503, detail="Instagram connection failed. Try again later.")
|
36 |
except Exception as e:
|
@@ -64,7 +64,8 @@ def find_similar_accounts(username: str, rapidapi_key: str) -> List[str]:
|
|
64 |
print(f"API request failed: {e}")
|
65 |
return []
|
66 |
|
67 |
-
|
|
|
68 |
"""
|
69 |
Fetch posts for similar accounts (competitors) using the RapidAPI endpoint.
|
70 |
"""
|
@@ -78,7 +79,8 @@ def fetch_competitors_posts(username: str, rapidapi_key: str, max_posts: int = 5
|
|
78 |
all_posts = []
|
79 |
for account in similar_accounts:
|
80 |
print(f"Fetching posts for competitor: {account}")
|
81 |
-
|
82 |
-
|
83 |
-
|
84 |
-
|
|
|
|
1 |
import instaloader
|
2 |
+
from typing import List, Dict
|
3 |
+
from fastapi import HTTPException
|
4 |
+
import requests
|
5 |
|
6 |
# Initialize Instaloader
|
7 |
L = instaloader.Instaloader()
|
8 |
|
9 |
+
async def fetch_user_posts(username: str, max_posts: int = 20) -> List[Dict]:
|
|
|
|
|
10 |
try:
|
11 |
print(f"Fetching posts for user: {username}")
|
12 |
profile = instaloader.Profile.from_username(L.context, username)
|
|
|
16 |
if count >= max_posts:
|
17 |
break
|
18 |
posts.append({
|
19 |
+
"caption": post.caption or "No Caption",
|
20 |
"likes": post.likes,
|
21 |
"comments": post.comments,
|
22 |
"date": post.date_utc.isoformat(),
|
|
|
24 |
})
|
25 |
|
26 |
if not posts:
|
27 |
+
print("No posts found.")
|
28 |
+
return []
|
29 |
+
|
30 |
+
return posts
|
31 |
|
32 |
except instaloader.exceptions.ProfileNotExistsException:
|
33 |
+
raise HTTPException(status_code=404, detail=f"Profile '{username}' does not exist.")
|
34 |
except instaloader.exceptions.ConnectionException:
|
35 |
raise HTTPException(status_code=503, detail="Instagram connection failed. Try again later.")
|
36 |
except Exception as e:
|
|
|
64 |
print(f"API request failed: {e}")
|
65 |
return []
|
66 |
|
67 |
+
|
68 |
+
async def fetch_competitors_posts(username: str, rapidapi_key: str, max_posts: int = 50) -> List[Dict]:
|
69 |
"""
|
70 |
Fetch posts for similar accounts (competitors) using the RapidAPI endpoint.
|
71 |
"""
|
|
|
79 |
all_posts = []
|
80 |
for account in similar_accounts:
|
81 |
print(f"Fetching posts for competitor: {account}")
|
82 |
+
try:
|
83 |
+
competitor_posts = await fetch_user_posts(account, max_posts)
|
84 |
+
all_posts.extend(competitor_posts)
|
85 |
+
except HTTPException as e:
|
86 |
+
print(f"Error fetching posts for {account}: {e.detail}")
|