excluding expired/locked users from having a private folder; fixed groups (sqlite3)

This commit is contained in:
Ludwig Lehnert
2026-02-18 19:13:25 +01:00
parent 6d9476c578
commit 43bcb08548
3 changed files with 96 additions and 14 deletions

View File

@@ -22,8 +22,11 @@ PRIVATE_ROOT = "/data/private"
PUBLIC_ROOT = "/data/public"
GENERATED_CONF = "/etc/samba/generated/shares.conf"
LDAP_FILTER = "(&(objectClass=group)(sAMAccountName=FileShare_*))"
GROUP_PREFIX = "FileShare_"
LDAP_FILTER = (
"(&(objectClass=group)(|(sAMAccountName=FileShare_*)(sAMAccountName=FS_*)))"
)
GROUP_PREFIXES = ("FileShare_", "FS_")
USER_STATUS_FILTER = "(&(objectClass=user)(!(objectClass=computer))(sAMAccountName=*))"
REQUIRED_ENV = ["REALM", "WORKGROUP", "DOMAIN"]
ATTR_RE = re.compile(r"^([^:]+)(::?):\s*(.*)$")
@@ -32,12 +35,16 @@ PRIVATE_SKIP_EXACT = {
"krbtgt",
"administrator",
"guest",
"gast",
"defaultaccount",
"wdagutilityaccount",
"fileshare_serviceacc",
"fileshare_serviceaccount",
}
PRIVATE_SKIP_PREFIXES = ("msol_", "fileshare_service", "aad_")
UAC_ACCOUNTDISABLE = 0x0002
UAC_LOCKOUT = 0x0010
AD_NEVER_EXPIRES_VALUES = {0, 9223372036854775807}
def now_utc() -> str:
@@ -81,7 +88,7 @@ def run_command(command: List[str], check: bool = True) -> subprocess.CompletedP
return result
def parse_groups_from_ldap_output(output: str) -> List[Dict[str, str]]:
def parse_ldap_entries(output: str) -> List[Dict[str, Tuple[str, bool]]]:
entries: List[Dict[str, Tuple[str, bool]]] = []
current: Dict[str, Tuple[str, bool]] = {}
@@ -105,6 +112,20 @@ def parse_groups_from_ldap_output(output: str) -> List[Dict[str, str]]:
if current:
entries.append(current)
return entries
def derive_share_name(sam_account_name: str) -> Optional[str]:
for prefix in GROUP_PREFIXES:
if sam_account_name.startswith(prefix):
share_name = sam_account_name[len(prefix) :]
return share_name if share_name else None
return None
def parse_groups_from_ldap_output(output: str) -> List[Dict[str, str]]:
entries = parse_ldap_entries(output)
groups: List[Dict[str, str]] = []
for entry in entries:
if "objectGUID" not in entry or "sAMAccountName" not in entry:
@@ -112,10 +133,7 @@ def parse_groups_from_ldap_output(output: str) -> List[Dict[str, str]]:
sam_value, _ = entry["sAMAccountName"]
sam = sam_value.strip()
if not sam.startswith(GROUP_PREFIX):
continue
share_name = sam[len(GROUP_PREFIX) :]
share_name = derive_share_name(sam)
if not share_name:
continue
@@ -202,6 +220,65 @@ def fetch_fileshare_groups() -> List[Dict[str, str]]:
return fetch_groups_via_ldap_bind()
def windows_filetime_now() -> int:
unix_epoch_seconds = int(dt.datetime.now(dt.timezone.utc).timestamp())
return (unix_epoch_seconds + 11644473600) * 10000000
def parse_int(value: str, default: int = 0) -> int:
try:
return int(value.strip())
except (ValueError, AttributeError):
return default
def fetch_non_login_users() -> set:
command = [
"net",
"ads",
"search",
"-P",
USER_STATUS_FILTER,
"sAMAccountName",
"userAccountControl",
"accountExpires",
"lockoutTime",
]
result = run_command(command, check=False)
if result.returncode != 0:
log(
"net ads search for account status failed; private folder filtering will use static skip rules only"
)
return set()
blocked = set()
now_filetime = windows_filetime_now()
for entry in parse_ldap_entries(result.stdout):
if "sAMAccountName" not in entry:
continue
username = entry["sAMAccountName"][0].strip().lower()
if not username:
continue
uac = parse_int(entry.get("userAccountControl", ("0", False))[0], 0)
account_expires = parse_int(entry.get("accountExpires", ("0", False))[0], 0)
lockout_time = parse_int(entry.get("lockoutTime", ("0", False))[0], 0)
is_disabled = bool(uac & UAC_ACCOUNTDISABLE)
is_locked = bool(uac & UAC_LOCKOUT) or lockout_time > 0
is_expired = (
account_expires not in AD_NEVER_EXPIRES_VALUES
and account_expires <= now_filetime
)
if is_disabled or is_locked or is_expired:
blocked.add(username)
return blocked
def open_db() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
@@ -455,7 +532,7 @@ def set_group_acl_with_admin(
)
def list_domain_users() -> List[str]:
def list_domain_users(non_login_users: set) -> List[str]:
result = run_command(["wbinfo", "-u"], check=False)
if result.returncode != 0:
log("wbinfo -u failed; skipping private directory sync")
@@ -472,6 +549,8 @@ def list_domain_users() -> List[str]:
continue
if should_skip_private_user(candidate):
continue
if candidate.lower() in non_login_users:
continue
users.append(candidate)
return sorted(set(users))
@@ -532,7 +611,8 @@ def sync_private_directories() -> None:
run_command(["setfacl", "-b", PRIVATE_ROOT], check=False)
os.chmod(PRIVATE_ROOT, 0o555)
users = list_domain_users()
non_login_users = fetch_non_login_users()
users = list_domain_users(non_login_users)
for username in users:
uid = resolve_user_uid_flexible(workgroup, username)
if uid is None: