o
    T۷iU                     @  s  U d dl mZ d dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
 d dlmZmZmZ d dlmZmZmZmZ ddlmZmZmZ dd	lmZmZmZmZmZmZmZmZm Z  dd
lm!Z! esie"e#ddrd dl$Z$e$j%dkryd dlm&Z& nd dl'm&Z& ddl(m)Z)m*Z* ddl+m,Z, ddl-m.Z.m/Z/m0Z0 ee)e,e1e2f Z3de4d< ee*e,e1e2f Z5de4d< G dd dZ6e6 Z7ee7_8e7j9Z9e7j:Z:e7j;Z;dS )    )annotationsN)timegm)	ContainerIterableSequence)datetime	timedeltatimezone)TYPE_CHECKINGAnyUnioncast   )PyJWS_ALGORITHM_UNSET_jws_global_obj)	DecodeErrorExpiredSignatureErrorImmatureSignatureErrorInvalidAudienceErrorInvalidIssuedAtErrorInvalidIssuerErrorInvalidJTIErrorInvalidSubjectErrorMissingRequiredClaimError)RemovedInPyjwt3WarningSPHINX_BUILD )   
   )	TypeAlias)AllowedPrivateKeysAllowedPublicKeys)PyJWK)FullOptionsOptions
SigOptionsr    AllowedPrivateKeyTypesAllowedPublicKeyTypesc                   @  s  e Zd ZdZd[ddZed\d	d
Zd]ddZdZd^ddZedddfd_ddZ			d`dad!d"Z
	#								$dbdcd7d8Zddd:d;Z	#								$dbded=d>Z				$dfdgdAdBZdhdEdFZ	dZdidGdHZdjdIdJZdkdMdNZdkdOdPZdkdQdRZdSdTdldVdWZdmdXdYZdS )nPyJWTNoptionsOptions | NonereturnNonec                 C  s6   |  |   | _|d ur| || _t|  d| _d S )N)r*   )_get_default_optionsr*   _merge_optionsr   _get_sig_options_jwsselfr*    r4   A/home/ubuntu/vllm_env/lib/python3.10/site-packages/jwt/api_jwt.py__init__+   s
   
zPyJWT.__init__r$   c                   C  s   ddddddddg dddS )NTF)verify_signature
verify_exp
verify_nbf
verify_iat
verify_aud
verify_iss
verify_sub
verify_jtirequire
strict_audenforce_minimum_key_lengthr4   r4   r4   r4   r5   r.   3   s   zPyJWT._get_default_optionsr&   c                 C  s   | j d | j dddS )Nr7   rA   F)r7   rA   r*   get)r3   r4   r4   r5   r0   C   s
   zPyJWT._get_sig_optionsc                 C  s   |d u r| j S |ddsE|dd|d< |dd|d< |dd|d< |dd|d< |dd|d< |d	d|d	< |d
d|d
< i | j |S )Nr7   Tr8   Fr9   r:   r;   r<   r=   r>   rB   r2   r4   r4   r5   r/   K   s   zPyJWT._merge_optionsTpayloaddict[str, Any]keyr'   	algorithm
str | Noneheadersdict[str, Any] | Nonejson_encodertype[json.JSONEncoder] | Nonesort_headersboolstrc           	      C  s   t |ts	td| }dD ]}t ||tr#t||  ||< qd|v r3t |d ts3td| j	|||d}| j
j||||||dS )a  Encode the ``payload`` as JSON Web Token.

        :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)``
        :type payload: dict[str, typing.Any]
        :param key: a key suitable for the chosen algorithm:

            * for **asymmetric algorithms**: PEM-formatted private key, a multiline string
            * for **symmetric algorithms**: plain string, sufficiently long for security

        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys`
        :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``.
            If ``headers`` includes ``alg``, it will be preferred to this parameter.
            If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used.
        :type algorithm: str or None
        :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``.
        :type headers: dict[str, typing.Any] or None
        :param json_encoder: custom JSON encoder for ``payload`` and ``headers``
        :type json_encoder: json.JSONEncoder or None

        :rtype: str
        :returns: a JSON Web Token

        :raises TypeError: if ``payload`` is not a ``dict``
        zGExpecting a dict object, as JWT only supports JSON objects as payloads.)expiatnbfisszIssuer (iss) must be a string.)rI   rK   )rM   )
isinstancedict	TypeErrorcopyrC   r   r   utctimetuplerO   _encode_payloadr1   encode)	r3   rD   rF   rG   rI   rK   rM   
time_claimjson_payloadr4   r4   r5   rZ   Z   s0   
"zPyJWT.encodebytesc                 C  s   t j|d|ddS )z
        Encode a given payload to the bytes to be signed.

        This method is intended to be overridden by subclasses that need to
        encode the payload in a different way, e.g. compress the payload.
        ),:)
separatorsclszutf-8)jsondumpsrZ   )r3   rD   rI   rK   r4   r4   r5   rY      s   zPyJWT._encode_payloadr   r   jwtstr | bytesr(   
algorithmsSequence[str] | Noneverifybool | Nonedetached_payloadbytes | Noneaudiencestr | Iterable[str] | Noneissuerstr | Container[str] | Nonesubjectleewayfloat | timedeltakwargsr   c                 K  s   |rt jdt|  tdd |du rd}n|dd}|dur.||kr.t jdtdd | |}d|i}| jj	|||||d	}| 
|}| j|||||
|	d
 ||d< |S )u$  Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header),
        the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload",
        and "signature" respectively.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\* and RS\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS
         Payload on the key ``payload``, and the JWS Signature on the key ``signature``.
        zypassing additional kwargs to decode_complete() is deprecated and will be removed in pyjwt version 3. Unsupported kwargs:    
stacklevelNTr7   zThe `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. The equivalent is setting `verify_signature` to False in the `options` dictionary. This invocation has a mismatch between the kwarg and the option entry.)categoryrv   )rF   rf   r*   rj   )rl   rn   rq   rp   rD   )warningswarntuplekeysr   rC   DeprecationWarningr/   r1   decode_complete_decode_payload_validate_claims)r3   rd   rF   rf   r*   rh   rj   rl   rn   rp   rq   rs   r7   merged_optionssig_optionsdecodedrD   r4   r4   r5   r}      sL   9


	zPyJWT.decode_completer   c              
   C  sR   z	t |d }W n ty } ztd| |d}~ww t|ts'td|S )a  
        Decode the payload from a JWS dictionary (payload, signature, header).

        This method is intended to be overridden by subclasses that need to
        decode the payload in a different way, e.g. decompress compressed
        payloads.
        rD   zInvalid payload string: Nz-Invalid payload string: must be a json object)rb   loads
ValueErrorr   rT   rU   )r3   r   rD   er4   r4   r5   r~     s   
zPyJWT._decode_payload'AllowedPublicKeys | PyJWK | str | bytesc                 K  sV   |rt jdt|  tdd | j|||||||||	|
d
}tttt	f |d S )u  Verify the ``jwt`` token signature and return the token claims.

        :param jwt: the token to be decoded
        :type jwt: str or bytes
        :param key: the key suitable for the allowed algorithm
        :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys`

        :param algorithms: allowed algorithms, e.g. ``["ES256"]``
            If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm.

            .. warning::

               Do **not** compute the ``algorithms`` parameter based on
               the ``alg`` from the token itself, or on any other data
               that an attacker may be able to influence, as that might
               expose you to various vulnerabilities (see `RFC 8725 §2.1
               <https://www.rfc-editor.org/rfc/rfc8725.html#section-2.1>`_). Instead,
               either hard-code a fixed value for ``algorithms``, or
               configure it in the same place you configure the
               ``key``. Make sure not to mix symmetric and asymmetric
               algorithms that interpret the ``key`` in different ways
               (e.g. HS\* and RS\*).
        :type algorithms: typing.Sequence[str] or None

        :param jwt.types.Options options: extended decoding and validation options
            Refer to :py:class:`jwt.types.Options` for more information.

        :param audience: optional, the value for ``verify_aud`` check
        :type audience: str or typing.Iterable[str] or None
        :param subject: optional, the value for ``verify_sub`` check
        :type subject: str or None
        :param issuer: optional, the value for ``verify_iss`` check
        :type issuer: str or typing.Container[str] or None
        :param leeway: a time margin in seconds for the expiration check
        :type leeway: float or datetime.timedelta
        :rtype: dict[str, typing.Any]
        :returns: the JWT claims
        zppassing additional kwargs to decode() is deprecated and will be removed in pyjwt version 3. Unsupported kwargs: rt   ru   )rh   rj   rl   rp   rn   rq   rD   )
rx   ry   rz   r{   r   r}   r   rU   rO   r   )r3   rd   rF   rf   r*   rh   rj   rl   rp   rn   rq   rs   r   r4   r4   r5   decode,  s*   9
zPyJWT.decodeIterable[str] | str | NoneContainer[str] | str | Nonec                 C  s  t |tr	| }|d urt |ttfstd| ||d  tjt	j
d }d|v r8|d r8| ||| d|v rG|d rG| ||| d|v rV|d	 rV| ||| |d
 r`| || |d rp| j|||ddd |d rz| || |d r| | d S d S )Nz+audience must be a string, iterable or Noner?   )tzrQ   r:   rR   r9   rP   r8   r<   r;   r@   Fstrictr=   r>   )rT   r   total_secondsrO   r   rV   _validate_required_claimsr   nowr	   utc	timestamp_validate_iat_validate_nbf_validate_exp_validate_iss_validate_audrC   _validate_sub_validate_jti)r3   rD   r*   rl   rn   rp   rq   r   r4   r4   r5   r   {  s.   
	zPyJWT._validate_claimsclaimsIterable[str]c                 C  s$   |D ]}| |d u rt|qd S N)rC   r   )r3   rD   r   claimr4   r4   r5   r     s
   zPyJWT._validate_required_claimsc                 C  sH   d|vrdS t |d tstd|dur |d|kr"tddS dS )z
        Checks whether "sub" if in the payload is valid or not.
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        :param subject(str): The subject of the token
        subNzSubject must be a stringzInvalid subject)rT   rO   r   rC   )r3   rD   rp   r4   r4   r5   r     s   zPyJWT._validate_subc                 C  s(   d|vrdS t |dtstddS )z
        Checks whether "jti" if in the payload is valid or not
        This is an Optional claim

        :param payload(dict): The payload which needs to be validated
        jtiNzJWT ID must be a string)rT   rC   rO   r   )r3   rD   r4   r4   r5   r     s
   zPyJWT._validate_jtir   floatc                 C  B   zt |d }W n ty   tdd w ||| krtdd S )NrQ   z)Issued At claim (iat) must be an integer.z The token is not yet valid (iat))intr   r   r   )r3   rD   r   rq   rQ   r4   r4   r5   r     s   zPyJWT._validate_iatc                 C  r   )NrR   z*Not Before claim (nbf) must be an integer.z The token is not yet valid (nbf))r   r   r   r   )r3   rD   r   rq   rR   r4   r4   r5   r     s   
zPyJWT._validate_nbfc                 C  sB   zt |d }W n ty   tdd w ||| krtdd S )NrP   z/Expiration Time claim (exp) must be an integer.zSignature has expired)r   r   r   r   )r3   rD   r   rq   rP   r4   r4   r5   r     s   zPyJWT._validate_expFr   r   c                  s   |d u rd|vs|d sd S t dd|vs|d std|d  |r@t|ts-t dt ts6t d| kr>t dd S t trH g t tsQt dtdd  D r^t dt|trf|g}t fd	d|D rut d
d S )NaudzInvalid audiencezInvalid audience (strict)z&Invalid claim format in token (strict)zAudience doesn't match (strict)zInvalid claim format in tokenc                 s  s    | ]	}t |t V  qd S r   )rT   rO   ).0cr4   r4   r5   	<genexpr>'  s    z&PyJWT._validate_aud.<locals>.<genexpr>c                 3  s    | ]}| vV  qd S r   r4   )r   r   audience_claimsr4   r5   r   -  s    zAudience doesn't match)r   r   rT   rO   listanyall)r3   rD   rl   r   r4   r   r5   r     s4   




zPyJWT._validate_audc                 C  s   |d u rd S d|vrt d|d }t|tstdt|tr*||kr(tdd S z||vr3tdW d S  tyA   tdd w )NrS   z%Payload Issuer (iss) must be a stringzInvalid issuerz.Issuer param must be "str" or "Container[str]")r   rT   rO   r   rV   )r3   rD   rn   rS   r4   r4   r5   r   0  s,   

zPyJWT._validate_issr   )r*   r+   r,   r-   )r,   r$   )r,   r&   )r*   r+   r,   r$   )rD   rE   rF   r'   rG   rH   rI   rJ   rK   rL   rM   rN   r,   rO   )NN)rD   rE   rI   rJ   rK   rL   r,   r]   )	r   NNNNNNNr   )rd   re   rF   r(   rf   rg   r*   r+   rh   ri   rj   rk   rl   rm   rn   ro   rp   rH   rq   rr   rs   r   r,   rE   )r   rE   r,   rE   )rd   re   rF   r   rf   rg   r*   r+   rh   ri   rj   rk   rl   rm   rp   rH   rn   ro   rq   rr   rs   r   r,   rE   )NNNr   )rD   rE   r*   r$   rl   r   rn   r   rp   rH   rq   rr   r,   r-   )rD   rE   r   r   r,   r-   )rD   rE   rp   rH   r,   r-   )rD   rE   r,   r-   )rD   rE   r   r   rq   r   r,   r-   )rD   rE   rl   rm   r   rN   r,   r-   )rD   rE   rn   r   r,   r-   )__name__
__module____qualname__r6   staticmethodr.   r0   r/   r   rZ   rY   r}   r~   r   r   r   r   r   r   r   r   r   r   r4   r4   r4   r5   r)   *   sd    
E
nS
*




2r)   )<
__future__r   rb   osrx   calendarr   collections.abcr   r   r   r   r   r	   typingr
   r   r   r   api_jwsr   r   r   
exceptionsr   r   r   r   r   r   r   r   r   r   rN   getenvsysversion_infor    typing_extensionsrf   r!   r"   api_jwkr#   typesr$   r%   r&   rO   r]   r'   __annotations__r(   r)   _jwt_global_objr1   rZ   r}   r   r4   r4   r4   r5   <module>   s>    ,
    $
