691 lines
21 KiB
Python
691 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import datetime as dt
|
|
import fcntl
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
from urllib.parse import SplitResult, unquote, urlsplit
|
|
|
|
|
|
# Lock file that with_lock() holds with flock() so two runs never overlap.
LOCK_PATH = "/state/backup.lock"
# Snapshot directory names are timestamps shaped like 20240131T020000Z.
SNAPSHOT_NAME_RE = re.compile(r"^\d{8}T\d{6}Z$")
|
|
|
|
# (local source directory, path below the snapshot root) pairs.
# available_sources() skips any entry whose source directory does not exist.
BACKUP_SOURCES: List[Tuple[str, str]] = [
    ("/data/private", "data/private"),
    ("/data/groups", "data/groups"),
    ("/data/fslogix", "data/fslogix"),
    ("/state", "state"),
    ("/var/lib/samba/private", "samba/private"),
]
|
|
|
|
# Maps a BACKUP_DESTINATION URL scheme to the rclone backend type written
# into the generated rclone config. "rsync" is intentionally absent here:
# build_backend() routes that scheme to RsyncBackend instead.
RCLONE_SCHEME_MAP = {
    "sftp": "sftp",
    "smb": "smb",
    "cifs": "smb",
    "davfs": "webdav",
    "dav": "webdav",
    "webdav": "webdav",
    "http": "webdav",
    "https": "webdav",
}
|
|
|
|
# Fallback for BACKUP_START_HOUR (main() validates the variable as 0-23).
DEFAULT_BACKUP_START_HOUR = 2
# Fallbacks for the BACKUP_RETENTION_* variables read by
# parse_retention_policy(); used when a variable is unset or invalid.
DEFAULT_RETENTION_DAILY = 3
DEFAULT_RETENTION_WEEKLY = 2
DEFAULT_RETENTION_MONTHLY = 2
DEFAULT_RETENTION_YEARLY = 1
|
|
|
|
|
|
@dataclass
class Destination:
    """Parsed form of the BACKUP_DESTINATION URL (see parse_destination)."""

    raw_url: str  # URL exactly as it was supplied
    scheme: str  # lower-cased URL scheme
    parts: SplitResult  # urlsplit() result for raw_url
    username: str  # percent-decoded user name, "" when absent
    password: str  # percent-decoded password, "" when absent
    hostname: str  # host name as reported by urlsplit()
    port: Optional[int]  # explicit port, or None when the URL has none
    path: str  # percent-decoded path component
|
|
|
|
|
|
@dataclass
class RetentionPolicy:
    """Snapshot counts kept per category by compute_retained_snapshots()."""

    daily: int  # newest snapshots to keep, regardless of their date
    weekly: int  # newest snapshots taken on a Monday to keep
    monthly: int  # newest snapshots taken on the 1st of a month to keep
    yearly: int  # newest snapshots taken on January 1st to keep
|
|
|
|
|
|
@dataclass(frozen=True)
class Snapshot:
    """One snapshot on the destination, identified by its timestamp name."""

    name: str  # snapshot directory name, e.g. 20240131T020000Z
    timestamp: dt.datetime  # moment encoded in the name
|
|
|
|
|
|
def log(message: str) -> None:
    """Print *message* with the ``[backup]`` prefix and flush immediately."""
    print("[backup]", message, flush=True)
|
|
|
|
|
|
def run_command(
    command: List[str],
    *,
    env: Optional[Dict[str, str]] = None,
    input_text: Optional[str] = None,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """Run *command*, capturing stdout and stderr as text.

    Args:
        command: Program and arguments (no shell is involved).
        env: Environment for the child process; ``None`` inherits ours.
        input_text: Text fed to the child's standard input, if any.
        check: When true, a non-zero exit status raises instead of returning.

    Returns:
        The completed process, including captured output.

    Raises:
        RuntimeError: If *check* is true and the command exits non-zero. The
            message carries stderr, or stdout when stderr is empty.
    """
    completed = subprocess.run(
        command,
        input=input_text,
        env=env,
        text=True,
        capture_output=True,
    )
    if not check or completed.returncode == 0:
        return completed

    detail = completed.stderr.strip() or completed.stdout.strip()
    raise RuntimeError(f"Command failed ({command[0]}): {detail}")
|
|
|
|
|
|
def parse_destination(raw_url: str) -> Destination:
    """Split the destination URL into a Destination record.

    User name, password and path are percent-decoded; the scheme is
    lower-cased.

    Raises:
        RuntimeError: If the URL has no scheme or no host name.
    """
    split = urlsplit(raw_url)

    normalized_scheme = split.scheme.lower()
    if normalized_scheme == "":
        raise RuntimeError("BACKUP_DESTINATION must include a URL scheme")

    host = split.hostname or ""
    if host == "":
        raise RuntimeError("BACKUP_DESTINATION must include a hostname")

    def decoded(component: Optional[str]) -> str:
        # Missing components come back as None from urlsplit.
        return unquote(component or "")

    return Destination(
        raw_url=raw_url,
        scheme=normalized_scheme,
        parts=split,
        username=decoded(split.username),
        password=decoded(split.password),
        hostname=host,
        port=split.port,
        path=decoded(split.path),
    )
|
|
|
|
|
|
def redact_destination(raw_url: str) -> str:
    """Return *raw_url* in a form that is safe to log.

    Any password is replaced by ``***``; the query string and fragment are
    dropped. IPv6 hosts are re-wrapped in brackets.
    """
    split = urlsplit(raw_url)

    location = split.hostname or ""
    if not location.startswith("[") and ":" in location:
        location = "[" + location + "]"
    if split.port is not None:
        location += f":{split.port}"

    if split.username:
        credentials = unquote(split.username)
        if split.password is not None:
            credentials += ":***"
        location = f"{credentials}@{location}"

    return f"{split.scheme}://{location}{split.path}"
|
|
|
|
|
|
def available_sources() -> List[Tuple[str, str]]:
    """Return the BACKUP_SOURCES entries whose source directory exists.

    Entries that are missing (or are not directories) are logged and skipped.
    """
    present: List[Tuple[str, str]] = []
    for local_dir, snapshot_subpath in BACKUP_SOURCES:
        if not os.path.isdir(local_dir):
            log(f"Skipping missing source: {local_dir}")
            continue
        present.append((local_dir, snapshot_subpath))
    return present
|
|
|
|
|
|
def format_host(hostname: str) -> str:
    """Wrap a bare IPv6 literal in brackets; return other hosts unchanged."""
    needs_brackets = ":" in hostname and not hostname.startswith("[")
    return f"[{hostname}]" if needs_brackets else hostname
|
|
|
|
|
|
def join_path(prefix: str, suffix: str) -> str:
    """Join two path fragments with a single ``/``.

    Leading and trailing slashes are stripped from both fragments, and an
    empty fragment contributes nothing, so the result never starts or ends
    with a slash.
    """
    pieces = (prefix.strip("/"), suffix.strip("/"))
    return "/".join(piece for piece in pieces if piece)
|
|
|
|
|
|
def parse_int_env(
    name: str, default: int, *, minimum: int, maximum: Optional[int]
) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset, blank, not an
            integer, or out of range.
        minimum: Smallest accepted value.
        maximum: Largest accepted value, or ``None`` for no upper bound.

    Returns:
        The parsed value, or *default* (with a log line for invalid input).
    """
    text = os.environ.get(name, "").strip()
    if text == "":
        return default

    try:
        parsed = int(text)
    except ValueError:
        log(f"Invalid {name}='{text}', using default {default}")
        return default

    too_small = parsed < minimum
    too_large = maximum is not None and parsed > maximum
    if not (too_small or too_large):
        return parsed

    # Only mention the accepted range when there is an upper bound to show.
    complaint = (
        f"Invalid {name}='{text}', using default {default}"
        if maximum is None
        else f"Invalid {name}='{text}' (expected {minimum}-{maximum}), using default {default}"
    )
    log(complaint)
    return default
|
|
|
|
|
|
def parse_retention_policy() -> RetentionPolicy:
    """Build the retention policy from the BACKUP_RETENTION_* variables.

    Each count must be a non-negative integer; anything else falls back to
    the matching DEFAULT_RETENTION_* constant.
    """

    def count(variable: str, fallback: int) -> int:
        return parse_int_env(variable, fallback, minimum=0, maximum=None)

    daily = count("BACKUP_RETENTION_DAILY", DEFAULT_RETENTION_DAILY)
    weekly = count("BACKUP_RETENTION_WEEKLY", DEFAULT_RETENTION_WEEKLY)
    monthly = count("BACKUP_RETENTION_MONTHLY", DEFAULT_RETENTION_MONTHLY)
    yearly = count("BACKUP_RETENTION_YEARLY", DEFAULT_RETENTION_YEARLY)
    return RetentionPolicy(daily=daily, weekly=weekly, monthly=monthly, yearly=yearly)
|
|
|
|
|
|
def parse_smb_identity(username: str) -> Tuple[str, str]:
    """Split an SMB user name into ``(domain, user)``.

    Both ``DOMAIN;user`` and ``DOMAIN\\user`` are recognised, with ``;``
    taking precedence. A name without a separator yields an empty domain.
    """
    for separator in (";", "\\"):
        domain, found, user = username.partition(separator)
        if found:
            return domain, user
    return "", username
|
|
|
|
|
|
def obscure_secret(secret: str) -> str:
    """Return *secret* in rclone's obscured form for use in a config file.

    The secret is handed to ``rclone obscure`` on standard input (the ``-``
    argument) rather than on the command line, so it does not show up in the
    process list and a value starting with ``-`` is not taken for a flag.

    Raises:
        RuntimeError: If rclone fails or prints nothing.
    """
    if "\n" in secret or "\r" in secret:
        # rclone only reads the first stdin line, so a multi-line secret has
        # to keep travelling as an argument to stay intact.
        result = run_command(["rclone", "obscure", secret])
    else:
        result = run_command(["rclone", "obscure", "-"], input_text=secret)

    value = result.stdout.strip()
    if not value:
        raise RuntimeError("rclone obscure returned an empty value")
    return value
|
|
|
|
|
|
def parse_snapshot_name(name: str) -> Optional[dt.datetime]:
    """Convert a snapshot name into a UTC-aware datetime.

    Returns ``None`` when *name* does not look like ``YYYYMMDDTHHMMSSZ`` or
    encodes an impossible date or time.
    """
    if SNAPSHOT_NAME_RE.match(name) is None:
        return None
    try:
        naive = dt.datetime.strptime(name, "%Y%m%dT%H%M%SZ")
    except ValueError:
        # Right shape, but not a real calendar date/time (e.g. month 13).
        return None
    else:
        return naive.replace(tzinfo=dt.timezone.utc)
|
|
|
|
|
|
def choose_snapshot_name(existing_names: Set[str]) -> str:
    """Pick a timestamp name that is not already in *existing_names*.

    Starts at the current UTC second and tries up to 120 consecutive seconds.

    Raises:
        RuntimeError: If every candidate is already taken.
    """
    started = dt.datetime.now(dt.timezone.utc)
    candidates = (
        (started + dt.timedelta(seconds=step)).strftime("%Y%m%dT%H%M%SZ")
        for step in range(120)
    )
    unused = next((name for name in candidates if name not in existing_names), None)
    if unused is None:
        raise RuntimeError("Unable to generate a unique snapshot name")
    return unused
|
|
|
|
|
|
def is_week_start(timestamp: dt.datetime) -> bool:
    """Return True when *timestamp* falls on a Monday."""
    # isoweekday() numbers Monday as 1.
    return timestamp.isoweekday() == 1
|
|
|
|
|
|
def is_month_start(timestamp: dt.datetime) -> bool:
    """Return True when *timestamp* falls on the first day of a month."""
    day_of_month = timestamp.day
    return day_of_month == 1
|
|
|
|
|
|
def is_year_start(timestamp: dt.datetime) -> bool:
    """Return True when *timestamp* falls on January 1st."""
    return (timestamp.month, timestamp.day) == (1, 1)
|
|
|
|
|
|
def select_newest(snapshot_pool: List[Snapshot], limit: int) -> Set[str]:
    """Return the names of the *limit* most recent snapshots in the pool.

    A limit of zero or less selects nothing.
    """
    if limit < 1:
        return set()

    newest_first = list(snapshot_pool)
    newest_first.sort(key=lambda snap: snap.timestamp, reverse=True)

    chosen: Set[str] = set()
    for snap in newest_first[:limit]:
        chosen.add(snap.name)
    return chosen
|
|
|
|
|
|
def compute_retained_snapshots(
    snapshots: List[Snapshot], policy: RetentionPolicy
) -> Set[str]:
    """Return the names of every snapshot the policy wants to keep.

    The result is the union of four selections: the newest ``daily``
    snapshots overall, plus the newest ``weekly`` / ``monthly`` / ``yearly``
    snapshots among those taken on a Monday / the 1st of a month /
    January 1st respectively.
    """
    pools = (
        (snapshots, policy.daily),
        ([snap for snap in snapshots if is_week_start(snap.timestamp)], policy.weekly),
        ([snap for snap in snapshots if is_month_start(snap.timestamp)], policy.monthly),
        ([snap for snap in snapshots if is_year_start(snap.timestamp)], policy.yearly),
    )

    keep: Set[str] = set()
    for pool, quota in pools:
        keep |= select_newest(pool, quota)
    return keep
|
|
|
|
|
|
class RcloneBackend:
    """Snapshot storage reached through rclone (sftp, smb and webdav).

    A throw-away rclone config file defining a single remote named
    ``backup`` is written on construction and removed again by close().
    """

    def __init__(self, destination: Destination):
        """Translate *destination* into an rclone remote and write its config."""
        self.base_prefix = ""
        remote_options, self.base_prefix = self._build_remote(destination)
        self.config_path = self._write_config(remote_options)

    def close(self) -> None:
        """Delete the temporary rclone config file if it is still there."""
        if not os.path.exists(self.config_path):
            return
        os.remove(self.config_path)

    def _run(
        self,
        args: List[str],
        *,
        check: bool = True,
        input_text: Optional[str] = None,
    ) -> subprocess.CompletedProcess:
        """Run ``rclone <args>`` against the generated config file."""
        command = ["rclone"]
        command.extend(args)
        command += ["--config", self.config_path]
        return run_command(command, check=check, input_text=input_text)

    def _build_remote(self, destination: Destination) -> Tuple[Dict[str, str], str]:
        """Return ``(rclone config options, path prefix inside the remote)``.

        Raises:
            RuntimeError: For an unsupported scheme, or an smb URL that does
                not name a share.
        """
        if destination.scheme not in RCLONE_SCHEME_MAP:
            supported = ", ".join(["rsync"] + sorted(RCLONE_SCHEME_MAP))
            raise RuntimeError(
                f"Unsupported BACKUP_DESTINATION scheme '{destination.scheme}'. Supported schemes: {supported}"
            )
        backend = RCLONE_SCHEME_MAP[destination.scheme]
        options: Dict[str, str] = {"type": backend}

        def add_port() -> None:
            if destination.port is not None:
                options["port"] = str(destination.port)

        def add_password() -> None:
            # rclone expects passwords in its obscured encoding.
            if destination.password:
                options["pass"] = obscure_secret(destination.password)

        if backend == "sftp":
            options["host"] = destination.hostname
            add_port()
            if destination.username:
                options["user"] = destination.username
            add_password()
            return options, destination.path.strip("/")

        if backend == "smb":
            # First path segment is the share; the rest is a folder inside it.
            segments = [piece for piece in destination.path.split("/") if piece]
            if len(segments) == 0:
                raise RuntimeError(
                    "smb destinations must include a share name in the path (example: smb://user:pass@host/share/path)"
                )
            share_name, folders = segments[0], segments[1:]

            options["host"] = destination.hostname
            options["share"] = share_name
            add_port()

            domain, user = parse_smb_identity(destination.username)
            if user:
                options["user"] = user
            if domain:
                options["domain"] = domain
            add_password()
            return options, "/".join(folders)

        # Everything else maps to webdav. Only explicit http/https schemes are
        # kept; the dav-style aliases are sent over https.
        if destination.scheme in {"http", "https"}:
            webdav_scheme = destination.scheme
        else:
            webdav_scheme = "https"

        authority = format_host(destination.hostname)
        if destination.port is not None:
            authority = f"{authority}:{destination.port}"
        webdav_path = destination.path or "/"

        # The whole path lives in the URL, so the in-remote prefix stays "".
        options["url"] = f"{webdav_scheme}://{authority}{webdav_path}"
        options["vendor"] = "other"
        if destination.username:
            options["user"] = destination.username
        add_password()
        return options, ""

    def _write_config(self, options: Dict[str, str]) -> str:
        """Write a ``[backup]`` section to a temp file and return its path."""
        lines = ["[backup]\n"]
        lines.extend(f"{key} = {value}\n" for key, value in options.items())

        with tempfile.NamedTemporaryFile("w", encoding="utf-8", delete=False) as handle:
            handle.writelines(lines)
            config_path = handle.name

        # The file may hold credentials: owner read/write only.
        os.chmod(config_path, 0o600)
        return config_path

    def _snapshots_root(self) -> str:
        """Path, inside the remote, of the directory holding all snapshots."""
        return join_path(self.base_prefix, "snapshots")

    def _snapshot_root(self, snapshot_name: str) -> str:
        """Path, inside the remote, of one snapshot directory."""
        return join_path(self._snapshots_root(), snapshot_name)

    def sync_source(
        self, snapshot_name: str, source_path: str, destination_path: str
    ) -> None:
        """Sync *source_path* into *destination_path* inside the snapshot."""
        target = join_path(self._snapshot_root(snapshot_name), destination_path)
        arguments = [
            "sync",
            f"{source_path}/",
            f"backup:{target}",
            "--create-empty-src-dirs",
        ]
        self._run(arguments)

    def write_marker(self, snapshot_name: str) -> None:
        """Upload ``.backup_complete`` (a UTC timestamp) into the snapshot."""
        target = join_path(self._snapshot_root(snapshot_name), ".backup_complete")
        stamp = dt.datetime.now(dt.timezone.utc).isoformat()
        self._run(["rcat", f"backup:{target}"], input_text=f"{stamp}\n")

    def _snapshot_has_marker(self, snapshot_name: str) -> bool:
        """Return True if the snapshot contains a ``.backup_complete`` file.

        A failed listing counts as "no marker".
        """
        listing = self._run(
            [
                "lsf",
                f"backup:{self._snapshot_root(snapshot_name)}",
                "--files-only",
                "--include",
                ".backup_complete",
            ],
            check=False,
        )
        if listing.returncode != 0:
            return False
        for entry in listing.stdout.splitlines():
            if entry.strip() == ".backup_complete":
                return True
        return False

    def list_snapshots(self) -> List[str]:
        """Return the sorted names of all completed snapshots.

        Directories whose name is not a snapshot timestamp, or that lack the
        completion marker, are ignored. A listing failure whose output says
        the directory is missing yields an empty list.

        Raises:
            RuntimeError: For any other listing failure.
        """
        listing = self._run(
            ["lsf", f"backup:{self._snapshots_root()}", "--dirs-only", "--format", "p"],
            check=False,
        )
        if listing.returncode != 0:
            message = listing.stderr.strip() or listing.stdout.strip()
            lowered = message.lower()
            if "not found" in lowered or "doesn't exist" in lowered:
                return []
            raise RuntimeError(message)

        complete: Set[str] = set()
        for raw_line in listing.stdout.splitlines():
            candidate = raw_line.strip().rstrip("/")
            if SNAPSHOT_NAME_RE.match(candidate) and self._snapshot_has_marker(candidate):
                complete.add(candidate)
        return sorted(complete)

    def delete_snapshot(self, snapshot_name: str) -> None:
        """Purge one snapshot directory; a failure is logged, not raised."""
        outcome = self._run(
            ["purge", f"backup:{self._snapshot_root(snapshot_name)}"],
            check=False,
        )
        if outcome.returncode == 0:
            return
        log(
            f"Failed to delete snapshot {snapshot_name}: {outcome.stderr.strip() or outcome.stdout.strip()}"
        )
|
|
|
|
|
|
class RsyncBackend:
    """Snapshot storage on an rsync daemon (``rsync://`` destinations).

    Snapshots live under ``<module path>/snapshots/<name>``. The password,
    if any, is passed to rsync through the ``RSYNC_PASSWORD`` environment
    variable of the child process.
    """

    def __init__(self, destination: Destination):
        """Derive the remote base URL and child environment.

        Raises:
            RuntimeError: If the URL path does not name an rsync module.
        """
        module_path = destination.path.lstrip("/")
        if not module_path:
            raise RuntimeError(
                "rsync destinations must include a module path (example: rsync://user:pass@host/module/path)"
            )

        host = format_host(destination.hostname)
        if destination.port is not None:
            host = f"{host}:{destination.port}"

        user_prefix = f"{destination.username}@" if destination.username else ""
        self.remote_base = f"rsync://{user_prefix}{host}/{module_path.rstrip('/')}"
        self.command_env = os.environ.copy()
        if destination.password:
            self.command_env["RSYNC_PASSWORD"] = destination.password

    def close(self) -> None:
        """Nothing to release; present so both backends share one interface."""
        return

    def _remote_path(self, path: str) -> str:
        """Return the rsync URL for *path* below the remote base."""
        trimmed = path.strip("/")
        if not trimmed:
            return self.remote_base
        return f"{self.remote_base}/{trimmed}"

    def sync_source(
        self, snapshot_name: str, source_path: str, destination_path: str
    ) -> None:
        """Mirror *source_path* into *destination_path* inside the snapshot."""
        target = self._remote_path(
            join_path(f"snapshots/{snapshot_name}", destination_path)
        )
        run_command(
            ["rsync", "-a", "--delete", f"{source_path}/", f"{target}/"],
            env=self.command_env,
        )

    def write_marker(self, snapshot_name: str) -> None:
        """Upload ``.backup_complete`` (a UTC timestamp) into the snapshot."""
        marker_remote = self._remote_path(f"snapshots/{snapshot_name}/.backup_complete")
        marker_file = None
        try:
            with tempfile.NamedTemporaryFile(
                "w", encoding="utf-8", delete=False
            ) as handle:
                marker_file = handle.name
                handle.write(f"{dt.datetime.now(dt.timezone.utc).isoformat()}\n")
            run_command(
                ["rsync", "-a", marker_file, marker_remote], env=self.command_env
            )
        finally:
            # The local copy is only a staging file for the upload.
            if marker_file and os.path.exists(marker_file):
                os.remove(marker_file)

    def _snapshot_has_marker(self, snapshot_name: str) -> bool:
        """Return True if listing the snapshot's marker file succeeds."""
        marker_remote = self._remote_path(f"snapshots/{snapshot_name}/.backup_complete")
        result = run_command(
            ["rsync", "--list-only", marker_remote],
            env=self.command_env,
            check=False,
        )
        return result.returncode == 0

    def list_snapshots(self) -> List[str]:
        """Return the sorted names of all completed snapshots.

        Entries whose name is not a snapshot timestamp, or that lack the
        completion marker, are ignored. A listing failure whose output says
        the directory is missing yields an empty list.

        Raises:
            RuntimeError: For any other listing failure.
        """
        root = self._remote_path("snapshots")
        result = run_command(
            ["rsync", "--list-only", f"{root}/"],
            env=self.command_env,
            check=False,
        )
        if result.returncode != 0:
            output = result.stderr.strip() or result.stdout.strip()
            lower = output.lower()
            if (
                "no such file" in lower
                or "not found" in lower
                or "chdir failed" in lower
            ):
                return []
            raise RuntimeError(output)

        names: Set[str] = set()
        for line in result.stdout.splitlines():
            line = line.strip()
            if not line or line.startswith("receiving"):
                continue
            # The entry name is the last whitespace-separated field.
            candidate = line.split()[-1].rstrip("/")
            if not SNAPSHOT_NAME_RE.match(candidate):
                continue
            if self._snapshot_has_marker(candidate):
                names.add(candidate)
        return sorted(names)

    def delete_snapshot(self, snapshot_name: str) -> None:
        """Remove one snapshot; a failure is logged, not raised.

        rsync has no delete command, so an empty local directory is synced
        with ``--delete``: first over the snapshot itself to empty it, then
        over ``snapshots/`` with a filter that exposes only this snapshot so
        the now-empty directory is removed as well.
        """
        snapshot_remote = self._remote_path(f"snapshots/{snapshot_name}")
        empty_dir = tempfile.mkdtemp(prefix="backup-empty-")
        try:
            run_command(
                ["rsync", "-a", "--delete", f"{empty_dir}/", f"{snapshot_remote}/"],
                env=self.command_env,
                check=False,
            )

            result = run_command(
                [
                    "rsync",
                    "-a",
                    "--delete",
                    "--prune-empty-dirs",
                    "--include",
                    f"/{snapshot_name}/***",
                    "--exclude",
                    "*",
                    f"{empty_dir}/",
                    f"{self._remote_path('snapshots')}/",
                ],
                env=self.command_env,
                check=False,
            )
        finally:
            os.rmdir(empty_dir)

        # Report a failed removal instead of dropping it silently, in the
        # same format the rclone backend uses. The second pass decides the
        # outcome because it is the one that removes the directory.
        if result.returncode != 0:
            log(
                f"Failed to delete snapshot {snapshot_name}: {result.stderr.strip() or result.stdout.strip()}"
            )
|
|
|
|
|
|
def build_backend(destination: Destination):
    """Return the backend for *destination*: rsync for ``rsync://``, rclone otherwise."""
    backend_class = RsyncBackend if destination.scheme == "rsync" else RcloneBackend
    return backend_class(destination)
|
|
|
|
|
|
def parse_snapshot_inventory(snapshot_names: List[str]) -> List[Snapshot]:
    """Turn snapshot names into Snapshot records, newest first.

    Names that do not parse as a snapshot timestamp are dropped.
    """
    dated = ((name, parse_snapshot_name(name)) for name in snapshot_names)
    inventory = [
        Snapshot(name=name, timestamp=moment)
        for name, moment in dated
        if moment is not None
    ]
    return sorted(inventory, key=lambda snap: snap.timestamp, reverse=True)
|
|
|
|
|
|
def run_backup() -> int:
    """Create one snapshot on the destination, then prune old snapshots.

    The run is skipped (returning 0) when BACKUP_DESTINATION is unset or no
    source directory exists. The completion marker is written only after
    every source has synced, so a run that fails part-way leaves a snapshot
    that list_snapshots() ignores.

    Returns:
        0 on success or skip. Failures surface as exceptions.
    """
    destination_url = os.getenv("BACKUP_DESTINATION", "").strip()
    if destination_url == "":
        log("BACKUP_DESTINATION is unset, skipping backup")
        return 0

    policy = parse_retention_policy()
    sources = available_sources()
    if len(sources) == 0:
        log("No backup sources are available, skipping backup")
        return 0

    destination = parse_destination(destination_url)
    backend = build_backend(destination)
    try:
        log(f"Starting backup to {redact_destination(destination.raw_url)}")

        taken_names = set(backend.list_snapshots())
        snapshot_name = choose_snapshot_name(taken_names)
        log(f"Creating snapshot {snapshot_name}")

        for source_path, destination_path in sources:
            log(f"Syncing {source_path}")
            backend.sync_source(snapshot_name, source_path, destination_path)

        backend.write_marker(snapshot_name)

        # Listed again so the snapshot just written takes part in retention.
        snapshots = parse_snapshot_inventory(backend.list_snapshots())
        retained = compute_retained_snapshots(snapshots, policy)

        expired = [snap for snap in snapshots if snap.name not in retained]
        for snap in expired:
            log(f"Pruning snapshot {snap.name}")
            backend.delete_snapshot(snap.name)

        log(
            f"Backup completed (snapshots total={len(snapshots)}, retained={len(retained)}, pruned={len(expired)})"
        )
        return 0
    finally:
        backend.close()
|
|
|
|
|
|
def with_lock() -> int:
    """Run the backup while holding an exclusive lock on LOCK_PATH.

    The lock is requested without blocking: if another process holds it,
    this cycle is skipped and 0 is returned.

    Returns:
        The result of run_backup(), or 0 when the cycle is skipped.
    """
    lock_dir = os.path.dirname(LOCK_PATH)
    if lock_dir != "" and not os.path.isdir(lock_dir):
        os.makedirs(lock_dir, exist_ok=True)

    with open(LOCK_PATH, "w", encoding="utf-8") as lock_file:
        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            log("Another backup process is already running; skipping this cycle")
            return 0
        else:
            # Still inside the with-block, so the lock file stays open for
            # the whole run.
            return run_backup()
|
|
|
|
|
|
def main() -> int:
    """Entry point: run one locked backup cycle.

    Returns:
        The process exit code: whatever with_lock() returns, or 1 if any
        exception escapes it.
    """
    backup_hour = parse_int_env(
        "BACKUP_START_HOUR", DEFAULT_BACKUP_START_HOUR, minimum=0, maximum=23
    )
    # The hour is only reported here; nothing in this function schedules on it.
    if backup_hour != DEFAULT_BACKUP_START_HOUR:
        log(f"Configured backup start hour is {backup_hour}:00")

    try:
        exit_code = with_lock()
    except Exception as exc:  # pylint: disable=broad-except
        log(f"ERROR: {exc}")
        exit_code = 1
    return exit_code
|
|
|
|
|
|
# Run a backup cycle only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|