diff --git a/scapy/config.py b/scapy/config.py index b97ee0bd0e9eb54c9c7e31d7ac04c57e943bf512..fe541ed5705be605d90ea517eead7de7ce2a070d 100755 --- a/scapy/config.py +++ b/scapy/config.py @@ -391,7 +391,7 @@ contribs: a dict which can be used by contrib layers to store local configuratio geoip_city_ipv6 = '/usr/share/GeoIP/GeoIPCityv6.dat' load_layers = ["l2", "inet", "dhcp", "dns", "dot11", "gprs", "tls", "hsrp", "inet6", "ir", "isakmp", "l2tp", "mgcp", - "mobileip", "netbios", "netflow", "ntp", "ppp", + "mobileip", "netbios", "netflow", "ntp", "ppp", "pptp", "radius", "rip", "rtp", "skinny", "smb", "snmp", "tftp", "x509", "bluetooth", "dhcp6", "llmnr", "sctp", "vrrp", "ipsec", "lltd", "vxlan"] diff --git a/scapy/layers/l2.py b/scapy/layers/l2.py index 40f44518b641cde92fae5af56dbf31d214170a55..e81c4e7ba0f8789c96b34feedc011e011bc16df0 100644 --- a/scapy/layers/l2.py +++ b/scapy/layers/l2.py @@ -968,6 +968,13 @@ class GRE(Packet): ConditionalField(XIntField("key",None), lambda pkt:pkt.key_present==1), ConditionalField(XIntField("seqence_number",None), lambda pkt:pkt.seqnum_present==1), ] + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + if _pkt and struct.unpack("!H", _pkt[2:4])[0] == 0x880b: + return GRE_PPTP + return cls + def post_build(self, p, pay): p += pay if self.chksum_present and self.chksum is None: @@ -976,6 +983,37 @@ class GRE(Packet): return p +class GRE_PPTP(GRE): + + """ + Enhanced GRE header used with PPTP + RFC 2637 + """ + + name = "GRE PPTP" + fields_desc = [BitField("chksum_present", 0, 1), + BitField("routing_present", 0, 1), + BitField("key_present", 1, 1), + BitField("seqnum_present", 0, 1), + BitField("strict_route_source", 0, 1), + BitField("recursion_control", 0, 3), + BitField("acknum_present", 0, 1), + BitField("flags", 0, 4), + BitField("version", 1, 3), + XShortEnumField("proto", 0x880b, ETHER_TYPES), + ShortField("payload_len", None), + ShortField("call_id", None), + ConditionalField(XIntField("seqence_number", None), lambda pkt: pkt.seqnum_present == 1), + ConditionalField(XIntField("ack_number", None), lambda pkt: pkt.acknum_present == 1)] + + def post_build(self, p, pay): + p += pay + if self.payload_len is None: + pay_len = len(pay) + p = p[:4] + chr((pay_len >> 8) & 0xff) + chr(pay_len & 0xff) + p[6:] + return p + + ### *BSD loopback layer class LoIntEnumField(EnumField): diff --git a/scapy/layers/ppp.py b/scapy/layers/ppp.py index b2e40ca79eb88070db49bc3e095fb2367bb8b64a..e415eebfee4f348f067c91c28d47abef0dfbb559 100644 --- a/scapy/layers/ppp.py +++ b/scapy/layers/ppp.py @@ -11,13 +11,13 @@ PPP (Point to Point Protocol) import struct from scapy.packet import Packet, bind_layers -from scapy.layers.l2 import Ether, CookedLinux +from scapy.layers.l2 import Ether, CookedLinux, GRE_PPTP from scapy.layers.inet import IP from scapy.layers.inet6 import IPv6 from scapy.fields import BitField, ByteEnumField, ByteField, \ - ConditionalField, FieldLenField, IPField, PacketListField, \ - ShortEnumField, ShortField, StrFixedLenField, StrLenField, XByteField, \ - XShortField + ConditionalField, FieldLenField, IntField, IPField, LenField, \ + PacketListField, PacketField, ShortEnumField, ShortField, \ + StrFixedLenField, StrLenField, XByteField, XShortField class PPPoE(Packet): @@ -339,6 +339,220 @@ class PPP_ECP(Packet): FieldLenField("len" , None, fmt="H", length_of="options", adjust=lambda p,x:x+4 ), PacketListField("options", [], PPP_ECP_Option, length_from=lambda p:p.len-4,) ] +### Link Control Protocol (RFC 1661) + +_PPP_lcptypes = {1: "Configure-Request", + 2: "Configure-Ack", + 3: "Configure-Nak", + 4: "Configure-Reject", + 5: "Terminate-Request", + 6: "Terminate-Ack", + 7: "Code-Reject", + 8: "Protocol-Reject", + 9: "Echo-Request", + 10: "Echo-Reply", + 11: "Discard-Request"} + + +class PPP_LCP(Packet): + name = "PPP Link Control Protocol" + fields_desc = [ByteEnumField("code", 5, _PPP_lcptypes), + XByteField("id", 0), + FieldLenField("len", None, fmt="H", length_of="data", + adjust=lambda p, x: x + 4), + StrLenField("data", "", + length_from=lambda p:p.len-4)] + + def extract_padding(self, pay): + return "",pay + + @classmethod + def dispatch_hook(cls, _pkt = None, *args, **kargs): + if _pkt: + o = ord(_pkt[0]) + if o in [1, 2, 3, 4]: + return PPP_LCP_Configure + elif o in [5,6]: + return PPP_LCP_Terminate + elif o == 7: + return PPP_LCP_Code_Reject + elif o == 8: + return PPP_LCP_Protocol_Reject + elif o in [9, 10]: + return PPP_LCP_Echo + elif o == 11: + return PPP_LCP_Discard_Request + else: + return cls + return cls + + +_PPP_lcp_optiontypes = {1: "Maximum-Receive-Unit", + 2: "Async-Control-Character-Map", + 3: "Authentication-protocol", + 4: "Quality-protocol", + 5: "Magic-number", + 7: "Protocol-Field-Compression", + 8: "Address-and-Control-Field-Compression", + 13: "Callback"} + + +class PPP_LCP_Option(Packet): + name = "PPP LCP Option" + fields_desc = [ByteEnumField("type", None, _PPP_lcp_optiontypes), + FieldLenField("len", None, fmt="B", length_of="data", + adjust=lambda p,x:x+2), + StrLenField("data", None, length_from=lambda p:p.len-2)] + + def extract_padding(self, pay): + return "", pay + + registered_options = {} + + @classmethod + def register_variant(cls): + cls.registered_options[cls.type.default] = cls + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + if _pkt: + o = ord(_pkt[0]) + return cls.registered_options.get(o, cls) + return cls + + +class PPP_LCP_MRU_Option(PPP_LCP_Option): + fields_desc = [ByteEnumField("type", 1, _PPP_lcp_optiontypes), + FieldLenField("len", 4, fmt="B", adjust=lambda p,x:4), + ShortField("max_recv_unit", 1500)] + +_PPP_LCP_auth_protocols = {0xc023: "Password authentication protocol", + 0xc223: "Challenge-response authentication protocol", + 0xc227: "PPP Extensible authentication protocol"} + +_PPP_LCP_CHAP_algorithms = {5: "MD5", + 6: "SHA1", + 128: "MS-CHAP", + 129: "MS-CHAP-v2"} + + +class PPP_LCP_ACCM_Option(PPP_LCP_Option): + fields_desc = [ByteEnumField("type", 2, _PPP_lcp_optiontypes), + FieldLenField("len", 6, fmt="B"), + BitField("accm", 0x00000000, 32)] + + +def adjust_auth_len(pkt, x): + if pkt.auth_protocol == 0xc223: + return 5 + elif pkt.auth_protocol == 0xc023: + return 4 + else: + return x + 4 + + +class PPP_LCP_Auth_Protocol_Option(PPP_LCP_Option): + fields_desc = [ByteEnumField("type", 3, _PPP_lcp_optiontypes), + FieldLenField("len", None, fmt="B", length_of="data", + adjust=adjust_auth_len), + ShortEnumField("auth_protocol", 0xc023, _PPP_LCP_auth_protocols), + ConditionalField(StrLenField("data", '', length_from=lambda p:p.len-4), + lambda p:p.auth_protocol != 0xc223), + ConditionalField(ByteEnumField("algorithm", 5, _PPP_LCP_CHAP_algorithms), + lambda p:p.auth_protocol == 0xc223)] + + +_PPP_LCP_quality_protocols = {0xc025: "Link Quality Report"} + + +class PPP_LCP_Quality_Protocol_Option(PPP_LCP_Option): + fields_desc = [ByteEnumField("type", 4, _PPP_lcp_optiontypes), + FieldLenField("len", None, fmt="B", length_of="data", + adjust=lambda p,x:x+4), + ShortEnumField("quality_protocol", 0xc025, _PPP_LCP_quality_protocols), + StrLenField("data", "", length_from=lambda p:p.len-4)] + + +class PPP_LCP_Magic_Number_Option(PPP_LCP_Option): + fields_desc = [ByteEnumField("type", 5, _PPP_lcp_optiontypes), + FieldLenField("len", 6, fmt="B", adjust = lambda p,x:6), + IntField("magic_number", None)] + + +_PPP_lcp_callback_operations = {0: "Location determined by user authentication", + 1: "Dialing string", + 2: "Location identifier", + 3: "E.164 number", + 4: "Distinguished name"} + + +class PPP_LCP_Callback_Option(PPP_LCP_Option): + fields_desc = [ByteEnumField("type", 13, _PPP_lcp_optiontypes), + FieldLenField("len", None, fmt="B", length_of="message", + adjust=lambda p,x:x+3), + ByteEnumField("operation", 0, _PPP_lcp_callback_operations), + StrLenField("message", "", length_from=lambda p:p.len-3)] + + +class PPP_LCP_Configure(PPP_LCP): + fields_desc = [ByteEnumField("code", 1, _PPP_lcptypes), + XByteField("id", 0), + FieldLenField("len", None, fmt="H", length_of="options", + adjust=lambda p,x:x+4), + PacketListField("options", [], PPP_LCP_Option, + length_from=lambda p:p.len-4)] + + def answers(self, other): + return isinstance(other, PPP_LCP_Configure) and self.code in [2, 3, 4]\ + and other.code == 1 and other.id == self.id + + +class PPP_LCP_Terminate(PPP_LCP): + + def answers(self, other): + return isinstance(other, PPP_LCP_Terminate) and self.code == 6\ + and other.code == 5 and other.id == self.id + + +class PPP_LCP_Code_Reject(PPP_LCP): + fields_desc = [ByteEnumField("code", 7, _PPP_lcptypes), + XByteField("id", 0), + FieldLenField("len", None, fmt="H", length_of="rejected_packet", + adjust=lambda p,x:x+4), + PacketField("rejected_packet", None, PPP_LCP)] + + +class PPP_LCP_Protocol_Reject(PPP_LCP): + fields_desc = [ByteEnumField("code", 8, _PPP_lcptypes), + XByteField("id", 0), + FieldLenField("len", None, fmt="H", length_of="rejected_information", + adjust=lambda p,x:x+6), + ShortEnumField("rejected_protocol", None, _PPP_proto), + PacketField("rejected_information", None, Packet)] + + +class PPP_LCP_Echo(PPP_LCP): + fields_desc = [ByteEnumField("code", 9, _PPP_lcptypes), + XByteField("id", 0), + FieldLenField("len", None, fmt="H", length_of="data", + adjust=lambda p,x:x+8), + IntField("magic_number", None), + StrLenField("data", "", length_from=lambda p:p.len-8)] + + def answers(self, other): + return isinstance(other, PPP_LCP_Echo) and self.code == 10\ + and other.code == 9 and self.id == other.id + + +class PPP_LCP_Discard_Request(PPP_LCP): + fields_desc = [ByteEnumField("code", 11, _PPP_lcptypes), + XByteField("id", 0), + FieldLenField("len", None, fmt="H", length_of="data", + adjust=lambda p,x:x+8), + IntField("magic_number", None), + StrLenField("data", "", length_from=lambda p:p.len-8)] + + bind_layers( Ether, PPPoED, type=0x8863) bind_layers( Ether, PPPoE, type=0x8864) bind_layers( CookedLinux, PPPoED, proto=0x8863) @@ -349,5 +563,7 @@ bind_layers( PPP, IP, proto=0x0021) bind_layers( PPP, IPv6, proto=0x0057) bind_layers( PPP, PPP_IPCP, proto=0x8021) bind_layers( PPP, PPP_ECP, proto=0x8053) +bind_layers( PPP, PPP_LCP, proto=0xc021) bind_layers( Ether, PPP_IPCP, type=0x8021) bind_layers( Ether, PPP_ECP, type=0x8053) +bind_layers( GRE_PPTP, PPP, proto=0x880b) diff --git a/scapy/layers/pptp.py b/scapy/layers/pptp.py new file mode 100644 index 0000000000000000000000000000000000000000..16a9c6d363f78fd68346070b8a327d2176d51c46 --- /dev/null +++ b/scapy/layers/pptp.py @@ -0,0 +1,373 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Jan Sebechlebsky <sebechlebskyjan@gmail.com> +## This program is published under a GPLv2 license + +""" +PPTP (Point to Point Tunneling Protocol) + +[RFC 2637] +""" + +from scapy.packet import Packet, bind_layers +from scapy.layers.inet import TCP +from scapy.fields import ByteEnumField, FieldLenField, FlagsField, IntField, IntEnumField,\ + LenField, XIntField, ShortField, ShortEnumField, StrFixedLenField,\ + StrLenField, XShortField, XByteField + +_PPTP_MAGIC_COOKIE = 0x1a2b3c4d + +_PPTP_msg_type = {1: "Control Message", + 2: "Managemenent Message"} + +_PPTP_ctrl_msg_type = { # Control Connection Management + 1: "Start-Control-Connection-Request", + 2: "Start-Control-Connection-Reply", + 3: "Stop-Control-Connection-Request", + 4: "Stop-Control-Connection-Reply", + 5: "Echo-Request", + 6: "Echo-Reply", + # Call Management + 7: "Outgoing-Call-Request", + 8: "Outgoing-Call-Reply", + 9: "Incoming-Call-Request", + 10: "Incoming-Call-Reply", + 11: "Incoming-Call-Connected", + 12: "Call-Clear-Request", + 13: "Call-Disconnect-Notify", + # Error Reporting + 14: "WAN-Error-Notify", + # PPP Session Control + 15: "Set-Link-Info"} + +_PPTP_general_error_code = {0: "None", + 1: "Not-Connected", + 2: "Bad-Format", + 3: "Bad-Value", + 4: "No-Resource", + 5: "Bad-Call ID", + 6: "PAC-Error"} + + +class PPTP(Packet): + name = "PPTP" + fields_desc = [FieldLenField("len", None, fmt="H", length_of="data", + adjust=lambda p, x: x + 12), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 1, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + StrLenField("data", "",length_from=lambda p: p.len - 12)] + + registered_options = {} + + @classmethod + def register_variant(cls): + cls.registered_options[cls.ctrl_msg_type.default] = cls + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + if _pkt: + o = ord(_pkt[9]) + return cls.registered_options.get(o, cls) + return cls + + +_PPTP_FRAMING_CAPABILITIES_FLAGS = ["Asynchronous Framing supported", + "Synchronous Framing supported"] + +_PPTP_BEARER_CAPABILITIES_FLAGS = ["Analog access supported", + "Digital access supported"] + + +class PPTPStartControlConnectionRequest(PPTP): + name = "PPTP Start Control Connection Request" + fields_desc = [LenField("len", 156), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 1, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("protocol_version", 1), + XShortField("reserved_1", 0x0000), + FlagsField("framing_capabilities", 0, 32, + _PPTP_FRAMING_CAPABILITIES_FLAGS), + FlagsField("bearer_capabilities", 0, 32, + _PPTP_BEARER_CAPABILITIES_FLAGS), + ShortField("maximum_channels", 65535), + ShortField("firmware_revision", 256), + StrFixedLenField("host_name", "linux", 64), + StrFixedLenField("vendor_string", "", 64)] + +_PPTP_start_control_connection_result = {1: "OK", + 2: "General error", + 3: "Command channel already exists", + 4: "Not authorized", + 5: "Unsupported protocol version"} + + +class PPTPStartControlConnectionReply(PPTP): + name = "PPTP Start Control Connection Reply" + fields_desc = [LenField("len", 156), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 2, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("protocol_version", 1), + ByteEnumField("result_code", 1, + _PPTP_start_control_connection_result), + ByteEnumField("error_code", 0, _PPTP_general_error_code), + FlagsField("framing_capabilities", 0, 32, + _PPTP_FRAMING_CAPABILITIES_FLAGS), + FlagsField("bearer_capabilities", 0, 32, + _PPTP_BEARER_CAPABILITIES_FLAGS), + ShortField("maximum_channels", 65535), + ShortField("firmware_revision", 256), + StrFixedLenField("host_name", "linux", 64), + StrFixedLenField("vendor_string", "", 64)] + + def answers(self, other): + return isinstance(other, PPTPStartControlConnectionRequest) + + +_PPTP_stop_control_connection_reason = {1: "None", + 2: "Stop-Protocol", + 3: "Stop-Local-Shutdown"} + + +class PPTPStopControlConnectionRequest(PPTP): + name = "PPTP Stop Control Connection Request" + fields_desc = [LenField("len", 16), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 3, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ByteEnumField("reason", 1, + _PPTP_stop_control_connection_reason), + XByteField("reserved_1", 0x00), + XShortField("reserved_2", 0x0000)] + +_PPTP_stop_control_connection_result = {1: "OK", + 2: "General error"} + + +class PPTPStopControlConnectionReply(PPTP): + name = "PPTP Stop Control Connection Reply" + fields_desc = [LenField("len", 16), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 4, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ByteEnumField("result_code", 1, + _PPTP_stop_control_connection_result), + ByteEnumField("error_code", 0, _PPTP_general_error_code), + XShortField("reserved_2", 0x0000)] + + def answers(self, other): + return isinstance(other, PPTPStopControlConnectionRequest) + + +class PPTPEchoRequest(PPTP): + name = "PPTP Echo Request" + fields_desc = [LenField("len", 16), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 5, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + IntField("identifier", None)] + +_PPTP_echo_result = {1: "OK", + 2: "General error"} + + +class PPTPEchoReply(PPTP): + name = "PPTP Echo Reply" + fields_desc = [LenField("len", 20), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 6, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + IntField("identifier", None), + ByteEnumField("result_code", 1, _PPTP_echo_result), + ByteEnumField("error_code", 0, _PPTP_general_error_code), + XShortField("reserved_1", 0x0000)] + + def answers(self, other): + return isinstance(other, PPTPEchoRequest) and other.identifier == self.identifier + +_PPTP_bearer_type = {1: "Analog channel", + 2: "Digital channel", + 3: "Any type of channel"} + +_PPTP_framing_type = {1: "Asynchronous framing", + 2: "Synchronous framing", + 3: "Any type of framing"} + + +class PPTPOutgoingCallRequest(PPTP): + name = "PPTP Outgoing Call Request" + fields_desc = [LenField("len", 168), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 7, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("call_id", 1), + ShortField("call_serial_number", 0), + IntField("minimum_bps", 32768), + IntField("maximum_bps", 2147483648), + IntEnumField("bearer_type", 3, _PPTP_bearer_type), + IntEnumField("framing_type", 3, _PPTP_framing_type), + ShortField("pkt_window_size", 16), + ShortField("pkt_proc_delay", 0), + ShortField('phone_number_len', 0), + XShortField("reserved_1", 0x0000), + StrFixedLenField("phone_number", '', 64), + StrFixedLenField("subaddress", '', 64)] + +_PPTP_result_code = {1: "Connected", + 2: "General error", + 3: "No Carrier", + 4: "Busy", + 5: "No dial tone", + 6: "Time-out", + 7: "Do not accept"} + + +class PPTPOutgoingCallReply(PPTP): + name = "PPTP Outgoing Call Reply" + fields_desc = [LenField("len", 32), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 8, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("call_id", 1), + ShortField("peer_call_id", 1), + ByteEnumField("result_code", 1, _PPTP_result_code), + ByteEnumField("error_code", 0, _PPTP_general_error_code), + ShortField("cause_code", 0), + IntField("connect_speed", 100000000), + ShortField("pkt_window_size", 16), + ShortField("pkt_proc_delay", 0), + IntField("channel_id", 0)] + + def answers(self, other): + return isinstance(other, PPTPOutgoingCallRequest) and other.call_id == self.peer_call_id + + +class PPTPIncomingCallRequest(PPTP): + name = "PPTP Incoming Call Request" + fields_desc = [LenField("len", 220), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 9, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("call_id", 1), + ShortField("call_serial_number", 1), + IntEnumField("bearer_type", 3, _PPTP_bearer_type), + IntField("channel_id", 0), + ShortField("dialed_number_len", 0), + ShortField("dialing_number_len", 0), + StrFixedLenField("dialed_number", "", 64), + StrFixedLenField("dialing_number", "", 64), + StrFixedLenField("subaddress", "", 64)] + + +class PPTPIncomingCallReply(PPTP): + name = "PPTP Incoming Call Reply" + fields_desc = [LenField("len", 148), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 10, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("call_id", 1), + ShortField("peer_call_id", 1), + ByteEnumField("result_code", 1, _PPTP_result_code), + ByteEnumField("error_code", 0, _PPTP_general_error_code), + ShortField("pkt_window_size", 64), + ShortField("pkt_transmit_delay", 0), + XShortField("reserved_1", 0x0000)] + + def answers(self, other): + return isinstance(other, PPTPIncomingCallRequest) and other.call_id == self.peer_call_id + + +class PPTPIncomingCallConnected(PPTP): + name = "PPTP Incoming Call Connected" + fields_desc = [LenField("len", 28), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 11, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("peer_call_id", 1), + XShortField("reserved_1", 0x0000), + IntField("connect_speed", 100000000), + ShortField("pkt_window_size", 64), + ShortField("pkt_transmit_delay", 0), + IntEnumField("framing_type", 1, _PPTP_framing_type)] + + def answers(self, other): + return isinstance(other, PPTPIncomingCallReply) and other.call_id == self.peer_call_id + + +class PPTPCallClearRequest(PPTP): + name = "PPTP Call Clear Request" + fields_desc = [LenField("len", 16), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 12, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("call_id", 1), + XShortField("reserved_1", 0x0000)] + +_PPTP_call_disconnect_result = {1: "Lost Carrier", + 2: "General error", + 3: "Admin Shutdown", + 4: "Request"} + + +class PPTPCallDisconnectNotify(PPTP): + name = "PPTP Call Disconnect Notify" + fields_desc = [LenField("len", 148), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 13, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("call_id", 1), + ByteEnumField("result_code", 1, + _PPTP_call_disconnect_result), + ByteEnumField("error_code", 0, _PPTP_general_error_code), + ShortField("cause_code", 0), + XShortField("reserved_1", 0x0000), + StrFixedLenField("call_statistic", "", 128)] + + +class PPTPWANErrorNotify(PPTP): + name = "PPTP WAN Error Notify" + fields_desc = [LenField("len", 40), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 14, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("peer_call_id", 1), + XShortField("reserved_1", 0x0000), + IntField("crc_errors", 0), + IntField("framing_errors", 0), + IntField("hardware_overruns", 0), + IntField("buffer_overruns", 0), + IntField("time_out_errors", 0), + IntField("alignment_errors", 0)] + + +class PPTPSetLinkInfo(PPTP): + name = "PPTP Set Link Info" + fields_desc = [LenField("len", 24), + ShortEnumField("type", 1, _PPTP_msg_type), + XIntField("magic_cookie", _PPTP_MAGIC_COOKIE), + ShortEnumField("ctrl_msg_type", 15, _PPTP_ctrl_msg_type), + XShortField("reserved_0", 0x0000), + ShortField("peer_call_id", 1), + XShortField("reserved_1", 0x0000), + XIntField("send_accm", 0x00000000), + XIntField("receive_accm", 0x00000000)] + +bind_layers(TCP, PPTP, sport=1723) +bind_layers(TCP, PPTP, dport=1723) diff --git a/test/pptp.uts b/test/pptp.uts new file mode 100644 index 0000000000000000000000000000000000000000..8aef367e53433efb5b0044fd0e221e7a78c0155d --- /dev/null +++ b/test/pptp.uts @@ -0,0 +1,684 @@ +############################## +% PPTP Related regression tests +############################## + ++ GRE Tests + += Test IP/GRE v0 decoding +~ gre ip + +data='45c00064000f0000ff2f1647c0a80c01c0a8170300000800'.decode('hex') +pkt = IP(data) +assert GRE in pkt +gre = pkt[GRE] +assert gre.chksum_present == 0 +assert gre.routing_present == 0 +assert gre.key_present == 0 +assert gre.seqnum_present == 0 +assert gre.strict_route_source == 0 +assert gre.recursion_control == 0 +assert gre.flags == 0 +assert gre.version == 0 +assert gre.proto == 0x800 + += Test IP/GRE v1 decoding with PPP LCP +~ gre ip pptp ppp lcp + +data='4500003c18324000402f0e5a0a0000020a0000063001880b001c9bf500000000ff03'\ + 'c021010100180206000000000304c2270506fbb8831007020802'.decode('hex') +pkt = IP(data) +assert GRE_PPTP in pkt +gre_pptp = pkt[GRE_PPTP] +assert gre_pptp.chksum_present == 0 +assert gre_pptp.routing_present == 0 +assert gre_pptp.key_present == 1 +assert gre_pptp.seqnum_present == 1 +assert gre_pptp.strict_route_source == 0 +assert gre_pptp.recursion_control == 0 +assert gre_pptp.acknum_present == 0 +assert gre_pptp.flags == 0 +assert gre_pptp.version == 1 +assert gre_pptp.proto == 0x880b +assert gre_pptp.payload_len == 28 +assert gre_pptp.call_id == 39925 +assert gre_pptp.seqence_number == 0x0 + +assert HDLC in pkt +assert PPP in pkt +assert PPP_LCP_Configure in pkt + += Test IP/GRE v1 encoding/decoding with PPP LCP Echo +~ gre ip pptp ppp hdlc lcp lcp_echo + +pkt = IP(src='192.168.0.1', dst='192.168.0.2') /\ + GRE_PPTP(seqnum_present=1, acknum_present=1, seqence_number=47, ack_number=42) /\ + HDLC() / PPP() / PPP_LCP_Echo(id=42, magic_number=4242, data='abcdef') +pkt_data = str(pkt) +pkt_data_ref = '4500003600010000402ff944c0a80001c0a800023081880b001200000000002f000000'\ + '2aff03c021092a000e00001092616263646566'.decode('hex') +assert (pkt_data == pkt_data_ref) +pkt_decoded = IP(pkt_data_ref) +assert IP in pkt +assert GRE_PPTP in pkt +assert HDLC in pkt +assert PPP in pkt +assert PPP_LCP_Echo in pkt + +assert pkt[IP].proto == 47 +assert pkt[GRE_PPTP].chksum_present == 0 +assert pkt[GRE_PPTP].routing_present == 0 +assert pkt[GRE_PPTP].key_present == 1 +assert pkt[GRE_PPTP].seqnum_present == 1 +assert pkt[GRE_PPTP].acknum_present == 1 +assert pkt[GRE_PPTP].seqence_number == 47 +assert pkt[GRE_PPTP].ack_number == 42 +assert pkt[PPP].proto == 0xc021 +assert pkt[PPP_LCP_Echo].code == 9 +assert pkt[PPP_LCP_Echo].id == 42 +assert pkt[PPP_LCP_Echo].magic_number == 4242 +assert pkt[PPP_LCP_Echo].data == 'abcdef' + ++ PPP LCP Tests += Test LCP Echo Request / Reply +~ ppp lcp lcp_echo + +lcp_echo_request_data = 'c021090700080000002a'.decode('hex') +lcp_echo_reply_data = str(PPP()/PPP_LCP_Echo(code=10, id=7, magic_number=77, data='defgh')) + +lcp_echo_request_pkt = PPP(lcp_echo_request_data) +lcp_echo_reply_pkt = PPP(lcp_echo_reply_data) + +assert lcp_echo_reply_pkt.answers(lcp_echo_request_pkt) +assert not lcp_echo_request_pkt.answers(lcp_echo_reply_pkt) + +lcp_echo_non_reply_data = str(PPP()/PPP_LCP_Echo(code=10, id=3, magic_number=77)) +lcp_echo_non_reply_pkt = PPP(lcp_echo_non_reply_data) + +assert not lcp_echo_non_reply_pkt.answers(lcp_echo_request_pkt) + +lcp_echo_non_reply_data = str(PPP()/PPP_LCP_Echo(id=7, magic_number=42)) +lcp_echo_non_reply_pkt = PPP(lcp_echo_non_reply_data) + +assert not lcp_echo_non_reply_pkt.answers(lcp_echo_request_pkt) + += Test LCP Configure Request +~ ppp lcp lcp_configure magic_number + +conf_req = PPP() / PPP_LCP_Configure(id=42, options=[PPP_LCP_Magic_Number_Option(magic_number=4242)]) +conf_req_ref_data = 'c021012a000a050600001092'.decode('hex') + +assert str(conf_req) == conf_req_ref_data + +conf_req_pkt = PPP(conf_req_ref_data) + +assert PPP_LCP_Configure in conf_req_pkt +assert conf_req_pkt[PPP_LCP_Configure].code == 1 +assert conf_req_pkt[PPP_LCP_Configure].id == 42 +assert len(conf_req_pkt[PPP_LCP_Configure].options) == 1 +assert isinstance(conf_req_pkt[PPP_LCP_Configure].options[0], PPP_LCP_Magic_Number_Option) +assert conf_req_pkt[PPP_LCP_Configure].options[0].magic_number == 4242 + += Test LCP Configure Ack +~ ppp lcp lcp_configure lcp_configure_ack + +conf_ack = PPP() / PPP_LCP_Configure(code='Configure-Ack', id=42, + options=[PPP_LCP_Magic_Number_Option(magic_number=4242)]) +conf_ack_ref_data = 'c021022a000a050600001092'.decode('hex') + +assert (str(conf_ack) == conf_ack_ref_data) + +conf_ack_pkt = PPP(conf_ack_ref_data) + +assert PPP_LCP_Configure in conf_ack_pkt +assert conf_ack_pkt[PPP_LCP_Configure].code == 2 +assert conf_ack_pkt[PPP_LCP_Configure].id == 42 +assert len(conf_ack_pkt[PPP_LCP_Configure].options) == 1 +assert isinstance(conf_ack_pkt[PPP_LCP_Configure].options[0], PPP_LCP_Magic_Number_Option) +assert conf_ack_pkt[PPP_LCP_Configure].options[0].magic_number == 4242 + +conf_req_pkt = PPP('c021012a000a050600001092'.decode('hex')) + +assert conf_ack_pkt.answers(conf_req_pkt) +assert not conf_req_pkt.answers(conf_ack_pkt) + += Test LCP Configure Nak +~ ppp lcp lcp_configure lcp_configure_nak lcp_mru_option lcp_accm_option + +conf_nak = PPP() / PPP_LCP_Configure(code='Configure-Nak', id=42, + options=[PPP_LCP_MRU_Option(), PPP_LCP_ACCM_Option(accm=0xffff0000)]) +conf_nak_ref_data = 'c021032a000e010405dc0206ffff0000'.decode('hex') + +assert(str(conf_nak) == conf_nak_ref_data) + +conf_nak_pkt = PPP(conf_nak_ref_data) + +assert PPP_LCP_Configure in conf_nak_pkt +assert conf_nak_pkt[PPP_LCP_Configure].code == 3 +assert conf_nak_pkt[PPP_LCP_Configure].id == 42 +assert len(conf_nak_pkt[PPP_LCP_Configure].options) == 2 +assert isinstance(conf_nak_pkt[PPP_LCP_Configure].options[0], PPP_LCP_MRU_Option) +assert conf_nak_pkt[PPP_LCP_Configure].options[0].max_recv_unit == 1500 +assert isinstance(conf_nak_pkt[PPP_LCP_Configure].options[1], PPP_LCP_ACCM_Option) +assert conf_nak_pkt[PPP_LCP_Configure].options[1].accm == 0xffff0000 + +conf_req_pkt = PPP('c021012a000e010405dc0206ffff0000'.decode('hex')) + +assert conf_nak_pkt.answers(conf_req_pkt) +assert not conf_req_pkt.answers(conf_nak_pkt) + += Test LCP Configure Reject +~ ppp lcp lcp_configure lcp_configure_reject + +conf_reject = PPP() / PPP_LCP_Configure(code='Configure-Reject', id=42, + options=[PPP_LCP_Callback_Option(operation='Location identifier', + message='test')]) +conf_reject_ref_data = 'c021042a000b0d070274657374'.decode('hex') + +assert(str(conf_reject) == conf_reject_ref_data) + +conf_reject_pkt = PPP(conf_reject_ref_data) + +assert PPP_LCP_Configure in conf_reject_pkt +assert conf_reject_pkt[PPP_LCP_Configure].code == 4 +assert conf_reject_pkt[PPP_LCP_Configure].id == 42 +assert len(conf_reject_pkt[PPP_LCP_Configure].options) == 1 +assert isinstance(conf_reject_pkt[PPP_LCP_Configure].options[0], PPP_LCP_Callback_Option) +assert conf_reject_pkt[PPP_LCP_Configure].options[0].operation == 2 +assert conf_reject_pkt[PPP_LCP_Configure].options[0].message == 'test' + +conf_req_pkt = PPP('c021012a000b0d070274657374'.decode('hex')) + +assert conf_reject_pkt.answers(conf_req_pkt) +assert not conf_req_pkt.answers(conf_reject_pkt) + += Test LCP Configure options +~ ppp lcp lcp_configure + +conf_req = PPP() / PPP_LCP_Configure(id=42, options=[PPP_LCP_MRU_Option(max_recv_unit=5000), + PPP_LCP_ACCM_Option(accm=0xf0f0f0f0), + PPP_LCP_Auth_Protocol_Option(), + PPP_LCP_Quality_Protocol_Option(data='test'), + PPP_LCP_Magic_Number_Option(magic_number=4242), + PPP_LCP_Callback_Option(operation='Distinguished name',message='test')]) +conf_req_ref_data = 'c021012a0027010413880206f0f0f0f00304c0230408c025746573740506000010920d070474657374'.decode('hex') + +assert(str(conf_req) == conf_req_ref_data) + +conf_req_pkt = PPP(conf_req_ref_data) + +assert PPP_LCP_Configure in conf_req_pkt +options = conf_req_pkt[PPP_LCP_Configure].options +assert len(options) == 6 +assert isinstance(options[0], PPP_LCP_MRU_Option) +assert options[0].max_recv_unit == 5000 +assert isinstance(options[1], PPP_LCP_ACCM_Option) +assert options[1].accm == 0xf0f0f0f0 +assert isinstance(options[2], PPP_LCP_Auth_Protocol_Option) +assert options[2].auth_protocol == 0xc023 +assert isinstance(options[3], PPP_LCP_Quality_Protocol_Option) +assert options[3].quality_protocol == 0xc025 +assert options[3].data == 'test' +assert isinstance(options[4], PPP_LCP_Magic_Number_Option) +assert options[4].magic_number == 4242 +assert isinstance(options[5], PPP_LCP_Callback_Option) +assert options[5].operation == 4 +assert options[5].message == 'test' + += Test LCP Auth option +~ ppp lcp lcp_configure + +pap = PPP_LCP_Auth_Protocol_Option() +pap_ref_data = '0304c023'.decode('hex') + +assert(str(pap) == pap_ref_data) + +pap_pkt = PPP_LCP_Option(pap_ref_data) +assert isinstance(pap_pkt, PPP_LCP_Auth_Protocol_Option) +assert pap_pkt.auth_protocol == 0xc023 + +chap_sha1 = PPP_LCP_Auth_Protocol_Option(auth_protocol='Challenge-response authentication protocol', algorithm="SHA1") +chap_sha1_ref_data = '0305c22306'.decode('hex') + +assert str(chap_sha1) == chap_sha1_ref_data + +chap_sha1_pkt = PPP_LCP_Option(chap_sha1_ref_data) +assert isinstance(chap_sha1_pkt, PPP_LCP_Auth_Protocol_Option) +assert chap_sha1_pkt.auth_protocol == 0xc223 +assert chap_sha1_pkt.algorithm == 6 + +eap = PPP_LCP_Auth_Protocol_Option(auth_protocol='PPP Extensible authentication protocol', data='test') +eap_ref_data = '0308c22774657374'.decode('hex') + +assert str(eap) == eap_ref_data + +eap_pkt = PPP_LCP_Option(eap_ref_data) +assert isinstance(eap_pkt, PPP_LCP_Auth_Protocol_Option) +assert eap_pkt.auth_protocol == 0xc227 +assert eap_pkt.data == 'test' + += Test LCP Code-Reject +~ ppp lcp lcp_code_reject + +code_reject = PPP() / PPP_LCP_Code_Reject(id=42, rejected_packet=PPP_LCP(code=42, id=7, data='unknown_data')) +code_reject_ref_data = 'c021072a00142a070010756e6b6e6f776e5f64617461'.decode('hex') + +assert str(code_reject) == code_reject_ref_data + +code_reject_pkt = PPP(code_reject_ref_data) +assert PPP_LCP_Code_Reject in code_reject_pkt +assert code_reject_pkt[PPP_LCP_Code_Reject].id == 42 +assert isinstance(code_reject_pkt[PPP_LCP_Code_Reject].rejected_packet, PPP_LCP) +assert code_reject[PPP_LCP_Code_Reject].rejected_packet.code == 42 +assert code_reject[PPP_LCP_Code_Reject].rejected_packet.id == 7 +assert code_reject[PPP_LCP_Code_Reject].rejected_packet.data == 'unknown_data' + += Test LCP Protocol-Reject +~ ppp lcp lcp_protocol_reject + +protocol_reject = PPP() / PPP_LCP_Protocol_Reject(id=42, rejected_protocol=0x8039, + rejected_information=Packet('0305c22306'.decode('hex'))) +protocol_reject_ref_data = 'c021082a000b80390305c22306'.decode('hex') + +assert str(protocol_reject) == protocol_reject_ref_data + +protocol_reject_pkt = PPP(protocol_reject_ref_data) +assert PPP_LCP_Protocol_Reject in protocol_reject_pkt +assert protocol_reject_pkt[PPP_LCP_Protocol_Reject].id == 42 +assert protocol_reject_pkt[PPP_LCP_Protocol_Reject].rejected_protocol == 0x8039 +assert len(protocol_reject_pkt[PPP_LCP_Protocol_Reject].rejected_information) == 5 + += Test LCP Discard Request +~ ppp lcp lcp_discard_request + +discard_request = PPP() / PPP_LCP_Discard_Request(id=7, magic_number=4242, data='test') +discard_request_ref_data = 'c0210b07000c0000109274657374'.decode('hex') + +assert str(discard_request) == discard_request_ref_data + +discard_request_pkt = PPP(discard_request_ref_data) +assert PPP_LCP_Discard_Request in discard_request_pkt +assert discard_request_pkt[PPP_LCP_Discard_Request].id == 7 +assert discard_request_pkt[PPP_LCP_Discard_Request].magic_number == 4242 +assert discard_request_pkt[PPP_LCP_Discard_Request].data == 'test' + += Test LCP Terminate-Request/Terminate-Ack +~ ppp lcp lcp_terminate + +terminate_request = PPP() / PPP_LCP_Terminate(id=7, data='test') +terminate_request_ref_data = 'c0210507000874657374'.decode('hex') + +assert str(terminate_request) == terminate_request_ref_data + +terminate_request_pkt = PPP(terminate_request_ref_data) +assert PPP_LCP_Terminate in terminate_request_pkt +assert terminate_request_pkt[PPP_LCP_Terminate].code == 5 +assert terminate_request_pkt[PPP_LCP_Terminate].id == 7 +assert terminate_request_pkt[PPP_LCP_Terminate].data == 'test' + +terminate_ack = PPP() / PPP_LCP_Terminate(code='Terminate-Ack', id=7) +terminate_ack_ref_data = 'c02106070004'.decode('hex') + +assert str(terminate_ack) == terminate_ack_ref_data + +terminate_ack_pkt = PPP(terminate_ack_ref_data) +assert PPP_LCP_Terminate in terminate_ack_pkt +assert terminate_ack_pkt[PPP_LCP_Terminate].code == 6 +assert terminate_ack_pkt[PPP_LCP_Terminate].id == 7 + +assert terminate_ack_pkt.answers(terminate_request_pkt) +assert not terminate_request_pkt.answers(terminate_ack_pkt) + ++ PPTP Tests += Test PPTP Start-Control-Connection-Request +~ pptp +start_control_connection = PPTPStartControlConnectionRequest(framing_capabilities='Asynchronous Framing supported', + bearer_capabilities='Digital access supported', + maximum_channels=42, + firmware_revision=47, + host_name='test host name', + vendor_string='test vendor string') +start_control_connection_ref_data = '009c00011a2b3c4d00010000000100000000000100000002002a00'\ + '2f7465737420686f7374206e616d65000000000000000000000000'\ + '000000000000000000000000000000000000000000000000000000'\ + '0000000000000000000000746573742076656e646f722073747269'\ + '6e6700000000000000000000000000000000000000000000000000'\ + '000000000000000000000000000000000000000000'.decode('hex') + +assert str(start_control_connection) == start_control_connection_ref_data + +start_control_connection_pkt = PPTP(start_control_connection_ref_data) + +assert isinstance(start_control_connection_pkt, PPTPStartControlConnectionRequest) +assert start_control_connection_pkt.magic_cookie == 0x1a2b3c4d +assert start_control_connection_pkt.protocol_version == 1 +assert start_control_connection_pkt.framing_capabilities == 1 +assert start_control_connection_pkt.bearer_capabilities == 2 +assert start_control_connection_pkt.maximum_channels == 42 +assert start_control_connection_pkt.firmware_revision == 47 +assert start_control_connection_pkt.host_name == 'test host name' + '\0' * (64-len('test host name')) +assert start_control_connection_pkt.vendor_string == 'test vendor string' + '\0' * (64-len('test vendor string')) + += Test PPTP Start-Control-Connection-Reply +~ pptp +start_control_connection_reply = PPTPStartControlConnectionReply(result_code='General error', + error_code='Not-Connected', + framing_capabilities='Synchronous Framing supported', + bearer_capabilities='Analog access supported', + vendor_string='vendor') +start_control_connection_reply_ref_data = '009c00011a2b3c4d00020000000102010000000200000001ffff0'\ + '1006c696e75780000000000000000000000000000000000000000'\ + '00000000000000000000000000000000000000000000000000000'\ + '000000000000000000000000076656e646f720000000000000000'\ + '00000000000000000000000000000000000000000000000000000'\ + '00000000000000000000000000000000000000000000000'.decode('hex') + +assert str(start_control_connection_reply) == start_control_connection_reply_ref_data + +start_control_connection_reply_pkt = PPTP(start_control_connection_reply_ref_data) + +assert isinstance(start_control_connection_reply_pkt, PPTPStartControlConnectionReply) +assert start_control_connection_reply_pkt.magic_cookie == 0x1a2b3c4d +assert start_control_connection_reply_pkt.protocol_version == 1 +assert start_control_connection_reply_pkt.result_code == 2 +assert start_control_connection_reply_pkt.error_code == 1 +assert start_control_connection_reply_pkt.framing_capabilities == 2 +assert start_control_connection_reply_pkt.bearer_capabilities == 1 +assert start_control_connection_reply_pkt.host_name == 'linux' + '\0' * (64-len('linux')) +assert start_control_connection_reply_pkt.vendor_string == 'vendor' + '\0' * (64-len('vendor')) + +start_control_connection_request = PPTPStartControlConnectionRequest() + +assert start_control_connection_reply_pkt.answers(start_control_connection_request) +assert not start_control_connection_request.answers(start_control_connection_reply_pkt) + += Test PPTP Stop-Control-Connection-Request +~ pptp +stop_control_connection = PPTPStopControlConnectionRequest(reason='Stop-Local-Shutdown') +stop_control_connection_ref_data = '001000011a2b3c4d0003000003000000'.decode('hex') + +assert str(stop_control_connection) == stop_control_connection_ref_data + +stop_control_connection_pkt = PPTP(stop_control_connection_ref_data) + +assert isinstance(stop_control_connection_pkt, PPTPStopControlConnectionRequest) +assert stop_control_connection_pkt.magic_cookie == 0x1a2b3c4d +assert stop_control_connection_pkt.reason == 3 + += Test PPTP Stop-Control-Connection-Reply +~ pptp +stop_control_connection_reply = PPTPStopControlConnectionReply(result_code='General error',error_code='PAC-Error') +stop_control_connection_reply_ref_data = '001000011a2b3c4d0004000002060000'.decode('hex') + +assert str(stop_control_connection_reply) == stop_control_connection_reply_ref_data + +stop_control_connection_reply_pkt = PPTP(stop_control_connection_reply_ref_data) + +assert isinstance(stop_control_connection_reply_pkt, PPTPStopControlConnectionReply) +assert stop_control_connection_reply_pkt.magic_cookie == 0x1a2b3c4d +assert stop_control_connection_reply_pkt.result_code == 2 +assert stop_control_connection_reply_pkt.error_code == 6 + +stop_control_connection_request = PPTPStopControlConnectionRequest() + +assert stop_control_connection_reply_pkt.answers(stop_control_connection_request) +assert not stop_control_connection_request.answers(stop_control_connection_reply_pkt) + += Test PPTP Echo-Request +~ pptp +echo_request = PPTPEchoRequest(identifier=42) +echo_request_ref_data = '001000011a2b3c4d000500000000002a'.decode('hex') + +assert str(echo_request) == echo_request_ref_data + +echo_request_pkt = PPTP(echo_request_ref_data) + +assert isinstance(echo_request_pkt, PPTPEchoRequest) +assert echo_request_pkt.magic_cookie == 0x1a2b3c4d +assert echo_request_pkt.identifier == 42 + += Test PPTP Echo-Reply +~ pptp +echo_reply = PPTPEchoReply(identifier=42, result_code='OK') +echo_reply_ref_data = '001400011a2b3c4d000600000000002a01000000'.decode('hex') + +assert str(echo_reply) == echo_reply_ref_data + +echo_reply_pkt = PPTP(echo_reply_ref_data) + +assert isinstance(echo_reply_pkt, PPTPEchoReply) +assert echo_reply_pkt.magic_cookie == 0x1a2b3c4d +assert echo_reply_pkt.identifier == 42 +assert echo_reply_pkt.result_code == 1 +assert echo_reply_pkt.error_code == 0 + +echo_request = PPTPEchoRequest(identifier=42) + +assert echo_reply_pkt.answers(echo_request) +assert not echo_request.answers(echo_reply) + +echo_request_incorrect = PPTPEchoRequest(identifier=47) + +assert not echo_reply_pkt.answers(echo_request_incorrect) +assert not echo_request_incorrect.answers(echo_reply_pkt) + += Test PPTP Outgoing-Call-Request +~ pptp +outgoing_call = PPTPOutgoingCallRequest(call_id=4242, call_serial_number=47, + minimum_bps=1000, maximum_bps=10000, + bearer_type='Digital channel', + pkt_window_size=16, pkt_proc_delay=1, + phone_number_len=9, phone_number='123456789', + subaddress='test') +outgoing_call_ref_data = '00a800011a2b3c4d000700001092002f000003e8000027100000000200'\ + '0000030010000100090000313233343536373839000000000000000000'\ + '0000000000000000000000000000000000000000000000000000000000'\ + '0000000000000000000000000000000000746573740000000000000000'\ + '0000000000000000000000000000000000000000000000000000000000'\ + '0000000000000000000000000000000000000000000000'.decode('hex') + +assert str(outgoing_call) == outgoing_call_ref_data + +outgoing_call_pkt = PPTP(outgoing_call_ref_data) + +assert isinstance(outgoing_call_pkt, PPTPOutgoingCallRequest) +assert outgoing_call_pkt.magic_cookie == 0x1a2b3c4d +assert outgoing_call_pkt.call_id == 4242 +assert outgoing_call_pkt.call_serial_number == 47 +assert outgoing_call_pkt.minimum_bps == 1000 +assert outgoing_call_pkt.maximum_bps == 10000 +assert outgoing_call_pkt.bearer_type == 2 +assert outgoing_call_pkt.framing_type == 3 +assert outgoing_call_pkt.pkt_window_size == 16 +assert outgoing_call_pkt.pkt_proc_delay == 1 +assert outgoing_call_pkt.phone_number_len == 9 +assert outgoing_call_pkt.phone_number == '123456789' + '\0' * (64-len('123456789')) +assert outgoing_call_pkt.subaddress == 'test' + '\0' * (64-len('test')) + += Test PPTP Outgoing-Call-Reply +~ pptp +outgoing_call_reply = PPTPOutgoingCallReply(call_id=4243, peer_call_id=4242, + result_code='Busy', error_code='No-Resource', + cause_code=42, connect_speed=5000, + pkt_window_size=32, pkt_proc_delay=3, + channel_id=42) +outgoing_call_reply_ref_data = '002000011a2b3c4d00080000109310920404002a00001388002000030000002a'.decode('hex') + +assert str(outgoing_call_reply) == outgoing_call_reply_ref_data + +outgoing_call_reply_pkt = PPTP(outgoing_call_reply_ref_data) + +assert isinstance(outgoing_call_reply_pkt, PPTPOutgoingCallReply) +assert outgoing_call_reply_pkt.magic_cookie == 0x1a2b3c4d +assert outgoing_call_reply_pkt.call_id == 4243 +assert outgoing_call_reply_pkt.peer_call_id == 4242 +assert outgoing_call_reply_pkt.result_code == 4 +assert outgoing_call_reply_pkt.error_code == 4 +assert outgoing_call_reply_pkt.cause_code == 42 +assert outgoing_call_reply_pkt.connect_speed == 5000 +assert outgoing_call_reply_pkt.pkt_window_size == 32 +assert outgoing_call_reply_pkt.pkt_proc_delay == 3 +assert outgoing_call_reply_pkt.channel_id == 42 + +outgoing_call_request = PPTPOutgoingCallRequest(call_id=4242) + +assert outgoing_call_reply_pkt.answers(outgoing_call_request) +assert not outgoing_call_request.answers(outgoing_call_reply_pkt) + +outgoing_call_request_incorrect = PPTPOutgoingCallRequest(call_id=5656) + +assert not outgoing_call_reply_pkt.answers(outgoing_call_request_incorrect) +assert not outgoing_call_request_incorrect.answers(outgoing_call_reply_pkt) + += Test PPTP Incoming-Call-Request +~ pptp +incoming_call = PPTPIncomingCallRequest(call_id=4242, call_serial_number=47, bearer_type='Digital channel', + channel_id=12, dialed_number_len=9, dialing_number_len=10, + dialed_number='123456789', dialing_number='0123456789', + subaddress='test') +incoming_call_ref_data = '00dc00011a2b3c4d000900001092002f000000020000000c0009000a313233343536373839'\ + '00000000000000000000000000000000000000000000000000000000000000000000000000'\ + '00000000000000000000000000000000000030313233343536373839000000000000000000'\ + '00000000000000000000000000000000000000000000000000000000000000000000000000'\ + '00000000000000007465737400000000000000000000000000000000000000000000000000'\ + '0000000000000000000000000000000000000000000000000000000000000000000000'.decode('hex') + +assert str(incoming_call) == incoming_call_ref_data + +incoming_call_pkt = PPTP(incoming_call_ref_data) + +assert isinstance(incoming_call_pkt, PPTPIncomingCallRequest) +assert incoming_call_pkt.magic_cookie == 0x1a2b3c4d +assert incoming_call_pkt.call_id == 4242 +assert incoming_call_pkt.call_serial_number == 47 +assert incoming_call_pkt.bearer_type == 2 +assert incoming_call_pkt.channel_id == 12 +assert incoming_call_pkt.dialed_number_len == 9 +assert incoming_call_pkt.dialing_number_len == 10 +assert incoming_call_pkt.dialed_number == '123456789' + '\0' * (64-len('123456789')) +assert incoming_call_pkt.dialing_number == '0123456789' + '\0' * (64-len('0123456879')) +assert incoming_call_pkt.subaddress == 'test' + '\0' * (64-len('test')) + += Test PPTP Incoming-Call-Reply +~ pptp +incoming_call_reply = PPTPIncomingCallReply(call_id=4243, peer_call_id=4242, result_code='Connected', + error_code='None', pkt_window_size=16, pkt_transmit_delay=42) +incoming_call_reply_ref_data = '009400011a2b3c4d000a00001093109201000010002a0000'.decode('hex') + +assert str(incoming_call_reply) == incoming_call_reply_ref_data + +incoming_call_reply_pkt = PPTP(incoming_call_reply_ref_data) +assert isinstance(incoming_call_reply_pkt, PPTPIncomingCallReply) +assert incoming_call_reply_pkt.magic_cookie == 0x1a2b3c4d +assert incoming_call_reply_pkt.call_id == 4243 +assert incoming_call_reply_pkt.peer_call_id == 4242 +assert incoming_call_reply_pkt.result_code == 1 +assert incoming_call_reply_pkt.error_code == 0 +assert incoming_call_reply_pkt.pkt_window_size == 16 +assert incoming_call_reply_pkt.pkt_transmit_delay == 42 + +incoming_call_req = PPTPIncomingCallRequest(call_id=4242) + +assert incoming_call_reply_pkt.answers(incoming_call_req) +assert not incoming_call_req.answers(incoming_call_reply) + +incoming_call_req_incorrect = PPTPIncomingCallRequest(call_id=4343) +assert not incoming_call_reply_pkt.answers(incoming_call_req_incorrect) +assert not incoming_call_req_incorrect.answers(incoming_call_reply_pkt) + += Test PPTP Incoming-Call-Connected +~ pptp +incoming_call_connected = PPTPIncomingCallConnected(peer_call_id=4242, connect_speed=47474747, + pkt_window_size=16, pkt_transmit_delay=7, + framing_type='Any type of framing') +incoming_call_connected_ref_data = '001c00011a2b3c4d000b00001092000002d4683b0010000700000003'.decode('hex') + +assert str(incoming_call_connected) == incoming_call_connected_ref_data + +incoming_call_connected_pkt = PPTP(incoming_call_connected_ref_data) +assert isinstance(incoming_call_connected_pkt, PPTPIncomingCallConnected) +assert incoming_call_connected_pkt.magic_cookie == 0x1a2b3c4d +assert incoming_call_connected_pkt.peer_call_id == 4242 +assert incoming_call_connected_pkt.connect_speed == 47474747 +assert incoming_call_connected_pkt.pkt_window_size == 16 +assert incoming_call_connected_pkt.pkt_transmit_delay == 7 +assert incoming_call_connected_pkt.framing_type == 3 + +incoming_call_reply = PPTPIncomingCallReply(call_id=4242) + +assert incoming_call_connected_pkt.answers(incoming_call_reply) +assert not incoming_call_reply.answers(incoming_call_connected_pkt) + +incoming_call_reply_incorrect = PPTPIncomingCallReply(call_id=4243) + +assert not incoming_call_connected_pkt.answers(incoming_call_reply_incorrect) +assert not incoming_call_reply_incorrect.answers(incoming_call_connected_pkt) + += Test PPTP Call-Clear-Request +~ pptp +call_clear_request = PPTPCallClearRequest(call_id=4242) +call_clear_request_ref_data = '001000011a2b3c4d000c000010920000'.decode('hex') + +assert str(call_clear_request) == call_clear_request_ref_data + +call_clear_request_pkt = PPTP(call_clear_request_ref_data) + +assert isinstance(call_clear_request_pkt, PPTPCallClearRequest) +assert call_clear_request_pkt.magic_cookie == 0x1a2b3c4d +assert call_clear_request_pkt.call_id == 4242 + += Test PPTP Call-Disconnect-Notify +~ pptp +call_disconnect_notify = PPTPCallDisconnectNotify(call_id=4242, result_code='Admin Shutdown', error_code='None', + cause_code=47, call_statistic='some description') +call_disconnect_notify_ref_data = '009400011a2b3c4d000d000010920300002f0000736f6d65206465736372697074696'\ + 'f6e000000000000000000000000000000000000000000000000000000000000000000'\ + '000000000000000000000000000000000000000000000000000000000000000000000'\ + '000000000000000000000000000000000000000000000000000000000000000000000'\ + '00000000000000000000'.decode('hex') + +assert str(call_disconnect_notify) == call_disconnect_notify_ref_data + +call_disconnect_notify_pkt = PPTP(call_disconnect_notify_ref_data) + +assert isinstance(call_disconnect_notify_pkt, PPTPCallDisconnectNotify) +assert call_disconnect_notify_pkt.magic_cookie == 0x1a2b3c4d +assert call_disconnect_notify_pkt.call_id == 4242 +assert call_disconnect_notify_pkt.result_code == 3 +assert call_disconnect_notify_pkt.error_code == 0 +assert call_disconnect_notify_pkt.cause_code == 47 +assert call_disconnect_notify_pkt.call_statistic == 'some description' + '\0' * (128-len('some description')) + += Test PPTP WAN-Error-Notify +~ pptp +wan_error_notify = PPTPWANErrorNotify(peer_call_id=4242, crc_errors=1, framing_errors=2, + hardware_overruns=3, buffer_overruns=4, time_out_errors=5, + alignment_errors=6) +wan_error_notify_ref_data = '002800011a2b3c4d000e000010920000000000010000000200000003000000040000000500000006'\ + .decode('hex') + +assert str(wan_error_notify) == wan_error_notify_ref_data + +wan_error_notify_pkt = PPTP(wan_error_notify_ref_data) + +assert isinstance(wan_error_notify_pkt, PPTPWANErrorNotify) +assert wan_error_notify_pkt.magic_cookie == 0x1a2b3c4d +assert wan_error_notify_pkt.peer_call_id == 4242 +assert wan_error_notify_pkt.crc_errors == 1 +assert wan_error_notify_pkt.framing_errors == 2 +assert wan_error_notify_pkt.hardware_overruns == 3 +assert wan_error_notify_pkt.buffer_overruns == 4 + += Test PPTP Set-Link-Info +~ pptp +set_link_info = PPTPSetLinkInfo(peer_call_id=4242, send_accm=0x0f0f0f0f, receive_accm=0xf0f0f0f0) +set_link_info_ref_data = '001800011a2b3c4d000f0000109200000f0f0f0ff0f0f0f0'.decode('hex') + +assert str(set_link_info) == set_link_info_ref_data + +set_link_info_pkt = PPTP(set_link_info_ref_data) + +assert isinstance(set_link_info_pkt, PPTPSetLinkInfo) +assert set_link_info_pkt.magic_cookie == 0x1a2b3c4d +assert set_link_info_pkt.peer_call_id == 4242 +assert set_link_info_pkt.send_accm == 0x0f0f0f0f +assert set_link_info_pkt.receive_accm == 0xf0f0f0f0