Files

635 lines
20 KiB
Bash
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env bash
#===============================================================================
# Author: Smith Wang
# Version: 1.0.0
# License: MIT
# Filename: doris_csv_stream_load.sh
#
# Description:
# 1) Use wget to download one or multiple CSV files (resume + size verification)
# 2) Import into Apache Doris via Stream Load API using curl
#
# Module dependencies:
# - bash (>= 5.0)
# - wget
# - curl
# - awk, sed, grep, stat, date, mktemp
#
# ShellCheck:
# shellcheck -x doris_csv_stream_load.sh
#===============================================================================
set -euo pipefail
IFS=$'\n\t'
#===============================================================================
# Global Constants
#===============================================================================
# NOTE(review): $(basename "$0") inside readonly masks basename's exit status
# (ShellCheck SC2155); harmless here since basename of $0 cannot realistically fail.
readonly SCRIPT_NAME="$(basename "$0")"
readonly SCRIPT_VERSION="1.0.0"
# Default download target directory (overridable via --workdir).
readonly DEFAULT_WORKDIR="./doris_csv_downloads"
# Base URL that hosts the national CSV shards (suffix _0 .. _{count-1}).
readonly DEFAULT_NATIONAL_DIR_URL="https://oss.demo.uavcmlc.com/cmlc-installation/doris/all"
readonly DEFAULT_NATIONAL_COUNT="6" # suffix 0..5
readonly DEFAULT_REGION_URL="https://oss.demo.uavcmlc.com/cmlc-installation/doris/all/xiongan.csv"
# Doris defaults (override by args)
readonly DEFAULT_DORIS_USER="root"
readonly DEFAULT_DORIS_PASS="" # empty by default (root:)
readonly DEFAULT_DORIS_BE_PORT="8040" # Stream Load port
# wget/curl behavior
readonly WGET_RETRIES="10"
readonly WGET_TIMEOUT_SEC="30"
readonly CURL_TIMEOUT_SEC="600" # per file; adjust if needed
# Lock path derived from the script name: one concurrent run per host.
readonly LOCKFILE="/tmp/${SCRIPT_NAME}.lock"
#===============================================================================
# Runtime Config (set by args)
#===============================================================================
# Mutable runtime configuration; parse_args overwrites these from CLI options.
ACTION="all" # download|load|all
WORKDIR="${DEFAULT_WORKDIR}"
NATIONAL_DIR_URL="${DEFAULT_NATIONAL_DIR_URL}"
NATIONAL_PREFIX="" # REQUIRED for national mode
NATIONAL_COUNT="${DEFAULT_NATIONAL_COUNT}"
REGION_URL="${DEFAULT_REGION_URL}"
DORIS_BE_IP="" # REQUIRED
DORIS_BE_PORT="${DEFAULT_DORIS_BE_PORT}"
DORIS_USER="${DEFAULT_DORIS_USER}"
DORIS_PASS="${DEFAULT_DORIS_PASS}"
DORIS_DB="cmii"
DORIS_TABLE="dwd_reg_grid_city_detail_dd"
COLUMN_SEPARATOR=","
# Derived at runtime by prepare_workdir (not user-settable).
DOWNLOAD_LIST_FILE=""
STREAMLOAD_LOG_DIR=""
#===============================================================================
# ASCII Call Graph
#===============================================================================
# main
# ├─ acquire_lock
# ├─ parse_args
# ├─ validate_env
# ├─ prepare_workdir
# ├─ build_download_list
# ├─ run_downloads
# │ ├─ get_remote_size_bytes
# │ ├─ download_one
# │ └─ verify_file_size
# ├─ run_stream_load
# │ ├─ stream_load_one
# │ └─ parse_stream_load_response
# └─ release_lock (trap)
#===============================================================================
#===============================================================================
# Logging
#===============================================================================
# Only DEBUG messages are filtered (see log_debug); other levels always print.
LOG_LEVEL="INFO" # DEBUG|INFO|WARN|ERROR
### Emit one timestamped log line to stderr
# Format: "YYYY-mm-dd HH:MM:SS [LEVEL] script_name: message"
# @param level string Log level tag
# @param msg string Message text
# @return 0 Success
log() {
  local lvl="$1"
  local text="$2"
  local stamp
  stamp="$(date '+%Y-%m-%d %H:%M:%S')"
  printf '%s [%s] %s: %s\n' "${stamp}" "${lvl}" "${SCRIPT_NAME}" "${text}" >&2
}
### Debug log — emitted only when LOG_LEVEL is exactly "DEBUG"
# @param msg string Message
# @return 0 Success
log_debug() {
  if [[ "${LOG_LEVEL}" == "DEBUG" ]]; then
    log "DEBUG" "$1"
  fi
}
### Info log
# @param msg string Message
# @return 0 Success
log_info() { log "INFO" "$1"; }
### Warn log
# @param msg string Message
# @return 0 Success
log_warn() { log "WARN" "$1"; }
### Error log
# @param msg string Message
# @return 0 Success
log_error() { log "ERROR" "$1"; }
#===============================================================================
# Error / Cleanup
#===============================================================================
# Scratch dir created by prepare_workdir; removed by cleanup.
# NOTE(review): TMPDIR is also a conventional env var consulted by mktemp;
# clearing it here may influence where mktemp -d creates the dir — confirm intended.
TMPDIR=""
# NOTE(review): CLEANUP_FILES is declared but never appended to or read in this
# file — looks like leftover scaffolding; confirm before removing.
CLEANUP_FILES=()
### Cleanup handler — removes temp dir, releases lock, preserves exit code
# FIX: disarm our own traps first so cleanup cannot run twice (an INT/TERM
# trap followed by the EXIT trap would otherwise re-enter this function).
# @return never Exits with the original exit code
cleanup() {
  local exit_code=$?
  # > run at most once: drop all registered traps before doing any work
  trap - EXIT INT TERM
  # > cleanup temp resources
  if [[ -n "${TMPDIR}" && -d "${TMPDIR}" ]]; then
    rm -rf "${TMPDIR}" || true
  fi
  # > release lock
  release_lock || true
  if [[ $exit_code -ne 0 ]]; then
    log_error "Exiting with code ${exit_code}"
  fi
  exit "$exit_code"
}
# Register cleanup for normal exit and common termination signals.
# NOTE(review): on INT/TERM the EXIT trap fires as well, so cleanup should
# disarm its own traps to avoid running twice — confirm it does.
trap cleanup EXIT INT TERM
### Fail with message and terminate the script
# FIX: use `exit 1` instead of `return 1`. With `return`, die did not abort
# when called in contexts where `set -e` is suppressed (conditions, `&&`/`||`
# lists inside functions invoked from conditions), letting execution continue
# past a fatal error.
# @param msg string Error message
# @return never Exits with status 1
die() {
  log_error "$1"
  exit 1
}
#===============================================================================
# Lock
#===============================================================================
### Acquire a simple lock to avoid concurrent runs
# mkdir is atomic on POSIX filesystems: exactly one concurrent caller succeeds,
# so no flock dependency is needed.
# @return 0 Success (dies if the lock is already held)
acquire_lock() {
  local lock_dir="${LOCKFILE}.d"
  if ! mkdir "${lock_dir}" 2>/dev/null; then
    die "Another instance is running (lock exists: ${lock_dir}). Remove it if you're sure."
  fi
  log_debug "Lock acquired: ${lock_dir}"
}
### Release the mkdir-based lock if it is currently held
# @return 0 Success (no-op when the lock directory is absent)
release_lock() {
  local lock_dir="${LOCKFILE}.d"
  if [[ -d "${lock_dir}" ]]; then
    rmdir "${lock_dir}" 2>/dev/null || true
    log_debug "Lock released: ${lock_dir}"
  fi
}
#===============================================================================
# Usage
#===============================================================================
### Print CLI usage/help text to stdout
# The heredoc delimiter is single-quoted ('EOF'), so no variable or command
# expansion happens inside the help text — URLs and examples print literally.
# @return 0 Success
usage() {
cat <<'EOF'
Usage:
doris_csv_stream_load.sh [download|load|all] [options]
Actions:
download Only download CSVs (wget)
load Only stream-load existing CSVs in workdir
all Download then load (default)
Options:
--workdir <path> Download directory (default: ./doris_csv_downloads)
--log-level <DEBUG|INFO|WARN|ERROR>
# National files (suffix 0..5 by default)
--national-dir-url <url> Base directory URL for national files
default: https://oss.demo.uavcmlc.com/cmlc-installation/doris/all
--national-prefix <name_prefix> REQUIRED for national mode
e.g. result_2aee9754dd304ca1-a0651901906f9bb4
--national-count <n> How many files, suffix 0..n-1 (default: 6)
# Optional single region file
--region-url <url> default: https://oss.demo.uavcmlc.com/cmlc-installation/doris/all/xiongan.csv
--no-region Skip region file
# Doris stream load config
--doris-be-ip <ip> REQUIRED
--doris-be-port <port> default: 8040
--doris-user <user> default: root
--doris-pass <pass> default: empty
--db <db_name> default: cmii
--table <table_name> default: dwd_reg_grid_city_detail_dd
--column-separator <sep> default: ,
Examples:
# 1) All national(0..5) + region file, download then load:
./doris_csv_stream_load.sh all \
--national-prefix result_2aee9754dd304ca1-a0651901906f9bb4 \
--doris-be-ip 10.10.10.10
# 2) Download only:
./doris_csv_stream_load.sh download \
--national-prefix result_xxx \
--doris-be-ip 10.10.10.10
# 3) Load only (assumes files already in workdir):
./doris_csv_stream_load.sh load \
--national-prefix result_xxx \
--doris-be-ip 10.10.10.10
EOF
}
#===============================================================================
# Args Parsing
#===============================================================================
# Set to "true" by --no-region to skip downloading/loading the region CSV.
SKIP_REGION="false"
### Parse CLI arguments
# The first positional token may be an action (download|load|all) or -h/--help;
# remaining tokens are option/value pairs.
# FIX: value-taking options now expand "${2?...}" so a missing value aborts
# with a clear message. Previously "$2" was consumed blindly and `shift 2`
# with only one argument left would fail without shifting, spinning the loop
# forever on the same argument.
# @param args string[] Command line args
# @return 0 Success
parse_args() {
  if [[ $# -ge 1 ]]; then
    case "$1" in
      download|load|all) ACTION="$1"; shift ;;
      -h|--help) usage; exit 0 ;;
      *) : ;; # not an action keyword; fall through to option parsing
    esac
  fi
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --workdir)          WORKDIR="${2?--workdir requires a value}"; shift 2 ;;
      --log-level)        LOG_LEVEL="${2?--log-level requires a value}"; shift 2 ;;
      --national-dir-url) NATIONAL_DIR_URL="${2?--national-dir-url requires a value}"; shift 2 ;;
      --national-prefix)  NATIONAL_PREFIX="${2?--national-prefix requires a value}"; shift 2 ;;
      --national-count)   NATIONAL_COUNT="${2?--national-count requires a value}"; shift 2 ;;
      --region-url)       REGION_URL="${2?--region-url requires a value}"; shift 2 ;;
      --no-region)        SKIP_REGION="true"; shift 1 ;;
      --doris-be-ip)      DORIS_BE_IP="${2?--doris-be-ip requires a value}"; shift 2 ;;
      --doris-be-port)    DORIS_BE_PORT="${2?--doris-be-port requires a value}"; shift 2 ;;
      --doris-user)       DORIS_USER="${2?--doris-user requires a value}"; shift 2 ;;
      --doris-pass)       DORIS_PASS="${2?--doris-pass requires a value}"; shift 2 ;;
      --db)               DORIS_DB="${2?--db requires a value}"; shift 2 ;;
      --table)            DORIS_TABLE="${2?--table requires a value}"; shift 2 ;;
      --column-separator) COLUMN_SEPARATOR="${2?--column-separator requires a value}"; shift 2 ;;
      -h|--help)          usage; exit 0 ;;
      *)
        die "Unknown argument: $1"
        ;;
    esac
  done
}
#===============================================================================
# Validation / Environment
#===============================================================================
### Validate required external tools and mandatory configuration
# @require wget curl awk sed grep stat
# @return 0 Success (dies on any missing tool or config)
validate_env() {
  local tool
  # > every external dependency must be resolvable on PATH
  for tool in wget curl awk sed grep stat; do
    command -v "${tool}" >/dev/null 2>&1 || die "${tool} not found"
  done
  # > mandatory runtime configuration
  [[ -n "${DORIS_BE_IP}" ]] || die "--doris-be-ip is required"
  [[ -n "${NATIONAL_PREFIX}" ]] || die "--national-prefix is required (filename changes, must be provided)"
  [[ "${NATIONAL_COUNT}" =~ ^[0-9]+$ ]] || die "--national-count must be an integer"
}
### Prepare working directory and temp space (also initializes the log dir)
# Sets the globals TMPDIR, DOWNLOAD_LIST_FILE and STREAMLOAD_LOG_DIR.
# @return 0 Success
prepare_workdir() {
  # > download target directory
  mkdir -p "${WORKDIR}"
  # > per-run scratch space for the generated URL list
  TMPDIR="$(mktemp -d)"
  DOWNLOAD_LIST_FILE="${TMPDIR}/download_list.txt"
  # > directory that keeps every stream-load JSON response for tracing
  STREAMLOAD_LOG_DIR="${WORKDIR}/_streamload_logs"
  mkdir -p "${STREAMLOAD_LOG_DIR}"
  log_debug "Workdir: ${WORKDIR}"
  log_debug "StreamLoad log dir: ${STREAMLOAD_LOG_DIR}"
}
### Generate a local request id for tracing: <timestamp>__<pid>__<safe_name>
# @param csv_path string Local CSV file path
# @return 0 Success (prints the request id, no trailing newline)
gen_request_id() {
  local csv_path="$1"
  local stamp safe_base
  stamp="$(date '+%Y%m%d_%H%M%S')"
  # > replace anything unsafe for filesystems with '_'
  safe_base="$(basename "${csv_path}" | sed 's/[^a-zA-Z0-9._-]/_/g')"
  printf '%s__%s__%s' "${stamp}" "$$" "${safe_base}"
}
### Extract a JSON string field value without jq (best-effort, flat JSON only)
# @param json_file string JSON file path
# @param field_name string Field name, e.g. Status
# @return 0 Success (prints value, or nothing when the field is absent)
json_get_string() {
  local src="$1"
  local key="$2"
  local hit
  # > naive scan for: "key" : "value" — first occurrence wins
  hit="$(grep -Eo "\"${key}\"[[:space:]]*:[[:space:]]*\"[^\"]*\"" "${src}" | head -n 1 || true)"
  printf '%s' "${hit}" | sed -E "s/.*\"${key}\"[[:space:]]*:[[:space:]]*\"([^\"]*)\".*/\1/"
}
### Extract a JSON numeric field value without jq (best-effort, flat JSON only)
# @param json_file string JSON file path
# @param field_name string Field name, e.g. TxnId
# @return 0 Success (prints value, or nothing when the field is absent)
json_get_number() {
  local src="$1"
  local key="$2"
  local hit
  # > naive scan for: "key" : 12345 — first occurrence wins
  hit="$(grep -Eo "\"${key}\"[[:space:]]*:[[:space:]]*[0-9]+" "${src}" | head -n 1 || true)"
  printf '%s' "${hit}" | sed -E "s/.*\"${key}\"[[:space:]]*:[[:space:]]*([0-9]+).*/\1/"
}
#===============================================================================
# Download List Builder
#===============================================================================
### Build download URL list into a file
# FIX: the loop bound was written as "NATIONA L_COUNT" (stray space inside the
# variable name), which is an invalid arithmetic expression at runtime and
# produced an empty national list. Corrected to NATIONAL_COUNT.
# @return 0 Success
build_download_list() {
  : > "${DOWNLOAD_LIST_FILE}"
  # > National files: prefix_0..prefix_(count-1)
  local i
  for ((i=0; i< NATIONAL_COUNT; i++)); do
    printf '%s/%s_%s.csv\n' "${NATIONAL_DIR_URL}" "${NATIONAL_PREFIX}" "${i}" >> "${DOWNLOAD_LIST_FILE}"
  done
  # > Optional region file
  if [[ "${SKIP_REGION}" != "true" ]]; then
    printf '%s\n' "${REGION_URL}" >> "${DOWNLOAD_LIST_FILE}"
  fi
  log_info "Download list prepared: $(wc -l < "${DOWNLOAD_LIST_FILE}") file(s)"
  log_debug "Download list content:\n$(cat "${DOWNLOAD_LIST_FILE}")"
}
# NOTE: fix accidental space in variable name (ShellCheck would flag). Keep code correct:
# We'll patch it by redefining function properly.
### Build the download URL list file (national shards + optional region CSV)
# Writes one URL per line into DOWNLOAD_LIST_FILE, truncating any prior content.
# @return 0 Success
build_download_list() {
  : > "${DOWNLOAD_LIST_FILE}"
  # > national shards: <dir>/<prefix>_0.csv .. <prefix>_(count-1).csv
  local idx=0
  while (( idx < NATIONAL_COUNT )); do
    printf '%s/%s_%s.csv\n' "${NATIONAL_DIR_URL}" "${NATIONAL_PREFIX}" "${idx}" >> "${DOWNLOAD_LIST_FILE}"
    idx=$((idx + 1))
  done
  # > region file unless explicitly skipped
  if [[ "${SKIP_REGION}" != "true" ]]; then
    printf '%s\n' "${REGION_URL}" >> "${DOWNLOAD_LIST_FILE}"
  fi
  log_info "Download list prepared: $(wc -l < "${DOWNLOAD_LIST_FILE}") file(s)"
}
#===============================================================================
# Download / Verify
#===============================================================================
### Get remote content length (bytes) via wget --spider
# FIX: wget --server-response prints header lines indented (e.g.
# "  Content-Length: 1234"), so the previous awk comparison against the
# literal " content-length" (one leading space) never matched and the size
# was always reported as unknown. The key is now trimmed before comparing.
# @param url string Remote URL
# @return 0 Success (prints size, or empty string when unknown)
get_remote_size_bytes() {
  local url="$1"
  local size=""
  # > spider request fetches headers only; some servers omit Content-Length,
  #   in which case we return empty and the caller degrades gracefully
  local headers
  headers="$(wget --spider --server-response --timeout="${WGET_TIMEOUT_SEC}" --tries=2 "${url}" 2>&1 || true)"
  # > last Content-Length wins (redirect chains emit several header groups)
  size="$(printf '%s\n' "${headers}" \
    | awk -F': ' '{ key = tolower($1); gsub(/^[[:space:]]+|[[:space:]]+$/, "", key);
                    if (key == "content-length") { gsub("\r", "", $2); print $2 } }' \
    | tail -n 1)"
  if [[ -n "${size}" && "${size}" =~ ^[0-9]+$ ]]; then
    printf '%s' "${size}"
  else
    printf '%s' ""
  fi
}
### Get local file size in bytes
# FIX: `stat -c '%s'` is GNU-coreutils-only; on BSD/macOS stat takes
# `-f '%z'`. Try GNU syntax first and fall back to the BSD form so the
# script is portable.
# @param file_path string Local file path
# @return 0 Success (prints size)
get_local_size_bytes() {
  local file_path="$1"
  stat -c '%s' "${file_path}" 2>/dev/null || stat -f '%z' "${file_path}"
}
### Verify local file size equals the remote size (when the latter is known)
# When the remote size is empty (no Content-Length), only a non-empty check
# is possible and a warning is logged.
# @param file_path string Local file path
# @param remote_size string Remote size in bytes (may be empty)
# @return 0 Success (dies on missing file, empty file, or size mismatch)
verify_file_size() {
  local file_path="$1"
  local expected="$2"
  [[ -f "${file_path}" ]] || die "File not found: ${file_path}"
  local actual
  actual="$(get_local_size_bytes "${file_path}")"
  if [[ -z "${expected}" ]]; then
    # > no Content-Length from server: best effort, require a non-empty file
    [[ "${actual}" -gt 0 ]] || die "Downloaded file is empty: ${file_path}"
    log_warn "Remote Content-Length missing; only checked non-empty: ${file_path} (${actual} bytes)"
    return 0
  fi
  if [[ "${actual}" != "${expected}" ]]; then
    die "Size mismatch for ${file_path}: local=${actual}, remote=${expected}"
  fi
  log_info "Verified: ${file_path} (size=${actual} bytes)"
}
### Download a single URL into workdir with resume + retries
# FIX: the output path and the debug message used a broken "$(unknown)"
# command substitution (mangled "${filename}"), which made every download
# target the bare "${WORKDIR}/" path. Restored to the basename of the URL.
# @param url string Remote URL
# @return 0 Success
download_one() {
  local url="$1"
  local filename
  filename="$(basename "${url}")"
  local out_path="${WORKDIR}/${filename}"
  log_info "Downloading: ${url}"
  local remote_size
  remote_size="$(get_remote_size_bytes "${url}")"
  if [[ -n "${remote_size}" ]]; then
    log_debug "Remote size: ${remote_size} bytes for ${filename}"
  else
    log_warn "Remote size unknown (no Content-Length): ${url}"
  fi
  # > Use --continue for resume, --tries for retries, --timeout to avoid hanging
  # > Use --output-document to ensure a deterministic output path
  wget --continue \
    --tries="${WGET_RETRIES}" \
    --timeout="${WGET_TIMEOUT_SEC}" \
    --output-document="${out_path}" \
    "${url}"
  # > Ensure fully downloaded (size check against Content-Length when known)
  verify_file_size "${out_path}" "${remote_size}"
}
### Download every URL listed in a file; all downloads must succeed
# @param list_file string File containing one URL per line (blank lines skipped)
# @return 0 Success
run_downloads() {
  local list_file="$1"
  [[ -f "${list_file}" ]] || die "Download list file not found: ${list_file}"
  local url
  # > one URL per line; empty lines are ignored
  while IFS= read -r url; do
    [[ -n "${url}" ]] || continue
    download_one "${url}"
  done < "${list_file}"
  log_info "All downloads completed successfully."
}
### Parse Doris stream load response and decide success
# Logs a grep-friendly one-line summary (txn id, label, row counts, bytes).
# @param resp_file string Response file path
# @return 0 Success, 1 Failure
parse_stream_load_response() {
  local resp_file="$1"
  [[ -f "${resp_file}" ]] || die "Response file not found: ${resp_file}"
  local status message txn_id label loaded filtered bytes
  status="$(json_get_string "${resp_file}" "Status")"
  message="$(json_get_string "${resp_file}" "Message")"
  txn_id="$(json_get_number "${resp_file}" "TxnId")"
  label="$(json_get_string "${resp_file}" "Label")"
  loaded="$(json_get_number "${resp_file}" "NumberLoadedRows")"
  filtered="$(json_get_number "${resp_file}" "NumberFilteredRows")"
  bytes="$(json_get_number "${resp_file}" "LoadBytes")"
  # > structured summary (easy to grep)
  log_info "StreamLoadResp status=${status:-N/A} txn_id=${txn_id:-N/A} label=${label:-N/A} loaded=${loaded:-N/A} filtered=${filtered:-N/A} bytes=${bytes:-N/A}"
  if [[ "${status}" != "Success" ]]; then
    log_error "Stream Load Failed. Status=${status:-Unknown} Message=${message:-N/A}"
    log_error "Full response saved at: ${resp_file}"
    return 1
  fi
  log_info "Stream Load Success. Message=${message:-N/A}"
  return 0
}
### Stream load a single CSV file into Doris via HTTP PUT
# (changed: no -H label header is sent; a local request_id is used for tracing)
# The full JSON response is persisted under STREAMLOAD_LOG_DIR for later audit.
# @param csv_path string Local CSV file path
# @return 0 Success
stream_load_one() {
  local csv_path="$1"
  [[ -f "${csv_path}" ]] || die "CSV not found: ${csv_path}"
  local endpoint="http://${DORIS_BE_IP}:${DORIS_BE_PORT}/api/${DORIS_DB}/${DORIS_TABLE}/_stream_load"
  # > local trace id correlates logs and saved responses
  local request_id
  request_id="$(gen_request_id "${csv_path}")"
  local resp_file="${STREAMLOAD_LOG_DIR}/${request_id}.json"
  log_info "Stream loading: ${csv_path} -> ${endpoint} (request_id=${request_id})"
  log_info "Response will be saved: ${resp_file}"
  # > NOTE: deliberately no label header (per requirement)
  curl --location-trusted \
    --silent --show-error --fail-with-body \
    --max-time "${CURL_TIMEOUT_SEC}" \
    -u "${DORIS_USER}:${DORIS_PASS}" \
    -H "Expect:100-continue" \
    -H "column_separator:${COLUMN_SEPARATOR}" \
    -T "${csv_path}" \
    -X PUT \
    "${endpoint}" > "${resp_file}"
  parse_stream_load_response "${resp_file}"
}
### Stream load all CSVs named by the download list, in list order
# FIX: both loops built paths with a broken "$(unknown)" command substitution
# (mangled "${filename}"), so the existence check inspected "${WORKDIR}/" and
# the loader was handed the same bogus path. Restored to the URL basename.
# @param list_file string Download list file (defines the exact filenames)
# @return 0 Success
run_stream_load() {
  local list_file="$1"
  [[ -f "${list_file}" ]] || die "Download list file not found: ${list_file}"
  local url filename csv_path
  # > Pre-flight: every expected file must exist before any load starts
  while IFS= read -r url; do
    [[ -n "${url}" ]] || continue
    filename="$(basename "${url}")"
    csv_path="${WORKDIR}/${filename}"
    [[ -f "${csv_path}" ]] || die "Expected CSV missing (download not complete?): ${csv_path}"
  done < "${list_file}"
  log_info "All expected CSV files exist. Starting Stream Load..."
  # > Load in the same order as the list
  while IFS= read -r url; do
    [[ -n "${url}" ]] || continue
    filename="$(basename "${url}")"
    stream_load_one "${WORKDIR}/${filename}"
  done < "${list_file}"
  log_info "All Stream Load operations finished."
}
#===============================================================================
# Main
#===============================================================================
### Entry point: lock, parse args, validate, prepare, then dispatch by action
# @param args string[] Command line args
# @return 0 Success
main() {
  acquire_lock
  parse_args "$@"
  validate_env
  prepare_workdir
  # > the URL list drives both downloading and loading
  build_download_list
  case "${ACTION}" in
    download)
      # > fetch only
      run_downloads "${DOWNLOAD_LIST_FILE}"
      ;;
    load)
      # > load only (files must already be present in workdir)
      run_stream_load "${DOWNLOAD_LIST_FILE}"
      ;;
    all)
      # > fetch, then load
      run_downloads "${DOWNLOAD_LIST_FILE}"
      run_stream_load "${DOWNLOAD_LIST_FILE}"
      ;;
    *)
      die "Invalid action: ${ACTION}"
      ;;
  esac
  log_info "Done. (version=${SCRIPT_VERSION})"
}
# Delegate to main with all CLI arguments; exit status flows through cleanup trap.
main "$@"