Source code for linuxmusterLinuxclient7.gpo

 # Order of parsing: (overwriting each other)
 # 1. Local (does not apply)
 # 2. Site (does not apply)
 # 3. Domain
 # 4. OUs from top to bottom
import ldap, ldap.sasl, re, os.path
import xml.etree.ElementTree as ElementTree
from linuxmusterLinuxclient7 import logging, constants, config, user, ldapHelper, shares, computer, printers

[docs] def processAllPolicies(): """ Process all applicable policies (equivalent to gpupdate on windows) :return: True on success, False otherwise :rtype: bool """ rc, policyDnList = _findApplicablePolicies() if not rc: logging.fatal("* Error when loading applicable GPOs! Shares and printers will not work.") return False allSuccessfull = True for policyDn in policyDnList: allSuccessfull = _parsePolicy(policyDn) and allSuccessfull return allSuccessfull
# -------------------- # - Helper functions - # -------------------- """ Currently unused. May be useful for solving issue https://github.com/linuxmuster/linuxmuster-linuxclient7/issues/1 def _parseGplinkSring(string): # a gPLink strink looks like this: # [LDAP://<link>;<status>][LDAP://<link>;<status>][...] # The ragex matches <link> and <status> in two separate groups # Note: "LDAP://" is matched as .*:// to prevent issues when the capitalization changes pattern = re.compile("\\[.*:\\/\\/([^\\]]+)\\;([0-9]+)\\]") return pattern.findall(string) def _extractOUsFromDN(dn): # NOT finished! pattern = re.compile("OU=([^,]+),") ouList = pattern.findall(dn) # We need to parse from top to bottom ouList.reverse() return ouList """ def _findApplicablePolicies(): policyDnList = [] """ Do this later! # 1. Domain rc, domainAdObject = ldapHelper.searchOne(f"(distinguishedName={ldapHelper.baseDn()})") if not rc: return False, None policyDNs.extend(_parseGplinkSring(domainAdObject["gPLink"])) # 2. OU policies from top to bottom rc, userAdObject = ldapHelper.searchOne(f"(sAMAccountName={user.username()})") if not rc: return False, None print(userAdObject["distinguishedName"]) """ # For now, just parse policy sophomorix:school:<school name> rc, schoolName = user.school() if not rc: return False, None policyName = f"sophomorix:school:{schoolName}" # find policy rc, policyAdObject = ldapHelper.searchOne(f"(displayName={policyName})") if not rc: return False, None policyDnList.append((policyAdObject["distinguishedName"], 0)) return True, policyDnList def _parsePolicy(policyDn): logging.info(f"=== Parsing policy [{policyDn[0]};{policyDn[1]}] ===") """ (not needed because it's currently hardcoded) # Check if the policy is disabled if policyDn[1] == 1: logging.info("===> Policy is disabled! ===") return True """ # Find policy in AD rc, policyAdObject = ldapHelper.searchOne(f"(distinguishedName={policyDn[0]})") if not rc: logging.error("===> Could not find poilcy in AD! ===") return False # mount the share the policy is on (probaply already mounted, just to be sure) rc, localPolicyPath = shares.getMountpointOfRemotePath(policyAdObject["gPCFileSysPath"], hiddenShare = True, autoMount = True) if not rc: logging.error("===> Could not mount path of poilcy! ===") return False try: # parse drives allSuccessfull = _processDrivesPolicy(localPolicyPath) # parse printers allSuccessfull = _processPrintersPolicy(localPolicyPath) and allSuccessfull except Exception as e: logging.error("An error occured when parsing policy!") logging.exception(e) return False logging.info(f"===> Parsed policy [{policyDn[0]};{policyDn[1]}] ===") return allSuccessfull def _parseXmlFilters(filtersXmlNode): filters = [] for xmlFilter in filtersXmlNode: if xmlFilter.tag == "FilterGroup": filters.append({ "name": xmlFilter.attrib["name"].split("\\")[1], "bool": xmlFilter.attrib["bool"], "userContext": xmlFilter.attrib["userContext"], # userContext defines if the filter applies in user or computer context "type": xmlFilter.tag }) else: filters.append({ "bool": "AND", "type": "FilterInvalid" }) logging.warning(f"Unknown filter type: {xmlFilter.tag}! Assuming condition is false.") return filters def _processFilters(policies): filteredPolicies = [] for policy in policies: if not len(policy["filters"]) > 0: filteredPolicies.append(policy) else: filtersPassed = True for filter in policy["filters"]: logging.debug(f"Testing filter: {filter}") if filter["bool"] == "AND": filtersPassed = filtersPassed and _processFilter(filter) elif filter["bool"] == "OR": filtersPassed = filtersPassed or _processFilter(filter) else: logging.warning(f"Unknown boolean operation: {filter['bool']}! Assuming condition is false.") filtersPassed = False if filtersPassed: filteredPolicies.append(policy) return filteredPolicies def _processFilter(filter): if filter["type"] == "FilterGroup": if filter["userContext"] == "1": return user.isInGroup(filter["name"]) elif filter["userContext"] == "0": return computer.isInGroup(filter["name"]) elif filter["type"] == "FilterInvalid": return False def _parseXmlPolicy(policyFile): if not os.path.isfile(policyFile): logging.warning("==> XML policy file not found! ==") return False, None try: tree = ElementTree.parse(policyFile) return True, tree except Exception as e: logging.exception(e) logging.error("==> Error while reading XML policy file! ==") return False, None def _processDrivesPolicy(policyBasepath): logging.info("== Parsing a drive policy! ==") policyFile = f"{policyBasepath}/User/Preferences/Drives/Drives.xml" shareList = [] rc, tree = _parseXmlPolicy(policyFile) if not rc: logging.error("==> Error while reading Drives policy file, skipping! ==") return False xmlDrives = tree.getroot() if not xmlDrives.tag == "Drives": logging.warning("==> Drive policy xml File is of invalid format, skipping! ==") return False for xmlDrive in xmlDrives: if xmlDrive.tag != "Drive" or ("disabled" in xmlDrive.attrib and xmlDrive.attrib["disabled"] == "1"): continue drive = {} drive["filters"] = [] for xmlDriveProperty in xmlDrive: if xmlDriveProperty.tag == "Properties": try: drive["label"] = xmlDriveProperty.attrib["label"] drive["letter"] = xmlDriveProperty.attrib["letter"] drive["path"] = xmlDriveProperty.attrib["path"] drive["useLetter"] = xmlDriveProperty.attrib["useLetter"] except Exception as e: logging.error("Exception when parsing a drive policy, it is missing an attribute:") logging.exception(e) break if xmlDriveProperty.tag == "Filters": drive["filters"] = _parseXmlFilters(xmlDriveProperty) else: shareList.append(drive) shareList = _processFilters(shareList) logging.info("Found shares:") for drive in shareList: logging.info(f"* {drive['label']:15}| {drive['letter']:5}| {drive['path']:40}| {drive['useLetter']:5}") for drive in shareList: if drive["useLetter"] == "1": formattedLetter = config.shares()["letterTemplate"].format(letter=drive['letter']) shareName = f"{drive['label']}{formattedLetter}" else: shareName = drive["label"] shares.mountShare(drive["path"], shareName=shareName) logging.info("==> Successfully parsed a drive policy! ==") return True def _processPrintersPolicy(policyBasepath): logging.info("== Parsing a printer policy! ==") policyFile = f"{policyBasepath}/User/Preferences/Printers/Printers.xml" printerList = [] # test rc, tree = _parseXmlPolicy(policyFile) if not rc: logging.error("==> Error while reading Printer policy file, skipping! ==") return False xmlPrinters = tree.getroot() if not xmlPrinters.tag == "Printers": logging.warning("==> Printer policy xml File is of invalid format, skipping! ==") return False for xmlPrinter in xmlPrinters: if xmlPrinter.tag != "SharedPrinter" or ("disabled" in xmlPrinter.attrib and xmlPrinter.attrib["disabled"] == "1"): continue printer = {} printer["filters"] = [] try: printer["name"] = xmlPrinter.attrib["name"] except Exception as e: logging.error("Exception when parsing a printer policy, it is missing the name attribute.") continue for xmlPrinterProperty in xmlPrinter: if xmlPrinterProperty.tag == "Properties": try: rc, printerUrl = printers.translateSambaToIpp(xmlPrinterProperty.attrib["path"]) if rc: printer["path"] = printerUrl except Exception as e: logging.warning("Exception when parsing a printer policy XML file") logging.exception(e) break if xmlPrinterProperty.tag == "Filters": printer["filters"] = _parseXmlFilters(xmlPrinterProperty) else: printerList.append(printer) printerList = _processFilters(printerList) logging.info("Found printers:") for printer in printerList: logging.info(f"* {printer['name']}\t\t| {printer['path']}\t| {printer['filters']}") printers.installPrinter(printer["path"], printer["name"]) logging.info("==> Successfully parsed a printer policy! ==") return True