Add system to have multiple url shorteners and chose which to use

This commit is contained in:
jesopo 2019-06-25 17:53:00 +01:00
parent cd299d92eb
commit 12181bfec6
8 changed files with 82 additions and 28 deletions

22
modules/bitly.py Normal file
View file

@ -0,0 +1,22 @@
#--depends-on commands
#--require-config bitly-api-key
import re
from src import ModuleManager, utils
URL_BITLYSHORTEN = "https://api-ssl.bitly.com/v3/shorten"
class Module(ModuleManager.BaseModule):
def on_load(self):
self.exports.add("shorturl-s-bitly", self._shorturl)
def _shorturl(self, url):
if len(url) < 22:
return None
page = utils.http.request(URL_BITLYSHORTEN, get_params={
"access_token": self.bot.config["bitly-api-key"],
"longUrl": url}, json=True)
if page and page.data["data"]:
return page.data["data"]["url"]
return None

View file

@ -1,3 +1,8 @@
#--depends-on channel_access
#--depends-on check_mode
#--depends-on commands
#--depends-on shorturl
import itertools, json, re, urllib.parse
from src import ModuleManager, utils
from . import colors, gitea, github

View file

@ -343,7 +343,7 @@ class GitHub(object):
url = ""
if data["check_run"]["details_url"]:
url = data["check_run"]["details_url"]
url = " - %s" % self.exports.get_one("shortlink")(url)
url = " - %s" % self.exports.get_one("shorturl-any")(url)
duration = ""
if data["check_run"]["completed_at"]:

View file

@ -1,38 +1,53 @@
#--depends-on commands
#--require-config bitly-api-key
import re
from src import ModuleManager, utils
URL_BITLYSHORTEN = "https://api-ssl.bitly.com/v3/shorten"
@utils.export("serverset", {"setting": "url-shortener",
"help": "Set URL shortener service", "example": "bitly"})
@utils.export("botset", {"setting": "url-shortener",
"help": "Set URL shortener service", "example": "bitly"})
class Module(ModuleManager.BaseModule):
_name = "Short"
def on_load(self):
self.exports.add("shortlink", self._shortlink)
self.exports.add("shorturl", self._shorturl)
self.exports.add("shorturl-any", self._shorturl_any)
def _shortlink(self, url):
if not re.match(utils.http.REGEX_URL, url):
url = "http://%s" % url
def _get_shortener(self, name):
return self.exports.get_one("shorturl-s-%s" % name, None)
def _call_shortener(self, shortener_name, url):
shortener = self._get_shortener(shortener_name)
if shortener == None:
return None
short_url = shortener(url)
if short_url == None:
return None
return short_url
page = utils.http.request(URL_BITLYSHORTEN, get_params={
"access_token": self.bot.config["bitly-api-key"],
"longUrl": url}, json=True)
def _shorturl_any(self, url):
global_shortener_name = self.bot.get_setting("url-shortener", "bitly")
if global_shortener_name:
return self._call_shortener(global_shortener_name, url) or url
if page and page.data["data"]:
return page.data["data"]["url"]
return url
shortener_name = self.exports.find_one("shorturl-s-", None)
if shortener_name == None:
return url
return self._call_shortener(shortener_name, url) or url
def _shorturl(self, server, url):
shortener_name = server.get_setting("url-shortener", "bitly")
if shortener_name == None:
return url
return self._call_shortener(shortener_name, url) or url
@utils.hook("received.command.shorten")
def shorten(self, event):
"""
:help: Shorten a given URL using the is.gd service
:help: Shorten a given URL
:usage: <url>
"""
url = None
if len(event["args"]) > 0:
url = event["args_split"][0]
if not re.match(utils.http.REGEX_URL, url):
url = "http://%s" % url
else:
url = event["target"].buffer.find(utils.http.REGEX_URL)
if url:
@ -40,7 +55,5 @@ class Module(ModuleManager.BaseModule):
if not url:
raise utils.EventError("No URL provided/found.")
if url:
event["stdout"].write("Shortened URL: %s" % self._shortlink(url))
else:
event["stderr"].write("Unable to shorten that URL.")
event["stdout"].write("Shortened URL: %s" % self._shorturl(
event["server"], url))

View file

@ -1,5 +1,6 @@
#--depends-on commands
#--depends-on config
#--depends-on shorturl
import hashlib, re, urllib.parse
from src import EventManager, ModuleManager, utils
@ -40,8 +41,8 @@ class Module(ModuleManager.BaseModule):
"\r", "").replace(" ", " ").strip()
if channel.get_setting("title-shorten", False):
short_url = self.exports.get_one("shortlink", lambda x: x
)(url)
short_url = self.exports.get_one("shorturl")(
event["server"], url)
return "%s - %s" % (title, short_url)
return title
else:

View file

@ -1,5 +1,6 @@
#--depends-on commands
#--depends-on permissions
#--depends-on shorturl
#--require-config twitter-api-key
#--require-config twitter-api-secret
#--require-config twitter-access-token
@ -34,8 +35,8 @@ class BitBotStreamListener(tweepy.StreamListener):
if server and channel_name in server.channels:
follows.append([server, server.channels.get(channel_name)])
tweet = format._tweet(_exports, status)
for server, channel in follows:
tweet = format._tweet(_exports, server, status)
_events.on("send.stdout").call(target=channel,
module_name="Tweets", server=server, message=tweet)

View file

@ -6,7 +6,7 @@ def _timestamp(dt):
since, unit = utils.time_unit(seconds_since)
return "%s %s ago" % (since, unit)
def _tweet(exports, tweet):
def _tweet(exports, server, tweet):
linked_id = tweet.id
username = tweet.user.screen_name
@ -17,7 +17,7 @@ def _tweet(exports, tweet):
tweet_link = "https://twitter.com/%s/status/%s" % (username,
linked_id)
short_url = exports.get_one("shortlink")(tweet_link)
short_url = exports.get_one("shorturl")(server, tweet_link)
short_url = " - %s" % short_url if short_url else ""
created_at = _timestamp(tweet.created_at)

View file

@ -25,6 +25,10 @@ class Exports(object):
self._context_exports[context][setting] = []
self._context_exports[context][setting].append(value)
def _get_keys(self):
return set(list(self._exports.keys())
+list(self._context_exports.keys()))
def get_all(self, setting: str) -> typing.List[typing.Any]:
return self._exports.get(setting, []) + sum([
exports.get(setting, []) for exports in
@ -34,6 +38,14 @@ class Exports(object):
values = self.get_all(setting)
return values[0] if values else default
def find_one(self, setting_prefix: str, default: typing.Any=None
) -> typing.Optional[typing.Any]:
keys = self._get_keys()
for key in keys:
if key.startswith(setting_prefix):
return key
return default
def purge_context(self, context: str):
if context in self._context_exports:
del self._context_exports[context]