diff --git a/openlibrary/accounts/model.py b/openlibrary/accounts/model.py index 73d8c006530..ce0372e2b9b 100644 --- a/openlibrary/accounts/model.py +++ b/openlibrary/accounts/model.py @@ -41,15 +41,15 @@ class OLAuthenticationError(Exception): pass -def append_random_suffix(text, limit=9999): +def append_random_suffix(text: str, limit: int = 9999) -> str: return f'{text}{random.randint(0, limit)}' -def valid_email(email): +def valid_email(email: str) -> bool: return validate_email(email) -def sendmail(to, msg, cc=None): +def sendmail(to: str, msg, cc=None) -> None: cc = cc or [] if config.get('dummy_sendmail'): message = ( @@ -70,13 +70,13 @@ def sendmail(to, msg, cc=None): ) -def verify_hash(secret_key, text, hash): +def verify_hash(secret_key, text, hash) -> bool: """Verifies if the hash is generated""" salt = hash.split('$', 1)[0] return generate_hash(secret_key, text, salt) == hash -def generate_hash(secret_key, text, salt=None): +def generate_hash(secret_key, text, salt=None) -> str: if not isinstance(secret_key, bytes): secret_key = secret_key.encode('utf-8') salt = ( @@ -95,11 +95,11 @@ def get_secret_key(): return config.infobase['secret_key'] -def generate_uuid(): +def generate_uuid() -> str: return str(uuid.uuid4()).replace("-", "") -def send_verification_email(username, email): +def send_verification_email(username: str, email: str) -> None: """Sends account verification email.""" key = f"account/{username}/verify" @@ -113,7 +113,7 @@ def send_verification_email(username, email): sendmail(email, msg) -def create_link_doc(key, username, email): +def create_link_doc(key: str, username: str, email: str) -> dict: """Creates doc required for generating verification link email. The doc contains username, email and a generated code. @@ -135,35 +135,35 @@ def create_link_doc(key, username, email): } -def clear_cookies(): +def clear_cookies() -> None: web.setcookie('pd', "", expires=-1) web.setcookie('sfw', "", expires=-1) class Link(web.storage): - def get_expiration_time(self): + def get_expiration_time(self) -> datetime.datetime: d = self['expires_on'].split(".")[0] return datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%S") - def get_creation_time(self): + def get_creation_time(self) -> datetime.datetime: d = self['created_on'].split(".")[0] return datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%S") - def delete(self): + def delete(self) -> None: del web.ctx.site.store[self['_key']] class Account(web.storage): @property - def username(self): + def username(self) -> str: return self._key.split("/")[-1] - def get_edit_count(self): + def get_edit_count(self) -> int: user = self.get_user() return (user and user.get_edit_count()) or 0 @property - def registered_on(self): + def registered_on(self) -> datetime.datetime | None: """Returns the registration time.""" t = self.get("created_on") return t and helpers.parse_datetime(t) @@ -174,7 +174,7 @@ def activated_on(self): return user and user.created @property - def displayname(self): + def displayname(self) -> str: if doc := self.get_user(): return doc.displayname or self.username elif "data" in self: @@ -182,34 +182,34 @@ def displayname(self): else: return self.username - def creation_time(self): + def creation_time(self) -> datetime.datetime: d = self['created_on'].split(".")[0] return datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%S") - def get_recentchanges(self, limit=100, offset=0): + def get_recentchanges(self, limit: int = 100, offset: int = 0): q = {"author": self.get_user().key, "limit": limit, "offset": offset} return web.ctx.site.recentchanges(q) - def verify_password(self, password): + def verify_password(self, password) -> bool: return verify_hash(get_secret_key(), password, self.enc_password) - def update_password(self, new_password): + def update_password(self, new_password) -> None: web.ctx.site.update_account(self.username, password=new_password) - def update_email(self, email): + def update_email(self, email) -> None: web.ctx.site.update_account(self.username, email=email) - def send_verification_email(self): + def send_verification_email(self) -> None: send_verification_email(self.username, self.email) - def activate(self): + def activate(self) -> None: web.ctx.site.activate_account(username=self.username) - def block(self): + def block(self) -> None: """Blocks this account.""" web.ctx.site.update_account(self.username, status="blocked") - def unblock(self): + def unblock(self) -> None: """Unblocks this account.""" web.ctx.site.update_account(self.username, status="active") @@ -243,25 +243,25 @@ def login(self, password): return "ok" @classmethod - def generate_random_password(cls, n=12): + def generate_random_password(cls, n: int = 12) -> str: return ''.join( random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(n) ) - def generate_login_code(self): + def generate_login_code(self) -> str: """Returns a string that can be set as login cookie to log in as this user.""" user_key = "/people/" + self.username t = datetime.datetime(*time.gmtime()[:6]).isoformat() text = f"{user_key},{t}" return text + "," + generate_hash(get_secret_key(), text) - def _save(self): + def _save(self) -> None: """Saves this account in store.""" web.ctx.site.store[self._key] = self @property - def last_login(self): + def last_login(self) -> datetime.datetime: """Returns the last_login time of the user, if available. The `last_login` will not be available for accounts, who haven't @@ -281,26 +281,26 @@ def get_user(self) -> 'User': key = "/people/" + self.username return web.ctx.site.get(key) - def get_creation_info(self): + def get_creation_info(self) -> dict: key = "/people/" + self.username doc = web.ctx.site.get(key) return doc.get_creation_info() - def get_activation_link(self): + def get_activation_link(self) -> Link | bool: key = f"account/{self.username}/verify" if doc := web.ctx.site.store.get(key): return Link(doc) else: return False - def get_password_reset_link(self): + def get_password_reset_link(self) -> Link | bool: key = f"account/{self.username}/password" if doc := web.ctx.site.store.get(key): return Link(doc) else: return False - def get_links(self): + def get_links(self) -> list[Link]: """Returns all the verification links present in the database.""" return web.ctx.site.store.values( type="account-link", name="username", value=self.username @@ -313,21 +313,21 @@ def get_tags(self) -> list[str]: def has_tag(self, tag: str) -> bool: return tag in self.get_tags() - def add_tag(self, tag): + def add_tag(self, tag) -> None: tags = self.get_tags() if tag not in tags: tags.append(tag) self['tags'] = tags self._save() - def remove_tag(self, tag): + def remove_tag(self, tag) -> None: tags = self.get_tags() if tag in tags: tags.remove(tag) self['tags'] = tags self._save() - def set_bot_flag(self, flag): + def set_bot_flag(self, flag) -> None: """Enables/disables the bot flag.""" self.bot = flag self._save() @@ -402,7 +402,7 @@ def get_linked_ia_account(self): if 'values' in act and 'email' in act['values']: return InternetArchiveAccount.get(email=act['values']['email']) - def render_link(self): + def render_link(self) -> str: return f'{web.net.htmlquote(self.displayname)}' @@ -564,14 +564,14 @@ def get_by_email( return None @property - def verified(self): + def verified(self) -> bool: return getattr(self, 'status', '') != 'pending' @property - def blocked(self): + def blocked(self) -> bool: return getattr(self, 'status', '') == 'blocked' - def unlink(self): + def unlink(self) -> None: """Careful, this will save any other changes to the ol user object as well """ @@ -581,7 +581,7 @@ def unlink(self): self.internetarchive_itemname = None stats.increment('ol.account.xauth.unlinked') - def link(self, itemname): + def link(self, itemname) -> None: """Careful, this will save any other changes to the ol user object as well """ @@ -593,7 +593,7 @@ def link(self, itemname): self.internetarchive_itemname = itemname stats.increment('ol.account.xauth.linked') - def save_s3_keys(self, s3_keys): + def save_s3_keys(self, s3_keys: dict) -> None: _ol_account = web.ctx.site.store.get(self._key) _ol_account['s3_keys'] = s3_keys web.ctx.site.store[self._key] = _ol_account @@ -607,7 +607,7 @@ def update_last_login(self): self.last_login = last_login @classmethod - def authenticate(cls, email, password, test=False): + def authenticate(cls, email: str, password: str, test: bool = False) -> str: ol_account = cls.get(email=email, test=test) if not ol_account: return "account_not_found" @@ -618,12 +618,11 @@ def authenticate(cls, email, password, test=False): except ClientException as e: code = e.get_data().get("code") return code - else: - return "ok" + return "ok" class InternetArchiveAccount(web.storage): - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: for k, v in kwargs.items(): setattr(self, k, v) @@ -750,11 +749,17 @@ def xauth(cls, op, test=None, s3_key=None, s3_secret=None, xauth_url=None, **dat return {'error': response.text, 'code': response.status_code} @classmethod - def s3auth(cls, access_key, secret_key): + def s3auth(cls, access_key: str, secret_key: str): """Authenticates an Archive.org user based on s3 keys""" from openlibrary.core import lending url = lending.config_ia_s3_auth_url + + if not isinstance(url, str): + raise TypeError( + f"Expected 'url' to be a string, got {type(url).__name__} instead" + ) + try: response = requests.get( url,