# %% [markdown]
# # ChatGPT agent example usage
#
# ## Basic Example
# ChatGPT can be used in lots of cases, for example, sentiment analysis, language translation, SQL query generation, and text summarization.
#
# This example shows you how to run ChatGPT tasks in flyte.
#
# %%
from typing import List
import union
from flytekitplugins.openai import ChatGPTTask
# %% [markdown]
# You have to specify your `name`, `openai_organization` and `chatgpt_config`.
#
# `name` is for Flyte and it should be unique.
#
# `openai_organization` is for the OpenAI API. You can find it [here](https://platform.openai.com/account/organization).
#
# `chatgpt_config` is for OpenAI chat completion. You can find it [here](https://platform.openai.com/docs/api-reference/chat/create).
#
# %%
chatgpt_small_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
chatgpt_big_job = ChatGPTTask(
name="gpt-4",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-4",
"temperature": 0.7,
},
)
@union.workflow
def my_chatgpt_job(message: str) -> str:
message = chatgpt_small_job(message=message)
message = chatgpt_big_job(message=message)
return message
# %% [markdown]
# ## ChatGPT Summary Bot
# These examples show you a real use case of ChatGPT in the production mode.
#
# For more details, see the [FlyteChatGPT Summary Bot GitHub repository](https://github.com/Future-Outlier/FlyteChatGPTSummaryBot) and the [demo video](https://youtu.be/IS6gi4jR7h0?si=hWHZp5LyjDspiwfD).
# ### Summarize Flyte's latest GitHub releases to Slack
#
# %%
image = union.ImageSpec(
apt_packages=["git"],
packages=[
"flytekitplugins-chatgpt",
"requests",
"slack_sdk",
],
registry="ghcr.io/flyteorg",
)
chatgpt_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
@union.task(
container_image=image,
secret_requests=[union.Secret(key="token", group="github-api")],
)
def get_github_latest_release(owner: str = "flyteorg", repo: str = "flyte") -> str:
import requests
token = union.current_context().secrets.get("github-api", "token")
url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(url, headers=headers)
message = (
"You are a Bot. Provide a summary of the latest Flyte Github releases for users on Slack."
"Ensure the response fits within 4000 characters, suitable for a Slack message. "
"Start the message with 'These are the latest Flyte GitHub releases'. "
f"End the message with 'Check out the releases page here: https://github.com/{owner}/{repo}/releases'. "
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
f"Latest releases:\n{response.json()['body']}"
)
return message
@union.task(
container_image=image,
secret_requests=[union.Secret(key="token", group="slack-api")],
)
def post_message_on_slack(message: str):
if message == "":
return
from slack_sdk import WebClient
token = union.current_context().secrets.get("slack-api", "token")
client = WebClient(token=token)
client.chat_postMessage(channel="youtube-summary", text=message)
@union.workflow
def slack_wf(owner: str = "flyteorg", repo: str = "flyte", channel: str = "demo"):
message = get_github_latest_release(owner=owner, repo=repo)
message = chatgpt_job(message=message)
post_message_on_slack(message=message)
if __name__ == "__main__":
slack_wf()
# %% [markdown]
# ### Summarize Flyte's latest YouTube Video to Slack
# %%
image = union.ImageSpec(
apt_packages=["git"],
packages=[
"flytekitplugins-chatgpt",
"scrapetube==2.5.1",
"youtube_transcript_api==0.6.1",
"slack_sdk==3.23.0",
],
registry="ghcr.io/flyteorg",
)
chatgpt_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
@union.task(container_image=image)
def get_latest_video_transcript_chunks(channel_url: str) -> List[str]:
import scrapetube
from youtube_transcript_api import YouTubeTranscriptApi
# fetch_latest_video_id
video_generator = scrapetube.get_channel(channel_url=channel_url)
latest_video = next(video_generator)
video_id = latest_video["videoId"]
# fetch_transcript
transcript = YouTubeTranscriptApi.get_transcript(video_id)
# chunk_transcript
text_transcript = "\n".join([entry["text"] for entry in transcript])
return [text_transcript[i : i + 10000] for i in range(0, len(text_transcript), 10000)]
@union.workflow
def video_wf(channel_url: str):
chunks = get_latest_video_transcript_chunks(channel_url=channel_url)
dynamic_subwf(channel_url=channel_url, chunks=chunks)
@union.task(container_image=image)
def check_strs_len_less_than_num(msg1: str, msg2: str, num: int) -> bool:
return len(msg1) + len(msg2) < num
@union.task(container_image=image)
def concatenate_str(msg1: str, msg2: str) -> str:
return msg1 + msg2 + "\n"
@union.task(container_image=image)
def str_is_non_empty(msg: str) -> bool:
return len(msg) == 0
@union.dynamic(container_image=image)
def dynamic_subwf(channel_url: str, chunks: List[str]):
post_message_on_slack(
message=f"This is the latest video summary, checkout in Flyte's Youtube Channel!\n{channel_url}"
)
summary_messages = []
for chunk in chunks:
message = chatgpt_job(
message=concatenate_str(
msg1=(
"Please provide a summary of the following portion of the transcript"
" from the latest Flyte YouTube video. Note: This is only a segment"
" of the entire transcript, which has been split into multiple parts."
" The summary should be concise, not exceeding 4000 characters, and"
" suitable for sharing on Slack."
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
"Transcript:\n"
),
msg2=chunk,
)
)
summary_messages.append(message)
message = ""
for summary_message in summary_messages:
b = check_strs_len_less_than_num(msg1=message, msg2=summary_message, num=15000)
if b.is_true:
message = concatenate_str(msg1=message, msg2=summary_message)
if b.is_false:
message = chatgpt_job(
message=concatenate_str(
msg1=(
"Please provide a concise summary of the following messages"
" generated by ChatGPT. The summary should be suitable for sharing"
" on Slack and not exceed 4000 characters."
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
"Transcript:\n"
),
msg2=message,
)
)
post_message_on_slack(message=message)
message = summary_message
b = str_is_non_empty(msg=message)
if b.is_true:
message = chatgpt_job(
message=concatenate_str(
msg1=(
"Please provide a concise summary of the following messages"
" generated by ChatGPT. The summary should be suitable for sharing"
" on Slack and not exceed 4000 characters."
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
"Transcript:\n"
),
msg2=message,
)
)
post_message_on_slack(message=message)
if __name__ == "__main__":
video_wf(channel_url="https://www.youtube.com/@flyteorg")
# %% [markdown]
# ### Summarize the latest MLOps trend from Medium to Twitter
# Note: This example only works in a local environment.
# %%
chatgpt_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
@union.task
def get_weekly_articles_title(url: str = "https://medium.com/tag/flyte") -> str:
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)
page_source = driver.page_source
driver.quit()
soup = BeautifulSoup(page_source, "html.parser")
texts = soup.stripped_strings
all_text = " ".join(texts)
message = (
f"You are a Bot. Provide a summary of the latest MLOps trend for users on Medium. "
f"Your response should fit within 280 characters for a tweet, excluding the article's title. "
f"Start the message with '''This is the trend of MLOps on Medium this week\n'''. Note: Tweet API handling is not required."
f"```````"
f"Article Title: {all_text}"
)
return message
@union.task(
secret_requests=[
Secret(key="bearer_token", group="tweet-api"),
Secret(key="consumer_key", group="tweet-api"),
Secret(key="consumer_secret", group="tweet-api"),
Secret(key="access_token", group="tweet-api"),
Secret(key="access_token_secret", group="tweet-api"),
],
)
def tweet(text: str):
import tweepy
TWEET_LENGTH = 280
BEARER_TOKEN = union..current_context().secrets.get("tweet-api", "bearer_token")
CONSUMER_KEY = union..current_context().secrets.get("tweet-api", "consumer_key")
CONSUMER_SECRET = union..current_context().secrets.get("tweet-api", "consumer_secret")
ACCESS_TOKEN = union..current_context().secrets.get("tweet-api", "access_token")
ACCESS_TOKEN_SECRET = union..current_context().secrets.get("tweet-api", "access_token_secret")
client = tweepy.Client(
bearer_token=BEARER_TOKEN,
consumer_key=CONSUMER_KEY,
consumer_secret=CONSUMER_SECRET,
access_token=ACCESS_TOKEN,
access_token_secret=ACCESS_TOKEN_SECRET,
)
if len(text) > TWEET_LENGTH:
text = text[:TWEET_LENGTH]
client.create_tweet(text=text)
@union.workflow
def tweet_wf(url: str = "https://medium.com/tag/flyte"):
message = get_weekly_articles_title(url=url)
message = chatgpt_job(message=message)
tweet(text=message)
if __name__ == "__main__":
tweet_wf()