#
# This is used to run hooks
#
from enum import Enum
import os, subprocess
from linuxmusterLinuxclient7 import logging, constants, user, config, computer, environment, setup, shares
[docs]
class Type(Enum):
    """
    All hook types known to this module.

    The member name is what gets appended to ``on`` when hook paths are built
    (for example ``onBoot.d`` for the local hook directory).
    """

    Boot = 0
    """The onBoot hook."""

    Shutdown = 1
    """The onShutdown hook."""

    LoginAsRoot = 2
    """The onLoginAsRoot hook."""

    Login = 3
    """The onLogin hook."""

    SessionStarted = 4
    """The onSessionStarted hook."""

    LogoutAsRoot = 5
    """The onLogoutAsRoot hook (undocumented in the original source)."""

    LoginLogoutAsRoot = 6
    """The onLoginLogoutAsRoot hook (undocumented in the original source)."""
# File name of the server-side script for every hook type that has one.
# Hook types that are missing here have no remote hook at all.
remoteScriptNames = {
    Type.Boot: "sysstart.sh",
    Type.Login: "logon.sh",
    Type.SessionStarted: "sessionstart.sh",
    Type.Shutdown: "sysstop.sh",
}

# For every hook type listed above: True when the school is looked up in the
# user's attributes, False when it is looked up in the computer's attributes.
_remoteScriptInUserContext = {
    Type.Boot: False,
    Type.Login: True,
    Type.SessionStarted: True,
    Type.Shutdown: False,
}
[docs]
def runLocalHook(hookType):
    """
    Run all scripts in a local hookdir

    The scripts are executed one after another in sorted file name order.
    If the hook directory does not exist (or is not a directory), nothing
    is executed.

    :param hookType: The type of hook to run
    :type hookType: hooks.Type
    """
    hookDir = _getLocalHookDir(hookType)
    logging.info(f"=== Running local hook on{hookType.name} in {hookDir} ===")

    # isdir() rather than exists(): a regular file at this path would pass
    # exists() and then make os.listdir() raise NotADirectoryError.
    if os.path.isdir(hookDir):
        _prepareEnvironment()
        for fileName in sorted(os.listdir(hookDir)):
            _runHookScript(os.path.join(hookDir, fileName))

    logging.info(f"===> Finished running local hook on{hookType.name} ===")
[docs]
def runRemoteHook(hookType):
    """
    Run the hook scripts for the given hook type from the sysvol

    When the script paths can be determined, the first returned script is
    executed before the second one.

    :param hookType: The type of hook to run
    :type hookType: hooks.Type
    """
    logging.info(f"=== Running remote hook on{hookType.name} ===")

    success, scriptPaths = _getRemoteHookScripts(hookType)
    if success:
        _prepareEnvironment()
        # Exactly the first two entries are executed, in order.
        for position in (0, 1):
            _runHookScript(scriptPaths[position])

    logging.info(f"===> Finished running remote hook on{hookType.name} ===")
[docs]
def runHook(hookType):
    """
    Run the local hook first and the remote hook afterwards,
    i.e. hooks.runLocalHook() followed by hooks.runRemoteHook()

    :param hookType: The type of hook to run
    :type hookType: hooks.Type
    """
    for runner in (runLocalHook, runRemoteHook):
        runner(hookType)
[docs]
def getLocalHookScript(hookType):
    """Build the path of the local hookscript for a hook type

    The path is ``on<hook name>`` inside ``constants.scriptDir``.

    :param hookType: The type of hook script to get the path for
    :type hookType: hooks.Type
    :return: The path
    :rtype: str
    """
    scriptPath = f"{constants.scriptDir}/on{hookType.name}"
    return scriptPath
[docs]
def shouldHooksBeExecuted(overrideUsername=None):
    """Check if hooks should be executed

    Hooks are only executed when linuxmuster-linuxclient7 is set up, the
    computer is joined to a domain and the user is an AD user. The reason
    for a negative result is logged.

    :param overrideUsername: Override the username to check, defaults to None
        (in which case ``user.username()`` is used)
    :type overrideUsername: str, optional
    :return: True if hooks should be executed, False otherwise
    :rtype: bool
    """
    # check if linuxmuster-linuxclient7 is setup
    if not setup.isSetup():
        logging.info("==== Linuxmuster-linuxclient7 is not setup, exiting ====")
        return False

    # check if the computer is joined
    if not computer.isInAD():
        logging.info("==== This Client is not joined to any domain, exiting ====")
        return False

    # Check if the user is an AD user. The current user is only looked up
    # when no override was given; identity comparison is the correct test
    # for the None singleton.
    username = user.username() if overrideUsername is None else overrideUsername
    if not user.isUserInAD(username):
        logging.info(f"==== {username} is not an AD user, exiting ====")
        return False

    return True
# --------------------
# - Helper functions -
# --------------------
def _prepareEnvironment():
    """
    Export the network, user and computer configuration to the process
    environment so that hook scripts can read it.

    Each source is stored under its own prefix ("Network", "User",
    "Computer"); a source that cannot be read is left out.
    """
    sources = (
        ("Network", config.network),
        ("User", user.readAttributes),
        ("Computer", computer.readAttributes),
    )

    collected = {}
    for prefix, readSource in sources:
        success, values = readSource()
        if success:
            collected[prefix] = values

    _writeEnvironment(_dictsToEnv(collected))
def _getLocalHookDir(hookType):
    """
    Build the path of the local hook directory, ``on<hook name>.d`` inside
    ``constants.etcBaseDir``.

    :param hookType: The type of hook to get the directory for
    :type hookType: hooks.Type
    :return: The path
    :rtype: str
    """
    hookDirPath = f"{constants.etcBaseDir}/on{hookType.name}.d"
    return hookDirPath
def _getRemoteHookScripts(hookType):
    """
    Determine the paths of the remote hook scripts on the sysvol

    :param hookType: The type of hook to get the scripts for
    :type hookType: hooks.Type
    :return: Tuple (success, paths); paths is a list holding the "lmn"
        script path followed by the "custom" script path, or None when
        success is False
    :rtype: tuple
    """
    # Hook types without an entry have no remote script at all.
    if hookType not in remoteScriptNames:
        return False, None
    scriptName = remoteScriptNames[hookType]

    rc, networkConfig = config.network()
    if not rc:
        logging.error("Could not execute server hooks because the network config could not be read")
        return False, None

    # The school is read from the user or from the computer, depending on
    # the context the hook runs in.
    if _remoteScriptInUserContext[hookType]:
        rc, attributes = user.readAttributes()
        if not rc:
            logging.error("Could not execute server hooks because the user config could not be read")
            return False, None
    else:
        rc, attributes = computer.readAttributes()
        if not rc:
            logging.error("Could not execute server hooks because the computer config could not be read")
            return False, None

    # Only lookup failures are handled here; a bare except would also
    # swallow KeyboardInterrupt and SystemExit.
    try:
        domain = networkConfig["domain"]
        school = attributes["sophomorixSchoolname"]
    except (KeyError, TypeError):
        logging.error("Could not execute server hooks because the computer/user config is missing attributes")
        return False, None

    rc, sysvolPath = shares.getLocalSysvolPath()
    if not rc:
        logging.error(f"Could not execute server hook {scriptName} because the sysvol could not be mounted!")
        return False, None

    # Built directly instead of via str.format() so that braces in any of
    # the path components cannot break the formatting.
    hookScripts = [
        f"{sysvolPath}/{domain}/scripts/{school}/{variant}/linux/{scriptName}"
        for variant in ("lmn", "custom")
    ]
    return True, hookScripts
# parameter must be a dict of {"prefix": dict}
def _dictsToEnv(dictsAndPrefixes):
    """
    Flatten several dicts into one dict of environment variables

    Every entry ends up under the name ``<prefix>_<key>``. List and tuple
    values are joined with newlines into a single string; all other values
    are passed through unchanged.

    :param dictsAndPrefixes: A dict of {"prefix": dict}
    :type dictsAndPrefixes: dict
    :return: The flattened dict
    :rtype: dict
    """
    environmentDict = {}
    for prefix, values in dictsAndPrefixes.items():
        for key, value in values.items():
            # isinstance() also covers list subclasses; tuples are joined as
            # well because a raw sequence cannot be an environment value.
            if isinstance(value, (list, tuple)):
                value = "\n".join(value)
            environmentDict[prefix + "_" + key] = value
    return environmentDict
def _runHookScript(filePath):
    """
    Execute a single hook script and log its exit code

    Files that do not exist, are not executable or cannot be started are
    skipped with a log message, so that the remaining hook scripts still run.

    :param filePath: The path of the script to execute
    :type filePath: str
    """
    if not os.path.isfile(filePath):
        logging.warning(f"* File {filePath} should be executed as hook but does not exist!")
        return

    if not os.access(filePath, os.X_OK):
        logging.warning(f"* File {filePath} is in hook dir but not executable!")
        return

    logging.info(f"== Executing script {filePath} ==")
    try:
        result = subprocess.call([filePath])
    except OSError as error:
        # Raised when the file cannot be started at all, e.g. a script
        # without a shebang line; one broken script must not abort the hook.
        logging.error(f"* Script {filePath} could not be started: {error}")
        return

    logging.info(f"==> Script {filePath} finished with exit code {result} ==")
def _writeEnvironment(environment):
    """
    Export all entries of a dict as environment variables of this process

    The variables are set through ``os.environ``, which also calls
    ``putenv()``; unlike a bare ``os.putenv()`` this keeps ``os.environ``
    in sync with the real process environment.

    :param environment: A dict of {"variable name": "value"}
    :type environment: dict
    """
    for key, value in environment.items():
        # os.fsdecode() keeps accepting the str, bytes and path-like values
        # that os.putenv() accepted; os.environ itself only takes str.
        os.environ[os.fsdecode(key)] = os.fsdecode(value)