In this post we review a Python machine learning implementation based on the article Detecting Obfuscated JavaScripts from Known and Unknown Obfuscators using Machine Learning.
Data Collection
The first step is to collect JavaScript files from some of the most popular sites. We download the list of the top 1000 most popular sites from https://dataforseo.com using the following curl command:
curl 'https://dataforseo.com/wp-admin/admin-ajax.php' \
-H 'authority: dataforseo.com' \
-H 'accept: application/json, text/javascript, */*; q=0.01' \
-H 'accept-language: en-US,en;q=0.9,he;q=0.8,fr;q=0.7' \
-H 'content-type: application/x-www-form-urlencoded; charset=UTF-8' \
-H 'cookie: PHPSESSID=hqg1mr3lrcodbrujnddpfv0acv; _gcl_au=1.1.932766159.1664772134; referrer=https://www.google.com/; _gid=GA1.2.350097184.1664772135; _lfa=LF1.1.9259cece6f47bcdb.1664772134834; cae45c4ea51njjp04o0dacqap3-agile-crm-guid=86bf2470-40ff-6e95-0f29-905636c53559; cae45c4ea51njjp04o0dacqap3-agile-original-referrer=https%3A//www.google.com/; cae45c4ea51njjp04o0dacqap3-agile-crm-session_id=48d757a8-f09c-bb2b-4168-7272ecbbd6f7; cae45c4ea51njjp04o0dacqap3-agile-crm-session_start_time=14; _aimtellSubscriberID=b81e9d16-592b-ff27-9a09-1934dadd04c6; cae45c4ea51njjp04o0dacqap3-agile-session-webrules_v2=%7B%26%2334%3Brule_id%26%2334%3B%3A5120774913982464%2C%26%2334%3Bcount%26%2334%3B%3A1%2C%26%2334%3Btime%26%2334%3B%3A1664772136776%7D; intercom-id-yhwl2kwv=cd0629b2-2766-4925-814e-36baf817ef57; intercom-session-yhwl2kwv=; _gat=1; _ga_T5NKP5Y695=GS1.1.1664772134.1.1.1664772624.59.0.0; _ga=GA1.1.1433352343.1664772135; _uetsid=c0cc940042d511ed9b67d1852d41bc8d; _uetvid=c0cc95d042d511eda56a27dc9895ce0f' \
-H 'origin: https://dataforseo.com' \
-H 'referer: https://dataforseo.com/top-1000-websites' \
-H 'sec-ch-ua: "Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"' \
-H 'sec-ch-ua-mobile: ?0' \
-H 'sec-ch-ua-platform: "Linux"' \
-H 'sec-fetch-dest: empty' \
-H 'sec-fetch-mode: cors' \
-H 'sec-fetch-site: same-origin' \
-H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36' \
-H 'x-requested-with: XMLHttpRequest' \
--data-raw 'action=dfs_ranked_domains&location=0' \
--compressed > sites.json
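The command saves the ranked domain list to sites.json. Before scraping, it is worth a quick peek at the file; the minimal sketch below assumes the response is a JSON array whose entries each carry a 'domain' field, which is the only field the scraping code that follows relies on.
import json

# Quick peek at sites.json; the scraping code below expects an iterable of
# entries, each exposing a 'domain' field.
with open('sites.json', 'r') as file:
    sites = json.load(file)

print('total sites: {}'.format(len(sites)))
for site in sites[:5]:
    print(site['domain'])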
Next, from each site we download the JavaScript files referenced by the site's landing page. This is done using the Beautiful Soup library.
import json
import os.path
import pathlib
import shutil
from multiprocessing import Pool
import bs4
import requests
from src.common import ROOT_FOLDER
def send_request(url):
agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
headers = {
'User-Agent': agent,
}
page = requests.get(url, headers=headers)
if page.status_code != 200:
error_page = page.content
error_page = error_page.decode('utf-8')
raise Exception('{} failed code is {}: {}'.format(url, page.status_code, error_page))
data = page.content
data = data.decode('utf-8')
return data
def get_domain_folder(domain):
return ROOT_FOLDER + '/sites/' + domain
def process_script(domain, script, script_index):
if script.has_attr('src'):
src = script['src']
if not src.startswith('http'):
src = 'https://{}{}'.format(domain, src)
print('download script {}'.format(src))
data = send_request(src)
else:
data = script.getText()
output_path = '{}/{}.js'.format(get_domain_folder(domain), script_index)
with open(output_path, 'w') as file:
file.write(data)
def process_site(domain):
domain_folder = get_domain_folder(domain)
site_complete_indication = domain_folder + '/complete.txt'
if os.path.exists(site_complete_indication):
print('site {} already done'.format(domain))
return
if os.path.exists(domain_folder):
shutil.rmtree(domain_folder)
os.mkdir(domain_folder)
try:
data = send_request('https://' + domain)
except Exception as e:
print('domain {} access failed: {}'.format(domain, e))
return
site = bs4.BeautifulSoup(data, 'html.parser')
success = 0
failed = 0
for i, script in enumerate(site.findAll('script')):
try:
process_script(domain, script, i)
success += 1
except Exception as e:
print(e)
failed += 1
with open(site_complete_indication, 'w') as file:
file.write('success {}\nfailed {}'.format(success, failed))
def process_site_thread(site_tuple):
site_index, site = site_tuple
domain = site['domain']
print('process site {}: {}'.format(site_index, domain))
process_site(domain)
def main():
print('loading sites')
pathlib.Path(ROOT_FOLDER + '/sites').mkdir(parents=True, exist_ok=True)
with open(ROOT_FOLDER + '/sites.json', 'r') as file:
sites_json = json.load(file)
sites_tuples = list(enumerate(sites_json))
with Pool(20) as pool:
pool.map(process_site_thread, sites_tuples)
if __name__ == '__main__':
    main()
Data Preparation
Having a list of scripts for each site, we merge all the scripts into one folder and remove duplicates (identified by their SHA-256 hash). Very short files and files that look like JSON or HTML responses are skipped along the way.
import hashlib
import os
import pathlib
from src.common import ROOT_FOLDER
def main():
pathlib.Path(ROOT_FOLDER + '/scripts').mkdir(parents=True, exist_ok=True)
hashes = {}
output_counter = 0
scripts_counter = 0
duplicates_counter = 0
for site in os.walk(ROOT_FOLDER + '/sites'):
site_path = site[0]
files = site[2]
for site_file in files:
script_path = '{}/{}'.format(site_path, site_file)
if not script_path.endswith('.js'):
continue
scripts_counter += 1
print('{}: {}'.format(scripts_counter, script_path))
with open(script_path, 'r') as file:
data = file.read()
data = data.strip()
if len(data) < 1000 or data.startswith('{') or data.startswith('<'):
continue
script_hash = hashlib.sha256(data.encode('utf-8')).hexdigest()
if script_hash in hashes:
duplicates_counter += 1
else:
hashes[script_hash] = True
output_counter += 1
output_path = ROOT_FOLDER + '/scripts/{}.js'.format(output_counter)
with open(output_path, 'w') as file:
file.write(data)
print('scripts {} duplicates {}'.format(scripts_counter, duplicates_counter))
main()
Once we have one folder with all the scripts, we can obfuscate them using different obfuscators. In the previous post, Using Online Obfuscation for Multiple Files, we obfuscated the scripts with online obfuscators. In addition, we use the webpack obfuscator (the javascript-obfuscator CLI), which must be installed, for example via npm, and available on the PATH:
import os
import pathlib
import subprocess
from multiprocessing import Pool
from src.common import ROOT_FOLDER
def obfuscate(entry):
input_path, output_path = entry
stdout = subprocess.check_output([
'javascript-obfuscator',
input_path,
'--output',
output_path,
])
if len(stdout) > 0:
print(stdout)
def main():
os.environ["PATH"] += os.pathsep + '~/.nvm/versions/node/v18.3.0/bin'
output_folder = ROOT_FOLDER + '/obfuscated_webpack'
scripts_folder = ROOT_FOLDER + '/scripts'
pathlib.Path(output_folder).mkdir(parents=True, exist_ok=True)
jobs = []
for _, _, files_names in os.walk(scripts_folder):
for i, file_name in enumerate(sorted(files_names)):
file_path = scripts_folder + '/' + file_name
output_path = output_folder + '/' + file_name
entry = file_path, output_path
jobs.append(entry)
with Pool(6) as pool:
pool.map(obfuscate, jobs)
if __name__ == '__main__':
    main()
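Since the obfuscation runs over thousands of files, a quick sanity check that every input script produced an obfuscated counterpart is useful. A minimal sketch, using the folder names defined above:
import os

from src.common import ROOT_FOLDER

# Compare input and output file sets by name; any missing names indicate failed obfuscations.
inputs = set(os.listdir(ROOT_FOLDER + '/scripts'))
outputs = set(os.listdir(ROOT_FOLDER + '/obfuscated_webpack'))
print('scripts {} obfuscated {} missing {}'.format(
    len(inputs), len(outputs), len(inputs - outputs)))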
Features Extraction
Now that we have the folder of original JavaScript files, in addition to three obfuscated folders, we can extract features from each file and save them into a CSV file.
import csv
import os
import re
from collections import Counter
from math import log
from multiprocessing import Pool
import tqdm
from src.common import ROOT_FOLDER
class Extractor:
def __init__(self):
self.csv_lines = []
def extract_folder(self, folder_path):
print('extracting folder {}'.format(folder_path))
files_paths = []
for _, _, files_names in os.walk(folder_path):
for file in files_names:
files_paths.append(folder_path + '/' + file)
with Pool(7) as pool:
for result in tqdm.tqdm(pool.imap_unordered(extract_file, files_paths), total=len(files_paths)):
if result is not None:
self.csv_lines.append(result)
def save_csv(self, file_path):
header = get_header()
self.csv_lines.insert(0, header)
with open(file_path, 'w') as file:
writer = csv.writer(file)
writer.writerows(self.csv_lines)
print('csv ready')
def extract_file(file_path):
with open(file_path, 'r') as file:
data = file.read()
data = data.strip()
if len(data) < 1000:
return
data = data.lower()
if 'looks like a html code, please use gui' in data:
return
words = re.split('[^a-z]', data)
words = list(filter(None, words))
if len(words) == 0:
return
backslash_ratio = data.count('/n') / len(data)
space_ratio = data.count(' ') / len(data)
bracket_ratio = data.count('[') / len(data)
hex_count = max(
len(re.findall('x[0-9a-f]{4}', data)),
data.count('\\x')
)
hex_ratio = hex_count / len(words)
unicode_ratio = data.count('\\u') / len(words)
chars_in_comment = 0
long_lines = 0
lines = data.split('\n')
not_empty_lines_counter = 0
for line in lines:
line = line.strip()
if line.startswith('//'):
chars_in_comment += len(line)
if len(line) > 1000:
long_lines += 1
if len(line) > 0:
not_empty_lines_counter += 1
chars_in_comment_share = chars_in_comment / not_empty_lines_counter
chars_per_line = len(data) / not_empty_lines_counter
if_share = words.count('if') / len(words)
false_share = words.count('false') / len(words)
true_share = words.count('true') / len(words)
return_share = words.count('return') / len(words)
var_share = words.count('var') / len(words)
tostring_share = words.count('tostring') / len(words)
this_share = words.count('this') / len(words)
else_share = words.count('else') / len(words)
null_share = words.count('null') / len(words)
special_words = [
'eval',
'unescape',
'fromcharcode',
'charcodeat',
'window',
'document',
'string',
'array',
'object',
]
special_count = 0
for special_word in special_words:
special_count += words.count(special_word)
special_share = special_count / len(words)
return [
file_path,
backslash_ratio,
chars_in_comment_share,
if_share,
special_share,
long_lines,
false_share,
hex_ratio,
unicode_ratio,
space_ratio,
true_share,
bracket_ratio,
return_share,
var_share,
tostring_share,
this_share,
else_share,
null_share,
chars_per_line,
shannon(data),
]
def shannon(string):
counts = Counter(string)
frequencies = ((i / len(string)) for i in counts.values())
return - sum(f * log(f, 2) for f in frequencies)
def get_header():
return [
'file_path',
'backslash_ratio',
'chars_in_comment_share',
'if_share',
'special_share',
'long_lines',
'false_share',
'hex_ratio',
'unicode_ratio',
'space_ratio',
'true_share',
'bracket_ratio',
'return_share',
'var_share',
'tostring_share',
'this_share',
'else_share',
'null_share',
'chars_per_line',
'shannon',
]
def main():
extractor = Extractor()
extractor.extract_folder(ROOT_FOLDER + '/obfuscated_webpack')
extractor.extract_folder(ROOT_FOLDER + '/scripts')
extractor.extract_folder(ROOT_FOLDER + '/obfuscated_draftlogic')
extractor.extract_folder(ROOT_FOLDER + '/obfuscated_javascriptobfuscator')
extractor.save_csv(ROOT_FOLDER + '/features.csv')
if __name__ == '__main__':
main()
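The last feature is the Shannon entropy of the raw text, computed by the shannon() helper above. A quick illustrative check on made-up strings shows what it measures: a single repeated character scores zero, and the score grows as the text spreads evenly over a larger alphabet.
# Illustrative values only, using the shannon() helper defined above.
print(shannon('aaaaaaaaaa'))   # 0.0 - one character, no uncertainty
print(shannon('abcdefghij'))   # log2(10) ~ 3.32 - ten equally likely characters
print(shannon('function add(a, b) { return a + b; }'))  # typical hand-written code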
Machine Learning
The last step is to train a random forest on features.csv and produce a model that can tell whether a script is obfuscated. We use a random forest regressor and threshold its output at 0.5 to get a binary obfuscated / not-obfuscated decision.
import joblib
import numpy
import numpy as np
import pandas
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from src.common import ROOT_FOLDER
from src.features_extract import extract_file, get_header
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
def load_csv(csv_path):
print('load CSV')
df = pd.read_csv(csv_path)
print(df.head(5))
return df
def build_forest():
df = load_csv(ROOT_FOLDER + '/features.csv')
print('split to training and test')
df['file_path'] = df['file_path'].apply(lambda x: 1 if 'obfuscated' in x else 0)
labels = np.array(df['file_path'])
features = df.drop('file_path', axis=1)
feature_list = list(features.columns)
features = np.array(features)
train_features, test_features, train_labels, test_labels = \
train_test_split(features,
labels,
test_size=0.25,
random_state=42,
)
print('training features shape {} labels shape {}'.format(
train_features.shape, train_labels.shape))
print('test features shape {} labels shape {}'.format(
test_features.shape, test_labels.shape))
print('random forest classifier training')
forest = RandomForestRegressor(n_estimators=100, random_state=42, verbose=2, n_jobs=-2)
forest.fit(train_features, train_labels)
print('random forest predictions')
predictions = forest.predict(test_features)
prediction_threshold = 0.5
predictions[predictions < prediction_threshold] = 0
predictions[predictions >= prediction_threshold] = 1
prediction_errors = predictions - test_labels
print('mean absolute error on the test set: {}'.format(
    round(np.mean(abs(prediction_errors)), 3)))
print('importance of each feature')
importances = list(forest.feature_importances_)
feature_importances = [(feature, round(importance, 2)) for feature, importance in
zip(feature_list, importances)]
feature_importances = sorted(feature_importances, key=lambda x: x[1], reverse=True)
for pair in feature_importances:
print('variable: {} Importance: {}'.format(*pair))
print('confusion matrix')
joined = np.stack((predictions, test_labels), axis=1)
tp = joined[np.where(
(joined[:, 0] == 1) *
(joined[:, 1] == 1)
)]
tn = joined[np.where(
(joined[:, 0] == 0) *
(joined[:, 1] == 0)
)]
fp = joined[np.where(
(joined[:, 0] == 1) *
(joined[:, 1] == 0)
)]
fn = joined[np.where(
(joined[:, 0] == 0) *
(joined[:, 1] == 1)
)]
print('true positive {}'.format(np.shape(tp)[0]))
print('true negative {}'.format(np.shape(tn)[0]))
print('false positive {}'.format(np.shape(fp)[0]))
print('false negative {}'.format(np.shape(fn)[0]))
joblib.dump(forest, ROOT_FOLDER + '/random_forest.joblib')
def load_forest():
forest = joblib.load(ROOT_FOLDER + '/random_forest.joblib')
df = load_csv(ROOT_FOLDER + '/features.csv')
print('split to training and test')
keep_name = df['file_path']
df['file_path'] = df['file_path'].apply(lambda x: 1 if 'obfuscated' in x else 0)
labels = np.array(df['file_path'])
features = df.drop('file_path', axis=1)
predictions = forest.predict(features)
prediction_threshold = 0.5
predictions[predictions < prediction_threshold] = 0
predictions[predictions >= prediction_threshold] = 1
errors = 0
for ndarray_index, y in numpy.ndenumerate(predictions):
label = labels[ndarray_index]
prediction = predictions[ndarray_index]
if label != prediction:
errors += 1
row = ndarray_index[0]
print('file {} row {}'.format(keep_name[row], row))
print('errors', errors)
def analyze_new_script(file_path):
forest = joblib.load(ROOT_FOLDER + '/random_forest.joblib')
forest.verbose = 0
rows = [extract_file(file_path)]
df = pandas.DataFrame(rows, columns=get_header())
features = df.drop('file_path', axis=1)
print(features)
predictions = forest.predict(features.values)
prediction = predictions[0]
print(prediction)
if prediction > 0.5:
print('this is obfuscated')
else:
print('not obfuscated')
build_forest()
load_forest()
analyze_new_script(ROOT_FOLDER + '/scripts/1.js')
analyze_new_script(ROOT_FOLDER + '/obfuscated_javascriptobfuscator/1.js')
analyze_new_script(ROOT_FOLDER + '/obfuscated_draftlogic/1.js')
Final Note
The random forest model achieves roughly 1% false negatives and 1% false positives, so we can feel pretty confident using it for our needs.
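For completeness, those rates follow directly from the confusion-matrix counts that build_forest() prints. A minimal sketch with placeholder counts (not the actual results):
# Placeholder counts - substitute the tp/tn/fp/fn values printed by build_forest().
tp, tn, fp, fn = 990, 985, 10, 15

false_positive_rate = fp / (fp + tn)  # plain scripts wrongly flagged as obfuscated
false_negative_rate = fn / (fn + tp)  # obfuscated scripts that slipped through
print('false positive rate {:.1%}'.format(false_positive_rate))
print('false negative rate {:.1%}'.format(false_negative_rate))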