Sample Code

Here we provide sample code that illustrates how to generate a traffic signal plan by interacting with the simulator. The sample code implements two baseline methods: self-organizing traffic lights (SOTL) and reinforcement learning.

You can run SOTL using the following command:


python3 run_sotl.py --scenario "hangzhou_bc_tyc_1h_7_8_1848"
          

You can run the basic reinforcement learning baseline using the following command:


python3 run_rl.py --scenario "hangzhou_bc_tyc_1h_7_8_1848"
          

1 Preliminary

Figure 1 Description of the intersection

road_link = {"type":..., "startRoad":..., "endRoad":...,
                     "direction":..., "laneLinks":...}
          

lane_link = {"startLaneIndex":..., "endLaneIndex":..., "points":...}
          

The "points" field lists the geometric points that define the physical position of the lane. "startLaneIndex" is the lane index on the start road, while "endLaneIndex" is the lane index on the end road.
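As a minimal sketch of how these two structures fit together, the snippet below builds lane identifiers from a hand-written road_link. The road names, type, direction, and indices are hypothetical values chosen for illustration, and the helper name lane_pairs is not part of the sample code. The "road_laneIndex" naming follows the convention used by the roadnet parsing code later in this document.

```python
# Hypothetical road link: all values below are made up for illustration.
road_link = {
    "type": "go_straight",
    "startRoad": "road_A",
    "endRoad": "road_B",
    "direction": 0,
    "laneLinks": [
        {"startLaneIndex": 0, "endLaneIndex": 0, "points": []},
        {"startLaneIndex": 1, "endLaneIndex": 1, "points": []},
    ],
}


def lane_pairs(road_link):
    """Return (start_lane_id, end_lane_id) for every lane link of a road link."""
    pairs = []
    for lane_link in road_link["laneLinks"]:
        start = "{}_{}".format(road_link["startRoad"], lane_link["startLaneIndex"])
        end = "{}_{}".format(road_link["endRoad"], lane_link["endLaneIndex"])
        pairs.append((start, end))
    return pairs


print(lane_pairs(road_link))
```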

2 Read traffic environment from simulator

cityflow_env.py provides a class that wraps the CityFlow environment. Its get_state function returns the state features of the environment.


def get_state(self):
    state = {}
    # {lane_id: lane_count, ...}
    state['lane_vehicle_count'] = self.eng.get_lane_vehicle_count()
    # {lane_id: lane_waiting_count, ...}
    state['lane_waiting_vehicle_count'] = self.eng.get_lane_waiting_vehicle_count()
    # {lane_id: [vehicle1_id, vehicle2_id, ...], ...}
    state['lane_vehicles'] = self.eng.get_lane_vehicles()
    # {vehicle_id: vehicle_speed, ...}
    state['vehicle_speed'] = self.eng.get_vehicle_speed()
    # {vehicle_id: distance, ...}
    state['vehicle_distance'] = self.eng.get_vehicle_distance()
    state['current_time'] = self.eng.get_current_time()
    state['current_phase'] = self.current_phase
    state['current_phase_time'] = self.current_phase_time
    return state
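
To illustrate how a controller might consume this dictionary, here is a minimal sketch that sums the waiting vehicles over a set of lanes. The state below is a hand-written stand-in for the output of get_state(), and the lane ids and the helper name waiting_on are hypothetical.

```python
# Hand-written stand-in for env.get_state(); lane ids and counts are made up.
state = {
    "lane_waiting_vehicle_count": {"road_A_0": 3, "road_A_1": 0, "road_B_0": 5},
    "current_phase": 1,
    "current_phase_time": 12,
}


def waiting_on(state, lanes):
    """Total number of waiting vehicles on the given lanes."""
    counts = state["lane_waiting_vehicle_count"]
    return sum(counts[lane] for lane in lanes)


print(waiting_on(state, ["road_A_0", "road_A_1"]))
```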
          

3 Calculate traffic signal plans

As mentioned above, the mapping from lanes to a phase is crucial for a control algorithm. We obtain this mapping with the function parse_roadnet(roadnetfile) in utility.py.

The first step is to load the roadnet JSON file. The final result, lane_phase_info_dict, is a dictionary with the intersection id as the key and the details of that intersection as the value.


with open(roadnetFile) as f:  # requires "import json" at the top of the file
    roadnet = json.load(f)
          

Then we process each intersection in turn.


lane_phase_info_dict = {}
for intersection in roadnet["intersections"]:
    if intersection['virtual']:
        continue
    lane_phase_info_dict[intersection['id']] = {
                        "start_lane": [],
                        "end_lane": [],
                        "phase": [],
                        "phase_startLane_mapping": {},
                        "phase_roadLink_mapping": {}
                        }
          

For an intersection, we record the mapping in a dictionary. Note that only non-virtual intersections are controlled by the traffic signal.

You can traverse the list of intersection["roadLinks"] and then the list of road_link["laneLinks"] to record the roadLink_lane_pair, showing which lane pairs form a roadlink.


road_links = intersection["roadLinks"]
start_lane = []
end_lane = []
# roadLink includes some lane_pair: (start_lane, end_lane)
roadLink_lane_pair = {ri: [] for ri in
                      range(len(road_links))}  

for ri in range(len(road_links)):
    road_link = road_links[ri]
    for lane_link in road_link["laneLinks"]:
        sl = road_link['startRoad'] + "_" + str(lane_link["startLaneIndex"])
        el = road_link['endRoad'] + "_" + str(lane_link["endLaneIndex"])
        start_lane.append(sl)
        end_lane.append(el)
        roadLink_lane_pair[ri].append((sl, el))

lane_phase_info_dict[intersection['id']]["start_lane"] = sorted(list(set(start_lane)))
lane_phase_info_dict[intersection['id']]["end_lane"] = sorted(list(set(end_lane)))
          

Finally, we can figure out the mapping from lanes to a phase.


for phase_i in range(1, len(intersection["trafficLight"]["lightphases"])):
    p = intersection["trafficLight"]["lightphases"][phase_i]
    lane_pair = []
    start_lane = []
    for ri in p["availableRoadLinks"]:
        lane_pair.extend(roadLink_lane_pair[ri])
        if roadLink_lane_pair[ri][0][0] not in start_lane:
            start_lane.append(roadLink_lane_pair[ri][0][0])
    lane_phase_info_dict[intersection['id']]["phase"].append(phase_i)
    lane_phase_info_dict[intersection['id']]["phase_startLane_mapping"][phase_i] = start_lane
    lane_phase_info_dict[intersection['id']]["phase_roadLink_mapping"][phase_i] = lane_pair
          

The phase information is stored in the list intersection["trafficLight"]["lightphases"], shown below. The index into the list identifies the phase. Note that phase 0 is ignored, since it serves as the yellow signal and is not controlled by the algorithm.


"lightphases": [
  {"time": 5, "availableRoadLinks": []},
  {"time": 30, "availableRoadLinks": [0, 4]},
  {"time": 30, "availableRoadLinks": [2, 7]},
  {"time": 30, "availableRoadLinks": [1, 5]},
  {"time": 30, "availableRoadLinks": [3, 6]},
  {"time": 30, "availableRoadLinks": [0, 1]},
  {"time": 30, "availableRoadLinks": [4, 5]},
  {"time": 30, "availableRoadLinks": [2, 3]},
  {"time": 30, "availableRoadLinks": [6, 7]}
]
          

Each phase lists the roadlinks it makes available, so the lanes that belong to a phase can be looked up through roadLink_lane_pair.
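The steps above can be combined into one small function. The sketch below is a condensed version run on a hand-written intersection with two roadlinks. The intersection id, the road names, and the function name build_phase_mapping are hypothetical, and this is not the exact parse_roadnet from utility.py.

```python
def build_phase_mapping(intersection):
    """Map each non-yellow phase index to the (start_lane, end_lane) pairs it opens."""
    road_links = intersection["roadLinks"]
    # Lane pairs of every road link, indexed by the road link's position.
    pairs_by_link = [
        [
            (
                "{}_{}".format(rl["startRoad"], ll["startLaneIndex"]),
                "{}_{}".format(rl["endRoad"], ll["endLaneIndex"]),
            )
            for ll in rl["laneLinks"]
        ]
        for rl in road_links
    ]
    phases = intersection["trafficLight"]["lightphases"]
    mapping = {}
    for phase_i in range(1, len(phases)):  # phase 0 is the yellow signal
        pairs = []
        for ri in phases[phase_i]["availableRoadLinks"]:
            pairs.extend(pairs_by_link[ri])
        mapping[phase_i] = pairs
    return mapping


# Hypothetical intersection with two road links and two controllable phases.
intersection = {
    "id": "intersection_demo",
    "roadLinks": [
        {"startRoad": "road_N", "endRoad": "road_S",
         "laneLinks": [{"startLaneIndex": 0, "endLaneIndex": 0}]},
        {"startRoad": "road_E", "endRoad": "road_W",
         "laneLinks": [{"startLaneIndex": 0, "endLaneIndex": 1}]},
    ],
    "trafficLight": {"lightphases": [
        {"time": 5, "availableRoadLinks": []},
        {"time": 30, "availableRoadLinks": [0]},
        {"time": 30, "availableRoadLinks": [1]},
    ]},
}

print(build_phase_mapping(intersection))
```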

3.1 SOTL method

SOTL (run_sotl.py) is a simple adaptive method for traffic signal control.


def choose_action(self, state):
    cur_phase = state["current_phase"]
    print("Time: {}, Phase: {}".format(state['current_time'], cur_phase))
    if state["current_phase_time"] >= self.phi:
        num_green_vehicle = sum([state["lane_waiting_vehicle_count"][i] for i in self.phase_startLane_mapping[cur_phase]])
        num_red_vehicle = sum([state["lane_waiting_vehicle_count"][i] for i in self.lane_phase_info[self.intersection_id]["start_lane"]]) - num_green_vehicle
        if num_green_vehicle <= self.min_green_vehicle and num_red_vehicle > self.max_red_vehicle:
            self.action = cur_phase % len(self.phase_list) + 1
    return self.action
          

There are three pre-defined parameters: the minimum green time phi, the minimum number of waiting vehicles on lanes with a green signal, and the threshold of waiting vehicles on lanes with a red signal. The phase is set to the next one when the following three conditions are all satisfied: (1) the time spent in the current phase has reached the minimum green time phi; (2) the number of "green vehicles" (vehicles waiting on lanes with a green signal) is no greater than the minimum for green lanes; (3) the number of "red vehicles" (vehicles waiting on the remaining start lanes) exceeds the threshold for red lanes.
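The switching rule can be isolated into a pure function, which makes it easy to check without the simulator. This is a minimal sketch that mirrors the comparisons in choose_action above. The function names should_switch and next_phase and the threshold values are hypothetical.

```python
def should_switch(phase_time, green_waiting, red_waiting,
                  phi, min_green_vehicle, max_red_vehicle):
    """Return True when the SOTL rule asks for the next phase."""
    return (phase_time >= phi
            and green_waiting <= min_green_vehicle
            and red_waiting > max_red_vehicle)


def next_phase(cur_phase, num_phases):
    """Cycle through phases 1..num_phases (phase 0 is the yellow signal)."""
    return cur_phase % num_phases + 1


# Hypothetical thresholds, chosen only for this example.
phi, min_green, max_red = 20, 20, 30
print(should_switch(25, 3, 40, phi, min_green, max_red))
print(next_phase(8, 8))
```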

3.2 Reinforcement learning method

Apart from SOTL, a simple deep Q-learning algorithm (run_rl.py) is also provided to calculate the traffic signal plan. The sample code relies on four definitions: the state, the action, the reward, and the model network.

The following code segment illustrates these four definitions.


# state definition in run_rl.py
state = env.get_state()
state = np.array(list(state['start_lane_vehicle_count'].values()) + [state['current_phase']]) # a sample state definition
state = np.reshape(state, [1, state_size])

# action definition in run_rl.py
action = phase_list[agent.choose_action(state)]

# reward defined in cityflow_env.py
reward = env.get_reward()
'''
def get_reward(self):
    lane_waiting_vehicle_count = self.eng.get_lane_waiting_vehicle_count()
    reward = -1 * sum(list(lane_waiting_vehicle_count.values()))
    return reward
'''

# model network defined in dqn_agent.py
model = Sequential()
model.add(Dense(40, input_dim=self.state_size, activation='relu'))
model.add(Dense(40, activation='relu'))
model.add(Dense(self.action_size, activation='linear'))
          

Please note that this reinforcement learning algorithm is only a reference. You may need to design a more suitable RL algorithm, including the network structure, state definition, reward function, and so on.
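As a minimal sketch of the state and reward definitions above, the following uses plain Python lists on a hand-written state, so it runs without the simulator. The lane ids, counts, and helper names are hypothetical. In run_rl.py the vector is additionally wrapped in a NumPy array and reshaped to [1, state_size].

```python
# Hand-written stand-in for env.get_state(); values are made up.
state = {
    "start_lane_vehicle_count": {"road_A_0": 4, "road_A_1": 2, "road_B_0": 7},
    "lane_waiting_vehicle_count": {"road_A_0": 1, "road_A_1": 0, "road_B_0": 5},
    "current_phase": 2,
}


def state_vector(state):
    """Vehicle count of every start lane, followed by the current phase."""
    return list(state["start_lane_vehicle_count"].values()) + [state["current_phase"]]


def reward(state):
    """Negative total number of waiting vehicles, as in get_reward()."""
    return -1 * sum(state["lane_waiting_vehicle_count"].values())


print(state_vector(state))
print(reward(state))
```

Alternative state or reward designs can be tried by swapping these two functions while keeping the rest of the training loop unchanged.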

4 Feed one traffic signal to simulator

We recommend that you implement your control method in an agent class, as we do in sotl_agent.py and dqn_agent.py. Its choose_action function returns an action indicating the phase to be set in the simulator.

In cityflow_env.py, the step function of the environment class makes the simulator carry out an action. The phase is then set through self.eng.set_tl_phase(self.intersection_id, self.current_phase). The two parameters of set_tl_phase are the intersection id and the phase id.


def step(self, next_phase):
    if self.current_phase == next_phase:
        self.current_phase_time += 1
    else:
        self.current_phase = next_phase
        self.current_phase_time = 1

    self.eng.set_tl_phase(self.intersection_id, self.current_phase)
    self.eng.next_step()
    self.phase_log.append(self.current_phase)
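
To show how an agent and the step function fit together, here is a minimal sketch of a control loop run against a stub engine. StubEngine, DemoEnv, and FixedCycleAgent are hypothetical stand-ins written for this example, not classes from the sample code. Only the bookkeeping inside step follows the snippet above.

```python
class StubEngine:
    """Hypothetical stand-in for the simulator engine; records calls only."""

    def __init__(self):
        self.time = 0
        self.phase = None

    def set_tl_phase(self, intersection_id, phase_id):
        self.phase = phase_id

    def next_step(self):
        self.time += 1


class DemoEnv:
    """Keeps the same phase bookkeeping as the step function above."""

    def __init__(self, intersection_id):
        self.eng = StubEngine()
        self.intersection_id = intersection_id
        self.current_phase = 1
        self.current_phase_time = 0
        self.phase_log = []

    def step(self, next_phase):
        if self.current_phase == next_phase:
            self.current_phase_time += 1
        else:
            self.current_phase = next_phase
            self.current_phase_time = 1
        self.eng.set_tl_phase(self.intersection_id, self.current_phase)
        self.eng.next_step()
        self.phase_log.append(self.current_phase)


class FixedCycleAgent:
    """Hypothetical agent: holds each phase for `green` steps, then advances."""

    def __init__(self, phase_list, green):
        self.phase_list = phase_list
        self.green = green

    def choose_action(self, phase, phase_time):
        if phase_time >= self.green:
            return phase % len(self.phase_list) + 1
        return phase


env = DemoEnv("intersection_demo")
agent = FixedCycleAgent(phase_list=[1, 2, 3], green=2)
for _ in range(6):
    action = agent.choose_action(env.current_phase, env.current_phase_time)
    env.step(action)
print(env.phase_log)
```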
      

5 Record your traffic signal plan

Each time you finish a control run, you can record the traffic signal plan by running 'env.log()', as in 'run_sotl.py' and 'run_rl.py'.


# defined in cityflow_env.py
env.log()
'''
def log(self):
    df = pd.DataFrame({self.intersection_id: self.phase_log[:self.num_step]})
    if not os.path.exists(self.config['data']):
        os.makedirs(self.config['data'])
    df.to_csv(os.path.join(self.config['data'], 'signal_plan.txt'), index=None)
'''
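
For readers who want to inspect the output format without pandas, the following is a minimal sketch that writes a phase log with the standard-library csv module instead. It swaps pandas for csv and is not the sample code's implementation. The function name write_signal_plan, the intersection id, and the temporary output directory are hypothetical. It writes a one-column layout, with the intersection id as the header followed by one phase per line, which is what the pandas call above is expected to produce.

```python
import csv
import os
import tempfile


def write_signal_plan(out_dir, intersection_id, phase_log):
    """Write one phase per line under a header holding the intersection id."""
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "signal_plan.txt")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([intersection_id])
        for phase in phase_log:
            writer.writerow([phase])
    return path


# Hypothetical output directory and phase log, used only for this example.
out_dir = os.path.join(tempfile.mkdtemp(), "data")
path = write_signal_plan(out_dir, "intersection_demo", [1, 1, 2, 2])
with open(path) as f:
    print(f.read())
```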