import random import sys import os import time import json import io from collections import defaultdict # For info on how to use this file on a slurm server, see ijcai16_experiments.py # For any errors or help customizing this for your use, email Jeffrey Dudek at jmd11@rice.edu # Run the experiment using the provided dictionary of input parameters and write results to output def run_experiment(param, output): param['date'] = time.strftime("%y%m%d") if 'trials' not in param: print "Must specify 'trials': the number of random formulas to generate and satisfy" return if 'nvars' not in param: print "Must specify 'nvars': the number of variables to use" return if 'cnf_density' not in param and 'num_cnf' not in param: print "Must specify 'cnf_density': the desired ratio (# of cnf formulas) / (# of variables)" return if 'xor_density' not in param and 'num_xor' not in param: print "Must specify 'xor_density': the desired ratio (# of xor formulas) / (# of variables)" return trials = int(param['trials']) nvars = int(param['nvars']) if 'cnf_density' in param: num_cnf = int(nvars * float(param['cnf_density'])) else: num_cnf = int(param['num_cnf']) if 'xor_density' in param: num_xor = int(nvars * float(param['xor_density'])) else: num_xor = int(param['num_xor']) debug = ('debug' in param) variable_list = range(1, nvars+1) # Determine how to generate CNF clauses in the formula if 'cnf_length' in param: generate_cnf = Clause.generator(pick_vars_fixed_length(variable_list, int(param['cnf_length'])), literals_cnf, '') elif 'cnf_prob' in param: generate_cnf = Clause.generator(pick_vars_variable_length(variable_list, float(param['cnf_prob'])), literals_cnf, '') else: print "Specify 'cnf_length' to use fixed length cnf clauses or 'cnf_prob' to use variable length cnf clauses" return # Determine how to generate XOR clauses in the formula if 'xor_length' in param: generate_xor = Clause.generator(pick_vars_fixed_length(variable_list, int(param['xor_length'])), literals_xor, 'x') elif 'xor_prob' in param: generate_xor = Clause.generator(pick_vars_variable_length(variable_list, float(param['xor_prob'])), literals_xor, 'x') else: print "Specify 'xor_length' to use fixed length xor clauses or 'xor_prob' to use variable length xor clauses" return # Determine how to generate the entire formula generate_formula = generator_formula(num_cnf, generate_cnf, num_xor, generate_xor) data = [] print "Running " + str(trials) + " trials." i = 0 for i in xrange(trials): formula = generate_formula() if debug: return formula result = solve(param, formula) data.append(result) if result[0] == 2 and 'timeout_halt' in param and param['timeout_halt'] == 'true': break print "Ran " + str(i+1) + " trials" # Log the data output.write(json.dumps(param) + '\n') output.write('\n'.join(map(str, data)) + '\n') # Generate a fixed length list of variables def pick_vars_fixed_length(variable_list, length): def generate(): # choose a random sample of 'length' variables return random.sample(variable_list, length) return generate # Generate a list of variables where each variable occurs with a fixed probability 'prob' def pick_vars_variable_length(variable_list, prob): def generate(): if prob <= 0: return [] if prob >= 1: return variable_list # randomly choose each variable with probability 'prob' clause = [] for j in variable_list: if random.random() < prob: clause.append(j) return clause return generate # Convert a list of variables to a monotone CNF clause def literals_true(clause): # do not negate the clause return clause # Convert a list of variables to a random CNF clause def literals_cnf(clause): # randomly negate each variable with probability 1/2 return [random.choice([x, -x]) for x in clause] # Convert a list of variables to a random XOR clause def literals_xor(clause): # randomly negate the first variable with probability 1/2 c = list(clause) if len(c) == 0: return c c[0] = random.choice([c[0], -c[0]]) return c # Generate a random formula containing the specified numbers of CNF and XOR clauses def generator_formula(num_cnf, gen_cnf, num_xor, gen_xor): def generator(): clauses = [] for i in xrange(num_cnf): clauses.append(gen_cnf()) for i in xrange(num_xor): clauses.append(gen_xor()) return clauses return generator # A class for a single clause in a formula, containing info on how to write it for CryptoMiniSat class Clause: def __init__(self, indicator, literals): self.indicator = indicator self.literals = literals def __str__(self): line = ' '.join(map(str, self.literals)) return self.indicator + line + ' 0\n' @staticmethod def generator(pick_variables, as_literals, indicator): def generate(): # Choose the variables contained the clause variables = pick_variables() # Convert the variables chosen to literals lits = as_literals(variables) return Clause(indicator, lits) return generate def solve(param, clauses): """ Determine if the given cnf/xor clauses are satisfiable using the SAT solver indicated in param['solver', halting the computation if the timeout (indicated as part of param['sovler']) is exceeded. Returns (STATUS, PYTHONTIME, REALTIME, USERTIME, SYSTEMTIME) STATUS = 0 if the clauses are unsatisfiable = 1 if the clauses are satisfiable = 2 if timeout occurs = 3 if other error occurs PYTHONTIME = Amount of time taken to determine satisfiability measured by python REALTIME = Amount of time taken to determine satisfiability measured by external clock USERTIME = Amount of time taken to determine satisfiability measured by user time SYSTEMTIME = Amount of time taken to determine satisfiability measured by system time """ basetemp = param["temp_directory"] + param["task_id"] filename = basetemp + "_formula.txt" outputfile = basetemp + "_output.txt" timefile = basetemp + "_time.txt" write_formula(filename, clauses) # Use the sat solver to solve the provided clauses solve_cmd = param["project_directory"] + param["solver"] + " " + str(filename) + " > " + str(outputfile) solve_cmd = solve_cmd.replace('[project_directory]', param["project_directory"]) print solve_cmd # Use unix time to determine elapsed time time_cmd = "(time " + solve_cmd + ") 2> " + str(timefile) clock_start = time.time() os.system(time_cmd) clock_end = time.time() elapsed = clock_end - clock_start # Read the satisfiability results f = open(outputfile, 'r') lines = f.readlines() f.close() if len(lines) < 1: print 'Formula timeout' return 2, elapsed, '0m0s', '0m0s', '0m0s' # timeout has occurred res = lines[-1] # Print out the satisfiability results if requested if 'print_sat' in param and str(param['print_sat']) == 'True': for l in lines: print l # Read the timing results f = open(timefile, 'r') lines = f.readlines() f.close() if len(lines) < 3: print 'Unable to load formula time' return 3, elapsed, '0m0s', '0m0s', '0m0s' # unable to load time rtime = lines[-3].split('\t')[1].strip() utime = lines[-2].split('\t')[1].strip() stime = lines[-1].split('\t')[1].strip() if "INTERRUPTED" in res or "INDETERMINATE" in res: print 'Formula timeout' result_code = 2 # timeout has occurred elif "UNSATISFIABLE" in res: result_code = 0 elif "SATISFIABLE" in res: result_code = 1 else: print 'Solver error: ' + res result_code = 3 return result_code, elapsed, rtime, utime, stime # Write the formula to the provided file def write_formula(filename, clauses): f = open(filename, 'w') for clause in clauses: f.write(str(clause)) f.close() # Generate the filename prefix to be used uniquely by this job def generate_filename(param): return param['experiment'] + "_" + param['tag'] + "_" + param['date'] + "_" + param['jobid'] # Start this cnfxor experiment using the provided parameters def start_experiment(runner, args): if len(args) < 3: print 'Unable to start experiment; too few arguments' return expargs = json.loads(args[1]) expargs = dict([(str(k), str(v)) for k, v in expargs.items()]) inputfile = args[2] with io.open(inputfile) as f: inputs = f.readlines() changing_args = map(json.loads, inputs) for fresh_args in changing_args: taskargs = dict(expargs) for key, val in fresh_args.items(): taskargs[str(key)] = str(val) with open(taskargs['output_file'], 'w') as output: try: runner(taskargs, output) except Exception as e: print 'ERROR: ' + str(taskargs) print e if __name__ == "__main__": start_experiment(run_experiment, sys.argv)