o
    聱i%                     @  s   d dl mZ d dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
 G dd de
ZG dd	 d	ZG d
d dZG dd deZG dd dZdS )    )annotationsNQueue)TYPE_CHECKINGOptional)BaseStreamerc                   @  sH   e Zd ZdZ		ddd	d
ZdddZddddZdd ZdddZdS )AudioStreamera  
    Audio streamer that stores audio chunks in queues for each sample in the batch.
    This allows streaming audio generation for multiple samples simultaneously.
    
    Parameters:
        batch_size (`int`):
            The batch size for generation
        stop_signal (`any`, *optional*):
            The signal to put in the queue when generation ends. Defaults to None.
        timeout (`float`, *optional*):
            The timeout for the audio queue. If `None`, the queue will block indefinitely.
    N
batch_sizeintstop_signalOptional[any]timeoutOptional[float]c                 C  sD   || _ || _|| _dd t|D | _dd t|D | _i | _d S )Nc                 S  s   g | ]}t  qS  r   .0_r   r   C/home/ubuntu/VibeVoice-finetuning/src/vibevoice/modular/streamer.py
<listcomp>&   s    z*AudioStreamer.__init__.<locals>.<listcomp>c                 S  s   g | ]}d qS )Fr   r   r   r   r   r   '   s    )r	   r   r   rangeaudio_queuesfinished_flagssample_indices_mapselfr	   r   r   r   r   r   __init__   s   
zAudioStreamer.__init__audio_chunkstorch.Tensorsample_indicesc                 C  sX   t |D ]%\}}| }|| jk r)| j| s)||   }| j| j|| jd qdS )a  
        Receives audio chunks and puts them in the appropriate queues.
        
        Args:
            audio_chunks: Tensor of shape (num_samples, ...) containing audio chunks
            sample_indices: Tensor indicating which samples these chunks belong to
        r   N)		enumerateitemr	   r   detachcpur   putr   r   r   r   i
sample_idxidxaudio_chunkr   r   r   r$   *   s   zAudioStreamer.putOptional[torch.Tensor]c                 C  s   |du r$t | jD ]}| j| s!| j| j| j| jd d| j|< q	dS |D ](}t|r1|	 n|}|| jk rN| j| sN| j| j| j| jd d| j|< q&dS )z
        Signals the end of generation for specified samples or all samples.
        
        Args:
            sample_indices: Optional tensor of sample indices to end. If None, ends all.
        Nr   T)
r   r	   r   r   r$   r   r   torch	is_tensorr!   )r   r   r(   r'   r   r   r   end9   s   


zAudioStreamer.endc                 C     t | S )z4Returns an iterator over the batch of audio streams.)AudioBatchIteratorr   r   r   r   __iter__N      zAudioStreamer.__iter__r'   c                 C  s*   || j krtd| d| j  t| |S )z+Get the audio stream for a specific sample.Sample index  exceeds batch size )r	   
ValueErrorAudioSampleIterator)r   r'   r   r   r   
get_streamR   s   

zAudioStreamer.get_streamNNr	   r
   r   r   r   r   r   r   r   r   Nr   r*   r'   r
   )	__name__
__module____qualname____doc__r   r$   r-   r1   r7   r   r   r   r   r      s    
r   c                   @  s*   e Zd ZdZdddZdd	 Zd
d ZdS )r6   z2Iterator for a single audio stream from the batch.streamerr   r'   r
   c                 C  s   || _ || _d S r;   )rB   r'   )r   rB   r'   r   r   r   r   \   s   
zAudioSampleIterator.__init__c                 C     | S r;   r   r0   r   r   r   r1   `      zAudioSampleIterator.__iter__c                 C  s0   | j j| j j| j jd}|| j jkrt |S )Nr   )rB   r   r'   getr   r   StopIteration)r   valuer   r   r   __next__c   s   zAudioSampleIterator.__next__N)rB   r   r'   r
   r>   r?   r@   rA   r   r1   rH   r   r   r   r   r6   Y   
    
r6   c                   @  s*   e Zd ZdZdddZdd Zdd	 Zd
S )r/   z?Iterator that yields audio chunks for all samples in the batch.rB   r   c                 C     || _ tt|j| _d S r;   rB   setr   r	   active_samplesr   rB   r   r   r   r   m      zAudioBatchIterator.__init__c                 C  rC   r;   r   r0   r   r   r   r1   q   rD   zAudioBatchIterator.__iter__c                 C  s   | j st i }t }| j D ]#}z| jj| jdd}|| jjkr'|| n|||< W q   Y q|  j |8  _ |r=|S | j rMdd l}|	d | 
 S t )NF)blockr   g{Gz?)rN   rF   rM   rB   r   rE   r   addtimesleeprH   )r   batch_chunkssamples_to_remover(   rG   rS   r   r   r   rH   t   s*   

zAudioBatchIterator.__next__N)rB   r   rI   r   r   r   r   r/   j   rJ   r/   c                      sP   e Zd ZdZ		dd fd	d
ZdddZddddZdddZdd Z  Z	S )AsyncAudioStreamerzC
    Async version of AudioStreamer for use in async contexts.
    Nr	   r
   r   r   r   r   c                   s2   t  ||| dd t|D | _t | _d S )Nc                 S  s   g | ]}t  qS r   )asyncior   r   r   r   r   r      s    z/AsyncAudioStreamer.__init__.<locals>.<listcomp>)superr   r   r   rX   get_running_looploopr   	__class__r   r   r      s   zAsyncAudioStreamer.__init__r   r   r   c                 C  sX   t |D ]%\}}| }|| jk r)| j| s)||   }| j| j| j	| qdS )z1Put audio chunks in the appropriate async queues.N)
r    r!   r	   r   r"   r#   r[   call_soon_threadsafer   
put_nowaitr%   r   r   r   r$      s   zAsyncAudioStreamer.putr*   c                 C  sf   |du r
t | j}ndd |D }|D ]}|| jk r0| j| s0| j| j| j| j d| j|< qdS )z3Signal the end of generation for specified samples.Nc                 S  s"   g | ]}t |r| n|qS r   )r+   r,   r!   )r   sr   r   r   r      s   " z*AsyncAudioStreamer.end.<locals>.<listcomp>T)r   r	   r   r[   r^   r   r_   r   )r   r   indices_to_endr(   r   r   r   r-      s   
zAsyncAudioStreamer.endr'   c                 C sN   || j krtd| d| j  	 | j|  I dH }|| jkr#dS |V  q)z8Get async iterator for a specific sample's audio stream.r3   r4   TN)r	   r5   r   rE   r   )r   r'   rG   r   r   r   r7      s   

zAsyncAudioStreamer.get_streamc                 C  r.   )z1Returns an async iterator over all audio streams.)AsyncAudioBatchIteratorr0   r   r   r   	__aiter__   r2   zAsyncAudioStreamer.__aiter__r8   r9   r:   r;   r<   r=   )
r>   r?   r@   rA   r   r$   r-   r7   rc   __classcell__r   r   r\   r   rW      s    


rW   c                   @  s2   e Zd ZdZdddZdd Zdd	 Zd
d ZdS )rb   z)Async iterator for batch audio streaming.rB   rW   c                 C  rK   r;   rL   rO   r   r   r   r      rP   z AsyncAudioBatchIterator.__init__c                 C  rC   r;   r   r0   r   r   r   rc      rD   z!AsyncAudioBatchIterator.__aiter__c           	   	     s    j st i }t } fdd j D }tj| tj jjdI d H \}}|D ]}|	  q*|
 D ]*\}}||v r_z|I d H }| jjkrO|| n|||< W q5 tjy^   Y q5w q5  j |8  _ |rk|S  j ru  I d H S t )Nc                   s   i | ]}|t  |qS r   )rX   create_task
_get_chunk)r   r(   r0   r   r   
<dictcomp>   s    z5AsyncAudioBatchIterator.__anext__.<locals>.<dictcomp>)return_whenr   )rN   StopAsyncIterationrM   rX   waitvaluesFIRST_COMPLETEDrB   r   cancelitemsr   rR   CancelledError	__anext__)	r   rU   rV   tasksdonependingtaskr(   rG   r   r0   r   rp      sB   



z!AsyncAudioBatchIterator.__anext__c                   s   | j j|  I dH S )z,Helper to get a chunk from a specific queue.N)rB   r   rE   )r   r(   r   r   r   rf     s   z"AsyncAudioBatchIterator._get_chunkN)rB   rW   )r>   r?   r@   rA   r   rc   rp   rf   r   r   r   r   rb      s    
.rb   )
__future__r   r+   rX   queuer   typingr   r   transformers.generationr   r   r6   r/   rW   rb   r   r   r   r   <module>   s    L,8