#!/usr/bin/env bash
#===============================================================================
# Author:   Smith Wang
# Version:  1.0.0
# License:  MIT
# Filename: doris_csv_stream_load.sh
#
# Description:
#   1) Use wget to download one or multiple CSV files (resume + size verification)
#   2) Import into Apache Doris via Stream Load API using curl
#
# Module dependencies:
#   - bash (>= 5.0)
#   - wget
#   - curl
#   - awk, sed, grep, stat, date, mktemp
#
# ShellCheck:
#   shellcheck -x doris_csv_stream_load.sh
#===============================================================================

set -euo pipefail
IFS=$'\n\t'

#===============================================================================
# Global Constants
#===============================================================================

# Declare and assign separately so a failing command substitution is not
# masked by the 'readonly' builtin's own exit status (ShellCheck SC2155).
SCRIPT_NAME="$(basename "$0")"
readonly SCRIPT_NAME
readonly SCRIPT_VERSION="1.0.0"

readonly DEFAULT_WORKDIR="./doris_csv_downloads"
readonly DEFAULT_NATIONAL_DIR_URL="https://oss.demo.uavcmlc.com/cmlc-installation/doris/all"
readonly DEFAULT_NATIONAL_COUNT="6" # suffix 0..5
readonly DEFAULT_REGION_URL="https://oss.demo.uavcmlc.com/cmlc-installation/doris/all/xiongan.csv"

# Doris defaults (override by args)
readonly DEFAULT_DORIS_USER="root"
readonly DEFAULT_DORIS_PASS=""        # empty by default (root:)
readonly DEFAULT_DORIS_BE_PORT="8040" # Stream Load port

# wget/curl behavior
readonly WGET_RETRIES="10"
readonly WGET_TIMEOUT_SEC="30"
readonly CURL_TIMEOUT_SEC="600" # per file; adjust if needed

readonly LOCKFILE="/tmp/${SCRIPT_NAME}.lock"

#===============================================================================
# Runtime Config (set by args)
#===============================================================================
ACTION="all" # download|load|all
WORKDIR="${DEFAULT_WORKDIR}"
NATIONAL_DIR_URL="${DEFAULT_NATIONAL_DIR_URL}"
NATIONAL_PREFIX="" # REQUIRED for national mode
NATIONAL_COUNT="${DEFAULT_NATIONAL_COUNT}"
REGION_URL="${DEFAULT_REGION_URL}"
DORIS_BE_IP="" # REQUIRED
DORIS_BE_PORT="${DEFAULT_DORIS_BE_PORT}"
DORIS_USER="${DEFAULT_DORIS_USER}"
DORIS_PASS="${DEFAULT_DORIS_PASS}"
DORIS_DB="cmii"
DORIS_TABLE="dwd_reg_grid_city_detail_dd"
COLUMN_SEPARATOR=","

# Derived (set by prepare_workdir)
DOWNLOAD_LIST_FILE=""
STREAMLOAD_LOG_DIR=""

#===============================================================================
# ASCII Call Graph
#===============================================================================
# main
#  ├─ acquire_lock
#  ├─ parse_args
#  ├─ validate_env
#  ├─ prepare_workdir
#  ├─ build_download_list
#  ├─ run_downloads
#  │   ├─ get_remote_size_bytes
#  │   ├─ download_one
#  │   └─ verify_file_size
#  ├─ run_stream_load
#  │   ├─ stream_load_one
#  │   └─ parse_stream_load_response
#  └─ release_lock (trap)
#===============================================================================

#===============================================================================
# Logging
#===============================================================================
LOG_LEVEL="INFO" # DEBUG|INFO|WARN|ERROR

### Print log line with level (all diagnostics go to stderr)
# @param level string Log level
# @param msg string Message
# @return 0 Success
log() {
  local level="$1"
  local msg="$2"
  local ts
  ts="$(date '+%Y-%m-%d %H:%M:%S')"
  >&2 printf '%s [%s] %s: %s\n' "$ts" "$level" "$SCRIPT_NAME" "$msg"
}

### Debug log (printed only when LOG_LEVEL=DEBUG)
# @param msg string Message
# @return 0 Success
log_debug() { [[ "$LOG_LEVEL" == "DEBUG" ]] && log "DEBUG" "$1" || true; }

### Info log
# @param msg string Message
# @return 0 Success
log_info() { log "INFO" "$1"; }

### Warn log
# @param msg string Message
# @return 0 Success
log_warn() { log "WARN" "$1"; }

### Error log
# @param msg string Message
# @return 0 Success
log_error() { log "ERROR" "$1"; }

#===============================================================================
# Error / Cleanup
#===============================================================================
TMPDIR=""
CLEANUP_FILES=()

### Cleanup handler — removes temp dir, releases lock, preserves exit code
# @return never (exits with the original exit code)
cleanup() {
  local exit_code=$?
  # > cleanup temp resources
  if [[ -n "${TMPDIR}" && -d "${TMPDIR}" ]]; then
    rm -rf "${TMPDIR}" || true
  fi
  # > release lock
  release_lock || true
  if [[ $exit_code -ne 0 ]]; then
    log_error "Exiting with code ${exit_code}"
  fi
  exit "$exit_code"
}
# BUGFIX: register cleanup on EXIT only. The previous
# 'trap cleanup EXIT INT TERM' ran cleanup twice on signals (the 'exit'
# inside the signal invocation re-fires the EXIT trap). Map INT/TERM to
# the conventional 128+signal exit codes and let the EXIT trap clean up.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

### Fail with message and terminate the script
# @param msg string Error message
# @return never (exits 1)
die() {
  log_error "$1"
  # BUGFIX: was 'return 1', which only failed the enclosing statement and
  # relied on 'set -e' propagation to stop the script; exit explicitly.
  exit 1
}

#===============================================================================
# Lock
#===============================================================================

### Acquire a simple lock to avoid concurrent runs
# Uses atomic 'mkdir' for POSIX-ish behavior (no flock dependency).
# @return 0 Success (dies if lock is already held)
acquire_lock() {
  if mkdir "${LOCKFILE}.d" 2>/dev/null; then
    log_debug "Lock acquired: ${LOCKFILE}.d"
  else
    die "Another instance is running (lock exists: ${LOCKFILE}.d). Remove it if you're sure."
  fi
}

### Release lock (no-op if not held)
# @return 0 Success
release_lock() {
  if [[ -d "${LOCKFILE}.d" ]]; then
    rmdir "${LOCKFILE}.d" 2>/dev/null || true
    log_debug "Lock released: ${LOCKFILE}.d"
  fi
}

#===============================================================================
# Usage
#===============================================================================

### Print usage
# @return 0 Success
usage() {
  cat <<'EOF'
Usage: doris_csv_stream_load.sh [download|load|all] [options]

Actions:
  download   Only download CSVs (wget)
  load       Only stream-load existing CSVs in workdir
  all        Download then load (default)

Options:
  --workdir            Download directory (default: ./doris_csv_downloads)
  --log-level          DEBUG|INFO|WARN|ERROR (default: INFO)

  # National files (suffix 0..5 by default)
  --national-dir-url   Base directory URL for national files
                       default: https://oss.demo.uavcmlc.com/cmlc-installation/doris/all
  --national-prefix    REQUIRED for national mode
                       e.g. result_2aee9754dd304ca1-a0651901906f9bb4
  --national-count     How many files, suffix 0..n-1 (default: 6)

  # Optional single region file
  --region-url         default: https://oss.demo.uavcmlc.com/cmlc-installation/doris/all/xiongan.csv
  --no-region          Skip region file

  # Doris stream load config
  --doris-be-ip        REQUIRED
  --doris-be-port      default: 8040
  --doris-user         default: root
  --doris-pass         default: empty
  --db                 default: cmii
  --table              default: dwd_reg_grid_city_detail_dd
  --column-separator   default: ,

Examples:
  # 1) All national(0..5) + region file, download then load:
  ./doris_csv_stream_load.sh all \
    --national-prefix result_2aee9754dd304ca1-a0651901906f9bb4 \
    --doris-be-ip 10.10.10.10

  # 2) Download only:
  ./doris_csv_stream_load.sh download \
    --national-prefix result_xxx \
    --doris-be-ip 10.10.10.10

  # 3) Load only (assumes files already in workdir):
  ./doris_csv_stream_load.sh load \
    --national-prefix result_xxx \
    --doris-be-ip 10.10.10.10
EOF
}

#===============================================================================
# Args Parsing
#===============================================================================
SKIP_REGION="false"

### Parse CLI arguments
# Optional leading positional action (download|load|all), then long options.
# Uses ${2?msg} (no colon) so a missing value fails with a clear message
# under 'set -u', while an explicit empty value (e.g. --doris-pass "") is
# still accepted.
# @param args string[] Command line args
# @return 0 Success
parse_args() {
  if [[ $# -ge 1 ]]; then
    case "$1" in
      download|load|all) ACTION="$1"; shift ;;
      -h|--help) usage; exit 0 ;;
      *) : ;; # fall through to option parsing
    esac
  fi

  while [[ $# -gt 0 ]]; do
    case "$1" in
      --workdir)          WORKDIR="${2?--workdir requires a value}"; shift 2 ;;
      --log-level)        LOG_LEVEL="${2?--log-level requires a value}"; shift 2 ;;
      --national-dir-url) NATIONAL_DIR_URL="${2?--national-dir-url requires a value}"; shift 2 ;;
      --national-prefix)  NATIONAL_PREFIX="${2?--national-prefix requires a value}"; shift 2 ;;
      --national-count)   NATIONAL_COUNT="${2?--national-count requires a value}"; shift 2 ;;
      --region-url)       REGION_URL="${2?--region-url requires a value}"; shift 2 ;;
      --no-region)        SKIP_REGION="true"; shift 1 ;;
      --doris-be-ip)      DORIS_BE_IP="${2?--doris-be-ip requires a value}"; shift 2 ;;
      --doris-be-port)    DORIS_BE_PORT="${2?--doris-be-port requires a value}"; shift 2 ;;
      --doris-user)       DORIS_USER="${2?--doris-user requires a value}"; shift 2 ;;
      --doris-pass)       DORIS_PASS="${2?--doris-pass requires a value}"; shift 2 ;;
      --db)               DORIS_DB="${2?--db requires a value}"; shift 2 ;;
      --table)            DORIS_TABLE="${2?--table requires a value}"; shift 2 ;;
      --column-separator) COLUMN_SEPARATOR="${2?--column-separator requires a value}"; shift 2 ;;
      -h|--help)          usage; exit 0 ;;
      *)                  die "Unknown argument: $1" ;;
    esac
  done
}

#===============================================================================
# Validation / Environment
#===============================================================================

### Validate required tools and config
# @require wget
# @require curl
# @require awk sed grep stat
# @return 0 Success (dies on missing tool or config)
validate_env() {
  command -v wget >/dev/null 2>&1 || die "wget not found"
  command -v curl >/dev/null 2>&1 || die "curl not found"
  command -v awk >/dev/null 2>&1 || die "awk not found"
  command -v sed >/dev/null 2>&1 || die "sed not found"
  command -v grep >/dev/null 2>&1 || die "grep not found"
  command -v stat >/dev/null 2>&1 || die "stat not found"

  [[ -n "${DORIS_BE_IP}" ]] || die "--doris-be-ip is required"
  [[ -n "${NATIONAL_PREFIX}" ]] || die "--national-prefix is required (filename changes, must be provided)"
  [[ "${NATIONAL_COUNT}" =~ ^[0-9]+$ ]] || die "--national-count must be an integer"
}

### Prepare working directory, temp dir and stream-load log directory
# NOTE(review): TMPDIR shadows the conventional environment variable of the
# same name; it is not exported here, so children are unaffected unless the
# caller's environment already exported it.
# @return 0 Success
prepare_workdir() {
  mkdir -p "${WORKDIR}"
  TMPDIR="$(mktemp -d)"
  DOWNLOAD_LIST_FILE="${TMPDIR}/download_list.txt"

  # > stream load logs dir
  STREAMLOAD_LOG_DIR="${WORKDIR}/_streamload_logs"
  mkdir -p "${STREAMLOAD_LOG_DIR}"

  log_debug "Workdir: ${WORKDIR}"
  log_debug "StreamLoad log dir: ${STREAMLOAD_LOG_DIR}"
}

### Generate a local request id for tracing
# Format: <timestamp>__<pid>__<sanitized-basename>
# @param csv_path string Local CSV file path
# @return 0 Success (prints request id)
gen_request_id() {
  local csv_path="$1"
  local ts
  ts="$(date '+%Y%m%d_%H%M%S')"
  # > sanitize filename for filesystem use
  local base
  base="$(basename "${csv_path}" | sed 's/[^a-zA-Z0-9._-]/_/g')"
  printf '%s__%s__%s' "${ts}" "$$" "${base}"
}

### Extract a JSON string field value without jq (best-effort)
# Naive parse of "Field":"value"; does not handle escaped quotes.
# @param json_file string JSON file path
# @param field_name string Field name, e.g. Status
# @return 0 Success (prints value or empty)
json_get_string() {
  local json_file="$1"
  local field_name="$2"
  grep -Eo "\"${field_name}\"[[:space:]]*:[[:space:]]*\"[^\"]*\"" "${json_file}" \
    | head -n1 \
    | sed -E "s/.*\"${field_name}\"[[:space:]]*:[[:space:]]*\"([^\"]*)\".*/\1/" \
    || true
}

### Extract a JSON numeric field value without jq (best-effort)
# Naive parse of "Field":12345.
# @param json_file string JSON file path
# @param field_name string Field name, e.g. TxnId
# @return 0 Success (prints value or empty)
json_get_number() {
  local json_file="$1"
  local field_name="$2"
  grep -Eo "\"${field_name}\"[[:space:]]*:[[:space:]]*[0-9]+" "${json_file}" \
    | head -n1 \
    | sed -E "s/.*\"${field_name}\"[[:space:]]*:[[:space:]]*([0-9]+).*/\1/" \
    || true
}

#===============================================================================
# Download List Builder
#===============================================================================
# NOTE: a first draft of build_download_list with a typo ('NATIONA L_COUNT')
# was removed here; the working definition follows below.
### Build download URL list into DOWNLOAD_LIST_FILE, one URL per line
# National files prefix_0..prefix_(count-1), plus the optional region file
# unless --no-region was given.
# @return 0 Success
build_download_list() {
  : > "${DOWNLOAD_LIST_FILE}"

  # > National files: prefix_0..prefix_(count-1)
  local i
  for ((i = 0; i < NATIONAL_COUNT; i++)); do
    printf '%s/%s_%s.csv\n' "${NATIONAL_DIR_URL}" "${NATIONAL_PREFIX}" "${i}" \
      >> "${DOWNLOAD_LIST_FILE}"
  done

  # > Optional region file
  if [[ "${SKIP_REGION}" != "true" ]]; then
    printf '%s\n' "${REGION_URL}" >> "${DOWNLOAD_LIST_FILE}"
  fi

  log_info "Download list prepared: $(wc -l < "${DOWNLOAD_LIST_FILE}") file(s)"
}

#===============================================================================
# Download / Verify
#===============================================================================

### Get remote content length (bytes) via wget --spider
# Some servers omit Content-Length; in that case prints nothing.
# @param url string Remote URL
# @return 0 Success (prints size or empty if unknown)
get_remote_size_bytes() {
  local url="$1"
  local size=""

  # > wget spider fetches headers only; tolerate failures (|| true)
  local headers
  headers="$(wget --spider --server-response \
    --timeout="${WGET_TIMEOUT_SEC}" --tries=2 "${url}" 2>&1 || true)"

  # BUGFIX: wget indents '--server-response' header lines (e.g.
  # "  Content-Length: N"), so the old exact match against
  # " content-length" (one leading space) never matched. Strip surrounding
  # whitespace from the header name before comparing.
  size="$(printf '%s\n' "${headers}" \
    | awk -F': ' '{h = $1; gsub(/^[[:space:]]+|[[:space:]]+$/, "", h)}
                  tolower(h) == "content-length" {gsub("\r", "", $2); print $2}' \
    | tail -n 1)"

  if [[ -n "${size}" && "${size}" =~ ^[0-9]+$ ]]; then
    printf '%s' "${size}"
  else
    printf '%s' ""
  fi
}

### Get local file size in bytes
# @param file_path string Local file path
# @return 0 Success (prints size)
get_local_size_bytes() {
  local file_path="$1"
  # GNU stat first; fall back to BSD/macOS syntax
  stat -c '%s' "${file_path}" 2>/dev/null || stat -f '%z' "${file_path}"
}

### Verify local file size equals remote (if remote size known)
# @param file_path string Local file path
# @param remote_size string Remote size bytes (may be empty)
# @return 0 Success (dies on missing/empty file or size mismatch)
verify_file_size() {
  local file_path="$1"
  local remote_size="$2"

  [[ -f "${file_path}" ]] || die "File not found: ${file_path}"

  if [[ -z "${remote_size}" ]]; then
    # > Cannot verify by size; at least ensure file is non-empty
    local local_size
    local_size="$(get_local_size_bytes "${file_path}")"
    [[ "${local_size}" -gt 0 ]] || die "Downloaded file is empty: ${file_path}"
    log_warn "Remote Content-Length missing; only checked non-empty: ${file_path} (${local_size} bytes)"
    return 0
  fi

  local local_size
  local_size="$(get_local_size_bytes "${file_path}")"
  if [[ "${local_size}" != "${remote_size}" ]]; then
    die "Size mismatch for ${file_path}: local=${local_size}, remote=${remote_size}"
  fi
  log_info "Verified: ${file_path} (size=${local_size} bytes)"
}

### Download a single URL into workdir with resume + retries
# @param url string Remote URL
# @return 0 Success
download_one() {
  local url="$1"
  local filename
  filename="$(basename "${url}")"
  # BUGFIX: path was corrupted to "${WORKDIR}/$(unknown)" (would run a
  # nonexistent 'unknown' command); use the URL basename.
  local out_path="${WORKDIR}/${filename}"

  log_info "Downloading: ${url}"

  local remote_size
  remote_size="$(get_remote_size_bytes "${url}")"
  if [[ -n "${remote_size}" ]]; then
    log_debug "Remote size: ${remote_size} bytes for ${filename}"
  else
    log_warn "Remote size unknown (no Content-Length): ${url}"
  fi

  # > --continue resumes partial downloads; --tries/--timeout avoid hanging
  # > --output-document ensures a deterministic output path
  wget --continue \
    --tries="${WGET_RETRIES}" \
    --timeout="${WGET_TIMEOUT_SEC}" \
    --output-document="${out_path}" \
    "${url}"

  # > Ensure fully downloaded
  verify_file_size "${out_path}" "${remote_size}"
}

### Download all URLs from list file (must all succeed)
# @param list_file string File containing one URL per line
# @return 0 Success
run_downloads() {
  local list_file="$1"
  [[ -f "${list_file}" ]] || die "Download list file not found: ${list_file}"

  # > Read line by line (URL per line); skip blanks
  while IFS= read -r url; do
    [[ -n "${url}" ]] || continue
    download_one "${url}"
  done < "${list_file}"

  log_info "All downloads completed successfully."
}

### Parse Doris stream load response and decide success
# Logs a structured one-line summary (txn/label/rows) for easy grepping.
# @param resp_file string Response file path
# @return 0 Success, 1 Failure
parse_stream_load_response() {
  local resp_file="$1"
  [[ -f "${resp_file}" ]] || die "Response file not found: ${resp_file}"

  local status message txn_id label load_rows filtered_rows load_bytes
  status="$(json_get_string "${resp_file}" "Status")"
  message="$(json_get_string "${resp_file}" "Message")"
  txn_id="$(json_get_number "${resp_file}" "TxnId")"
  label="$(json_get_string "${resp_file}" "Label")"
  load_rows="$(json_get_number "${resp_file}" "NumberLoadedRows")"
  filtered_rows="$(json_get_number "${resp_file}" "NumberFilteredRows")"
  load_bytes="$(json_get_number "${resp_file}" "LoadBytes")"

  # > structured summary (easy to grep)
  log_info "StreamLoadResp status=${status:-N/A} txn_id=${txn_id:-N/A} label=${label:-N/A} loaded=${load_rows:-N/A} filtered=${filtered_rows:-N/A} bytes=${load_bytes:-N/A}"

  if [[ "${status}" == "Success" ]]; then
    log_info "Stream Load Success. Message=${message:-N/A}"
    return 0
  fi

  log_error "Stream Load Failed. Status=${status:-Unknown} Message=${message:-N/A}"
  log_error "Full response saved at: ${resp_file}"
  return 1
}

### Stream load a single CSV file to Doris
# No label header is set (per requirement); a local request_id correlates
# logs and saved response files instead.
# @param csv_path string Local CSV file path
# @return 0 Success
stream_load_one() {
  local csv_path="$1"
  [[ -f "${csv_path}" ]] || die "CSV not found: ${csv_path}"

  local url="http://${DORIS_BE_IP}:${DORIS_BE_PORT}/api/${DORIS_DB}/${DORIS_TABLE}/_stream_load"

  # > local trace id to correlate logs/response/files
  local request_id
  request_id="$(gen_request_id "${csv_path}")"

  # > persist full response for tracing
  local resp_file="${STREAMLOAD_LOG_DIR}/${request_id}.json"

  log_info "Stream loading: ${csv_path} -> ${url} (request_id=${request_id})"
  log_info "Response will be saved: ${resp_file}"

  # > NOTE: do NOT set label header (per requirement)
  # > NOTE(review): --fail-with-body requires curl >= 7.76 — confirm on targets
  curl --location-trusted \
    --silent --show-error --fail-with-body \
    --max-time "${CURL_TIMEOUT_SEC}" \
    -u "${DORIS_USER}:${DORIS_PASS}" \
    -H "Expect:100-continue" \
    -H "column_separator:${COLUMN_SEPARATOR}" \
    -T "${csv_path}" \
    -X PUT \
    "${url}" > "${resp_file}"

  parse_stream_load_response "${resp_file}"
}

### Stream load all CSVs named in the download list file, in list order
# @param list_file string Download list file (to know exact filenames)
# @return 0 Success
run_stream_load() {
  local list_file="$1"
  [[ -f "${list_file}" ]] || die "Download list file not found: ${list_file}"

  local url filename
  # > Ensure all expected files exist before loading anything
  while IFS= read -r url; do
    [[ -n "${url}" ]] || continue
    filename="$(basename "${url}")"
    # BUGFIX: path was corrupted to "${WORKDIR}/$(unknown)"; use the basename.
    local csv_path="${WORKDIR}/${filename}"
    [[ -f "${csv_path}" ]] || die "Expected CSV missing (download not complete?): ${csv_path}"
  done < "${list_file}"

  log_info "All expected CSV files exist. Starting Stream Load..."

  # > Load in the same order as the list
  while IFS= read -r url; do
    [[ -n "${url}" ]] || continue
    filename="$(basename "${url}")"
    stream_load_one "${WORKDIR}/${filename}"
  done < "${list_file}"

  log_info "All Stream Load operations finished."
}

#===============================================================================
# Main
#===============================================================================

main() {
  # NOTE(review): the lock is taken before argument parsing, so even --help
  # fails while another instance runs — kept as-is to preserve behavior.
  acquire_lock
  parse_args "$@"
  validate_env
  prepare_workdir
  build_download_list

  case "${ACTION}" in
    download)
      # > Download only
      run_downloads "${DOWNLOAD_LIST_FILE}"
      ;;
    load)
      # > Load only (expects files already present)
      run_stream_load "${DOWNLOAD_LIST_FILE}"
      ;;
    all)
      # > Download then load
      run_downloads "${DOWNLOAD_LIST_FILE}"
      run_stream_load "${DOWNLOAD_LIST_FILE}"
      ;;
    *)
      die "Invalid action: ${ACTION}"
      ;;
  esac

  log_info "Done. (version=${SCRIPT_VERSION})"
}

main "$@"