-
-
Notifications
You must be signed in to change notification settings - Fork 15
/
main.py
177 lines (143 loc) · 5.94 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import os
import json
import httpx
import uvicorn
from deta import Deta
from typing import Optional
from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from urllib.parse import urlparse
# local imports
import ingredients
# Deta SDK client. Called without arguments; presumably it picks the project
# key up from the runtime environment (see the explicit variant below) --
# TODO confirm against the deployment setup.
deta = Deta()
#deta = Deta(project_key=os.environ.get("DETA_PROJECT_KEY"), project_id=os.environ.get("DETA_PROJECT_ID"))

# Per-ingredient scan counters ("matching_scans" / "total_scans").
db = deta.Base("ingredients-stats")
# Short-lived cache of complete scan responses, keyed by the requested URL.
cache_db = deta.Base("ingredients-cache")

app = FastAPI(
    title="Ingredients – API",
    # FastAPI expects a mapping here (OpenAPI "License Object": required
    # ``name``, optional ``url``). A bare string fails schema validation as
    # soon as the OpenAPI document is generated.
    # NOTE(review): the name is a neutral placeholder; replace it with the
    # actual license identifier from LICENSE.md.
    license_info={
        "name": "See LICENSE.md",
        "url": "https://github.com/berrysauce/ingredients/blob/main/LICENSE.md",
    },
    # The interactive documentation UIs are disabled; "/docs" is served by a
    # custom redirect route instead.
    docs_url=None,
    redoc_url=None,
)

# Browser origins that are allowed to call this API (CORS allow-list).
origins = [
    "https://ingredients.tech",
    "https://dev.ingredients.tech",
    "https://cdn.ingredients.tech",
    "https://ingredients.work",
    "https://dev.ingredients.work",
    "https://cdn.ingredients.work",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/", response_class=HTMLResponse)
def get_root():
    """Serve the static landing page of the API.

    Returns:
        HTMLResponse: the contents of ``pages/api.html`` (path is relative to
        the process working directory).

    Raises:
        OSError: if the page file cannot be opened.
    """
    # HTMLResponse always sends the body as UTF-8, so decode the file as
    # UTF-8 explicitly instead of relying on the platform default encoding.
    with open("pages/api.html", "r", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())
@app.get("/docs")
def get_docs():
    """Permanently redirect (HTTP 301) to the API usage section of the README."""
    readme_section = "https://github.com/berrysauce/ingredients/blob/main/README.md#-using-the-api"
    return RedirectResponse(status_code=301, url=readme_section)
def _load_categories():
    """Return the category metadata parsed from ``ingredients/categories.json``."""
    with open("ingredients/categories.json", "r", encoding="utf-8") as f:
        return json.load(f)


def _cached_scan(url, include_categories):
    """Return the cached response for *url* shaped for this request, or ``None`` on a miss."""
    try:
        cached = cache_db.get(key=url)
    except Exception:
        # Deta hasn't defined a specific exception for this error;
        # the cache is best-effort, so treat any failure as a miss.
        return None
    if cached is None:
        return None

    if not include_categories:
        # The entry may have been stored by a request that asked for
        # categories; do not leak them into a response that did not.
        cached.pop("categories", None)
        return cached

    categories = cached.get("categories")
    if not isinstance(categories, dict):
        # Entry was stored without categories: attach them now rather than
        # discarding the cached scan result.
        try:
            cached["categories"] = _load_categories()
        except (OSError, ValueError):
            # Fall back to a fresh scan, which reports the failure properly.
            return None
        return cached

    if categories.get("other"):
        # move "other" to the end of the dict
        categories["other"] = categories.pop("other")
    return cached


def _record_match(ingredient_name):
    """Increment the match counter of *ingredient_name* and return its match percentage.

    Statistics are best-effort: if they cannot be read or are malformed,
    ``0.0`` is returned instead of failing the whole scan.
    """
    try:
        stats = db.get(ingredient_name)
    except Exception:
        # Deta hasn't defined a specific exception for this error
        stats = None
    if not stats:
        return 0.0

    try:
        matching_scans = int(stats["matching_scans"]) + 1
        total_scans = int(stats["total_scans"])
    except (KeyError, TypeError, ValueError):
        return 0.0

    try:
        db.update(key=ingredient_name, updates={"matching_scans": matching_scans})
    except Exception:
        # Deta hasn't defined a specific exception for this error;
        # a failed counter update must not fail the request.
        pass

    if total_scans <= 0:
        # Avoid a ZeroDivisionError on counters that were never initialised.
        return 0.0
    return round((matching_scans / total_scans) * 100, 1)


@app.get("/ingredients", response_class=JSONResponse)
def get_scan(url: str, includeCategories: Optional[bool] = False):
    """Scan a website and report which known ingredients it matches.

    Args:
        url: Address to scan. ``https://`` is prepended when no ``http://``
            or ``https://`` scheme is present.
        includeCategories: When true, the response additionally carries the
            contents of ``ingredients/categories.json`` under ``"categories"``.

    Returns:
        dict: ``{"url", "matching_ingredients", "matches"[, "categories"]}``
        where ``matches`` maps a category to a list of ingredient entries
        (``id``, ``name``, ``description``, ``icon``, ``match_percentage``).
        Responses served from the cache are returned as stored by Deta Base.

    Raises:
        HTTPException: 400 if the URL is invalid or the request to the site
            fails; 500 for any other error during the scan.
    """
    # increase compatibility: only an explicit http(s) scheme counts, so hosts
    # such as "httpbin.org" or an upper-case "HTTP://" are handled correctly
    if not url.lower().startswith(("http://", "https://")):
        # muhahaha! HTTPS for the win!
        url = "https://" + url

    # Parse URL and remove port, query, and fragment
    r = urlparse(url)
    parsed_url = r.scheme + "://" + r.netloc.split(":")[0] + r.path

    # ----- CACHE -----
    cached = _cached_scan(url, includeCategories)
    if cached is not None:
        return cached
    # -----------------

    try:
        matching_ingredients = ingredients.scan(parsed_url)

        return_data = {
            "url": url,
            "matching_ingredients": len(matching_ingredients),
            "matches": {},
        }

        # Single pass: group ingredients by category in order of first
        # appearance. Entries look like "<category>/<name>.json".
        for ingredient in matching_ingredients:
            parts = ingredient.split("/")
            category = parts[0]
            ingredient_name = parts[1].replace(".json", "")

            with open(f"ingredients/{ingredient}", "r", encoding="utf-8") as f:
                ingredient_data = json.load(f)

            return_data["matches"].setdefault(category, []).append(
                {
                    "id": ingredient_name,
                    "name": ingredient_data["name"],
                    "description": ingredient_data["description"],
                    "icon": ingredient_data["icon"],
                    # ----- STATS -----
                    "match_percentage": _record_match(ingredient_name),
                }
            )

        if includeCategories:
            return_data["categories"] = _load_categories()

        # ----- CACHE -----
        # add data to cache
        # expiry: 15 minutes (900 seconds)
        try:
            cache_db.put(key=url, data=return_data, expire_in=900)
        except Exception:
            # Deta hasn't defined a specific exception for this error;
            # failing to cache must not fail the request.
            pass
        # -----------------

        return return_data
    except (httpx.InvalidURL, httpx.RequestError) as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit are not converted into HTTP 500s.
        raise HTTPException(status_code=500, detail="Unknown error") from e
@app.get("/icon/{icon}")
def get_icon(icon: str, response: Response):
    """Return an ingredient icon from ``./icons`` as a PNG image.

    Args:
        icon: Icon file name; the ``.png`` extension is optional and the
            lookup is case-insensitive (the name is lower-cased).
        response: Unused by the body; kept so the handler signature stays
            unchanged.

    Returns:
        Response: the raw file bytes with media type ``image/png``.

    Raises:
        HTTPException: 404 if no such icon exists; 500 on any other error
            while reading it.
    """
    # Reduce the input to a bare file name FIRST: keep only the first path
    # component (a backslash counts as a separator too) and drop any query
    # or fragment. Doing this before adding the extension means
    # "name?x" resolves to "name.png" instead of losing the suffix.
    parsed_icon = (
        icon.lower()
        .replace("\\", "/")
        .split("/")[0]
        .split("?")[0]
        .split("#")[0]
    )

    # increase compatibility
    if not parsed_icon.endswith(".png"):
        parsed_icon += ".png"

    try:
        with open("./icons/" + parsed_icon, "rb") as f:
            return Response(content=f.read(), media_type="image/png")
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Icon not found")
    except Exception as e:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit are not converted into HTTP 500s.
        raise HTTPException(status_code=500, detail="An error occured while fetching the icon") from e
# Start a local development server only when this file is executed directly
# (not when the module is imported by an external ASGI server).
if __name__ == "__main__":
    server_options = {"host": "localhost", "port": 8000}
    uvicorn.run(app, **server_options)