o
    i+o                     @  s<  U d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlmZmZmZ ddlmZ ddlZddlmZ e  ejejdd	 ed
ZdZee jjZdZ ed d Z!g dZ"h dZ#de$d< d#d$ddZ%eG dd dZ&G dd dZ'd d! Z(e)d"kre(  dS dS )%a  Fleet deployer: provision N GPU instances on Vast.ai, bootstrap, start production workers.

Usage:
    python scripts/deploy_fleet.py --num-gpus 5 --max-price 0.55
    python scripts/deploy_fleet.py --num-gpus 5 --dry-run          # show what would be rented
    python scripts/deploy_fleet.py --status                        # check running fleet
    python scripts/deploy_fleet.py --destroy-all                   # tear down everything
    )annotationsN)ThreadPoolExecutoras_completed)	dataclassfieldasdict)Path)load_dotenvz'%(asctime)s [%(levelname)s] %(message)s)levelformatfleetzhttps://console.vast.ai/api/v0z%bharathkumar192/codecbench-sft:latestresultszfleet_state.json)	RTX 4090zRTX 3090zRTX 3090 TiL40SL40zRTX 4080z	RTX 4080Sz	RTX A6000z	RTX A5000>    _  w  <  x  ̦    zset[int]BAD_MACHINES vram_gbfloatgpu_namestrreturnintc                   s6   | dkrdS t  fdddD }|r| dkrdS dS )	N(      c                 3  s    | ]}| v V  qd S N ).0kr   r"   scripts/deploy_fleet.py	<genexpr>/   s    z"pick_batch_size.<locals>.<genexpr>)40903090A100         )any)r   r   high_endr"   r%   r&   pick_batch_size,   s   r0   c                   @  sv   e Zd ZU ded< ded< ded< ded< dZded	< d
Zded< dZded< dZded< d
Zded< d
Z	ded< dS )InstanceInfor   instance_idr   r   
machine_idr   price_per_hourr   ssh_hostr   ssh_portcreatedstatus   
batch_sizer   
worker_pidN)
__name__
__module____qualname____annotations__r5   r6   r8   r:   r   r;   r"   r"   r"   r&   r1   5   s   
 r1   c                   @  s   e Zd ZdLdMddZdNdOddZdPddZdQddZdRdSddZdTdSd d!ZdUdVd$d%Z	dWd&d'Z
	(dXdYd+d,ZdWd-d.ZdWd/d0ZdQd1d2ZdZd5d6Zd[d7d8Zd[d9d:Zd\d;d<Zd]d?d@Zd^d_dEdFZd^d`dGdHZd[dIdJZdKS )aFleetDeployer皙?api_keyr   	max_pricer   c                 C  s:   || _ || _d| dd| _g | _t d d | _d S )NzBearer zapplication/json)AuthorizationzContent-Typez.ssh
id_ed25519)rB   rC   headers	instancesr   homeidentity_file)selfrB   rC   r"   r"   r&   __init__D   s
   zFleetDeployer.__init__   r   limitr   r   
list[dict]c              	   C  s   |dddiddiddid|id| j iddggd	}td
D ]:}tjt d| j|dd}|jdkr;td|d   q|	  |
 dg }dd |D }t|dd d  S |	  g S )N	on-demandeqTFr-   lte	dph_totalasc)rM   typerentablerentednum_gpusr   rR   order   	/bundles/   rF   jsontimeout  r9   offersc                 S  sZ   g | ])}| d tvr+t| dddkr+t| dddkr+t| dddkr|qS )r3   reliabilityr   g(\?	inet_down2   
disk_spacer   )getr   r   r#   or"   r"   r&   
<listcomp>Z   s    z/FleetDeployer.search_offers.<locals>.<listcomp>c                 S  s&   | d |  dp	d |  dpd fS )NrR   rb   r   ra   )re   rg   r"   r"   r&   <lambda>c   s   & z-FleetDeployer.search_offers.<locals>.<lambda>key)rC   rangerequestspostBASE_URLrF   status_codetimesleepraise_for_statusr]   re   sorted)rJ   r   rM   bodyattemptrr`   filteredr"   r"   r&   search_offersK   s,   

zFleetDeployer.search_offersrW   c           
      C  s.  i }t D ]}| j|dd}|D ]}||d< q|||< td|t| qg }t }dD ]'}||g D ]}t||kr= n|d}||v rGq3|| || q3q+g }	t D ]}|dv r^qW|		||g  qW|	j
dd d	 |	D ]}t||kr| n|d}||v rqr|| || qr|d
| S )zUPick N offers prioritizing high-throughput GPUs (L40S, 4090) then filling with value.rc   )rM   	_gpu_typez  %s: %d offers available)r   r   r   r3   c                 S  s   | d S )NrR   r"   ri   r"   r"   r&   rj      s    z0FleetDeployer.pick_best_offers.<locals>.<lambda>rk   N)GPU_PREFERENCESrz   loggerinfolensetre   appendaddextendsort)
rJ   rW   per_typegpur`   rg   pickedused_machinesmid	remainingr"   r"   r&   pick_best_offersh   sD   




zFleetDeployer.pick_best_offersofferdictInstanceInfo | Nonec                 C  s  t d }i }d}d}| r?|  D ]*}| }|r>|ds>d|v r>|dd\}}|||< |dkr8|}q|dkr>|}q|dd	d
 }	|dd}
t|	|
}d| d}t	ddd|dd ||d}|rx|rxd|d< ||d< ||d< d }t
dD ]"}tjt d|d  d| j|d}|jdkrtd|d   q~ |r|jstd|d |r|jd d  d S d d S | d }|sd S |dd	d
 }	t||dd!|d"d	|d#d	|	t|	|ddd$}td%||j|j|j |S )&Nz.envr   #=r-   DOCKER_USERNAME
DOCKER_PATgpu_ramr      r   a  mkdir -p /tmp/pipeline; cd /app && python3 - <<'PY'
import os
keys = [
    'HF_TOKEN','R2_ENDPOINT_URL','R2_BUCKET_DESTINATION','R2_ACCESS_KEY_ID',
    'R2_SECRET_ACCESS_KEY','ACCOUNT_ID','S3_API','DATABASE_URL','VAST_API'
]
with open('/app/.env', 'w') as f:
    for k in keys:
        f.write(f"{k}={os.environ.get(k, '')}\n")
PY
python3 -m codecbench.pipeline.cli sft-run --batch-size z6 --offer-id vast_$CONTAINER_ID >/tmp/worker.log 2>&1 &<   
ssh_directzfleet-r   )imagediskruntypelabelenvonstartzhttps://index.docker.io/v1/docker_login_repodocker_login_userdocker_login_passrY   z/asks/id/)rF   r]   r_   zCreate failed for offer %s: %s   zno responsenew_contractunknownr3   rR   )r2   r   r3   r4   r   r:   z*Created instance %d: %s @ $%.3f/hr (BS=%d))PROJECT_ROOTexists	read_text
splitlinesstrip
startswithsplitre   r0   DOCKER_IMAGErm   rn   putrp   rF   rq   rr   rs   okr}   errortextr]   r1   r~   r   r4   r:   )rJ   r   env_fileenv_varsdocker_user
docker_patliner$   vr   r   bsr   create_bodyrx   rw   iidr~   r"   r"   r&   create_instance   s   


 


zFleetDeployer.create_instanceX  r~   r1   r^   boolc           	   	   C  sV  t   }t   | |k rtjt d|j d| jdd}|js%t d q| d| }t	|t
r<|r:|d ni }|dpBd	}|d
pId	}|dkrg|drg|drg|d |_|d |_d|_dS d|v so|dv rtd|j||d d  d|_dS tt   | }td|j|pd||d d  t d t   | |k sd|_dS )N/instances/r   r[   rF   r^      rG   r   actual_statusr   
status_msgrunningr5   r6   TError>   deadexitedofflinezInstance %d failed: %s / %s   failedFz  Instance %d: %s [%ds] %spendingP   r^   )rr   rn   re   rp   r2   rF   r   rs   r]   
isinstancelistr5   r6   r8   r}   r   r   r~   )	rJ   r~   r^   t0rx   instactualmsgelapsedr"   r"   r&   wait_instance_ready   s<   




z!FleetDeployer.wait_instance_ready   c                 C  s   t   }t   | |k rKz(tjddddddt| jdt|jd|j dgd	d	d
d}|jdkr3W d	S W n	 ty=   Y nw t 	d t   | |k sdS )Nssh-oStrictHostKeyChecking=noConnectTimeout=10-i-proot@zecho OKTrL   capture_outputr   r^   r   
   F)
rr   
subprocessrunr   rI   r6   r5   
returncode	Exceptionrs   )rJ   r~   r^   r   rx   r"   r"   r&   wait_ssh  s$   
 

zFleetDeployer.wait_ssh  cmdc                 C  s   ddt | jdddddt |jd|j |g}z%tj|dd|d	}|jd
kr:td|j	|j
|j dd   W dS W dS  tjyN   td|j	 Y dS w )Nr   r   r   r   zServerAliveInterval=30r   r   Tr   r   r^   r   zSSH cmd failed on %d (%s): %sFzSSH cmd timed out on %d)r   rI   r6   r5   r   r   r   r}   r   r2   r   stderrr   TimeoutExpired)rJ   r~   r   r^   fullrx   r"   r"   r&   run_ssh  s   
zFleetDeployer.run_sshc                 C  sd   t td }g d}ddddd| j d|j g||d	|j d
}tj|dddd}|jdkS )Nr   )
z--exclude=venv/z--exclude=repos/z--exclude=data/z--exclude=results/z--exclude=metafiles/z--exclude=__pycache__/z--exclude=.git/z--exclude=*.pycz--exclude=.cursor/z--exclude=models/rsyncz-avzz--timeout=60z-ezssh -i z  -o StrictHostKeyChecking=no -p r   z:/app/Tx   r   r   )r   r   rI   r6   r5   r   r   r   )rJ   r~   srcexcludesr   rx   r"   r"   r&   	sync_code!  s   
zFleetDeployer.sync_coder   log_filepoll_intervalc                 C  s  d| d| d}ddt | jddddd	t |jd
|j |g}z tj|dddd}|jdkr?td|j	|j
dd  W dS W n ty[ }	 ztd|j	|	 W Y d}	~	dS d}	~	ww t }
t |
 |k rt| z`tjddt | jddddd	t |jd
|j d| dgdddd}|jdkrd|jv rdd |j dD }|r|d dd  }|dkrW dS td||j	|jdd  W dS W n	 ty   Y nw t |
 |k shtd|j	| dS ) zGRun a command via nohup, poll for completion. Survives SSH proxy drops.znohup bash -c 'z; echo __DONE_EXIT_$?' > z 2>&1 & echo $!r   r   r   r   r   r   r   Tr[   r   r   z'Failed to launch detached cmd on %d: %sNr   FzSSH launch error on %d: %sztail -5 z 2>/dev/nullrL   __DONE_EXIT_c                 S  s   g | ]}d |v r|qS )r   r"   )r#   lr"   r"   r&   rh   M      z2FleetDeployer.run_ssh_detached.<locals>.<listcomp>
0z-Detached cmd failed (exit=%s) on %d. Tail:
%sr   z&Detached cmd timed out on %d after %ds)r   rI   r6   r5   r   r   r   r}   r   r2   r   r   rr   rs   stdoutr   r   )rJ   r~   r   r   r^   r   wrapperr   rx   er   check	exit_linecoder"   r"   r&   run_ssh_detached1  sX   


zFleetDeployer.run_ssh_detachedc                 C  sr   t d|j|j | j|dddst d|j dS t d|j|j | j|dd	ds4t d
|j dS d|_dS )zLightweight bootstrap: pip install -e . (code from rsync) + verify GPU.
        Docker image already has all deps and Spark-TTS repo. Models download on first worker.setup().
        z  [%d %s] pip-editable ...zDcd /app && python3 -m pip install --no-cache-dir -e . 2>&1 | tail -3r   r^   zpip install -e . failed for %dFz  [%d %s] verify-gpu ...zipython3 -c "import torch; assert torch.cuda.is_available(); print('GPU:', torch.cuda.get_device_name(0))"r   zGPU verification failed for %dbootstrappedT)r}   r~   r2   r   r   r   r8   )rJ   r~   r"   r"   r&   bootstrap_instance[  s   z FleetDeployer.bootstrap_instancec                 C  s  d|j  d|j d}ddt| jddddd	t|jd
|j |g}tdD ]}z\tj|dddd}|j	
 rC|j	
 dd 
 nd}|jdkrg| rgt||_d|_td|j|j|j|j  W  dS td|d |j|j|j	
 dd |j
 dd  W n0 tjy   td|d |j Y n ty } ztd|d |j| W Y d}~nd}~ww td q&td|j dS )zStart the production pipeline worker as a background process.
        Uses bash -c with explicit fd closing so SSH doesn't hang waiting for nohup.
        zCcd /app && python3 -m codecbench.pipeline.cli sft-run --batch-size z --offer-id vast_z+ </dev/null >/tmp/worker.log 2>&1 & echo $!r   r   r   r   zConnectTimeout=30r   r   rY   Tr   r   r   r   r   r   worker_runningz&Worker started on %d (%s) PID=%d BS=%dz6Worker start attempt %d on %d: rc=%d out=[%s] err=[%s]r-   8Nz'Worker start attempt %d timed out on %dz'Worker start attempt %d error on %d: %s   z-Failed to start worker on %d after 3 attemptsF)r:   r2   r   rI   r6   r5   rm   r   r   r   r   r   r   isdigitr   r;   r8   r}   r~   r   warningr   r   r   rr   rs   r   )rJ   r~   
worker_cmdr   rw   rx   	last_liner   r"   r"   r&   start_workerl  sB   &
 "zFleetDeployer.start_workerc              
   C  s   |  |}|s	dS z!| j|dds| |j W dS td|j|j|j d|_|W S  t	yN } zt
d|j|t  | |j W Y d}~dS d}~ww )zPCreate instance and wait for it to start. Worker auto-starts via onstart script.Nr   r   u>   Instance %d running at %s:%d — onstart worker auto-launchingr  zProvision failed for %d: %s
%s)r   r   destroy_instancer2   r}   r~   r5   r6   r8   r   r   	traceback
format_exc)rJ   r   r~   r   r"   r"   r&   provision_one  s(   
zFleetDeployer.provision_oner2   Nonec                 C  s8   t jt d| d| jdd}|jrtd| d S d S )Nr   r   r[   r   zDestroyed instance %d)rn   deleterp   rF   r   r}   r~   )rJ   r2   rx   r"   r"   r&   r
    s   zFleetDeployer.destroy_instancec                 C  s:   |   }|D ]	}| |d  qtjdd td d S )Nr2   T)
missing_okzAll fleet instances destroyed)
load_stater
  FLEET_STATE_FILEunlinkr}   r~   )rJ   stater   r"   r"   r&   destroy_all  s
   zFleetDeployer.destroy_allc                 C  s8   t jjddd dd | jD }t tj|dd d S )NT)parentsexist_okc                 S  s   g | ]}t |qS r"   )r   r#   ir"   r"   r&   rh     s    z,FleetDeployer.save_state.<locals>.<listcomp>r9   )indent)r  parentmkdirrG   
write_textr]   dumps)rJ   datar"   r"   r&   
save_state  s   zFleetDeployer.save_statec                 C  s   t  rtt  S g S r!   )r  r   r]   loadsr   )rJ   r"   r"   r&   r    s   zFleetDeployer.load_stateoffer_iddict | Nonec              	   C  s   dD ]@}ddddiddiddid|id	d
ggd}t jt d| j|dd}|js*q| dg D ]}|d|krA|    S q2qdS )zSFetch a specific offer by ID. Searches multiple price tiers to avoid limit cutoffs.)g333333?rA   g       @g      @i  rO   rP   TFr-   rQ   rR   rS   )rM   rT   rU   rV   rW   rR   rX   rZ   r[   r\   r`   r   N)rn   ro   rp   rF   r   r]   re   )rJ   r"  rC   rv   rx   rg   r"   r"   r&   get_offer_details  s    zFleetDeployer.get_offer_detailsF	offer_ids	list[int]dry_runc                   s  g }|D ]} |}|r|| qtd| q|s#td dS tdd  tdt| d td  d}t|D ]_\}}|d	dd
 }t	|}	|d }
||
7 }|dd}|dd}|dd}t
|dddd }td|d  d|d dd|dd|	 d|
dd|dd|dd|dd|  q?td|dd |d! d"d# td d |rtd$ dS td%t| td&d'd(L  fd)d*|D }t|D ]5}|| }z| }|rj|   W q ty } ztd+|d| W Y d}~qd}~ww W d   n	1 sw   Y  d,d- jD }t|t| }tdd  td.t| d/t| d0 td  |D ]%}td1|j d2|jdd3|j d|jdd4|j d|j d5|j  qM|r~td6| d7 td d   dS )8z)Deploy specific hand-picked offers by ID.z)Offer %d not found or no longer availablezNo valid offers found!Nr   F======================================================================z  MANUAL FLEET DEPLOYMENT ( GPUs)r   r   r   rR   rb   inet_upra   geolocation?     r-   . r   <20 .0fzGB  BS=  $.3fz/hr  DL=z UL=  rel=
  Total fleet cost: $/hr ($   .2f/day)"     DRY RUN — no instances created*Provisioning %d instances (8 at a time)...r    provmax_workersthread_name_prefixc                      i | ]
}  j||qS r"   submitr  rf   poolrJ   r"   r&   
<dictcomp>      z5FleetDeployer.deploy_by_offer_ids.<locals>.<dictcomp>Provision error for %s: %sc                 S     g | ]	}|j d kr|qS r  r8   r  r"   r"   r&   rh         z5FleetDeployer.deploy_by_offer_ids.<locals>.<listcomp>  DEPLOYMENT RESULT: r    workers running  Instance :  BS=	/hr  PID=:
    instance(s) failed to provision)r$  r   r}   r  r   printr   	enumeratere   r0   r   r~   r   r   resultrG   r   r   r2   r   r:   r4   r;   r5   r6   )rJ   r%  r'  r`   oidrg   
total_costr  vramr   pricedlulrelgeofuturesfutr   r~   r   r   failed_countr"   rD  r&   deploy_by_offer_ids  s   


(
 
z!FleetDeployer.deploy_by_offer_idsc                   s   |}|std dS tdd  tdt| d td  d}t|D ]@\}}|ddd	 }t|}|d
 }	||	7 }td|d  d|d dd|dd| d|	dd|d d|ddd q*td|dd|d dd td d |rtd dS tdt| t	d d!d"K  fd#d$|D }
t
|
D ]4}|
| }z| }|rj|   W q ty } ztd%|d| W Y d}~qd}~ww W d   n1 sw   Y  d&d' jD }t|t| }tdd  td(t| d)t| d* td  |D ]%}td+|j d,|jdd-|j d|jdd.|j d|j d/|j  q|rHtd0| d1 td d   dS )2z0Deploy N GPU instances across diverse GPU types.zNo suitable offers found!Nr   r(  z  FLEET DEPLOYMENT PLAN (r)  r   r   r   rR   r.  r-   r/  r   r0  r1  r2  zGB VRAM  BS=r3  r4  z/hr  machine=r3   r5  ra   r6  r7  r8  r9  r:  r;  r<  r    r=  r>  c                   rA  r"   rB  rf   rD  r"   r&   rF  $  rG  z.FleetDeployer.deploy_fleet.<locals>.<dictcomp>rH  c                 S  rI  rJ  rK  r  r"   r"   r&   rh   0  rL  z.FleetDeployer.deploy_fleet.<locals>.<listcomp>rM  r   rN  rO  rP  rQ  rR  rS  rT  rU  )r   r}   r   rV  r   rW  re   r0   r~   r   r   rX  rG   r   r   r   r2   r   r:   r4   r;   r5   r6   )rJ   rW   r'  r`   rZ  r  rg   r[  r   r\  ra  rb  r   r~   r   r   r   r"   rD  r&   deploy_fleet  s~   


(
 
zFleetDeployer.deploy_fleetc                 C  sp  |   }|std dS tdd  tdt| d td  |D ]}|d }|dd	}|d
d}|dd}z>tjt d| d| jdd}|jrt| d| }t	|t
rg|re|d ni }|dpr|dprd}	nd|j }	W n ty   d}	Y nw d}
d}|r|r|	dkrzFtjddt| jdddddt|d | d!gd"d"dd#}|jdkr|j d}|r|d$  nd%}|d%k}
d|dd$ d&d }W n	 ty   Y nw |
rd'n	|	dkrd(|	 nd)}td*| d+| d,| d-|d.d	  td/|	 d0| d1|  |r-|dd2d D ]}td3|dd4   qq$tdd d dS )5z$Check health of all fleet instances.z0No fleet state found. Deploy first with --deployNr   r(  z  FLEET STATUS (z instances)r2   r   r,  r5   r   r6   r   r   r   r   r   rG   r   	cur_stater   
api_error_api_unreachableFr   r   r   r   r   zConnectTimeout=5r   r   zetail -3 /tmp/worker.log 2>/dev/null && ps aux | grep 'codecbench.pipeline.cli' | grep -v grep | wc -lTr   r   r   r  OKzVAST:WORKER_DOWNz
  [z] Instance z (z) BS=r:   z       Vast: z  SSH: rS  z       Log: r   )r  rV  r   re   rn   rp   rF   r   r]   r   r   rq   r   r   r   r   rI   r   r   r   r   join)rJ   r  r   r   r   ssh_hssh_prx   dvast_statusworker_alive
recent_loglines
proc_countstatus_emojir   r"   r"   r&   check_status>  sh   



(zFleetDeployer.check_statusN)rA   )rB   r   rC   r   )rL   )r   r   rM   r   r   rN   )rW   r   r   rN   )r   r   r   r   )r   )r~   r1   r^   r   r   r   )r   )r   )r~   r1   r   r   r^   r   r   r   )r~   r1   r   r   )r   r   )r~   r1   r   r   r   r   r^   r   r   r   r   r   )r2   r   r   r  )r   r  )r   rN   )r"  r   r   r#  )F)r%  r&  r'  r   r   r  )rW   r   r'  r   r   r  )r<   r=   r>   rK   rz   r   r   r   r   r   r   r   r  r	  r  r
  r  r   r  r$  rd  re  rv  r"   r"   r"   r&   r@   C   s,    

+S

*

!




?6r@   c                  C  s4  t jdd} | jdddd | jdtd dd	 | jd
tdd | jdtdd | jddd | jdddd | jdddd |  }tj	dd}|sTt
d td t||jd}|jrd|  d S |jrm|  d S |jss|jr|jr|jdd |jdD |jd d S |j|j|jd d S |   d S )Nz(Fleet deployer for Vast.ai GPU instances)descriptionz--deploy
store_truezDeploy new fleet)actionhelpz--offer-idsz9Comma-separated Vast.ai offer IDs to deploy (manual pick))rT   defaultrz  z
--num-gpusr  )rT   r{  z--max-priceg?z	--dry-run)ry  z--statuszCheck fleet healthz--destroy-allzDestroy all fleet instancesVAST_KEYr   zERROR: VAST_KEY not setr-   )rC   c                 S  s   g | ]}t | qS r"   )r   r   )r#   xr"   r"   r&   rh     r   zmain.<locals>.<listcomp>,)r'  )argparseArgumentParseradd_argumentr   r   r   
parse_argsosenvironre   rV  sysexitr@   rC   r  r8   rv  deployr'  r%  rd  r   re  rW   
print_help)parserargsrB   deployerr"   r"   r&   mainz  s8   


r  __main__)r   )r   r   r   r   r   r   )*__doc__
__future__r   r  r]   loggingr  r   r  rr   r  concurrent.futuresr   r   dataclassesr   r   r   pathlibr   rn   dotenvr	   basicConfigINFO	getLoggerr}   rp   __file__resolver  r   r   r  r|   r   r?   r0   r1   r@   r  r<   r"   r"   r"   r&   <module>   sH   
	    ;#
