diff --git a/scapy/layers/lltd.py b/scapy/layers/lltd.py index c89c80a1d7b49ef5453c4de0279ad3943b8a269b..7a4a644caba6e1539f0d5737aeed4e3b0d346a08 100644 --- a/scapy/layers/lltd.py +++ b/scapy/layers/lltd.py @@ -10,21 +10,32 @@ https://msdn.microsoft.com/en-us/library/cc233983.aspx """ import struct +from array import array from scapy.fields import BitField, FlagsField, ByteField, ByteEnumField, \ - ShortField, ShortEnumField, IntField, IntEnumField, LongField, \ - MultiEnumField, FieldLenField, FieldListField, PacketListField, \ - StrLenField, StrLenFieldUtf16, ConditionalField, MACField + ShortField, ShortEnumField, ThreeBytesField, IntField, IntEnumField, \ + LongField, MultiEnumField, FieldLenField, FieldListField, \ + PacketListField, StrLenField, StrLenFieldUtf16, ConditionalField, MACField from scapy.packet import Packet, Padding, bind_layers +from scapy.plist import PacketList from scapy.layers.l2 import Ether from scapy.layers.inet import IPField from scapy.layers.inet6 import IP6Field from scapy.config import conf from scapy.data import ETHER_ANY +# Protocol layers +################## + class LLTD(Packet): name = "LLTD" + answer_hashret = { + # (tos, function) tuple mapping (answer -> query), used by + # .hashret() + (1, 1): (0, 0), + (0, 12): (0, 11), + } fields_desc = [ ByteField("version", 1), ByteEnumField("tos", 0, { @@ -96,6 +107,38 @@ class LLTD(Packet): else: return self.sprintf('LLTD %tos% - %function%') + def hashret(self): + tos, function = self.tos, self.function + return "%c%c" % self.answer_hashret.get((tos, function), + (tos, function)) + + def answers(self, other): + if not isinstance(other, LLTD): + return False + if self.tos == 0: + if self.function == 0 and isinstance(self.payload, LLTDDiscover) \ + and len(self[LLTDDiscover].stations_list) == 1: + # "Topology discovery - Discover" with one MAC address + # discovered answers a "Quick discovery - Hello" + return other.tos == 1 and \ + other.function == 1 and \ + LLTDAttributeHostID in other and \ + other[LLTDAttributeHostID].mac == \ + self[LLTDDiscover].stations_list[0] + elif self.function == 12: + # "Topology discovery - QueryLargeTlvResp" answers + # "Topology discovery - QueryLargeTlv" with same .seq + # value + return other.tos == 0 and other.function == 11 \ + and other.seq == self.seq + elif self.tos == 1: + if self.function == 1 and isinstance(self.payload, LLTDHello): + # "Quick discovery - Hello" answers a "Topology + # discovery - Discover" + return other.tos == 0 and other.function == 0 and \ + other.real_src == self.current_mapper_address + return False + class LLTDHello(Packet): name = "LLTD - Hello" @@ -181,6 +224,48 @@ class LLTDQueryResp(Packet): ), [LLTD] +class LLTDQueryLargeTlv(Packet): + name = "LLTD - Query Large Tlv" + fields_desc = [ + ByteEnumField("type", 14, { + 14: "Icon image", + 17: "Friendly Name", + 19: "Hardware ID", + 22: "AP Association Table", + 24: "Detailed Icon Image", + 26: "Component Table", + 28: "Repeater AP Table", + }), + ThreeBytesField("offset", 0), + ] + + def mysummary(self): + return self.sprintf("%type% (offset %offset%)"), [LLTD] + + +class LLTDQueryLargeTlvResp(Packet): + name = "LLTD - Query Large Tlv Response" + fields_desc = [ + FlagsField("flags", 0, 2, "RM"), + BitField("len", None, 14), + StrLenField("value", "", length_from=lambda pkt: pkt.len) + ] + + def post_build(self, pkt, pay): + if self.len is None: + # len should be a FieldLenField but has an unsupported + # format (14 bits) + flags = ord(pkt[0]) & 0xc0 + length = len(self.value) + pkt = chr(flags + (length >> 8)) + chr(length % 256) + pkt[2:] + return pkt + pay + + def mysummary(self): + return self.sprintf("%%len%% bytes%s" % ( + " (last)" if not self.flags & 2 else "" + )), [LLTD] + + class LLTDAttribute(Packet): name = "LLTD Attribute" show_indent = False @@ -668,8 +753,71 @@ bind_layers(LLTD, LLTDHello, tos=0, function=1) bind_layers(LLTD, LLTDHello, tos=1, function=1) bind_layers(LLTD, LLTDEmit, tos=0, function=2) bind_layers(LLTD, LLTDQueryResp, tos=0, function=7) +bind_layers(LLTD, LLTDQueryLargeTlv, tos=0, function=11) +bind_layers(LLTD, LLTDQueryLargeTlvResp, tos=0, function=12) bind_layers(LLTDHello, LLTDAttribute) bind_layers(LLTDAttribute, LLTDAttribute) bind_layers(LLTDAttribute, Padding, type=0) bind_layers(LLTDEmiteeDesc, Padding) bind_layers(LLTDRecveeDesc, Padding) + + +# Utils +######## + +class LargeTlvBuilder(object): + """An object to build content fetched through LLTDQueryLargeTlv / + LLTDQueryLargeTlvResp packets. + + Usable with a PacketList() object: + >>> p = LargeTlvBuilder() + >>> p.parse(rdpcap('capture_file.cap')) + + Or during a network capture: + >>> p = LargeTlvBuilder() + >>> sniff(filter="ether proto 0x88d9", prn=p.parse) + + To get the result, use .get_data() + + """ + def __init__(self): + self.types_offsets = {} + self.data = {} + + def parse(self, plist): + """Update the builder using the provided `plist`. `plist` can + be either a Packet() or a PacketList(). + + """ + if not isinstance(plist, PacketList): + plist = PacketList(plist) + for pkt in plist[LLTD]: + if LLTDQueryLargeTlv in pkt: + key = "%s:%s:%d" % (pkt.real_dst, pkt.real_src, pkt.seq) + self.types_offsets[key] = (pkt[LLTDQueryLargeTlv].type, + pkt[LLTDQueryLargeTlv].offset) + elif LLTDQueryLargeTlvResp in pkt: + try: + key = "%s:%s:%d" % (pkt.real_src, pkt.real_dst, pkt.seq) + content, offset = self.types_offsets[key] + except KeyError: + continue + loc = slice(offset, offset + pkt[LLTDQueryLargeTlvResp].len) + key = "%s > %s [%s]" % ( + pkt.real_src, pkt.real_dst, + LLTDQueryLargeTlv.fields_desc[0].i2s.get(content, content), + ) + data = self.data.setdefault(key, array("B")) + datalen = len(data) + if datalen < loc.stop: + data.extend(array("B", "\x00" * (loc.stop - datalen))) + data[loc] = array("B", pkt[LLTDQueryLargeTlvResp].value) + + def get_data(self): + """Returns a dictionary object, keys are strings "source > + destincation [content type]", and values are the content + fetched, also as a string. + + """ + return dict((key, "".join(chr(byte) for byte in data)) + for key, data in self.data.iteritems()) diff --git a/test/regression.uts b/test/regression.uts index f317c6112aff0124582625a4d40eba38fa90c44d..af22f521ec7d1f3942fb1d629f4e5909ac03cbcf 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -4436,6 +4436,36 @@ assert pkt.src == pkt.real_src assert pkt.tos == 0 assert pkt.function == 0 += Large TLV +m1, m2, seq = RandMAC()._fix(), RandMAC()._fix(), 123 +preqbase = Ether(src=m1, dst=m2) / LLTD() / \ + LLTDQueryLargeTlv(type="Detailed Icon Image") +prespbase = Ether(src=m2, dst=m1) / LLTD() / \ + LLTDQueryLargeTlvResp() +plist = [] +pkt = preqbase.copy() +pkt.seq = seq +plist.append(Ether(str(pkt))) +pkt = prespbase.copy() +pkt.seq = seq +pkt.flags = "M" +pkt.value = "abcd" +plist.append(Ether(str(pkt))) +pkt = preqbase.copy() +pkt.seq = seq + 1 +pkt.offset = 4 +plist.append(Ether(str(pkt))) +pkt = prespbase.copy() +pkt.seq = seq + 1 +pkt.value = "efg" +plist.append(Ether(str(pkt))) +builder = LargeTlvBuilder() +builder.parse(plist) +data = builder.get_data() +assert len(data) == 1 +assert data.keys()[0].endswith(' [Detailed Icon Image]') +assert data.values()[0] == 'abcdefg' + ############ ############