Skip to content

Commit 942d2f8

Browse files
committed
feat: user and admin interfaces for account assoications
Signed-off-by: Mike Fiedler <miketheman@gmail.com>
1 parent 1cc096b commit 942d2f8

File tree

5 files changed

+541
-1
lines changed

5 files changed

+541
-1
lines changed
Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
3+
import re
4+
import time
5+
6+
from http import HTTPStatus
7+
8+
from urllib3.util import parse_url
9+
10+
from warehouse.accounts.models import UniqueLoginStatus
11+
from warehouse.utils.otp import _get_totp
12+
13+
from ...common.constants import REMOTE_ADDR
14+
from ...common.db.accounts import (
15+
OAuthAccountAssociationFactory,
16+
UserFactory,
17+
UserUniqueLoginFactory,
18+
)
19+
from ...common.db.ip_addresses import IpAddressFactory
20+
21+
22+
class TestAccountAssociations:
23+
def _login_user(self, webtest, user):
24+
"""Helper method to log in a user with 2FA."""
25+
# Pre-create confirmed unique login for the test IP
26+
ip_address = IpAddressFactory.create(ip_address=REMOTE_ADDR)
27+
UserUniqueLoginFactory.create(
28+
user=user,
29+
ip_address=ip_address,
30+
status=UniqueLoginStatus.CONFIRMED,
31+
)
32+
33+
# Login - no device confirmation needed since login is pre-confirmed
34+
login_page = webtest.get("/account/login/", status=HTTPStatus.OK)
35+
login_form = login_page.forms["login-form"]
36+
login_form["username"] = user.username
37+
login_form["password"] = "password"
38+
two_factor_page = login_form.submit().follow(status=HTTPStatus.OK)
39+
40+
two_factor_form = two_factor_page.forms["totp-auth-form"]
41+
two_factor_form["totp_value"] = (
42+
_get_totp(user.totp_secret).generate(time.time()).decode()
43+
)
44+
two_factor_form.submit().follow(status=HTTPStatus.OK)
45+
46+
def test_view_account_associations_page(self, webtest):
47+
"""A user can view the account settings page with associations section."""
48+
# Create a user with a GitHub association
49+
user = UserFactory.create(
50+
with_verified_primary_email=True,
51+
with_terms_of_service_agreement=True,
52+
clear_pwd="password",
53+
)
54+
OAuthAccountAssociationFactory.create(
55+
user=user,
56+
service="github",
57+
external_username="testuser",
58+
)
59+
60+
# Login
61+
self._login_user(webtest, user)
62+
63+
# Visit account settings page
64+
account_page = webtest.get("/manage/account/", status=HTTPStatus.OK)
65+
66+
# Verify associations section is present
67+
assert "Account associations" in account_page.text
68+
assert "testuser" in account_page.text
69+
assert "github" in account_page.text.lower()
70+
71+
def test_connect_github_account(self, webtest):
72+
"""A user can connect a GitHub account via OAuth."""
73+
user = UserFactory.create(
74+
with_verified_primary_email=True,
75+
with_terms_of_service_agreement=True,
76+
clear_pwd="password",
77+
)
78+
# NullOAuthClient is configured via GITHUB_OAUTH_BACKEND in dev/environment
79+
80+
# Login
81+
self._login_user(webtest, user)
82+
83+
# Click "Connect GitHub" button (initiates OAuth flow)
84+
# NullOAuthClient will redirect immediately to callback with mock code
85+
connect_response = webtest.get(
86+
"/manage/account/associations/github/connect",
87+
status=HTTPStatus.SEE_OTHER,
88+
)
89+
90+
# Follow the redirect to the callback URL
91+
# NullOAuthClient creates a redirect to callback with mock parameters
92+
# Extract path from absolute URL to preserve webtest session
93+
parsed_url = parse_url(connect_response.location)
94+
callback_response = webtest.get(
95+
parsed_url.path,
96+
params=parsed_url.query,
97+
status=HTTPStatus.SEE_OTHER,
98+
)
99+
100+
# Follow redirect back to account page
101+
account_page = callback_response.follow(status=HTTPStatus.OK)
102+
103+
# Verify association was created
104+
assert "Account associations" in account_page.text
105+
assert "mockuser_" in account_page.text # NullOAuthClient generates mock users
106+
107+
def test_connect_github_account_invalid_state(self, webtest):
108+
"""OAuth flow rejects requests with invalid state tokens (CSRF protection)."""
109+
user = UserFactory.create(
110+
with_verified_primary_email=True,
111+
with_terms_of_service_agreement=True,
112+
clear_pwd="password",
113+
)
114+
115+
# Login
116+
self._login_user(webtest, user)
117+
118+
# Try to access callback directly with invalid state
119+
callback_response = webtest.get(
120+
"/manage/account/associations/github/callback",
121+
params={"code": "test", "state": "invalid"},
122+
status=HTTPStatus.SEE_OTHER,
123+
)
124+
125+
# Should redirect to account page with error
126+
account_page = callback_response.follow(status=HTTPStatus.OK)
127+
# Verify no association was created - user has no connected accounts
128+
assert "You have not connected" in account_page.text
129+
130+
def test_delete_account_association(self, webtest):
131+
"""A user can delete an account association."""
132+
user = UserFactory.create(
133+
with_verified_primary_email=True,
134+
with_terms_of_service_agreement=True,
135+
clear_pwd="password",
136+
)
137+
association = OAuthAccountAssociationFactory.create(
138+
user=user,
139+
service="github",
140+
external_username="testuser",
141+
)
142+
143+
# Login
144+
self._login_user(webtest, user)
145+
146+
# Visit account settings page
147+
account_page = webtest.get("/manage/account/", status=HTTPStatus.OK)
148+
149+
# Verify association is present
150+
assert "testuser" in account_page.text
151+
152+
# Re-authenticate for dangerous action (simulate confirm prompt)
153+
# In the real UI, this happens via a modal, but we'll POST directly
154+
confirm_page = webtest.get("/manage/account/", status=HTTPStatus.OK)
155+
csrf_token = confirm_page.html.find("input", {"name": "csrf_token"})["value"]
156+
157+
# Submit delete form
158+
delete_response = webtest.post(
159+
"/manage/account/associations/delete",
160+
{"csrf_token": csrf_token, "association_id": str(association.id)},
161+
status=HTTPStatus.SEE_OTHER,
162+
)
163+
164+
# Follow redirect back to account page
165+
account_page = delete_response.follow(status=HTTPStatus.OK)
166+
167+
# Verify association is gone from the associations section
168+
# (username may still appear in security history showing the removal event)
169+
assert "You have not connected any external accounts yet" in account_page.text
170+
171+
def test_cannot_delete_other_users_association(self, webtest):
172+
"""A user cannot delete another user's account association."""
173+
user1 = UserFactory.create(
174+
with_verified_primary_email=True,
175+
with_terms_of_service_agreement=True,
176+
clear_pwd="password",
177+
)
178+
user2 = UserFactory.create(
179+
with_verified_primary_email=True,
180+
with_terms_of_service_agreement=True,
181+
)
182+
association = OAuthAccountAssociationFactory.create(
183+
user=user2, service="github", external_username="user2github"
184+
)
185+
186+
# Login as user1
187+
self._login_user(webtest, user1)
188+
189+
# Get CSRF token
190+
account_page = webtest.get("/manage/account/", status=HTTPStatus.OK)
191+
csrf_token = account_page.html.find("input", {"name": "csrf_token"})["value"]
192+
193+
# Try to delete user2's association
194+
# Should redirect back to account page with error message
195+
delete_response = webtest.post(
196+
"/manage/account/associations/delete",
197+
{"csrf_token": csrf_token, "association_id": str(association.id)},
198+
status=HTTPStatus.SEE_OTHER,
199+
)
200+
delete_response.follow(status=HTTPStatus.OK)
201+
202+
# Check flash messages for the error
203+
flash_messages = webtest.get(
204+
"/_includes/unauthed/flash-messages/", status=HTTPStatus.OK
205+
)
206+
error_message = flash_messages.html.find(
207+
"span", {"class": "notification-bar__message"}
208+
)
209+
assert error_message is not None
210+
assert "Failed to remove account association" in error_message.text
211+
212+
def test_multiple_github_accounts_per_user(self, webtest):
213+
"""A user can connect multiple GitHub accounts (different external IDs)."""
214+
user = UserFactory.create(
215+
with_verified_primary_email=True,
216+
with_terms_of_service_agreement=True,
217+
clear_pwd="password",
218+
)
219+
220+
# Login
221+
self._login_user(webtest, user)
222+
223+
# Connect first GitHub account
224+
connect_response1 = webtest.get(
225+
"/manage/account/associations/github/connect",
226+
status=HTTPStatus.SEE_OTHER,
227+
)
228+
# Extract path from absolute URL to preserve webtest session
229+
parsed_url1 = parse_url(connect_response1.location)
230+
callback_response1 = webtest.get(
231+
parsed_url1.path,
232+
params=parsed_url1.query,
233+
status=HTTPStatus.SEE_OTHER,
234+
)
235+
account_page1 = callback_response1.follow(status=HTTPStatus.OK)
236+
# Get first mockuser name
237+
mockuser_match1 = re.search(r"mockuser_\w+", account_page1.text)
238+
assert mockuser_match1 is not None
239+
first_mockuser = mockuser_match1.group()
240+
241+
# Connect second GitHub account
242+
# NullOAuthClient generates different mock users each time
243+
connect_response2 = webtest.get(
244+
"/manage/account/associations/github/connect",
245+
status=HTTPStatus.SEE_OTHER,
246+
)
247+
# Extract path from absolute URL to preserve webtest session
248+
parsed_url2 = parse_url(connect_response2.location)
249+
callback_response2 = webtest.get(
250+
parsed_url2.path,
251+
params=parsed_url2.query,
252+
status=HTTPStatus.SEE_OTHER,
253+
)
254+
account_page2 = callback_response2.follow(status=HTTPStatus.OK)
255+
256+
# Verify both associations appear on the page
257+
assert first_mockuser in account_page2.text
258+
mockuser_match2 = re.search(
259+
r"mockuser_\w+", account_page2.text.replace(first_mockuser, "")
260+
)
261+
assert mockuser_match2 is not None # Found a second different mockuser
262+
263+
def test_github_oauth_error_response(self, webtest):
264+
"""OAuth flow handles error responses from GitHub."""
265+
user = UserFactory.create(
266+
with_verified_primary_email=True,
267+
with_terms_of_service_agreement=True,
268+
clear_pwd="password",
269+
)
270+
271+
self._login_user(webtest, user)
272+
273+
# Start OAuth flow to get a valid state token
274+
connect_response = webtest.get(
275+
"/manage/account/associations/github/connect", status=HTTPStatus.SEE_OTHER
276+
)
277+
# Extract state from the redirect URL
278+
parsed_url = parse_url(connect_response.location)
279+
state = parsed_url.query.split("state=")[1].split("&")[0]
280+
281+
# Simulate OAuth error response with valid state
282+
callback_response = webtest.get(
283+
"/manage/account/associations/github/callback",
284+
params={
285+
"state": state,
286+
"error": "access_denied",
287+
"error_description": "User declined",
288+
},
289+
status=HTTPStatus.SEE_OTHER,
290+
)
291+
callback_response.follow(status=HTTPStatus.OK)
292+
293+
# Check flash messages for the error
294+
flash_messages = webtest.get(
295+
"/_includes/unauthed/flash-messages/", status=HTTPStatus.OK
296+
)
297+
error_message = flash_messages.html.find(
298+
"span", {"class": "notification-bar__message"}
299+
)
300+
assert error_message is not None
301+
assert "GitHub OAuth failed" in error_message.text
302+
assert "User declined" in error_message.text
303+
304+
def test_github_oauth_missing_code(self, webtest):
305+
"""OAuth flow handles missing authorization code."""
306+
user = UserFactory.create(
307+
with_verified_primary_email=True,
308+
with_terms_of_service_agreement=True,
309+
clear_pwd="password",
310+
)
311+
312+
self._login_user(webtest, user)
313+
314+
# Start OAuth flow to get a valid state token
315+
connect_response = webtest.get(
316+
"/manage/account/associations/github/connect",
317+
status=HTTPStatus.SEE_OTHER,
318+
)
319+
# Extract state from the redirect URL
320+
parsed_url = parse_url(connect_response.location)
321+
state = parsed_url.query.split("state=")[1].split("&")[0]
322+
323+
# Call callback with valid state but no code
324+
callback_response = webtest.get(
325+
"/manage/account/associations/github/callback",
326+
params={"state": state},
327+
status=HTTPStatus.SEE_OTHER,
328+
)
329+
callback_response.follow(status=HTTPStatus.OK)
330+
331+
# Check flash messages for the error
332+
flash_messages = webtest.get(
333+
"/_includes/unauthed/flash-messages/",
334+
status=HTTPStatus.OK,
335+
)
336+
error_message = flash_messages.html.find(
337+
"span", {"class": "notification-bar__message"}
338+
)
339+
assert error_message is not None
340+
assert "No authorization code received from GitHub" in error_message.text

tests/unit/manage/test_views.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ class TestManageAccount:
126126
)
127127
def test_default_response(self, monkeypatch, public_email, expected_public_email):
128128
breach_service = pretend.stub()
129-
user_service = pretend.stub()
129+
account_associations = pretend.stub()
130+
user_service = pretend.stub(
131+
get_account_associations=pretend.call_recorder(
132+
lambda user_id: account_associations
133+
)
134+
)
130135
organization_service = pretend.stub()
131136
name = pretend.stub()
132137
user_id = pretend.stub()
@@ -161,9 +166,11 @@ def test_default_response(self, monkeypatch, public_email, expected_public_email
161166
"add_email_form": add_email_obj,
162167
"change_password_form": change_pass_obj,
163168
"active_projects": view.active_projects,
169+
"account_associations": account_associations,
164170
}
165171
assert view.request == request
166172
assert view.user_service == user_service
173+
assert user_service.get_account_associations.calls == [pretend.call(user_id)]
167174
assert save_account_cls.calls == [
168175
pretend.call(
169176
name=name,

0 commit comments

Comments
 (0)