o
    T۷i!                     @  s   d dl mZ d dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ G dd dZdS )    )annotationsN)	lru_cache)
SSLContext)Any)	HTTPErrorURLError   )PyJWKPyJWKSet)decode_complete)PyJWKClientConnectionErrorPyJWKClientError)JWKSetCachec                   @  sj   e Zd Z							d.d/ddZd0ddZd1d2ddZd1d3d d!Zd4d$d%Zd5d(d)Ze	d6d,d-Z
dS )7PyJWKClientF   T,  N   uristr
cache_keysboolmax_cached_keysintcache_jwk_setlifespanfloatheadersdict[str, Any] | Nonetimeoutssl_contextSSLContext | Nonec	           
      C  sz   |du ri }|| _ d| _|| _|| _|| _|r)|dkr#td| dt|| _nd| _|r;t|d| j}	|	| _dS dS )u  A client for retrieving signing keys from a JWKS endpoint.

        ``PyJWKClient`` uses a two-tier caching system to avoid unnecessary
        network requests:

        **Tier 1 — JWK Set cache** (enabled by default):
        Caches the entire JSON Web Key Set response from the endpoint.
        Controlled by:

        - ``cache_jwk_set``: Set to ``True`` (the default) to enable this
          cache. When enabled, the JWK Set is fetched from the network only
          when the cache is empty or expired.
        - ``lifespan``: Time in seconds before the cached JWK Set expires.
          Defaults to ``300`` (5 minutes). Must be greater than 0.

        **Tier 2 — Signing key cache** (disabled by default):
        Caches individual signing keys (looked up by ``kid``) using an LRU
        cache with **no time-based expiration**. Keys are evicted only when
        the cache reaches its maximum size. Controlled by:

        - ``cache_keys``: Set to ``True`` to enable this cache.
          Defaults to ``False``.
        - ``max_cached_keys``: Maximum number of signing keys to keep in
          the LRU cache. Defaults to ``16``.

        :param uri: The URL of the JWKS endpoint.
        :type uri: str
        :param cache_keys: Enable the per-key LRU cache (Tier 2).
        :type cache_keys: bool
        :param max_cached_keys: Max entries in the signing key LRU cache.
        :type max_cached_keys: int
        :param cache_jwk_set: Enable the JWK Set response cache (Tier 1).
        :type cache_jwk_set: bool
        :param lifespan: TTL in seconds for the JWK Set cache.
        :type lifespan: float
        :param headers: Optional HTTP headers to include in requests.
        :type headers: dict or None
        :param timeout: HTTP request timeout in seconds.
        :type timeout: float
        :param ssl_context: Optional SSL context for the request.
        :type ssl_context: ssl.SSLContext or None
        Nr   z/Lifespan must be greater than 0, the input is "")maxsize)	r   jwk_set_cacher   r   r   r   r   r   get_signing_key)
selfr   r   r   r   r   r   r   r   r$    r&   E/home/ubuntu/vllm_env/lib/python3.10/site-packages/jwt/jwks_client.py__init__   s$   5

zPyJWKClient.__init__returnr   c              
   C  s   d}z\z,t jj| j| jd}t jj|| j| jd}t	|}W d   n1 s)w   Y  W n  t
tfyO } zt|trB|  td| d|d}~ww |W | jdur^| j| S S | jdurk| j| w w )ae  Fetch the JWK Set from the JWKS endpoint.

        Makes an HTTP request to the configured ``uri`` and returns the
        parsed JSON response. If the JWK Set cache is enabled, the
        response is stored in the cache.

        :returns: The parsed JWK Set as a dictionary.
        :raises PyJWKClientConnectionError: If the HTTP request fails.
        N)urlr   )r   contextz'Fail to fetch data from the url, err: "r!   )urllibrequestRequestr   r   urlopenr   r   jsonloadr   TimeoutError
isinstancer   closer   r#   put)r%   jwk_setrresponseer&   r&   r'   
fetch_data_   s4   




zPyJWKClient.fetch_datarefreshr
   c                 C  sH   d}| j dur|s| j  }|du r|  }t|tstdt|S )aN  Return the JWK Set, using the cache when available.

        :param refresh: Force a fresh fetch from the endpoint, bypassing
            the cache.
        :type refresh: bool
        :returns: The JWK Set.
        :rtype: PyJWKSet
        :raises PyJWKClientError: If the endpoint does not return a JSON
            object.
        Nz.The JWKS endpoint did not return a JSON object)r#   getr:   r3   dictr   r
   	from_dict)r%   r;   datar&   r&   r'   get_jwk_set|   s   


zPyJWKClient.get_jwk_setlist[PyJWK]c                 C  s*   |  |}dd |jD }|std|S )a  Return all signing keys from the JWK Set.

        Filters the JWK Set to keys whose ``use`` is ``"sig"`` (or
        unspecified) and that have a ``kid``.

        :param refresh: Force a fresh fetch from the endpoint, bypassing
            the cache.
        :type refresh: bool
        :returns: A list of signing keys.
        :rtype: list[PyJWK]
        :raises PyJWKClientError: If no signing keys are found.
        c                 S  s    g | ]}|j d v r|jr|qS ))sigN)public_key_usekey_id).0jwk_set_keyr&   r&   r'   
<listcomp>   s
    z0PyJWKClient.get_signing_keys.<locals>.<listcomp>z2The JWKS endpoint did not contain any signing keys)r@   keysr   )r%   r;   r6   signing_keysr&   r&   r'   get_signing_keys   s   
zPyJWKClient.get_signing_keyskidr	   c                 C  sH   |   }| ||}|s"| j dd}| ||}|s"td| d|S )a  Return the signing key matching the given ``kid``.

        If no match is found in the current JWK Set, the set is
        refreshed from the endpoint and the lookup is retried once.

        :param kid: The key ID to look up.
        :type kid: str
        :returns: The matching signing key.
        :rtype: PyJWK
        :raises PyJWKClientError: If no matching key is found after
            refreshing.
        T)r;   z,Unable to find a signing key that matches: "r!   )rJ   	match_kidr   )r%   rK   rI   signing_keyr&   r&   r'   r$      s   
zPyJWKClient.get_signing_keytokenstr | bytesc                 C  s(   t |ddid}|d }| |dS )aG  Return the signing key for a JWT by reading its ``kid`` header.

        Extracts the ``kid`` from the token's unverified header and
        delegates to :meth:`get_signing_key`.

        :param token: The encoded JWT.
        :type token: str or bytes
        :returns: The matching signing key.
        :rtype: PyJWK
        verify_signatureF)optionsheaderrK   )decode_tokenr$   r<   )r%   rN   
unverifiedrR   r&   r&   r'   get_signing_key_from_jwt   s   z$PyJWKClient.get_signing_key_from_jwtrI   PyJWK | Nonec                 C  s&   d}| D ]}|j |kr|} |S q|S )a7  Find a key in *signing_keys* that matches *kid*.

        :param signing_keys: The list of keys to search.
        :type signing_keys: list[PyJWK]
        :param kid: The key ID to match.
        :type kid: str
        :returns: The matching key, or ``None`` if not found.
        :rtype: PyJWK or None
        N)rD   )rI   rK   rM   keyr&   r&   r'   rL      s   
zPyJWKClient.match_kid)Fr   Tr   Nr   N)r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    )r)   r   )F)r;   r   r)   r
   )r;   r   r)   rA   )rK   r   r)   r	   )rN   rO   r)   r	   )rI   rA   rK   r   r)   rV   )__name__
__module____qualname__r(   r:   r@   rJ   r$   rU   staticmethodrL   r&   r&   r&   r'   r      s     
N

r   )
__future__r   r0   urllib.requestr,   	functoolsr   sslr   typingr   urllib.errorr   r   api_jwkr	   r
   api_jwtr   rS   
exceptionsr   r   r#   r   r   r&   r&   r&   r'   <module>   s    