"""MC3-P2: Q-learning & Dyna - grading script. Usage: - Switch to a student feedback directory first (will write "points.txt" and "comments.txt" in pwd). - Run this script with both ml4t/ and student solution in PYTHONPATH, e.g.: PYTHONPATH=ml4t:MC1-P2/jdoe7 python ml4t/mc2_p1_grading/grade_marketsim.py Copyright 2018, Georgia Institute of Technology (Georgia Tech) Atlanta, Georgia 30332 All Rights Reserved Template code for CS 4646/7646 Georgia Tech asserts copyright ownership of this template and all derivative works, including solutions to the projects assigned in this course. Students and other users of this template code are advised not to share it with others or to make it available on publicly viewable websites including repositories such as github and gitlab. This copyright statement should not be removed or edited. We do grant permission to share solutions privately with non-students such as potential employers. However, sharing with other current or future students of CS 7646 is prohibited and subject to being investigated as a GT honor code violation. -----do not edit anything above this line--- Student Name: Tucker Balch (replace with your name) GT User ID: tb34 (replace with your User ID) GT ID: 900897987 (replace with your GT ID) """ import pytest from grading.grading import grader, GradeResult, run_with_timeout, IncorrectOutput import os import sys import traceback as tb import datetime as dt import random import numpy as np import pandas as pd from collections import namedtuple import util # Student modules to import main_code = "QLearner" # module name to import robot_qlearning_testing_seed=1490652871 QLearningTestCase = namedtuple('QLearning', ['description', 'group','world_file','best_reward','median_reward','max_time','points']) qlearning_test_cases = [ QLearningTestCase( description="World 1", group='nodyna', world_file='world01.csv', best_reward=-17, median_reward=-29.5, max_time=2, points=9.5 ), QLearningTestCase( description="World 2", group='nodyna', world_file='world02.csv', best_reward=-14, median_reward=-19, max_time=2, points=9.5 ), QLearningTestCase( description="World 4", group='nodyna', world_file='world04.csv', best_reward=-24, median_reward=-33, max_time=2, points=9.5 ), QLearningTestCase( description="World 6", group='nodyna', world_file='world06.csv', best_reward=-16, median_reward=-23.5, max_time=2, points=9.5 ), QLearningTestCase( description="World 7", group='nodyna', world_file='world07.csv', best_reward=-14, median_reward=-26, max_time=2, points=9.5 ), QLearningTestCase( description="World 8", group='nodyna', world_file='world08.csv', best_reward=-14, median_reward=-19, max_time=2, points=9.5 ), QLearningTestCase( description="World 9", group='nodyna', world_file='world09.csv', best_reward=-15, median_reward=-20, max_time=2, points=9.5 ), QLearningTestCase( description="World 10", group='nodyna', world_file='world10.csv', best_reward=-28, median_reward=-42, max_time=2, points=9.5 ), # Dyna test cases QLearningTestCase( description="World 1, dyna=200", group='dyna', world_file='world01.csv', best_reward=-12, median_reward=-29.5, max_time=10, points=2.5 ), QLearningTestCase( description="World 2, dyna=200", group='dyna', world_file='world02.csv', best_reward=-14, median_reward=-19, max_time=10, points=2.5 ), QLearningTestCase( description="Author check", group='author', world_file='world01.csv', best_reward=0, median_reward=0, max_time=10, points=0 ), ] max_points = 100.0 html_pre_block = True # surround comments with HTML
 tag (for T-Square comments field)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
# Test functon(s)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
@pytest.mark.parametrize("description,group,world_file,best_reward,median_reward,max_time,points", qlearning_test_cases)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
def test_qlearning(description, group, world_file, best_reward, median_reward, max_time, points, grader):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    points_earned = 0.0  # initialize points for this test case  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    try:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        incorrect = True  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        if not 'QLearner' in globals():  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            import importlib  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            m = importlib.import_module('QLearner')  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            globals()['QLearner'] = m  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        # Unpack test case  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        world = np.array([list(map(float,s.strip().split(','))) for s in util.get_robot_world_file(world_file).readlines()])  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        student_reward = None  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        student_author = None  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        msgs = []  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        if group=='nodyna':  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            def timeoutwrapper_nodyna():  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # Note: the following will NOT be commented durring final grading  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # random.seed(robot_qlearning_testing_seed)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # np.random.seed(robot_qlearning_testing_seed)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                learner = QLearner.QLearner(num_states=100,\
                                            num_actions = 4, \
                                            alpha = 0.2, \
                                            gamma = 0.9, \
                                            rar = 0.98, \
                                            radr = 0.999, \
                                            dyna = 0, \
                                            verbose=False)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                return qltest(worldmap=world,iterations=500,max_steps=10000,learner=learner,verbose=False)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            student_reward = run_with_timeout(timeoutwrapper_nodyna,max_time,(),{})  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            incorrect = False  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            if student_reward < 1.5*median_reward:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                incorrect = True  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                msgs.append("   Reward too low, expected %s, found %s"%(median_reward,student_reward))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        elif group=='dyna':  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            def timeoutwrapper_dyna():  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # Note: the following will NOT be commented durring final grading  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # random.seed(robot_qlearning_testing_seed)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # np.random.seed(robot_qlearning_testing_seed)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                learner = QLearner.QLearner(num_states=100,\
                                            num_actions = 4, \
                                            alpha = 0.2, \
                                            gamma = 0.9, \
                                            rar = 0.5, \
                                            radr = 0.99, \
                                            dyna = 200, \
                                            verbose=False)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                return qltest(worldmap=world,iterations=50,max_steps=10000,learner=learner,verbose=False)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            student_reward = run_with_timeout(timeoutwrapper_dyna,max_time,(),{})  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            incorrect = False  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            if student_reward < 1.5*median_reward:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                incorrect = True  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                msgs.append("   Reward too low, expected %s, found %s"%(median_reward,student_reward))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        elif group=='author':  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            points_earned = -20  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            def timeoutwrapper_author():  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # Note: the following will NOT be commented durring final grading  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # random.seed(robot_qlearning_testing_seed)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                # np.random.seed(robot_qlearning_testing_seed)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                learner = QLearner.QLearner(num_states=100,\
                                            num_actions = 4, \
                                            alpha = 0.2, \
                                            gamma = 0.9, \
                                            rar = 0.98, \
                                            radr = 0.999, \
                                            dyna = 0, \
                                            verbose=False)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                return learner.author()  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            student_author = run_with_timeout(timeoutwrapper_author,max_time,(),{})  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            student_reward = best_reward+1  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            incorrect = False  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            if (student_author is None) or (student_author=='tb34'):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                incorrect = True  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                msgs.append("   author() method not implemented correctly. Found {}".format(student_author))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            else:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                points_earned = points  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        if (not incorrect):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            points_earned += points  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        if incorrect:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            inputs_str = "    group: {}\n" \
                         "    world_file: {}\n"\
                         "    median_reward: {}\n".format(group, world_file, median_reward)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            raise IncorrectOutput("Test failed on one or more output criteria.\n  Inputs:\n{}\n  Failures:\n{}".format(inputs_str, "\n".join(msgs)))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    except Exception as e:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        # Test result: failed  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        msg = "Test case description: {}\n".format(description)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        # Generate a filtered stacktrace, only showing erroneous lines in student file(s)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        tb_list = tb.extract_tb(sys.exc_info()[2])  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        for i in range(len(tb_list)):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            row = tb_list[i]  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            tb_list[i] = (os.path.basename(row[0]), row[1], row[2], row[3])  # show only filename instead of long absolute path  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        tb_list = [row for row in tb_list if row[0] in ['QLearner.py','StrategyLearner.py']]  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        if tb_list:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            msg += "Traceback:\n"  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            msg += ''.join(tb.format_list(tb_list))  # contains newlines  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        elif 'grading_traceback' in dir(e):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            msg += "Traceback:\n"  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            msg += ''.join(tb.format_list(e.grading_traceback))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        msg += "{}: {}".format(e.__class__.__name__, str(e))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        # Report failure result to grader, with stacktrace  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        grader.add_result(GradeResult(outcome='failed', points=points_earned, msg=msg))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        raise  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    else:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        # Test result: passed (no exceptions)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        grader.add_result(GradeResult(outcome='passed', points=points_earned, msg=None))  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
def getrobotpos(data):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    R = -999  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    C = -999  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    for row in range(0, data.shape[0]):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        for col in range(0, data.shape[1]):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            if data[row,col] == 2:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                C = col  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                R = row  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    if (R+C)<0:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        print("warning: start location not defined")  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    return R, C  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
# find where the goal is in the map  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
def getgoalpos(data):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    R = -999  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    C = -999  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    for row in range(0, data.shape[0]):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        for col in range(0, data.shape[1]):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
            if data[row,col] == 3:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                C = col  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
                R = row  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    if (R+C)<0:  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        print("warning: goal location not defined")  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    return (R, C)  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
# move the robot and report reward  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
def movebot(data,oldpos,a):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    testr, testc = oldpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    randomrate = 0.20 # how often do we move randomly  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    quicksandreward = -100 # penalty for stepping on quicksand  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    # decide if we're going to ignore the action and  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    # choose a random one instead  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    if random.uniform(0.0, 1.0) <= randomrate: # going rogue  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        a = random.randint(0,3) # choose the random direction  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    # update the test location  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    if a == 0: #north  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr = testr - 1  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif a == 1: #east  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testc = testc + 1  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif a == 2: #south  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr = testr + 1  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif a == 3: #west  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testc = testc - 1  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    reward = -1 # default reward is negative one  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    # see if it is legal. if not, revert  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    if testr < 0: # off the map  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr, testc = oldpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif testr >= data.shape[0]: # off the map  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr, testc = oldpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif testc < 0: # off the map  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr, testc = oldpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif testc >= data.shape[1]: # off the map  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr, testc = oldpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif data[testr, testc] == 1: # it is an obstacle  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        testr, testc = oldpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif data[testr, testc] == 5: # it is quicksand  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        reward = quicksandreward  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        data[testr, testc] = 6 # mark the event  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif data[testr, testc] == 6: # it is still quicksand  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        reward = quicksandreward  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        data[testr, testc] = 6 # mark the event  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    elif data[testr, testc] == 3:  # it is the goal  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        reward = 1 # for reaching the goal  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    return (testr, testc), reward #return the new, legal location  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
# convert the location to a single integer  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
def discretize(pos):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    return pos[0]*10 + pos[1]  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
def qltest(worldmap, iterations, max_steps, learner, verbose):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
# each iteration involves one trip to the goal  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    startpos = getrobotpos(worldmap) #find where the robot starts  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    goalpos = getgoalpos(worldmap) #find where the goal is  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    # max_reward = -float('inf')  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    all_rewards = list()  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
    for iteration in range(1,iterations+1):  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        total_reward = 0  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        data = worldmap.copy()  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        robopos = startpos  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        state = discretize(robopos) #convert the location to a state  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        action = learner.querysetstate(state) #set the state and get first action  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        count = 0  		  	   		     			  		 			     			  	  		 	  	 		 			  		  			
        while (robopos != goalpos) & (count