-
-
Notifications
You must be signed in to change notification settings - Fork 276
Add Smart Traffic Light Example #369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7a8ac0f
c3fd606
596780a
c8980c4
51cd643
9e8fca0
da6ce26
5d05069
9813cf6
5bce758
fd06405
2497cdc
c901a16
95e228c
6eb4f35
7bbba0c
aa39394
e91f877
e083e75
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| # Smart Traffic Lights | ||
|
|
||
| ## Summary | ||
|
|
||
| An optimization simulation where traffic light agents use local information to minimize vehicle wait times at intersections. | ||
|
|
||
|  | ||
|
|
||
| ## Installation | ||
|
|
||
| To install the dependencies use pip and the requirements.txt in this directory. e.g. | ||
|
|
||
| ``` | ||
| $ pip install -r requirements.txt | ||
| ``` | ||
|
|
||
|
|
||
|
|
||
| Then open your browser to [http://127.0.0.1:8521/](http://127.0.0.1:8521/), select the model parameters, press Reset, then Start. | ||
|
|
||
| ## Files | ||
|
|
||
| * ``smart_traffic_light/agents.py``: Defines the CarAgent, the TrafficLightAgent and the IntersectionController classes. | ||
| * ``smart_traffic_light/model.py``: Defines the Traffic model and the DataCollector functions. | ||
| * ``run_example.py``: Script to compare waiting time in traffic using smart and normal traffic light controller. | ||
| * ``app.py``: Visualization script on Solara. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| from typing import Any | ||
|
|
||
| import mesa | ||
| from mesa.visualization import SolaraViz, make_plot_component, make_space_component | ||
| from smart_traffic_lights.agents import ( | ||
| CarAgent, | ||
| Direction, | ||
| LightState, | ||
| TrafficLightAgent, | ||
| ) | ||
| from smart_traffic_lights.model import TrafficModel | ||
|
|
||
|
|
||
| def traffic_portrayal(agent: mesa.Agent) -> dict[str, Any]: | ||
| """ | ||
| Determines how agents are drawn on the grid. | ||
|
|
||
| - Cars: Blue for East, Purple for North. | ||
| - Lights: Circle markers, Red or Green based on state. | ||
| - Controller: Hidden (has no position). | ||
| """ | ||
|
|
||
| if isinstance(agent, TrafficLightAgent): | ||
| return { | ||
| "color": "tab:green" if agent.state == LightState.GREEN else "tab:red", | ||
| "marker": "o", # Circle for lights | ||
| "size": 100, | ||
| "zorder": 1, # Ensure lights are drawn above cars | ||
| "alpha": 1.0, | ||
| } | ||
| if isinstance(agent, CarAgent): | ||
| return { | ||
| "color": "tab:blue" if agent.direction == Direction.EAST else "tab:purple", | ||
| "marker": "s", # Square for cars | ||
| "zorder": 0, # Ensure lights are drawn above cars | ||
| "size": 40, | ||
| } | ||
| return {} | ||
|
|
||
|
|
||
| # Define interactive parameters for Solara UI | ||
| model_params = { | ||
| "width": mesa.visualization.Slider( | ||
| label="Width of the grid", value=20, min=5, max=40, step=1 | ||
| ), | ||
| "height": mesa.visualization.Slider( | ||
| label="Height of the grid", value=20, min=5, max=40, step=1 | ||
| ), | ||
| "num_cars_east": mesa.visualization.Slider( | ||
| label="Number of cars going east", value=8, min=1, max=20, step=1 | ||
| ), | ||
| "num_cars_north": mesa.visualization.Slider( | ||
| label="Number of cars going north", value=8, min=1, max=20, step=1 | ||
| ), | ||
| "smart_lights": mesa.visualization.Slider( | ||
| label="Smart Lights (0=Off, 1=On)", value=1, min=0, max=1, step=1 | ||
| ), | ||
| } | ||
|
|
||
| # Create the Grid View | ||
| space_component = make_space_component(traffic_portrayal) | ||
|
|
||
| # Create the Wait Time Chart | ||
| wait_time_chart = make_plot_component({"Total_Red_Light_Wait_Time": "tab:red"}) | ||
|
|
||
| initial_model = TrafficModel() | ||
|
|
||
| # Instantiate the Solara Visualization Page | ||
| app = SolaraViz( | ||
| model=initial_model, | ||
| model_params=model_params, | ||
| components=[ | ||
| space_component, | ||
| wait_time_chart, | ||
| ], | ||
| name="Smart Traffic Simulation", | ||
| ) | ||
|
|
||
|
|
||
| # app |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| mesa[viz]>=3.0 | ||
| networkx | ||
| numpy | ||
| pandas | ||
| enum | ||
| typing | ||
| matplotlib |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| import numpy as np | ||
| from smart_traffic_lights.model import TrafficModel | ||
|
|
||
| STEPS = 2000 | ||
|
|
||
| width = 20 | ||
| height = 20 | ||
| num_cars_east = 8 | ||
| num_cars_north = 8 | ||
|
|
||
|
|
||
| print("Running Static Traffic Lights Simulation...") | ||
| model_static = TrafficModel(smart_lights=False) | ||
| for _ in range(STEPS): | ||
| model_static.step() | ||
|
|
||
| print("Running Smart Traffic Lights Simulation...") | ||
| model_smart = TrafficModel(smart_lights=True) | ||
| for _ in range(STEPS): | ||
| model_smart.step() | ||
|
|
||
| # Retrieve data | ||
| static_df = model_static.datacollector.get_model_vars_dataframe() | ||
| smart_df = model_smart.datacollector.get_model_vars_dataframe() | ||
|
|
||
| # Calculate the percentage improvement | ||
| final_wait_static = static_df["Total_Red_Light_Wait_Time"].iloc[-1] | ||
| final_wait_smart = smart_df["Total_Red_Light_Wait_Time"].iloc[-1] | ||
|
|
||
| improvement = np.round( | ||
| (final_wait_static - final_wait_smart) / final_wait_static * 100, 2 | ||
| ) | ||
|
|
||
| print("-" * 40) | ||
| print(f"Results after {STEPS} steps:") | ||
| print(f"Total Red Light Wait Time (Static Lights): {final_wait_static} steps") | ||
| print(f"Total Red Light Wait Time (Smart Lights) : {final_wait_smart} steps") | ||
| print( | ||
| f"Performance Improvement : {improvement}% reduction in red light wait time" | ||
| ) | ||
| print("-" * 40) | ||
|
|
||
| final_wait_static = static_df["Total_Wait_Time"].iloc[-1] | ||
| final_wait_smart = smart_df["Total_Wait_Time"].iloc[-1] | ||
|
|
||
| improvement = np.round( | ||
| (final_wait_static - final_wait_smart) / final_wait_static * 100, 2 | ||
| ) | ||
|
|
||
| print("-" * 40) | ||
| print(f"Total Wait Time (Static Lights): {final_wait_static} steps") | ||
| print(f"Total Wait Time (Smart Lights) : {final_wait_smart} steps") | ||
| print(f"Performance Improvement : {improvement}% reduction in total wait time") | ||
| print("-" * 40) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| import enum | ||
|
|
||
| import mesa | ||
| from mesa.discrete_space import CellAgent | ||
|
|
||
|
|
||
| class Direction(enum.Enum): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the Direction enum class only defines east and north, there's no south or west. This means traffic only flows in two directions. Is this intentional? A real intersection would have opposing traffic flows. This is just a suggestion from my side, not something to change, i think it would make the model more interesting whilst adding very less code.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, for simplicity I only had these two directions. Something that I want to implement is a grid-like street system with cars that can also change direction. This will get a more realistic model, but it requires some more work to make. |
||
| EAST = (1, 0) | ||
| NORTH = (0, 1) | ||
|
|
||
|
|
||
| class LightState(enum.Enum): | ||
| RED = 0 | ||
| GREEN = 1 | ||
|
|
||
|
|
||
| class TrafficLightAgent(CellAgent): | ||
| """ | ||
| An agent representing a single traffic light. | ||
|
|
||
| Attributes: | ||
| state (LightState): The current state of the light (RED or GREEN). | ||
| direction (Direction): The flow of traffic this light controls. | ||
| """ | ||
|
|
||
| def __init__(self, model: mesa.Model, state: LightState, direction: Direction): | ||
| super().__init__(model) | ||
| self.state = state | ||
| self.direction = direction | ||
|
|
||
| def step(self): | ||
| # Traffic lights are passive; the Controller changes their state. | ||
| pass | ||
|
|
||
|
|
||
| class CarAgent(CellAgent): | ||
| """ | ||
| An agent representing a car in the grid. | ||
|
|
||
| Attributes: | ||
| direction (Direction): The direction the car is traveling. | ||
| wait_time (int): Accumulator for time steps spent not moving. | ||
| """ | ||
|
|
||
| def __init__(self, model: mesa.Model, direction: Direction): | ||
| super().__init__(model) | ||
| self.direction = direction | ||
| self.total_wait_time = 0 | ||
| self.red_light_wait_time = 0 | ||
|
|
||
| def step(self): | ||
| """ | ||
| Determines if the car can move forward based on obstacles and lights. | ||
| """ | ||
|
|
||
| # Calculate the next coordinate based on direction | ||
| current_x, current_y = self.cell.coordinate | ||
|
|
||
| # Calculate the next coordinate based on direction (wrapping around torus) | ||
| next_x = (current_x + self.direction.value[0]) % self.model.width | ||
| next_y = (current_y + self.direction.value[1]) % self.model.height | ||
| next_pos = (next_x, next_y) | ||
|
|
||
| can_move = True | ||
| stopped_by_red_light = False | ||
|
|
||
| next_cell = self.model.cells[next_pos] | ||
|
|
||
| for obj in next_cell.agents: | ||
| if isinstance(obj, CarAgent): | ||
| can_move = False | ||
| break | ||
| elif isinstance(obj, TrafficLightAgent): | ||
| # Only stop if the light controls our direction and is red | ||
| if obj.direction == self.direction and obj.state == LightState.RED: | ||
| can_move = False | ||
| stopped_by_red_light = True | ||
| break | ||
|
|
||
| if can_move: | ||
| # Moving the agent is now just reassigning the cell! | ||
| self.move_to(next_cell) | ||
| else: | ||
| self.total_wait_time += 1 | ||
| if stopped_by_red_light: | ||
| self.red_light_wait_time += 1 | ||
|
|
||
|
|
||
| class IntersectionController(mesa.Agent): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extending mesa.Agent seems intentional since the controller has no physical grid position, but a comment explaining this design choice would be extremely helpful in understanding why it differs from the other agents specifically CellAgent. |
||
| """ | ||
| A meta-agent that controls the traffic lights at an intersection. | ||
|
|
||
| Attributes: | ||
| smart (bool): If True, uses queue density to toggle. If False, uses fixed timer. | ||
| lights (List[TrafficLightAgent]): The lights managed by this controller. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| model: mesa.Model, | ||
| smart: bool, | ||
| lights: list[TrafficLightAgent], | ||
| sensor_range: int = 5, | ||
| static_wait=15, | ||
| ): | ||
| super().__init__(model) | ||
| self.smart = smart | ||
| self.static_wait = static_wait | ||
| self.lights = {light.direction: light for light in lights} # Dictionary | ||
| self.sensor_range = sensor_range | ||
| self.timer = 0 | ||
| self.cooldown = 2 # Minimum steps before a light can change again | ||
|
|
||
| def get_queue_density(self, light: TrafficLightAgent) -> int: | ||
| """ | ||
| Calculates the number of cars waiting in the sensor zone approaching the light. | ||
| """ | ||
| count = 0 | ||
| # Look backwards from the light based on the direction it controls | ||
| dx, dy = light.direction.value | ||
| light_x, light_y = light.cell.coordinate | ||
|
|
||
| for i in range(1, self.sensor_range + 1): | ||
| check_x = (light_x - dx * i) % self.model.width | ||
| check_y = (light_y - dy * i) % self.model.height | ||
| check_pos = (check_x, check_y) | ||
|
|
||
| # Look up the cell at check_pos using our lookup dictionary | ||
| check_cell = self.model.cells[check_pos] | ||
| if any(isinstance(a, CarAgent) for a in check_cell.agents): | ||
| count += 1 | ||
|
|
||
| return count | ||
|
|
||
| def toggle_lights(self): | ||
| """ | ||
| Switches all lights managed by the controller. | ||
| """ | ||
| for light in self.lights.values(): | ||
| light.state = ( | ||
| LightState.GREEN if light.state == LightState.RED else LightState.RED | ||
| ) | ||
| self.timer = 0 | ||
|
|
||
| def step(self): | ||
| self.timer += 1 | ||
|
|
||
| if not self.smart: | ||
| # Static: Toggle every fixed interval | ||
| if self.timer >= self.static_wait: | ||
| self.toggle_lights() | ||
| else: | ||
| # Smart: Toggle based on dynamic queue density | ||
| if self.timer >= self.cooldown: | ||
| # Select lights by direction to find queue lengths | ||
| east_light = self.lights[Direction.EAST] | ||
| north_light = self.lights[Direction.NORTH] | ||
|
|
||
| east_queue = self.get_queue_density(east_light) | ||
| north_queue = self.get_queue_density(north_light) | ||
|
|
||
| # If the current green light has a smaller queue than the red light, toggle | ||
| if ( | ||
| east_light.state == LightState.GREEN and north_queue > east_queue | ||
| ) or ( | ||
| north_light.state == LightState.GREEN and east_queue > north_queue | ||
| ): | ||
| self.toggle_lights() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found the model really interesting/unique had some fun playing around with it, also the addition of a Gif rather than a screenshot is really helpful.