import os, subprocess, shutil
from linuxmusterLinuxclient7 import logging, setup, realm, user, constants, printers, fileHelper
[docs]
def prepareForImage(unattended=False):
"""Prepare the computer for creating an image
:param unattended: If set to True, all questions will be answered with yes, defaults to False
:type unattended: bool, optional
:return: True on success, False otherwise
:rtype: bool
"""
logging.info("#### Image preparation ####")
try:
if not _testDomainJoin(unattended):
return False
if not _upgradeSystem(unattended):
return False
if not _clearCaches(unattended):
return False
if not _clearUserHomes(unattended):
return False
if not _clearUserCache(unattended):
return False
if not _clearPrinters(unattended):
return False
if not _clearLogs(unattended):
return False
if not _emptyTrash(unattended):
return False
except KeyboardInterrupt:
print()
logging.info("Cancelled.")
return False
print()
logging.info("#### Image preparation done ####")
logging.info("#### You may create an Image now :) ####")
print()
return True
# --------------------
# - Helper functions -
# --------------------
def _askStep(step, printPlaceholder=True):
if printPlaceholder:
print()
response = input(f"Do you want to {step}? (y/n): ")
result = response in ["y", "Y", "j", "J"]
if result:
print()
return result
def _testDomainJoin(unattended=False):
if not unattended and not _askStep("test if the domain join works"):
return True
return setup.status()
def _upgradeSystem(unattended=False):
if not unattended and not _askStep("update this computer now"):
return True
# Perform an update
logging.info("Updating this computer now...")
for action in ["update", "dist-upgrade", "autoremove", "clean"]:
if not _executeAptAction(action):
return False
return True
def _executeAptAction(action):
args = ["apt", action]
if action != "update":
args.append("-y")
if subprocess.call(args) != 0:
logging.error(f"apt {action} failed!")
return False
return True
def _clearCaches(unattended=False):
if not unattended and not _askStep("clear journalctl and apt caches now"):
return True
logging.info("Cleaning caches..")
logging.info("* apt")
fileHelper.deleteAllInDirectory("/var/lib/apt/lists/")
logging.info("* journalctl")
subprocess.call(["journalctl", "--flush", "--rotate"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
subprocess.call(["journalctl", "--vacuum-time=1s"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logging.info("Done.")
return True
def _checkLoggedInUsers():
result = subprocess.run("who -s | awk '{ print $1 }'", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to get logged in users!")
return False, None
loggedInUsers = list(filter(None, result.stdout.split("\n")))
for loggedInUser in loggedInUsers:
if user.isUserInAD(loggedInUser):
logging.error(f"User {loggedInUser} is still logged in, please log out first! Aborting!")
return False
return True
def _clearUserCache(unattended=False):
if not unattended and not _askStep("clear all cached users now"):
return True
if not _checkLoggedInUsers():
return False
realm.clearUserCache()
logging.info("Done.")
return realm.clearUserCache()
def _unmountAllCifsMounts():
logging.info("Unmounting all CIFS mounts!")
if subprocess.call(["umount", "-a", "-t", "cifs", "-l"]) != 0:
logging.info("Failed!")
return False
# double check (just to be sure)
result = subprocess.run("mount", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to get mounts!")
return False
if ("cifs" in result.stdout) or ("CIFS" in result.stdout):
logging.error("There are still shares mounted!")
logging.info("Use \"mount | grep cifs\" to view them.")
return False
return True
def _clearUserHomes(unattended=False):
print("\nCAUTION! This will delete all userhomes of AD users!")
if not unattended and not _askStep("clear all user homes now", False):
return True
if not _checkLoggedInUsers():
return False
if not _unmountAllCifsMounts():
logging.info("Aborting deletion of user homes to prevent deleting data on the server.")
return False
userHomes = os.listdir("/home")
logging.info("Deleting all user homes now!")
for userHome in userHomes:
if not user.isUserInAD(userHome):
logging.info(f"* {userHome} [SKIPPED]")
continue
logging.info(f"* {userHome}")
try:
shutil.rmtree(f"/home/{userHome}")
except Exception as e:
logging.error("* FAILED!")
logging.exception(e)
try:
shutil.rmtree(constants.hiddenShareMountBasepath.format(userHome))
except:
pass
logging.info("Done.")
return True
def _clearPrinters(unattended=False):
print(f"\nCAUTION! This will delete all printers of {constants.templateUser}!")
print("This makes sure that local printers do not conflict with remote printers defined by GPOs.")
if not unattended and not _askStep(f"remove all local printers of {constants.templateUser}", False):
return True
if not printers.uninstallAllPrintersOfUser(constants.templateUser):
return False
return True
def _clearLogs(unattended=False):
if not unattended and not _askStep("clear the syslog"):
return True
if not fileHelper.deleteFile("/var/log/syslog"):
return False
subprocess.call(["sudo", "service", "rsyslog", "restart"])
return True
def _emptyTrash(unattended=False):
if not unattended and not _askStep("clear the Trash of linuxadmin"):
return True
if not fileHelper.deleteAllInDirectory(f"/home/{constants.templateUser}/.local/share/Trash"):
return False
return True