discrete_optimization/facility/facility.py

373 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import math
import logging
from collections import namedtuple
Point = namedtuple("Point", ['x', 'y'])
length = None
EPSILON = 0.001
class Length(object):
""" Length is a helper object o get the distance between
customers and facilities. It also has functions
to return the facilities that are closest to a customer. """
def __init__(self, facilities, customers):
self.customer_to_facility = [[self.length(c.location, f.location)
for f in facilities]
for c in customers]
self.customer_closes_facility = [sorted([f for f in facilities],
key=lambda f: self.get(c, f))
for c in customers]
def get(self, customer, facility):
c_idx = customer.index if type(customer) is Customer else customer
f_idx = facility.index if type(facility) is Facility else facility
return self.customer_to_facility[c_idx][f_idx]
def length(self, point1, point2):
return math.sqrt((point1.x - point2.x)**2 + (point1.y - point2.y)**2)
def get_facilities(self, customer):
""" Returns closest facility in increasing order of distance. """
return self.customer_closes_facility[customer.index]
def get_feasible_facilities(self, customer, facilities):
return (f for f in self.get_facilities(customer)
if f.remaining_capacity >= customer.demand)
def get_feasible_open_facilities(self, customer, facilities):
return (f for f in self.get_facilities(customer)
if f.remaining_capacity >= customer.demand
if f.is_open)
class Facility(object):
def __init__(self, index, setup_cost, capacity, location):
self.index = index
self.setup_cost = setup_cost
self.capacity = capacity
self.location = location
self.is_open = False
self.remaining_capacity = capacity
self.customers = set()
def remove(self, customer):
logging.debug(f"From {self} remove {customer}.")
if not customer in self.customers:
raise ValueError(f"{customer} not connected to {self}.")
delta = 0
self.customers.remove(customer)
self.remaining_capacity += customer.demand
customer.facility = None
if not self.customers and self.is_open:
logging.debug(f"{self} is empty but open.")
delta -= length.get(customer.index, self.index)
return delta
def add(self, customer):
logging.debug(f"To {self} add {customer}.")
if not self.is_open:
raise ValueError(f"Cannot connect {customer} to not open {self}.")
if customer.demand > self.remaining_capacity:
raise ValueError(f"Capacity of {self} too low for {customer}")
delta = 0
if (other_facility := customer.facility):
delta += other_facility.remove(customer)
self.remaining_capacity -= customer.demand
self.customers.add(customer)
delta += length.get(customer.index, self.index)
customer.facility = self
return delta
def set_open(self):
logging.debug(f"Open {self}.")
self.is_open = True
return self.setup_cost
def set_not_open(self):
logging.debug(f"Close {self}.")
delta = 0
if self.customers:
raise ValueError(f"Cannot close {self} with {self.customers}.")
self.is_open = False
return -self.setup_cost
def remove_all_and_close(self):
logging.debug(f"{self} remove all and close.")
delta = 0
for customer in list(self.customers):
delta += self.remove(customer)
delta += self.set_not_open()
return delta
def __str__(self):
cap = f"{self.remaining_capacity}/{self.capacity}"
status = "O" if self.is_open else "C"
s = f"F({self.index}, {cap}, {status}, {self.setup_cost})"
return s
def __repr__(self):
return self.__str__()
class Customer(object):
def __init__(self, index, demand, location):
self.index = index
self.demand = demand
self.location = location
self.facility = None
def __str__(self):
con = "C" if self.facility is not None else "NC!"
s = f"C({self.index}, {con})"
return s
def __repr__(self):
return self.__str__()
class Solution(object):
def __init__(self, facilities, customers):
self.fs = facilities
self.cs = customers
self.cost = 0
def validate(self):
cost = 0
for c in self.cs:
if c.facility is None:
raise Exception(f"{c} not connected.")
cost += length.get(c.index, c.facility.index)
for f in self.fs:
if f.remaining_capacity < 0:
raise Exception(f"{f} exceeds capacity.")
if f.customers and not f.is_open:
raise Exception(f"{f} has customers, but is not open.")
if not f.customers and f.is_open:
raise Exception(f"{f} has no customers, but is open.")
if f.is_open:
cost += f.setup_cost
if abs(cost - self.cost) > EPSILON:
raise Exception(f"Running cost {self.cost} unequal to {cost}.")
return True
def plot_map(self):
try:
import matplotlib.pyplot as plt
import matplotlib.lines as lines
except ModuleNotFoundError:
return
figure = plt.figure()
for f in self.fs:
x, y = f.location
color = 'ro' if f.is_open else 'go'
plt.plot(x, y, color)
# plt.text(x, y, f"{f}")
for c in self.cs:
x, y = c.location
plt.plot(x, y, 'bx')
# plt.text(x, y, f"{c}")
if (f := c.facility) is not None:
x_f, y_f = f.location
plt.plot([x, x_f], [y, y_f], 'b-')
plt.show()
def to_output_data(self):
# calculate the cost of the solution
self.validate()
obj = self.cost
customer_to_facility = [c.facility.index for c in self.cs]
# prepare the solution in the specified output format
output_data = '%.2f' % obj + ' ' + str(0) + '\n'
output_data += ' '.join(map(str, customer_to_facility))
return output_data
def build_trivial(self):
facility = self.fs[0]
self.cost += facility.set_open()
for customer in self.cs:
if facility.remaining_capacity < customer.demand:
facility = self.fs[facility.index + 1]
self.cost += facility.set_open()
self.cost += facility.add(customer)
return self
def build_greedy(self):
def connect_to_closest_facility(customer):
cost = 0
for f in length.get_feasible_facilities(customer, self.fs):
if not f.is_open:
cost += f.set_open()
cost += f.add(customer)
return cost
raise Exception("No feasible facilities for {customer}.")
for customer in self.cs:
self.cost += connect_to_closest_facility(customer)
return self
def reconnect_greedy(self, customers):
delta = 0
not_connected = []
def connect_better_facility(customer):
current_facility = customer.facility
if current_facility:
current_length = length.get(customer, current_facility)
else:
current_length = float("inf")
for f in length.get_feasible_open_facilities(customer, self.fs):
new_length = length.get(customer, f)
if new_length < current_length:
logging.debug(f"{f} is better for {customer}.")
return f.add(customer)
elif new_length > current_length:
return 0
if not customer.facility:
not_connected.append(customer)
return 0
delta += sum([connect_better_facility(c) for c in customers])
return delta, not_connected
def close_facility(self, facility):
logging.debug(f"Closing {facility}.")
original_cost = self.cost
customers = list(facility.customers)
self.cost += facility.remove_all_and_close()
delta, not_connected = self.reconnect_greedy(customers)
self.cost += delta
if not_connected:
logging.debug("Not all customers connected. Restore.")
elif self.cost < original_cost:
delta = original_cost - self.cost
logging.info(f"Close {facility} resulted in improvement {delta}.")
return True
else:
logging.debug(f"No improvement. Restore.")
self.cost += facility.set_open()
delta, not_connected = self.reconnect_greedy(customers)
self.cost += delta
assert(not not_connected)
assert(abs(original_cost - self.cost) < EPSILON)
return False
def open_facility(self, facility):
logging.debug(f"Opening {facility}.")
original_cost = self.cost
self.cost += facility.set_open()
moved = []
for c in self.cs:
if c.demand < facility.remaining_capacity and \
length.get(c, c.facility) > length.get(c, facility):
moved.append({"previous_facility": c.facility, "c": c})
self.cost += facility.add(c)
if self.cost < original_cost:
delta = original_cost - self.cost
logging.info(f"Open {facility} resulted in improvement {delta}.")
return True
else:
logging.debug(f"No improvement. Restore.")
for m in moved:
c = m["c"]
f = m["previous_facility"]
self.cost += f.add(c)
self.cost += facility.set_not_open()
assert(abs(original_cost - self.cost) < EPSILON)
return False
def local_search(self):
fs = [f for f in self.fs if f.is_open]
fs.sort(key=lambda f: len(f.customers))
for f in fs:
self.close_facility(f)
delta, not_connected = self.reconnect_greedy(self.cs)
if not_connected:
raise Exception(f"{not_connected=}")
self.cost += delta
fs = [f for f in self.fs if not f.is_open]
fs.sort(key=lambda f: f.setup_cost)
for f in fs:
self.open_facility(f)
def solve_it(input_data):
global length
facilities, customers = parse(input_data)
length = Length(facilities, customers)
solution = Solution(facilities, customers)
solution.build_greedy()
solution.reconnect_greedy(solution.cs)
solution.local_search()
solution.plot_map()
output_data = solution.to_output_data()
return output_data
def main():
file_location = "data/fl_100_1"
with open(file_location, 'r') as input_data_file:
input_data = input_data_file.read()
print(solve_it(input_data))
def parse(input_data):
# parse the input
lines = input_data.split('\n')
parts = lines[0].split()
facility_count = int(parts[0])
customer_count = int(parts[1])
facilities = []
for i in range(1, facility_count+1):
parts = lines[i].split()
facilities.append(Facility(i-1, float(parts[0]), int(parts[1]),
Point(float(parts[2]), float(parts[3])) ))
customers = []
for i in range(facility_count+1, facility_count+1+customer_count):
parts = lines[i].split()
customers.append(Customer(i-1-facility_count, int(parts[0]),
Point(float(parts[1]), float(parts[2]))))
return facilities, customers
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(message)s')
main()