参考了Python渗透测试的思路,只是将消息传递方式改为queue,并固定由客户端发起连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# -*- coding: utf-8 -*-
"""Two-thread TCP relay that can switch to TLS in the middle of a session.

Topology::

    Client -- proxyServer -- proxyClient -- Server

``proxyServer`` accepts the real client, ``proxyClient`` connects to the real
server, and the two threads exchange payloads through a pair of queues.  When
a server reply contains ``SSL_TRIGGER`` each side wraps its own socket in TLS
using the certificates loaded in its constructor.
"""
import socket
import ssl
import threading
import queue
import time

# Address the proxy listens on for the real client.
LISTEN_ADDR = ('127.0.0.1', 8888)
# Address of the real server the proxy connects to.
UPSTREAM_ADDR = ('127.0.0.1', 9999)
# Maximum number of bytes read per recv() call.
RECV_SIZE = 4096
# Byte sequence in a server reply that marks the start of TLS negotiation.
SSL_TRIGGER = b'\x01\x00\x00\x00'

# Payloads received from the real client, waiting to be sent to the server.
fromCliQueue = queue.Queue()
# Payloads received from the real server, waiting to be sent to the client.
fromSrvQueue = queue.Queue()
sFlag = False


class proxyClient(threading.Thread):
    """Thread that plays the client role towards the real server."""

    def __init__(self):
        threading.Thread.__init__(self)
        # Created lazily in run() when the first client payload arrives.
        self.sock = None
        self.ssl_context = ssl.create_default_context(cafile='RootCA.pem')
        self.ssl_context.load_cert_chain("Client.pem", "Client.key")
        # SSLContext.protocol is read-only, so the TLS 1.2 requirement is
        # expressed through minimum_version instead of assigning to it.
        self.ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
        self.ssl_context.check_hostname = False

    def run(self):
        """Forward each queued client payload upstream and queue the reply."""
        while True:
            # Block until the proxy server hands over a payload; this
            # replaces polling Queue.empty() in a tight loop.
            outbound = fromCliQueue.get()
            if self.sock is None:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
                self.sock.connect(UPSTREAM_ADDR)
            print("[proxyClient]send to server: {}".format(outbound))
            # sendall() retries until the whole payload has been written.
            self.sock.sendall(outbound)
            reply = self.sock.recv(RECV_SIZE)
            if not reply:
                # An empty read is ignored, as in the original script; no
                # reply is queued, so the proxy server keeps waiting.
                continue
            print("[proxyClient]recv from server: {}".format(reply))
            # Queue the reply before wrapping so the trigger itself is still
            # relayed to the client in plaintext.
            fromSrvQueue.put(reply)
            self.sslDetected(reply)

    def sslDetected(self, data):
        """Wrap the upstream socket in TLS if *data* contains the trigger.

        Returns True when the socket was wrapped, False otherwise.
        """
        if SSL_TRIGGER not in data:
            return False
        self.sock = self.ssl_context.wrap_socket(self.sock)
        print("[proxyClient]ssl neogotiation")
        return True


class proxyServer(threading.Thread):
    """Thread that plays the server role towards the real client."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(LISTEN_ADDR)
        self.sock.listen(1)
        # The protocol must be chosen at construction time; the attribute
        # cannot be assigned afterwards.
        self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # verify_mode is left at its default, so a client certificate is not
        # demanded; set ssl.CERT_REQUIRED here to enforce one.
        self.ssl_context.load_verify_locations(cafile='RootCA.pem')
        self.ssl_context.load_cert_chain("Server.pem", "Server.key")

    def run(self):
        """Accept one client and relay request/response pairs until it closes."""
        self.proxySocket, addr = self.sock.accept()
        try:
            while True:
                request = self.proxySocket.recv(RECV_SIZE)
                if not request:
                    # recv() returning b'' means the client closed the
                    # connection; stop instead of spinning on empty reads.
                    break
                print("[proxyServer]recv from client: {}".format(request))
                fromCliQueue.put(request)
                # Block until the matching server reply is available.
                response = fromSrvQueue.get()
                print("[proxyServer]send to client: {}".format(response))
                self.proxySocket.sendall(response)
                # Wrap only after the plaintext trigger has been delivered.
                self.sslDetected(response)
        finally:
            self.proxySocket.close()
            self.sock.close()

    def sslDetected(self, data):
        """Wrap the client socket in TLS if *data* contains the trigger.

        Returns True when the socket was wrapped, False otherwise.
        """
        if SSL_TRIGGER not in data:
            return False
        self.proxySocket = self.ssl_context.wrap_socket(
            self.proxySocket, server_side=True)
        print("[proxyServer]ssl neogotiation")
        return True


if __name__ == "__main__":
    proxyCli = proxyClient()
    proxyCli.daemon = True
    proxyCli.start()

    proxySrv = proxyServer()
    proxySrv.daemon = True
    proxySrv.start()

    # The client thread is a daemon, so the process ends once the server
    # thread has finished relaying its single connection.
    proxySrv.join()
sslDetected的作用是为了匹配某种特殊的情况:
- client-server先交互一些普通socket数据,然后开始ssl协商
假设已经获取server端和客户端的证书,此Proxy脚本可以伪造客户端和服务端,实现MITM的场景。
如果需要插入数据,可以再增加一个队列,额外起一个socketserver,来实现外部输入注入,亲测可行。O(∩_∩)O哈哈~,上面的脚本在以前的python ssl客户端和服务端验证通过,代码在这里(python ssl )
原脚本中的hexdump可以通过 pip install hexdump 安装现成的包,然后from hexdump import hexdump方式原生继承使用