diff --git a/.github/workflows/closing-soon.yml b/.github/workflows/closing-soon.yml index 903ba9a87..1474fda4e 100644 --- a/.github/workflows/closing-soon.yml +++ b/.github/workflows/closing-soon.yml @@ -10,7 +10,7 @@ jobs: issues: write pull-requests: write steps: - - uses: actions/stale@v5 + - uses: actions/stale@v9 with: days-before-issue-stale: 90 days-before-issue-close: 60 diff --git a/.github/workflows/fsdp-eks-regression.yml b/.github/workflows/fsdp-eks-regression.yml deleted file mode 100644 index e5629c649..000000000 --- a/.github/workflows/fsdp-eks-regression.yml +++ /dev/null @@ -1,280 +0,0 @@ -name: FSDP Regression Test (EKS) - -on: - pull_request: - paths: - - '3.test_cases/pytorch/FSDP/**' - - workflow_dispatch: - -jobs: - regression: - strategy: - fail-fast: true - max-parallel: 1 - matrix: - cluster: [p5-eks] - model_config: [llama2_7b, llama2_13b, llama2_70b, llama3_1_8b, llama3_1_70b] - runs-on: [self-hosted, "${{ matrix.cluster }}"] - timeout-minutes: 360 # 6 hours for the full Llama 2 test - steps: - - name: Checkout code - uses: actions/checkout@v4 - with: - path: ${{ github.run_id }} - - - name: Set env vars - run: | - BUILD_ID="${{ github.run_id }}" - FSDP_DIR="$(pwd)/${BUILD_ID}/3.test_cases/pytorch/FSDP" - - # Set instance specific variables - if [[ "${{ matrix.cluster }}" == "p5-eks" ]]; then - EFA_PER_NODE=32 - INSTANCE_TYPE="p5.48xlarge" - elif [[ "${{ matrix.cluster }}" == "p5en-eks" ]]; then - EFA_PER_NODE=16 - INSTANCE_TYPE="p5en.48xlarge" - elif [[ "${{ matrix.cluster }}" == "p6-eks" ]]; then - EFA_PER_NODE=8 - INSTANCE_TYPE="p6-b200.48xlarge" - else - EFA_PER_NODE=32 # Leaving 32 as default for now - INSTANCE_TYPE="p5.48xlarge" # Leaving p5 as default for now - fi - - # Hardcoding these as 4 and 8 for p* - NUM_NODES=4 - GPU_PER_NODE=8 - - echo "BUILD_ID=$BUILD_ID" >> $GITHUB_ENV - echo "FSDP_DIR=$FSDP_DIR" >> $GITHUB_ENV - echo "NUM_NODES=$NUM_NODES" >> $GITHUB_ENV - echo "GPU_PER_NODE=$GPU_PER_NODE" >> $GITHUB_ENV - echo "EFA_PER_NODE=$EFA_PER_NODE" >> $GITHUB_ENV - echo "INSTANCE_TYPE=$INSTANCE_TYPE" >> $GITHUB_ENV - echo "Env vars set successfully!" - - - name: Build container image - working-directory: ${{ env.FSDP_DIR }} - run: | - echo "Building FSDP image" - AWS_REGION=$(aws ec2 describe-availability-zones --output text --query 'AvailabilityZones[0].[RegionName]') - ACCOUNT=$(aws sts get-caller-identity --query Account --output text) - REGISTRY=${ACCOUNT}.dkr.ecr.${AWS_REGION}.amazonaws.com/ - echo "REGISTRY=${REGISTRY}" >> $GITHUB_ENV - - sudo docker build -f Dockerfile -t ${REGISTRY}fsdp-regression:pytorch . - echo "FSDP image built!" - - - name: Create ECR repository and push image - run: | - # Create registry if needed - REGISTRY_COUNT=$(aws ecr describe-repositories | grep \"fsdp-regression\" | wc -l) - if [ "${REGISTRY_COUNT}" == "0" ]; then - aws ecr create-repository --repository-name fsdp-regression - fi - - # Login to registry - echo "Logging in to ${{ env.REGISTRY }}..." - aws ecr get-login-password | sudo docker login --username AWS --password-stdin ${{ env.REGISTRY }} - - # Push image to registry - sudo docker image push ${{ env.REGISTRY }}fsdp-regression:pytorch - echo "Image pushed to ECR successfully!" - - - name: Run training on EKS - id: run_test - working-directory: ${{ env.FSDP_DIR }} - env: - HF_TOKEN: ${{ secrets.HF_TOKEN }} - run: | - # Use model specific YAML file - pushd kubernetes - YAML_FILE="${{ matrix.model_config }}-fsdp.yaml" - - if [ ! -f "${YAML_FILE}" ]; then - echo "Error: YAML file ${YAML_FILE} does not exist!" - exit 1 - fi - - YAML_COPY="regression-${{ matrix.model_config }}-fsdp.yaml" - - # Create copy for modification - cp "${YAML_FILE}" "${YAML_COPY}" - - # Set env_vars for substitution - export IMAGE_URI=${{ env.REGISTRY }}fsdp-regression:pytorch - export INSTANCE_TYPE=${{ env.INSTANCE_TYPE }} - export NUM_NODES=${{ env.NUM_NODES }} - export GPU_PER_NODE=${{ env.GPU_PER_NODE }} - export EFA_PER_NODE=${{ env.EFA_PER_NODE }} - export FI_PROVIDER=efa - export HF_TOKEN=${{ secrets.HF_TOKEN }} - - # Apply K8s manifest - echo "Applying model specific Kubernetes manifest..." - envsubst < "${YAML_COPY}" | kubectl apply -f - - popd - - # Wait for training to start before monitoring - echo "Waiting for pods to become Ready using kubectl wait..." - - # Fix job name (replace underscore with hyphen) - JOB_NAME="${{ matrix.model_config }}-fsdp" - ACTUAL_JOB_NAME=$(echo ${JOB_NAME} | tr '_' '-') - echo "JOB_NAME=$JOB_NAME" >> $GITHUB_ENV - echo "ACTUAL_JOB_NAME=$ACTUAL_JOB_NAME" >> $GITHUB_ENV - - echo "Job name: $JOB_NAME" - echo "Actual job name: $ACTUAL_JOB_NAME" - - kubectl wait --for=condition=Ready pod -l app=${ACTUAL_JOB_NAME} --timeout=10m - if [ $? -ne 0 ]; then - echo "Timed out waiting for pods to become Ready" - kubectl get pods -l app=${ACTUAL_JOB_NAME} - exit 1 - fi - echo "All pods are running and ready!" - - echo "PyTorch Job Name: $(kubectl get pytorchjob)" - echo "Job Pods: $(kubectl get pods)" - - - name: Monitor training run - id: monitor_job - run: | - # Monitor until completion or err - echo "Monitoring job ${{ env.ACTUAL_JOB_NAME }}..." - - start_time=$(date +%s) - timeout=21600 - exit_code=0 - expected_pods=${{ env.NUM_NODES }} - last_log_time=$(date +%s) - - echo "Finding master pod for logs..." - WORKER_POD_0=$(kubectl get pods -l app=${{ env.ACTUAL_JOB_NAME }} -o name | head -1 | cut -d'/' -f2) - master_pod=$(kubectl logs $WORKER_POD_0 | grep master_addr= | head -1 | awk -F'=' '{print $2}' | tr -d '[:space:]') - - if [ -z "$master_pod" ]; then - echo "Could not determine master pod, using first pod for logs" - master_pod=$WORKER_POD_0 - fi - - echo "Selected pod for logs: $master_pod" - - while true; do - current_time=$(date +%s) - elapsed_time=$((current_time - start_time)) - - if [ $elapsed_time -gt $timeout ]; then - echo "Monitoring timed out after 6 hours... Exiting" - exit_code=1 - break - fi - - # Check for error conditions in pods - container_statuses=$(kubectl get pods -l app=${ACTUAL_JOB_NAME} -o jsonpath='{.items[*].status.containerStatuses[*].state}' 2>/dev/null) - pod_phases=$(kubectl get pods -l app=${ACTUAL_JOB_NAME} -o jsonpath='{.items[*].status.phase}' 2>/dev/null) - - if echo "$container_statuses" | grep -q "CrashLoopBackOff\|ImagePullBackOff\|ErrImagePull"; then - echo "Error detected in container status" - exit_code=1 - break - fi - - # Check for error states in pod phases - if echo "$pod_phases" | grep -q "Failed\|Unknown\|Evicted"; then - echo "Error detected in pod phase" - exit_code=1 - break - fi - - # Check PyTorchJob status - job_status=$(kubectl get pytorchjob ${{ env.ACTUAL_JOB_NAME }} -o jsonpath='{.status.conditions[0].type}' 2>/dev/null) - if [ "$job_status" == "Succeeded" ]; then - echo "Job completed successfully!" - exit_code=0 - break - elif [ "$job_status" == "Failed" ]; then - echo "Job failed" - exit_code=1 - break - fi - - # Check if all pods are in Succeeded state - succeeded_pods=$(kubectl get pods -l app=${ACTUAL_JOB_NAME} -o jsonpath='{.items[?(@.status.phase=="Succeeded")].metadata.name}' | wc -w) - total_pods=$(kubectl get pods -l app=${ACTUAL_JOB_NAME} | grep -v NAME | wc -l) - running_pods=$(kubectl get pods -l app=${ACTUAL_JOB_NAME} -o jsonpath='{.items[?(@.status.phase=="Running")].metadata.name}' | wc -w) - - # If all pods have succeeded, then consider the job complete and end loop. - if [ $total_pods -eq $expected_pods ] && [ $succeeded_pods -eq $total_pods ]; then - echo "All pods ($succeeded_pods/$total_pods) are in Succeeded state. Job completed successfully!" - exit_code=0 - break - fi - - # Check if all expected pods are running - if [ $total_pods -eq $expected_pods ] && [ $running_pods -eq $total_pods ]; then - echo "All expected pods (${running_pods}/${expected_pods}) are running" - echo "=== Recent logs from pod $master_pod ===" - kubectl logs $master_pod --tail=20 - echo "=== End of recent logs ===" - last_log_time=$current_time - elif [ $total_pods -lt $expected_pods ]; then - echo "Warning: Only $total_pods pods found, expected $expected_pods" - elif [ $running_pods -lt $total_pods ]; then - echo "Warning: Only $running_pods/$total_pods pods are running" - # List pods that aren't running - kubectl get pods -l app=${ACTUAL_JOB_NAME} -o jsonpath='{range .items[?(@.status.phase!="Running")]}{.metadata.name}{" is "}{.status.phase}{"\n"}{end}' - fi - - echo "Job status: $running_pods/$total_pods pods running (elapsed: $elapsed_time seconds)" - sleep 60 - done - - echo "exit_code=$exit_code" >> $GITHUB_OUTPUT - if [ $exit_code -ne 0 ]; then - echo "FSDP training on EKS failed with exit code: $exit_code" - exit $exit_code - fi - - - name: Collect pod logs - if: always() - run: | - echo "Collecting pod logs from training run" - mkdir -p pod-logs - - echo "Finding master pod..." - - # Find the first worker pod to search for master_addr - WORKER_POD_0=$(kubectl get pods -l app=${{ env.ACTUAL_JOB_NAME }} -o name | head -1 | cut -d'/' -f2) - MASTER_POD=$(kubectl logs $WORKER_POD_0 | grep master_addr= | head -1 | awk -F'=' '{print $2}' | tr -d '[:space:]') - - if [ -z "$MASTER_POD" ]; then - echo "Could not determine master pod, collecting logs from all pods" - for pod in $(kubectl get pods -l app=${{ env.ACTUAL_JOB_NAME }} -o name); do - pod_name=$(echo $pod | cut -d'/' -f2) - kubectl logs $pod_name > pod-logs/$pod_name.log - done - else - echo "Master pod is: $MASTER_POD" - kubectl logs $MASTER_POD > pod-logs/master-$MASTER_POD.log - echo "Master pod logs collected" - fi - - - name: Upload logs as artifacts - if: always() - uses: actions/upload-artifact@v4 - with: - name: regression-logs-${{ matrix.model_config }}-${{ matrix.cluster }}-${{ github.run_id }} - path: pod-logs - retention-days: 7 - - - name: Cleanup - if: always() - run: | - echo "Cleaning up..." - kubectl delete pytorchjob ${{ env.ACTUAL_JOB_NAME }} || true - rm -rf pod-logs - echo "Cleaned up successfully!" diff --git a/.github/workflows/fsdp-regression-test-container.yml b/.github/workflows/fsdp-regression-test-container.yml index 23176ccc8..9a72c0b6e 100644 --- a/.github/workflows/fsdp-regression-test-container.yml +++ b/.github/workflows/fsdp-regression-test-container.yml @@ -29,9 +29,9 @@ jobs: build: strategy: fail-fast: true - max-parallel: 3 + max-parallel: 1 matrix: - cluster: [p5, p5-smhp] + cluster: [p5] runs-on: ubuntu-latest concurrency: group: ${{ github.workflow }}-${{ matrix.cluster }}-build @@ -61,7 +61,7 @@ jobs: # Add host to known hosts with retry for i in {1..5}; do - if ssh-keyscan -H ${{ env.SLURM_HOST }} >> ~/.ssh/known_hosts 2>/dev/null; then + if ssh-keyscan -H "${{ env.SLURM_HOST }}" >> ~/.ssh/known_hosts 2>/dev/null; then echo "SSH keyscan successful" break fi @@ -70,8 +70,8 @@ jobs: done REMOTE_BUILD_PATH="${{ env.BASE_PATH }}/container-builds/${{ github.run_id }}-${{ matrix.cluster }}" - echo "remote_build_path=$REMOTE_BUILD_PATH" >> $GITHUB_OUTPUT - echo "REMOTE_BUILD_PATH=$REMOTE_BUILD_PATH" >> $GITHUB_ENV + echo "remote_build_path=$REMOTE_BUILD_PATH" >> "$GITHUB_OUTPUT" + echo "REMOTE_BUILD_PATH=$REMOTE_BUILD_PATH" >> "$GITHUB_ENV" - name: Transfer Code to Cluster run: | @@ -99,7 +99,9 @@ jobs: ENROOT_IMAGE="${{ env.BASE_PATH }}/enroot-images/fsdp-${{ github.run_id }}-${{ matrix.cluster }}.sqsh" echo "Building FSDP image on cluster..." + # shellcheck disable=SC2087 ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ + -o ServerAliveInterval=60 -o ServerAliveCountMax=5 \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF set -e cd $FSDP_DIR @@ -111,9 +113,9 @@ jobs: echo "Enroot image created at: $ENROOT_IMAGE" # Clean up Docker image to save space docker rmi fsdp:pytorch || true -EOF + EOF - echo "enroot_image_path=$ENROOT_IMAGE" >> $GITHUB_OUTPUT + echo "enroot_image_path=$ENROOT_IMAGE" >> "$GITHUB_OUTPUT" echo "Container build completed successfully!" - name: Cleanup Build Directory @@ -127,10 +129,10 @@ EOF needs: build strategy: fail-fast: false - max-parallel: 6 + max-parallel: 2 matrix: - cluster: [p5, p5-smhp] - model_config: [llama2_7b, llama2_13b, llama2_70b, llama3_1_8b, llama3_1_70b] + cluster: [p5] + model_config: [llama3_1_8b, llama3_1_70b] runs-on: ubuntu-latest concurrency: group: ${{ github.workflow }}-${{ matrix.cluster }}-${{ matrix.model_config }} @@ -155,7 +157,7 @@ EOF chmod 600 ~/.ssh/slurm_key for i in {1..5}; do - if ssh-keyscan -H ${{ env.SLURM_HOST }} >> ~/.ssh/known_hosts 2>/dev/null; then + if ssh-keyscan -H "${{ env.SLURM_HOST }}" >> ~/.ssh/known_hosts 2>/dev/null; then break fi echo "SSH keyscan attempt $i failed, retrying..." @@ -171,25 +173,29 @@ EOF CHECKPOINT_DIR="${{ env.BASE_PATH }}/checkpoints-${BUILD_ID}-${{ matrix.model_config }}-${{ matrix.cluster }}" ENROOT_IMAGE="${{ env.BASE_PATH }}/enroot-images/fsdp-${{ github.run_id }}-${{ matrix.cluster }}.sqsh" - echo "remote_test_path=$REMOTE_TEST_PATH" >> $GITHUB_OUTPUT - echo "log_dir=$LOG_DIR" >> $GITHUB_OUTPUT - echo "checkpoint_dir=$CHECKPOINT_DIR" >> $GITHUB_OUTPUT - echo "enroot_image=$ENROOT_IMAGE" >> $GITHUB_OUTPUT + { + echo "remote_test_path=$REMOTE_TEST_PATH" + echo "log_dir=$LOG_DIR" + echo "checkpoint_dir=$CHECKPOINT_DIR" + echo "enroot_image=$ENROOT_IMAGE" + } >> "$GITHUB_OUTPUT" - echo "REMOTE_TEST_PATH=$REMOTE_TEST_PATH" >> $GITHUB_ENV - echo "LOG_DIR=$LOG_DIR" >> $GITHUB_ENV - echo "CHECKPOINT_DIR=$CHECKPOINT_DIR" >> $GITHUB_ENV - echo "ENROOT_IMAGE=$ENROOT_IMAGE" >> $GITHUB_ENV + { + echo "REMOTE_TEST_PATH=$REMOTE_TEST_PATH" + echo "LOG_DIR=$LOG_DIR" + echo "CHECKPOINT_DIR=$CHECKPOINT_DIR" + echo "ENROOT_IMAGE=$ENROOT_IMAGE" + } >> "$GITHUB_ENV" - name: Create Remote Directories run: | ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << 'EOF' mkdir -p ${{ env.REMOTE_TEST_PATH }} mkdir -p ${{ env.LOG_DIR }} mkdir -p ${{ env.CHECKPOINT_DIR }} chmod 755 ${{ env.LOG_DIR }} ${{ env.CHECKPOINT_DIR }} -EOF + EOF - name: Transfer Code to Cluster run: | @@ -213,6 +219,7 @@ EOF TMP_SBATCH="slurm/regression_test_${{ matrix.model_config }}_${{ matrix.cluster }}.sbatch" # Prepare job script on cluster + # shellcheck disable=SC2087 ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF set -e @@ -232,12 +239,21 @@ EOF sed -i "s|--checkpoint_dir=./checkpoints|--checkpoint_dir=/checkpoints|g" "$TMP_SBATCH" sed -i "s|--container-mounts.*|--container-mounts \\$FSX_MOUNT,${{ env.CHECKPOINT_DIR }}:/checkpoints|" "$TMP_SBATCH" + # Inject HF_TOKEN so compute nodes can access gated HuggingFace models + if [ -n "$HF_TOKEN" ]; then + sed -i '/^#SBATCH/!b;:a;n;/^#SBATCH/ba;i\export HF_TOKEN="'"$HF_TOKEN"'"' "$TMP_SBATCH" + fi + + # Point training to the pre-resolved data files manifest so processes + # skip the HuggingFace tree API (avoids HTTP 429 rate limits) + sed -i '/^export HF_HOME/a export HF_DATA_FILES_MANIFEST=/fsx/.cache/huggingface/c4_en_data_files.json' "$TMP_SBATCH" + # Submit job echo "Submitting Slurm job..." JOB_ID=\$(sbatch --parsable $TMP_SBATCH) echo "JOB_ID=\$JOB_ID" >> ${{ env.REMOTE_TEST_PATH }}/job_info.txt echo "Submitted job: \$JOB_ID" -EOF + EOF # Get job ID sleep 2 @@ -245,47 +261,79 @@ EOF ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ "cat ${{ env.REMOTE_TEST_PATH }}/job_info.txt | grep JOB_ID | cut -d= -f2") - echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT - echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV + echo "job_id=$JOB_ID" >> "$GITHUB_OUTPUT" + echo "JOB_ID=$JOB_ID" >> "$GITHUB_ENV" echo "Submitted Slurm job: $JOB_ID" - name: Monitor Job with Real-time Logs id: monitor_job run: | + # SSH wrapper with retry and keepalive + ssh_cmd() { + local max_retries=3 + local retry_delay=10 + for attempt in $(seq 1 $max_retries); do + if ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ + -o ConnectTimeout=30 -o ServerAliveInterval=60 -o ServerAliveCountMax=5 \ + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "export PATH=/opt/slurm/bin:\$PATH; $*"; then + return 0 + fi + echo "SSH attempt $attempt/$max_retries failed, retrying in ${retry_delay}s..." + sleep $retry_delay + done + echo "SSH failed after $max_retries attempts" + return 1 + } + echo "Monitoring job ${{ env.JOB_ID }}..." START_TIME=$(date +%s) TIMEOUT=21600 # 6 hours - - # Get initial log file name (will be updated once job starts) + LOG_FILE="${{ env.LOG_DIR }}/regression_test_${{ env.JOB_ID }}.out" - + while true; do CURRENT_TIME=$(date +%s) ELAPSED=$((CURRENT_TIME - START_TIME)) - + if [ $ELAPSED -gt $TIMEOUT ]; then echo "Timeout reached after 6 hours" exit 1 fi - - # Check job status - JOB_STATUS=$(ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ - "squeue -j ${{ env.JOB_ID }} -h -o %T 2>/dev/null || echo 'COMPLETED'") - - if [ -z "$JOB_STATUS" ] || [ "$JOB_STATUS" == "COMPLETED" ]; then - echo "Job completed successfully" - break + + # Check job status via squeue + JOB_STATUS=$(ssh_cmd "squeue -j ${{ env.JOB_ID }} -h -o %T 2>/dev/null" || true) + + if [ -z "$JOB_STATUS" ]; then + # Job left the queue -- check actual result via sacct + echo "Job no longer in queue, checking final state with sacct..." + FINAL_STATE=$(ssh_cmd "sacct -j ${{ env.JOB_ID }} --format=State --noheader -P | head -1 | tr -d ' '" || true) + EXIT_CODE=$(ssh_cmd "sacct -j ${{ env.JOB_ID }} --format=ExitCode --noheader -P | head -1" || true) + + echo "Final state: $FINAL_STATE (exit code: $EXIT_CODE)" + + if [ "$FINAL_STATE" == "COMPLETED" ]; then + echo "Job completed successfully" + break + else + echo "::group::Job error output (last 200 lines)" + ERR_FILE="${{ env.LOG_DIR }}/regression_test_${{ env.JOB_ID }}.err" + ssh_cmd "tail -n 200 $ERR_FILE 2>/dev/null || echo 'No stderr log found at $ERR_FILE'" || true + ssh_cmd "tail -n 200 $LOG_FILE 2>/dev/null || echo 'No stdout log found at $LOG_FILE'" || true + # Also check for slurm default logs in the working directory + FSDP_SLURM_DIR="${{ env.REMOTE_TEST_PATH }}/3.test_cases/pytorch/FSDP/slurm" + ssh_cmd "tail -n 200 $FSDP_SLURM_DIR/slurm-${{ env.JOB_ID }}.out 2>/dev/null || true" || true + echo "::endgroup::" + echo "Job finished with state: $FINAL_STATE" + exit 1 + fi elif [ "$JOB_STATUS" == "FAILED" ] || [ "$JOB_STATUS" == "CANCELLED" ] || [ "$JOB_STATUS" == "TIMEOUT" ]; then echo "Job failed with status: $JOB_STATUS" exit 1 fi - - # Stream logs in real-time - ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ - "tail -n 50 $LOG_FILE 2>/dev/null || echo 'Waiting for log file...'" - + + # Stream recent logs (best-effort, don't fail on SSH errors) + ssh_cmd "tail -n 50 $LOG_FILE 2>/dev/null || echo 'Waiting for log file...'" || true + echo "--- Job status: $JOB_STATUS (elapsed: $((ELAPSED / 60)) min) ---" sleep 30 done @@ -296,16 +344,25 @@ EOF echo "Retrieving logs from cluster..." mkdir -p ./logs - # Copy logs with retry + FSDP_SLURM_DIR="${{ env.REMOTE_TEST_PATH }}/3.test_cases/pytorch/FSDP/slurm" + + # Copy logs from LOG_DIR with retry for i in {1..3}; do if scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 -r \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.LOG_DIR }}/* ./logs/ 2>/dev/null; then - echo "Logs retrieved successfully" + "${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.LOG_DIR }}/*" ./logs/ 2>/dev/null; then + echo "Logs retrieved from LOG_DIR successfully" break fi echo "Log retrieval attempt $i failed, retrying..." sleep 10 done + + # Also grab any slurm-*.out files from the working directory (fallback logs) + scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ + "${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${FSDP_SLURM_DIR}/slurm-*.out" ./logs/ 2>/dev/null || true + + echo "Retrieved log files:" + ls -la ./logs/ 2>/dev/null || echo "No logs found" - name: Upload logs as artifacts if: always() @@ -319,25 +376,44 @@ EOF if: always() run: | echo "Cleaning up remote resources..." - + # Cancel job if still running if [ -n "${{ env.JOB_ID }}" ]; then ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ - "scancel ${{ env.JOB_ID }} 2>/dev/null || true" + "export PATH=/opt/slurm/bin:\$PATH; scancel ${{ env.JOB_ID }} 2>/dev/null || true" fi - - # Clean up directories + + # Clean up test directories (enroot image cleaned up by cleanup-enroot job) ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF - rm -rf ${{ env.REMOTE_TEST_PATH }} - rm -rf ${{ env.LOG_DIR }} - rm -rf ${{ env.CHECKPOINT_DIR }} - # Clean up enroot image if this is the last test for this build - if [ "${{ matrix.model_config }}" == "llama3_1_70b" ]; then - rm -f ${{ env.ENROOT_IMAGE }} || true - fi -EOF - + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ + "rm -rf ${{ env.REMOTE_TEST_PATH }} ${{ env.LOG_DIR }} ${{ env.CHECKPOINT_DIR }}" || true + rm -rf ./logs echo "Cleanup completed!" + + cleanup-enroot: + name: Cleanup Enroot Images + needs: [build, run-tests] + runs-on: ubuntu-latest + if: always() + steps: + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ env.AWS_ROLE_ARN }} + aws-region: ${{ env.AWS_REGION }} + + - name: Setup SSH Key + run: | + mkdir -p ~/.ssh + echo "${{ secrets.SLURM_SSH_KEY }}" > ~/.ssh/slurm_key + chmod 600 ~/.ssh/slurm_key + ssh-keyscan -H "${{ env.SLURM_HOST }}" >> ~/.ssh/known_hosts 2>/dev/null || true + + - name: Remove Enroot Images + run: | + echo "Cleaning up enroot images for run ${{ github.run_id }}..." + ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ + "rm -f ${{ env.BASE_PATH }}/enroot-images/fsdp-${{ github.run_id }}-*.sqsh && echo 'Enroot images cleaned up'" || true diff --git a/.github/workflows/fsdp-regression-test-venv.yml b/.github/workflows/fsdp-regression-test-venv.yml index 25f99a3dc..9087e9650 100644 --- a/.github/workflows/fsdp-regression-test-venv.yml +++ b/.github/workflows/fsdp-regression-test-venv.yml @@ -1,7 +1,5 @@ name: FSDP Regression Test (venv) -# TODO: Additional test cases to matrix. Change max-parallel. - on: push: branches: [ "main" ] @@ -31,8 +29,8 @@ jobs: fail-fast: true max-parallel: 3 matrix: - cluster: [p5, p5-smhp] - model_config: [llama2_7b, llama2_13b, llama2_70b, llama3_1_8b, llama3_1_70b] + cluster: [p5] + model_config: [llama3_1_8b, llama3_1_70b] runs-on: ubuntu-latest concurrency: group: ${{ github.workflow }}-${{ matrix.cluster }}-${{ matrix.model_config }} @@ -58,7 +56,7 @@ jobs: # Add host to known hosts with retry for i in {1..5}; do - if ssh-keyscan -H ${{ env.SLURM_HOST }} >> ~/.ssh/known_hosts 2>/dev/null; then + if ssh-keyscan -H "${{ env.SLURM_HOST }}" >> ~/.ssh/known_hosts 2>/dev/null; then echo "SSH keyscan successful" break fi @@ -74,23 +72,27 @@ jobs: LOG_DIR="${{ env.HOME_PATH }}/regression-logs-${BUILD_ID}-${{ matrix.model_config }}-${{ matrix.cluster }}" CHECKPOINT_DIR="${{ env.BASE_PATH }}/checkpoints-${BUILD_ID}-${{ matrix.model_config }}-${{ matrix.cluster }}" - echo "remote_test_path=$REMOTE_TEST_PATH" >> $GITHUB_OUTPUT - echo "log_dir=$LOG_DIR" >> $GITHUB_OUTPUT - echo "checkpoint_dir=$CHECKPOINT_DIR" >> $GITHUB_OUTPUT + { + echo "remote_test_path=$REMOTE_TEST_PATH" + echo "log_dir=$LOG_DIR" + echo "checkpoint_dir=$CHECKPOINT_DIR" + } >> "$GITHUB_OUTPUT" - echo "REMOTE_TEST_PATH=$REMOTE_TEST_PATH" >> $GITHUB_ENV - echo "LOG_DIR=$LOG_DIR" >> $GITHUB_ENV - echo "CHECKPOINT_DIR=$CHECKPOINT_DIR" >> $GITHUB_ENV + { + echo "REMOTE_TEST_PATH=$REMOTE_TEST_PATH" + echo "LOG_DIR=$LOG_DIR" + echo "CHECKPOINT_DIR=$CHECKPOINT_DIR" + } >> "$GITHUB_ENV" - name: Create Remote Directories run: | ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << 'EOF' mkdir -p ${{ env.REMOTE_TEST_PATH }} mkdir -p ${{ env.LOG_DIR }} mkdir -p ${{ env.CHECKPOINT_DIR }} chmod 755 ${{ env.LOG_DIR }} ${{ env.CHECKPOINT_DIR }} -EOF + EOF - name: Transfer Code to Cluster run: | @@ -110,13 +112,120 @@ EOF FSDP_SLURM_DIR="${{ env.REMOTE_TEST_PATH }}/3.test_cases/pytorch/FSDP/slurm" echo "Creating virtual environment on cluster..." + # shellcheck disable=SC2087 ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF set -e cd $FSDP_SLURM_DIR bash ./create_venv.sh echo "Virtual environment created successfully!" -EOF + EOF + + - name: Pre-download Dataset and Tokenizers + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + run: | + FSDP_SLURM_DIR="${{ env.REMOTE_TEST_PATH }}/3.test_cases/pytorch/FSDP/slurm" + MANIFEST_PATH="/fsx/.cache/huggingface/c4_en_data_files.json" + echo "MANIFEST_PATH=$MANIFEST_PATH" >> "$GITHUB_ENV" + + echo "Downloading dataset shards to local FSx and pre-caching tokenizers..." + # shellcheck disable=SC2087 + ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF + set -e + cd $FSDP_SLURM_DIR + source env/bin/activate + export HF_HOME=/fsx/.cache/huggingface + export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets + export HF_TOKEN="$HF_TOKEN" + mkdir -p /fsx/.cache/huggingface/datasets + + python3 -c " + import json, os, subprocess, glob as globmod + + manifest_path = '$MANIFEST_PATH' + local_dir = '/fsx/.cache/huggingface/c4_local/en' + + # Check if local data shards already exist and manifest points to them + if os.path.isfile(manifest_path): + import time + age_hours = (time.time() - os.path.getmtime(manifest_path)) / 3600 + try: + with open(manifest_path) as f: + m = json.load(f) + # If manifest already has local paths and is recent, skip + if (age_hours < 168 and m.get('train') + and m['train'][0].startswith('/fsx/') + and all(os.path.isfile(p) for p in m['train'][:3])): + print(f'Local-path manifest exists ({age_hours:.1f}h old), skipping download') + for k, v in m.items(): + print(f' {k}: {len(v)} files') + exit(0) + except (json.JSONDecodeError, KeyError): + pass + + os.makedirs(local_dir, exist_ok=True) + + # Download shards from HuggingFace if not already present + from huggingface_hub import HfApi + api = HfApi() + info = api.dataset_info('allenai/c4') + revision = info.sha + base_url = f'https://huggingface.co/datasets/allenai/c4/resolve/{revision}/en' + + # We only need a small subset for CI (100 training steps): + # 10 train shards (~3GB) + all 8 validation shards (~320MB) + NUM_TRAIN_SHARDS = 10 + + for split_prefix, split_name, count, total in [ + ('c4-train', 'train', NUM_TRAIN_SHARDS, 1024), + ('c4-validation', 'validation', 8, 8)]: + print(f'Checking {split_name} shards ({count} of {total})...') + for i in range(count): + fname = f'{split_prefix}.{i:05d}-of-{total:05d}.json.gz' + dest = os.path.join(local_dir, fname) + if os.path.isfile(dest) and os.path.getsize(dest) > 10000: + continue + url = f'{base_url}/{fname}' + print(f' Downloading {fname}...') + subprocess.run([ + 'curl', '-sL', + '-H', f'Authorization: Bearer {os.environ.get(\"HF_TOKEN\", \"\")}', + url, '-o', dest + ], check=True) + size_mb = os.path.getsize(dest) / 1024 / 1024 + print(f' {size_mb:.0f} MB') + + # Build manifest with local file paths + manifest = {} + for split_prefix, split_name, count, total in [ + ('c4-train', 'train', NUM_TRAIN_SHARDS, 1024), + ('c4-validation', 'validation', 8, 8)]: + files = [] + for i in range(count): + fname = f'{split_prefix}.{i:05d}-of-{total:05d}.json.gz' + fpath = os.path.join(local_dir, fname) + if os.path.isfile(fpath): + files.append(fpath) + manifest[split_name] = sorted(files) + print(f'{split_name}: {len(files)} local files') + + with open(manifest_path, 'w') as f: + json.dump(manifest, f) + print(f'Local-path manifest written to {manifest_path}') + + # Pre-cache tokenizers so HF_HUB_OFFLINE=1 works during training + from transformers import AutoTokenizer + for tok_name in ['hf-internal-testing/llama-tokenizer', 'mistralai/mathstral-7B-v0.1', 'mistralai/Mixtral-8x7B-v0.1']: + try: + print(f'Pre-caching tokenizer: {tok_name}') + AutoTokenizer.from_pretrained(tok_name, legacy=False) + except Exception as e: + print(f'Warning: could not cache {tok_name}: {e}') + print('Pre-download complete') + " + EOF - name: Prepare and Submit Slurm Job id: submit_job @@ -128,6 +237,7 @@ EOF TMP_SBATCH="regression_test_${{ matrix.model_config }}.sbatch" # Prepare and submit job on cluster + # shellcheck disable=SC2087 ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF set -e @@ -145,15 +255,25 @@ EOF sed -i "s|#SBATCH --error=.*|#SBATCH --error=${{ env.LOG_DIR }}/regression_test_%j.err|" "$TMP_SBATCH" sed -i "s|--checkpoint_dir=./checkpoints|--checkpoint_dir=${{ env.CHECKPOINT_DIR }}|g" "$TMP_SBATCH" - # Activate venv in the sbatch script - sed -i '1a source env/bin/activate' "$TMP_SBATCH" + # Activate venv in the sbatch script (must go AFTER all #SBATCH directives, + # otherwise Slurm stops parsing directives at the first non-comment line) + sed -i '/^#SBATCH/!b;:a;n;/^#SBATCH/ba;i\source env/bin/activate' "$TMP_SBATCH" + + # Inject HF_TOKEN so compute nodes can access gated HuggingFace models + if [ -n "$HF_TOKEN" ]; then + sed -i '/^source env\/bin\/activate/a export HF_TOKEN="'"$HF_TOKEN"'"' "$TMP_SBATCH" + fi + + # Point training to the pre-resolved data files manifest so processes + # skip the HuggingFace tree API (avoids HTTP 429 rate limits) + sed -i '/^export HF_HOME/a export HF_DATA_FILES_MANIFEST=${{ env.MANIFEST_PATH }}' "$TMP_SBATCH" # Submit job echo "Submitting Slurm job..." - JOB_ID=\$(sbatch --parsable \$TMP_SBATCH) + JOB_ID=\$(sbatch --parsable $TMP_SBATCH) echo "JOB_ID=\$JOB_ID" >> ${{ env.REMOTE_TEST_PATH }}/job_info.txt echo "Submitted job: \$JOB_ID" -EOF + EOF # Get job ID sleep 2 @@ -161,46 +281,79 @@ EOF ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ "cat ${{ env.REMOTE_TEST_PATH }}/job_info.txt | grep JOB_ID | cut -d= -f2") - echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT - echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV + echo "job_id=$JOB_ID" >> "$GITHUB_OUTPUT" + echo "JOB_ID=$JOB_ID" >> "$GITHUB_ENV" echo "Submitted Slurm job: $JOB_ID" - name: Monitor Job with Real-time Logs id: monitor_job run: | + # SSH wrapper with retry and keepalive + ssh_cmd() { + local max_retries=3 + local retry_delay=10 + for attempt in $(seq 1 $max_retries); do + if ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ + -o ConnectTimeout=30 -o ServerAliveInterval=60 -o ServerAliveCountMax=5 \ + ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "export PATH=/opt/slurm/bin:\$PATH; $*"; then + return 0 + fi + echo "SSH attempt $attempt/$max_retries failed, retrying in ${retry_delay}s..." + sleep $retry_delay + done + echo "SSH failed after $max_retries attempts" + return 1 + } + echo "Monitoring job ${{ env.JOB_ID }}..." START_TIME=$(date +%s) TIMEOUT=21600 # 6 hours - + LOG_FILE="${{ env.LOG_DIR }}/regression_test_${{ env.JOB_ID }}.out" - + while true; do CURRENT_TIME=$(date +%s) ELAPSED=$((CURRENT_TIME - START_TIME)) - + if [ $ELAPSED -gt $TIMEOUT ]; then echo "Timeout reached after 6 hours" exit 1 fi - - # Check job status - JOB_STATUS=$(ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ - "squeue -j ${{ env.JOB_ID }} -h -o %T 2>/dev/null || echo 'COMPLETED'") - - if [ -z "$JOB_STATUS" ] || [ "$JOB_STATUS" == "COMPLETED" ]; then - echo "Job completed successfully" - break + + # Check job status via squeue + JOB_STATUS=$(ssh_cmd "squeue -j ${{ env.JOB_ID }} -h -o %T 2>/dev/null" || true) + + if [ -z "$JOB_STATUS" ]; then + # Job left the queue -- check actual result via sacct + echo "Job no longer in queue, checking final state with sacct..." + FINAL_STATE=$(ssh_cmd "sacct -j ${{ env.JOB_ID }} --format=State --noheader -P | head -1 | tr -d ' '" || true) + EXIT_CODE=$(ssh_cmd "sacct -j ${{ env.JOB_ID }} --format=ExitCode --noheader -P | head -1" || true) + + echo "Final state: $FINAL_STATE (exit code: $EXIT_CODE)" + + if [ "$FINAL_STATE" == "COMPLETED" ]; then + echo "Job completed successfully" + break + else + echo "::group::Job error output (last 200 lines)" + ERR_FILE="${{ env.LOG_DIR }}/regression_test_${{ env.JOB_ID }}.err" + ssh_cmd "tail -n 200 $ERR_FILE 2>/dev/null || echo 'No stderr log found at $ERR_FILE'" || true + ssh_cmd "tail -n 200 $LOG_FILE 2>/dev/null || echo 'No stdout log found at $LOG_FILE'" || true + # Also check for slurm default logs in the working directory + FSDP_SLURM_DIR="${{ env.REMOTE_TEST_PATH }}/3.test_cases/pytorch/FSDP/slurm" + ssh_cmd "tail -n 200 $FSDP_SLURM_DIR/slurm-${{ env.JOB_ID }}.out 2>/dev/null || true" || true + echo "::endgroup::" + echo "Job finished with state: $FINAL_STATE" + exit 1 + fi elif [ "$JOB_STATUS" == "FAILED" ] || [ "$JOB_STATUS" == "CANCELLED" ] || [ "$JOB_STATUS" == "TIMEOUT" ]; then echo "Job failed with status: $JOB_STATUS" exit 1 fi - - # Stream logs in real-time - ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ - "tail -n 50 $LOG_FILE 2>/dev/null || echo 'Waiting for log file...'" - + + # Stream recent logs (best-effort, don't fail on SSH errors) + ssh_cmd "tail -n 50 $LOG_FILE 2>/dev/null || echo 'Waiting for log file...'" || true + echo "--- Job status: $JOB_STATUS (elapsed: $((ELAPSED / 60)) min) ---" sleep 30 done @@ -211,16 +364,25 @@ EOF echo "Retrieving logs from cluster..." mkdir -p ./logs - # Copy logs with retry + FSDP_SLURM_DIR="${{ env.REMOTE_TEST_PATH }}/3.test_cases/pytorch/FSDP/slurm" + + # Copy logs from LOG_DIR with retry for i in {1..3}; do if scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 -r \ - ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.LOG_DIR }}/* ./logs/ 2>/dev/null; then - echo "Logs retrieved successfully" + "${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.LOG_DIR }}/*" ./logs/ 2>/dev/null; then + echo "Logs retrieved from LOG_DIR successfully" break fi echo "Log retrieval attempt $i failed, retrying..." sleep 10 done + + # Also grab any slurm-*.out files from the working directory (fallback logs) + scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ + "${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${FSDP_SLURM_DIR}/slurm-*.out" ./logs/ 2>/dev/null || true + + echo "Retrieved log files:" + ls -la ./logs/ 2>/dev/null || echo "No logs found" - name: Upload logs as artifacts if: always() @@ -239,7 +401,7 @@ EOF if [ -n "${{ env.JOB_ID }}" ]; then ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} \ - "scancel ${{ env.JOB_ID }} 2>/dev/null || true" + "export PATH=/opt/slurm/bin:\$PATH; scancel ${{ env.JOB_ID }} 2>/dev/null || true" fi # Clean up directories @@ -248,7 +410,7 @@ EOF rm -rf ${{ env.REMOTE_TEST_PATH }} rm -rf ${{ env.LOG_DIR }} rm -rf ${{ env.CHECKPOINT_DIR }} -EOF + EOF rm -rf ./logs echo "Cleanup completed!" diff --git a/.github/workflows/megatron-ci-slurm.yaml b/.github/workflows/megatron-ci-slurm.yaml index c341f6841..45652f306 100644 --- a/.github/workflows/megatron-ci-slurm.yaml +++ b/.github/workflows/megatron-ci-slurm.yaml @@ -61,7 +61,7 @@ jobs: # Add host to known hosts with retry for i in {1..5}; do - if ssh-keyscan -H ${{ env.SLURM_HOST }} >> ~/.ssh/known_hosts 2>/dev/null; then + if ssh-keyscan -H "${{ env.SLURM_HOST }}" >> ~/.ssh/known_hosts 2>/dev/null; then echo "SSH keyscan successful" break fi @@ -70,8 +70,8 @@ jobs: done REMOTE_BUILD_PATH="${{ env.BASE_PATH }}/megatron-builds/${{ github.run_id }}-${{ matrix.cluster }}" - echo "remote_build_path=$REMOTE_BUILD_PATH" >> $GITHUB_OUTPUT - echo "REMOTE_BUILD_PATH=$REMOTE_BUILD_PATH" >> $GITHUB_ENV + echo "remote_build_path=$REMOTE_BUILD_PATH" >> "$GITHUB_OUTPUT" + echo "REMOTE_BUILD_PATH=$REMOTE_BUILD_PATH" >> "$GITHUB_ENV" - name: Transfer Code to Cluster run: | @@ -99,7 +99,9 @@ jobs: ENROOT_IMAGE="${{ env.BASE_PATH }}/enroot-images/megatron-${{ github.run_id }}-${{ matrix.cluster }}.sqsh" echo "Building Megatron-LM image on cluster..." + # shellcheck disable=SC2087 ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -o ConnectTimeout=30 \ + -o ServerAliveInterval=60 -o ServerAliveCountMax=5 \ ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} << EOF set -e cd $MEGATRON_DIR @@ -111,9 +113,9 @@ jobs: echo "Enroot image created at: $ENROOT_IMAGE" # Clean up Docker image to save space docker rmi aws-megatron-lm || true -EOF + EOF - echo "enroot_image_path=$ENROOT_IMAGE" >> $GITHUB_OUTPUT + echo "enroot_image_path=$ENROOT_IMAGE" >> "$GITHUB_OUTPUT" echo "Container build completed successfully!" - name: Verify Container Build @@ -129,7 +131,7 @@ EOF echo "✗ Enroot image not found: \$ENROOT_IMAGE" exit 1 fi -EOF + EOF - name: Cleanup Build Directory if: always() @@ -163,7 +165,7 @@ EOF mkdir -p ~/.ssh echo "${{ secrets.SLURM_SSH_KEY }}" > ~/.ssh/slurm_key chmod 600 ~/.ssh/slurm_key - ssh-keyscan -H ${{ env.SLURM_HOST }} >> ~/.ssh/known_hosts 2>/dev/null || true + ssh-keyscan -H "${{ env.SLURM_HOST }}" >> ~/.ssh/known_hosts 2>/dev/null || true - name: Remove Enroot Images run: | @@ -176,4 +178,4 @@ EOF echo "Enroot images cleaned up" # List remaining images for verification ls -lh ${{ env.BASE_PATH }}/enroot-images/ | grep megatron || echo "No megatron images remaining" -EOF + EOF diff --git a/.github/workflows/pr-lint.yml b/.github/workflows/pr-lint.yml new file mode 100644 index 000000000..fb59e6fb5 --- /dev/null +++ b/.github/workflows/pr-lint.yml @@ -0,0 +1,101 @@ +name: PR Lint + +on: + pull_request: + branches: + - main + types: [opened, synchronize, reopened] + +permissions: + contents: read + +jobs: + lint: + name: Lint Changed Files + runs-on: ubuntu-latest + timeout-minutes: 10 + + steps: + - name: Checkout PR Code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha }} + + - name: Get Changed Files + id: changed + run: | + # Get the list of files changed in this PR + MERGE_BASE=$(git merge-base "${{ github.event.pull_request.base.sha }}" "${{ github.event.pull_request.head.sha }}") + + PY_FILES=$(git diff --name-only --diff-filter=ACMR "$MERGE_BASE" -- '*.py' || true) + SH_FILES=$(git diff --name-only --diff-filter=ACMR "$MERGE_BASE" -- '*.sh' || true) + + { + echo "py_files<> "$GITHUB_OUTPUT" + + { + echo "sh_files<> "$GITHUB_OUTPUT" + + PY_COUNT=$(echo "$PY_FILES" | grep -c '.' || true) + SH_COUNT=$(echo "$SH_FILES" | grep -c '.' || true) + echo "py_count=$PY_COUNT" >> "$GITHUB_OUTPUT" + echo "sh_count=$SH_COUNT" >> "$GITHUB_OUTPUT" + + echo "Changed Python files: $PY_COUNT" + echo "Changed Shell files: $SH_COUNT" + + - name: Setup Python + if: steps.changed.outputs.py_count != '0' + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Lint Python Files + if: steps.changed.outputs.py_count != '0' + run: | + pip install flake8 + + echo "Linting changed Python files..." + echo "${{ steps.changed.outputs.py_files }}" | while IFS= read -r f; do + [ -z "$f" ] && continue + [ -f "$f" ] || continue + echo " $f" + done + + # Run flake8 on changed files only + # E501: line too long (common in training scripts) + # W503: line break before binary operator (PEP8 changed stance) + # E402: module level import not at top of file (common in conditional imports) + echo "${{ steps.changed.outputs.py_files }}" | tr '\n' '\0' | xargs -0 -r \ + flake8 --max-line-length=120 --count --show-source \ + --ignore=E501,W503,E402 \ + --statistics + + - name: Check Shell Script Syntax + if: steps.changed.outputs.sh_count != '0' + run: | + echo "Checking shell script syntax..." + ERRORS=0 + + echo "${{ steps.changed.outputs.sh_files }}" | while IFS= read -r f; do + [ -z "$f" ] && continue + [ -f "$f" ] || continue + echo " Checking: $f" + if ! bash -n "$f" 2>&1; then + ERRORS=$((ERRORS + 1)) + fi + done + + if [ "$ERRORS" -gt 0 ]; then + echo "Found syntax errors in $ERRORS shell script(s)" + exit 1 + fi + + echo "All shell scripts passed syntax check" diff --git a/.github/workflows/pr-review-and-slurm-test.yml b/.github/workflows/pr-review-and-slurm-test.yml deleted file mode 100644 index e957bb204..000000000 --- a/.github/workflows/pr-review-and-slurm-test.yml +++ /dev/null @@ -1,820 +0,0 @@ -name: PR Review and Slurm Test - -on: - pull_request: - branches: - - main - types: [opened, synchronize, reopened] - -env: - AWS_REGION: us-east-1 - SLURM_HOST: p5en.smml.aiml.aws.dev - SLURM_USER: ghactions - RESULTS_PATH: /fsx/agents/pr-reviews/awsome-distributed-training - AWS_ROLE_ARN: arn:aws:iam::159553542841:role/awslabs-AOSH-GitHubActionsRole - -permissions: - id-token: write - contents: read - pull-requests: read - -jobs: - code-review: - name: Code Review and Analysis - runs-on: ubuntu-latest - timeout-minutes: 30 - - steps: - - name: Checkout PR Code - uses: actions/checkout@v4 - with: - fetch-depth: 0 - ref: ${{ github.event.pull_request.head.sha }} - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: '3.11' - - - name: Install Code Analysis Tools - run: | - pip install pylint flake8 bandit semgrep - npm install -g @microsoft/eslint-formatter-sarif - - - name: Run Static Analysis - id: static-analysis - run: | - echo "::group::Running Static Analysis" - - # Create results directory - mkdir -p review-results - - # Python linting - if find . -name "*.py" -type f | grep -q .; then - echo "Checking Python files..." - pylint --output-format=json $(find . -name "*.py" -type f) > review-results/pylint-results.json 2>/dev/null || true - flake8 --format=json --output-file=review-results/flake8-results.json . 2>/dev/null || true - fi - - # Shell script checking - if find . -name "*.sh" -type f | grep -q .; then - echo "Checking Shell scripts..." - for script in $(find . -name "*.sh" -type f); do - bash -n "$script" 2>&1 | tee -a review-results/shell-check.log || true - done - fi - - # Check for common errors - echo "Checking for common issues..." - - # Check for hardcoded secrets - if grep -r -i "password\|secret\|token\|key" --include="*.py" --include="*.sh" --include="*.json" . | grep -v "example\|test\|mock" | grep -E "(=|:).*[a-zA-Z0-9]{20,}" > review-results/potential-secrets.log 2>/dev/null; then - echo "⚠️ Potential hardcoded secrets found" >> review-results/issues.log - fi - - # Check for syntax errors in Python - python -m py_compile $(find . -name "*.py" -type f) 2>&1 | tee review-results/python-syntax-errors.log || true - - echo "::endgroup::" - - - name: Check for Incompatibilities - id: compatibility-check - run: | - echo "::group::Checking for Incompatibilities" - - # Check for API compatibility issues - if [ -f "requirements.txt" ]; then - echo "Checking requirements.txt for version conflicts..." - pip install --dry-run -r requirements.txt 2>&1 | tee review-results/pip-conflicts.log || true - fi - - # Check for deprecated functions - if find . -name "*.py" -type f | grep -q .; then - grep -r "deprecated\|DeprecationWarning" --include="*.py" . > review-results/deprecation-warnings.log 2>/dev/null || true - fi - - # Check Dockerfile syntax - if [ -f "Dockerfile" ]; then - echo "Checking Dockerfile..." - docker build --dry-run -t test-build . 2>&1 | tee review-results/dockerfile-check.log || true - fi - - echo "::endgroup::" - - - name: Upload Review Results - uses: actions/upload-artifact@v4 - with: - name: code-review-results - path: review-results/ - retention-days: 30 - - security-scan: - name: Security Best Practices Review - runs-on: ubuntu-latest - timeout-minutes: 30 - - steps: - - name: Checkout PR Code - uses: actions/checkout@v4 - with: - fetch-depth: 0 - ref: ${{ github.event.pull_request.head.sha }} - - - name: Run Security Scans - id: security-scan - run: | - echo "::group::Running Security Scans" - - mkdir -p security-results - - # Bandit for Python security issues - if find . -name "*.py" -type f | grep -q .; then - pip install bandit - bandit -r . -f json -o security-results/bandit-results.json || true - bandit -r . -f txt -o security-results/bandit-report.txt || true - fi - - # Check for hardcoded credentials - if command -v trufflehog &> /dev/null; then - trufflehog filesystem . --json > security-results/trufflehog-results.json 2>/dev/null || true - else - # Manual check for common patterns - grep -r -E "(password|passwd|pwd)\s*=\s*[\"'][^\"']{8,}[\"']" --include="*.py" --include="*.sh" --include="*.json" --include="*.yaml" --include="*.yml" . > security-results/credentials-check.log 2>/dev/null || true - grep -r -E "(api_key|apikey|api-key)\s*=\s*[\"'][^\"']{10,}[\"']" --include="*.py" --include="*.sh" --include="*.json" --include="*.yaml" --include="*.yml" . >> security-results/credentials-check.log 2>/dev/null || true - grep -r -E "(secret|token)\s*=\s*[\"'][^\"']{15,}[\"']" --include="*.py" --include="*.sh" --include="*.json" --include="*.yaml" --include="*.yml" . >> security-results/credentials-check.log 2>/dev/null || true - fi - - # Check for insecure configurations - echo "Checking for insecure configurations..." - - # Check for HTTP instead of HTTPS - grep -r "http://" --include="*.py" --include="*.sh" --include="*.json" --include="*.yaml" --include="*.yml" --include="*.tf" . | grep -v "localhost\|127.0.0.1\|example.com" > security-results/insecure-http.log 2>/dev/null || true - - # Check for overly permissive file permissions in scripts - if [ -f "install.sh" ]; then - if grep -E "chmod.*777|chmod.*a\+rw" install.sh > /dev/null 2>&1; then - echo "⚠️ Overly permissive file permissions found in install.sh" >> security-results/permission-issues.log - fi - fi - - # Check for eval/exec of user input - grep -r "eval\|exec" --include="*.py" --include="*.sh" . | grep -v "# " | head -20 > security-results/code-execution-risks.log 2>/dev/null || true - - # Check Dockerfile security - if [ -f "Dockerfile" ]; then - # Check for running as root - if ! grep -q "USER" Dockerfile; then - echo "⚠️ Dockerfile does not specify USER - container runs as root" >> security-results/dockerfile-security.log - fi - - # Check for latest tag usage - if grep -E "FROM.*:latest" Dockerfile > /dev/null 2>&1; then - echo "⚠️ Dockerfile uses 'latest' tag - use specific versions" >> security-results/dockerfile-security.log - fi - - # Check for ADD vs COPY - if grep -E "^ADD" Dockerfile > /dev/null 2>&1; then - echo "⚠️ Dockerfile uses ADD - prefer COPY for better security" >> security-results/dockerfile-security.log - fi - fi - - echo "::endgroup::" - - - name: Generate Security Report - run: | - echo "::group::Generating Security Report" - - cat > security-results/security-report.md << 'EOF' - # Security Review Report - - ## Summary - EOF - - # Count issues - CRITICAL=0 - HIGH=0 - MEDIUM=0 - LOW=0 - - if [ -f "security-results/bandit-results.json" ]; then - CRITICAL=$(jq '[.results[] | select(.issue_severity == "CRITICAL")] | length' security-results/bandit-results.json 2>/dev/null || echo 0) - HIGH=$(jq '[.results[] | select(.issue_severity == "HIGH")] | length' security-results/bandit-results.json 2>/dev/null || echo 0) - MEDIUM=$(jq '[.results[] | select(.issue_severity == "MEDIUM")] | length' security-results/bandit-results.json 2>/dev/null || echo 0) - LOW=$(jq '[.results[] | select(.issue_severity == "LOW")] | length' security-results/bandit-results.json 2>/dev/null || echo 0) - fi - - cat >> security-results/security-report.md << EOF - - Critical Issues: $CRITICAL - - High Issues: $HIGH - - Medium Issues: $MEDIUM - - Low Issues: $LOW - - ## Detailed Findings - EOF - - if [ -f "security-results/bandit-report.txt" ]; then - cat >> security-results/security-report.md << EOF - - ### Bandit Security Scan - \`\`\` - $(cat security-results/bandit-report.txt) - \`\`\` - EOF - fi - - if [ -f "security-results/credentials-check.log" ] && [ -s "security-results/credentials-check.log" ]; then - cat >> security-results/security-report.md << EOF - - ### Potential Hardcoded Credentials - \`\`\` - $(cat security-results/credentials-check.log) - \`\`\` - EOF - fi - - if [ -f "security-results/dockerfile-security.log" ]; then - cat >> security-results/security-report.md << EOF - - ### Dockerfile Security Issues - \`\`\` - $(cat security-results/dockerfile-security.log) - \`\`\` - EOF - fi - - echo "::endgroup::" - - - name: Upload Security Results - uses: actions/upload-artifact@v4 - with: - name: security-scan-results - path: security-results/ - retention-days: 30 - - - name: Check Security Gate - run: | - if [ -f "security-results/bandit-results.json" ]; then - CRITICAL=$(jq '[.results[] | select(.issue_severity == "CRITICAL")] | length' security-results/bandit-results.json 2>/dev/null || echo 0) - if [ "$CRITICAL" -gt 0 ]; then - echo "❌ Critical security issues found!" - exit 1 - fi - fi - echo "✅ No critical security issues found" - - version-check: - name: Version Requirements Check - runs-on: ubuntu-latest - timeout-minutes: 15 - - steps: - - name: Checkout PR Code - uses: actions/checkout@v4 - with: - fetch-depth: 0 - ref: ${{ github.event.pull_request.head.sha }} - - - name: Check Driver and Library Versions - id: version-check - run: | - echo "::group::Checking Version Requirements" - - mkdir -p version-results - - # Define minimum versions - MIN_EFA="1.47.0" - MIN_NCCL="2.28" - MIN_CUDA="13.0" - - echo "Minimum required versions:" > version-results/version-report.txt - echo " EFA Installer: $MIN_EFA" >> version-results/version-report.txt - echo " NCCL: $MIN_NCCL" >> version-results/version-report.txt - echo " CUDA: $MIN_CUDA" >> version-results/version-report.txt - echo "" >> version-results/version-report.txt - - VIOLATIONS=0 - - # Check Dockerfile - if [ -f "Dockerfile" ]; then - echo "Checking Dockerfile..." >> version-results/version-report.txt - - # Check for EFA - if grep -i "efa" Dockerfile > /dev/null 2>&1; then - EFA_VERSION=$(grep -i "efa" Dockerfile | grep -oE "[0-9]+\.[0-9]+\.[0-9]+" | head -1) - if [ -n "$EFA_VERSION" ]; then - echo " Found EFA version: $EFA_VERSION" >> version-results/version-report.txt - if [ "$(printf '%s\n' "$MIN_EFA" "$EFA_VERSION" | sort -V | head -n1)" != "$MIN_EFA" ]; then - echo " ❌ EFA version $EFA_VERSION is below minimum $MIN_EFA" >> version-results/version-report.txt - VIOLATIONS=$((VIOLATIONS + 1)) - else - echo " ✅ EFA version $EFA_VERSION meets requirement" >> version-results/version-report.txt - fi - fi - fi - - # Check for NCCL - if grep -i "nccl" Dockerfile > /dev/null 2>&1; then - NCCL_VERSION=$(grep -i "nccl" Dockerfile | grep -oE "[0-9]+\.[0-9]+(\.[0-9]+)?" | head -1) - if [ -n "$NCCL_VERSION" ]; then - echo " Found NCCL version: $NCCL_VERSION" >> version-results/version-report.txt - if [ "$(printf '%s\n' "$MIN_NCCL" "$NCCL_VERSION" | sort -V | head -n1)" != "$MIN_NCCL" ]; then - echo " ❌ NCCL version $NCCL_VERSION is below minimum $MIN_NCCL" >> version-results/version-report.txt - VIOLATIONS=$((VIOLATIONS + 1)) - else - echo " ✅ NCCL version $NCCL_VERSION meets requirement" >> version-results/version-report.txt - fi - fi - fi - - # Check for CUDA - if grep -i "cuda" Dockerfile > /dev/null 2>&1; then - CUDA_VERSION=$(grep -i "cuda" Dockerfile | grep -oE "[0-9]+\.[0-9]+" | head -1) - if [ -n "$CUDA_VERSION" ]; then - echo " Found CUDA version: $CUDA_VERSION" >> version-results/version-report.txt - if [ "$(printf '%s\n' "$MIN_CUDA" "$CUDA_VERSION" | sort -V | head -n1)" != "$MIN_CUDA" ]; then - echo " ❌ CUDA version $CUDA_VERSION is below minimum $MIN_CUDA" >> version-results/version-report.txt - VIOLATIONS=$((VIOLATIONS + 1)) - else - echo " ✅ CUDA version $CUDA_VERSION meets requirement" >> version-results/version-report.txt - fi - fi - fi - fi - - # Check requirements.txt - if [ -f "requirements.txt" ]; then - echo "" >> version-results/version-report.txt - echo "Checking requirements.txt..." >> version-results/version-report.txt - - # Check for relevant packages - if grep -i "nvidia\|cuda\|cupy" requirements.txt > /dev/null 2>&1; then - grep -i "nvidia\|cuda\|cupy" requirements.txt >> version-results/version-report.txt - fi - fi - - # Check sbatch scripts - for script in $(find . -name "*.sbatch" -o -name "*.sh" | xargs grep -l "sbatch\|srun" 2>/dev/null); do - echo "" >> version-results/version-report.txt - echo "Checking sbatch script: $script" >> version-results/version-report.txt - - # Check for module loads - if grep -E "module load|module use" "$script" > /dev/null 2>&1; then - grep -E "module load|module use" "$script" >> version-results/version-report.txt - - # Check for specific module versions - if grep -i "efa" "$script" | grep -oE "[0-9]+\.[0-9]+" > /dev/null 2>&1; then - EFA_MOD=$(grep -i "efa" "$script" | grep -oE "[0-9]+\.[0-9]+" | head -1) - if [ -n "$EFA_MOD" ]; then - if [ "$(printf '%s\n' "$MIN_EFA" "$EFA_MOD" | sort -V | head -n1)" != "$MIN_EFA" ]; then - echo " ❌ EFA module version $EFA_MOD is below minimum $MIN_EFA" >> version-results/version-report.txt - VIOLATIONS=$((VIOLATIONS + 1)) - fi - fi - fi - fi - done - - # Check for environment configuration files - if [ -f "environment.yml" ]; then - echo "" >> version-results/version-report.txt - echo "Checking environment.yml..." >> version-results/version-report.txt - grep -E "cuda|nccl|efa" environment.yml >> version-results/version-report.txt 2>/dev/null || true - fi - - if [ -f "pyproject.toml" ]; then - echo "" >> version-results/version-report.txt - echo "Checking pyproject.toml..." >> version-results/version-report.txt - grep -E "cuda|nccl|efa" pyproject.toml >> version-results/version-report.txt 2>/dev/null || true - fi - - # Create JSON report - cat > version-results/version-check.json << EOF - { - "timestamp": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")", - "pr_number": "${{ github.event.pull_request.number }}", - "requirements": { - "efa_minimum": "$MIN_EFA", - "nccl_minimum": "$MIN_NCCL", - "cuda_minimum": "$MIN_CUDA" - }, - "violations": $VIOLATIONS, - "status": "$([ $VIOLATIONS -eq 0 ] && echo "PASS" || echo "FAIL")" - } - EOF - - echo "" >> version-results/version-report.txt - echo "Total violations: $VIOLATIONS" >> version-results/version-report.txt - - cat version-results/version-report.txt - echo "::endgroup::" - - if [ $VIOLATIONS -gt 0 ]; then - echo "❌ Version requirements not met" - exit 1 - fi - - - name: Upload Version Check Results - uses: actions/upload-artifact@v4 - with: - name: version-check-results - path: version-results/ - retention-days: 30 - - slurm-test: - name: Slurm Cluster Testing - runs-on: ubuntu-latest - timeout-minutes: 130 - needs: [code-review, security-scan, version-check] - if: ${{ always() && needs.code-review.result == 'success' && needs.security-scan.result == 'success' && needs.version-check.result == 'success' }} - - steps: - - name: Checkout PR Code - uses: actions/checkout@v4 - with: - fetch-depth: 0 - ref: ${{ github.event.pull_request.head.sha }} - - - name: Configure AWS Credentials - uses: aws-actions/configure-aws-credentials@v4 - with: - role-to-assume: ${{ env.AWS_ROLE_ARN }} - aws-region: ${{ env.AWS_REGION }} - - - name: Setup SSH Key - run: | - mkdir -p ~/.ssh - echo "${{ secrets.SLURM_SSH_KEY }}" > ~/.ssh/slurm_key - chmod 600 ~/.ssh/slurm_key - ssh-keyscan -H ${{ env.SLURM_HOST }} >> ~/.ssh/known_hosts 2>/dev/null || true - - - name: Prepare Test Environment - run: | - echo "::group::Preparing Test Environment" - - # Create test directory name - TEST_DIR="pr-${{ github.event.pull_request.number }}-$(date +%Y%m%d-%H%M%S)" - echo "TEST_DIR=$TEST_DIR" >> $GITHUB_ENV - echo "RESULTS_FILE=${{ github.event.pull_request.number }}-$(date +%Y%m%d)-results.json" >> $GITHUB_ENV - - # Create local test directory - mkdir -p test-artifacts - - # Copy PR code to test directory - cp -r . test-artifacts/source-code - - echo "Test directory: $TEST_DIR" - echo "Results file: ${{ github.event.pull_request.number }}-$(date +%Y%m%d)-results.json" - - echo "::endgroup::" - - - name: Transfer Code to Slurm Cluster - run: | - echo "::group::Transferring Code to Slurm Cluster" - - # Create remote directory - ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "mkdir -p ${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}" - - # Transfer code - scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no -r test-artifacts/source-code/* ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/ - - echo "Code transferred successfully" - echo "::endgroup::" - - - name: Execute Tests on Slurm - id: slurm-test - timeout-minutes: 120 - run: | - echo "::group::Executing Tests on Slurm Cluster" - - # Create test execution script - cat > test-artifacts/run-tests.sh << 'TESTSCRIPT' - #!/bin/bash - #SBATCH --job-name=pr-test-${{ github.event.pull_request.number }} - #SBATCH --nodes=8 - #SBATCH --ntasks-per-node=1 - #SBATCH --time=02:00:00 - #SBATCH --output=${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/slurm-%j.out - #SBATCH --error=${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/slurm-%j.err - - set -e - - cd ${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }} - - # Initialize results JSON - cat > test-results.json << 'EOF' - { - "pr_number": "${{ github.event.pull_request.number }}", - "test_date": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")", - "slurm_job_id": "$SLURM_JOB_ID", - "nodes_used": "$SLURM_NNODES", - "tests": {}, - "status": "RUNNING" - } - EOF - - # Check for README.md and follow instructions - if [ -f "README.md" ]; then - echo "Found README.md - following test instructions" - fi - - # Run unit tests if available - echo "Running unit tests..." - START_TIME=$(date +%s) - - if [ -f "requirements.txt" ]; then - pip install -r requirements.txt --user || true - fi - - if [ -f "setup.py" ] || [ -f "pyproject.toml" ]; then - pip install -e . --user || true - fi - - UNIT_TEST_OUTPUT="" - UNIT_TEST_STATUS="SKIPPED" - - if [ -f "pytest.ini" ] || [ -d "tests" ] || find . -name "test_*.py" -type f | grep -q .; then - pip install pytest --user || true - if python3 -m pytest --tb=short -v > unit-test-output.log 2>&1; then - UNIT_TEST_STATUS="PASSED" - UNIT_TEST_OUTPUT=$(cat unit-test-output.log) - else - UNIT_TEST_STATUS="FAILED" - UNIT_TEST_OUTPUT=$(cat unit-test-output.log) - fi - elif [ -f "Makefile" ] && grep -q "test" Makefile; then - if make test > unit-test-output.log 2>&1; then - UNIT_TEST_STATUS="PASSED" - UNIT_TEST_OUTPUT=$(cat unit-test-output.log) - else - UNIT_TEST_STATUS="FAILED" - UNIT_TEST_OUTPUT=$(cat unit-test-output.log) - fi - fi - - END_TIME=$(date +%s) - DURATION=$((END_TIME - START_TIME)) - - # Update results - python3 << PYEOF - import json - - with open('test-results.json', 'r') as f: - data = json.load(f) - - data['tests']['unit_tests'] = { - 'status': '$UNIT_TEST_STATUS', - 'duration_seconds': $DURATION, - 'output': """$UNIT_TEST_OUTPUT""" - } - - with open('test-results.json', 'w') as f: - json.dump(data, f, indent=2) - PYEOF - - # Run execution tests - echo "Running execution tests..." - START_TIME=$(date +%s) - - EXEC_TEST_OUTPUT="" - EXEC_TEST_STATUS="SKIPPED" - - # Check for execution test scripts - if [ -f "execute.py" ]; then - if python3 execute.py > exec-test-output.log 2>&1; then - EXEC_TEST_STATUS="PASSED" - EXEC_TEST_OUTPUT=$(cat exec-test-output.log) - else - EXEC_TEST_STATUS="FAILED" - EXEC_TEST_OUTPUT=$(cat exec-test-output.log) - fi - elif [ -f "run.sh" ]; then - chmod +x run.sh - if ./run.sh > exec-test-output.log 2>&1; then - EXEC_TEST_STATUS="PASSED" - EXEC_TEST_OUTPUT=$(cat exec-test-output.log) - else - EXEC_TEST_STATUS="FAILED" - EXEC_TEST_OUTPUT=$(cat exec-test-output.log) - fi - elif [ -f "main.py" ]; then - if python3 main.py > exec-test-output.log 2>&1; then - EXEC_TEST_STATUS="PASSED" - EXEC_TEST_OUTPUT=$(cat exec-test-output.log) - else - EXEC_TEST_STATUS="FAILED" - EXEC_TEST_OUTPUT=$(cat exec-test-output.log) - fi - fi - - END_TIME=$(date +%s) - DURATION=$((END_TIME - START_TIME)) - - # Update results - python3 << PYEOF - import json - - with open('test-results.json', 'r') as f: - data = json.load(f) - - data['tests']['execution_tests'] = { - 'status': '$EXEC_TEST_STATUS', - 'duration_seconds': $DURATION, - 'output': """$EXEC_TEST_OUTPUT""" - } - - # Determine overall status - overall_status = "PASSED" - for test_name, test_data in data['tests'].items(): - if test_data['status'] == 'FAILED': - overall_status = 'FAILED' - break - elif test_data['status'] == 'SKIPPED': - overall_status = 'PARTIAL' - - data['status'] = overall_status - data['completion_time'] = "$(date -u +"%Y-%m-%dT%H:%M:%SZ")" - - with open('test-results.json', 'w') as f: - json.dump(data, f, indent=2) - PYEOF - - # Copy results to final location - cp test-results.json ${{ env.RESULTS_PATH }}/${{ env.RESULTS_FILE }} - - echo "Tests completed. Results saved to: ${{ env.RESULTS_PATH }}/${{ env.RESULTS_FILE }}" - - # Exit with error if tests failed - if [ "$UNIT_TEST_STATUS" == "FAILED" ] || [ "$EXEC_TEST_STATUS" == "FAILED" ]; then - exit 1 - fi - TESTSCRIPT - - chmod +x test-artifacts/run-tests.sh - - # Transfer and submit job - scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no test-artifacts/run-tests.sh ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/ - - # Submit job and capture job ID - JOB_ID=$(ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "cd ${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }} && sbatch run-tests.sh" | grep -oE '[0-9]+') - - echo "Submitted Slurm job: $JOB_ID" - echo "SLURM_JOB_ID=$JOB_ID" >> $GITHUB_ENV - - # Wait for job completion with timeout - echo "Waiting for job completion (timeout: 120 minutes)..." - START_WAIT=$(date +%s) - - while true; do - CURRENT_TIME=$(date +%s) - ELAPSED=$((CURRENT_TIME - START_WAIT)) - - if [ $ELAPSED -gt 7200 ]; then - echo "Timeout reached - cancelling job" - ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "scancel $JOB_ID" - exit 1 - fi - - JOB_STATUS=$(ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "squeue -j $JOB_ID -h -o %T" 2>/dev/null || echo "COMPLETED") - - if [ -z "$JOB_STATUS" ] || [ "$JOB_STATUS" == "COMPLETED" ]; then - echo "Job completed" - break - elif [ "$JOB_STATUS" == "FAILED" ] || [ "$JOB_STATUS" == "CANCELLED" ] || [ "$JOB_STATUS" == "TIMEOUT" ]; then - echo "Job failed with status: $JOB_STATUS" - exit 1 - fi - - echo "Job status: $JOB_STATUS (elapsed: $((ELAPSED / 60)) minutes)" - sleep 60 - done - - echo "::endgroup::" - - - name: Retrieve Test Results - if: always() - run: | - echo "::group::Retrieving Test Results" - - # Create local results directory - mkdir -p test-results - - # Copy results from Slurm cluster - scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/test-results.json test-results/ 2>/dev/null || true - scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/slurm-*.out test-results/ 2>/dev/null || true - scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.RESULTS_PATH }}/${{ env.TEST_DIR }}/slurm-*.err test-results/ 2>/dev/null || true - - # Copy the final results file - scp -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }}:${{ env.RESULTS_PATH }}/${{ env.RESULTS_FILE }} test-results/ 2>/dev/null || true - - # Display results - if [ -f "test-results/test-results.json" ]; then - echo "Test Results:" - cat test-results/test-results.json | python3 -m json.tool || cat test-results/test-results.json - fi - - echo "::endgroup::" - - - name: Upload Test Results - if: always() - uses: actions/upload-artifact@v4 - with: - name: slurm-test-results - path: test-results/ - retention-days: 30 - - - name: Cleanup Slurm Resources - if: always() - run: | - echo "::group::Cleaning up Slurm Resources" - - # Cancel job if still running - if [ -n "${{ env.SLURM_JOB_ID }}" ]; then - ssh -i ~/.ssh/slurm_key -o StrictHostKeyChecking=no ${{ env.SLURM_USER }}@${{ env.SLURM_HOST }} "scancel ${{ env.SLURM_JOB_ID }}" 2>/dev/null || true - fi - - echo "Cleanup completed" - echo "::endgroup::" - - - name: Final Status Check - run: | - if [ -f "test-results/test-results.json" ]; then - STATUS=$(python3 -c "import json; data=json.load(open('test-results/test-results.json')); print(data.get('status', 'UNKNOWN'))") - if [ "$STATUS" == "FAILED" ]; then - echo "❌ Tests failed" - exit 1 - elif [ "$STATUS" == "PASSED" ]; then - echo "✅ All tests passed" - else - echo "⚠️ Tests status: $STATUS" - fi - else - echo "❌ No test results found" - exit 1 - fi - - notify-on-failure: - name: Send Failure Notification - runs-on: ubuntu-latest - needs: [code-review, security-scan, version-check, slurm-test] - if: failure() - - steps: - - name: Configure AWS Credentials - uses: aws-actions/configure-aws-credentials@v4 - with: - role-to-assume: ${{ env.AWS_ROLE_ARN }} - aws-region: ${{ env.AWS_REGION }} - - - name: Send Email Notification - run: | - # Get the email from GitHub secret to avoid exposing it in logs - NOTIFICATION_EMAIL="${{ secrets.NOTIFICATION_EMAIL }}" - - if [ -z "$NOTIFICATION_EMAIL" ]; then - echo "No notification email configured" - exit 0 - fi - - # Determine which job failed - FAILED_JOBS="" - if [ "${{ needs.code-review.result }}" == "failure" ]; then - FAILED_JOBS="$FAILED_JOBS- Code Review -" - fi - if [ "${{ needs.security-scan.result }}" == "failure" ]; then - FAILED_JOBS="$FAILED_JOBS- Security Scan -" - fi - if [ "${{ needs.version-check.result }}" == "failure" ]; then - FAILED_JOBS="$FAILED_JOBS- Version Check -" - fi - if [ "${{ needs.slurm-test.result }}" == "failure" ]; then - FAILED_JOBS="$FAILED_JOBS- Slurm Test -" - fi - - # Send email using AWS SES - aws ses send-email \ - --from "github-actions@aws.dev" \ - --to "$NOTIFICATION_EMAIL" \ - --subject "PR #${{ github.event.pull_request.number }} - Workflow Failed" \ - --text "Pull Request #${{ github.event.pull_request.number }} has failed workflow checks. - -Repository: ${{ github.repository }} -PR Title: ${{ github.event.pull_request.title }} -Author: ${{ github.event.pull_request.user.login }} -Branch: ${{ github.event.pull_request.head.ref }} - -Failed Jobs: -$FAILED_JOBS - -View details: ${{ github.event.pull_request.html_url }} - -Workflow Run: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" - - - name: Fallback Notification - if: failure() - run: | - echo "::warning::Workflow failed for PR #${{ github.event.pull_request.number }}" - echo "Failed jobs may require manual review" diff --git a/.gitignore b/.gitignore index 51347b993..1295f0e1e 100644 --- a/.gitignore +++ b/.gitignore @@ -168,3 +168,4 @@ micro-benchmarks/nccl-tests/slurm/find_bad_nodes/logs/analysis_summary_*.txt micro-benchmarks/nccl-tests/slurm/find_bad_nodes/logs/node_combinations_*.txt .testenv micro-benchmarks/nccl-tests/slurm/topology-aware-nccl-tests/debug_topology.json +log.failed diff --git a/3.test_cases/pytorch/FSDP/slurm/create_venv.sh b/3.test_cases/pytorch/FSDP/slurm/create_venv.sh index c300a0629..1fc3de720 100755 --- a/3.test_cases/pytorch/FSDP/slurm/create_venv.sh +++ b/3.test_cases/pytorch/FSDP/slurm/create_venv.sh @@ -16,7 +16,12 @@ else PYTHON_VERSION=$(python3 --version | awk '{print $2}' | awk -F'.' '{print $1"."$2}') fi -sudo apt install -y python$PYTHON_VERSION-venv +# Install venv package if sudo is available, otherwise assume pre-installed +if command -v sudo &>/dev/null && sudo -n true 2>/dev/null; then + sudo apt install -y python${PYTHON_VERSION}-venv +else + echo "No sudo access — assuming python${PYTHON_VERSION}-venv is pre-installed" +fi # Create and actiate Python virtual environment $PYTHON_V -m venv env diff --git a/3.test_cases/pytorch/FSDP/slurm/llama2_13b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama2_13b-training.sbatch index 23de071cf..d1c7daf59 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama2_13b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama2_13b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/llama2_70b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama2_70b-training.sbatch index e1c8c1aa1..41ded5660 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama2_70b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama2_70b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/llama2_7b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama2_7b-training.sbatch index 81870db75..f604d9945 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama2_7b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama2_7b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,22 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +# Block all HuggingFace Hub API calls during training to prevent 429 rate +# limits from 128+ concurrent DataLoader workers. Tokenizers and dataset +# metadata must be pre-cached before running with this flag set. +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/llama3_1_70b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama3_1_70b-training.sbatch index 031988dc2..3ee7c2731 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama3_1_70b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama3_1_70b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/llama3_1_8b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama3_1_8b-training.sbatch index 650388d43..0dab8c18d 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama3_1_8b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama3_1_8b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/llama3_2_1b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama3_2_1b-training.sbatch index f4c93ffb7..56bfa8b78 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama3_2_1b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama3_2_1b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/llama3_2_3b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/llama3_2_3b-training.sbatch index 9c9ab745a..2037944be 100644 --- a/3.test_cases/pytorch/FSDP/slurm/llama3_2_3b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/llama3_2_3b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/mathstral_7b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/mathstral_7b-training.sbatch index 7934abb86..00ebfd579 100644 --- a/3.test_cases/pytorch/FSDP/slurm/mathstral_7b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/mathstral_7b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/slurm/mistral_8x7b-training.sbatch b/3.test_cases/pytorch/FSDP/slurm/mistral_8x7b-training.sbatch index cee9bb8f3..2fca5d511 100644 --- a/3.test_cases/pytorch/FSDP/slurm/mistral_8x7b-training.sbatch +++ b/3.test_cases/pytorch/FSDP/slurm/mistral_8x7b-training.sbatch @@ -32,7 +32,7 @@ export FSX_MOUNT=$(pwd):$DATA_PATH ## Plenty of EFA level variables ## For G4dn and other G5, comment out all #export FI_LOG_LEVEL=warn -export NCCL_DEBUG=INFO +export NCCL_DEBUG=WARN export FI_PROVIDER=efa export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSError: Cannot allocate memory. Disabling huge page causes minor performance hit. ## Switching SYNC_MEMOPS to zero can boost throughput with FSDP @@ -40,14 +40,19 @@ export FI_EFA_USE_HUGE_PAGE=0 # Set to 0 when you see os.fork() causes OSErro ## Reduces memory synchronizations ## https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__UNIFIED.html export FI_EFA_SET_CUDA_SYNC_MEMOPS=0 -# LD_PRELOAD is required for PyTorch to find the NCCL library -# This path assumes you are using the Deep Learning AMI -# If you are not using the DLAMI, you may need to update this path -export LD_PRELOAD=/usr/local/cuda-12.8/lib/libnccl.so +# LD_PRELOAD is required for PyTorch to find the system NCCL library +# which is built with EFA/OFI support. Without this, PyTorch may use +# its bundled NCCL which lacks EFA support. +export LD_PRELOAD=/lib/x86_64-linux-gnu/libnccl.so export NCCL_SOCKET_IFNAME=^docker,lo,veth,eth ## Set HuggingFace metadata timeout (in seconds) for large clusters export HF_HUB_ETAG_TIMEOUT=60 +## Use a shared cache on FSx so dataset metadata and tokenizer downloads +## are reused across runs and nodes, avoiding HuggingFace API rate limits. +export HF_HOME=/fsx/.cache/huggingface +export HF_DATASETS_CACHE=/fsx/.cache/huggingface/datasets +export HF_HUB_OFFLINE=1 ######################## diff --git a/3.test_cases/pytorch/FSDP/src/model_utils/train_utils.py b/3.test_cases/pytorch/FSDP/src/model_utils/train_utils.py index bb3a7ccae..0d09d969c 100644 --- a/3.test_cases/pytorch/FSDP/src/model_utils/train_utils.py +++ b/3.test_cases/pytorch/FSDP/src/model_utils/train_utils.py @@ -3,6 +3,7 @@ import os import math +import json import functools import numpy as np import torch @@ -22,6 +23,7 @@ g_gigabyte = 1024**3 + def setup(): # initialize the process group dist.init_process_group("nccl") @@ -30,6 +32,7 @@ def setup(): def cleanup(): dist.destroy_process_group() + def get_date_of_run(): """create date and time for file save uniqueness example: 2022-05-07-08:31:12_PM' @@ -39,21 +42,21 @@ def get_date_of_run(): return date_of_run - def format_metrics_to_gb(item): """quick function to format numbers to gigabyte and round to 4 digit precision""" metric_num = item / g_gigabyte metric_num = round(metric_num, ndigits=4) return metric_num + def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None): model.train() - local_rank = int(os.environ['LOCAL_RANK']) + local_rank = int(os.environ["LOCAL_RANK"]) fsdp_loss = torch.zeros(2).to(local_rank) - + if sampler: sampler.set_epoch(epoch) - if rank==0: + if rank == 0: inner_pbar = tqdm.tqdm( range(len(train_loader)), colour="blue", desc="r0 Training Epoch" ) @@ -61,31 +64,32 @@ def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler for key in batch.keys(): batch[key] = batch[key].to(local_rank) optimizer.zero_grad() - output = model(input_ids=batch["source_ids"],attention_mask=batch["source_mask"],labels=batch["target_ids"] ) + output = model( + input_ids=batch["source_ids"], + attention_mask=batch["source_mask"], + labels=batch["target_ids"], + ) loss = output["loss"] loss.backward() optimizer.step() fsdp_loss[0] += loss.item() fsdp_loss[1] += len(batch) - if rank==0: + if rank == 0: inner_pbar.update(1) dist.all_reduce(fsdp_loss, op=dist.ReduceOp.SUM) train_accuracy = fsdp_loss[0] / fsdp_loss[1] - if rank == 0: inner_pbar.close() - print( - f"Train Epoch: \t{epoch}, Loss: \t{train_accuracy:.4f}" - ) + print(f"Train Epoch: \t{epoch}, Loss: \t{train_accuracy:.4f}") return train_accuracy def validation(model, rank, world_size, val_loader): model.eval() correct = 0 - local_rank = int(os.environ['LOCAL_RANK']) + local_rank = int(os.environ["LOCAL_RANK"]) fsdp_loss = torch.zeros(2).to(local_rank) if rank == 0: inner_pbar = tqdm.tqdm( @@ -95,11 +99,15 @@ def validation(model, rank, world_size, val_loader): for batch in val_loader: for key in batch.keys(): batch[key] = batch[key].to(local_rank) - output = model(input_ids=batch["source_ids"],attention_mask=batch["source_mask"],labels=batch["target_ids"]) + output = model( + input_ids=batch["source_ids"], + attention_mask=batch["source_mask"], + labels=batch["target_ids"], + ) fsdp_loss[0] += output["loss"].item() # sum up batch loss fsdp_loss[1] += len(batch) - if rank==0: + if rank == 0: inner_pbar.update(1) dist.all_reduce(fsdp_loss, op=dist.ReduceOp.SUM) @@ -109,6 +117,7 @@ def validation(model, rank, world_size, val_loader): print(f"Validation Loss: {val_loss:.4f}") return val_loss + def get_model_config(args): if "gpt_neox" in args.model_type: from transformers import GPTNeoXConfig @@ -149,7 +158,7 @@ def get_model_config(args): ) elif "llama_v3" in args.model_type: from transformers import LlamaConfig - + model_config = LlamaConfig( vocab_size=args.vocab_size, hidden_size=args.hidden_width, @@ -164,12 +173,12 @@ def get_model_config(args): use_cache=False, pretraining_tp=1, tie_word_embeddings=False, - rope_scaling= {"type": "dynamic", "factor": 2.0}, + rope_scaling={"type": "dynamic", "factor": 2.0}, ) - - + elif "mixtral" in args.model_type: from transformers import MixtralConfig + model_config = MixtralConfig( vocab_size=args.vocab_size, hidden_size=args.hidden_width, @@ -188,24 +197,26 @@ def get_model_config(args): ) elif "mistral" in args.model_type: from transformers import MistralConfig + model_config = MistralConfig( - vocab_size=args.vocab_size, - hidden_size=args.hidden_width, - intermediate_size=args.intermediate_size, - num_hidden_layers=args.num_layers, - num_attention_heads=args.num_heads, - num_key_value_heads=args.num_key_value_heads, - hidden_act="silu", - max_position_embeddings=args.max_context_width, - initializer_range=args.initializer_range, - rms_norm_eps=1e-5, - use_cache=False, - tie_word_embeddings=False + vocab_size=args.vocab_size, + hidden_size=args.hidden_width, + intermediate_size=args.intermediate_size, + num_hidden_layers=args.num_layers, + num_attention_heads=args.num_heads, + num_key_value_heads=args.num_key_value_heads, + hidden_act="silu", + max_position_embeddings=args.max_context_width, + initializer_range=args.initializer_range, + rms_norm_eps=1e-5, + use_cache=False, + tie_word_embeddings=False, ) else: raise NotImplementedError(f"Model {args.model_type} not implemented") return model_config + def compute_num_params(model): """Get num params.""" num_params = 0 @@ -220,18 +231,23 @@ def compute_num_params(model): return num_params + _logger = None + + def get_logger(): global _logger if _logger is None: - logging.getLogger("torch.distributed.checkpoint._dedup_tensors").setLevel(logging.ERROR) + logging.getLogger("torch.distributed.checkpoint._dedup_tensors").setLevel( + logging.ERROR + ) logging.getLogger("torch.distributed.distributed_c10d").setLevel(logging.ERROR) _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) _logger.handlers = [] ch = logging.StreamHandler() formatter = logging.Formatter( - "%(asctime)s %(levelname).1s " "[%(filename)s:%(lineno)d] %(message)s", + "%(asctime)s %(levelname).1s [%(filename)s:%(lineno)d] %(message)s", "%Y-%m-%d %H:%M:%S", ) ch.setFormatter(formatter) @@ -239,6 +255,7 @@ def get_logger(): _logger.propagate = False return _logger + def get_transformer_layer(model_type="gpt2"): """Get transformer layer.""" if model_type == "gpt2": @@ -265,7 +282,7 @@ def get_transformer_layer(model_type="gpt2"): from transformers.models.llama.modeling_llama import LlamaDecoderLayer transformer_layer = LlamaDecoderLayer - + elif model_type == "llama_v3": from transformers.models.llama.modeling_llama import LlamaDecoderLayer @@ -286,6 +303,7 @@ def get_transformer_layer(model_type="gpt2"): return transformer_layer + def get_sharding_strategy(strategy: str): """Get sharding strategy.""" sharding_strategy = getattr(ShardingStrategy, strategy.upper()) @@ -299,6 +317,7 @@ def get_backward_fetch_policy(policy: str): _logger.debug("Translating %s to %s.", policy, backward_fetch_policy) return backward_fetch_policy + def apply_activation_checkpoint(args, model=None): from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import ( CheckpointImpl, @@ -307,9 +326,7 @@ def apply_activation_checkpoint(args, model=None): ) transformer_layer = get_transformer_layer(args.model_type) - check_fn_gpt = lambda submodule: isinstance( - submodule, transformer_layer - ) + check_fn_gpt = lambda submodule: isinstance(submodule, transformer_layer) entrant_wrapper = functools.partial( checkpoint_wrapper, checkpoint_impl=CheckpointImpl.NO_REENTRANT ) @@ -317,6 +334,7 @@ def apply_activation_checkpoint(args, model=None): model, checkpoint_wrapper_fn=entrant_wrapper, check_fn=check_fn_gpt ) + def get_param_groups_by_weight_decay(module): """Get param groups.""" weight_decay_params = {"params": []} @@ -328,27 +346,22 @@ def get_param_groups_by_weight_decay(module): for module_ in module.modules(): # if isinstance(module_, FusedLayerNorm) or if isinstance(module_, LayerNorm): - for p in list( - module_._parameters.values() - ): # pylint: disable=invalid-name,protected-access + for p in list(module_._parameters.values()): # pylint: disable=invalid-name,protected-access if p is not None and id(p) not in param_ids: no_weight_decay_params["params"].append(p) param_ids.add(id(p)) else: - for n, p in list( - module_._parameters.items() - ): # pylint: disable=invalid-name,protected-access + for n, p in list(module_._parameters.items()): # pylint: disable=invalid-name,protected-access if p is not None and n != "bias" and id(p) not in param_ids: weight_decay_params["params"].append(p) param_ids.add(id(p)) - for n, p in list( - module_._parameters.items() - ): # pylint: disable=invalid-name,protected-access + for n, p in list(module_._parameters.items()): # pylint: disable=invalid-name,protected-access if p is not None and n == "bias" and id(p) not in param_ids: no_weight_decay_params["params"].append(p) param_ids.add(id(p)) return weight_decay_params, no_weight_decay_params + class AnnealingLR: # pylint: disable=too-many-instance-attributes """Anneals the learning rate.""" @@ -365,7 +378,6 @@ def __init__( # pylint: disable=too-many-arguments use_checkpoint_lr_scheduler=True, override_lr_scheduler=False, ): - # Class values. self.optimizer = optimizer self.start_lr = start_lr @@ -380,7 +392,7 @@ def __init__( # pylint: disable=too-many-arguments self.use_checkpoint_lr_scheduler = use_checkpoint_lr_scheduler if self.override_lr_scheduler: assert not self.use_checkpoint_lr_scheduler, ( - "both override and " "use-checkpoint are set." + "both override and use-checkpoint are set." ) # Set the learning rate self.step(self.num_iters) @@ -408,7 +420,11 @@ def get_lr(self): / (self.end_iter - self.plateau_iter) ) elif self.decay_style == "cosine": - lr = self.start_lr / 2.0 * (math.cos(math.pi * num_iters_ / self.end_iter) + 1) + lr = ( + self.start_lr + / 2.0 + * (math.cos(math.pi * num_iters_ / self.end_iter) + 1) + ) elif self.decay_style == "exponential": # exp(-0.693) = 1/2 lr = self.start_lr * math.exp(-0.693 * num_iters_ / self.end_iter) @@ -446,28 +462,35 @@ def _check_and_set(self, cls_value, sd_value, name): return cls_value if not self.use_checkpoint_lr_scheduler: - assert ( - cls_value == sd_value - ), f"AnnealingLR: class input value and checkpoint values for {name} do not match" + assert cls_value == sd_value, ( + f"AnnealingLR: class input value and checkpoint values for {name} do not match" + ) if self.rank == 0: _logger.info(f" > using checkpoint value {sd_value} for {name}") return sd_value def load_state_dict(self, sd): """Load state dict.""" - self.start_lr = self._check_and_set(self.start_lr, sd["start_lr"], "learning rate") - self.min_lr = self._check_and_set(self.min_lr, sd["min_lr"], "minimum learning rate") + self.start_lr = self._check_and_set( + self.start_lr, sd["start_lr"], "learning rate" + ) + self.min_lr = self._check_and_set( + self.min_lr, sd["min_lr"], "minimum learning rate" + ) self.warmup_iter = self._check_and_set( self.warmup_iter, sd["warmup_iter"], "warmup iterations" ) self.end_iter = self._check_and_set( self.end_iter, sd["end_iter"], "total number of iterations" ) - self.decay_style = self._check_and_set(self.decay_style, sd["decay_style"], "decay style") + self.decay_style = self._check_and_set( + self.decay_style, sd["decay_style"], "decay style" + ) self.num_iters = sd["num_iters"] self.step(self.num_iters) + def get_learning_rate_scheduler(optimizer, args): """Get learning rate scheduler.""" use_checkpoint_lr_scheduler = args.resume_from_checkpoint is not None @@ -496,22 +519,59 @@ def get_learning_rate_scheduler(optimizer, args): return lr_scheduler -def create_streaming_dataloader(dataset, - tokenizer, - name=None, - global_rank=0, - batch_size=1, - max_context_width=4096, - workers=4, - split=None): + +def create_streaming_dataloader( + dataset, + tokenizer, + name=None, + global_rank=0, + batch_size=1, + max_context_width=4096, + workers=4, + split=None, +): print(f"dataset={dataset}, name={name}") - tokenizer = AutoTokenizer.from_pretrained(tokenizer,legacy=False) - data = load_dataset(dataset, name=name, streaming=True, split=split).shuffle(42+global_rank) + tokenizer = AutoTokenizer.from_pretrained(tokenizer, legacy=False) + + # Use pre-resolved data files manifest if available to avoid every + # process hitting the HuggingFace API for paginated file listings + # and repo_info() validation, which causes HTTP 429 rate-limit errors + # on large clusters with many concurrent processes. + data_files = None + use_manifest = False + manifest_path = os.environ.get("HF_DATA_FILES_MANIFEST") + if manifest_path and os.path.isfile(manifest_path): + try: + with open(manifest_path) as f: + manifest = json.load(f) + if split and split in manifest: + data_files = {split: manifest[split]} + use_manifest = True + print( + f"Using pre-resolved data files for split={split} " + f"({len(manifest[split])} files, first={manifest[split][0][:80]})" + ) + except (json.JSONDecodeError, KeyError) as e: + print(f"Warning: could not read manifest {manifest_path}: {e}") + + if use_manifest: + # Load as generic JSON dataset using the manifest file paths. + # Local file paths use LocalFileSystem (no HF API calls at all). + # This avoids HuggingFace 429 rate limits from 128+ DataLoader workers. + data = load_dataset( + "json", streaming=True, split=split, data_files=data_files + ).shuffle(42 + global_rank) + else: + data = load_dataset( + dataset, name=name, streaming=True, split=split, data_files=data_files + ).shuffle(42 + global_rank) train_concat_dataset = ConcatTokensDataset(data, tokenizer, max_context_width, True) - train_dataloader = DataLoader(train_concat_dataset, - batch_size=batch_size, - num_workers=workers, - pin_memory=True, - prefetch_factor=4, - timeout=600) + train_dataloader = DataLoader( + train_concat_dataset, + batch_size=batch_size, + num_workers=workers, + pin_memory=True, + prefetch_factor=4, + timeout=600, + ) return train_dataloader