diff --git a/server/fishtest/__init__.py b/server/fishtest/__init__.py
index ba13a59e44..b2390a2b98 100644
--- a/server/fishtest/__init__.py
+++ b/server/fishtest/__init__.py
@@ -5,6 +5,7 @@
import traceback
import fishtest.github_api as gh
+from fishtest.emailer import EmailSender
from fishtest.routes import setup_routes
from fishtest.rundb import RunDb
from pyramid.authentication import AuthTktAuthenticationPolicy
@@ -47,6 +48,8 @@ def main(global_config, **settings):
# assume the instance is primary for backward compatibility.
is_primary_instance = port == primary_port
+ email_sender = EmailSender()
+
rundb = RunDb(port=port, is_primary_instance=is_primary_instance)
def add_rundb(event):
@@ -54,6 +57,7 @@ def add_rundb(event):
event.request.userdb = rundb.userdb
event.request.actiondb = rundb.actiondb
event.request.workerdb = rundb.workerdb
+ event.request.email_sender = email_sender
def add_renderer_globals(event):
pass
diff --git a/server/fishtest/emailer.py b/server/fishtest/emailer.py
new file mode 100644
index 0000000000..9aa9a68ee1
--- /dev/null
+++ b/server/fishtest/emailer.py
@@ -0,0 +1,84 @@
+import os
+
+import requests
+
+RESEND_API_URL = "https://api.resend.com/emails"
+RESEND_TIMEOUT = 10
+
+
+class EmailSendError(RuntimeError):
+ pass
+
+
+class EmailSender:
+ def __init__(
+ self,
+ api_key=None,
+ from_email=None,
+ from_name=None,
+ session=None,
+ ):
+ self.api_key = api_key or os.getenv("FISHTEST_RESEND_API_KEY")
+ self.from_email = from_email or os.getenv(
+ "FISHTEST_RESEND_FROM_EMAIL", "fishtest@resend.dev"
+ )
+ self.from_name = from_name or os.getenv("FISHTEST_RESEND_FROM_NAME", "Fishtest")
+ self.session = session or requests.Session()
+ missing_settings = []
+ if not self.api_key:
+ missing_settings.append("FISHTEST_RESEND_API_KEY")
+ if not self.from_email:
+ missing_settings.append("FISHTEST_RESEND_FROM_EMAIL")
+ if missing_settings:
+ print(
+ "Email sending is not configured; missing {}.".format(
+ ", ".join(missing_settings)
+ ),
+ flush=True,
+ )
+
+ def _from_field(self):
+ if not self.from_email:
+ raise EmailSendError("FISHTEST_RESEND_FROM_EMAIL is missing.")
+ if self.from_name:
+ return f"{self.from_name} <{self.from_email}>"
+ return self.from_email
+
+ def send(
+ self,
+ to_email,
+ subject,
+ text,
+ html=None,
+ reply_to=None,
+ ):
+ if not self.api_key:
+ raise EmailSendError("FISHTEST_RESEND_API_KEY is missing.")
+ if not to_email:
+ raise EmailSendError("to_email is required.")
+
+ payload = {
+ "from": self._from_field(),
+ "to": [to_email] if isinstance(to_email, str) else to_email,
+ "subject": subject,
+ "text": text,
+ }
+ if html:
+ payload["html"] = html
+ if reply_to:
+ payload["reply_to"] = reply_to
+
+ headers = {"Authorization": f"Bearer {self.api_key}"}
+ try:
+ response = self.session.post(
+ RESEND_API_URL, json=payload, headers=headers, timeout=RESEND_TIMEOUT
+ )
+ except Exception as exc:
+ raise EmailSendError(f"Failed to reach Resend: {exc}") from exc
+
+ if not response.ok:
+ raise EmailSendError(
+ f"Resend error {response.status_code}: {response.text}"
+ )
+
+ return response.json()
diff --git a/server/fishtest/routes.py b/server/fishtest/routes.py
index 5dd7e9d965..c9abb9a83d 100644
--- a/server/fishtest/routes.py
+++ b/server/fishtest/routes.py
@@ -14,6 +14,8 @@ def setup_routes(config):
config.add_route("home", "/")
config.add_route("login", "/login")
+ config.add_route("forgot_password", "/forgot_password")
+ config.add_route("reset_password", "/reset_password/{token}")
config.add_route("nn_upload", "/upload")
config.add_route("logout", "/logout")
config.add_route("signup", "/signup")
diff --git a/server/fishtest/schemas.py b/server/fishtest/schemas.py
index 97737a7d53..ca945e8866 100644
--- a/server/fishtest/schemas.py
+++ b/server/fishtest/schemas.py
@@ -108,6 +108,12 @@ def size_is_length(pgn_doc):
"groups": [str, ...],
"tests_repo": union(github_repo, ""),
"machine_limit": uint,
+ "password_reset?": {
+ "token": str,
+ "expires_at": datetime_utc,
+ "opened_at?": datetime_utc,
+ "form_token?": str,
+ },
}
kvstore_schema = {
diff --git a/server/fishtest/templates/forgot_password.mak b/server/fishtest/templates/forgot_password.mak
new file mode 100644
index 0000000000..2f9fd140e1
--- /dev/null
+++ b/server/fishtest/templates/forgot_password.mak
@@ -0,0 +1,43 @@
+<%inherit file="base.mak"/>
+
+
+
+<%block name="head">
+
+%block>
+
+
+
+ Reset your password
+
+ Enter the email linked to your account and we'll send a reset link.
+
+
+
+
+
+
+
diff --git a/server/fishtest/templates/login.mak b/server/fishtest/templates/login.mak
index bf9683755c..a9a1b31955 100644
--- a/server/fishtest/templates/login.mak
+++ b/server/fishtest/templates/login.mak
@@ -58,6 +58,10 @@
+
+
diff --git a/server/fishtest/templates/reset_password.mak b/server/fishtest/templates/reset_password.mak
new file mode 100644
index 0000000000..3a60765cf5
--- /dev/null
+++ b/server/fishtest/templates/reset_password.mak
@@ -0,0 +1,57 @@
+<%inherit file="base.mak"/>
+
+
+
+
+
+ Choose a new password
+
+
+
+
+
+
diff --git a/server/fishtest/userdb.py b/server/fishtest/userdb.py
index 656204866f..f5f6ce3f2f 100644
--- a/server/fishtest/userdb.py
+++ b/server/fishtest/userdb.py
@@ -137,6 +137,68 @@ def save_user(self, user):
self.last_blocked_time = 0
self.clear_cache()
+ def set_password_reset(self, user, token, expires_at):
+ user["password_reset"] = {"token": token, "expires_at": expires_at}
+ self.save_user(user)
+
+ def clear_expired_password_reset(self, token, now):
+ result = self.users.update_one(
+ {"password_reset.token": token, "password_reset.expires_at": {"$lt": now}},
+ {"$unset": {"password_reset": ""}},
+ )
+ if result.matched_count:
+ self.clear_cache()
+ return result
+
+ def update_password_with_reset_token(self, user_id, token, new_password):
+ result = self.users.update_one(
+ {"_id": user_id, "password_reset.token": token},
+ {"$set": {"password": new_password}, "$unset": {"password_reset": ""}},
+ )
+ if result.modified_count:
+ self.clear_cache()
+ return result
+
+ def mark_password_reset_opened(self, user_id, token, opened_at):
+ result = self.users.update_one(
+ {
+ "_id": user_id,
+ "password_reset.token": token,
+ "password_reset.opened_at": {"$exists": False},
+ },
+ {"$set": {"password_reset.opened_at": opened_at}},
+ )
+ if result.modified_count:
+ self.clear_cache()
+ return result
+
+ def set_password_reset_form_token(self, user_id, token, form_token, opened_at):
+ result = self.users.update_one(
+ {
+ "_id": user_id,
+ "password_reset.token": token,
+ "password_reset.form_token": {"$exists": False},
+ },
+ {
+ "$set": {
+ "password_reset.form_token": form_token,
+ "password_reset.opened_at": opened_at,
+ }
+ },
+ )
+ if result.modified_count:
+ self.clear_cache()
+ return result
+
+ def update_password_with_reset_form_token(self, user_id, form_token, new_password):
+ result = self.users.update_one(
+ {"_id": user_id, "password_reset.form_token": form_token},
+ {"$set": {"password": new_password}, "$unset": {"password_reset": ""}},
+ )
+ if result.modified_count:
+ self.clear_cache()
+ return result
+
def remove_user(self, user, rejector):
result = self.users.delete_one({"_id": user["_id"]})
if result.deleted_count > 0:
diff --git a/server/fishtest/views.py b/server/fishtest/views.py
index dfb16a4de1..426b0fc75f 100644
--- a/server/fishtest/views.py
+++ b/server/fishtest/views.py
@@ -5,7 +5,8 @@
import json
import os
import re
-from datetime import UTC, datetime
+import secrets
+from datetime import UTC, datetime, timedelta
from pathlib import Path
import bson
@@ -40,6 +41,7 @@
from vtjson import ValidationError, union, validate
HTTP_TIMEOUT = 15.0
+PASSWORD_RESET_EXPIRY_HOURS = 1
def pagination(page_idx, num, page_size, query_params):
@@ -77,6 +79,30 @@ def pagination(page_idx, num, page_size, query_params):
return pages
+def run_captcha(request):
+ secret = os.environ.get("FISHTEST_CAPTCHA_SECRET")
+ if not secret:
+ print("FISHTEST_CAPTCHA_SECRET is missing.", flush=True)
+ else:
+ payload = {
+ "secret": secret,
+ "response": request.POST.get("g-recaptcha-response", ""),
+ "remoteip": request.remote_addr,
+ }
+ response = requests.post(
+ "https://www.google.com/recaptcha/api/siteverify",
+ data=payload,
+ timeout=HTTP_TIMEOUT,
+ ).json()
+ if "success" not in response or not response["success"]:
+ if "error-codes" in response:
+ print(response["error-codes"])
+ request.session.flash("Captcha failed", "error")
+ return False
+ return True
+ return True
+
+
@notfound_view_config(renderer="notfound.mak")
def notfound_view(request):
request.response.status = 404
@@ -150,6 +176,161 @@ def login(request):
return {}
+@view_config(
+ route_name="forgot_password",
+ renderer="forgot_password.mak",
+ require_csrf=True,
+ request_method=("GET", "POST"),
+)
+def forgot_password(request):
+ userid = request.authenticated_userid
+ if userid:
+ return home(request)
+
+ if request.method == "POST":
+ if not run_captcha(request):
+ return {}
+
+ email = request.POST.get("email", "").strip()
+ email_is_valid, validated_email = email_valid(email)
+ if not email_is_valid:
+ request.session.flash("Error! Invalid email: " + validated_email, "error")
+ return {}
+
+ user = request.userdb.find_by_email(validated_email)
+ if user is not None:
+ token = secrets.token_urlsafe(32)
+ expires_at = datetime.now(UTC) + timedelta(
+ hours=PASSWORD_RESET_EXPIRY_HOURS
+ )
+ request.userdb.set_password_reset(user, token, expires_at)
+ reset_url = request.route_url("reset_password", token=token)
+ body = (
+ "We received a request to reset your Fishtest password.\n\n"
+ f"Reset link: {reset_url}\n\n"
+ "This link will expire in 1 hour.\n"
+ "For your security, do not share this link with anyone, as it can "
+ "be used to change your password.\n\n"
+ "If you did not request a password reset, you can ignore this "
+ "email or contact the site administrators for assistance."
+ )
+ try:
+ request.email_sender.send(
+ user["email"],
+ "Fishtest password reset",
+ body,
+ )
+ except Exception as e:
+ print(f"failed to send password reset email to {validated_email}: {e}")
+
+ request.session.flash(
+ "If that email exists, a reset link has been sent, please check your inbox."
+ )
+ return HTTPFound(location=request.route_url("tests"))
+ return {}
+
+
+@view_config(
+ route_name="reset_password",
+ renderer="reset_password.mak",
+ require_csrf=True,
+ request_method=("GET", "POST"),
+)
+def reset_password(request):
+ userid = request.authenticated_userid
+ if userid:
+ request.session.flash(
+ "You are already logged in. Use profile settings to change your password."
+ )
+ return home(request)
+
+ token = request.matchdict.get("token", "")
+ if not token:
+ raise HTTPNotFound()
+
+ now = datetime.now(UTC)
+ user = request.userdb.users.find_one(
+ {"password_reset.token": token, "password_reset.expires_at": {"$gte": now}}
+ )
+ if not user:
+ expired_cleanup = request.userdb.clear_expired_password_reset(token, now)
+ if expired_cleanup.matched_count:
+ request.session.flash("Reset link has expired.", "error")
+ return HTTPFound(location=request.route_url("forgot_password"))
+ request.session.flash(
+ "Invalid reset link. It may have been replaced by a newer reset request.",
+ "error",
+ )
+ return HTTPFound(location=request.route_url("login"))
+
+ if request.method == "GET":
+ if user.get("password_reset", {}).get("form_token"):
+ request.session.flash(
+ "Reset link has already been opened. Please request a new one.",
+ "error",
+ )
+ return HTTPFound(location=request.route_url("forgot_password"))
+ form_token = secrets.token_urlsafe(32)
+ set_form = request.userdb.set_password_reset_form_token(
+ user["_id"], token, form_token, now
+ )
+ if set_form.modified_count == 0:
+ request.session.flash(
+ "Reset link has already been opened. Please request a new one.",
+ "error",
+ )
+ return HTTPFound(location=request.route_url("forgot_password"))
+ return {"token": token, "form_token": form_token}
+
+ if request.method == "POST":
+ form_token = request.POST.get("form_token", "").strip()
+ if not form_token:
+ request.session.flash(
+ "Reset form is invalid. Please request a new reset link.",
+ "error",
+ )
+ return HTTPFound(location=request.route_url("forgot_password"))
+ user = request.userdb.users.find_one(
+ {
+ "password_reset.form_token": form_token,
+ "password_reset.expires_at": {"$gte": now},
+ }
+ )
+ if not user:
+ request.session.flash(
+ "Reset link has expired or already been used.",
+ "error",
+ )
+ return HTTPFound(location=request.route_url("forgot_password"))
+ new_password = request.POST.get("password", "").strip()
+ new_password_verify = request.POST.get("password2", "").strip()
+ if new_password != new_password_verify:
+ request.session.flash("Error! Matching verify password required", "error")
+ return {"token": token, "form_token": form_token}
+ strong_password, password_err = password_strength(
+ new_password, user["username"], user["email"]
+ )
+ if not strong_password:
+ request.session.flash("Error! Weak password: " + password_err, "error")
+ return {"token": token, "form_token": form_token}
+
+ update_result = request.userdb.update_password_with_reset_form_token(
+ user["_id"],
+ form_token,
+ new_password,
+ )
+ if update_result.modified_count == 0:
+ request.session.flash(
+ "Unable to reset password. The reset link may have expired or already been used.",
+ "error",
+ )
+ return HTTPFound(location=request.route_url("forgot_password"))
+ request.session.flash("Success! Password updated. Please login.")
+ return HTTPFound(location=request.route_url("login"))
+
+ return {"token": token}
+
+
# Note that the allowed length of mailto URLs on Chrome/Windows is severely
# limited.
@@ -395,25 +576,8 @@ def signup(request):
request.session.flash(error, "error")
return {}
- secret = os.environ.get("FISHTEST_CAPTCHA_SECRET")
- if not secret:
- print("FISHTEST_CAPTCHA_SECRET is missing.", flush=True)
- else:
- payload = {
- "secret": secret,
- "response": request.POST.get("g-recaptcha-response", ""),
- "remoteip": request.remote_addr,
- }
- response = requests.post(
- "https://www.google.com/recaptcha/api/siteverify",
- data=payload,
- timeout=HTTP_TIMEOUT,
- ).json()
- if "success" not in response or not response["success"]:
- if "error-codes" in response:
- print(response["error-codes"])
- request.session.flash("Captcha failed", "error")
- return {}
+ if not run_captcha(request):
+ return {}
result = request.userdb.create_user(
username=signup_username,
diff --git a/server/tests/test_users.py b/server/tests/test_users.py
index 1543ed3fed..de4c13e088 100644
--- a/server/tests/test_users.py
+++ b/server/tests/test_users.py
@@ -1,8 +1,10 @@
+import secrets
import unittest
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
+from unittest.mock import patch
import util
-from fishtest.views import login, signup
+from fishtest.views import forgot_password, login, reset_password, signup
from pyramid import testing
@@ -109,5 +111,262 @@ def tearDown(self):
testing.tearDown()
+class _DummyEmailSender:
+ def __init__(self, should_fail=False):
+ self.should_fail = should_fail
+ self.sent = []
+
+ def send(self, to_email, subject, text, html=None, reply_to=None):
+ if self.should_fail:
+ raise Exception("boom")
+ self.sent.append(
+ {
+ "to_email": to_email,
+ "subject": subject,
+ "text": text,
+ "html": html,
+ "reply_to": reply_to,
+ }
+ )
+ return {"ok": True}
+
+
+class ForgotResetPasswordTest(unittest.TestCase):
+ def setUp(self):
+ self.rundb = util.get_rundb()
+ self.config = testing.setUp()
+ self.config.add_route("forgot_password", "/forgot_password")
+ self.config.add_route("reset_password", "/reset_password/{token}")
+ self.config.add_route("login", "/login")
+ self.config.add_route("tests", "/tests")
+ self.test_user = {
+ "username": "ResetUser",
+ "password": "secret",
+ "email": "reset@user.net",
+ "tests_repo": "https://github.com/official-stockfish/Stockfish",
+ }
+ self.rundb.userdb.create_user(**self.test_user)
+
+ def tearDown(self):
+ self.rundb.userdb.users.delete_many({"username": self.test_user["username"]})
+ self.rundb.userdb.user_cache.delete_many(
+ {"username": self.test_user["username"]}
+ )
+ testing.tearDown()
+
+ def test_forgot_password_valid_email(self):
+ email_sender = _DummyEmailSender()
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ email_sender=email_sender,
+ method="POST",
+ params={"email": self.test_user["email"]},
+ remote_addr="127.0.0.1",
+ )
+ forgot_password(request)
+ self.assertEqual(len(email_sender.sent), 1)
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ self.assertIn("password_reset", user)
+ self.assertIn("token", user["password_reset"])
+ self.assertIn("expires_at", user["password_reset"])
+ self.assertGreater(user["password_reset"]["expires_at"], datetime.now(UTC))
+ self.assertIn(
+ "If that email exists, a reset link has been sent, please check your inbox.",
+ request.session.pop_flash()[0],
+ )
+
+ def test_forgot_password_invalid_email(self):
+ email_sender = _DummyEmailSender()
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ email_sender=email_sender,
+ method="POST",
+ params={"email": "not-an-email"},
+ remote_addr="127.0.0.1",
+ )
+ forgot_password(request)
+ self.assertEqual(len(email_sender.sent), 0)
+ self.assertIn("Error! Invalid email:", request.session.pop_flash("error")[0])
+
+ def test_forgot_password_nonexistent_email(self):
+ email_sender = _DummyEmailSender()
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ email_sender=email_sender,
+ method="POST",
+ params={"email": "missing-user@example.net"},
+ remote_addr="127.0.0.1",
+ )
+ with patch(
+ "fishtest.views.email_valid",
+ return_value=(True, "missing-user@example.net"),
+ ):
+ forgot_password(request)
+ self.assertEqual(len(email_sender.sent), 0)
+ self.assertIn(
+ "If that email exists, a reset link has been sent, please check your inbox.",
+ request.session.pop_flash()[0],
+ )
+
+ def test_forgot_password_email_send_error(self):
+ email_sender = _DummyEmailSender(should_fail=True)
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ email_sender=email_sender,
+ method="POST",
+ params={"email": self.test_user["email"]},
+ remote_addr="127.0.0.1",
+ )
+ forgot_password(request)
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ self.assertIn("password_reset", user)
+ self.assertIn(
+ "If that email exists, a reset link has been sent, please check your inbox.",
+ request.session.pop_flash()[0],
+ )
+
+ def test_reset_password_expired_token(self):
+ token = secrets.token_urlsafe(32)
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ expires_at = datetime.now(UTC) - timedelta(minutes=1)
+ self.rundb.userdb.set_password_reset(user, token, expires_at)
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="GET",
+ matchdict={"token": token},
+ remote_addr="127.0.0.1",
+ )
+ response = reset_password(request)
+ self.assertEqual(response.location, request.route_url("forgot_password"))
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ self.assertNotIn("password_reset", user)
+ self.assertIn(
+ "Reset link has expired.",
+ request.session.pop_flash("error")[0],
+ )
+
+ def test_reset_password_invalid_token(self):
+ token = secrets.token_urlsafe(32)
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="GET",
+ matchdict={"token": token},
+ remote_addr="127.0.0.1",
+ )
+ response = reset_password(request)
+ self.assertEqual(response.location, request.route_url("login"))
+ self.assertIn(
+ "Invalid reset link. It may have been replaced by a newer reset request.",
+ request.session.pop_flash("error")[0],
+ )
+
+ def test_reset_password_token_invalid_after_use(self):
+ token = secrets.token_urlsafe(32)
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ expires_at = datetime.now(UTC) + timedelta(hours=1)
+ self.rundb.userdb.set_password_reset(user, token, expires_at)
+ get_request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="GET",
+ matchdict={"token": token},
+ remote_addr="127.0.0.1",
+ )
+ get_response = reset_password(get_request)
+ form_token = get_response["form_token"]
+ new_password = "CorrectHorseBatteryStaple123!@#"
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="POST",
+ matchdict={"token": token},
+ params={
+ "password": new_password,
+ "password2": new_password,
+ "form_token": form_token,
+ },
+ remote_addr="127.0.0.1",
+ )
+ response = reset_password(request)
+ self.assertEqual(response.location, request.route_url("login"))
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ self.assertNotIn("password_reset", user)
+ self.assertEqual(user["password"], new_password)
+
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="GET",
+ matchdict={"token": token},
+ remote_addr="127.0.0.1",
+ )
+ response = reset_password(request)
+ self.assertEqual(response.location, request.route_url("login"))
+ self.assertIn(
+ "Invalid reset link. It may have been replaced by a newer reset request.",
+ request.session.pop_flash("error")[0],
+ )
+
+ def test_reset_password_weak_password(self):
+ token = secrets.token_urlsafe(32)
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ expires_at = datetime.now(UTC) + timedelta(hours=1)
+ self.rundb.userdb.set_password_reset(user, token, expires_at)
+ get_request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="GET",
+ matchdict={"token": token},
+ remote_addr="127.0.0.1",
+ )
+ get_response = reset_password(get_request)
+ form_token = get_response["form_token"]
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="POST",
+ matchdict={"token": token},
+ params={
+ "password": "short",
+ "password2": "short",
+ "form_token": form_token,
+ },
+ remote_addr="127.0.0.1",
+ )
+ response = reset_password(request)
+ self.assertEqual(response, {"token": token, "form_token": form_token})
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ self.assertIn("password_reset", user)
+ self.assertIn("Error! Weak password:", request.session.pop_flash("error")[0])
+
+ def test_reset_password_mismatch(self):
+ token = secrets.token_urlsafe(32)
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ expires_at = datetime.now(UTC) + timedelta(hours=1)
+ self.rundb.userdb.set_password_reset(user, token, expires_at)
+ get_request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="GET",
+ matchdict={"token": token},
+ remote_addr="127.0.0.1",
+ )
+ get_response = reset_password(get_request)
+ form_token = get_response["form_token"]
+ request = testing.DummyRequest(
+ userdb=self.rundb.userdb,
+ method="POST",
+ matchdict={"token": token},
+ params={
+ "password": "MismatchPassword123!",
+ "password2": "Different123!",
+ "form_token": form_token,
+ },
+ remote_addr="127.0.0.1",
+ )
+ response = reset_password(request)
+ self.assertEqual(response, {"token": token, "form_token": form_token})
+ user = self.rundb.userdb.find_by_email(self.test_user["email"])
+ self.assertNotEqual(user["password"], "MismatchPassword123!")
+ self.assertIn(
+ "Error! Matching verify password required",
+ request.session.pop_flash("error")[0],
+ )
+
+
if __name__ == "__main__":
unittest.main()