Automatic Waste Pile Segregation

Published Aug 07, 2024
 20 hours to build
 Intermediate

This project uses a robotic arm to segregate a pile of waste into two commonly recycled waste types: plastic and metal.


Components Used

- Arduino UNO × 1
- 3D-printed robotic arm (a prototype to perform the pick-and-place actions) × 1
- Servo motors (providing 6 DoF for motion of the arm) × 6
Description

This project uses a robotic arm to segregate waste into the most commonly recycled waste types (plastic and metal).

Step 1: Image Detection

First, an image detection model is fine-tuned on a custom dataset representing the types of waste most common in your area. It is important to keep a reasonable balance between the waste classes to avoid bias in the model. We used the YOLOv8 image detection and classification model, which expects a split directory format, so the files in the data directory should be organised as shown below.
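The exact folder names depend on how the dataset is exported; a minimal sketch of the typical Ultralytics/Roboflow split layout looks like this:

data/
  train/
    images/
    labels/
  valid/
    images/
    labels/
  test/
    images/
    labels/
  data.yaml    # dataset paths, number of classes (nc) and class names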

The image labels were generated using Roboflow.
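Roboflow exports YOLO-format .txt label files, one per image, where each line contains a class index followed by a normalised bounding box (centre x, centre y, width, height). In this project class 0 corresponds to metal and class 1 to plastic, matching the post-processing code further down; the coordinate values below are illustrative:

0 0.512 0.431 0.220 0.180
1 0.267 0.648 0.150 0.310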

The code for fine-tuning the model can be found in the attached code folder. It saves the trained model alongside the data; the model is then downloaded and run locally. The entire training process can be done on Google Colab, which is especially useful if you do not have a powerful GPU.

## Installing necessary dependencies
!nvidia-smi
import os
HOME = os.getcwd()
print(HOME)
!pip install -q supervision
!pip install yolov8
!pip install ultralytics
import ultralytics
ultralytics.checks()
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
import os
import yaml
from IPython.display import Image, display
import matplotlib.pyplot as plt
from google.colab import drive
drive.mount('/content/drive')
## Testing the original YOLO model on a random image
!yolo predict model=yolov8n.pt source='https://pluspng.com/img-png/3-600.png'
from ultralytics import YOLO

#loading the pre-trained YOLO model
model = YOLO('yolov8n.pt')

results = model(source='https://pluspng.com/img-png/3-600.png',conf=0.25)
# inspecting how the results are printed out
print(results[0].boxes.xyxy)
print(results[0].boxes.conf)
print(results[0].boxes.cls)
import cv2
import supervision as sv
from ultralytics import YOLO
import requests

#load model
model = YOLO('yolov8n.pt')

# Fetch image from URL and save it to a file
url = 'https://pluspng.com/img-png/3-600.png'
response = requests.get(url)

with open('image.png', 'wb') as f:
    f.write(response.content)

# Read the saved image file
image = cv2.imread('image.png')

results = model(image)[0]
detections = sv.Detections.from_ultralytics(results)

bounding_box_notations = sv.BoxAnnotator()
label_annotations = sv.LabelAnnotator()

annotated_image = bounding_box_notations.annotate(
    scene=image, detections=detections)
annotated_frame = label_annotations.annotate(
    scene=annotated_image, detections=detections)

sv.plot_image(annotated_frame)
## Generating weights to balance the dataset
# Path to our dataset annotations/labels
annotations_path = '/content/drive/replace/with/path/to/your/data'

# Function to extract labels from annotations
def extract_labels(annotations_path):
    labels = []
    for annotation_file in os.listdir(annotations_path):
        with open(os.path.join(annotations_path, annotation_file), 'r') as f:
            for line in f:
                # Split the line by spaces and take the first element as the class label
                class_label = int(line.split()[0])
                labels.append(class_label)
    return np.array(labels)

# Extract labels
y_train = extract_labels(annotations_path)

# Compute class weights
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
class_weights_dict = dict(enumerate(class_weights))
print(class_weights_dict)

# Editing the contents of our data.yaml file to account for the new weights for each class
data_yaml_path = '/content/drive/replace/with/path/to/your/data/yamlfile.yaml'

# Read the data.yaml file
with open(data_yaml_path, 'r') as f:
    data_yaml = yaml.safe_load(f)

# Adding class weights to the data.yaml file
data_yaml['class_weights'] = [float(w) for w in class_weights]

# Write the updated data.yaml back to the file
with open(data_yaml_path, 'w') as f:
    yaml.dump(data_yaml, f)

print("Updated data.yaml with class weights:", data_yaml['class_weights'])

## Modifying the model
import torch
import torch.nn as nn
# Loading the pre-trained YOLOv8 model
model = YOLO('yolov8n.pt')

# training the model on the custom dataset
model.train(
    data='/content/drive/replace/with/path/to/your/data/yamlfile.yaml',
    epochs = 100,
    patience = 5,
    batch=32,
    imgsz=640,
    device='cuda:0'
)
metrics = model.val()  # evaluating model performance on the validation set
print(f"Validation mAP: {metrics.box.map}") #Model's mean Average Presicion(mAP), can be used as a metric of accuracy
## Checking the modified model's metrics
%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train/confusion_matrix.png', width=600)#confusion matrix
%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train/results.png', width=600)#results of the model
## Evaluating the modified model
#loading our modified model which was saved as best.pt
model = YOLO('/content/runs/detect/train/weights/best.pt')
model.eval()
## Testing the modified model on some images it has never seen before
results = model.predict(source ='https://media.healthyfood.com/wp-content/uploads/2017/03/Ask-the-experts-Plastic-water-bottles-600x402.jpg', save=True)
for result in results:
    print(result.boxes.data)
results = model.predict(source ='https://i.ytimg.com/vi/nnOeZTqqp60/maxresdefault.jpg', save=True)
for result in results:
    print(result.boxes.data)
## Hyperparameter tuning of the modified model
import torch
from ultralytics import YOLO
from sklearn.model_selection import ParameterGrid

# Define the hyperparameters to tune
param_grid = {
    'lr': [0.001, 0.01, 0.1],
    'optimizer':['Adam','SGD'],
    'patience':[5,10,15]
}

# Generate all combinations of hyperparameters
grid = ParameterGrid(param_grid)

# Load the YOLOv8 model
model = YOLO('/content/runs/detect/train/weights/best.pt')

# Function to train and evaluate the model
def train_and_evaluate(params):

    # Train the model
    model.train(data='/content/drive/replace/with/path/to/your/data/yamlfile.yaml', epochs=100,lr0=params['lr'],batch=32,patience=params['patience'],optimizer=params['optimizer'] )

    # Evaluate the model
    results = model.val()

    # Return the performance metric (e.g., mAP)
    return results.box.map

# Iterate over all combinations of hyperparameters
best_params = None
best_score = -float('inf')

for params in grid:
    print(f"Evaluating with parameters: {params}")
    score = train_and_evaluate(params)
    print(f"Score: {score}")

    if score > best_score:
        best_score = score
        best_params = params

print(f"Best parameters: {best_params}")
print(f"Best score: {best_score}")

# From the hyperparameter tuning, we got the following results:
# Best parameters: {'lr': 0.001, 'optimizer': 'SGD', 'patience': 15}
# Best score: 0.9280115175379601
# The corresponding weights were saved at /content/runs/detect/train433333/weights/best.pt
# Validating the best model we got from the hyperparameter tuning
final_model = YOLO('/content/runs/detect/train433333/weights/best.pt')
metrics = final_model.val()
print(f"Validation mAP: {metrics.box.map}") #Model mean Average Presicion(mAP), basically like it's accurcy score
## Testing the final model on new images
results = final_model.predict(source ='https://pluspng.com/img-png/3-600.png', save=True)
for result in results:
    print(result.boxes.data)
import cv2
import supervision as sv
from ultralytics import YOLO
import requests

# Fetch image from URL and save it to a file
url = 'https://pluspng.com/img-png/3-600.png'
response = requests.get(url)

with open('image.png', 'wb') as f:
    f.write(response.content)

# Read the saved image file
image = cv2.imread('image.png')

results = final_model(image)[0]
detections = sv.Detections.from_ultralytics(results)

bounding_box_notations = sv.BoxAnnotator()
label_annotations = sv.LabelAnnotator()

annotated_image = bounding_box_notations.annotate(
    scene=image, detections=detections)
annotated_frame = label_annotations.annotate(
    scene=annotated_image, detections=detections)

sv.plot_image(annotated_frame)
results = final_model.predict(source ='https://richfieldsplastics.com/wp-content/uploads/2020/02/Benefits-of-Replacing-Metal-Parts-with-Plastic.jpg', save=True)
for result in results:
    print(result.boxes.data)
import cv2
import supervision as sv
from ultralytics import YOLO
import requests

# Fetch image from URL and save it to a file
url = 'https://i.ytimg.com/vi/nnOeZTqqp60/maxresdefault.jpg'
response = requests.get(url)

with open('image.png', 'wb') as f:
    f.write(response.content)

# Read the saved image file
image = cv2.imread('image.png')

results = final_model(image)[0]
detections = sv.Detections.from_ultralytics(results)

bounding_box_notations = sv.BoxAnnotator()
label_annotations = sv.LabelAnnotator()

annotated_image = bounding_box_notations.annotate(
    scene=image, detections=detections)
annotated_frame = label_annotations.annotate(
    scene=annotated_image, detections=detections)

sv.plot_image(annotated_frame)
## Using inference to add an "other" class as a post-processing step
import torch

# Set the model to evaluation mode
final_model.eval()
# Loading our test images
test_images = '/content/drive/replace/with/path/to/your/test/data'

# Run inference with no gradient calculation
with torch.no_grad():
    results = final_model(test_images)
results #checking to see what is contained in results
# extracting labels from results
for result in results:
    boxes = result.boxes  # Extract bounding boxes and scores
    labels = []
    for box in boxes:
        score = box.conf.item()
        class_id = int(box.cls.item())

        # Define a confidence threshold
        confidence_threshold = 0.5

        if score < confidence_threshold:
            labels.append('other')
        else:
            if class_id == 0:
                labels.append('metal')
            elif class_id == 1:
                labels.append('plastic')
            else:
                labels.append('other')

    print(labels)
#creating a function for the inference and annotation
import torch
import cv2
from google.colab.patches import cv2_imshow
import requests

def run_inference_and_annotate(image_path, model, confidence_threshold=0.5):
    # Read the image
    image = cv2.imread(image_path)

    # Run inference with no gradient calculation
    with torch.no_grad():
        results = model(image)

    # Process results
    annotated_boxes = []
    for result in results:
        boxes = result.boxes  # Extract bounding boxes and scores
        for box in boxes:
            bbox = box.xyxy[0].tolist()  # Convert bbox tensor to list
            score = box.conf.item()
            class_id = int(box.cls.item())

            # The label defaults to 'other'. If the prediction confidence is at least the threshold, it is reassigned to metal or plastic based on the class ID
            label = 'other'
            if score >= confidence_threshold:
                if class_id == 0:
                    label = 'metal'
                elif class_id == 1:
                    label = 'plastic'

            annotated_boxes.append((bbox, label, score))

    # Annotate the image
    for (bbox, label, score) in annotated_boxes:
        x1, y1, x2, y2 = map(int, bbox)
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(image, f'{label} {score:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    return image

## TESTING

# Fetch image from URL and save it to a file
url = 'https://media.wired.com/photos/6647d8022840f013a58dfcc4/master/w_960,c_limit/Razer-Iskur-V2-Office-Chair-Reviewer-Photo-SOURCE-Julian-Chokkattu.jpg'
response = requests.get(url)

with open('image.png', 'wb') as f:
    f.write(response.content)

# Annotate image using the function
annotated_image = run_inference_and_annotate('image.png', final_model)

# Display the image
cv2_imshow(annotated_image)

## Testing the same function on a locally saved image

img =  '/content/Ask-the-experts-Plastic-water-bottles-600x402.jpg'

# Annotate image using the function
annotated_image = run_inference_and_annotate(img, final_model)

# Display the image
cv2_imshow(annotated_image)
## Saving the model
torch.save(YOLO('/content/runs/detect/train433333/weights/best.pt'), 'Imgdetec.pt')
# Loading model to see if it saved well
loaded_model = torch.load('/content/Imgdetec.pt')
metrics = loaded_model.val()
print(f"Validation mAP: {metrics.box.map}")

 

Step 2: Interfacing the model and the arm

We developed a simple Python script that passes the label of the detected waste to the Arduino sketch over a serial connection. The script currently uses the laptop webcam as the camera feed; in the future, we hope to use a dedicated camera module instead.

#importing necessary libraries
import os
import contextlib
from ultralytics import YOLO
import serial.tools.list_ports

ports = list(serial.tools.list_ports.comports()) # List all available serial ports and get their information
port_names = [port.device for port in ports]# Extract the device name (e.g., 'COM3', 'COM4') from each port
portlist = [str(onePort) for onePort in ports]# Create a list of string representations of each port

SerialInst = serial.Serial()# Initialize the serial communication instance
SerialInst.baudrate = 9600# Set the baud rate for the serial communication
SerialInst.port = port_names[0]# Assign the first available port to the serial instance (e.g., 'COM3')
SerialInst.open()# Open the serial communication port

print(port_names)# Print the list of available port names (for debugging or informational purposes)

# Load the YOLO model
model = YOLO("C:/Users/nhyir/Documents/INTRO TO AI/Group40_FINALPROJECT_Image-Detection/Imgdetec.pt")

while True:
    # Run the model on the webcam stream (source "0"); set show=False to hide the preview window
    with contextlib.redirect_stdout(open(os.devnull, 'w')):
        results = model.predict(source="0", show=True, verbose=False, stream=True)
    for result in results:
        for box in result.boxes:
            class_id = int(box.cls.item())  # Get the class ID
            print(class_id)
            label = 'other'  # Default label

            # Assign a label based on the class ID
            if class_id == 0:
                label = 'metal'
            elif class_id == 1:
                label = 'plastic'

            # Print the label of the detected object
            print(f"Detected: {label}")
            SerialInst.write(f"{label}\n".encode('utf-8'))# Write the label to the serial port. encode('utf-8') converts the string to bytes in UTF-8 encoding, which is required for serial communication

Running this script will open a window that displays the camera feed, and the type of waste detected is passed to the Arduino over the serial port. It is crucial that no other application that uses the serial port (such as the Arduino IDE's Serial Monitor) is running at the same time.
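If the computer exposes several serial ports, blindly taking the first one (port_names[0] in the script above) can pick the wrong device. A minimal sketch for selecting the port by its description instead, assuming pyserial and that the board reports something like "Arduino" or "CH340" (adjust the keywords to your hardware):

import serial
import serial.tools.list_ports

def find_arduino_port(keywords=('Arduino', 'CH340', 'USB Serial')):
    # Scan the available ports and return the first whose description matches a keyword
    for port in serial.tools.list_ports.comports():
        if any(k.lower() in (port.description or '').lower() for k in keywords):
            return port.device
    return None

port_name = find_arduino_port()
if port_name is None:
    raise RuntimeError('No Arduino-like serial port found; check the USB connection.')

# This instance can replace the SerialInst setup in the script above
SerialInst = serial.Serial(port_name, baudrate=9600, timeout=1)
print(f"Connected on {port_name}")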

Step 3: The Robotic Arm

The robotic arm is a simple 5 DOF arm. The exact nature of the build does not affect the functionality much; it simply needs to be able to pick up the waste and drop it at the appropriate spot within its reach. The files can be obtained through this website: STEP file Arduino based Robot Arm 🤖 ・Model to download and 3D print・Cults (cults3d.com). The entire build is powered using a 240 V to 9 V adapter, stepped down to 5 V using a breadboard power supply module. Any suitable, long-lasting 5 V power source can feed the breadboard. The schematic of the connections is shown below (with each motor representing its entire limb).
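For reference, the Arduino sketch in Step 4 uses the following pin assignments (servo names follow the variables in the code):

- base servo: pin 3
- limb1 servo: pin 5
- limb2 servo: pin 6
- clawy servo: pin 9
- grip servo: pin 10
- blue LED: pin 11
- red LED: pin 12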

Step 4: Controlling the Robotic Arm

The Arduino sketch controls the robotic arm, specifying where the waste should be placed depending on its type. Each defPos() call in the sketch sets target angles for the base, limb1, limb2, clawy and grip servos; the literal angle values are specific to this particular build and its drop-off positions.

#include <Servo.h>
#include <stdio.h>

Servo base;  // create Servo object to control a servo
Servo limb1;  // create Servo object to control a servo
Servo limb2;  // create Servo object to control a servo
//Servo clawx;  // create Servo object to control a servo
Servo clawy;  // create Servo object to control a servo
Servo grip;  // create Servo object to control a servo


int pos;    // variable to store the servo position
int basepos;
int limb1pos;
int limb2pos;
int clawxpos;
int clawypos;
int grippos;
int baseend;
int limb1end;
int limb2end;
//int clawxend;
int clawyend;
int gripend;

int blue_led = 11;
int red_led = 12;

void setup() {
  base.attach(3); //Servo3
  limb1.attach(5); //Servo1
  limb2.attach(6); //Servo5
  //clawx.attach(9); //Servo4
  clawy.attach(9); //Servo2
  grip.attach(10); //Servo6

  pinMode(blue_led, OUTPUT);
  pinMode(red_led, OUTPUT);

  Serial.begin(9600);
}

void loop() {
  digitalWrite(blue_led, HIGH);
  delay(500);
  digitalWrite(blue_led, LOW);

  digitalWrite(red_led, HIGH);
  delay(500);
  digitalWrite(red_led, LOW);
  
  starting();
  if (Serial.available() > 0) {
      // Read the incoming byte
      String incomingData = Serial.readStringUntil('\n');
      //delay(100);
      
      if (incomingData == "metal") {
        digitalWrite(blue_led, HIGH);
        pickWaste();
        delay(500);
        dropMetal();
        delay(2000);
        digitalWrite(blue_led, LOW);

      } else if (incomingData == "plastic") {
        digitalWrite(red_led, HIGH);
        pickWaste();
        delay(500);
        dropPlastic();
        delay(500);
        digitalWrite(red_led, LOW);
      } else if (incomingData == "other"){
        digitalWrite(red_led, HIGH);
        digitalWrite(blue_led, HIGH);
        delay(500);
        digitalWrite(red_led, LOW);
        digitalWrite(blue_led, LOW);
      }
    }
}



void starting(){
  defPos(94,120,60,15,80);
}

void pickWaste(){
  defPos(94,120,60,15,120);
  delay(500);
  grabWaste();
}

void dropPlastic(){
  defPos(180,120,40,40,25);
  relWaste();
}

void dropMetal(){
  defPos(135,120,40,40,25);
  relWaste();
}

void grabWaste(){
  grippos = grip.read();
  for (pos = grippos; pos>=25; pos -= 1){
      grip.write(pos);
      delay(15);
  }
}

void relWaste(){
  grippos = grip.read();
  for (pos = grippos; pos<=120; pos += 1){
      grip.write(pos);
      delay(10);
  }
}

// Sweep each servo one degree at a time from its current angle to the requested target angle so the arm moves gradually
void defPos(int baseend,int limb1end,int limb2end,int clawyend,int gripend){
  basepos = base.read();
  if (basepos>=baseend){
    for (pos = basepos; pos>=baseend; pos -= 1){
      base.write(pos);
    }}
  else{
    for (pos = basepos; pos<=baseend; pos += 1){
      base.write(pos);
    }}

  limb1pos = limb1.read();
  if (limb1pos>=limb1end){
    for (pos = limb1pos; pos>=limb1end; pos -= 1){
      limb1.write(pos);
    }}
  else{
    for (pos = limb1pos; pos<=limb1end; pos += 1){
      limb1.write(pos);
    }}

  limb2pos = limb2.read();
  if (limb2pos>=limb2end){
    for (pos = limb2pos; pos>=limb2end; pos -= 1){
      limb2.write(pos);
      delay(10);
    }}
  else{
    for (pos = limb2pos; pos<=limb2end; pos += 1){
      limb2.write(pos);
      delay(10);
    }}

  // clawxpos = clawx.read();
  // if (clawxpos>=clawxend){
  //   for (pos = clawxpos; pos>=clawxend; pos -= 1){
  //     clawx.write(pos);
  //   }}
  // else{
  //   for (pos = clawxpos; pos<=clawxend; pos += 1){
  //     clawx.write(pos);
  //   }}

  clawypos = clawy.read();
  if (clawypos>=clawyend){
    for (pos = clawypos; pos>=clawyend; pos -= 1){
      clawy.write(pos);
    }}
  else{
    for (pos = clawypos; pos<=clawyend; pos += 1){
      clawy.write(pos);
    }}

  grippos = grip.read();
  if (grippos>=gripend){
    for (pos = grippos; pos>=gripend; pos -= 1){
      grip.write(pos);
    }}
  else{
    for (pos = grippos; pos<=gripend; pos += 1){
      grip.write(pos);
    }}
}

The original robot design had 6 DOF; however, one degree of freedom was redundant, so we disconnected that servo.

 

A video detailing the overall process can be found here: 

Codes

Downloads

Robotic Arm Waste Segregator Download

Institute / Organization

Ashesi University