#!/usr/bin/env python3
"""Greedy construction plus local search for a facility location instance.

The input text is parsed into ``Facility`` and ``Customer`` objects, every
customer is connected to one facility, and the total cost (setup cost of the
open facilities plus customer-to-facility distances) is reduced by trying to
close and open facilities.
"""
import logging
import math
import sys
from collections import namedtuple

Point = namedtuple("Point", ['x', 'y'])

# Distance lookup shared by Facility and Solution; assigned in solve_it().
length = None
# Tolerance used when comparing two floating point cost values.
EPSILON = 0.001


class Length(object):
    """
    Length is a helper object to get the distance between customers and
    facilities. It also has functions to return the facilities that are
    closest to a customer.
    """

    def __init__(self, facilities, customers):
        """Precompute the distance matrix and per-customer sorted facilities.

        Both tables are indexed with ``customer.index`` / ``facility.index``,
        so those indices must equal the positions in the given sequences.
        """
        self.customer_to_facility = [
            [self.length(c.location, f.location) for f in facilities]
            for c in customers
        ]
        # sorted() is stable: equally distant facilities keep input order.
        self.customer_closes_facility = [
            sorted(facilities, key=lambda f, c=c: self.get(c, f))
            for c in customers
        ]

    def get(self, customer, facility):
        """Return the distance between a customer and a facility.

        Each argument may be either the object itself or its integer index.
        """
        c_idx = customer.index if isinstance(customer, Customer) else customer
        f_idx = facility.index if isinstance(facility, Facility) else facility
        return self.customer_to_facility[c_idx][f_idx]

    def length(self, point1, point2):
        """Return the Euclidean distance between two points."""
        return math.sqrt((point1.x - point2.x)**2 + (point1.y - point2.y)**2)

    def get_facilities(self, customer):
        """
        Returns all facilities in increasing order of distance to customer.
        """
        return self.customer_closes_facility[customer.index]

    def get_feasible_facilities(self, customer, facilities):
        """Yield facilities (closest first) with room for the customer.

        ``facilities`` is unused and only kept for interface compatibility.
        """
        return (f for f in self.get_facilities(customer)
                if f.remaining_capacity >= customer.demand)

    def get_feasible_open_facilities(self, customer, facilities):
        """Yield open facilities (closest first) with room for the customer.

        ``facilities`` is unused and only kept for interface compatibility.
        """
        return (f for f in self.get_facilities(customer)
                if f.remaining_capacity >= customer.demand and f.is_open)


class Facility(object):
    """A facility with a setup cost, a capacity and connected customers.

    The mutating methods return the cost change ("delta") they cause so that
    the caller can keep a running total.
    """

    def __init__(self, index, setup_cost, capacity, location):
        self.index = index
        self.setup_cost = setup_cost
        self.capacity = capacity
        self.location = location
        self.is_open = False
        self.remaining_capacity = capacity
        self.customers = set()

    def remove(self, customer):
        """Disconnect ``customer`` and return the (negative) distance delta.

        Raises:
            ValueError: if the customer is not connected to this facility.
        """
        logging.debug("From %s remove %s.", self, customer)
        if customer not in self.customers:
            raise ValueError(f"{customer} not connected to {self}.")
        self.customers.remove(customer)
        self.remaining_capacity += customer.demand
        customer.facility = None
        if not self.customers and self.is_open:
            logging.debug("%s is empty but open.", self)
        return -length.get(customer.index, self.index)

    def add(self, customer):
        """Connect ``customer`` to this facility and return the cost delta.

        A customer connected to another facility is removed from it first;
        the returned delta includes that removal.

        Raises:
            ValueError: if the facility is not open or lacks capacity.
        """
        logging.debug("To %s add %s.", self, customer)
        if customer.facility is self:
            # Already connected here: nothing changes, nothing to pay.
            return 0
        if not self.is_open:
            raise ValueError(f"Cannot connect {customer} to not open {self}.")
        if customer.demand > self.remaining_capacity:
            raise ValueError(f"Capacity of {self} too low for {customer}")
        delta = 0
        if (other_facility := customer.facility) is not None:
            delta += other_facility.remove(customer)
        self.remaining_capacity -= customer.demand
        self.customers.add(customer)
        delta += length.get(customer.index, self.index)
        customer.facility = self
        return delta

    def set_open(self):
        """Open the facility and return the cost delta.

        Returns the setup cost, or 0 when the facility was already open so
        that the setup cost is never counted twice.
        """
        logging.debug("Open %s.", self)
        if self.is_open:
            return 0
        self.is_open = True
        return self.setup_cost

    def set_not_open(self):
        """Close the facility and return the cost delta.

        Returns the negated setup cost, or 0 when the facility was already
        closed.

        Raises:
            ValueError: if customers are still connected.
        """
        logging.debug("Close %s.", self)
        if self.customers:
            raise ValueError(f"Cannot close {self} with {self.customers}.")
        if not self.is_open:
            return 0
        self.is_open = False
        return -self.setup_cost

    def remove_all_and_close(self):
        """Disconnect every customer, close, and return the total delta."""
        logging.debug("%s remove all and close.", self)
        delta = 0
        # Iterate over a copy: remove() mutates self.customers.
        for customer in list(self.customers):
            delta += self.remove(customer)
        delta += self.set_not_open()
        return delta

    def __str__(self):
        cap = f"{self.remaining_capacity}/{self.capacity}"
        status = "O" if self.is_open else "C"
        return f"F({self.index}, {cap}, {status}, {self.setup_cost})"

    def __repr__(self):
        return self.__str__()


class Customer(object):
    """A customer with a demand, a location and at most one facility."""

    def __init__(self, index, demand, location):
        self.index = index
        self.demand = demand
        self.location = location
        self.facility = None

    def __str__(self):
        con = "C" if self.facility is not None else "NC!"
        return f"C({self.index}, {con})"

    def __repr__(self):
        return self.__str__()


class Solution(object):
    """Assignment of customers to facilities with a running total cost.

    ``self.cost`` is maintained incrementally from the deltas returned by the
    Facility methods; ``validate`` recomputes it from scratch.
    """

    def __init__(self, facilities, customers):
        self.fs = facilities
        self.cs = customers
        self.cost = 0

    def validate(self):
        """Check the solution and the running cost; return True if valid.

        Raises:
            RuntimeError: if a customer is unconnected, a facility is over
                capacity, is closed with customers, is open without
                customers, or the running cost differs from the recomputed
                cost by more than EPSILON.
        """
        cost = 0
        for c in self.cs:
            if c.facility is None:
                raise RuntimeError(f"{c} not connected.")
            cost += length.get(c.index, c.facility.index)
        for f in self.fs:
            if f.remaining_capacity < 0:
                raise RuntimeError(f"{f} exceeds capacity.")
            if f.customers and not f.is_open:
                raise RuntimeError(f"{f} has customers, but is not open.")
            if not f.customers and f.is_open:
                raise RuntimeError(f"{f} has no customers, but is open.")
            if f.is_open:
                cost += f.setup_cost
        if abs(cost - self.cost) > EPSILON:
            raise RuntimeError(f"Running cost {self.cost} unequal to {cost}.")
        return True

    def plot_map(self):
        """Plot facilities, customers and connections with matplotlib.

        Does nothing when matplotlib is not installed.
        """
        try:
            import matplotlib.pyplot as plt
        except ModuleNotFoundError:
            return
        plt.figure()
        for f in self.fs:
            x, y = f.location
            color = 'ro' if f.is_open else 'go'
            plt.plot(x, y, color)
        for c in self.cs:
            x, y = c.location
            plt.plot(x, y, 'bx')
            if (f := c.facility) is not None:
                x_f, y_f = f.location
                plt.plot([x, x_f], [y, y_f], 'b-')
        plt.show()

    def to_output_data(self):
        """Validate the solution and return it in the output text format.

        The first line is the cost with two decimals followed by ``0``; the
        second line lists the facility index of every customer.
        """
        self.validate()
        customer_to_facility = [c.facility.index for c in self.cs]
        return f"{self.cost:.2f} 0\n" + ' '.join(map(str, customer_to_facility))

    def build_trivial(self):
        """Fill facilities one after another in list order.

        A facility is only opened when a customer is actually placed in it;
        facilities that cannot fit the current customer are skipped.

        Raises:
            ValueError: if no remaining facility can hold a customer.
        """
        candidates = iter(self.fs)
        facility = None
        for customer in self.cs:
            while (facility is None
                   or facility.remaining_capacity < customer.demand):
                facility = next(candidates, None)
                if facility is None:
                    raise ValueError(
                        f"No facility left with capacity for {customer}.")
            if not facility.is_open:
                self.cost += facility.set_open()
            self.cost += facility.add(customer)
        return self

    def build_greedy(self):
        """Connect each customer to its closest facility with capacity.

        Facilities are opened on demand.

        Raises:
            ValueError: if no facility has room for a customer.
        """
        for customer in self.cs:
            self.cost += self._connect_to_closest_facility(customer)
        return self

    def _connect_to_closest_facility(self, customer):
        """Connect to the nearest feasible facility; return the cost delta."""
        for f in length.get_feasible_facilities(customer, self.fs):
            cost = 0
            if not f.is_open:
                cost += f.set_open()
            cost += f.add(customer)
            return cost
        raise ValueError(f"No feasible facilities for {customer}.")

    def reconnect_greedy(self, customers):
        """Move each customer to a strictly closer open feasible facility.

        Does not modify ``self.cost``.

        Returns:
            A tuple ``(delta, not_connected)``: the cost change caused by the
            moves and the customers that are still without a facility.
        """
        not_connected = []

        def connect_better_facility(customer):
            current_facility = customer.facility
            if current_facility is not None:
                current_length = length.get(customer, current_facility)
            else:
                current_length = float("inf")
            for f in length.get_feasible_open_facilities(customer, self.fs):
                new_length = length.get(customer, f)
                if new_length < current_length:
                    logging.debug("%s is better for %s.", f, customer)
                    return f.add(customer)
                if new_length > current_length:
                    # Candidates come closest first: none further can help.
                    return 0
            if customer.facility is None:
                not_connected.append(customer)
            return 0

        delta = 0
        for customer in customers:
            delta += connect_better_facility(customer)
        return delta, not_connected

    def close_facility(self, facility):
        """Try to close ``facility``; keep the change only if cost drops.

        Returns True when the facility stays closed, False when the previous
        cost was restored (or the facility was not open to begin with).
        """
        logging.debug("Closing %s.", facility)
        if not facility.is_open:
            return False
        original_cost = self.cost
        customers = list(facility.customers)
        self.cost += facility.remove_all_and_close()
        delta, not_connected = self.reconnect_greedy(customers)
        self.cost += delta
        if not_connected:
            logging.debug("Not all customers connected. Restore.")
        elif self.cost < original_cost:
            delta = original_cost - self.cost
            logging.info("Close %s resulted in improvement %s.",
                         facility, delta)
            return True
        else:
            logging.debug("No improvement. Restore.")
        self.cost += facility.set_open()
        delta, not_connected = self.reconnect_greedy(customers)
        self.cost += delta
        assert not not_connected
        assert abs(original_cost - self.cost) < EPSILON
        return False

    def open_facility(self, facility):
        """Try to open ``facility``; keep the change only if cost drops.

        Customers that are strictly closer to ``facility`` than to their
        current facility are moved while capacity allows. Facilities that
        lose all their customers are closed, because an open facility without
        customers only adds setup cost and is rejected by ``validate``.

        Returns True when the facility stays open, False when the previous
        state was restored (or the facility was already open).
        """
        logging.debug("Opening %s.", facility)
        if facility.is_open:
            return False
        original_cost = self.cost
        self.cost += facility.set_open()
        moved = []
        for c in self.cs:
            previous = c.facility
            # "<=" matches Facility.add, which accepts an exact fit.
            if (c.demand <= facility.remaining_capacity
                    and length.get(c, previous) > length.get(c, facility)):
                moved.append((previous, c))
                self.cost += facility.add(c)
        emptied = []
        for previous, _ in moved:
            if previous.is_open and not previous.customers:
                self.cost += previous.set_not_open()
                emptied.append(previous)
        if self.cost < original_cost:
            delta = original_cost - self.cost
            logging.info("Open %s resulted in improvement %s.",
                         facility, delta)
            return True
        logging.debug("No improvement. Restore.")
        # Re-open first: add() refuses to connect to a closed facility.
        for previous in emptied:
            self.cost += previous.set_open()
        for previous, c in moved:
            self.cost += previous.add(c)
        self.cost += facility.set_not_open()
        assert abs(original_cost - self.cost) < EPSILON
        return False

    def _close_empty_facilities(self):
        """Close open facilities without customers; return the cost delta."""
        delta = 0
        for f in self.fs:
            if f.is_open and not f.customers:
                delta += f.set_not_open()
        return delta

    def local_search(self):
        """Improve the solution by closing, then opening facilities.

        Open facilities are tried for closing in order of increasing customer
        count; closed facilities are tried for opening in order of increasing
        setup cost.

        Raises:
            RuntimeError: if a customer is unconnected after reconnecting.
        """
        open_fs = sorted((f for f in self.fs if f.is_open),
                         key=lambda f: len(f.customers))
        for f in open_fs:
            self.close_facility(f)
        delta, not_connected = self.reconnect_greedy(self.cs)
        if not_connected:
            raise RuntimeError(f"{not_connected=}")
        self.cost += delta
        self.cost += self._close_empty_facilities()
        closed_fs = sorted((f for f in self.fs if not f.is_open),
                           key=lambda f: f.setup_cost)
        for f in closed_fs:
            self.open_facility(f)
        self.cost += self._close_empty_facilities()


def solve_it(input_data, *, plot=True):
    """Solve the instance in ``input_data`` and return the output text.

    Args:
        input_data: instance text in the format read by ``parse``.
        plot: when True (the default) the solution is plotted via
            ``Solution.plot_map`` before the output is produced.

    Side effect: rebinds the module-level ``length`` lookup.
    """
    global length
    facilities, customers = parse(input_data)
    length = Length(facilities, customers)
    solution = Solution(facilities, customers)
    solution.build_greedy()
    # reconnect_greedy does not touch solution.cost; apply its delta here.
    delta, _ = solution.reconnect_greedy(solution.cs)
    solution.cost += delta
    solution.local_search()
    if plot:
        solution.plot_map()
    return solution.to_output_data()


def main(file_location="data/fl_100_1"):
    """Read the instance file, solve it and print the result."""
    with open(file_location, 'r') as input_data_file:
        input_data = input_data_file.read()
    print(solve_it(input_data))


def parse(input_data):
    """Parse instance text into ``(facilities, customers)``.

    The first line holds the facility count and the customer count. Each
    facility line holds ``setup_cost capacity x y``; each customer line holds
    ``demand x y``. Indices are assigned in order of appearance from 0.
    """
    lines = input_data.split('\n')
    header = lines[0].split()
    facility_count = int(header[0])
    customer_count = int(header[1])

    facilities = []
    for index in range(facility_count):
        parts = lines[1 + index].split()
        location = Point(float(parts[2]), float(parts[3]))
        facilities.append(
            Facility(index, float(parts[0]), int(parts[1]), location))

    customers = []
    first_customer_line = 1 + facility_count
    for index in range(customer_count):
        parts = lines[first_customer_line + index].split()
        location = Point(float(parts[1]), float(parts[2]))
        customers.append(Customer(index, int(parts[0]), location))
    return facilities, customers


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    # An optional first command line argument overrides the default file.
    main(*sys.argv[1:2])