Introduction
AlertMe is a mobile app that warns users of nearby construction sites, road works, or blocked paths. Users can post and review hazards, while construction workers can create alerts and confirm reports, enhancing safety. In this post, I walk through the system design behind its backend in detail.
Thought Process
Designing a system is always challenging. There are a lot of considerations when building an efficient system that handles real-time, complex data such as geo coordinates and performs calculations on the fly to alert users of nearby hazards.
Hence, I chose Django, Django REST Framework, GeoDjango, Django Channels and Django Storages, along with Docker and Nginx, as the core backend technologies.
Data Model
A data model is a conceptual framework that defines the structure, organization and relationships of data within a system. It serves as a blueprint for how data is stored, retrieved, and manipulated.
In Django, a data model is a Python class that inherits from django.db.models.Model and represents a table in a database. Django models are an abstraction that lets us define data structures in Python code, which Django translates into SQL queries for database operations.
Below is the data model that powers AlertMe.
"""
Database models
"""
from django.db import models
from django.contrib.gis.db import models as gis_models
from django.contrib.gis.geos import Point
import uuid
from django.contrib.auth.models import (
    AbstractBaseUser,
    BaseUserManager,
    PermissionsMixin,
)
from django.db.models.signals import post_save
from django.dispatch import receiver
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

# Options for notification types
NOTIFICATION_TYPE_CHOICES = [
    ('BROADCAST', 'Broadcast'),
    ('SINGLE', 'Single')
]

# Choices for the "status" field in the Incident model
INCIDENT_STATUS_CHOICES = [
    ("ACTIVE", "Active"),
    ("PENDING", "Pending"),
    ("FIXING", "Fixing"),
    ("RESOLVED", "Resolved"),
    ("REJECTED", "Rejected"),
]

# Choices for the "reported_by" field in the Incident model
REPORTED_BY_CHOICES = [
    ("USER", "User"),
    ("ORG", "Org"),
]
# Model representing an organization with fields
class Organization(models.Model):
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100)
    email = models.EmailField(max_length=255, unique=True)
    address = models.JSONField(default=dict)
    coordinates = gis_models.PointField(blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)

    def __str__(self):
        return f'id: {self._id} | name: {self.name}'

    class Meta:
        ordering = ["-name"]
        db_table = "organizations"


# Model representing a project associated with an organization
class Project(models.Model):
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100)
    address = models.JSONField(default=dict)
    description = models.CharField(max_length=500)
    coordinates = gis_models.PointField(blank=True, null=True)
    organization = models.ForeignKey(
        Organization, on_delete=models.PROTECT, null=False
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)

    def __str__(self):
        return f'id: {self._id} | name: {self.name}'

    class Meta:
        ordering = ["-name"]
        db_table = "projects"
# Custom manager for the User model
class UserManager(BaseUserManager):
    """Manager for users"""

    def create_user(self, email, password=None, **extra_fields):
        """Create, save and return a new user"""
        user = self.model(email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_superuser(self, email, password=None, **extra_fields):
        extra_fields.setdefault("is_staff", True)
        extra_fields.setdefault("is_superuser", True)
        if extra_fields.get("is_staff") is not True:
            raise ValueError("Superuser must have is_staff=True.")
        if extra_fields.get("is_superuser") is not True:
            raise ValueError("Superuser must have is_superuser=True.")
        return self.create_user(email, password, **extra_fields)


# Custom user model with fields for user profile information
class User(AbstractBaseUser, PermissionsMixin):
    """User in the system"""
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    picture = models.FileField(upload_to="user_images/", blank=True)
    name = models.CharField(max_length=25)
    email = models.EmailField(max_length=255, unique=True)
    phone = models.CharField(max_length=15)
    address = models.JSONField(default=dict)
    coordinates = gis_models.PointField(
        default=Point(0.0, 0.0), blank=True, null=True)
    roaming_coordinates = gis_models.PointField(
        default=Point(0.0, 0.0), blank=True, null=True)
    alert_radius = models.FloatField(default=3)
    notification = models.JSONField(default=dict, blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    project = models.ForeignKey(
        Project,
        on_delete=models.SET_NULL,
        null=True,
        blank=True
    )
    points = models.IntegerField(default=0)
    password = models.CharField(max_length=128, blank=True, null=True)
    is_active = models.BooleanField(default=True)
    is_staff = models.BooleanField(default=False)
    is_superuser = models.BooleanField(default=False)

    def get_project_id(self):
        return self.project._id if self.project else ''

    def __str__(self):
        # Points store (x, y) as (lng, lat); print as "lat, lng"
        coords = self.roaming_coordinates
        return (
            f'id: {self._id} | name: {self.name} '
            f'| email: {self.email} '
            f'{"| ORG " if self.is_staff else ""}'
            f'| coords: {coords.y}, {coords.x}'
        )

    class Meta:
        ordering = ["-name"]
        db_table = "users"

    objects = UserManager()
    USERNAME_FIELD = "email"
# Model representing incident categories
class IncidentCategory(models.Model):
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=25)
    icon = models.FileField(upload_to="incident_category_icons/")
    description = models.CharField(max_length=200)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        return f'id: {self._id} | name: {self.name}'

    class Meta:
        ordering = ["created_at"]
        db_table = "incident_categories"
        verbose_name = "Incident Category"
        verbose_name_plural = "Incident Categories"


# Model for storing incident images
class IncidentImage(models.Model):
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    image = models.FileField(upload_to="incident_images/")

    class Meta:
        db_table = "incident_images"
# Model representing an incident reported by users
class Incident(models.Model):
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(User, on_delete=models.PROTECT)
    project = models.ForeignKey(
        Project, on_delete=models.PROTECT, blank=True, null=True
    )
    images = models.ManyToManyField(
        IncidentImage, related_name="incidents", blank=True
    )
    incident_category = models.ForeignKey(
        IncidentCategory, on_delete=models.PROTECT)
    subject = models.CharField(max_length=30)
    description = models.CharField(max_length=200)
    coordinates = gis_models.PointField(blank=True, null=True)
    address = models.JSONField(default=dict)
    upvote_count = models.IntegerField(default=0)
    report_count = models.IntegerField(default=0)
    status = models.CharField(
        max_length=10,
        choices=INCIDENT_STATUS_CHOICES,
        default=INCIDENT_STATUS_CHOICES[0][0],
    )
    is_accepted_by_org = models.BooleanField(default=False)
    is_internal_for_org = models.BooleanField(default=False)
    is_active = models.BooleanField(default=True)
    reported_by = models.CharField(
        max_length=4,
        choices=REPORTED_BY_CHOICES,
        default=REPORTED_BY_CHOICES[0][0]
    )
    voters = models.ManyToManyField(
        User,
        related_name="voted_incidents",
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        coords = self.coordinates
        return (
            f'id: {self._id} | subject: {self.subject} '
            f'| created by: {self.user.name} '
            f'({"ORG" if self.user.is_staff else "USER"}) '
            f'| coords: {coords.y}, {coords.x}'
        )

    class Meta:
        ordering = ["-created_at"]
        db_table = "incidents"
# Model representing a notification list for users
class NotificationList(models.Model):
    _id = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False)
    type = models.CharField(
        max_length=10,
        choices=NOTIFICATION_TYPE_CHOICES
    )
    user = models.ForeignKey(User, on_delete=models.PROTECT)
    incident = models.ForeignKey(Incident, on_delete=models.PROTECT)
    coordinates = gis_models.PointField(blank=True, null=True)
    title = models.CharField(max_length=70)
    subject = models.CharField(max_length=30)
    description = models.CharField(max_length=200)
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        coords = self.coordinates
        return (
            f'id: {self._id} | subject: {self.subject} '
            f'| coords: {coords.y}, {coords.x}'
        )

    class Meta:
        ordering = ["-created_at"]
        db_table = "notifications_list"
        verbose_name = "Notification List"
        verbose_name_plural = "Notifications List"
# Notification trigger: push every newly created notification to the
# matching channel group
@receiver(post_save, sender=NotificationList)
def trigger_notification(sender, instance, created, **kwargs):
    if created:
        channel_layer = get_channel_layer()
        if instance.type == 'BROADCAST':
            group_name = 'broadcast_notification'
        else:
            group_name = f"user_{instance.user._id}"
        async_to_sync(channel_layer.group_send)(
            group_name,
            {
                'type': 'send_notification',
                'notification': {
                    'id': str(instance._id),
                    'type': instance.type,
                    'incident_id': str(instance.incident._id),
                    'user_id': str(instance.user._id),
                    'title': instance.title,
                    'subject': instance.incident.subject,
                    'description': instance.incident.description,
                    # Points store (x, y) as (lng, lat)
                    'lat': instance.coordinates.y,
                    'lng': instance.coordinates.x,
                    'created_at': str(instance.created_at)
                }
            }
        )
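To make the group routing in the signal handler more concrete, here is a minimal sketch of how group-based fan-out behaves. `ToyChannelLayer`, `group_for` and the channel names are hypothetical stand-ins written for this illustration; this is not how Django Channels implements its channel layers, only a toy model of the `group_add` / `group_send` / `group_discard` idea.

```python
import asyncio
from collections import defaultdict


class ToyChannelLayer:
    """In-memory stand-in for a channel layer (illustration only)."""

    def __init__(self):
        # group name -> {channel name: queue of pending events}
        self.groups = defaultdict(dict)

    async def group_add(self, group, channel):
        self.groups[group][channel] = asyncio.Queue()

    async def group_discard(self, group, channel):
        self.groups[group].pop(channel, None)

    async def group_send(self, group, event):
        # Every channel registered in the group receives its own copy.
        for queue in self.groups[group].values():
            await queue.put(dict(event))


def group_for(notification_type, user_id):
    """Mirror the routing rule used by the signal handler."""
    if notification_type == 'BROADCAST':
        return 'broadcast_notification'
    return f'user_{user_id}'


async def demo():
    layer = ToyChannelLayer()
    # Two connected clients; only chan-a belongs to user 42.
    await layer.group_add('broadcast_notification', 'chan-a')
    await layer.group_add('broadcast_notification', 'chan-b')
    await layer.group_add('user_42', 'chan-a')

    event = {'type': 'send_notification'}
    await layer.group_send(group_for('BROADCAST', 42), event)
    await layer.group_send(group_for('SINGLE', 42), event)

    # Count the events waiting for each channel in each group.
    return {
        group: {chan: queue.qsize() for chan, queue in chans.items()}
        for group, chans in layer.groups.items()
    }


result = asyncio.run(demo())
```

A broadcast reaches every registered channel, while a single notification only reaches the channels registered under that user's group.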
GeoDjango
GeoDjango is a powerful extension of Django designed to simplify the development of geographic applications. It ships with Django as the django.contrib.gis module and provides comprehensive support for geospatial data, enabling developers to perform spatial queries and analytics with ease. It builds on libraries such as GEOS and GDAL, together with a spatial database such as PostGIS, to offer tools for storing, querying, and manipulating spatial data in various forms, such as points, lines, polygons, and multi-polygons. This makes it a good fit for applications that require geographical computations, like mapping services, location-based applications, or geographic information systems (GIS). With GeoDjango, features such as proximity searches and distance-ordered results can be added to a Django project with little extra code.
Below is the view that returns nearby hazard reports. It shows how GeoDjango lets us filter and sort by distance from a point.
from http import HTTPStatus

from django.contrib.auth import get_user_model
from django.contrib.gis.db.models.functions import Distance
from django.contrib.gis.geos import Point
from django.contrib.gis.measure import D
from django.http import JsonResponse
from rest_framework.views import APIView

# Incident, Messages and format_incident_data are project-local
# imports (omitted here)


class IncidentNearbyView(APIView):
    def get(self, request):
        try:
            user_lat = float(request.GET.get("lat"))
            user_lng = float(request.GET.get("lng"))
            # Points take (x, y), i.e. (lng, lat)
            user_point = Point(user_lng, user_lat, srid=4326)
            user_info = request.user_info
            # Get user object
            user = get_user_model().objects\
                .get(_id=user_info.get('_id'))
            # Find the nearby incident within alert_radius provided by
            # user, fetch only ACTIVE, PENDING, FIXING status objects
            nearby_incidents_qs = Incident.objects.filter(
                coordinates__distance_lte=(
                    user_point, D(km=user.alert_radius)),
                is_active=True,
                status__in=['ACTIVE', 'PENDING', 'FIXING']
            ).annotate(
                distance=Distance('coordinates', user_point)
            ).order_by('distance')
            # Format to JSON and create array of nearby incidents
            nearby_incidents = []
            for incident in nearby_incidents_qs:
                nearby_incidents.append(format_incident_data(incident))
            return JsonResponse({
                "message": Messages.SUCCESS,
                "data": nearby_incidents,
                "error": False,
                "status": HTTPStatus.OK,
            }, status=HTTPStatus.OK)
        # General exception
        except Exception as e:
            print(e)
            return JsonResponse({
                "message": Messages.ERROR,
                "data": None,
                "error": True,
                "status": HTTPStatus.INTERNAL_SERVER_ERROR,
            }, status=HTTPStatus.OK)
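One detail that is easy to get wrong in the view above is axis order: a point is built as (x, y), which means (lng, lat). The following is a minimal sketch of a helper that validates the two query parameters and returns them in that order. `parse_lat_lng` is a hypothetical name introduced here for illustration and is not part of the AlertMe code.

```python
def parse_lat_lng(params):
    """Return (lng, lat) as floats in point (x, y) order.

    Raises ValueError when a value is missing, non-numeric or out of range.
    """
    try:
        lat = float(params["lat"])
        lng = float(params["lng"])
    except (KeyError, TypeError, ValueError) as exc:
        raise ValueError("lat and lng must be numeric query parameters") from exc
    # NaN and infinity fail these range checks as well.
    if not -90.0 <= lat <= 90.0:
        raise ValueError("lat must be between -90 and 90")
    if not -180.0 <= lng <= 180.0:
        raise ValueError("lng must be between -180 and 180")
    return lng, lat
```

Under these assumptions the view could build its point with `Point(*parse_lat_lng(request.GET), srid=4326)` and report a bad request separately from unexpected server errors.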
Authentication
Securing API endpoints is crucial to prevent unauthorized access and protect sensitive data. One popular method for API authentication is using JSON Web Tokens (JWT). JWT is a compact, URL-safe token that can securely transmit information between two parties.
What is JWT?
JWT (JSON Web Token) is an open standard (RFC 7519) for securely transmitting information between parties. JWTs can be signed using a secret (with HMAC) or a public/private key pair using RSA or ECDSA.
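To show what "signed using a secret (with HMAC)" means in practice, here is a minimal sketch of HS256 signing and verification using only the standard library. It is an illustration of the token structure, not a replacement for PyJWT: the function names are made up for this example, and it deliberately skips claim validation such as expiry checks.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Build header.payload.signature for the given payload."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Return the payload if the signature matches, else raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("token must have three dot-separated parts") from None
    header = json.loads(b64url_decode(header_b64))
    if header.get("alg") != "HS256":
        raise ValueError("unexpected signing algorithm")
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(payload_b64))


token = sign_hs256({"_id": "user-1"}, b"demo-secret")
payload = verify_hs256(token, b"demo-secret")
```

The payload is only encoded, not encrypted, so anyone holding the token can read it; the signature only proves that it was issued by someone who knows the secret.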
Below is the custom middleware that verifies JWTs for AlertMe. We used PyJWT to generate and decode the tokens, and the helper functions add a layer of abstraction on top of it. We will discuss these helpers and JWTs in more detail another time.
from django.http import JsonResponse
from .utils import decode_jwt_token
from http import HTTPStatus
from django.urls import resolve
from .messages import Messages
from django.contrib.auth import (
    get_user_model
)


class JWTMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        try:
            # Get auth token from headers
            auth_header = request.headers.get('Authorization', '')
            # URL patterns that should skip authentication
            excluded_paths = ['/v1/users/login', '/v1/users/signup']
            # URL patterns for admin dashboard
            path_info = request.path_info
            resolved_path = resolve(path_info)
            # Check if current path is in excluded_paths or admin paths
            if (
                request.path_info in excluded_paths or
                (resolved_path and resolved_path.namespace == 'admin')
            ):
                return self.get_response(request)
            if auth_header and auth_header.startswith('Bearer '):
                token = auth_header.split(' ')[1]
                response = decode_jwt_token(token)
                # Check error
                if response.get('error'):
                    return JsonResponse(response, status=HTTPStatus.OK)
                # Get decoded data from token
                data = response.get('data')
                # Get user object only if is_active is True
                user = get_user_model().objects\
                    .filter(_id=data.get('_id'), is_active=True).first()
                # If user account was deactivated / soft delete
                if not user:
                    return JsonResponse({
                        'message': Messages.ERROR_ACCOUNT_INACTIVE,
                        'data': None,
                        'error': True,
                        'status': HTTPStatus.UNAUTHORIZED
                    }, status=HTTPStatus.OK)
                # Attach user info
                request.user_info = {
                    "_id": user._id,
                    "email": user.email,
                    "project_id": user.get_project_id(),
                    "is_staff": user.is_staff,
                }
                return self.get_response(request)
            else:
                return JsonResponse({
                    'message': Messages.ERROR_UNAUTHORIZED,
                    'data': None,
                    'error': True,
                    'status': HTTPStatus.UNAUTHORIZED
                }, status=HTTPStatus.OK)
        # General exception
        except Exception:
            return JsonResponse({
                'message': Messages.ERROR_UNAUTHORIZED,
                'data': None,
                'error': True,
                'status': HTTPStatus.UNAUTHORIZED
            }, status=HTTPStatus.OK)
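The middleware reads the token by splitting the Authorization header on a space. As a minimal sketch of a slightly stricter way to do that step, the helper below returns the token only when the header has exactly the shape `Bearer <token>`. `extract_bearer_token` is a hypothetical name used for illustration, and matching the scheme case-insensitively is a choice made in this sketch rather than something the AlertMe middleware does.

```python
def extract_bearer_token(auth_header):
    """Return the token from a 'Bearer <token>' header value, else None."""
    scheme, _, token = (auth_header or "").partition(" ")
    token = token.strip()
    # Reject other schemes, empty tokens, and values with extra parts.
    if scheme.lower() != "bearer" or not token or " " in token:
        return None
    return token
```

A caller can then treat `None` as "unauthorized" without having to guard against index errors from a malformed header.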
WebSocket
Django Channels extends Django to handle real-time functionality like WebSockets, long polling, and other asynchronous protocols. This lets you build responsive, real-time features such as live notifications. At its core, Django Channels is built on top of ASGI (Asynchronous Server Gateway Interface), an evolution of the traditional WSGI that allows requests to be handled asynchronously. This means Django Channels can process multiple requests concurrently, which helps most with I/O-bound tasks. By integrating Django Channels into your project, you keep the robust and secure Django stack while meeting the modern web's demand for instantaneous, interactive user experiences.
Below is the consumer implementation for AlertMe's live notifications.
import json
import math
import threading
from urllib.parse import parse_qs

from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.gis.geos import Point


def haversine_km(lat1, lng1, lat2, lng2):
    """Great-circle distance in kilometres between two lat/lng pairs."""
    lat1, lng1, lat2, lng2 = map(math.radians, (lat1, lng1, lat2, lng2))
    a = (
        math.sin((lat2 - lat1) / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin((lng2 - lng1) / 2) ** 2
    )
    # 6371 km is the mean Earth radius
    return 2 * 6371.0 * math.asin(math.sqrt(a))


class NotificationConsumer(AsyncWebsocketConsumer):
    def get_user(self):
        from django.contrib.auth import (
            get_user_model
        )
        from core.utils import decode_jwt_token
        self.user = ''
        query_string = self.scope['query_string'].decode()
        params = parse_qs(query_string)
        token = params.get('token', [None])[0]
        # Check if token is present
        if token:
            response = decode_jwt_token(token)
            # Check error
            if not response.get('error'):
                # Get decoded token info
                data = response.get('data')
                # Attach user info to the scope
                self.scope['user_info'] = data
                user_id = data.get('_id')
                # Store the user object on the consumer
                self.user = get_user_model().objects.filter(
                    _id=user_id, is_active=True).first()

    def update_user_coordinates(self):
        self.user.roaming_coordinates = self.coordinates
        self.user.save()

    async def connect(self):
        thread = threading.Thread(target=self.get_user)
        thread.start()
        thread.join()
        if not self.user:
            await self.close()
            return
        self.group_broadcast_name = 'broadcast_notification'
        self.group_single_name = f'user_{self.user._id}'
        # Register channel for single notifications
        await self.channel_layer.group_add(
            self.group_single_name,
            self.channel_name
        )
        # Register channel for broadcast notifications
        await self.channel_layer.group_add(
            self.group_broadcast_name,
            self.channel_name
        )
        # Accept request
        await self.accept()

    async def disconnect(self, close_code):
        if self.user:
            await self.channel_layer.group_discard(
                self.group_single_name,
                self.channel_name
            )
            await self.channel_layer.group_discard(
                self.group_broadcast_name,
                self.channel_name
            )

    async def receive(self, text_data):
        coordinates = json.loads(text_data)
        self.coordinates = Point(
            coordinates.get('lng'),
            coordinates.get('lat'),
            srid=4326
        )
        thread = threading.Thread(target=self.update_user_coordinates)
        thread.start()
        thread.join()

    async def send_notification(self, event):
        notification = event['notification']
        check_single = notification.get('type') == 'SINGLE'
        check_broadcast = notification.get('type') == 'BROADCAST'
        check_current_user = notification.get('user_id') == str(self.user._id)
        check_not_current_user = notification.get('user_id') != str(self.user._id)
        # Single notification
        if check_single and check_current_user:
            await self.send(text_data=json.dumps({
                'notification': notification
            }))
        # Broadcast notification
        elif check_broadcast and check_not_current_user:
            # Check if the incident lies within the user's alert radius.
            # GEOS distance() on SRID 4326 points returns degrees, not
            # metres, so compute the great-circle distance instead.
            user_point = self.user.roaming_coordinates
            distance_in_km = haversine_km(
                user_point.y, user_point.x,
                notification.get('lat'), notification.get('lng'),
            )
            if self.user.alert_radius >= distance_in_km:
                await self.send(text_data=json.dumps({
                    'notification': notification
                }))
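The consumer starts a thread and then calls `join()` from inside a coroutine, which keeps the event loop waiting until the thread has finished. As a minimal sketch of a non-blocking alternative, the example below offloads the same kind of blocking work with `asyncio.to_thread` (Python 3.9+). `save_coordinates`, `receive` and the `store` dictionary are hypothetical stand-ins for the ORM call and the consumer method; in a Channels project, `channels.db.database_sync_to_async` is the helper usually used for this purpose.

```python
import asyncio
import time


def save_coordinates(store, user_id, coords):
    """Stand-in for a blocking ORM call such as user.save()."""
    time.sleep(0.05)  # simulate database latency
    store[user_id] = coords


async def receive(store, user_id, coords):
    # The worker thread does the blocking work; the event loop stays free
    # to serve other connections while this coroutine is suspended.
    await asyncio.to_thread(save_coordinates, store, user_id, coords)


async def demo():
    store = {}
    # Two clients report their position at the same time.
    await asyncio.gather(
        receive(store, "user-1", (13.405, 52.52)),
        receive(store, "user-2", (2.3522, 48.8566)),
    )
    return store


result = asyncio.run(demo())
```

Because each call is awaited rather than joined, the two updates overlap instead of running one after the other.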
Nginx
We configured Nginx to work as a reverse proxy.
Using Nginx as a reverse proxy is an excellent choice for handling both HTTP and WebSocket connections. Nginx is a high-performance web server that efficiently manages incoming traffic, distributes requests to the appropriate backend services, and maintains the persistent connections required for WebSocket communication. When setting up Nginx as a reverse proxy for a Django API, you configure it to listen on a specific port (e.g., 80 for HTTP or 443 for HTTPS) and route requests to the Django application server, such as one running Daphne. For WebSocket connections, Nginx must be configured to pass on the Upgrade and Connection headers needed to switch from HTTP to WebSocket, which keeps a persistent connection open between the client and the server. This dual role lets Nginx manage both standard HTTP API traffic and real-time WebSocket communication, providing a secure, scalable, and reliable front for a Django REST based application.
Below is the Nginx config for AlertMe.
events {}

http {
    # Redirect all plain HTTP traffic to HTTPS
    server {
        listen 80;
        server_name api.alertme.tech www.api.alertme.tech;
        return 301 https://$host$request_uri;
    }

    server {
        listen 443 ssl;
        server_name api.alertme.tech www.api.alertme.tech;
        client_max_body_size 25M;
        ssl_certificate /etc/letsencrypt/live/api.alertme.tech-0002/fullchain.pem;
        ssl_certificate_key /etc/letsencrypt/live/api.alertme.tech-0002/privkey.pem;

        # Regular HTTP API traffic
        location / {
            proxy_pass http://app:8000;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket traffic: forward the upgrade handshake to the app
        location /ws/ {
            proxy_pass http://app:8000;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
    }
}
I want to extend a heartfelt thank you to all of you who took the time to read this blog post.