DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world

Snippets has posted 5883 posts at DZone. View Full User Profile

Multiprocessing With ARM-console Guessfunc

06.15.2011
| 3535 views |
  • submit to reddit
        // When processing larger dumps (like a camera firmware dump), it will lock up (not print progress), but after 5-10 minutes it will beging to print output?? I have something here wrong...

# ARM firmware analysis console for Magic Lantern
# http://magiclantern.wikia.com/wiki/GPL_Tools/ARM_console
#
# (C) 2010 Alex Dumitrache <broscutamaker@gmail.com>
# License: GPL
#
# Module guessfunc: try to guess where are the functions inside the firmware

from profilestats import profile
from multiprocessing import Pool
import time
from scripts import *

def analyze_push(d):
    print "-----------------"
    print "Analyzing PUSH..."
    print "-----------------"
    select_dump(d)
    progress("Analyzing PUSH instructions...")
    for a in range(d.minaddr, d.maxaddr, 4):
        progress(float(a - d.minaddr) / (d.maxaddr - d.minaddr))
        if GetMnem(a) == "PUSH":
            progress("Analyzing PUSH instructions...")
            #~ print hex(a),GetDisasm(a)
            tryMakeSub(d,a)
    print "#############################"
    print "#    DONE ANALYZING PUSH    #"
    print "#############################"            
            
            

def analyze_bl(d):
    print "---------------"
    print "Analyzing BL..."
    print "---------------"
    select_dump(d)
    progress("Analyzing BL calls...")
    for a in range(d.minaddr, d.maxaddr, 4):
        progress(float(a - d.minaddr) / (d.maxaddr - d.minaddr))
        try: ins = d.DISASM.get(a,"").split("\t")[2]
        except: continue
        if ins.startswith("bl"):
            if GetMnem(a) == "BL":
                progress("Analyzing BL calls...")
                #~ print hex(a),GetDisasm(a)
                sub = bkt.subaddr_bl(a)
                if sub: 
                    tryMakeSub(d,sub)
                    assert (a,sub) in d.REFLIST
    print "###########################"
    print "#    DONE ANALYZING BL    #"
    print "###########################"


def analyze_bx(d):
    print "---------------"
    print "Analyzing BX..."
    print "---------------"
    select_dump(d)
    progress("Analyzing BX calls...")
    for a in range(d.minaddr, d.maxaddr, 4):
        progress(float(a - d.minaddr) / (d.maxaddr - d.minaddr))
        try: ins = d.DISASM.get(a,"").split("\t")[2]
        except: continue
        if ins.startswith("bx"):
            assert GetMnem(a) == "BX"
            progress("Analyzing BX calls...")
            print hex(a),GetDisasm(a)
            try: sub = bkt.subaddr_bx(a)
            except: sub = None
            if sub: 
                tryMakeSub(d,sub)
                if (a,sub) not in d.REFLIST:
                    d.AddRef(a, sub)
    print "###########################"
    print "#    DONE ANALYZING BX    #"
    print "###########################"
    

def analyze_b(d):
    select_dump(d)
    for a in range(d.minaddr, d.maxaddr, 4):
        if GetMnem(a) == "B":
            print hex(a),GetDisasm(a)
            

def analyze_names(d):
    print "------------------"
    print "Analyzing names..."
    print "------------------"
    select_dump(d)
    progress("Analyzing loaded names...")
    for i,n in enumerate(d.N2A.keys()):
        progress(float(i) / len(d.N2A))
        a = d.N2A[n]
        if not GuessString(d, a):
            progress("Analyzing loaded names...")
            tryMakeSub(d,a)
    print "######################################"
    print "#    DONE ANALYZING LOADING NAMES    #"
    print "######################################"


#~ def init_funcs(d):
    #~ for a in range(598452, 598492, 20):
        #~ sub = d.ROM[a+4]
        #~ print sub
        #~ tryMakeSub(d,sub)
        
#~ @profile
#def run_redir(d):
#    select_dump(d)
#    analyze_names(d)
#    analyze_bx(d)
#    analyze_bl(d)
    #~ analyze_b(d)
#    analyze_push(d)
    
def run(d):
    t1 = time.time()
    select_dump(d)
    po = Pool()
    names = po.apply_async(analyze_names, (d,))
    print names.get()
    bx = po.apply_async(analyze_bx, (d,))
    print bx.get()  
    bl = po.apply_async(analyze_bl, (d,))
    print bl.get() 
    push = po.apply_async(analyze_push, (d,))
    print push.get()
    t2 = time.time()
    print "Total time for this operation:"
    print t2 - t1
    po.close()
    po.join()
    
def run_slow(d):
    t1 = time.time()
    select_dump(d)
    analyze_names(d)
    analyze_bx(d)
    analyze_bl(d)
    #~ analyze_b(d)
    analyze_push(d)
    t2 = time.time()
    print "Total time for this operation:"
    print t2 - t1