Skip to content

Logout button is just reauthenticating when using Zitadel OAuth #1604

@haripriyacv

Description

@haripriyacv

Hi,

I am using Zitadel OAuth for my Querybook instance. Authentication works fine, but when I log out, it simply reauthenticates and lands me back on the same page.

Could you give some insights on this issue?

import certifi
import requests


from flask import Markup, request, session as flask_session, redirect
import flask_login
from requests_oauthlib import OAuth2Session
from const.user_roles import UserRoleType

from models.user import  UserRole

from app.db import with_session, DBSession
from env import QuerybookSettings
from lib.logger import get_logger
from logic.user import (
    get_user_by_name,
    create_user,
    create_user_role,    
)
from .utils import (
    AuthenticationError,
    AuthUser,
    abort_unauthorized,
    QuerybookLoginManager,
)

# Module-level logger obtained from the project's logger helper for this file.
LOG = get_logger(__file__)

# Path of the OAuth redirect endpoint. It is appended to
# QuerybookSettings.PUBLIC_URL to build the callback URL and is registered as
# a Flask URL rule in OAuthLoginManager.init_app.
OAUTH_CALLBACK_PATH = "/oauth2callback"


class OAuthLoginManager(object):
    """Log users into the Flask app through an OAuth2 authorization-code flow.

    The flow implemented here is:

    1. ``login`` redirects the browser to the provider's authorization URL.
    2. The provider redirects back to ``OAUTH_CALLBACK_PATH`` with a ``code``.
    3. ``oauth_callback`` exchanges the code for an access token, fetches the
       user profile, creates the local user if needed and logs it in.

    NOTE(review): the ``state`` value generated by ``authorization_url`` is
    discarded in ``login`` and never compared in ``oauth_callback``, so the
    callback is not protected against login CSRF. Validating it here would
    also affect callers of the module-level ``oauth_authorization_url``
    helper, so it is only flagged, not changed.
    """

    # Upper bound (seconds) for each HTTP call to the identity provider, so a
    # hung provider cannot block a web worker indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self):
        self.login_manager = QuerybookLoginManager()
        # Set by init_app once the Flask application is available.
        self.flask_app = None

    @property
    def oauth_session(self):
        """Return a new ``OAuth2Session`` built from the current OAuth config."""
        oauth_config = self.oauth_config
        return OAuth2Session(
            oauth_config["client_id"],
            scope=oauth_config["scope"],
            redirect_uri=oauth_config["callback_url"],
        )

    @property
    def oauth_config(self):
        """Return the OAuth settings as a dict, read from ``QuerybookSettings``."""
        return {
            "callback_url": "{}{}".format(
                QuerybookSettings.PUBLIC_URL, OAUTH_CALLBACK_PATH
            ),
            "client_id": QuerybookSettings.OAUTH_CLIENT_ID,
            "client_secret": QuerybookSettings.OAUTH_CLIENT_SECRET,
            "authorization_url": QuerybookSettings.OAUTH_AUTHORIZATION_URL,
            "token_url": QuerybookSettings.OAUTH_TOKEN_URL,
            "profile_url": QuerybookSettings.OAUTH_USER_PROFILE,
            "scope": "openid email profile",
        }

    def init_app(self, flask_app):
        """Attach the login manager to ``flask_app`` and register the callback route."""
        self.flask_app = flask_app

        self.login_manager.init_app(self.flask_app)
        self.flask_app.add_url_rule(
            OAUTH_CALLBACK_PATH, "oauth_callback", self.oauth_callback
        )

    def login(self, request):
        """Redirect the browser to the provider's authorization page.

        The path of ``request`` is remembered in the Flask session under
        ``"next"`` so ``oauth_callback`` can send the user back to it.
        The ``request`` parameter intentionally shadows the Flask global of
        the same name; only the object passed in is used here.
        """
        oauth_url, _ = self._get_authn_url()
        flask_session["next"] = request.path
        return redirect(oauth_url)

    def _get_authn_url(self):
        """Return the ``(authorization_url, state)`` pair from the OAuth session."""
        return self.oauth_session.authorization_url(
            self.oauth_config["authorization_url"]
        )

    def oauth_callback(self):
        """Handle the provider's redirect back to ``OAUTH_CALLBACK_PATH``.

        Exchanges the ``code`` query argument for an access token, loads the
        user profile, logs the matching (possibly newly created) user in and
        redirects to the page stored under ``"next"`` in the Flask session,
        or to ``QuerybookSettings.PUBLIC_URL`` when none was stored. Any
        ``AuthenticationError`` is logged and handed to ``abort_unauthorized``.
        """
        LOG.debug("Handling Oauth callback...")

        if request.args.get("error"):
            return f"<h1>Error: {Markup.escape(request.args.get('error'))}</h1>"

        code = request.args.get("code")
        try:
            if not code:
                # Without a code the token request can only fail; skip the
                # round-trip to the provider.
                raise AuthenticationError("Missing authorization code.")
            access_token = self._fetch_access_token(code)
            username, email, role = self._get_user_profile(access_token)
            with DBSession() as session:
                flask_login.login_user(
                    AuthUser(self.login_user(username, email, role, session=session))
                )
        except AuthenticationError as e:
            # The message needs a %s placeholder; passing `e` as a bare extra
            # argument makes logging fail to format the record.
            LOG.error("Failed authenticate oauth user: %s", e)
            abort_unauthorized()

        next_url = flask_session.pop("next", QuerybookSettings.PUBLIC_URL)
        return redirect(next_url)

    def _fetch_access_token(self, code):
        """Exchange an authorization ``code`` for an access token.

        Returns:
            The ``access_token`` string from the token endpoint's JSON reply.

        Raises:
            AuthenticationError: if the request fails, the endpoint answers
                with a non-200 status, the reply is not JSON, or it carries
                no access token.
        """
        data = {
            "grant_type": "authorization_code",
            "client_id": self.oauth_config["client_id"],
            "client_secret": self.oauth_config["client_secret"],
            "code": code,
            "redirect_uri": self.oauth_config["callback_url"],
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        try:
            resp = requests.post(
                self.oauth_config["token_url"],
                data=data,
                headers=headers,
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as e:
            # Connection errors and timeouts are authentication failures from
            # the caller's point of view, not internal server errors.
            raise AuthenticationError("Failed to fetch access token.") from e

        if resp.status_code != 200:
            LOG.error(
                "Failed to fetch access token, status: %s, response: %s",
                resp.status_code,
                resp.text,
            )
            raise AuthenticationError("Failed to fetch access token.")

        try:
            access_token = resp.json().get("access_token")
        except ValueError as e:
            raise AuthenticationError(
                "Token endpoint returned a non-JSON response."
            ) from e

        if not access_token:
            # Returning None here would send "Bearer None" to the profile
            # endpoint and hide the real cause of the failure.
            raise AuthenticationError(
                "Token response did not contain an access token."
            )
        return access_token

    def _get_user_profile(self, access_token):
        """Fetch the user's profile with ``access_token``.

        Returns:
            The ``(username, email, role)`` tuple from ``_parse_user_profile``.

        Raises:
            AuthenticationError: if the request fails or the profile endpoint
                answers with a non-200 status.
        """
        try:
            resp = requests.get(
                self.oauth_config["profile_url"],
                headers={"Authorization": "Bearer {}".format(access_token)},
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as e:
            raise AuthenticationError(
                "Failed to fetch user profile, status (None)"
            ) from e

        # requests.Response exposes `status_code` (there is no `status`), and
        # its truthiness is False for 4xx/5xx, so test the code directly.
        if resp.status_code != 200:
            raise AuthenticationError(
                "Failed to fetch user profile, status ({0})".format(resp.status_code)
            )
        return self._parse_user_profile(resp)

    def _parse_user_profile(self, profile_response):
        """Extract ``(username, email, role)`` from the profile response JSON.

        ``role`` is the ``"role"`` value of the first entry of the ``"roles"``
        list when that entry is a mapping containing it, otherwise ``"USER"``.
        """
        user = profile_response.json()
        # NOTE(review): every profile without "preferred_username" maps to the
        # same local account "unknown_user" -- confirm this is intended.
        username = user.get("preferred_username", "unknown_user")
        email = user.get("email", None)

        roles = user.get("roles") or []
        first_role = roles[0] if isinstance(roles, (list, tuple)) and roles else None
        # Only index into the entry when it is a mapping; a plain string entry
        # would otherwise pass a substring test and then fail on ["role"].
        if isinstance(first_role, dict) and "role" in first_role:
            role = first_role["role"]
        else:
            role = "USER"
        return username, email, role

    def get_user_by_id(self, uid, session=None):
        """Return all ``UserRole`` rows whose ``uid`` equals ``uid``.

        Despite the name this returns role rows, not a user. ``session`` must
        be provided by the caller; it is used directly to run the query.
        """
        return session.query(UserRole).filter(UserRole.uid == uid).all()

    @with_session
    def login_user(self, username, email, role, session=None):
        """Return the user named ``username``, creating it when it does not exist.

        When ``role`` is ``"ADMIN"`` and the user has no ``UserRole`` rows yet,
        an ADMIN role is created for it.

        Raises:
            AuthenticationError: if ``username`` is empty or not a string.
        """
        if not username or not isinstance(username, str):
            raise AuthenticationError("Please provide a valid username")

        user = get_user_by_name(username, session=session)
        if not user:
            user = create_user(
                username=username,
                fullname=username,
                email=email,
                session=session,
            )

        # Grant ADMIN from the OAuth profile only when no role row exists yet.
        existing_roles = self.get_user_by_id(user.id, session=session)
        if role == "ADMIN" and not existing_roles:
            create_user_role(
                uid=user.id,
                role=UserRoleType.ADMIN,
                commit=True,
                session=session,
            )
        return user
    

        # TODO: use the provider's end_session_endpoint to implement logout.


# Single shared manager instance used by the module-level helper functions.
login_manager = OAuthLoginManager()

# NOTE(review): presumably the paths that must stay reachable without being
# logged in (the OAuth callback arrives before a user session exists) --
# confirm against the code that consumes this list.
ignore_paths = [OAUTH_CALLBACK_PATH]


def init_app(app):
    """Initialise the shared OAuth login manager for the Flask application ``app``."""
    manager = login_manager
    manager.init_app(app)


def login(request):
    """Start the OAuth login for ``request`` and return the resulting response.

    Delegates to the shared login manager, which redirects to the provider's
    authorization URL.
    """
    response = login_manager.login(request)
    return response


def oauth_authorization_url():
    """Return what the shared manager's ``_get_authn_url`` produces.

    Note that this is the ``(authorization_url, state)`` pair, not a bare URL
    string.
    """
    url_and_state = login_manager._get_authn_url()
    return url_and_state

`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions