I wrote my first CTF challenge for Hack.lu CTF 2019: Futuristic Communication. This article covers how the challenge was built and how to solve it.
The source
The full challenge source is now public. You can get it on my GitLab here.
Running the challenge on your own
The challenge can be run with a single command because it is dockerized 🐳. You don’t have to build the container; it is pulled from my container registry.
docker run -dit --name hacklu_futuristic_communication -p 1337:1337 --cap-add=NET_ADMIN --net=host gitlab.p4ck3t0.de:4567/ctf/hacklu_2019/futuristic_communication:latest
The idea
I built the challenge because when I play CTFs, I always look out for network challenges. So when I got the chance to create my own challenge, it was clear that it would be a network challenge. The idea was pretty simple. Network engineers either love or hate PCAPs. I love them, so I wanted to make a network analysis challenge. I also wanted a challenge you cannot solve with the ‘standard’ CTF tool pwntools. Maybe you solved the challenge with scapy, or maybe with raw sockets, which I did not even think of. For the challenge and my exploit I only used scapy.
Analyze the challenge
The challenge is written in Python and only uses the scapy module. So let’s do a short and quick analysis.
Analyze the first stage
For the first stage, the player class is important, so let’s have a look at it.
class player:
    def __init__(self, second_stage_bool):
        self.second_stage_bool = second_stage_bool
        self.ip = None
        self.stop = False
The player class contains three attributes: second_stage_bool, ip and stop. The attribute second_stage_bool starts the second stage once it is set to True. The attribute ip stores the IP address of the player. The last attribute, stop, is used to stop the first stage.
That’s all we need to know about the player class. Let’s have a look at the first_stage function.
def first_stage(iface, ident, team):
    first_stage_port = random.randrange(10000, 25000, 1)
    print("Waiting for identification, on TCP port " + str(first_stage_port))
    first_stage_sniff(first_stage_port, iface, ident, team)
This function starts the first stage. First, a random port between 10000 and 24999 is picked for each new connection. This makes the challenge a bit harder and gives each connection its own port. The port is then passed to the sniffer, which waits for the identifier.
def first_stage_filter_sniff(ident, team):
    def first_stage_filter_sniff_prn(pkt):
        if Raw in pkt and str(pkt.getlayer(Raw)) == "b'" + ident + "'":
            print("Identification successful. Waiting for further instructions.")
            team.second_stage_bool = True
            team.ip = str(pkt[IP].src)
            team.stop = True
    return first_stage_filter_sniff_prn

def first_stage_sniff(first_stage_port, iface, ident, team):
    sniff(iface=iface, stop_filter=lambda x: team.stop, prn=first_stage_filter_sniff(ident, team), filter='dst port ' + str(first_stage_port), timeout=10)
Here is the scapy sniff function, which captures all traffic to the given port and checks each packet for the identifier. If the identifier is correct, team.second_stage_bool is set to True and the player can enter the second stage. The sniffer gives up after 10 seconds.
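The check in first_stage_filter_sniff_prn compares a string conversion of the payload, so the payload has to be exactly the identifier with nothing appended. The following minimal sketch illustrates this with plain bytes instead of scapy packets. It assumes that str() on the Raw layer yields the same text as str() on the payload bytes under Python 3, which is what the comparison in the source implies; matches_ident and the identifier value are made up for illustration.

```python
def matches_ident(payload: bytes, ident: str) -> bool:
    """Mimic the server-side comparison with plain bytes (hypothetical helper).

    Assumption: str() on the Raw layer gives the same text as str() on the
    payload bytes, e.g. "b'abc'" for b"abc".
    """
    return str(payload) == "b'" + ident + "'"


if __name__ == "__main__":
    ident = "3f2a"  # made-up identifier, not a real one from the challenge
    print(matches_ident(b"3f2a", ident))    # exact payload
    print(matches_ident(b"3f2a\n", ident))  # a trailing newline changes the text
```

Under this assumption, a client that appends a newline or any other byte to the identifier would not pass the first stage.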
Analyze the second stage
For the second stage, there is also an important class, which contains the commands to solve the challenge.
class handshake_commands:
    def __init__(self, sync):
        self.flag = "flag{Ack?4ck_Ack!w4it_d1d_y0u_4v4n_syn}"
        self.synchronize_number = sync
        self.sync_counter = 0
        self.priv = False

    def privileged(self):
        print("Check if the connection is synchronized.")
        if self.sync_counter == self.synchronize_number:
            self.priv = True
            print("Connection synchronized. Entering privileged mode.")
        else:
            print("Not synchronized. Exiting")
            exit(1)

    def synchronize(self):
        print("synchronize... success")
        self.sync_counter += 1

    def print_flag(self):
        if self.priv == True:
            print(self.flag)
            exit(1)
        else:
            print("You are not privileged. Exiting")
            exit(1)
The only important thing is to know how the commands synchronize, privileged and flag (implemented by print_flag) work. The code is short enough to document itself. The second_stage function is more complex:
def second_stage(iface, ident, team):
    sync = random.randrange(20, 30, 1)
    second_stage_port = random.randrange(25001, 60000, 1)
    seq = random.randrange(200000000, 4294900000, 1)
    secret = handshake_commands(sync)
    print("Expecting handshake, on TCP port " + str(second_stage_port) + ", with sequence number " + str(seq))
    second_stage_sniff(second_stage_port, iface, ident, seq, sync, secret, team.ip)
This function also starts by picking a random port, a sequence number and the sync value, which is the number of times you have to send synchronize. Then the sniffer is started.
def second_stage_filter_sniff(ident, seq, sync, secret):
    def second_stage_filter_sniff_prn(pkt):
        if Raw in pkt and str(pkt.getlayer(Raw)) == "b'" + ident + "'" and pkt[TCP].flags == 'S' and pkt[TCP].seq == seq:
            send(IP(dst=pkt[IP].src, src=pkt[IP].dst)/TCP(dport=pkt[TCP].sport, sport=pkt[TCP].dport, seq=pkt[TCP].seq, ack=pkt[TCP].seq - sync, flags='SA'), verbose=0)
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'" + ident + "'" and pkt[TCP].flags == 'A' and pkt[TCP].seq == seq - sync and pkt[TCP].ack == seq:
            print("Connection setup noticed. Your Commands:\nsynchronize\nprivileged\nflag\n\nAll your TCP flags should be SYN flags.")
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'synchronize'" and pkt[TCP].ack == seq and pkt[TCP].seq == seq - sync + secret.sync_counter and pkt[TCP].flags == 'S':
            secret.synchronize()
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'privileged'" and pkt[TCP].seq == seq and pkt[TCP].ack == pkt[TCP].seq and pkt[TCP].flags == 'S':
            secret.privileged()
        elif Raw in pkt and str(pkt.getlayer(Raw)) == "b'flag'" and pkt[TCP].flags == 'S' and pkt[TCP].seq == seq:
            secret.print_flag()
        else:
            print("Invalid packet. Exiting")
            exit(1)
    return second_stage_filter_sniff_prn

def second_stage_sniff(second_stage_port, iface, ident, seq, sync, secret, ip):
    sniff(iface=iface, prn=second_stage_filter_sniff(ident, seq, sync, secret), filter='host ' + str(ip) + ' and dst port ' + str(second_stage_port), timeout=60)
Notice the filter passed to the sniff function. The second stage filters not only for the port but also for the team IP from stage one. Each arriving packet is passed to the prn function, and if it matches one of the conditions, the corresponding action is triggered. Note that if your packet doesn’t match any of them, you drop into the else case, which prints ‘Invalid packet. Exiting’ and ends the session. When sending synchronize, the sequence number has to start at your personal sequence number minus sync and increase by one with every synchronize you send, while the ack number stays at your personal sequence number. The privileged mode can only be entered when you have sent exactly sync synchronize packets and both your sequence and ack numbers are equal to your personal sequence number. Once privileged mode has been entered, the flag command prints the flag. 🚩
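To make the expected packet order concrete, here is a minimal sketch that derives the whole second-stage sequence from the conditions in second_stage_filter_sniff_prn. It uses plain dictionaries instead of scapy packets; plan_packets is a hypothetical helper that is not part of the challenge or my exploit, and the ack value of the first SYN is an arbitrary choice because the handler does not check it.

```python
def plan_packets(ident, seq, sync):
    """Return the packets the second stage expects, in order.

    Each entry holds the TCP flags, seq, ack and payload of one packet.
    Hypothetical helper derived from the conditions in the handler.
    """
    plan = [
        # Opening SYN carrying the identifier; ack is not checked here.
        {"flags": "S", "seq": seq, "ack": 0, "payload": ident},
        # ACK answering the SYN/ACK, which carries ack = seq - sync.
        {"flags": "A", "seq": seq - sync, "ack": seq, "payload": ident},
    ]
    # One synchronize per step; seq counts up from seq - sync to seq - 1.
    for counter in range(sync):
        plan.append({"flags": "S", "seq": seq - sync + counter,
                     "ack": seq, "payload": "synchronize"})
    # privileged needs seq == ack == personal sequence number.
    plan.append({"flags": "S", "seq": seq, "ack": seq, "payload": "privileged"})
    # Only seq is checked for flag; ack is set to seq for symmetry.
    plan.append({"flags": "S", "seq": seq, "ack": seq, "payload": "flag"})
    return plan


if __name__ == "__main__":
    # Small made-up numbers so the output stays readable.
    for packet in plan_packets("ident", 1000, 3):
        print(packet)
```

With scapy, each entry could then be turned into a packet along the lines of IP(dst=...)/TCP(sport=..., dport=..., flags=..., seq=..., ack=...)/Raw(load=...), with the sync value taken from the SYN/ACK before the synchronize packets are built.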
Analyze the exploit source
The exploit analysis will be quite short because there is nothing special about it. I think everybody solved it in their own way. There are only two major functions I will show you:
def getident(q, ippkt, sport):
    def upload_packet(packet):
        if Raw in packet and packet[TCP].flags == 'PA':
            payload = str(packet.getlayer(Raw)).strip()
            print(payload)
            # get the identifier
            if payload.startswith("Welcome"):
                q.put(payload[61:97])
                q.put(payload[138:143])
            # get my personal sequence number
            if payload.startswith("Expecting handshake"):
                q.put(payload[33:38])
                q.put(payload[61:71])
            if Padding in packet:
                ack = (packet.seq + len(str(packet.getlayer(Raw)))) - len(str(packet.getlayer(Padding)))
            else:
                ack = packet.seq + len(str(packet.getlayer(Raw)))
            ackpkt = TCP(sport=sport, dport=conport, flags='A', seq=packet.ack, ack=ack)
            send(ippkt/ackpkt, verbose=0)
    return upload_packet

def sniffack(q, ippkt, sport, iface):
    sniff(iface=iface, prn=getident(q, ippkt, sport), filter='dst port ' + str(sport))

def waitack(q, ippkt, sport, iface):
    t = Process(target=sniffack, args=(q, ippkt, sport, iface,))
    return t
This function sends an ACK for every PUSH/ACK packet on the main TCP stream. It also watches for the identifier and the personal sequence number and hands them back through a queue, because it runs as a background process. Then there is the other sniff function, which gets the ‘magic value’, i.e. the offset that tells me how often I have to synchronize.
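The acknowledgement number used for these ACKs follows the usual TCP rule: it is the sequence number of the received segment plus the number of payload bytes it carried. A minimal sketch of that arithmetic, independent of scapy, could look like this. ack_for is a hypothetical helper, and the modulo for 32-bit wrap-around is an addition of the sketch that the exploit code does not contain.

```python
def ack_for(seq: int, payload: bytes) -> int:
    """Acknowledgement number for a data segment that starts at seq.

    Hypothetical helper: acknowledges every payload byte of the segment.
    The modulo keeps the result inside the 32-bit sequence space.
    """
    return (seq + len(payload)) % 2**32


if __name__ == "__main__":
    # Made-up sequence number and payload for illustration.
    print(ack_for(5000, b"Expecting handshake"))
```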
def stage_two_prn(seq, q):
    def stage_two_prn_sniff(pkt):
        if pkt[TCP].seq == int(seq) and pkt[TCP].flags == 'SA':
            q.put(str(int(pkt[TCP].seq) - int(pkt[TCP].ack)))
    return stage_two_prn_sniff

def stage_two_sniff(iface, seq, q, sport):
    sniff(iface=iface, prn=stage_two_prn(seq, q), filter='dst port ' + str(sport))

def stage_two_start(seq, ippkt, second_stage_port, q, iface, sport):
    x = ippkt/TCP(dport=int(second_stage_port), sport=sport, flags='S', seq=int(seq))
    sniffproc = Process(target=stage_two_sniff, args=(iface, seq, q, sport,))
    return x, sniffproc
The sniffer for the second stage only exists to get the magic value from the SYN/ACK. Once I have the values, the rest is simply sending the requested packets.
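The magic value falls out of the server’s SYN/ACK directly: the challenge answers with seq set to the personal sequence number and ack set to that number minus sync, so subtracting the two recovers sync. As a minimal sketch with plain integers (recover_sync and the example numbers are made up for illustration):

```python
def recover_sync(synack_seq: int, synack_ack: int) -> int:
    """Recover the sync value from the seq and ack fields of the SYN/ACK.

    Hypothetical helper: the server sends seq = personal sequence number
    and ack = personal sequence number - sync.
    """
    return synack_seq - synack_ack


def looks_plausible(sync: int) -> bool:
    """The challenge draws sync with random.randrange(20, 30, 1), so 20..29."""
    return 20 <= sync < 30


if __name__ == "__main__":
    sync = recover_sync(3000000000, 2999999975)  # made-up SYN/ACK values
    print(sync, looks_plausible(sync))
```

The plausibility check is optional, but it can help to spot a SYN/ACK that belongs to a different connection.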
All in all, I am very satisfied with the feedback on the challenge. I think for Hack.lu 2020 there will be some more of these network-related challenges, but this time with real vulns.