Skip to content
Snippets Groups Projects
Commit 8ae0bd62 authored by Jeff Vander Stoep's avatar Jeff Vander Stoep Committed by android-build-merger
Browse files

Verify correct application of labels and attributes

am: bdfc0301

Change-Id: Ifafca851d39158cff053f4205583dd22f89070c8
parents dde38d9b bdfc0301
No related branches found
No related tags found
No related merge requests found
subdirs = ["tests"]
cc_library_host_shared {
name: "libsepolwrap",
srcs: ["sepol_wrap.cpp"],
shared_libs: ["libbase", "libsepol"],
cflags: ["-Wall", "-Werror",],
export_include_dirs: ["include"],
}
#ifdef __cplusplus
extern "C" {
#endif
int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp);
bool init_libsepol(const char *policy_path);
void *load_policy(const char *policy_path);
void destroy_policy(void *policydbp);
void *init_avtab(void *policydbp);
void *init_cond_avtab(void *policydbp);
void destroy_avtab(void *avtab_iterp);
int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp);
void *init_type_iter(void *policydbp, const char *type, bool is_attr);
void destroy_type_iter(void *type_iterp);
#ifdef __cplusplus
}
#endif
from ctypes import *
import re
import os
class TERule:
def __init__(self, rule):
data = rule.split(',')
self.flavor = data[0]
self.sctx = data[1]
self.tctx = data[2]
self.tclass = data[3]
self.perms = set((data[4].strip()).split(' '))
self.rule = rule
class Policy:
__Rules = None
__FcDict = None
__libsepolwrap = None
__policydbP = None
# Return all file_contexts entries that map to the input Type.
def QueryFc(self, Type):
if Type in self.__FcDict:
return self.__FcDict[Type]
else:
return None
# Return all attributes associated with a type if IsAttr=False or
# all types associated with an attribute if IsAttr=True
def QueryTypeAttribute(self, Type, IsAttr):
TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP,
create_string_buffer(Type), c_bool(IsAttr))
if (TypeIterP == None):
sys.exit("Failed to initialize type iterator")
buf = create_string_buffer(2048)
while True:
ret = self.__libsepolwrap.get_type(buf, c_int(2048),
self.__policydbP, TypeIterP)
if ret == 0:
yield buf.value
continue
if ret == 1:
break;
# We should never get here.
sys.exit("Failed to import policy")
self.__libsepolwrap.destroy_type_iter(TypeIterP)
# Return all TERules that match:
# (any scontext) or (any tcontext) or (any tclass) or (any perms),
# perms.
# Any unspecified paramenter will match all.
#
# Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"])
# Will return any rule with:
# (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms)
def QueryTERule(self, **kwargs):
if self.__Rules is None:
self.__InitTERules()
for Rule in self.__Rules:
# Match source type
if "scontext" in kwargs and Rule.sctx not in kwargs['scontext']:
continue
# Match target type
if "tcontext" in kwargs and Rule.tctx not in kwargs['tcontext']:
continue
# Match target class
if "tclass" in kwargs and Rule.tclass not in kwargs['tclass']:
continue
# Match any perms
if "perms" in kwargs and not bool(Rule.perms & set(kwargs['perms'])):
continue
yield Rule
def __GetTERules(self, policydbP, avtabIterP):
if self.__Rules is None:
self.__Rules = set()
buf = create_string_buffer(2048)
ret = 0
while True:
ret = self.__libsepolwrap.get_allow_rule(buf, c_int(2048), policydbP, avtabIterP)
if ret == 0:
Rule = TERule(buf.value)
self.__Rules.add(Rule)
continue
if ret == 1:
break;
# We should never get here.
sys.exit("Failed to import policy")
def __InitTERules(self):
avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP)
if (avtabIterP == None):
sys.exit("Failed to initialize avtab")
self.__GetTERules(self.__policydbP, avtabIterP)
self.__libsepolwrap.destroy_avtab(avtabIterP)
avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP)
if (avtabIterP == None):
sys.exit("Failed to initialize conditional avtab")
self.__GetTERules(self.__policydbP, avtabIterP)
self.__libsepolwrap.destroy_avtab(avtabIterP)
# load ctypes-ified libsepol wrapper
def __InitLibsepolwrap(self):
self.__libsepolwrap = CDLL("libsepolwrap.so")
# load file_contexts
def __InitFC(self, FcPaths):
fc = []
for path in FcPaths:
if not os.path.exists(path):
sys.exit("file_contexts file " + path + " does not exist.")
fd = open(path, "r")
fc += fd.readlines()
fd.close()
self.__FcDict = {}
for i in fc:
rec = i.split()
try:
t = rec[-1].split(":")[2]
if t in self.__FcDict:
self.__FcDict[t].append(rec[0])
else:
self.__FcDict[t] = [rec[0]]
except:
pass
# load policy
def __InitPolicy(self, PolicyPath):
self.__policydbP = self.__libsepolwrap.load_policy(create_string_buffer(PolicyPath))
if (self.__policydbP is None):
sys.exit("Failed to load policy")
def __init__(self, PolicyPath, FcPaths):
self.__InitLibsepolwrap()
self.__InitFC(FcPaths)
self.__InitPolicy(PolicyPath)
def __del__(self):
if self.__policydbP is not None:
self.__libsepolwrap.destroy_policy(self.__policydbP)
#include <stdio.h>
#include <string>
#include <sstream>
#include <stdlib.h>
#include <unistd.h>
#include <iostream>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sepol/policydb/avtab.h>
#include <sepol/policydb/policydb.h>
#include <sepol/policydb/services.h>
#include <sepol/policydb/util.h>
#include <sys/types.h>
#include <fstream>
#include <android-base/file.h>
#include <android-base/strings.h>
#include <sepol_wrap.h>
struct type_iter {
type_datum *d;
ebitmap_node *n;
unsigned int length;
unsigned int bit;
};
void *init_type_iter(void *policydbp, const char *type, bool is_attr)
{
policydb_t *db = static_cast<policydb_t *>(policydbp);
struct type_iter *out = (struct type_iter *)
calloc(1, sizeof(struct type_iter));
if (!out) {
std::cerr << "Failed to allocate type type iterator" << std::endl;
return NULL;
}
out->d = static_cast<type_datum *>(hashtab_search(db->p_types.table, type));
if (is_attr && out->d->flavor != TYPE_ATTRIB) {
std::cerr << "\"" << type << "\" MUST be an attribute in the policy" << std::endl;
free(out);
return NULL;
} else if (!is_attr && out->d->flavor !=TYPE_TYPE) {
std::cerr << "\"" << type << "\" MUST be a type in the policy" << std::endl;
free(out);
return NULL;
}
if (is_attr) {
out->bit = ebitmap_start(&db->attr_type_map[out->d->s.value - 1], &out->n);
out->length = ebitmap_length(&db->attr_type_map[out->d->s.value - 1]);
} else {
out->bit = ebitmap_start(&db->type_attr_map[out->d->s.value - 1], &out->n);
out->length = ebitmap_length(&db->type_attr_map[out->d->s.value - 1]);
}
return static_cast<void *>(out);
}
void destroy_type_iter(void *type_iterp)
{
struct type_iter *type_i = static_cast<struct type_iter *>(type_iterp);
free(type_i);
}
/*
* print allow rule into *out buffer.
*
* Returns -1 on error.
* Returns 0 on successfully reading an avtab entry.
* Returns 1 on complete
*/
int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp)
{
size_t len;
policydb_t *db = static_cast<policydb_t *>(policydbp);
struct type_iter *i = static_cast<struct type_iter *>(type_iterp);
for (; i->bit < i->length; i->bit = ebitmap_next(&i->n, i->bit)) {
if (!ebitmap_node_get_bit(i->n, i->bit)) {
continue;
}
len = snprintf(out, max_size, "%s", db->p_type_val_to_name[i->bit]);
if (len >= max_size) {
std::cerr << "type name exceeds buffer size." << std::endl;
return -1;
}
i->bit = ebitmap_next(&i->n, i->bit);
return 0;
}
return 1;
}
void *load_policy(const char *policy_path)
{
FILE *fp;
policydb_t *db;
fp = fopen(policy_path, "re");
if (!fp) {
std::cerr << "Invalid or non-existing policy file: " << policy_path << std::endl;
return NULL;
}
db = (policydb_t *) calloc(1, sizeof(policydb_t));
if (!db) {
std::cerr << "Failed to allocate memory for policy db." << std::endl;
fclose(fp);
return NULL;
}
sidtab_t sidtab;
sepol_set_sidtab(&sidtab);
sepol_set_policydb(db);
struct stat sb;
if (fstat(fileno(fp), &sb)) {
std::cerr << "Failed to stat the policy file" << std::endl;
free(db);
fclose(fp);
return NULL;
}
auto unmap = [=](void *ptr) { munmap(ptr, sb.st_size); };
std::unique_ptr<void, decltype(unmap)> map(
mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fileno(fp), 0), unmap);
if (!map) {
std::cerr << "Failed to map the policy file" << std::endl;
free(db);
fclose(fp);
return NULL;
}
struct policy_file pf;
policy_file_init(&pf);
pf.type = PF_USE_MEMORY;
pf.data = static_cast<char *>(map.get());
pf.len = sb.st_size;
if (policydb_init(db)) {
std::cerr << "Failed to initialize policydb" << std::endl;
free(db);
fclose(fp);
return NULL;
}
if (policydb_read(db, &pf, 0)) {
std::cerr << "Failed to read binary policy" << std::endl;
policydb_destroy(db);
free(db);
fclose(fp);
return NULL;
}
return static_cast<void *>(db);
}
/* items needed to iterate over the avtab */
struct avtab_iter {
avtab_t avtab;
uint32_t i;
avtab_ptr_t cur;
};
/*
* print allow rule into *out buffer.
*
* Returns -1 on error.
* Returns 0 on successfully reading an avtab entry.
* Returns 1 on complete
*/
static int get_avtab_allow_rule(char *out, size_t max_size, policydb_t *db,
struct avtab_iter *avtab_i)
{
size_t len;
for (; avtab_i->i < avtab_i->avtab.nslot; (avtab_i->i)++) {
if (avtab_i->cur == NULL) {
avtab_i->cur = avtab_i->avtab.htable[avtab_i->i];
}
for (; avtab_i->cur; avtab_i->cur = (avtab_i->cur)->next) {
if (!((avtab_i->cur)->key.specified & AVTAB_ALLOWED)) continue;
len = snprintf(out, max_size, "allow,%s,%s,%s,%s",
db->p_type_val_to_name[(avtab_i->cur)->key.source_type - 1],
db->p_type_val_to_name[(avtab_i->cur)->key.target_type - 1],
db->p_class_val_to_name[(avtab_i->cur)->key.target_class - 1],
sepol_av_to_string(db, (avtab_i->cur)->key.target_class, (avtab_i->cur)->datum.data));
avtab_i->cur = (avtab_i->cur)->next;
if (!(avtab_i->cur))
(avtab_i->i)++;
if (len >= max_size) {
std::cerr << "Allow rule exceeds buffer size." << std::endl;
return -1;
}
return 0;
}
avtab_i->cur = NULL;
}
return 1;
}
int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp)
{
policydb_t *db = static_cast<policydb_t *>(policydbp);
struct avtab_iter *avtab_i = static_cast<struct avtab_iter *>(avtab_iterp);
return get_avtab_allow_rule(out, len, db, avtab_i);
}
/*
* <sepol/policydb/expand.h->conditional.h> uses 'bool' as a variable name
* inside extern "C" { .. } construct, which clang doesn't like.
* So, declare the function we need from expand.h ourselves.
*/
extern "C" int expand_avtab(policydb_t *p, avtab_t *a, avtab_t *expa);
static avtab_iter *init_avtab_common(avtab_t *in, policydb_t *p)
{
struct avtab_iter *out = (struct avtab_iter *)
calloc(1, sizeof(struct avtab_iter));
if (!out) {
std::cerr << "Failed to allocate avtab" << std::endl;
return NULL;
}
if (avtab_init(&out->avtab)) {
std::cerr << "Failed to initialize avtab" << std::endl;
free(out);
return NULL;
}
if (expand_avtab(p, in, &out->avtab)) {
std::cerr << "Failed to expand avtab" << std::endl;
free(out);
return NULL;
}
return out;
}
void *init_avtab(void *policydbp)
{
policydb_t *p = static_cast<policydb_t *>(policydbp);
return static_cast<void *>(init_avtab_common(&p->te_avtab, p));
}
void *init_cond_avtab(void *policydbp)
{
policydb_t *p = static_cast<policydb_t *>(policydbp);
return static_cast<void *>(init_avtab_common(&p->te_cond_avtab, p));
}
void destroy_avtab(void *avtab_iterp)
{
struct avtab_iter *avtab_i = static_cast<struct avtab_iter *>(avtab_iterp);
avtab_destroy(&avtab_i->avtab);
free(avtab_i);
}
void destroy_policy(void *policydbp)
{
policydb_t *p = static_cast<policydb_t *>(policydbp);
policydb_destroy(p);
}
from optparse import OptionParser
from optparse import Option, OptionValueError
import os
import policy
import re
import sys
'''
Use file_contexts and policy to verify Treble requirements
are not violated.
'''
###
# Differentiate between domains that are part of the core Android platform and
# domains introduced by vendors
coreAppdomain = {
'bluetooth',
'ephemeral_app',
'isolated_app',
'nfc',
'platform_app',
'priv_app',
'radio',
'shared_relro',
'shell',
'system_app',
'untrusted_app',
'untrusted_app_25',
'untrusted_v2_app',
}
coredomainWhitelist = {
'adbd',
'kernel',
'postinstall',
'postinstall_dexopt',
'recovery',
'system_server',
}
coredomainWhitelist |= coreAppdomain
class scontext:
def __init__(self):
self.fromSystem = False
self.fromVendor = False
self.coredomain = False
self.appdomain = False
self.attributes = set()
self.entrypoints = []
self.entrypointpaths = []
def PrintScontext(domain, sctx):
print domain
print "\tcoredomain="+str(sctx.coredomain)
print "\tappdomain="+str(sctx.appdomain)
print "\tfromSystem="+str(sctx.fromSystem)
print "\tfromVendor="+str(sctx.fromVendor)
print "\tattributes="+str(sctx.attributes)
print "\tentrypoints="+str(sctx.entrypoints)
print "\tentrypointpaths="
if sctx.entrypointpaths is not None:
for path in sctx.entrypointpaths:
print "\t\t"+str(path)
alldomains = {}
coredomains = set()
appdomains = set()
vendordomains = set()
###
# Check whether the regex will match a file path starting with the provided
# prefix
#
# Compares regex entries in file_contexts with a path prefix. Regex entries
# are often more specific than this file prefix. For example, the regex could
# be /system/bin/foo\.sh and the prefix could be /system. This function
# loops over the regex removing characters from the end until
# 1) there is a match - return True or 2) run out of characters - return
# False.
#
def MatchPathPrefix(pathregex, prefix):
for i in range(len(pathregex), 0, -1):
try:
pattern = re.compile('^' + pathregex[0:i] + "$")
except:
continue
if pattern.match(prefix):
return True
return False
def GetAllDomains(pol):
global alldomains
for result in pol.QueryTypeAttribute("domain", True):
alldomains[result] = scontext()
def GetAppDomains():
global appdomains
global alldomains
for d in alldomains:
# The application of the "appdomain" attribute is trusted because core
# selinux policy contains neverallow rules that enforce that only zygote
# and runas spawned processes may transition to processes that have
# the appdomain attribute.
if "appdomain" in alldomains[d].attributes:
alldomains[d].appdomain = True
appdomains.add(d)
def GetCoreDomains():
global alldomains
global coredomains
for d in alldomains:
# TestCoredomainViolators will verify if coredomain was incorrectly
# applied.
if "coredomain" in alldomains[d].attributes:
alldomains[d].coredomain = True
coredomains.add(d)
# check whether domains are executed off of /system or /vendor
if d in coredomainWhitelist:
continue
# TODO, add checks to prevent app domains from being incorrectly
# labeled as coredomain. Apps don't have entrypoints as they're always
# dynamically transitioned to by zygote.
if d in appdomains:
continue
if not alldomains[d].entrypointpaths:
continue
for path in alldomains[d].entrypointpaths:
# Processes with entrypoint on /system
if ((MatchPathPrefix(path, "/system") and not
MatchPathPrefix(path, "/system/vendor")) or
MatchPathPrefix(path, "/init") or
MatchPathPrefix(path, "/charger")):
alldomains[d].fromSystem = True
# Processes with entrypoint on /vendor or /system/vendor
if (MatchPathPrefix(path, "/vendor") or
MatchPathPrefix(path, "/system/vendor")):
alldomains[d].fromVendor = True
###
# Add the entrypoint type and path(s) to each domain.
#
def GetDomainEntrypoints(pol):
global alldomains
for x in pol.QueryTERule(tclass="file", perms=["entrypoint"]):
if not x.sctx in alldomains:
continue
alldomains[x.sctx].entrypoints.append(str(x.tctx))
# postinstall_file represents a special case specific to A/B OTAs.
# Update_engine mounts a partition and relabels it postinstall_file.
# There is no file_contexts entry associated with postinstall_file
# so skip the lookup.
if x.tctx == "postinstall_file":
continue
alldomains[x.sctx].entrypointpaths = pol.QueryFc(x.tctx)
###
# Get attributes associated with each domain
#
def GetAttributes(pol):
global alldomains
for domain in alldomains:
for result in pol.QueryTypeAttribute(domain, False):
alldomains[domain].attributes.add(result)
def setup(pol):
GetAllDomains(pol)
GetAttributes(pol)
GetDomainEntrypoints(pol)
GetAppDomains()
GetCoreDomains()
#############################################################
# Tests
#############################################################
def TestCoredomainViolations():
global alldomains
# verify that all domains launched from /system have the coredomain
# attribute
ret = ""
violators = []
for d in alldomains:
domain = alldomains[d]
if domain.fromSystem and "coredomain" not in domain.attributes:
violators.append(d);
if len(violators) > 0:
ret += "The following domain(s) must be associated with the "
ret += "\"coredomain\" attribute because they are executed off of "
ret += "/system:\n"
ret += " ".join(str(x) for x in sorted(violators)) + "\n"
# verify that all domains launched form /vendor do not have the coredomain
# attribute
violators = []
for d in alldomains:
domain = alldomains[d]
if domain.fromVendor and "coredomain" in domain.attributes:
violators.append(d)
if len(violators) > 0:
ret += "The following domains must not be associated with the "
ret += "\"coredomain\" attribute because they are executed off of "
ret += "/vendor or /system/vendor:\n"
ret += " ".join(str(x) for x in sorted(violators)) + "\n"
return ret
###
# extend OptionParser to allow the same option flag to be used multiple times.
# This is used to allow multiple file_contexts files and tests to be
# specified.
#
class MultipleOption(Option):
ACTIONS = Option.ACTIONS + ("extend",)
STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",)
def take_action(self, action, dest, opt, value, values, parser):
if action == "extend":
values.ensure_value(dest, []).append(value)
else:
Option.take_action(self, action, dest, opt, value, values, parser)
Tests = ["CoredomainViolators"]
if __name__ == '__main__':
usage = "sepolicy-trebletests -f nonplat_file_contexts -f "
usage +="plat_file_contexts -p policy [--test test] [--help]"
parser = OptionParser(option_class=MultipleOption, usage=usage)
parser.add_option("-f", "--file_contexts", dest="file_contexts",
metavar="FILE", action="extend", type="string")
parser.add_option("-p", "--policy", dest="policy", metavar="FILE")
parser.add_option("-t", "--test", dest="test", action="extend",
help="Test options include "+str(Tests))
(options, args) = parser.parse_args()
if not options.policy:
sys.exit("Must specify monolithic policy file\n" + parser.usage)
if not os.path.exists(options.policy):
sys.exit("Error: policy file " + options.policy + " does not exist\n"
+ parser.usage)
if not options.file_contexts:
sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage)
for f in options.file_contexts:
if not os.path.exists(f):
sys.exit("Error: File_contexts file " + f + " does not exist\n" +
parser.usage)
pol = policy.Policy(options.policy, options.file_contexts)
setup(pol)
results = ""
# If an individual test is not specified, run all tests.
if options.test is None or "CoredomainViolations" in options.tests:
results += TestCoredomainViolations()
if len(results) > 0:
sys.exit(results)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment