Denizhalil

Python TCP/UDP Network Traffic Listener (Part 1)

Introduction:

Network traffic analysis is an essential part of gaining insights into cybersecurity, monitoring network performance, and understanding network communication. Using a powerful language like Python, we can listen to network traffic, analyze it, and make sense of it. In this article, we’ll learn how to listen to and analyze network traffic using Python. We’ll start by taking a look at the basic network functions in Python and then learn how to create our own network traffic listener.

Learning Objectives:

  1. Understand basic network functions in Python.
  2. Use Socket programming for network traffic listener applications.
  3. Analyze IPv4 and TCP/UDP packets using structured data.
  4. Learn fundamental concepts used in network traffic analysis.

Project Objectives:

The purpose of this project is to provide a basis for creating a network traffic sniffer and analyzing TCP/UDP packets using Python. After completing the project, you will have a set of tools that you can use to listen to network traffic and perform analysis on it.

A Brief Look at Socket:

Socket programming is a fundamental API used to provide data communication between computers. In Python, we can use the socket module to leverage this functionality. The basic principle of socket programming is to establish a connection between two devices communicating. For more, you can refer to the Python Socket Programming Guide.

Let’s Write Our Network Traffic Listener Code:

  1. Importing Required Modules:
import socket
import struct
import textwrap
from colorama import Fore, Style
  • socket: Module used for socket operations for network communication.
  • struct: Module used for structured packing and unpacking of data packets.
  • textwrap: Module used for text formatting operations.
  • colorama: Module used for colored outputs.
  1. Defining the NetworkSniffer Class:
class NetworkSniffer:
    def __init__(self):
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
        self.conn.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
  • NetworkSniffer: Network traffic listener class.
  1. Main Function: Listening and Analyzing Network Traffic:
def run(self):
    # Continuously receive and process data.
    while True:
        # Receive data from the socket and get the address.
        raw_data, addr = self.conn.recvfrom(65536)
        # Process received data as an IPv4 packet.
        dest_ip, src_ip, data = self.ipv4_packet(raw_data)
        # Print IPv4 packet information.
        print(f'\n{Fore.GREEN}IPv4 Packet:{Style.RESET_ALL}')
        print('Source IP: {}, Destination IP: {}'.format(src_ip, dest_ip))

        # Process as TCP or UDP based on protocol number.
        if data['proto'] == 6:  # TCP
            tcp_data = self.tcp_segment(data['data'])
            self.print_tcp_segment(tcp_data)
        elif data['proto'] == 17:  # UDP
            udp_data = self.udp_segment(data['data'])
            self.print_udp_segment(udp_data)
  • run: Main function for listening and analyzing network traffic.
  1. Parsing IPv4 Packet:
def ipv4_packet(self, data):
    version_header_length = data[0]
    version = version_header_length >> 4              # First 4 bits: Version
    header_length = (version_header_length & 15) * 4  # Last 4 bits: Header Length
    # Extract TTL, protocol, source, and destination address information from the packet.
    ttl, proto, src, dest = struct.unpack('! 8x B B 2x 4s 4s', data[:20])
    # Convert source and destination addresses to IPv4 format and return with packet data.
    return self.ipv4(src), self.ipv4(dest), {'version': version, 'header_length': header_length,
                                             'ttl': ttl, 'proto': proto, 'data': data[header_length:]}
  • ipv4_packet: Function for parsing IPv4 packet.
  1. Formatting IPv4 Address:
def ipv4(self, addr):
    return '.'.join(map(str, addr))
  • ipv4: Function for formatting IPv4 address.
  1. Parsing TCP Segment:
def tcp_segment(self, data):
    # Unpack the data and assign to appropriate variables to extract information from the packet.
    (src_port, dest_port, sequence, acknowledgment, offset_reserved_flags) = struct.unpack('! H H L L H', data[:14])

    # Calculate the offset to determine the length of the header at the beginning of the TCP segment.
    offset = (offset_reserved_flags >> 12) * 4

    # Calculate values of different flags in the TCP header.
    flag_urg = (offset_reserved_flags & 32) >> 5
    flag_ack = (offset_reserved_flags & 16) >> 4
    flag_psh = (offset_reserved_flags & 8) >> 3
    flag_rst = (offset_reserved_flags & 4) >> 2
    flag_syn = (offset_reserved_flags & 2) >> 1
    flag_fin = offset_reserved_flags & 1

    # Gather TCP segment data and flags in a dictionary and return.
    return {'src_port': src_port, 'dest_port': dest_port, 'sequence': sequence, 'acknowledgment': acknowledgment, 'flag_urg':
        flag_urg, 'flag_ack': flag_ack, 'flag_psh': flag_psh, 'flag_rst': flag_rst, 'flag_syn': flag_syn, 'flag_fin': flag_fin, 'data': data[offset:]}
  • tcp_segment: Function for parsing TCP segment.
  1. Printing TCP Segment:
def print_tcp_segment(self, tcp_data):
    print(f'\n{Fore.GREEN}TCP Segment:{Style.RESET_ALL}')
    print('Source Port: {}, Destination Port: {}'.format(tcp_data['src_port'], tcp_data['dest_port']))
    print('Sequence: {}, Acknowledgment: {}'.format(tcp

_data['sequence'], tcp_data['acknowledgment']))
    print(f'{Fore.GREEN}Flags:{Style.RESET_ALL}')
    print('URG: {}, ACK: {}, PSH: {}, RST: {}, SYN: {}, FIN:{}'.format(tcp_data['flag_urg'], tcp_data['flag_ack'],
                                                                       tcp_data['flag_psh'], tcp_data['flag_rst'], tcp_data['flag_syn'], tcp_data['flag_fin']))
    print(f'{Fore.GREEN}Data:{Style.RESET_ALL}')
    print(self.format_multi_line('\t\t', tcp_data['data']))
  • print_tcp_segment: Function for printing TCP segment.
  1. Parsing UDP Segment:
def udp_segment(self, data):
    # Extract source port, destination port, and size information from the first 8 bytes of the packet.
    src_port, dest_port, size = struct.unpack('! H H 2x H', data[:8])
    # Gather obtained information in a dictionary along with packet data and return.
    return {'src_port': src_port, 'dest_port': dest_port, 'size': size, 'data': data[8:]}
  • udp_segment: Function for parsing UDP segment.
  1. Printing UDP Segment:
def print_udp_segment(self, udp_data):
    print(f'\n{Fore.GREEN}UDP Segment:{Style.RESET_ALL}')
    print('Source Port: {}, Destination Port: {}, Length: {}'.format(udp_data['src_port'], udp_data['dest_port'], udp_data['size']))
  • print_udp_segment: Function for printing UDP segment.
  1. Formatting Multi-Line Text:
def format_multi_line(self, prefix, string, size=80):
    # Subtract prefix length from size.
    size -= len(prefix)
    # If the incoming string is a byte string, convert it to hexadecimal format.
    if isinstance(string, bytes):
        string = ''.join(r'\x{:02x}'.format(byte) for byte in string)
        # If size is odd, an extra space will be allocated for single-digit bytes.
        if size % 2:
            size -= 1
    # Split the string into multiple lines with the specified size and add prefix to each line.
    return '\n'.join([prefix + line for line in textwrap.wrap(string, size)])
  • format_multi_line: Function for formatting multi-line text.
  1. Main Program:
# Run the main program
sniffer = NetworkSniffer()
sniffer.run()
  • Creating and running the network traffic listener class.
Python network data analyzer
Network packet sniffer with Python
Python network traffic monitoring tutorial
How to build a network traffic analyzer in Python

Conclusion:

Listening to and analyzing network traffic using Python is a powerful and flexible approach. In this article, we introduced the basics of network programming and used Python’s provided tools to create our own network traffic listener. With this foundational knowledge, we can now develop more complex network analysis tools and delve deeper into network traffic analysis.

References:

  • Beginning Your Journey in Programming and Cybersecurity – Navigating the Digital Future. Buy Me a Coffee
  • Mastering Scapy: A Comprehensive Guide to Network Analysis. Buy Me a Coffee
  • Creating an FTP Listener with Scapy for Network Security. Blog Post
  • Python Socket Programming: Building a Server and Client. Blog Post

A great way to stay updated until the next article is to join our Discord channel. You can join our Discord channel here!

1 thought on “Python TCP/UDP Network Traffic Listener (Part 1)”

Leave a Comment

Join our Mailing list!

Get all latest news, exclusive deals and academy updates.