o
    T۷i7                     @  s  d dl mZ d dlZd dlZd dlZd dlmZ d dlmZm	Z	 ddl
mZmZmZmZ ddlmZ ddlmZmZmZmZmZ dd	lmZmZ dd
lmZmZ er`ddl
mZmZ ddlmZ e  Z!G dd dZ"e" Z#e#j$Z$e#j%Z%e#j&Z&e#j'Z'e#j(Z(e#j)Z)e#j*Z*dS )    )annotationsN)Sequence)TYPE_CHECKINGAny   )	Algorithmget_default_algorithms
has_cryptorequires_cryptography)PyJWK)DecodeErrorInvalidAlgorithmErrorInvalidKeyErrorInvalidSignatureErrorInvalidTokenError)base64url_decodebase64url_encode)InsecureKeyLengthWarningRemovedInPyjwt3Warning)AllowedPrivateKeysAllowedPublicKeys)
SigOptionsc                   @  s   e Zd ZU dZ		dPdQd	d
ZedRddZdSddZdTddZdUddZ	dVddZ
eddddfdWd+d,Z	-			dXdYd5d6Z	-			dXdZd8d9Zd[d:d;Zd\d=d>Z	-	d]d^dBdCZdDhZdEedF< ddGd_dIdJZd`dLdMZdadNdOZdS )bPyJWSJWTN
algorithmsSequence[str] | NoneoptionsSigOptions | NonereturnNonec                 C  st   t  | _|d urt|nt| j| _t| j D ]}|| jvr$| j|= q|  | _|d ur8i | j|| _d S d S )N)r   _algorithmsset_valid_algslistkeys_get_default_optionsr   )selfr   r   key r(   A/home/ubuntu/vllm_env/lib/python3.10/site-packages/jwt/api_jws.py__init__$   s   

zPyJWS.__init__r   c                   C  s
   dddS )NTF)verify_signatureenforce_minimum_key_lengthr(   r(   r(   r(   r)   r%   7   s   
zPyJWS._get_default_optionsalg_idstralg_objr   c                 C  s>   || j v r	tdt|tstd|| j |< | j| dS )z
        Registers a new Algorithm for use when creating and verifying tokens.

        :param str alg_id: the ID of the Algorithm
        :param alg_obj: the Algorithm object
        :type alg_obj: Algorithm
        z Algorithm already has a handler.z!Object is not of type `Algorithm`N)r    
ValueError
isinstancer   	TypeErrorr"   add)r&   r-   r/   r(   r(   r)   register_algorithm;   s   


zPyJWS.register_algorithmc                 C  s*   || j vr	td| j |= | j| dS )z
        Unregisters an Algorithm for use when creating and verifying tokens
        :param str alg_id: the ID of the Algorithm
        :raises KeyError: if algorithm is not registered.
        zJThe specified algorithm could not be removed because it is not registered.N)r    KeyErrorr"   remove)r&   r-   r(   r(   r)   unregister_algorithmL   s   
zPyJWS.unregister_algorithm	list[str]c                 C  s
   t | jS )zh
        Returns a list of supported values for the `alg` parameter.

        :rtype: list[str]
        )r#   r"   )r&   r(   r(   r)   get_algorithms[   s   
zPyJWS.get_algorithmsalg_namec              
   C  sN   z| j | W S  ty& } zts|tv rtd| d|td|d}~ww )a/  
        For a given string name, return the matching Algorithm object.

        Example usage:
        >>> jws_obj = PyJWS()
        >>> jws_obj.get_algorithm_by_name("RS256")

        :param alg_name: The name of the algorithm to retrieve
        :type alg_name: str
        :rtype: Algorithm
        zAlgorithm 'z9' could not be found. Do you have cryptography installed?Algorithm not supportedN)r    r5   r	   r
   NotImplementedError)r&   r:   er(   r(   r)   get_algorithm_by_namec   s   

zPyJWS.get_algorithm_by_nameFTpayloadbytesr'   (AllowedPrivateKeys | PyJWK | str | bytes	algorithm
str | Noneheadersdict[str, Any] | Nonejson_encodertype[json.JSONEncoder] | Noneis_payload_detachedboolsort_headersc                 C  s  g }|t u rt|tr|j}	nd}	n|d u r"t|tr|j}	nd}	n|}	|r<|d}
|
r1|d }	|d}|du r<d}| j|	d}|rP| j|dd || |d	 sW|d	= |r^d|d< nd|v re|d= tj	|d
||d
 }|t| |r||}nt|}|| d|}| |	}t|tr|j}||}||}|r| jddrt|tj|tdd |||}|t| |rd|d< d|}|dS )NHS256nonealgb64FT)typrM   encodingrO   ),:)
separatorscls	sort_keys   .r,      
stacklevel    r   utf-8)_ALGORITHM_UNSETr1   r   algorithm_nameget
header_typ_validate_headersupdatejsondumpsencodeappendr   joinr>   r'   prepare_keycheck_key_lengthr   r   warningswarnr   signdecode)r&   r?   r'   rB   rD   rF   rH   rJ   segments
algorithm_headers_algheaders_b64headerjson_headermsg_payloadsigning_inputr/   key_length_msg	signatureencoded_stringr(   r(   r)   re   x   sj   














zPyJWS.encode jwtstr | bytes'AllowedPublicKeys | PyJWK | str | bytesdetached_payloadbytes | Nonekwargsdict[str, Any]c                 K  s   |rt jdt|  tdd |d u r| j}ni | j|}|d }|r1|s1t|ts1td| 	|\}	}
}}| 
| |dddu r^|d u rOtd	|}	d
|
d
dd |	g}
|ri| |
|||| |	||dS )Nzypassing additional kwargs to decode_complete() is deprecated and will be removed in pyjwt version 3. Unsupported kwargs: rX   rY   r+   z\It is required that you pass in a value for the "algorithms" argument when calling decode().rN   TFzIt is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.rW   r   r   )r?   rr   rw   )rj   rk   tupler$   r   r   r1   r   r   _loadra   r_   rg   rsplit_verify_signature)r&   rz   r'   r   r   r}   r   merged_optionsr+   r?   ru   rr   rw   r(   r(   r)   decode_complete   s>   	

zPyJWS.decode_completer   c                 K  s>   |rt jdt|  tdd | j|||||d}|d S )Nzppassing additional kwargs to decode() is deprecated and will be removed in pyjwt version 3. Unsupported kwargs: rX   rY   )r}   r?   )rj   rk   r   r$   r   r   )r&   rz   r'   r   r   r}   r   decodedr(   r(   r)   rm     s   	

zPyJWS.decodec                 C  s   |  |d }| | |S )zReturns back the JWT header parameters as a `dict`

        Note: The signature is not verified so the header parameters
        should not be fully trusted until signature verification is complete
        rX   )r   ra   )r&   rz   rD   r(   r(   r)   get_unverified_header  s   
zPyJWS.get_unverified_header*tuple[bytes, bytes, dict[str, Any], bytes]c              
   C  sl  t |tr
|d}t |tstdt z|dd\}}|dd\}}W n ty9 } ztd|d }~ww zt|}W n t	t
jfyT } ztd|d }~ww zt|}W n typ }	 ztd|	 |	d }	~	ww t |tsztdzt|}
W n t	t
jfy } ztd	|d }~ww zt|}W n t	t
jfy } ztd
|d }~ww |
|||fS )Nr\   z$Invalid token type. Token must be a rW   r   zNot enough segmentszInvalid header paddingzInvalid header string: z,Invalid header string: must be a json objectzInvalid payload paddingzInvalid crypto padding)r1   r.   re   r@   r   r   splitr0   r   r2   binasciiErrorrc   loadsdict)r&   rz   ru   crypto_segmentheader_segmentpayload_segmenterrheader_datarr   r=   r?   rw   r(   r(   r)   r   &  sL   







zPyJWS._loadru   rr   rw   c              
   C  s   |d u rt |tr|jg}z|d }W n ty   tdd w |r*|d ur.||vr.tdt |tr:|j}|j}nz| |}W n tyR }	 ztd|	d }	~	ww |	|}|
|}
|
rr| jddrjt|
tj|
tdd ||||s}td	d S )
NrM   zAlgorithm not specifiedz&The specified alg value is not allowedr;   r,   F   rY   zSignature verification failed)r1   r   r^   r5   r   r   r'   r>   r<   rh   ri   r   r_   r   rj   rk   r   verifyr   )r&   ru   rr   rw   r'   r   rM   r/   prepared_keyr=   rv   r(   r(   r)   r   L  s6   




zPyJWS._verify_signaturerN   zset[str]_supported_critrP   rQ   c                C  s8   d|v r|  |d  |sd|v r| | d S d S d S )Nkidcrit)_validate_kid_validate_crit)r&   rD   rQ   r(   r(   r)   ra   u  s
   zPyJWS._validate_headersr   c                 C  s   t |ts	tdd S )Nz(Key ID header parameter must be a string)r1   r.   r   )r&   r   r(   r(   r)   r   }  s   
zPyJWS._validate_kidc                 C  sv   |d }t |trt|dkrtd|D ]#}t |ts td|| jvr,td| ||vr8td| dqd S )Nr   r   z/Invalid 'crit' header: must be a non-empty listz-Invalid 'crit' header: values must be stringsz Unsupported critical extension: zCritical extension 'z' is missing from headers)r1   r#   lenr   r.   r   )r&   rD   r   extr(   r(   r)   r     s   


zPyJWS._validate_crit)NN)r   r   r   r   r   r   )r   r   )r-   r.   r/   r   r   r   )r-   r.   r   r   )r   r8   )r:   r.   r   r   )r?   r@   r'   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   rI   r   r.   )ry   NNN)rz   r{   r'   r|   r   r   r   r   r}   r~   r   r   r   r   )rz   r{   r'   r|   r   r   r   r   r}   r~   r   r   r   r   )rz   r{   r   r   )rz   r{   r   r   )ry   N)ru   r@   rr   r   rw   r@   r'   r|   r   r   r   r   )rD   r   rQ   rI   r   r   )r   r   r   r   )rD   r   r   r   )__name__
__module____qualname__r`   r*   staticmethodr%   r4   r7   r9   r>   r]   re   r   rm   r   r   r   r   __annotations__ra   r   r   r(   r(   r(   r)   r   !   sJ   
 



]6

+'
r   )+
__future__r   r   rc   rj   collections.abcr   typingr   r   r   r   r   r	   r
   api_jwkr   
exceptionsr   r   r   r   r   utilsr   r   r   r   r   r   typesr   objectr]   r   _jws_global_objre   r   rm   r4   r7   r>   r   r(   r(   r(   r)   <module>   s4      q
