Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 99 additions & 4 deletions usr/lib/hypnotix/hypnotix.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,16 @@
import traceback
import warnings
import subprocess
import tempfile
import requests
import gzip
import re
import base64
import pickle
import xml.etree.ElementTree as xmlET
from functools import partial
from pathlib import Path
from datetime import datetime, date, timedelta, timezone

# Force X11 on a Wayland session
if "WAYLAND_DISPLAY" in os.environ:
Expand Down Expand Up @@ -452,6 +460,7 @@ def add_badge(self, word, box, added_words):
print(e)

def show_groups(self, widget, content_type):
self.load_epg(self.active_provider.epg)
self.content_type = content_type
self.navigate_to("categories_page")
for child in self.categories_flowbox.get_children():
Expand Down Expand Up @@ -514,6 +523,7 @@ def on_category_button_clicked(self, widget, group):
self.show_vod(self.active_provider.series)

def show_favorites(self, widget=None):
self.load_epg(self.settings.get_string("favorites-epg"))
self.content_type = TV_GROUP
channels = []
for line in self.favorite_data:
Expand Down Expand Up @@ -542,6 +552,8 @@ def show_channels(self, channels, favorites=False):
self.download_channel_logos(logos_to_refresh)
else:
self.sidebar.hide()
self.epg_counter = {"channel": "", "idx": -1}
self.epg_timestamp = 0

def show_vod(self, items):
logos_to_refresh = []
Expand Down Expand Up @@ -852,6 +864,8 @@ def on_favorite_button_toggled(self, widget):
if widget.get_active() and data not in self.favorite_data:
print (f"Adding {name} to favorites")
self.favorite_data.append(data)
current_epg = self.active_provider.epg
self.settings.set_string("favorites-epg", (self.settings.get_string("favorites-epg").replace(current_epg, "") + " " + current_epg))
elif widget.get_active() == False and data in self.favorite_data:
print (f"Removing {name} from favorites")
self.favorite_data.remove(data)
Expand All @@ -875,6 +889,8 @@ def on_next_channel(self):
@async_function
def play_async(self, channel):
if self.mpv is not None:
self.epg_counter["idx"] = -1
self.mpv.command("show-text", "", 1)
self.mpv.stop()
print("CHANNEL: '%s' (%s)" % (channel.name, channel.url))
if channel is not None and channel.url is not None:
Expand Down Expand Up @@ -1272,6 +1288,8 @@ def set_provider_type(self, type_id):
visible_widgets.append(self.path_entry)
visible_widgets.append(self.path_label)
visible_widgets.append(self.browse_button)
visible_widgets.append(self.epg_entry)
visible_widgets.append(self.epg_label)
elif type_id == PROVIDER_TYPE_XTREAM:
visible_widgets.append(self.url_entry)
visible_widgets.append(self.url_label)
Expand Down Expand Up @@ -1447,7 +1465,42 @@ def close(w, res):

def on_menu_quit(self, widget):
self.application.quit()


@async_function
def load_epg(self, epg_urls):
self.status_label.set_text("Loading EPG...")
self.status_label.show()
self.epg = None
def get_cached_epg_path(urls):
cached_epg_name = base64.urlsafe_b64encode((date.today().isoformat() + urls.replace(" ","")).encode()).decode()
return os.path.join(tempfile.gettempdir(), cached_epg_name)
cached_epg_path = get_cached_epg_path(epg_urls)
if os.path.exists(cached_epg_path):
with gzip.open(cached_epg_path, 'rb') as f:
self.epg = pickle.load(f)
elif (epg_urls != ""):
urls = ""
for e in epg_urls.split():
try:
response = requests.get(e)
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(response.content)
temp_file_path = tmp_file.name
with gzip.open(temp_file_path, 'rb') as f:
ungzip = f.read().decode('utf-8')
epg = xmlET.fromstring(ungzip)
if self.epg is None:
self.epg = epg
else:
for item in epg:
self.epg.append(item)
urls += e
except:
pass
with gzip.open(get_cached_epg_path(urls), 'wb') as f:
pickle.dump(self.epg, f)
self.status_label.hide()

def on_key_press_event(self, widget, event):
# Get any active, but not pressed modifiers, like CapsLock and NumLock
persistant_modifiers = Gtk.accelerator_get_default_mod_mask()
Expand All @@ -1457,8 +1510,46 @@ def on_key_press_event(self, widget, event):
# Bool of Control or Shift modifier states
ctrl = modifier == Gdk.ModifierType.CONTROL_MASK
shift = modifier == Gdk.ModifierType.SHIFT_MASK

if ctrl and event.keyval == Gdk.KEY_r:

epg_duration = 6 # seconds

def chan_match(chan1, chan2):
# discard digits at the beginning
chan1 = re.sub(r'^\d+', '', chan1)
chan2 = re.sub(r'^\d+', '', chan2)
# discard useless words
regex = r"\b(4K|HD)\b"
chan1 = re.sub(regex, "", chan1, flags=re.IGNORECASE)
chan2 = re.sub(regex, "", chan2, flags=re.IGNORECASE)
# normalize
chan1 = chan1.lower().replace(" ","")
chan1 = ''.join(filter(str.isalnum, chan1))
chan2 = chan2.lower().replace(" ","")
chan2 = ''.join(filter(str.isalnum, chan2))
return (chan1 in chan2 or chan2 in chan1)

if event.keyval == Gdk.KEY_g and not isinstance(widget.get_focus(), Gtk.Entry):
dateFormat = "%Y%m%d%H%M%S"
timeFormat = "%H:%M"
hoursOffset = 0 - int(datetime.now().astimezone().utcoffset().total_seconds() / 3600)
targetDatetime = (datetime.now() + timedelta(hours=hoursOffset)).timestamp()
if self.active_channel.name != self.epg_counter["channel"]:
self.epg_counter = {"channel": self.active_channel.name, "idx": -1 }
try:
channelEPG = [p for p in self.epg.findall("programme") if chan_match(p.attrib["channel"], self.active_channel.name)]
onair = [p for p in channelEPG if (tstart := datetime.strptime(p.attrib["start"].split()[0], dateFormat).timestamp()) <= targetDatetime and (tstop := datetime.strptime(p.attrib["stop"].split()[0], dateFormat).timestamp()) >= targetDatetime and (tstop - tstart) < (3600 * 5)]
osd_counter = ""
if len(onair) > 1:
self.epg_counter["idx"] = (self.epg_counter["idx"] + 1) % len(onair)
osd_counter = " [" + str(self.epg_counter["idx"] + 1) + "/" + str(len(onair)) + "]"
onair = onair[self.epg_counter["idx"]]
onairTime = (datetime.strptime(onair.attrib["start"].split()[0], dateFormat) + timedelta(hours=(0 - hoursOffset))).strftime(timeFormat) + " - " + (datetime.strptime(onair.attrib["stop"].split()[0], dateFormat) + timedelta(hours=(0 - hoursOffset))).strftime(timeFormat)
onairText = onair.attrib["channel"] + osd_counter + "\n" + onair.find("title").text + "\n" + onairTime
except:
onairText = "(no info)"
self.mpv.command("show-text", onairText, (epg_duration * 1000))
self.epg_timestamp = datetime.now().timestamp()
elif ctrl and event.keyval == Gdk.KEY_r:
self.reload(page=None, refresh=True)
elif ctrl and event.keyval == Gdk.KEY_f:
if self.search_button.get_active():
Expand All @@ -1473,7 +1564,11 @@ def on_key_press_event(self, widget, event):
elif event.keyval == Gdk.KEY_F7:
self.borderless_mode()
elif event.keyval == Gdk.KEY_Escape:
self.normal_mode()
if ((datetime.now().timestamp() - self.epg_timestamp) <= epg_duration):
self.mpv.command("show-text", "", 1)
self.epg_timestamp = 0
else:
self.normal_mode()
elif event.keyval == Gdk.KEY_BackSpace and not ctrl and type(widget.get_focus()) != gi.repository.Gtk.SearchEntry:
self.normal_mode()
self.on_go_back_button()
Expand Down
5 changes: 5 additions & 0 deletions usr/share/glib-2.0/schemas/org.x.hypnotix.gschema.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
<summary>Provider selected by default</summary>
<description></description>
</key>
<key type="s" name="favorites-epg">
<default>""</default>
<summary>EPG urls for favorites</summary>
<description></description>
</key>
<key type="as" name="providers">
<default>['Free-TV:::url:::https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8:::::::::']</default>
<summary>Format: name:::type:::url(or path):::username:::password:::epg</summary>
Expand Down
Loading