# %% Imports
import os
import pickle
# for encoding/decoding messages in base64
from base64 import urlsafe_b64decode, urlsafe_b64encode
# for dealing with attachment MIME types
from email import encoders
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from mimetypes import guess_type as guess_mime_type

# %% Configuration
# Request all access (permission to read/send/receive emails, manage the inbox, and more)
SCOPES = ['https://mail.google.com/']
our_email = 'your_gmail@gmail.com'
# users.messages.batchModify accepts at most 1000 ids per request; the same
# chunk size is used for batchDelete so that no request grows unbounded.
MAX_BATCH_IDS = 1000


# %% Authentication
def gmail_authenticate(token_path="token.pickle", credentials_path="credentials.json"):
    """
    Build an authorized Gmail API service object.

    Cached credentials are loaded from `token_path` when that file exists.
    If they are missing or invalid they are refreshed (when expired and a
    refresh token is present) or obtained through the local-server OAuth flow
    using the client secrets in `credentials_path`; the resulting credentials
    are then written back to `token_path`.

    Returns the object produced by `build('gmail', 'v1', credentials=creds)`.
    """
    # Gmail API utils. Imported here so that the helpers below, which only
    # need the standard library, stay importable without the Google packages.
    from googleapiclient.discovery import build
    from google_auth_oauthlib.flow import InstalledAppFlow
    from google.auth.transport.requests import Request

    creds = None
    # the token file stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first time
    if os.path.exists(token_path):
        with open(token_path, "rb") as token:
            # NOTE(security): unpickling can execute arbitrary code. Only point
            # `token_path` at a file this function wrote itself.
            creds = pickle.load(token)
    # if there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
            creds = flow.run_local_server(port=0)
        # save the credentials for the next run
        with open(token_path, "wb") as token:
            pickle.dump(creds, token)
    return build('gmail', 'v1', credentials=creds)


# %% Sending
def add_attachment(message, filename):
    """
    Attach the file at path `filename` to the MIME `message`.

    The MIME type is guessed from the file name; unknown or compressed files
    are attached as 'application/octet-stream'. The attachment is labelled
    with the base name of `filename`.
    """
    content_type, encoding = guess_mime_type(filename)
    if content_type is None or encoding is not None:
        content_type = 'application/octet-stream'
    main_type, sub_type = content_type.split('/', 1)

    # read once, and make sure the handle is closed even if reading fails
    with open(filename, 'rb') as fp:
        content = fp.read()

    msg = None
    if main_type == 'text':
        try:
            msg = MIMEText(content.decode(), _subtype=sub_type)
        except UnicodeDecodeError:
            # not UTF-8 text: fall through and attach the raw bytes instead
            pass
    elif main_type == 'image':
        msg = MIMEImage(content, _subtype=sub_type)
    elif main_type == 'audio':
        msg = MIMEAudio(content, _subtype=sub_type)

    if msg is None:
        msg = MIMEBase(main_type, sub_type)
        msg.set_payload(content)
        # without a transfer encoding the bytes are emitted raw and their line
        # endings are rewritten on serialization, which corrupts binary files
        encoders.encode_base64(msg)

    msg.add_header('Content-Disposition', 'attachment', filename=os.path.basename(filename))
    message.attach(msg)


def build_message(destination, obj, body, attachments=None):
    """
    Build the request body for `users.messages.send`.

    destination: value of the 'to' header
    obj:         value of the 'subject' header
    body:        plain-text body of the email
    attachments: optional iterable of file paths to attach

    Returns a dict `{'raw': <url-safe base64 of the serialized message>}`.
    The 'from' header is taken from the module-level `our_email`.
    """
    if attachments:
        message = MIMEMultipart()
        message.attach(MIMEText(body))
        for filename in attachments:
            add_attachment(message, filename)
    else:
        # no attachments given: a single text part is enough
        message = MIMEText(body)
    message['to'] = destination
    message['from'] = our_email
    message['subject'] = obj
    return {'raw': urlsafe_b64encode(message.as_bytes()).decode()}


def send_message(service, destination, obj, body, attachments=None):
    """
    Send an email through the Gmail API `service` as the authenticated user.

    Arguments are the same as for `build_message`. Returns the result of
    executing the `users.messages.send` request.
    """
    return service.users().messages().send(
        userId="me",
        body=build_message(destination, obj, body, attachments)
    ).execute()


# %% Searching
def search_messages(service, query):
    """
    Return every message reference matching the Gmail search `query`.

    Follows 'nextPageToken' until the listing is exhausted and returns the
    concatenated 'messages' entries of all pages (an empty list when nothing
    matches).
    """
    messages = []
    page_token = None
    while True:
        request_args = {'userId': 'me', 'q': query}
        if page_token:
            request_args['pageToken'] = page_token
        result = service.users().messages().list(**request_args).execute()
        messages.extend(result.get('messages', []))
        page_token = result.get('nextPageToken')
        if not page_token:
            return messages


# %% Utility functions
def get_size_format(b, factor=1024, suffix="B"):
    """
    Scale bytes to its proper byte format
    e.g:
        1253656 => '1.20MB'
        1253656678 => '1.17GB'
    """
    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
        if b < factor:
            return f"{b:.2f}{unit}{suffix}"
        b /= factor
    return f"{b:.2f}Y{suffix}"


def clean(text):
    """Replace every non-alphanumeric character of `text` with '_' (used to build folder names)."""
    return "".join(c if c.isalnum() else "_" for c in text)


def _safe_filename(filename, default):
    """Reduce an untrusted `filename` to a bare file name; return `default` if nothing usable is left."""
    name = os.path.basename((filename or "").replace("\\", "/"))
    return default if name in ("", ".", "..") else name


def _is_attachment(part):
    """Tell whether a message part carries a Content-Disposition header marking it as an attachment."""
    for header in part.get("headers") or []:
        name = (header.get("name") or "").lower()
        value = (header.get("value") or "").lower()
        if name == "content-disposition" and "attachment" in value:
            return True
    return False


def _make_unique_folder(base_name):
    """Create and return `base_name`, or `base_name_<n>` for the first n >= 1 that is not yet a directory."""
    folder_name = base_name
    counter = 0
    while os.path.isdir(folder_name):
        counter += 1
        folder_name = f"{base_name}_{counter}"
    os.mkdir(folder_name)
    return folder_name


# %% Reading
def parse_parts(service, parts, folder_name, message):
    """
    Utility function that parses the content of an email partition.

    For every part in `parts` (recursing into nested parts):
      - text/plain content is printed
      - text/html content is saved under `folder_name` (as index.html when the
        part has no file name)
      - parts marked as attachments are downloaded and saved under `folder_name`

    `message` is the message reference (a dict with an 'id' key) the parts
    belong to; it is needed to fetch attachments stored outside the part.
    """
    for part in parts or []:
        filename = part.get("filename")
        mime_type = part.get("mimeType")
        body = part.get("body") or {}
        data = body.get("data")
        if part.get("parts"):
            # recursively call this function when we see that a part
            # has parts inside
            parse_parts(service, part.get("parts"), folder_name, message)
        if mime_type == "text/plain":
            # if the email part is text plain
            if data:
                print(urlsafe_b64decode(data).decode(errors="replace"))
        elif mime_type == "text/html":
            # if the email part is an HTML content, save the HTML file
            if not data:
                # nothing inline to write for this part
                continue
            filepath = os.path.join(folder_name, _safe_filename(filename, "index.html"))
            print("Saving HTML to", filepath)
            with open(filepath, "wb") as f:
                f.write(urlsafe_b64decode(data))
        elif _is_attachment(part):
            # attachment other than a plain text or HTML
            # the sender controls the file name: never let it escape folder_name
            safe_name = _safe_filename(filename, "attachment")
            print("Saving the file:", safe_name, "size:", get_size_format(body.get("size") or 0))
            attachment_id = body.get("attachmentId")
            if attachment_id:
                # the content lives outside the part: make another request to get it
                attachment = service.users().messages() \
                    .attachments().get(id=attachment_id, userId='me', messageId=message['id']).execute()
                data = attachment.get("data")
            # otherwise `data` already holds the inline content of the part
            if data:
                with open(os.path.join(folder_name, safe_name), "wb") as f:
                    f.write(urlsafe_b64decode(data))


def read_message(service, message):
    """
    This function takes Gmail API `service` and a `message` reference (a dict with
    an 'id' key, as returned by `search_messages`) and does the following:
        - Downloads the content of the email
        - Prints email basic information (To, From, Subject & Date) and plain/text parts
        - Creates a folder for each email based on the subject; emails without a
          usable subject share the folder "email"
        - Downloads text/html content (if available) and saves it under the folder created as index.html
        - Downloads any file that is attached to the email and saves it in the folder created
    """
    msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
    # parts can be the message body, or attachments
    payload = msg['payload']
    headers = payload.get("headers") or []
    # a message without sub-parts carries its content on the payload itself
    parts = payload.get("parts") or [payload]
    folder_name = None
    # this section prints email basic info & creates a folder for the email
    for header in headers:
        name = (header.get("name") or "").lower()
        value = header.get("value")
        if name == "from":
            print("From:", value)
        elif name == "to":
            print("To:", value)
        elif name == "subject":
            cleaned = clean(value or "")
            if folder_name is None and cleaned:
                # make a directory named after the subject, adding a numeric
                # suffix when another email already used the same name
                folder_name = _make_unique_folder(cleaned)
            print("Subject:", value)
        elif name == "date":
            # the date when the message was sent
            print("Date:", value)
    if folder_name is None:
        # no usable subject: fall back to a folder with "email" name,
        # since folders are created based on subjects
        folder_name = "email"
        os.makedirs(folder_name, exist_ok=True)
    parse_parts(service, parts, folder_name, message)
    print("=" * 50)


# %% Labels and deletion
def _batch_request(service, method_name, query, extra_body):
    """
    Run the `users.messages` batch method `method_name` on every message
    matching `query`, at most MAX_BATCH_IDS ids per request.

    Returns the response of the last request, or None when nothing matched
    (in which case no request is sent).
    """
    ids = [msg['id'] for msg in search_messages(service, query)]
    response = None
    for start in range(0, len(ids), MAX_BATCH_IDS):
        body = {'ids': ids[start:start + MAX_BATCH_IDS], **extra_body}
        batch_method = getattr(service.users().messages(), method_name)
        response = batch_method(userId='me', body=body).execute()
    return response


def mark_as_read(service, query):
    """Remove the UNREAD label from every message matching `query`; see `_batch_request` for the return value."""
    return _batch_request(service, 'batchModify', query, {'removeLabelIds': ['UNREAD']})


def mark_as_unread(service, query):
    """Add the UNREAD label to every message matching `query`; see `_batch_request` for the return value."""
    return _batch_request(service, 'batchModify', query, {'addLabelIds': ['UNREAD']})


def delete_messages(service, query):
    """
    Delete every message matching `query` with `batchDelete`.

    It's possible to delete a single message with the delete API, like this:
        service.users().messages().delete(userId='me', id=msg['id'])
    but batchDelete removes all the selected messages with far fewer requests.
    See `_batch_request` for the return value.
    """
    return _batch_request(service, 'batchDelete', query, {})


# %% Demo
# Guarded so that the definitions above can be imported without side effects;
# the guard is true when this code runs in a notebook kernel or as a script.
if __name__ == "__main__":
    # get the Gmail API service
    service = gmail_authenticate()

    # test send email
    # NOTE(security): "credentials.json" is the OAuth client-secrets file read by
    # gmail_authenticate(); attach a harmless file instead before sending this
    # to a real recipient.
    send_message(service, "destination@domain.com", "This is a subject",
                 "This is the body of the email", ["test.txt", "credentials.json"])

    # get emails that match the query you specify
    results = search_messages(service, "Python Code")
    # for each email matched, read it (output plain/text to console & save HTML and attachments)
    for msg in results:
        read_message(service, msg)

    mark_as_read(service, "Google")
    # search query by sender/receiver
    mark_as_unread(service, "email@domain.com")

    delete_messages(service, "Google Alerts")