Source code for linuxmusterLinuxclient7.shares

import os, pwd, sys, shutil, re, subprocess, shutil
from linuxmusterLinuxclient7 import logging, constants, user, config, computer
from pathlib import Path

def mountShare(networkPath, shareName = None, hiddenShare = False, username = None):
    """
    Mount a given path of a samba share.

    When running as root the share is mounted directly; otherwise the
    mount is delegated to the sudo helper script.

    :param networkPath: Network path of the share (backslashes are converted to forward slashes)
    :type networkPath: str
    :param shareName: The name of the folder the share is mounted to; defaults to the last component of the network path
    :type shareName: str
    :param hiddenShare: If True, the share is mounted below the hidden share base path instead of the regular one
    :type hiddenShare: bool
    :param username: The user in whose context the share should be mounted; when omitted a default is derived from the executing user
    :type username: str
    :return: Tuple: (success, mountpoint)
    :rtype: tuple
    """
    normalizedPath = networkPath.replace("\\", "/")
    effectiveUser = _getDefaultUsername(username)
    effectiveName = _getDefaultShareName(normalizedPath, shareName)

    if not user.isRoot():
        target = _getShareMountpoint(normalizedPath, effectiveUser, hiddenShare, effectiveName)
        # This will call _mountShare() directly with root privileges
        mounted = _mountShareWithoutRoot(normalizedPath, effectiveName, hiddenShare)
        return mounted, target

    return _mountShare(effectiveUser, normalizedPath, effectiveName, hiddenShare, True)
def getMountpointOfRemotePath(remoteFilePath, hiddenShare = False, username = None, autoMount = True):
    """
    Get the local path of a remote samba share path.

    Optionally mounts the top level of the remote share first: for the
    remote path `//server/sysvol/linuxmuster.lan/Policies` the share
    `//server/sysvol` is mounted.

    :param remoteFilePath: Remote path (backslashes are converted to forward slashes)
    :type remoteFilePath: str
    :param hiddenShare: If True, the share is looked up / mounted below the hidden share base path
    :type hiddenShare: bool
    :param username: The user in whose context the share should be mounted
    :type username: str
    :param autoMount: If the share should be mounted automatically before the local path is calculated
    :type autoMount: bool
    :return: Tuple: (success, local path); (False, None) on any failure
    :rtype: tuple
    """
    if username is None and user.isRoot() and not hiddenShare:
        logging.error("root can only mount hidden shares!")
        return False, None

    remoteFilePath = remoteFilePath.replace("\\", "/")
    username = _getDefaultUsername(username)

    # Get the base path of the remote file path:
    # this turns //server/sysvol/linuxmuster.lan/Policies into //server/sysvol
    pattern = re.compile("(^\\/\\/[^\\/]+\\/[^\\/]+)")
    match = pattern.search(remoteFilePath)

    if match is None:
        logging.error(f"Cannot get local file path of {remoteFilePath} beacuse it is not a valid path!")
        return False, None

    shareBasepath = match.group(0)

    if autoMount:
        # Only the success flag matters here; the mountpoint is recalculated below
        rc, _ = mountShare(shareBasepath, hiddenShare=hiddenShare, username=username)
        if not rc:
            return False, None

    # Calculate the local path. The pattern is anchored at the start, so the
    # base path is always a prefix of remoteFilePath; swap only that prefix so
    # a later repetition of the same text inside the path is left untouched.
    shareMountpoint = _getShareMountpoint(shareBasepath, username, hiddenShare, shareName=None)
    localFilePath = shareMountpoint + remoteFilePath[len(shareBasepath):]

    return True, localFilePath
def unmountAllSharesOfUser(username):
    """
    Unmount all shares of a given user and delete the mountpoints and
    their parent directories once they are empty.

    Both the regular and the hidden share base directory are always
    processed; a failure in one of them does not prevent the other one
    from being cleaned up.

    :param username: The username of the user
    :type username: str
    :return: True if every existing base directory could be emptied and removed, False otherwise
    :rtype: bool
    """
    logging.info(f"=== Trying to unmount all shares of user {username} ===")
    success = True

    for basedir in [constants.shareMountBasepath, constants.hiddenShareMountBasepath]:
        shareMountBasedir = basedir.format(username)

        try:
            mountedShares = os.listdir(shareMountBasedir)
        except FileNotFoundError:
            logging.info(f"Mount basedir {shareMountBasedir} does not exist -> nothing to unmount")
            continue

        for share in mountedShares:
            _unmountShare(f"{shareMountBasedir}/{share}")

        # Anything left over means at least one share could not be removed.
        # Remember the failure but keep going so the remaining base
        # directory is still unmounted.
        if os.listdir(shareMountBasedir):
            logging.warning(f"Mount basedir {shareMountBasedir} is not empty so not removed!")
            success = False
            continue

        # Delete the (now empty) directory
        logging.info(f"Deleting {shareMountBasedir}...")
        try:
            os.rmdir(shareMountBasedir)
        except Exception as e:
            logging.error("FAILED!")
            logging.exception(e)
            success = False

    if success:
        logging.info(f"===> Finished unmounting all shares of user {username} ===")
    return success
def getLocalSysvolPath():
    """
    Get the local mountpoint of the sysvol share of the configured server.

    The share is resolved as a hidden share through
    getMountpointOfRemotePath().

    :return: Tuple: (success, full path of the mountpoint); (False, None) if the network configuration could not be read
    :rtype: tuple
    """
    ok, networkConfig = config.network()
    if ok:
        sysvolPath = f"//{networkConfig['serverHostname']}/sysvol"
        return getMountpointOfRemotePath(sysvolPath, True)

    return False, None
# --------------------
# - Helper functions -
# --------------------

def _mountShare(username, networkPath, shareName, hiddenShare, useCruidOfExecutingUser=False):
    """
    Mount a share with mount.cifs using kerberos authentication.

    :param username: The user in whose context the share is mounted
    :type username: str
    :param networkPath: Network path of the share
    :type networkPath: str
    :param shareName: Name of the folder the share is mounted to
    :type shareName: str
    :param hiddenShare: If True, the share is mounted below the hidden share base path
    :type hiddenShare: bool
    :param useCruidOfExecutingUser: Defines if the ticket cache of the user
        executing the mount command should be used. If set to False, the
        cache of the user with the given username is used. This parameter
        influences the `cruid` mount option.
    :type useCruidOfExecutingUser: bool
    :return: Tuple: (success, mountpoint); the mountpoint is None on failure
    :rtype: tuple
    """
    mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)

    mountCommandOptions = f"file_mode=0700,dir_mode=0700,sec=krb5,nodev,nosuid,mfsymlinks,nobrl,vers=3.0,user={username}"
    rc, networkConfig = config.network()
    if rc:
        domain = networkConfig["domain"]
        mountCommandOptions += f",domain={domain.upper()}"

    try:
        pwdInfo = pwd.getpwnam(username)
    except KeyError:
        # Without a passwd entry no uid/gid/cruid options can be set
        logging.warning("Uid could not be found! Continuing anyway!")
    else:
        uid = pwdInfo.pw_uid
        gid = pwdInfo.pw_gid
        mountCommandOptions += f",gid={gid},uid={uid}"

        if not useCruidOfExecutingUser:
            mountCommandOptions += f",cruid={uid}"

    logging.debug(f"Trying to mount '{networkPath}' to '{mountpoint}'")
    logging.debug("* Creating directory...")

    try:
        Path(mountpoint).mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        # Test if a share is already mounted there
        if _directoryIsMountpoint(mountpoint):
            logging.debug("* The mountpoint is already mounted.")
            return True, mountpoint

        logging.warning("* The target directory already exists, proceeding anyway!")

    # shutil.which() returns None when the binary is not installed; fail
    # cleanly instead of raising a TypeError while building the command.
    mountBinary = shutil.which("mount.cifs")
    if mountBinary is None:
        logging.fatal(f"* Error mounting share {networkPath} to {mountpoint}: mount.cifs was not found!\n")
        return False, None

    mountCommand = [mountBinary, "-o", mountCommandOptions, networkPath, mountpoint]

    logging.debug(f"* Executing '{' '.join(mountCommand)}' ")
    logging.debug("* Trying to mount...")
    # The output is not needed. DEVNULL is used instead of PIPE because
    # subprocess.call() never reads the pipes and could block on a full buffer.
    if subprocess.call(mountCommand, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
        logging.fatal(f"* Error mounting share {networkPath} to {mountpoint}!\n")
        return False, None

    logging.debug("* Success!")

    # hide the shares parent dir (/home/%user/media) in case it is not a hidden share
    if not hiddenShare:
        try:
            hiddenFilePath = f"{mountpoint}/../../.hidden"
            logging.debug(f"* hiding parent dir {hiddenFilePath}")
            with open(hiddenFilePath, "w+") as hiddenFile:
                hiddenFile.write(mountpoint.split("/")[-2])
        except Exception:
            # Hiding is best effort only, the mount itself succeeded
            logging.warning(f"Could not hide parent dir of share {mountpoint}")

    return True, mountpoint


def _unmountShare(mountpoint):
    """
    Unmount a single share and remove its mountpoint directory if it is empty.

    A directory that is still mounted or not empty is never deleted.

    :param mountpoint: Full path of the mountpoint
    :type mountpoint: str
    """
    # os.path.isdir() is also False for paths which do not exist at all
    if not os.path.isdir(mountpoint):
        logging.warning(f"* Could not unmount {mountpoint}, it does not exist.")
        # Nothing to unmount or delete; going on would fail in os.listdir()
        return

    # Try to unmount share
    logging.info(f"* Trying to unmount {mountpoint}...")
    if subprocess.call(["umount", mountpoint]) != 0:
        logging.warning("* Failed!")
        if _directoryIsMountpoint(mountpoint):
            logging.warning("* It is still mounted! Exiting!")
            # Do not delete in this case! We might delete userdata!
            return
        logging.info("* It is not mounted! Continuing!")

    # check if the mountpoint is empty
    if os.listdir(mountpoint):
        logging.warning(f"* mountpoint {mountpoint} is not empty so not removed!")
        return

    # Delete the directory
    logging.info(f"* Deleting {mountpoint}...")
    try:
        os.rmdir(mountpoint)
    except Exception as e:
        logging.error("* FAILED!")
        logging.exception(e)


def _getDefaultUsername(username=None):
    """
    Return the given username or a default one if it is None.

    The default is the upper-cased hostname followed by `$` when running
    as root, otherwise the name of the current user.

    :param username: Explicit username or None
    :type username: str
    :return: The username to use
    :rtype: str
    """
    if username is None:
        if user.isRoot():
            username = computer.hostname().upper() + "$"
        else:
            username = user.username()
    return username


def _getDefaultShareName(networkPath, shareName=None):
    """
    Return the given share name or derive it from the network path.

    The derived name is the last component of the path; trailing slashes
    are ignored so that `//server/share/` yields `share` and not an empty
    name.

    :param networkPath: Network path of the share (using forward slashes)
    :type networkPath: str
    :param shareName: Explicit share name or None
    :type shareName: str
    :return: The share name to use
    :rtype: str
    """
    if shareName is None:
        shareName = networkPath.rstrip("/").split("/")[-1]
    return shareName


def _mountShareWithoutRoot(networkPath, name, hidden):
    """
    Mount a share by calling the sudoTools helper script through sudo.

    :param networkPath: Network path of the share
    :type networkPath: str
    :param name: Name of the folder the share is mounted to
    :type name: str
    :param hidden: If True, `--hidden` is passed to the helper
    :type hidden: bool
    :return: True if the helper exited with status 0
    :rtype: bool
    """
    mountCommand = ["sudo", "/usr/share/linuxmuster-linuxclient7/scripts/sudoTools", "mount-share", "--path", networkPath, "--name", name]

    if hidden:
        mountCommand.append("--hidden")

    return subprocess.call(mountCommand) == 0


def _getShareMountpoint(networkPath, username, hidden, shareName = None):
    """
    Calculate the local mountpoint of a share.

    :param networkPath: Network path of the share
    :type networkPath: str
    :param username: The user the share belongs to; inserted into the base path template
    :type username: str
    :param hidden: If True, the hidden share base path is used
    :type hidden: bool
    :param shareName: Name of the share folder; derived from the network path if None
    :type shareName: str
    :return: Full path of the mountpoint
    :rtype: str
    """
    logging.debug(f"Calculating mountpoint of {networkPath}")

    shareName = _getDefaultShareName(networkPath, shareName)
    basepath = constants.hiddenShareMountBasepath if hidden else constants.shareMountBasepath

    return f"{basepath.format(username)}/{shareName}"


def _directoryIsMountpoint(dir):
    """
    Check with the `mountpoint` command if a directory is a mountpoint.

    :param dir: Path of the directory
    :type dir: str
    :return: True if `mountpoint -q` exits with status 0
    :rtype: bool
    """
    return subprocess.call(["mountpoint", "-q", dir]) == 0