Hack.lu 2019 Futuristic Communication

Writeup for the "Futuristic Communication" challenge (misc 390) from Hack.lu CTF 2019

I wrote my first CTF challenge for Hack.lu CTF 2019: Futuristic Communication. This article explains how the challenge was built and how to solve it.

Table of Contents

  1. The source
    1. Running the challenge
  2. The idea
  3. Analyze the challenge source
    1. Analyze the first stage
    2. Analyze the second stage
  4. Analyze the exploit source

The source

The full challenge source is now public. You can get it on my GitLab here.

Running the challenge on your own

The challenge can be run with a single command because it is dockerized 🐳. You don’t have to build the container; the image is pulled from my container registry.

docker run -dit --name hacklu_futuristic_communication -p 1337:1337 --cap-add=NET_ADMIN --net=host gitlab.p4ck3t0.de:4567/ctf/hacklu_2019/futuristic_communication:latest

The idea

I built this challenge because, when I play CTFs, I always look out for network challenges. So when I got the chance to write my own, it was clear it would be a network challenge. The idea was simple. Network engineers either love or hate PCAPs. I love them, so I wanted to make a network analysis challenge. I also wanted a challenge you cannot solve with the ‘standard’ CTF tool pwntools. Maybe you solved it with scapy, or maybe with raw sockets, which I had not even thought of. For the challenge and my exploit I only used scapy.

Analyze the challenge source

The challenge is written in Python and only uses the scapy module. So let’s do a short and quick analysis.

Analyze the first stage

For the first stage the class player is important. So let’s have a look at it.

class player:
    def __init__(self, second_stage_bool):
        self.second_stage_bool = second_stage_bool
        self.ip = None
        self.stop = False

The player class has three attributes: second_stage_bool, ip and stop. second_stage_bool starts the second stage when it is True. ip stores the IP address of the player. stop stops the first-stage sniffer. That’s all we need to know about the player class. Let’s have a look at the first_stage function.

def first_stage(iface, ident, team):
    first_stage_port=random.randrange(10000, 25000, 1)
    print("Waiting for identification, on TCP port " + str(first_stage_port))
    first_stage_sniff(first_stage_port, iface, ident, team)

This function starts the first stage. First, a random port between 10000 and 24999 is chosen for each new connection. This makes the challenge a bit harder and gives each connection its own port. The port is then passed to the sniffer, which waits for the identifier.

def first_stage_filter_sniff(ident, team):
    def first_stage_filter_sniff_prn(pkt):
        if Raw in pkt and str(pkt.getlayer(Raw)) == "b'" + ident + "'":
            print ("Identification successful. Waiting for further instructions.")
            team.second_stage_bool=True
            team.ip = str(pkt[IP].src)
            team.stop = True
    return first_stage_filter_sniff_prn


def first_stage_sniff(first_stage_port, iface, ident, team):
    sniff(iface=iface, stop_filter=lambda x:team.stop, prn=first_stage_filter_sniff(ident, team), filter='dst port ' + str(first_stage_port), timeout=10)

Here is the scapy sniff function, which captures all traffic destined for the given port and checks each packet’s payload against the identifier. If it matches, team.second_stage_bool is set to True, the sender’s IP address is stored in team.ip, and the player can enter the second stage. The sniffer gives up after 10 seconds.
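The comparison against "b'" + ident + "'" can look odd at first. A minimal sketch of why it works, assuming that str() on the Raw layer behaves like str() on its bytes payload under Python 3 (which is what the check in the challenge implies). The helper raw_matches is hypothetical and uses no scapy at all:

```python
def raw_matches(payload: bytes, ident: str) -> bool:
    """Mimic the challenge's check: str(<bytes>) == "b'" + ident + "'"."""
    # str() on a bytes object yields its repr, e.g. "b'abc123'".
    return str(payload) == "b'" + ident + "'"


print(raw_matches(b"abc123", "abc123"))    # True
print(raw_matches(b"abc123\n", "abc123"))  # False: the repr becomes b'abc123\n'
```

The second call shows the practical consequence: the payload has to be exactly the identifier, without a trailing newline or any other extra byte.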

Analyze the second stage

For the second stage, there is also an important class, which contains the commands to solve the challenge.

class handshake_commands:
    def __init__(self, sync):
        self.flag = "flag{Ack?4ck_Ack!w4it_d1d_y0u_4v4n_syn}"
        self.synchronize_number = sync
        self.sync_counter = 0
        self.priv = False

    def privileged(self):
        print("Check if the connection is synchronized.")
        if self.sync_counter == self.synchronize_number:
            self.priv = True
            print("Connection synchronized. Entering privileged mode.")
        else:
            print("Not synchronized. Exiting")
            exit(1)

    def synchronize(self):
        print("synchronize... success")
        self.sync_counter += 1

    def print_flag(self):
        if self.priv == True:
            print(self.flag)
            exit(1)
        else:
            print("You are not privileged. Exiting")
            exit(1)

The only important thing is to know how the commands synchronize, privileged and flag work. I think the code documents itself here, because it is not hard to understand. The second_stage function is more complex:

def second_stage(iface, ident, team):
    sync = random.randrange(20, 30, 1)
    second_stage_port = random.randrange(25001, 60000, 1)
    seq = random.randrange(200000000, 4294900000, 1)
    secret = handshake_commands(sync)
    print("Expecting handshake, on TCP port " + str(second_stage_port) + ", with sequence number " + str(seq))
    second_stage_sniff(second_stage_port, iface, ident, seq, sync, secret, team.ip)

This function also first picks a random port and sequence number, plus the sync value, which is how many times you have to send synchronize (between 20 and 29). Then the sniffer is started.
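Because the port and the sequence number are random for every connection, a client has to read them from the lines the server prints. A minimal sketch of such a parser, assuming the lines arrive as plain text exactly as in the print calls shown above. The name parse_banner and the regular expressions are my own illustration, not part of the challenge or the exploit:

```python
import re

PORT_RE = re.compile(r"on TCP port (\d+)")
SEQ_RE = re.compile(r"with sequence number (\d+)")


def parse_banner(line: str):
    """Return (port, seq) from a server line, or None if no port is announced.

    seq is None for the first-stage line, which carries no sequence number.
    """
    port_match = PORT_RE.search(line)
    if port_match is None:
        return None
    seq_match = SEQ_RE.search(line)
    seq = int(seq_match.group(1)) if seq_match else None
    return int(port_match.group(1)), seq


print(parse_banner("Waiting for identification, on TCP port 12345"))
print(parse_banner("Expecting handshake, on TCP port 40000, with sequence number 300000000"))
```

Matching on the text instead of fixed character offsets keeps the client working even if a line is prefixed with extra bytes.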

def second_stage_filter_sniff(ident, seq, sync, secret):
    def second_stage_filter_sniff_prn(pkt):
        
        if Raw in pkt and str(pkt.getlayer(Raw)) == "b'" + ident + "'" and pkt[TCP].flags == 'S' and pkt[TCP].seq == seq:
            send(IP(dst=pkt[IP].src, src=pkt[IP].dst)/TCP(dport=pkt[TCP].sport, sport=pkt[TCP].dport, seq=pkt[TCP].seq, ack=pkt[TCP].seq - sync, flags='SA'), verbose=0)
        
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'" + ident + "'" and pkt[TCP].flags == 'A' and pkt[TCP].seq == seq -sync and pkt[TCP].ack == seq:
            print("Connection setup noticed. Your Commands:\nsynchronize\nprivileged\nflag\n\nAll your TCP flags should be SYN flags.")
        
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'synchronize'" and pkt[TCP].ack == seq and pkt[TCP].seq == seq - sync + secret.sync_counter and pkt[TCP].flags == 'S':
            secret.synchronize()
        
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'privileged'" and pkt[TCP].seq == seq and pkt[TCP].ack == pkt[TCP].seq and pkt[TCP].flags == 'S':
            secret.privileged()
        
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'flag'" and pkt[TCP].flags == 'S' and pkt[TCP].seq == seq:
            secret.print_flag()
        
        else:
            print("Invalid packet. Exiting")
            exit(1)
    
    return second_stage_filter_sniff_prn

def second_stage_sniff(second_stage_port, iface, ident, seq, sync, secret, ip):
    sniff(iface=iface, prn=second_stage_filter_sniff(ident, seq, sync, secret), filter='host '+ str(ip) + ' and dst port ' + str(second_stage_port), timeout=60)

Notice the filter in the sniff call. The second stage filters not only on the port but also on the team IP from stage one. Each arriving packet is passed to the prn function, and if it matches one of the conditions, the corresponding action is triggered. Note that if your packet doesn’t match any condition, you drop into the else case, which prints ‘Invalid packet. Exiting’ and ends the session. When sending synchronize, your sequence number has to increase by one with every synchronize you send, starting at your personal sequence number minus the sync value, while the ack number stays at your personal sequence number. Privileged mode can only be entered once you have sent exactly the required number of synchronize packets and both your sequence and ack numbers equal your personal sequence number. The flag is printed once privileged mode has been entered. 🚩
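The conditions above translate into a fixed sequence of packets. The following sketch only lists the header values, it does not send anything; handshake_plan is a hypothetical helper, and sync is assumed to be already known from the server’s SYN/ACK (which is answered after the first entry):

```python
def handshake_plan(seq, sync, ident):
    """List the (flags, seq, ack, payload) tuples the second stage accepts, in order."""
    plan = [
        ("S", seq, 0, ident),           # SYN with identifier; the ack value is not checked
        ("A", seq - sync, seq, ident),  # ACK that completes the connection setup
    ]
    # One synchronize per step; seq climbs from seq - sync up to seq - 1.
    for counter in range(sync):
        plan.append(("S", seq - sync + counter, seq, "synchronize"))
    plan.append(("S", seq, seq, "privileged"))  # seq and ack both equal seq
    plan.append(("S", seq, seq, "flag"))        # only flags and seq are checked here
    return plan


for step in handshake_plan(seq=1000, sync=3, ident="my-identifier"):
    print(step)
```

With the real values, sync lies between 20 and 29, so the whole exchange is between 24 and 33 packets and has to finish within the sniffer’s 60 second timeout.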

Analyze the exploit source

The exploit analysis will be quite short because there is nothing special about it. I think everybody solved it in their own way. There are only two major parts I will show you:

def getident(q, ippkt, sport):
    def upload_packet(packet):
        if Raw in packet and packet[TCP].flags == 'PA':
            payload = str(packet.getlayer(Raw)).strip()
            print(payload)
            
            #get the identifier
            if payload.startswith("Welcome"):
                q.put(payload[61:97])
                q.put(payload[138:143])
            
            #get my personal sequence number
            if payload.startswith("Expecting handshake"):
                q.put(payload[33:38])
                q.put(payload[61:71])
            
            if Padding in packet:
                ack = (packet.seq + len(str(packet.getlayer(Raw)))) - len(str(packet.getlayer(Padding)))
            else:
                ack = packet.seq + len(str(packet.getlayer(Raw)))
            ackpkt = TCP(sport=sport, dport=conport, flags='A', seq=packet.ack, ack=ack)
            send (ippkt/ackpkt, verbose=0)
    return upload_packet

def sniffack(q, ippkt, sport, iface):
    sniff(iface=iface, prn=getident(q, ippkt, sport), filter ='dst port ' + str(sport))

def waitack(q, ippkt, sport, iface):
    t = Process(target=sniffack, args=(q, ippkt, sport, iface,))
    return t

This function sends an ACK for every PUSH/ACK packet on the main TCP stream. It also watches for the identifier and the sequence number and hands them back through a queue, because it runs as a background process. Then there is the other sniff function, which gets the ‘magic value’, the offset that tells me how often I have to synchronize.

def stage_two_prn(seq, q):
    def stage_two_prn_sniff(pkt):
        if pkt[TCP].seq == int(seq) and pkt[TCP].flags == 'SA':
            q.put(str(int(pkt[TCP].seq) - int(pkt[TCP].ack)))
    return stage_two_prn_sniff

def stage_two_sniff(iface, seq, q, sport):
    sniff(iface=iface, prn=stage_two_prn(seq, q), filter ='dst port ' + str(sport))

def stage_two_start(seq, ippkt, second_stage_port, q, iface, sport):
    x=ippkt/TCP(dport=int(second_stage_port), sport=sport, flags='S', seq=int(seq))
    sniffproc = Process(target=stage_two_sniff, args=(iface, seq, q, sport,))
    return x, sniffproc

The sniffer for the second stage only extracts the magic value from the SYN/ACK. Once I have the values, the rest is simply sending the requested packets.
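The arithmetic behind both sniffers is small enough to show on its own. A minimal sketch, with recover_sync and next_ack as hypothetical helper names; the modulo keeps the results inside the 32-bit TCP sequence space, which the exploit excerpts above do not do explicitly:

```python
MOD = 2 ** 32  # TCP sequence and acknowledgment numbers are 32-bit values


def recover_sync(synack_seq: int, synack_ack: int) -> int:
    """The server replies with seq=S and ack=S-sync, so sync is their difference."""
    return (synack_seq - synack_ack) % MOD


def next_ack(seg_seq: int, payload_len: int) -> int:
    """ACK number for a received data segment: its seq plus its payload length."""
    return (seg_seq + payload_len) % MOD


print(recover_sync(300000000, 299999975))  # 25
print(next_ack(4294967290, 10))            # 4 (wraps around)
```

recover_sync is the ‘magic value’ from the SYN/ACK, and next_ack is the number to put into the ACK that answers each PUSH/ACK on the main stream.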

All in all, I am very satisfied with the feedback on the challenge. I think for Hack.lu 2020 there will be some more of these network-related challenges, but this time with real vulns.

