Saturday, October 15, 2022

Detecting Obfuscated JavaScripts



In this post, we review a Python machine learning implementation based on the article Detecting Obfuscated JavaScripts from Known and Unknown Obfuscators using Machine Learning.


Data Collection

The first step is to collect JavaScript files from some of the most popular sites. We download the list of the top 1000 most popular sites from https://dataforseo.com using the following curl command:


curl 'https://dataforseo.com/wp-admin/admin-ajax.php' \
-H 'authority: dataforseo.com' \
-H 'accept: application/json, text/javascript, */*; q=0.01' \
-H 'accept-language: en-US,en;q=0.9,he;q=0.8,fr;q=0.7' \
-H 'content-type: application/x-www-form-urlencoded; charset=UTF-8' \
-H 'cookie: PHPSESSID=hqg1mr3lrcodbrujnddpfv0acv; _gcl_au=1.1.932766159.1664772134; referrer=https://www.google.com/; _gid=GA1.2.350097184.1664772135; _lfa=LF1.1.9259cece6f47bcdb.1664772134834; cae45c4ea51njjp04o0dacqap3-agile-crm-guid=86bf2470-40ff-6e95-0f29-905636c53559; cae45c4ea51njjp04o0dacqap3-agile-original-referrer=https%3A//www.google.com/; cae45c4ea51njjp04o0dacqap3-agile-crm-session_id=48d757a8-f09c-bb2b-4168-7272ecbbd6f7; cae45c4ea51njjp04o0dacqap3-agile-crm-session_start_time=14; _aimtellSubscriberID=b81e9d16-592b-ff27-9a09-1934dadd04c6; cae45c4ea51njjp04o0dacqap3-agile-session-webrules_v2=%7B%26%2334%3Brule_id%26%2334%3B%3A5120774913982464%2C%26%2334%3Bcount%26%2334%3B%3A1%2C%26%2334%3Btime%26%2334%3B%3A1664772136776%7D; intercom-id-yhwl2kwv=cd0629b2-2766-4925-814e-36baf817ef57; intercom-session-yhwl2kwv=; _gat=1; _ga_T5NKP5Y695=GS1.1.1664772134.1.1.1664772624.59.0.0; _ga=GA1.1.1433352343.1664772135; _uetsid=c0cc940042d511ed9b67d1852d41bc8d; _uetvid=c0cc95d042d511eda56a27dc9895ce0f' \
-H 'origin: https://dataforseo.com' \
-H 'referer: https://dataforseo.com/top-1000-websites' \
-H 'sec-ch-ua: "Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"' \
-H 'sec-ch-ua-mobile: ?0' \
-H 'sec-ch-ua-platform: "Linux"' \
-H 'sec-fetch-dest: empty' \
-H 'sec-fetch-mode: cors' \
-H 'sec-fetch-site: same-origin' \
-H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36' \
-H 'x-requested-with: XMLHttpRequest' \
--data-raw 'action=dfs_ranked_domains&location=0' \
--compressed > sites.json
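

The response is saved to sites.json and is expected to be a JSON array of site records, where only the domain field is used by the download script below. A quick sanity check of the downloaded file, assuming that structure:


import json

# Assumes sites.json is a JSON array in which every entry has a 'domain' key,
# which is the only field the download script below relies on.
with open('sites.json', 'r') as file:
    sites = json.load(file)

print('downloaded {} sites'.format(len(sites)))
for site in sites[:5]:
    print(site['domain'])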


Next, for each site we download the JavaScript files referenced from the site's landing page. This is done using the Beautiful Soup library.


import json
import os.path
import pathlib
import shutil
from multiprocessing import Pool

import bs4
import requests

from src.common import ROOT_FOLDER


def send_request(url):
    agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
    headers = {
        'User-Agent': agent,
    }
    page = requests.get(url, headers=headers)
    if page.status_code != 200:
        error_page = page.content
        error_page = error_page.decode('utf-8')
        raise Exception('{} failed code is {}: {}'.format(url, page.status_code, error_page))

    data = page.content
    data = data.decode('utf-8')
    return data


def get_domain_folder(domain):
    return ROOT_FOLDER + '/sites/' + domain


def process_script(domain, script, script_index):
    if script.has_attr('src'):
        src = script['src']
        if not src.startswith('http'):
            src = 'https://{}{}'.format(domain, src)
        print('download script {}'.format(src))
        data = send_request(src)
    else:
        data = script.getText()

    output_path = '{}/{}.js'.format(get_domain_folder(domain), script_index)
    with open(output_path, 'w') as file:
        file.write(data)


def process_site(domain):
    domain_folder = get_domain_folder(domain)
    site_complete_indication = domain_folder + '/complete.txt'
    if os.path.exists(site_complete_indication):
        print('site {} already done'.format(domain))
        return

    if os.path.exists(domain_folder):
        shutil.rmtree(domain_folder)
    os.mkdir(domain_folder)

    try:
        data = send_request('https://' + domain)
    except Exception as e:
        print('domain {} access failed: {}'.format(domain, e))
        return

    site = bs4.BeautifulSoup(data, 'html.parser')

    success = 0
    failed = 0
    for i, script in enumerate(site.findAll('script')):
        try:
            process_script(domain, script, i)
            success += 1
        except Exception as e:
            print(e)
            failed += 1

    with open(site_complete_indication, 'w') as file:
        file.write('success {}\nfailed {}'.format(success, failed))


def process_site_thread(site_tuple):
    site_index, site = site_tuple
    domain = site['domain']
    print('process site {}: {}'.format(site_index, domain))
    process_site(domain)


def main():
    print('loading sites')

    pathlib.Path(ROOT_FOLDER + '/sites').mkdir(parents=True, exist_ok=True)

    with open(ROOT_FOLDER + '/sites.json', 'r') as file:
        sites_json = json.load(file)

    sites_tuples = list(enumerate(sites_json))
    with Pool(20) as pool:
        pool.map(process_site_thread, sites_tuples)


if __name__ == '__main__':
    main()


Data Preparation 


Having a list of scripts for each site, we merge all the scripts into one folder and remove duplicates. Duplicates are detected by the SHA-256 hash of the script content; scripts shorter than 1000 characters, or that are actually JSON or HTML, are skipped.



import hashlib
import os
import pathlib

from src.common import ROOT_FOLDER


def main():
    pathlib.Path(ROOT_FOLDER + '/scripts').mkdir(parents=True, exist_ok=True)
    hashes = {}
    output_counter = 0
    scripts_counter = 0
    duplicates_counter = 0
    for site in os.walk(ROOT_FOLDER + '/sites'):
        site_path = site[0]
        files = site[2]
        for site_file in files:
            script_path = '{}/{}'.format(site_path, site_file)
            if not script_path.endswith('.js'):
                continue

            scripts_counter += 1
            print('{}: {}'.format(scripts_counter, script_path))
            with open(script_path, 'r') as file:
                data = file.read()

            data = data.strip()
            if len(data) < 1000 or data.startswith('{') or data.startswith('<'):
                continue

            script_hash = hashlib.sha256(data.encode('utf-8')).hexdigest()
            if script_hash in hashes:
                duplicates_counter += 1
            else:
                hashes[script_hash] = True
                output_counter += 1
                output_path = ROOT_FOLDER + '/scripts/{}.js'.format(output_counter)
                with open(output_path, 'w') as file:
                    file.write(data)

    print('scripts {} duplicates {}'.format(scripts_counter, duplicates_counter))


main()


Once we have one folder with all the scripts, we can obfuscate them using different obfuscators. In the previous post, we covered Using Online Obfuscation for Multiple Files. In addition, we use the javascript-obfuscator tool (the same obfuscator used by the webpack obfuscator plugin):


import os
import pathlib
import subprocess
from multiprocessing import Pool

from src.common import ROOT_FOLDER


def obfuscate(entry):
    input_path, output_path = entry
    stdout = subprocess.check_output([
        'javascript-obfuscator',
        input_path,
        '--output',
        output_path,
    ])
    if len(stdout) > 0:
        print(stdout)


def main():
    # make the nvm-installed javascript-obfuscator binary reachable on the PATH
    os.environ["PATH"] += os.pathsep + os.path.expanduser('~/.nvm/versions/node/v18.3.0/bin')
    output_folder = ROOT_FOLDER + '/obfuscated_webpack'
    scripts_folder = ROOT_FOLDER + '/scripts'
    pathlib.Path(output_folder).mkdir(parents=True, exist_ok=True)
    jobs = []
    for _, _, files_names in os.walk(scripts_folder):
        for i, file_name in enumerate(sorted(files_names)):
            file_path = scripts_folder + '/' + file_name
            output_path = output_folder + '/' + file_name
            entry = file_path, output_path
            jobs.append(entry)

    with Pool(6) as pool:
        pool.map(obfuscate, jobs)


if __name__ == '__main__':
    main()


Feature Extraction


Now that we have the original JavaScripts folder, in addition to the 3 obfuscated folders, we can extract features for each JavaScript file and save the features into a CSV file.



import csv
import os
import re
from collections import Counter
from math import log
from multiprocessing import Pool

import tqdm as tqdm

from src.common import ROOT_FOLDER


class Extractor:
    def __init__(self):
        self.csv_lines = []

    def extract_folder(self, folder_path):
        print('extracting folder {}'.format(folder_path))
        files_paths = []
        for _, _, files_names in os.walk(folder_path):
            for file in files_names:
                files_paths.append(folder_path + '/' + file)

        with Pool(7) as pool:
            for result in tqdm.tqdm(pool.imap_unordered(extract_file, files_paths), total=len(files_paths)):
                if result is not None:
                    self.csv_lines.append(result)

    def save_csv(self, file_path):
        header = get_header()

        self.csv_lines.insert(0, header)

        with open(file_path, 'w') as file:
            writer = csv.writer(file)
            writer.writerows(self.csv_lines)

        print('csv ready')


def extract_file(file_path):
    with open(file_path, 'r') as file:
        data = file.read()

    data = data.strip()
    if len(data) < 1000:
        return

    data = data.lower()

    if 'looks like a html code, please use gui' in data:
        return

    words = re.split('[^a-z]', data)
    words = list(filter(None, words))
    if len(words) == 0:
        return

    # ratio of backslash characters (heavily used in escaped/obfuscated strings)
    backslash_ratio = data.count('\\') / len(data)
    space_ratio = data.count(' ') / len(data)
    bracket_ratio = data.count('[') / len(data)
    hex_count = max(
        len(re.findall('x[0-9a-f]{4}', data)),
        data.count('\\x')
    )
    hex_ratio = hex_count / len(words)
    unicode_ratio = data.count('\\u') / len(words)

    chars_in_comment = 0
    long_lines = 0
    lines = data.split('\n')
    not_empty_lines_counter = 0
    for line in lines:
        line = line.strip()
        if line.startswith('//'):
            chars_in_comment += len(line)
        if len(line) > 1000:
            long_lines += 1
        if len(line) > 0:
            not_empty_lines_counter += 1
    chars_in_comment_share = chars_in_comment / not_empty_lines_counter
    chars_per_line = len(data) / not_empty_lines_counter

    if_share = words.count('if') / len(words)
    false_share = words.count('false') / len(words)
    true_share = words.count('true') / len(words)
    return_share = words.count('return') / len(words)
    var_share = words.count('var') / len(words)
    tostring_share = words.count('tostring') / len(words)
    this_share = words.count('this') / len(words)
    else_share = words.count('else') / len(words)
    null_share = words.count('null') / len(words)
    special_words = [
        'eval',
        'unescape',
        'fromcharcode',
        'charcodeat',
        'window',
        'document',
        'string',
        'array',
        'object',
    ]

    special_count = 0
    for special_word in special_words:
        special_count += words.count(special_word)
    special_share = special_count / len(words)

    return [
        file_path,
        backslash_ratio,
        chars_in_comment_share,
        if_share,
        special_share,
        long_lines,
        false_share,
        hex_ratio,
        unicode_ratio,
        space_ratio,
        true_share,
        bracket_ratio,
        return_share,
        var_share,
        tostring_share,
        this_share,
        else_share,
        null_share,
        chars_per_line,
        shannon(data),
    ]


def shannon(string):
    counts = Counter(string)
    frequencies = ((i / len(string)) for i in counts.values())
    return - sum(f * log(f, 2) for f in frequencies)


def get_header():
    return [
        'file_path',
        'backslash_ratio',
        'chars_in_comment_share',
        'if_share',
        'special_share',
        'long_lines',
        'false_share',
        'hex_ratio',
        'unicode_ratio',
        'space_ratio',
        'true_share',
        'bracket_ratio',
        'return_share',
        'var_share',
        'tostring_share',
        'this_share',
        'else_share',
        'null_share',
        'chars_per_line',
        'shannon',
    ]


def main():
    extractor = Extractor()
    extractor.extract_folder(ROOT_FOLDER + '/obfuscated_webpack')
    extractor.extract_folder(ROOT_FOLDER + '/scripts')
    extractor.extract_folder(ROOT_FOLDER + '/obfuscated_draftlogic')
    extractor.extract_folder(ROOT_FOLDER + '/obfuscated_javascriptobfuscator')

    extractor.save_csv(ROOT_FOLDER + '/features.csv')


if __name__ == '__main__':
    main()
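

As a quick intuition check for the entropy feature, the shannon helper above can be run on a few short strings: a single repeated character scores zero, while a string using many distinct characters approaches log2 of the number of distinct characters.


from src.features_extract import shannon

# A single repeated character: entropy is zero.
print(shannon('aaaaaaaaaa'))
# Mixed characters: higher entropy, bounded by log2 of the number of
# distinct characters in the string.
print(shannon('var x = document.title;'))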


Machine Learning


The last step is to train a random forest on features.csv, producing a model that will be used to identify whether a script is obfuscated. Each file is labeled according to its folder name (obfuscated or not), and the regressor output is thresholded at 0.5.


import joblib
import numpy
import numpy as np
import pandas
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

from src.common import ROOT_FOLDER
from src.features_extract import extract_file, get_header

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


def load_csv(csv_path):
    print('load CSV')

    df = pd.read_csv(csv_path)
    print(df.head(5))
    return df


def build_forest():
    df = load_csv(ROOT_FOLDER + '/features.csv')
    print('split to training and test')
    df['file_path'] = df['file_path'].apply(lambda x: 1 if 'obfuscated' in x else 0)
    labels = np.array(df['file_path'])

    features = df.drop('file_path', axis=1)
    feature_list = list(features.columns)
    features = np.array(features)
    train_features, test_features, train_labels, test_labels = \
        train_test_split(features,
                         labels,
                         test_size=0.25,
                         random_state=42,
                         )
    print('training features shape {} labels shape {}'.format(
        train_features.shape, train_labels.shape))
    print('test features shape {} labels shape {}'.format(
        test_features.shape, test_labels.shape))

    print('random forest classifier training')

    forest = RandomForestRegressor(n_estimators=100, random_state=42, verbose=2, n_jobs=-2)
    forest.fit(train_features, train_labels)

    print('random forest predictions')
    predictions = forest.predict(test_features)

    prediction_threshold = 0.5
    predictions[predictions < prediction_threshold] = 0
    predictions[predictions >= prediction_threshold] = 1

    prediction_errors = predictions - test_labels
    print('error for test {}'.format(
        round(np.mean(abs(prediction_errors)), 3)))

    print('importance of each feature')

    importances = list(forest.feature_importances_)
    feature_importances = [(feature, round(importance, 2)) for feature, importance in
                           zip(feature_list, importances)]
    feature_importances = sorted(feature_importances, key=lambda x: x[1], reverse=True)
    for pair in feature_importances:
        print('variable: {} Importance: {}'.format(*pair))

    print('confusion matrix')

    joined = np.stack((predictions, test_labels), axis=1)
    tp = joined[np.where(
        (joined[:, 0] == 1) *
        (joined[:, 1] == 1)
    )]
    tn = joined[np.where(
        (joined[:, 0] == 0) *
        (joined[:, 1] == 0)
    )]
    fp = joined[np.where(
        (joined[:, 0] == 1) *
        (joined[:, 1] == 0)
    )]
    fn = joined[np.where(
        (joined[:, 0] == 0) *
        (joined[:, 1] == 1)
    )]
    print('true positive {}'.format(np.shape(tp)[0]))
    print('true negative {}'.format(np.shape(tn)[0]))
    print('false positive {}'.format(np.shape(fp)[0]))
    print('false negative {}'.format(np.shape(fn)[0]))

    joblib.dump(forest, ROOT_FOLDER + '/random_forest.joblib')


def load_forest():
    forest = joblib.load(ROOT_FOLDER + '/random_forest.joblib')

    df = load_csv(ROOT_FOLDER + '/features.csv')
    print('split to training and test')
    keep_name = df['file_path']
    df['file_path'] = df['file_path'].apply(lambda x: 1 if 'obfuscated' in x else 0)
    labels = np.array(df['file_path'])

    features = df.drop('file_path', axis=1)

    predictions = forest.predict(features)
    prediction_threshold = 0.5
    predictions[predictions < prediction_threshold] = 0
    predictions[predictions >= prediction_threshold] = 1
    errors = 0
    for ndarray_index, y in numpy.ndenumerate(predictions):
        label = labels[ndarray_index]
        prediction = predictions[ndarray_index]
        if label != prediction:
            errors += 1
            row = ndarray_index[0]
            print('file {} row {}'.format(keep_name[row], row))
    print('errors', errors)


def analyze_new_script(file_path):
    forest = joblib.load(ROOT_FOLDER + '/random_forest.joblib')
    forest.verbose = 0

    rows = [extract_file(file_path)]
    df = pandas.DataFrame(rows, columns=get_header())
    features = df.drop('file_path', axis=1)

    print(features)
    predictions = forest.predict(features.values)
    prediction = predictions[0]
    print(prediction)
    if prediction > 0.5:
        print('this is obfuscated')
    else:
        print('not obfuscated')


build_forest()
load_forest()
analyze_new_script(ROOT_FOLDER + '/scripts/1.js')
analyze_new_script(ROOT_FOLDER + '/obfuscated_javascriptobfuscator/1.js')
analyze_new_script(ROOT_FOLDER + '/obfuscated_draftlogic/1.js')
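

As a side note, since the labels are binary, a RandomForestClassifier could be used instead of thresholding the regressor output. A minimal sketch, reusing the same features.csv and the same split parameters as build_forest above:


import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

from src.common import ROOT_FOLDER

# Rebuild the same feature matrix and labels as build_forest above.
df = pd.read_csv(ROOT_FOLDER + '/features.csv')
labels = np.array(df['file_path'].apply(lambda x: 1 if 'obfuscated' in x else 0))
features = np.array(df.drop('file_path', axis=1))
train_features, test_features, train_labels, test_labels = train_test_split(
    features, labels, test_size=0.25, random_state=42)

# A classifier predicts the 0/1 label directly, no manual threshold needed.
classifier = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-2)
classifier.fit(train_features, train_labels)
print(confusion_matrix(test_labels, classifier.predict(test_features)))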


Final Note

The random forest model achieves roughly 1% false negatives and false positives, so we can feel confident using it for our needs.
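

The confusion matrix counts printed by build_forest can also be summarized as precision and recall, for example with this small helper (the counts below are made up, for illustration only):


def precision_recall(tp, fp, fn):
    # Precision: share of scripts flagged as obfuscated that really are obfuscated.
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    # Recall: share of obfuscated scripts that were flagged.
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Made-up counts for illustration only, not the results of the actual run.
print(precision_recall(tp=950, fp=10, fn=12))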


