o
    zi                     @   s   d Z ddlmZmZmZmZ ddlmZ ddlZ	ddl
mZ ddlmZ ddlmZ ddlmZ G d	d
 d
eZG dd deZdS )zG
BasePredictionWriter
====================

Aids in saving predictions
    )AnyLiteralOptionalSequence)overrideN)Callback)LightningEnum)MisconfigurationException)is_param_in_hook_signaturec                   @   s<   e Zd ZdZdZdZedefddZedefddZ	d	S )
WriteIntervalbatchepochbatch_and_epochreturnc                 C      | | j | jfv S N)BATCHBATCH_AND_EPOCHself r   a/home/ubuntu/.local/lib/python3.10/site-packages/pytorch_lightning/callbacks/prediction_writer.pyon_batch%      zWriteInterval.on_batchc                 C   r   r   )EPOCHr   r   r   r   r   on_epoch)   r   zWriteInterval.on_epochN)
__name__
__module____qualname__r   r   r   propertyboolr   r   r   r   r   r   r       s    r   c                   @   s   e Zd ZdZdded ddfddZed	d
dddeddfddZd	d
ddde	de
ee  de	dededdfddZd	d
dddee	 dee	 ddf
ddZe	d d	d
ddde	de	dededdfddZed!ddZdS )"BasePredictionWritera	  Base class to implement how the predictions should be stored.

    Args:
        write_interval: When to write.

    Example::

        import torch
        from pytorch_lightning.callbacks import BasePredictionWriter

        class CustomWriter(BasePredictionWriter):

            def __init__(self, output_dir, write_interval):
                super().__init__(write_interval)
                self.output_dir = output_dir

            def write_on_batch_end(
                self, trainer, pl_module, prediction, batch_indices, batch, batch_idx, dataloader_idx
            ):
                torch.save(prediction, os.path.join(self.output_dir, dataloader_idx, f"{batch_idx}.pt"))

            def write_on_epoch_end(self, trainer, pl_module, predictions, batch_indices):
                torch.save(predictions, os.path.join(self.output_dir, "predictions.pt"))


        pred_writer = CustomWriter(output_dir="pred_path", write_interval="epoch")
        trainer = Trainer(callbacks=[pred_writer])
        model = BoringModel()
        trainer.predict(model, return_predictions=False)

    Example::

        # multi-device inference example

        import torch
        from pytorch_lightning.callbacks import BasePredictionWriter

        class CustomWriter(BasePredictionWriter):

            def __init__(self, output_dir, write_interval):
                super().__init__(write_interval)
                self.output_dir = output_dir

            def write_on_epoch_end(self, trainer, pl_module, predictions, batch_indices):
                # this will create N (num processes) files in `output_dir` each containing
                # the predictions of it's respective rank
                torch.save(predictions, os.path.join(self.output_dir, f"predictions_{trainer.global_rank}.pt"))

                # optionally, you can also save `batch_indices` to get the information about the data index
                # from your prediction data
                torch.save(batch_indices, os.path.join(self.output_dir, f"batch_indices_{trainer.global_rank}.pt"))


        # or you can set `write_interval="batch"` and override `write_on_batch_end` to save
        # predictions at batch level
        pred_writer = CustomWriter(output_dir="pred_path", write_interval="epoch")
        trainer = Trainer(accelerator="gpu", strategy="ddp", devices=8, callbacks=[pred_writer])
        model = BoringModel()
        trainer.predict(model, return_predictions=False)

    r   write_interval)r   r   r   r   Nc                 C   s4   |t tvrtddd tD  dt|| _d S )Nz"`write_interval` should be one of c                 S   s   g | ]}|j qS r   )value).0ir   r   r   
<listcomp>o   s    z1BasePredictionWriter.__init__.<locals>.<listcomp>.)listr   r	   interval)r   r"   r   r   r   __init__m   s   zBasePredictionWriter.__init__trainer
pl.Trainer	pl_modulepl.LightningModulestagec                 C   s   t |jdddrtdd S )Ndataloader_iterT)explicitzHThe `PredictionWriterCallback` does not support using `dataloader_iter`.)r
   predict_stepNotImplementedError)r   r+   r-   r/   r   r   r   setupr   s   zBasePredictionWriter.setup
predictionbatch_indices	batch_idxdataloader_idxc                 C      t  )z0Override with the logic to write a single batch.r3   )r   r+   r-   r5   r6   r   r7   r8   r   r   r   write_on_batch_endw   s   z'BasePredictionWriter.write_on_batch_endpredictionsc                 C   r9   )z-Override with the logic to write all batches.r:   )r   r+   r-   r<   r6   r   r   r   write_on_epoch_end   s   z'BasePredictionWriter.write_on_epoch_endr   outputsc              	   C   s.   | j jsd S |jj}| ||||||| d S r   )r)   r   predict_loopcurrent_batch_indicesr;   )r   r+   r-   r>   r   r7   r8   r6   r   r   r   on_predict_batch_end   s   
z)BasePredictionWriter.on_predict_batch_endc                 C   s,   | j jsd S |jj}| |||jj| d S r   )r)   r   r?   epoch_batch_indicesr=   r<   )r   r+   r-   rB   r   r   r   on_predict_epoch_end   s   z)BasePredictionWriter.on_predict_epoch_end)r   )r   )r+   r,   r-   r.   r   N)r   r   r   __doc__r   r*   r   strr4   r   r   r   intr;   r=   rA   rC   r   r   r   r   r!   .   sh    >
	


r!   )rD   typingr   r   r   r   typing_extensionsr   pytorch_lightningpl$pytorch_lightning.callbacks.callbackr   pytorch_lightning.utilitiesr   &pytorch_lightning.utilities.exceptionsr	   +pytorch_lightning.utilities.signature_utilsr
   r   r!   r   r   r   r   <module>   s   