Bookmark
Cleaning service with Agents
Introduction
This file can be run on any platform supporting Python, with the necessary install permissions. This example shows how to set up a cleaning service using Agents and the uAgents library tools.
Guide
Supporting documentation
The agents
The Protocol
__init__.py
from datetime import datetime, timedelta
from typing import List
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
from uagents import Context, Model, Protocol
from .models import Provider, Availability, User
# Name and version handed to the `Protocol` constructor that builds `cleaning_proto`.
PROTOCOL_NAME = "cleaning"
PROTOCOL_VERSION = "0.1.0"
# Message asking a provider whether it can take a cleaning job.
# Documented with `#` comments only, so the model definition itself is untouched.
class ServiceRequest(Model):
    user: str  # requester's name; the provider stores it as the `User` record name
    location: str  # address of the job; geocoded by the provider
    time_start: datetime  # requested start of the job
    duration: timedelta  # requested length of the job
    services: List[int]  # requested service types, matched against int(service.type)
    max_price: float  # compared against the provider's minimum price for the job
# Provider's reply to a ServiceRequest.
class ServiceResponse(Model):
    accept: bool  # True when the provider is able to take the job
    price: float  # proposed price for the job; 0 when the request is declined
# Message asking a provider to actually book a job at an offered price.
class ServiceBooking(Model):
    location: str  # address of the job
    time_start: datetime  # start of the job
    duration: timedelta  # length of the job
    services: List[int]  # service types to book, matched against int(service.type)
    price: float  # price offered for the whole job
# Provider's reply to a ServiceBooking.
class BookingResponse(Model):
    success: bool  # True when the booking was accepted
# Protocol object on which the cleaning message handlers below are registered.
cleaning_proto = Protocol(name=PROTOCOL_NAME, version=PROTOCOL_VERSION)
def in_service_region(
    location: str, availability: Availability, provider: Provider
) -> bool:
    """Tell whether ``location`` lies within the provider's service radius.

    Both ``location`` and ``provider.location`` are geocoded with Nominatim.
    The geodesic distance between the two points, expressed in miles, is then
    compared against ``availability.max_distance``.

    Args:
        location: Address of the requested job.
        availability: Record supplying ``max_distance``.
        provider: Record supplying the provider's ``location``.

    Returns:
        True when the distance does not exceed ``availability.max_distance``.

    Raises:
        RuntimeError: If either address cannot be geocoded.
    """
    geocoder = Nominatim(user_agent="micro_agents")
    user_place = geocoder.geocode(location)
    provider_place = geocoder.geocode(provider.location)

    # geocode() yields None for an address it cannot resolve.
    if user_place is None:
        raise RuntimeError(f"user location {location} not found")
    if provider_place is None:
        raise RuntimeError(f"provider location {provider.location} not found")

    user_point = (user_place.latitude, user_place.longitude)
    provider_point = (provider_place.latitude, provider_place.longitude)
    miles_apart = geodesic(user_point, provider_point).miles
    return miles_apart <= availability.max_distance
@cleaning_proto.on_message(model=ServiceRequest, replies=ServiceResponse)
async def handle_query_request(ctx: Context, sender: str, msg: ServiceRequest):
    """Answer a service request with an accept/decline decision and a price.

    The request is accepted only when every requested service is offered, the
    location is inside the service region, the requested time window fits in
    the provider's availability, and the minimum price for the job (before
    markup) is below the requester's ``max_price``. On acceptance the proposed
    price is the minimum price multiplied by the provider's markup; otherwise
    the price sent back is 0.

    Args:
        ctx: Agent context; ``ctx.name`` selects the provider record.
        sender: Address of the requesting agent; the reply is sent there.
        msg: The incoming service request.
    """
    provider = await Provider.filter(name=ctx.name).first()
    availability = await Availability.get(provider=provider)
    offered_types = {int(service.type) for service in await provider.services}
    price_factor = provider.markup
    requester, _created = await User.get_or_create(name=msg.user, address=sender)
    job_hours: float = msg.duration.total_seconds() / 3600
    ctx.logger.info(f"Received service request from user `{requester.name}`")

    # Evaluated left to right with short-circuiting, so the geocoding lookup
    # only runs when the requested services are all offered.
    can_serve = (
        set(msg.services) <= offered_types
        and in_service_region(msg.location, availability, provider)
        and availability.time_start <= msg.time_start
        and msg.time_start + msg.duration <= availability.time_end
        and availability.min_hourly_price * job_hours < msg.max_price
    )

    if can_serve:
        price = price_factor * availability.min_hourly_price * job_hours
        ctx.logger.info(f"I am available! Proposing price: {price}.")
    else:
        price = 0
        ctx.logger.info("I am not available. Declining request.")

    await ctx.send(sender, ServiceResponse(accept=can_serve, price=price))
@cleaning_proto.on_message(model=ServiceBooking, replies=BookingResponse)
async def handle_book_request(ctx: Context, sender: str, msg: ServiceBooking):
    """Accept or reject a booking and reply with a ``BookingResponse``.

    A booking succeeds when every requested service is offered, the requested
    time window fits in the provider's availability, and the price check
    passes. On success the start of the provider's availability is moved to
    the end of the booked job and saved.

    A sender with no ``User`` record (one that never sent a service request)
    is declined explicitly, so it still receives a reply.

    Args:
        ctx: Agent context; ``ctx.name`` selects the provider record.
        sender: Address of the booking agent; the reply is sent there.
        msg: The incoming booking.
    """
    provider = await Provider.filter(name=ctx.name).first()
    availability = await Availability.get(provider=provider)
    services = [int(service.type) for service in await provider.services]

    # `User.get` raises when no row matches, which would abort the handler and
    # leave the sender without any response. Look up leniently and decline.
    user = await User.get_or_none(address=sender)
    if user is None:
        ctx.logger.info(
            f"Received booking request from unknown sender `{sender}`. Declining."
        )
        await ctx.send(sender, BookingResponse(success=False))
        return

    msg_duration_hours: float = msg.duration.total_seconds() / 3600
    ctx.logger.info(f"Received booking request from user `{user.name}`")

    # NOTE(review): the booking passes when the offered price is at or BELOW
    # the minimum price for the job - confirm this direction is intended.
    success = (
        set(msg.services) <= set(services)
        and availability.time_start <= msg.time_start
        and availability.time_end >= msg.time_start + msg.duration
        and msg.price <= availability.min_hourly_price * msg_duration_hours
    )

    if success:
        # The provider is busy until the booked job ends.
        availability.time_start = msg.time_start + msg.duration
        await availability.save()
        ctx.logger.info("Accepted task and updated availability.")

    # send the response
    await ctx.send(sender, BookingResponse(success=success))
The Cleaner Agent
cleaner.py
from datetime import datetime
from pytz import utc
from tortoise import Tortoise
from protocols.cleaning import cleaning_proto
from protocols.cleaning.models import Availability, Provider, Service, ServiceType
from uagents import Agent, Context
from uagents.setup import fund_agent_if_low
# The cleaner agent: named "cleaner", on port 8001, with a local /submit endpoint.
cleaner = Agent(
    name="cleaner",
    port=8001,
    seed="cleaner secret phrase",
    endpoint={
        "http://127.0.0.1:8001/submit": {},
    },
)

# Hands the agent's wallet address to fund_agent_if_low
# (presumably tops up the wallet when its balance is low - confirm in uagents.setup).
fund_agent_if_low(cleaner.wallet.address())

# build the cleaning service agent from the cleaning protocol
cleaner.include(cleaning_proto)
@cleaner.on_event("startup")
async def startup(_ctx: Context):
    """Initialise the database and seed the cleaner's provider data.

    Opens the SQLite database through Tortoise, generates the schemas, then
    creates the provider record, its three services (floor, window, laundry)
    and one availability record.

    Args:
        _ctx: Agent context (unused).
    """
    await Tortoise.init(
        db_url="sqlite://db.sqlite3",
        modules={"models": ["protocols.cleaning.models"]},
    )
    await Tortoise.generate_schemas()

    provider = await Provider.create(name=cleaner.name, location="London Kings Cross")

    # Create every service first, then attach them to the provider one by one.
    service_types = (ServiceType.FLOOR, ServiceType.WINDOW, ServiceType.LAUNDRY)
    offered = [await Service.create(type=kind) for kind in service_types]
    for service in offered:
        await provider.services.add(service)

    available_from = utc.localize(datetime.fromisoformat("2022-01-31 00:00:00"))
    available_until = utc.localize(datetime.fromisoformat("2023-05-01 00:00:00"))
    await Availability.create(
        provider=provider,
        time_start=available_from,
        time_end=available_until,
        max_distance=10,
        min_hourly_price=5,
    )
@cleaner.on_event("shutdown")
async def shutdown(_ctx: Context):
    """Close the Tortoise ORM connections when the agent shuts down."""
    await Tortoise.close_connections()
# Run the cleaner agent when this file is executed as a script.
if __name__ == "__main__":
    cleaner.run()
The User Agent
user.py
from datetime import datetime, timedelta
from pytz import utc
from protocols.cleaning import (
ServiceBooking,
BookingResponse,
ServiceRequest,
ServiceResponse,
)
from protocols.cleaning.models import ServiceType
from uagents import Agent, Context
from uagents.setup import fund_agent_if_low
# Address of the agent that the service request is sent to.
CLEANER_ADDRESS = "agent1qdfdx6952trs028fxyug7elgcktam9f896ays6u9art4uaf75hwy2j9m87w"

# The user agent: named "user", on port 8000, with a local /submit endpoint.
user = Agent(
    name="user",
    port=8000,
    seed="cleaning user recovery phrase",
    endpoint={
        "http://127.0.0.1:8000/submit": {},
    },
)

# Hands the agent's wallet address to fund_agent_if_low
# (presumably tops up the wallet when its balance is low - confirm in uagents.setup).
fund_agent_if_low(user.wallet.address())

# The request this agent sends: window and laundry cleaning for four hours.
request = ServiceRequest(
    user=user.name,
    location="London Kings Cross",
    time_start=utc.localize(datetime.fromisoformat("2023-04-10 16:00:00")),
    duration=timedelta(hours=4),
    services=[ServiceType.WINDOW, ServiceType.LAUNDRY],
    max_price=60,
)

# Factor applied to the cleaner's proposed price when making the booking offer.
MARKDOWN = 0.8
@user.on_interval(period=3.0, messages=ServiceRequest)
async def interval(ctx: Context):
    """Send the service request every interval until the exchange completes.

    The markdown factor is written to agent storage on every tick. The request
    itself is only sent while the ``completed`` flag in storage is not set.

    Args:
        ctx: Agent context providing storage, logging and message sending.
    """
    ctx.storage.set("markdown", MARKDOWN)

    # Nothing left to do once a handler has marked the exchange as completed.
    if ctx.storage.get("completed"):
        return

    ctx.logger.info(f"Requesting cleaning service: {request}")
    await ctx.send(CLEANER_ADDRESS, request)
@user.on_message(ServiceResponse, replies=ServiceBooking)
async def handle_query_response(ctx: Context, sender: str, msg: ServiceResponse):
    """React to the cleaner's answer to the service request.

    When the cleaner accepts, a booking for the same job is sent back at the
    proposed price multiplied by the stored markdown factor. When it declines,
    the exchange is marked as completed.

    Args:
        ctx: Agent context providing storage, logging and message sending.
        sender: Address of the responding agent; the booking is sent there.
        msg: The cleaner's response.
    """
    markdown = ctx.storage.get("markdown")

    if not msg.accept:
        ctx.logger.info("Cleaner is not available - nothing more to do")
        ctx.storage.set("completed", True)
        return

    ctx.logger.info("Cleaner is available, attempting to book now")
    offered_price = markdown * msg.price
    await ctx.send(
        sender,
        ServiceBooking(
            location=request.location,
            time_start=request.time_start,
            duration=request.duration,
            services=request.services,
            price=offered_price,
        ),
    )
@user.on_message(BookingResponse, replies=set())
async def handle_book_response(ctx: Context, _sender: str, msg: BookingResponse):
    """Log the booking outcome and mark the exchange as completed.

    Args:
        ctx: Agent context providing storage and logging.
        _sender: Address of the responding agent (unused).
        msg: The cleaner's booking response.
    """
    outcome = "Booking was successful" if msg.success else "Booking was UNSUCCESSFUL"
    ctx.logger.info(outcome)

    # Either way the exchange is over, so the interval task stops sending requests.
    ctx.storage.set("completed", True)
# Run the user agent when this file is executed as a script.
if __name__ == "__main__":
    user.run()