Skip to content

Commit ecd7a94

Browse files
committed
Refactored code to be a bit cleaner.
1 parent 649b8c4 commit ecd7a94

File tree

2 files changed

+92
-103
lines changed

2 files changed

+92
-103
lines changed

kosync.py

Lines changed: 91 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,139 +1,127 @@
11
# -*- coding: utf-8 -*-
22
import time
33
import uuid
4-
from distutils.util import strtobool
54
from os import getenv
6-
from typing import Optional
75

86
from dotenv import load_dotenv
9-
from fastapi import FastAPI, Header
7+
from fastapi import FastAPI, Header, Depends, HTTPException
108
from fastapi.responses import JSONResponse
119
from pydantic import BaseModel
12-
from tinydb import Query, TinyDB
10+
from tinydb import TinyDB, Query
1311

1412
app = FastAPI(openapi_url=None, redoc_url=None)
1513
db = TinyDB("data/db.json")
1614
users = db.table("users")
1715
documents = db.table("documents")
1816
load_dotenv()
1917

18+
19+
def str_to_bool(value: str) -> bool:
20+
return value.lower() in ("true", "1", "yes")
21+
22+
2023
class KosyncUser(BaseModel):
21-
username: Optional[str] = None
22-
password: Optional[str] = None
24+
username: str
25+
password: str
2326

2427

2528
class KosyncDocument(BaseModel):
26-
document: Optional[str] = None
27-
progress: Optional[str] = None
28-
percentage: Optional[float] = None
29-
device: Optional[str] = None
30-
device_id: Optional[str] = None
29+
document: str
30+
progress: str
31+
percentage: float
32+
device: str
33+
device_id: str
34+
35+
36+
def get_auth_headers(
37+
x_auth_user: str = Header(...), x_auth_key: str = Header(...)
38+
):
39+
"""Dependency to extract authentication headers and validate user."""
40+
query = Query()
41+
user_exists = users.contains((query.username == x_auth_user) & (query.password == x_auth_key))
42+
if not user_exists:
43+
raise HTTPException(status_code=401, detail="Unauthorized")
44+
return x_auth_user
3145

3246

3347
@app.post("/users/create")
3448
def register(kosync_user: KosyncUser):
35-
# Check whether new registrations are allowed on this server based on the OPEN_REGISTRATIONS environment variable.
36-
# By default registrations are enabled.
37-
registrations_allowed = bool(strtobool(getenv("OPEN_REGISTRATIONS", "True")))
38-
if registrations_allowed:
39-
# check if username or password is missing
40-
if kosync_user.username is None or kosync_user.password is None:
41-
return JSONResponse(status_code=400, content={"message": f"Invalid request"})
42-
# check if user already exists
43-
QUser = Query()
44-
if users.contains(QUser.username == kosync_user.username):
45-
return JSONResponse(status_code=409, content="Username is already registered.")
46-
# register new user
47-
if users.insert({'username': kosync_user.username, 'password': kosync_user.password}):
48-
return JSONResponse(status_code=201, content={"username": kosync_user.username})
49-
# if something went wrong
50-
return JSONResponse(status_code=500, content="Unknown server error")
51-
else:
52-
return JSONResponse(status_code=403, content="This server is currently not accepting new registrations.")
49+
"""Registers a new user if registration is allowed."""
50+
if not str_to_bool(getenv("OPEN_REGISTRATIONS", "True")):
51+
raise HTTPException(status_code=403, detail="Registrations are disabled.")
52+
53+
query = Query()
54+
if users.contains(query.username == kosync_user.username):
55+
raise HTTPException(status_code=409, detail="Username is already registered.")
56+
57+
users.insert({"username": kosync_user.username, "password": kosync_user.password})
58+
return JSONResponse(status_code=201, content={"username": kosync_user.username})
5359

5460

5561
@app.get("/users/auth")
56-
def authorize(x_auth_user: Optional[str] = Header(None), x_auth_key: Optional[str] = Header(None)):
57-
# check if username or password is missing
58-
if x_auth_user is None or x_auth_key is None:
59-
return JSONResponse(status_code=401, content={"message": f"Unauthorized"})
60-
# check if username is in database
61-
QUser = Query()
62-
# check username and password combination
63-
if users.contains(QUser.username == x_auth_user):
64-
if users.contains((QUser.username == x_auth_user) & (QUser.password == x_auth_key)):
65-
return JSONResponse(status_code=200, content={"authorized": f"OK"})
66-
else:
67-
return JSONResponse(status_code=401, content={"message": f"Unauthorized"})
68-
return JSONResponse(status_code=403, content={"message": f"Forbidden"})
62+
def authorize(auth_user: str = Depends(get_auth_headers)):
63+
"""Validates user credentials."""
64+
return JSONResponse(status_code=200, content={"authorized": "OK"})
6965

7066

7167
@app.put("/syncs/progress")
72-
def update_progress(kosync_document: KosyncDocument, x_auth_user: Optional[str] = Header(None),
73-
x_auth_key: Optional[str] = Header(None)):
74-
# check if username or password is missing
75-
if x_auth_user is None or x_auth_key is None:
76-
return JSONResponse(status_code=401, content={"message": f"Unauthorized"})
77-
QUser = Query()
78-
QDocument = Query()
79-
# check if username is in database
80-
if not users.contains(QUser.username == x_auth_user):
81-
return JSONResponse(status_code=403, content={"message": f"Forbidden"})
82-
# check username and password combination before put data in database
83-
if users.contains((QUser.username == x_auth_user) & (QUser.password == x_auth_key)):
84-
# add new document progress
85-
timestamp = int(time.time())
86-
if kosync_document.document is None or kosync_document.progress is None or kosync_document.percentage is None \
87-
or kosync_document.device is None or kosync_document.device_id is None:
88-
return JSONResponse(status_code=500, content="Unknown server error")
89-
else:
90-
if documents.upsert({'username': x_auth_user, 'document': kosync_document.document,
91-
'progress': kosync_document.progress, 'percentage': kosync_document.percentage,
92-
'device': kosync_document.device, 'device_id': kosync_document.device_id,
93-
'timestamp': timestamp}, (QDocument.username == x_auth_user) &
94-
(QDocument.document == kosync_document.document)):
95-
return JSONResponse(status_code=200,
96-
content={"document": kosync_document.document, "timestamp": timestamp})
97-
else:
98-
return JSONResponse(status_code=401, content={"message": f"Unauthorized"})
68+
def update_progress(
69+
kosync_document: KosyncDocument,
70+
auth_user: str = Depends(get_auth_headers)
71+
):
72+
"""Updates document progress for a user."""
73+
timestamp = int(time.time())
74+
75+
query = Query()
76+
documents.upsert(
77+
{
78+
"username": auth_user,
79+
"document": kosync_document.document,
80+
"progress": kosync_document.progress,
81+
"percentage": kosync_document.percentage,
82+
"device": kosync_document.device,
83+
"device_id": kosync_document.device_id,
84+
"timestamp": timestamp,
85+
},
86+
(query.username == auth_user) & (query.document == kosync_document.document),
87+
)
88+
89+
return JSONResponse(status_code=200, content={"document": kosync_document.document, "timestamp": timestamp})
9990

10091

10192
@app.get("/syncs/progress/{document}")
102-
def get_progress(document: Optional[str] = None, x_auth_user: Optional[str] = Header(None),
103-
x_auth_key: Optional[str] = Header(None)):
104-
# check if username or password is missing
105-
if x_auth_user is None or x_auth_key is None:
106-
return JSONResponse(status_code=401, content={"message": f"Unauthorized"})
107-
# check if document parameter exists
108-
if document is None:
109-
return JSONResponse(status_code=500, content="Unknown server error")
110-
111-
QUser = Query()
112-
QDocument = Query()
113-
114-
# check if username is in database
115-
if not users.contains(QUser.username == x_auth_user):
116-
return JSONResponse(status_code=403, content={"message": f"Forbidden"})
117-
118-
# check username and password combination before get progress data
119-
if users.contains((QUser.username == x_auth_user) & (QUser.password == x_auth_key)):
120-
# get document progress if user has the document
121-
result = documents.get((QDocument.username == x_auth_user) & (QDocument.document == document))
122-
if result:
123-
rrdi = bool(strtobool(getenv("RECEIVE_RANDOM_DEVICE_ID", "False")))
124-
if rrdi == False:
125-
device_id = result["device_id"]
126-
else:
127-
device_id = uuid.uuid1()
128-
device_id = str(device_id.hex).upper()
129-
return JSONResponse(status_code=200,
130-
content={'username': x_auth_user, 'document': result["document"],
131-
'progress': result["progress"], 'percentage': result["percentage"],
132-
'device': result["device"], 'device_id': device_id,
133-
'timestamp': result["timestamp"]})
93+
def get_progress(
94+
document: str,
95+
auth_user: str = Depends(get_auth_headers)
96+
):
97+
"""Retrieves progress of a document."""
98+
query = Query()
99+
result = documents.get((query.username == auth_user) & (query.document == document))
100+
101+
if not result:
102+
raise HTTPException(status_code=404, detail="Document not found")
103+
104+
# Determine device_id based on environment setting
105+
if str_to_bool(getenv("RECEIVE_RANDOM_DEVICE_ID", "False")):
106+
device_id = uuid.uuid4().hex.upper()
134107
else:
135-
return JSONResponse(status_code=401, content={"message": f"Unauthorized"})
108+
device_id = result["device_id"]
109+
110+
return JSONResponse(
111+
status_code=200,
112+
content={
113+
"username": auth_user,
114+
"document": result["document"],
115+
"progress": result["progress"],
116+
"percentage": result["percentage"],
117+
"device": result["device"],
118+
"device_id": device_id,
119+
"timestamp": result["timestamp"],
120+
},
121+
)
122+
136123

137124
@app.get("/healthstatus")
138125
def get_healthstatus():
139-
return JSONResponse(status_code=200, content={"message": f"healthy"})
126+
"""Health check endpoint."""
127+
return JSONResponse(status_code=200, content={"message": "healthy"})

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ fastapi==0.111.0
22
tinydb==4.8.0
33
uvicorn==0.29.0
44
python-dotenv==1.0.1
5+
pydantic~=2.10.6

0 commit comments

Comments
 (0)