Cutting through the smog: making an air quality bot with Haskell

Short and long-term exposure to air pollution can result in significant health problems. When air quality is considered unhealthy we should avoid certain activities, which poses a question that bears asking: how can we get notified when air quality is poor? This post is an attempt at solving this.

In this tutorial, I want to show you how a chatbot can help you reduce your pollution intake, and how you can build your own chatbot in Haskell using the line-bot-sdk. This tutorial assumes that you have some familiarity with Haskell.

The idea behind this chatbot is simple: users share their location, and the bot reads publicly available pollution data from local monitoring stations. If the air is unhealthy, we push a message to the users to let them know.

Why use a chatbot?

Chatbots allow users to have a smooth interaction with your service, and it’s relatively easy to integrate a chatbot with messaging apps. Using a chatbot also has the additional benefit that not only users but also groups can interact with your chatbot. For example, in this tutorial, you will learn how to simply drop a bot into your family chatroom and after that you and your relatives will get promptly notified when air quality is poor in the places you care about.

line-bot-sdk

We are going to be using the line-bot-sdk, a Haskell SDK for the LINE messaging platform. You can read an overview of the LINE Messaging API here. This blog post was generated from literate Haskell sources. For those who prefer to read the code, an extraction version can be found here.

The line-botsdk uses the servant framework, a set of packages for declaring web APIs at the type-level:

import Servant

We will be using some GHC extensions throughout this tutorial, e.g., -XOverloadedStrings-XRecordWildCards , -XNamedFieldPuns and some others that we will introduce as we proceed.

Parsing measurement data

Taiwan’s Environmental Protection Administration monitors air pollution in major cities and counties across Taiwan. They have a public API with the latest registered air pollution information as shown below.

curl https://opendata.epa.gov.tw/ws/Data/AQI/?\$format=json | jq .
[
 {
  "SiteName": "基隆",
  "County": "基隆市",
  "AQI": "40",
  "Pollutant": "",
  "Status": "良好",
  "SO2": "3.7",
  "CO": "0.24",
  "CO_8hr": "0.2",
  "O3": "44",
  "O3_8hr": "43",
  "PM10": "25",
  "PM2.5": "10",
  "NO2": "",
  "NOx": "",
  "NO": "",
  "WindSpeed": "1.1",
  "WindDirec": "90",
  "PublishTime": "2019-04-29 16:00",
  "PM2.5_AVG": "12",
  "PM10_AVG": "27",
  "SO2_AVG": "2",
  "Longitude": "121.760056",
  "Latitude": "25.129167"
 }
]

This API returns a JSON array with data measured from all the monitoring stations in Taiwan, typically updated every hour. An AQI number under 100 signifies good or acceptable air quality, while a number over 100 is cause for concern. Among the reported pollutants there are particulate matter, ground level ozone, carbon monoxide, and sulfur dioxide.

We will additionally need the location of the measurement, which we will use to find the closest available data to our users:

import Data.Text (Text)

data AQData = AQData
  { aqi :: Int
  , county :: Text
  , lat :: Double
  , lng :: Double
  , status :: Text
  , pm25 :: Int
  , pm10 :: Int
  , o3 :: Int
  , co :: Double
  , so2 :: Double
  }
  deriving (Eq, Show)

However, first, we need to do some data preprocessing. Note that all the JSON fields are strings, but our AQData type requires numeric values. And, there are some data points missing relevant details, such as the AQI or the location:

{
 "SiteName": "彰化(大城)",
 "County": "彰化縣",
 "AQI": "",
 "Pollutant": "",
 ...
}

So, we need to remove such data points, since they amount to noise. One possible way to do this, is by wrapping [AQData] in a newtype:

newtype AQDataResult = AQDataResult { result :: [AQData] }

And then provide an instance of the FromJSON class to decode and filter bad values:

instance FromJSON AQDataResult where
  parseJSON = undefined

However, there is another possibility. FromJSON has another method to parse a JSON value into a list of values.

parseJSONList :: Value -> Parser [a]
import Data.Aeson
import Control.Monad (forM)
import qualified Data.Vector as V (toList)
import Data.Maybe (catMaybes)

instance FromJSON AQData where
 parseJSONList = withArray "[AQData]" $ \arr ->
   catMaybes <$> forM (V.toList arr) parseAQData
 parseJSON _ = fail "not an array"

Array items go through parseAQData. Here the MaybeT monad transformer produces a value only if all items are present:

import Data.Aeson.Types
import Control.Monad.Trans.Maybe (runMaybeT, MaybeT(..))
import Text.Read (readMaybe)
import Control.Monad.Trans.Class (lift)

parseAQData :: Value -> Parser (Maybe AQData)
parseAQData = withObject "AQData" $ \o -> runMaybeT $ do
  aqi    <- MaybeT $ readMaybe <$> o .: "AQI"
  county <- lift   $               o .: "County"
  lat    <- MaybeT $ readMaybe <$> o .: "Latitude"
  lng    <- MaybeT $ readMaybe <$> o .: "Longitude"
  status <- lift   $               o .: "Status"
  pm25   <- MaybeT $ readMaybe <$> o .: "PM2.5"
  pm10   <- MaybeT $ readMaybe <$> o .: "PM10"
  o3     <- MaybeT $ readMaybe <$> o .: "O3"
  co     <- MaybeT $ readMaybe <$> o .: "CO"
  so2    <- MaybeT $ readMaybe <$> o .: "SO2"
  return AQData {..}

We then use catMaybes :: [Maybe a] -> [a] function to weed out the Nothings and return a list of AQData. Now that we have a FromJSON instance, we can write a client function to call this API:

import Control.Exception (try)
import Network.HTTP.Simple (httpJSON, HttpException, getResponseBody)

getAQData :: IO [AQData]
getAQData = do
  eresponse <- try $ httpJSON opendata
  case eresponse of
    Left e -> do
      print (e :: HttpException)
      getAQData -- retry
    Right response -> return $ getResponseBody response
  where
    opendata = "https://opendata.epa.gov.tw/ws/Data/AQI?$format=json"

Here we only intercept exceptions of type HTTPException. For simplicity we just retry if the request fails, in practice you should inspect the error and implement retries with exponential backoff.

Distance between two geo points

We want our bot to notify users of unhealthy air in the regions where they live and work, so first we need to know which monitor is the closest to the users. For that, we will use the harvesine formula, which determines the great-circle distance between two points on a sphere given their longitudes and latitudes.

First let’s define a type alias for latitude and longitude pairs (in degrees):

type Coord = (Double, Double)

With distance, we can calculate the distance in kilometers between any two given geo points.

import Data.Bifunctor (bimap)

distRad :: Double -> Coord -> Coord -> Double
distRad radius (lat1, lng1) (lat2, lng2) = 2 * radius * asin (min 1.0 root)
 where
   hlat = hsin (lat2 - lat1)
   hlng = hsin (lng2 - lng1)
   root = sqrt (hlat + cos lat1 * cos lat2 * hlng)
   hsin = (^ 2) . sin . (/ 2) -- harvesine of an angle

distDeg :: Double -> Coord -> Coord -> Double
distDeg radius p1 p2 = distRad radius (deg2rad p1) (deg2rad p2)
 where
   d2r = (/ 180) . (* pi)
   deg2rad = bimap d2r d2r

distance :: Coord -> Coord -> Double
distance = distDeg 6371 -- mean earth radius

Once we have a function to measure the distance between two geo-points, the only remaining task is finding the closest data point to a given location (e.g., a user location):

import Data.List.Extra (minimumOn)

getCoord :: AQData -> Coord
getCoord AQData{..} = (lat, lng)

closestTo :: [AQData] -> Coord -> AQData
closestTo xs coord = (distance coord . getCoord) `minimumOn` xs

minimumOn :: Ord b => (a -> b) -> [a] -> a is defined in the package extra.

App environment

Most of the computations we are going to define require reading values from a shared environment:

import Line.Bot.Webhook.Events (Source)
import Line.Bot.Types (ChannelToken, ChannelSecret)
import Control.Concurrent.STM.TVar (TVar)

data Env = Env
  { token  :: ChannelToken
  , secret :: ChannelSecret
  , users  :: TVar [(Source, Coord)]
  }

This way we can pass around the channel token, required to make calls to the LINE platform,  and the channel secret, to validate webhook events. We also need the list of users, which are represented as (Source, Coord). Source is defined in Line.Bot.Webhook.Events and it contains the ID of the user, group or room where push messages will be sent. The user list will be concurrently read and updated from different threads, so we store it here in a mutable variable, using Control.Concurrent.STM.TVar from the stm package [1]

We are going to use mtl type classes instead of a concrete monad transformer stack for this tutorial [2], with functions being polymorphic in their effect type. One benefit of this approach is that type constraints clearly express (and enforce) which effects can take place, and as a bonus will give us more options for composition.

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ask)

We will also need the -XFlexibleContexts extension to allow types in addition to type variables in type constraints.

Handling webhook events

When an event is triggered, such as when our bot joins a chatroom, an HTTP POST request is sent to our registered webhook URL with the channel. Here we are interested in three types of events (other events are ignored):

  • when our bot is added as a friend (or unblocked)
  • when our bot joins a group or room
  • when our bot receives a location message from a user
{-# LANGUAGE LambdaCase #-}


import Line.Bot.Webhook.Events (Event (EventFollow, EventJoin, EventMessage),
                                Message (MessageLocation))

webhook :: (MonadReader Env m, MonadIO m) => [Event] -> m NoContent
webhook events = do
  forM_ events $ \case
    EventFollow  {..} -> askLoc replyToken
    EventJoin    {..} -> askLoc replyToken
    EventMessage { message = MessageLocation {..}
                 , source = source }    
                      -> addUser source (latitude, longitude)
    _                 -> return ()
  return NoContent

For the first two, we reply with a text message that contains a quick reply button, with a location action: this allows the users to easily share their location for air monitoring.

We are using Line.Bot.Types.ReplyToken, which is included in events that can be replied:

replyMessage :: ReplyToken -> [Message] -> Line NoContent

Line is the monad to send requests to the LINE bot platform.

import Line.Bot.Client (replyMessage, runLine)
import Line.Bot.Types (ActionLocation, MessageText, QuickReply,
                       QuickReplyButton, ReplyToken)

askLoc :: (MonadReader Env m, MonadIO m) => ReplyToken -> m ()
askLoc rt = do
  Env {token} <- ask
  _ <- liftIO $ runLine comp token
  return ()
  where
    welcome = "Where are you?"
    qr = QuickReply [QuickReplyButton Nothing (ActionLocation "location")]
    comp = replyMessage rt [MessageText welcome (Just qr)]

MessageText is a data constructor from Line.Bot.Types.Message. All messages can be sent with an optional QuickReply; Quick replies allow users to select from a predefined set of possible replies, see here for more details on using quick replies.

Finally runLine runs the given request with the channel token from the environment [3]:

runLine :: Line a -> ChannelToken -> IO (Either ClientError a)

Once we receive a location message event, we add the user and their location to the shared list of users:

import Control.Concurrent.STM.TVar (modifyTVar)
import Control.Monad.STM (atomically)
import Line.Bot.Webhook.Events (Source)

addUser :: (MonadReader Env m, MonadIO m) => Source -> Coord -> m ()
addUser source coord = do
  Env {users} <- ask
  liftIO $ atomically $ modifyTVar users ((source, coord) : )
  return ()

We add the source of the event, so if the message was sent from a group, we will notify the group, not the user who shared the location.

Note that for a real-world chatbot you should handle dual events like unfollow or leave.

Serving the webhook: WAI application

To serve our webhook API we need to produce a WAI app.

The line-bot-sdk exports a type synonym defined in Line.Bot.Webhook that encodes the LINE webhook API:

newtype Events :: Events { events :: [Event] }
type Webhook = LineReqBody '[JSON] Events :> Post '[JSON] NoContent

The LineReqBody combinator will validate that incoming requests originate from the LINE platform.

aqServer :: ServerT Webhook (ReaderT Env Handler)
aqServer = webhook . events

Servant handlers run by default in the Handler monad. In order to let our webhook handler to read the environment Env (which is enforced by the type constraint in webhook) we are going to stack the Reader monad.

It is beyond the scope of this tutorial to cover the nuts and bolts of the Servant web framework, which are already well-covered (e.g., in the servant tutorials).

{-# LANGUAGE DataKinds #-}

import Servant.Server (Context ((:.), EmptyContext))

api = Proxy :: Proxy Webhook
ctx = Proxy :: Proxy '[ChannelSecret]

app :: MonadReader Env m => m Application
app = ask >>= \env ->
  let server = hoistServerWithContext api ctx (`runReaderT` env) aqServer
  in return $ serveWithContext api (secret env :. EmptyContext) server

The final step is to turn our aqServer into a WAI Application.

Servant allows to pass values to combinators by using a Context. The LineReqBody combinator requires a Context with the channel secret. This is enforced by the type-level list '[ChannelSecret].

Periodic updates

We previously defined getAQData, which is an IO action that returns the list of (valid) data points. Our goal now is to call this API every hour to get the latest measured data and map it to our users, based on the location:

import Line.Bot.Client (runLine)
import Control.Monad (forM_)
import Control.Concurrent.STM.TVar (readTVar)
import Control.Monad.STM (atomically, retry)

processAQData :: (MonadReader Env m, MonadIO m) => m ()
processAQData = do
  Env {users, token} <- ask
  users' <- liftIO $ atomically $ do
    xs <- readTVar users
    case xs of
      [] -> retry
      _ -> return xs
  liftIO $ getAQData >>= \aqData ->
    let users'' = [(user, aqData `closestTo` coord) | (user, coord) <- users']
    in forM_ users'' $ flip runLine token . notifyChat

processAQData does several things [4]

  • read the list stored in the transactional variable users in the environment: if the list is empty, retry, blocking the thread until users are added
  • call getAQData to get the most recently available air quality data
  • we then run a list comprehension where we map each user, of type (Source, Coord) to (Source, AQData)
  • for each user, call notifyChat
import Line.Bot.Client (Line, pushMessage)
import Line.Bot.Webhook.Events (Source)

notifyChat :: (Source, AQData) -> Line NoContent
notifyChat (Source a, x)
  | unhealthy x = pushMessage a [mkMessage x]
  | otherwise = return NoContent

To alert users, we will push messages to those users whose closest monitoring station reports an AQI over 100:

unhealthy :: AQData -> Bool
unhealthy AQData{..} = aqi > 100

processAQData needs to be called periodically, at least once every hour. We will run it in a separate thread, so that it runs concurrently with our webhook server:

import Control.Concurrent.Lifted (fork, threadDelay)
import Control.Monad (forever)
import Control.Monad.Trans.Control (MonadBaseControl)

loop :: (MonadReader Env m, MonadIO m, MonadBaseControl IO m) => m ()
loop = do
  fork $ forever $ do
    processAQData
    threadDelay (3600 * 10^6)
    return ()

Air quality alerts

To inform users of pollution levels, we will use a Flex Message, which are messages with customizable layouts written in JSON format.

LINE chat
import Line.Bot.Types (Message (MessageFlex))

mkMessage :: AQData -> Message
mkMessage x = MessageFlex "air quality alert!" (flexContent x) Nothing 

A Flex message is constructed from an alternative text (for clients not supporting the feature), a Data.Aeson.Value which contains the message layout and content, and an optional quick reply:

MessageFlex :: Text -> Value -> Maybe QuickReply -> Message

To design the layout of the alert message we used the Flex Message Simulator. We will use the JSON quasiquoter aesonQQ, which converts (at compile time) a string representation of a JSON value into a Value:

{-# LANGUAGE QuasiQuotes #-}

import Data.Aeson.QQ (aesonQQ)
import Data.Aeson.Types

flexContent :: AQData -> Value
flexContent AQData{..} = [aesonQQ|
  {
    "type": "bubble",
    "styles": {
      "footer": {
        "separator": true
      }
    },
    "header": {
      "type": "box",
      "layout": "vertical",
      "contents": [
        {
          "type": "text",
          "text": "AIR QUALITY ALERT",
          "weight": "bold",
          "size": "xl",
          "color": "#ff0000",
          "margin": "md"
        },
        {
          "type": "text",
          "text": "Unhealthy air reported in your area",
          "size": "xs",
          "color": "#aaaaaa",
          "wrap": true
        }
      ]
    },
    "body": {
      "type": "box",
      "layout": "vertical",
      "contents": [
        {
          "type": "box",
          "layout": "vertical",
          "margin": "xxl",
          "spacing": "sm",
          "contents": [
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "County",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{county},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "Status",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{status},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "AQI",
                  "weight": "bold",
                  "size": "sm",
                  "color": "#ff0000",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{show aqi},
                  "weight": "bold",
                  "size": "sm",
                  "color": "#ff0000",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "PM2.5",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{show pm25},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "PM10",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{show pm10},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "O3",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{show o3},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "CO",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{show co},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            },
            {
              "type": "box",
              "layout": "horizontal",
              "contents": [
                {
                  "type": "text",
                  "text": "SO2",
                  "size": "sm",
                  "color": "#555555",
                  "flex": 0
                },
                {
                  "type": "text",
                  "text": #{show so2},
                  "size": "sm",
                  "color": "#111111",
                  "align": "end"
                }
              ]
            }
          ]
        }
      ]
    },
    "footer": {
      "type": "box",
      "layout": "horizontal",
      "contents": [
        {
          "type": "button",
          "action": {
            "type": "uri",
            "label": "More info",
            "uri": "https://www.epa.gov.tw/"
          }
        }
      ]
    }
  }
|]

Putting it all together

We are almost done! The only remaining part is to run our server and main loop:

  • We read from the environment the channel token and secret
  • create an initial Env.
  • thread the inital environment to our app and loop.
  • call Network.Wai.Handler.Warp.run to run the webhook in port 3000
import System.Environment (getEnv)
import Control.Concurrent.STM.TVar (newTVar)
import Control.Monad.Reader (runReader, runReaderT)
import Network.Wai.Handler.Warp (run)
import Data.String (fromString)

main :: IO ()
main = do
  token <- fromString <$> getEnv "CHANNEL_TOKEN"
  secret <- fromString <$> getEnv "CHANNEL_SECRET"
  env <- atomically $ Env token secret <$> newTVar []
  runReaderT loop env
  run 3000 $ runReader app env

Here you can see we are actually instantiating loop and app to concrete monads.

Wanna be friends?

If you live in Taiwan, and would like to get notified of bad air quality, you can follow this bot:

It’s the same bot as we have detailed in the tutorial with the exception that it uses PostGIS instead of a transactional variable to store users.

You can find the repository for the code here: https://github.com/moleike/taiwan-aqi-bot

Conclusion

In this tutorial we have covered the development of a simple but practical chatbot. I hope you enjoyed reading (and perhaps coding along) it, and I hope it inspired you to get started with your own chatbot ideas!


[1] Bear in mind that in this example the list of users does not persist across restarts or crashes; in a production environment you should use a database to store users.

[2] The so-called mtl-style programming

[3] Note that in order to keep this tutorial concise, we are not checking for possible errors, but you should pattern match the result of runLineLine has an instance of MonadError ClientError so you can catch errors there, too.

[4] The naive solution we are building here to associate users to monitoring stations would be impractical for a real application, where it would make more sense to filter out those data points where pollution is of concern, and then for each data point retrieve all users that are within a given distance (e.g., using a geospatial index), and notify them with multicast messages.

Related Post