NomadNet/nomadnet/ui/textui/Conversations.py

2002 lines
79 KiB
Python

import RNS
import os
import shutil
import time
import nomadnet
import LXMF
import urwid
from datetime import datetime, timedelta
from nomadnet.Directory import DirectoryEntry
def relative_time(timestamp):
    """Describe the age of ``timestamp`` (seconds since the epoch) as short text.

    Ages below one minute, including timestamps that lie ahead of the
    current clock, yield "just now". Older ages are reported in whole
    minutes, hours, "yesterday", days or weeks. Anything at or beyond
    2592000 seconds is rendered as a local ``YYYY-MM-DD`` date.
    """
    age = time.time() - timestamp

    # (exclusive upper bound in seconds, seconds per unit or None, text).
    # A unit of None means the text is returned as-is.
    buckets = (
        (60, None, "just now"),
        (3600, 60, "m ago"),
        (86400, 3600, "h ago"),
        (172800, None, "yesterday"),
        (604800, 86400, "d ago"),
        (2592000, 604800, "w ago"),
    )

    for upper_bound, unit_seconds, text in buckets:
        if age < upper_bound:
            if unit_seconds is None:
                return text
            return str(int(age / unit_seconds))+text

    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
def _format_size(size):
if size < 1024:
return str(size)+" B"
elif size < 1048576:
return str(round(size/1024, 1))+" KB"
else:
return str(round(size/1048576, 1))+" MB"
from nomadnet.vendor.additional_urwid_widgets import IndicativeListBox
class ConversationListDisplayShortcuts():
    """Holds the shortcut-bar widget listing the conversation list key bindings."""

    def __init__(self, app):
        self.app = app
        hint_text = urwid.Text("[C-e] Peer Info [C-x] Delete [C-r] Sync [C-n] New [C-u] Ingest URI [C-o] Sort [C-g] Fullscreen")
        self.widget = urwid.AttrMap(hint_text, "shortcutbar")
class ConversationDisplayShortcuts():
    """Holds the shortcut-bar widget listing the conversation editor key bindings."""

    def __init__(self, app):
        self.app = app
        hint_text = urwid.Text("[C-d] Send [C-p] Paper Msg [C-t] Title [C-a] Attach [C-s] Save [C-k] Clear [C-w] Close [C-u] Purge [C-x] Clear History [C-o] Sort")
        self.widget = urwid.AttrMap(hint_text, "shortcutbar")
class ConversationsArea(urwid.LineBox):
    """Line box around the conversation list that forwards shortcut keys.

    The owner assigns ``delegate`` after construction; every handled key
    is turned into a call on that delegate.
    """

    def keypress(self, size, key):
        """Handle list shortcuts; pass every other key to ``urwid.LineBox``."""
        # Control keys that map one-to-one onto a delegate method.
        delegate_actions = {
            "ctrl e": "edit_selected_in_directory",
            "ctrl x": "delete_selected_conversation",
            "ctrl n": "new_conversation",
            "ctrl u": "ingest_lxm_uri",
            "ctrl r": "sync_conversations",
            "ctrl g": "toggle_fullscreen",
            "ctrl o": "toggle_list_sort",
        }

        action_name = delegate_actions.get(key)
        if action_name is not None:
            getattr(self.delegate, action_name)()
            return None

        # Tab always leaves for the header; "up" only does so when the
        # selection is already on the first item or the list is empty.
        leave_for_header = key == "tab" or (
            key == "up"
            and (self.delegate.ilb.first_item_is_selected() or self.delegate.ilb.body_is_empty())
        )
        if leave_for_header:
            self.delegate.app.ui.main_display.frame.focus_position = "header"
            return None

        return super(ConversationsArea, self).keypress(size, key)
class DialogLineBox(urwid.LineBox):
    """Line box used for dialogs; the escape key dismisses it via ``delegate``."""

    def keypress(self, size, key):
        """Dismiss on "esc"; pass every other key to ``urwid.LineBox``."""
        if key != "esc":
            return super(DialogLineBox, self).keypress(size, key)

        owner = self.delegate
        if hasattr(owner, "update_conversation_list"):
            # Owner manages the conversation list: let it rebuild the list.
            owner.update_conversation_list()
        elif hasattr(owner, "dialog_active"):
            # Owner tracks dialog state itself: clear the flag and notify it.
            owner.dialog_active = False
            owner.conversation_changed(None)
        return None
# Conversations view: a conversation list column next to a conversation widget column.
class ConversationsDisplay():
# Relative list width; in the visible code it is referenced only by commented-out layout lines.
list_width = 0.33
# Fixed width given to the list column; toggle_fullscreen() sets it to 0 and later restores it.
given_list_width = 52
# Class-level dict shared by all instances, keyed by source hash (the None key holds the
# "no conversation" widget created by make_conversation_widget(None)).
cached_conversation_widgets = {}
# Values for self.list_sort_mode; update_listbox() re-sorts the list only for SORT_NAME.
SORT_RECENT = 0
SORT_NAME = 1
def __init__(self, app):
    """Build the two-column conversations view.

    The first column holds the conversation list at the fixed width
    ``ConversationsDisplay.given_list_width``; the second column starts
    out with the widget returned by ``make_conversation_widget(None)``.
    The instance also installs ``update_conversation_list`` as
    ``nomadnet.Conversation.created_callback``.

    :param app: application object, stored as ``self.app``.
    """
    self.app = app
    self.dialog_open = False
    self.sync_dialog = None
    self.currently_displayed_conversation = None
    self.list_sort_mode = ConversationsDisplay.SORT_RECENT

    # Creates self.listbox (and self.ilb), which the column layout needs.
    self.update_listbox()

    self.columns_widget = urwid.Columns(
        [
            (ConversationsDisplay.given_list_width, self.listbox),
            (urwid.WEIGHT, 1, self.make_conversation_widget(None)),
        ],
        dividechars=0, focus_column=0, box_columns=[0]
    )

    self.list_shortcuts = ConversationListDisplayShortcuts(self.app)
    self.editor_shortcuts = ConversationDisplayShortcuts(self.app)

    self.shortcuts_display = self.list_shortcuts
    self.widget = self.columns_widget

    nomadnet.Conversation.created_callback = self.update_conversation_list
def focus_change_event(self):
    """Refresh the conversation list, unless a dialog is currently open."""
    if self.dialog_open:
        return
    self.update_conversation_list()
def toggle_list_sort(self):
    """Switch ``list_sort_mode`` between SORT_RECENT and SORT_NAME and rebuild the list."""
    currently_recent = self.list_sort_mode == ConversationsDisplay.SORT_RECENT
    if currently_recent:
        next_mode = ConversationsDisplay.SORT_NAME
    else:
        next_mode = ConversationsDisplay.SORT_RECENT
    self.list_sort_mode = next_mode
    self.update_conversation_list()
def update_listbox(self):
    """Rebuild ``list_widgets``, ``ilb`` and ``listbox`` from ``self.app.conversations()``."""
    entries = self.app.conversations()

    if self.list_sort_mode == ConversationsDisplay.SORT_NAME:
        # In-place sort: lower-cased field 3 first, field 0 as tie-breaker.
        entries.sort(key=lambda entry: (entry[3].lower(), entry[0]))

    self.list_widgets = [self.conversation_list_widget(entry) for entry in entries]

    self.ilb = IndicativeListBox(
        self.list_widgets,
        on_selection_change=self.conversation_list_selection,
        initialization_is_selection_change=False,
        highlight_offFocus="list_off_focus"
    )

    list_body = urwid.Filler(self.ilb, height=urwid.RELATIVE_100)
    area = ConversationsArea(list_body, title="Conversations")
    area.delegate = self
    self.listbox = area
def delete_selected_conversation(self):
    """Show a Yes/No dialog and, on "Yes", delete the selected conversation.

    Confirming closes the conversation widget (if cached), calls
    ``nomadnet.Conversation.delete_conversation`` and rebuilds the list.
    Does nothing when no list item is selected.
    """
    item = self.ilb.get_selected_item()
    if item is None:
        # Return before flagging a dialog as open: no dialog is shown on
        # this path, so nothing would ever reset the flag again.
        return

    self.dialog_open = True
    source_hash = item.source_hash

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def confirmed(sender):
        self.dialog_open = False
        self.delete_conversation(source_hash)
        nomadnet.Conversation.delete_conversation(source_hash, self.app)
        self.update_conversation_list()

    dialog = DialogLineBox(
        urwid.Pile([
            urwid.Text(
                "Delete conversation with\n"+self.app.directory.simplest_display_str(bytes.fromhex(source_hash))+"\n",
                align=urwid.CENTER,
            ),
            urwid.Columns([
                (urwid.WEIGHT, 0.45, urwid.Button("Yes", on_press=confirmed)),
                (urwid.WEIGHT, 0.1, urwid.Text("")),
                (urwid.WEIGHT, 0.45, urwid.Button("No", on_press=dismiss_dialog)),
            ])
        ]), title="?"
    )
    dialog.delegate = self

    # Lay the dialog over the list and put the result in the list column.
    overlay = urwid.Overlay(
        dialog,
        self.listbox,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def edit_selected_in_directory(self):
    """Open the "Peer Info" dialog for the selected conversation.

    The dialog shows the peer address and lets the user edit the display
    name, trust level and preferred delivery method; "Save" stores the
    result through ``self.app.directory.remember``. When the directory
    does not report the peer as known, a button that queries the network
    for its keys is shown as well.

    Does nothing when no list item is selected.
    """
    item = self.ilb.get_selected_item()
    if item is None:
        # Return before flagging a dialog as open: no dialog is shown on
        # this path, so nothing would ever reset the flag again.
        return

    g = self.app.ui.glyphs
    self.dialog_open = True
    source_hash_text = item.source_hash
    display_name = item.display_name
    if display_name is None:
        display_name = ""

    # e_id is never displayed; it only carries the address for confirmed().
    e_id = urwid.Edit(caption="Addr : ", edit_text=source_hash_text)
    t_id = urwid.Text("Addr : "+source_hash_text)
    e_name = urwid.Edit(caption="Name : ", edit_text=display_name)

    selected_id_widget = t_id

    # Defaults, kept when the peer has no directory entry or lookup fails.
    untrusted_selected = False
    unknown_selected = True
    trusted_selected = False

    direct_selected = True
    propagated_selected = False

    try:
        peer_hash = bytes.fromhex(source_hash_text)
        if self.app.directory.find(peer_hash):
            trust_level = self.app.directory.trust_level(peer_hash)
            if trust_level == DirectoryEntry.UNTRUSTED:
                untrusted_selected, unknown_selected, trusted_selected = True, False, False
            elif trust_level == DirectoryEntry.UNKNOWN:
                untrusted_selected, unknown_selected, trusted_selected = False, True, False
            elif trust_level == DirectoryEntry.TRUSTED:
                untrusted_selected, unknown_selected, trusted_selected = False, False, True

            if self.app.directory.preferred_delivery(peer_hash) == DirectoryEntry.PROPAGATED:
                direct_selected = False
                propagated_selected = True

    except Exception as e:
        # Best effort: the dialog still opens with the defaults above,
        # but the failure is no longer dropped silently.
        RNS.log("Could not load directory entry for peer info dialog. The contained exception was: "+str(e), RNS.LOG_DEBUG)

    trust_button_group = []
    r_untrusted = urwid.RadioButton(trust_button_group, "Untrusted", state=untrusted_selected)
    r_unknown = urwid.RadioButton(trust_button_group, "Unknown", state=unknown_selected)
    r_trusted = urwid.RadioButton(trust_button_group, "Trusted", state=trusted_selected)

    method_button_group = []
    r_direct = urwid.RadioButton(method_button_group, "Deliver directly", state=direct_selected)
    r_propagated = urwid.RadioButton(method_button_group, "Use propagation nodes", state=propagated_selected)

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def confirmed(sender):
        try:
            display_name = e_name.get_edit_text()
            source_hash = bytes.fromhex(e_id.get_edit_text())

            # Untrusted unless one of the other two radio buttons is set.
            trust_level = DirectoryEntry.UNTRUSTED
            if r_unknown.state == True:
                trust_level = DirectoryEntry.UNKNOWN
            elif r_trusted.state == True:
                trust_level = DirectoryEntry.TRUSTED

            delivery = DirectoryEntry.DIRECT
            if r_propagated.state == True:
                delivery = DirectoryEntry.PROPAGATED

            entry = DirectoryEntry(source_hash, display_name, trust_level, preferred_delivery=delivery)
            self.app.directory.remember(entry)
            self.update_conversation_list()
            self.dialog_open = False
            self.app.ui.main_display.sub_displays.network_display.directory_change_callback()

        except Exception as e:
            RNS.log("Could not save directory entry. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
            # Append the error line only once, however often saving fails.
            if not dialog_pile.error_display:
                dialog_pile.error_display = True
                options = dialog_pile.options(height_type=urwid.PACK)
                dialog_pile.contents.append((urwid.Text(""), options))
                dialog_pile.contents.append((
                    urwid.Text(("error_text", "Could not save entry. Check your input."), align=urwid.CENTER),
                    options,)
                )

    source_is_known = self.app.directory.is_known(bytes.fromhex(source_hash_text))
    if source_is_known:
        known_section = urwid.Divider(g["divider1"])
    else:
        def query_action(sender, user_data):
            self.close_conversation_by_hash(user_data)
            nomadnet.Conversation.query_for_peer(user_data)
            # Replace the whole dialog content with a confirmation.
            options = dialog_pile.options(height_type=urwid.PACK)
            dialog_pile.contents = [
                (urwid.Text("Query sent"), options),
                (urwid.Button("OK", on_press=dismiss_dialog), options)
            ]

        query_button = urwid.Button("Query network for keys", on_press=query_action, user_data=source_hash_text)
        known_section = urwid.Pile([
            urwid.Divider(g["divider1"]),
            urwid.Text(g["info"]+"\n", align=urwid.CENTER),
            urwid.Text(
                "The identity of this peer is not known, and you cannot currently send messages to it. "
                "You can query the network to obtain the identity.\n",
                align=urwid.CENTER,
            ),
            query_button,
            urwid.Divider(g["divider1"]),
        ])

    dialog_pile = urwid.Pile([
        selected_id_widget,
        e_name,
        urwid.Divider(g["divider1"]),
        r_untrusted,
        r_unknown,
        r_trusted,
        urwid.Divider(g["divider1"]),
        r_direct,
        r_propagated,
        known_section,
        urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Save", on_press=confirmed)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Back", on_press=dismiss_dialog)),
        ])
    ])
    dialog_pile.error_display = False

    dialog = DialogLineBox(dialog_pile, title="Peer Info")
    dialog.delegate = self

    # Lay the dialog over the list and put the result in the list column.
    overlay = urwid.Overlay(
        dialog,
        self.listbox,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def new_conversation(self):
    """Open the "New Conversation" dialog.

    "Create" reads the address and name fields, stores a directory entry
    and creates a ``nomadnet.Conversation`` when the address is not
    found in the existing conversation list, then displays the
    conversation for that address. Invalid input adds a one-time error
    line to the dialog instead.
    """
    self.dialog_open = True

    e_id = urwid.Edit(caption="Addr : ", edit_text="")
    e_name = urwid.Edit(caption="Name : ", edit_text="")

    trust_button_group = []
    r_untrusted = urwid.RadioButton(trust_button_group, "Untrusted")
    r_unknown = urwid.RadioButton(trust_button_group, "Unknown", state=True)
    r_trusted = urwid.RadioButton(trust_button_group, "Trusted")

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def confirmed(sender):
        try:
            existing_conversations = nomadnet.Conversation.conversation_list(self.app)

            display_name = e_name.get_edit_text()
            source_hash_text = e_id.get_edit_text().strip()
            source_hash = bytes.fromhex(source_hash_text)

            # Untrusted unless one of the other two radio buttons is set.
            trust_level = DirectoryEntry.UNTRUSTED
            if r_unknown.state == True:
                trust_level = DirectoryEntry.UNKNOWN
            elif r_trusted.state == True:
                trust_level = DirectoryEntry.TRUSTED

            # NOTE(review): source_hash is bytes here; confirm that
            # conversation_list() entries also hold bytes at index 0,
            # otherwise this membership test can never match.
            if not source_hash in [c[0] for c in existing_conversations]:
                entry = DirectoryEntry(source_hash, display_name, trust_level)
                self.app.directory.remember(entry)

                # Constructed for its side effects; the object is not kept.
                nomadnet.Conversation(source_hash_text, nomadnet.NomadNetworkApp.get_shared_instance(), initiator=True)
                self.update_conversation_list()

            # Must be passed by keyword: the first positional parameter
            # of display_conversation() is the signal sender, so a
            # positional hash would leave source_hash at None.
            self.display_conversation(source_hash=source_hash_text)
            self.dialog_open = False

        except Exception as e:
            RNS.log("Could not start conversation. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
            # Append the error line only once, however often creation fails.
            if not dialog_pile.error_display:
                dialog_pile.error_display = True
                options = dialog_pile.options(height_type=urwid.PACK)
                dialog_pile.contents.append((urwid.Text(""), options))
                dialog_pile.contents.append((
                    urwid.Text(
                        ("error_text", "Could not start conversation. Check your input."),
                        align=urwid.CENTER,
                    ),
                    options,
                ))

    dialog_pile = urwid.Pile([
        e_id,
        e_name,
        urwid.Text(""),
        r_untrusted,
        r_unknown,
        r_trusted,
        urwid.Text(""),
        urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Create", on_press=confirmed)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Back", on_press=dismiss_dialog)),
        ])
    ])
    dialog_pile.error_display = False

    dialog = DialogLineBox(dialog_pile, title="New Conversation")
    dialog.delegate = self

    # Lay the dialog over the list and put the result in the list column.
    overlay = urwid.Overlay(
        dialog,
        self.listbox,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def ingest_lxm_uri(self):
    """Open the "Ingest message URI" dialog.

    "Ingest" passes the entered URI to
    ``self.app.message_router.ingest_lxm_uri`` and replaces the dialog
    with a result message chosen from the returned value. A ``False``
    result or any exception adds a one-time error line to the input
    dialog instead.
    """
    self.dialog_open = True
    e_uri = urwid.Edit(caption="URI : ", edit_text="")

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def show_dialog(dialog_box):
        # Lay the dialog over the list and put it in the list column.
        dialog_box.delegate = self
        overlay = urwid.Overlay(
            dialog_box,
            self.listbox,
            align=urwid.CENTER,
            width=urwid.RELATIVE_100,
            valign=urwid.MIDDLE,
            height=urwid.PACK,
            left=2,
            right=2,
        )
        options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
        self.columns_widget.contents[0] = (overlay, options)

    def show_result(message):
        # Result dialog: the message plus a single OK button.
        rdialog_pile = urwid.Pile([
            urwid.Text(message),
            urwid.Text(""),
            urwid.Columns([
                (urwid.WEIGHT, 0.6, urwid.Text("")),
                (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=dismiss_dialog)),
            ])
        ])
        rdialog_pile.error_display = False
        show_dialog(DialogLineBox(rdialog_pile, title="Ingest message URI"))

    def confirmed(sender):
        try:
            # Marker values the router hands back to tell outcomes apart.
            local_delivery_signal = "local_delivery_occurred"
            duplicate_signal = "duplicate_lxm"
            lxm_uri = e_uri.get_edit_text().strip()
            ingest_result = self.app.message_router.ingest_lxm_uri(
                lxm_uri,
                signal_local_delivery=local_delivery_signal,
                signal_duplicate=duplicate_signal
            )

            if ingest_result == False:
                raise ValueError("The URI contained no decodable messages")

            elif ingest_result == local_delivery_signal:
                show_result("Message was decoded, decrypted successfully, and added to your conversation list.")

            elif ingest_result == duplicate_signal:
                show_result("The decoded message has already been processed by the LXMF Router, and will not be ingested again.")

            else:
                if self.app.enable_node:
                    propagation_text = "The decoded message was not addressed to this LXMF address, but has been added to the propagation node queues, and will be distributed on the propagation network."
                else:
                    propagation_text = "The decoded message was not addressed to this LXMF address, and has been discarded."
                show_result(propagation_text)

        except Exception as e:
            RNS.log("Could not ingest LXM URI. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
            # Append the error line only once, however often ingesting fails.
            if not dialog_pile.error_display:
                dialog_pile.error_display = True
                options = dialog_pile.options(height_type=urwid.PACK)
                dialog_pile.contents.append((urwid.Text(""), options))
                dialog_pile.contents.append((
                    urwid.Text(("error_text", "Could not ingest LXM from URI data. Check your input."), align=urwid.CENTER),
                    options,
                ))

    dialog_pile = urwid.Pile([
        e_uri,
        urwid.Text(""),
        urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Ingest", on_press=confirmed)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Back", on_press=dismiss_dialog)),
        ])
    ])
    dialog_pile.error_display = False

    show_dialog(DialogLineBox(dialog_pile, title="Ingest message URI"))
def delete_conversation(self, source_hash):
    """Close the cached widget for ``source_hash``, if the cache holds one."""
    cache = ConversationsDisplay.cached_conversation_widgets
    if source_hash not in cache:
        return
    self.close_conversation(cache[source_hash])
def toggle_fullscreen(self):
    """Collapse the list column to width 0, or restore the width saved earlier."""
    if ConversationsDisplay.given_list_width == 0:
        # Currently collapsed: bring back the width saved when collapsing.
        ConversationsDisplay.given_list_width = self.saved_list_width
    else:
        self.saved_list_width = ConversationsDisplay.given_list_width
        ConversationsDisplay.given_list_width = 0

    self.update_conversation_list()
def sync_conversations(self):
    """Open the "Message Sync" dialog over the conversation list.

    When the identity of the default propagation node can be recalled,
    the dialog offers a progress bar, a download limit choice and a
    sync/cancel button. Otherwise it only explains that no node is
    available and offers "Close". The dialog is kept in
    ``self.sync_dialog`` so ``update_sync_dialog`` can refresh it.
    """
    glyphs = self.app.ui.glyphs
    self.dialog_open = True

    def dismiss_dialog(sender):
        self.dialog_open = False
        self.sync_dialog = None
        self.update_conversation_list()
        if self.app.message_router.propagation_transfer_state >= LXMF.LXMRouter.PR_COMPLETE:
            self.app.cancel_lxmf_sync()

    def sync_now(sender):
        # No limit unless the "Limit to" radio button is selected.
        limit = limit_edit.value() if limit_radio.get_state() else None
        self.app.request_lxmf_sync(limit)
        self.update_sync_dialog()

    def cancel_sync(sender):
        self.app.cancel_lxmf_sync()
        self.update_sync_dialog()

    max_messages_group = []
    all_radio = urwid.RadioButton(max_messages_group, "Download all", state=True)
    limit_radio = urwid.RadioButton(max_messages_group, "Limit to", state=False)
    limit_edit = urwid.IntEdit("", 5)
    limit_row = urwid.GridFlow([limit_radio, limit_edit], 12, 1, 0, align=urwid.LEFT)

    cancel_button = urwid.Button("Close", on_press=dismiss_dialog)
    sync_progress = SyncProgressBar("progress_empty" , "progress_full", current=self.app.get_sync_progress(), done=1.0, satt=None)

    real_sync_button = urwid.Button("Sync Now", on_press=sync_now)
    hidden_sync_button = urwid.Button("Cancel Sync", on_press=cancel_sync)

    # Offer "Sync Now" while idle or finished, "Cancel Sync" otherwise.
    sync_idle = (
        self.app.get_sync_status() == "Idle"
        or self.app.message_router.propagation_transfer_state >= LXMF.LXMRouter.PR_COMPLETE
    )
    sync_button = real_sync_button if sync_idle else hidden_sync_button

    button_columns = urwid.Columns([
        (urwid.WEIGHT, 0.45, sync_button),
        (urwid.WEIGHT, 0.1, urwid.Text("")),
        (urwid.WEIGHT, 0.45, cancel_button),
    ])
    real_sync_button.bc = button_columns

    pn_ident = None
    if self.app.get_default_propagation_node() != None:
        pn_hash = self.app.get_default_propagation_node()
        pn_ident = RNS.Identity.recall(pn_hash)

        if pn_ident == None:
            RNS.log("Propagation node identity is unknown, requesting from network...", RNS.LOG_DEBUG)
            RNS.Transport.request_path(pn_hash)

    if pn_ident != None:
        node_hash = RNS.Destination.hash_from_name_and_identity("nomadnetwork.node", pn_ident)
        pn_entry = self.app.directory.find(node_hash)

        # Prefer the directory display name; fall back to the pretty hash.
        if pn_entry != None:
            node_label = str(pn_entry.display_name)
        else:
            node_label = RNS.prettyhexrep(pn_hash)
        pn_display_str = " "+" "+node_label

        dialog_body = urwid.Pile([
            urwid.Text(""+glyphs["node"]+pn_display_str, align=urwid.CENTER),
            urwid.Divider(glyphs["divider1"]),
            sync_progress,
            urwid.Divider(glyphs["divider1"]),
            all_radio,
            limit_row,
            urwid.Text(""),
            button_columns
        ])
    else:
        # No usable node: show only the explanation and a "Close" button.
        button_columns = urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Text("" )),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, cancel_button),
        ])
        dialog_body = urwid.Pile([
            urwid.Text(""),
            urwid.Text("No trusted nodes found, cannot sync!\n", align=urwid.CENTER),
            urwid.Text(
                "To synchronise messages from the network, "
                "one or more nodes must be marked as trusted in the Known Nodes list, "
                "or a node must manually be selected as the default propagation node. "
                "Nomad Network will then automatically sync from the nearest trusted node, "
                "or the manually selected one.",
                align=urwid.LEFT,
            ),
            urwid.Text(""),
            button_columns
        ])

    dialog = DialogLineBox(dialog_body, title="Message Sync")

    # Attributes read back later by update_sync_dialog().
    dialog.delegate = self
    dialog.sync_progress = sync_progress
    dialog.cancel_button = cancel_button
    dialog.real_sync_button = real_sync_button
    dialog.hidden_sync_button = hidden_sync_button
    dialog.bc = button_columns

    self.sync_dialog = dialog

    overlay = urwid.Overlay(
        dialog,
        self.listbox,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def update_sync_dialog(self, loop = None, sender = None):
    """Refresh the open sync dialog and schedule the next refresh.

    Updates the progress bar, shows either the sync or the cancel button
    and re-arms a 0.2 second alarm that calls this method again. Does
    nothing when no sync dialog is open. ``loop`` and ``sender`` are
    accepted but not used.
    """
    if not (self.dialog_open and self.sync_dialog != None):
        return

    dialog = self.sync_dialog
    dialog.sync_progress.set_completion(self.app.get_sync_progress())

    sync_idle = (
        self.app.get_sync_status() == "Idle"
        or self.app.message_router.propagation_transfer_state >= LXMF.LXMRouter.PR_COMPLETE
    )
    shown_button = dialog.real_sync_button if sync_idle else dialog.hidden_sync_button
    dialog.bc.contents[0] = (shown_button, dialog.bc.options(urwid.WEIGHT, 0.45))

    self.app.ui.loop.set_alarm_in(0.2, self.update_sync_dialog)
def conversation_list_selection(self, arg1, arg2):
    """Selection-change callback handed to the list box; intentionally does nothing."""
    return None
def update_conversation_list(self):
    """Rebuild the conversation list column and redraw the screen.

    The previously selected entry is selected again when it still
    exists. An open sync dialog is laid over the new list. When the
    conversations display is the active display, the conversation
    currently shown is marked as read and its "unread" marker file is
    removed.
    """
    # Remember the selection before the list widgets are replaced.
    previous_item = self.ilb.get_selected_item()
    selected_hash = getattr(previous_item, "source_hash", None)

    self.update_listbox()
    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)

    if self.dialog_open and self.sync_dialog != None:
        # Keep the sync dialog visible on top of the rebuilt list.
        list_column = urwid.Overlay(
            self.sync_dialog,
            self.listbox,
            align=urwid.CENTER,
            width=urwid.RELATIVE_100,
            valign=urwid.MIDDLE,
            height=urwid.PACK,
            left=2,
            right=2,
        )
    else:
        list_column = self.listbox
    self.columns_widget.contents[0] = (list_column, options)

    if selected_hash is not None:
        for position, entry in enumerate(self.list_widgets):
            if entry.source_hash == selected_hash:
                self.ilb.select_item(position)
                break

    nomadnet.NomadNetworkApp.get_shared_instance().ui.loop.draw_screen()

    sub_displays = self.app.ui.main_display.sub_displays
    if sub_displays.active_display != sub_displays.conversations_display:
        return

    shown = self.currently_displayed_conversation
    if shown != None and self.app.conversation_is_unread(shown):
        self.app.mark_conversation_read(shown)
        try:
            unread_marker = self.app.conversationpath + "/" + shown + "/unread"
            if os.path.isfile(unread_marker):
                os.unlink(unread_marker)
        except Exception as e:
            # Failures are passed on to the caller unchanged.
            raise e
def display_conversation(self, sender=None, source_hash=None):
    """Show the conversation for ``source_hash`` in the second column.

    ``sender`` is the first positional parameter so the method can be
    used as a signal callback; it is not used. With ``source_hash`` None
    the "no conversation" widget is shown and focus returns to the list.
    """
    outgoing = self.currently_displayed_conversation
    if outgoing != None and self.app.conversation_is_unread(outgoing):
        self.app.mark_conversation_read(outgoing)

    self.currently_displayed_conversation = source_hash
    options = self.widget.options(urwid.WEIGHT, 1)
    self.widget.contents[1] = (self.make_conversation_widget(source_hash), options)

    if source_hash == None:
        self.widget.focus_position = 0
        return

    if self.app.conversation_is_unread(source_hash):
        self.app.mark_conversation_read(source_hash)
        self.update_conversation_list()

    self.widget.focus_position = 1

    # Select the matching list entry; the last match wins if several exist.
    matching_positions = [
        position
        for position, entry in enumerate(self.list_widgets)
        if entry.source_hash == source_hash
    ]
    if matching_positions:
        self.ilb.select_item(matching_positions[-1])
def make_conversation_widget(self, source_hash):
    """Return the widget for ``source_hash``, creating and caching it on first use.

    A cached widget for a real conversation has its message widgets
    rebuilt before it is returned.
    """
    cache = ConversationsDisplay.cached_conversation_widgets

    if source_hash not in cache:
        created = ConversationWidget(source_hash)
        created.delegate = self
        cache[source_hash] = created
        created.check_editor_allowed()
        return created

    cached = cache[source_hash]
    if source_hash != None:
        cached.update_message_widgets(replace=True)
    cached.check_editor_allowed()
    return cached
def close_conversation_by_hash(self, conversation_hash):
    """Drop the cached widget for ``conversation_hash`` and stop displaying it."""
    # pop() with a default is a no-op when the hash is not cached.
    ConversationsDisplay.cached_conversation_widgets.pop(conversation_hash, None)

    if self.currently_displayed_conversation == conversation_hash:
        self.display_conversation(sender=None, source_hash=None)
def close_conversation(self, conversation):
    """Drop ``conversation`` from the widget cache and stop displaying it."""
    closed_hash = conversation.source_hash

    # pop() with a default is a no-op when the hash is not cached.
    ConversationsDisplay.cached_conversation_widgets.pop(closed_hash, None)

    if self.currently_displayed_conversation == closed_hash:
        self.display_conversation(sender=None, source_hash=None)
def conversation_list_widget(self, conversation):
    """Build the list entry widget for one conversation record.

    ``conversation`` is indexed positionally: 0 source hash, 1 display
    name, 2 trust level, 4 unread value, 5 last activity timestamp. The
    returned ``urwid.AttrMap`` carries ``source_hash`` and
    ``display_name`` attributes.
    """
    source_hash, display_name, trust_level = conversation[0], conversation[1], conversation[2]
    unread = conversation[4]
    last_activity = conversation[5]
    glyphs = self.app.ui.glyphs

    # Symbol and colours follow the trust level; unrecognised levels get
    # the warning symbol with the untrusted colours.
    if trust_level == DirectoryEntry.UNTRUSTED:
        symbol, style, focus_style = glyphs["cross"], "list_untrusted", "list_focus_untrusted"
    elif trust_level == DirectoryEntry.UNKNOWN:
        symbol, style, focus_style = "?", "list_unknown", "list_focus"
    elif trust_level == DirectoryEntry.TRUSTED:
        symbol, style, focus_style = glyphs["check"], "list_trusted", "list_focus_trusted"
    elif trust_level == DirectoryEntry.WARNING:
        symbol, style, focus_style = glyphs["warning"], "list_warning", "list_focus"
    else:
        symbol, style, focus_style = glyphs["warning"], "list_untrusted", "list_focus_untrusted"

    label = symbol

    if display_name != None and display_name != "":
        label += " "+display_name

    # The address is left out only for trusted peers.
    if trust_level != DirectoryEntry.TRUSTED:
        label += " <"+source_hash+">"

    # No unread marker for untrusted peers or the conversation on screen.
    if trust_level != DirectoryEntry.UNTRUSTED and unread and source_hash != self.currently_displayed_conversation:
        label += " "+glyphs["unread"]
        if unread > 1:
            label += " ("+str(unread)+")"

    if last_activity > 0:
        label += "\n "+relative_time(last_activity)

    entry = ListEntry(label)
    urwid.connect_signal(entry, "click", self.display_conversation, conversation[0])

    decorated = urwid.AttrMap(entry, style, focus_style)
    decorated.source_hash = source_hash
    decorated.display_name = display_name

    return decorated
def shortcuts(self):
    """Return the shortcut set for the focused column.

    The editor shortcuts are returned when the second column (index 1)
    has focus; the list shortcuts are returned in every other case.
    """
    focused_column = self.widget.get_focus_path()[0]
    if focused_column == 1:
        return self.editor_shortcuts
    return self.list_shortcuts
class ListEntry(urwid.Text):
    """Selectable text widget that emits a "click" signal when activated."""

    _selectable = True

    signals = ["click"]

    def keypress(self, size, key):
        """
        Emit 'click' for the 'activate' command; return any other key unhandled.
        """
        if self._command_map[key] == urwid.ACTIVATE:
            self._emit('click')
            return None
        return key

    def mouse_event(self, size, event, button, x, y, focus):
        """
        Emit 'click' on a button 1 press and report whether the event was handled.
        """
        if button == 1 and urwid.util.is_mouse_press(event):
            self._emit('click')
            return True
        return False
class MessageEdit(urwid.Edit):
    """Edit widget for message title and content that forwards shortcuts.

    The owner assigns ``delegate`` and ``name`` after construction.
    """

    def keypress(self, size, key):
        """Handle editor shortcuts; pass every other key to ``urwid.Edit``."""
        # Control keys that map one-to-one onto a delegate method.
        delegate_actions = {
            "ctrl d": "send_message",
            "ctrl p": "paper_message",
            "ctrl a": "attach_file",
            "ctrl s": "save_focused_attachments",
            "ctrl k": "clear_editor",
        }

        action_name = delegate_actions.get(key)
        if action_name is not None:
            getattr(self.delegate, action_name)()
            return None

        if key == "up" and self.get_cursor_coords(size)[1] == 0:
            # On the first line of the topmost editor, "up" moves focus to
            # the frame body. The topmost editor is the title editor in
            # full mode and the content editor otherwise.
            if self.delegate.full_editor_active:
                topmost_editor = "title_editor"
            else:
                topmost_editor = "content_editor"
            if self.name == topmost_editor:
                self.delegate.frame.focus_position = "body"
                return None

        return super(MessageEdit, self).keypress(size, key)
class ConversationFrame(urwid.Frame):
    """Frame around a conversation that moves focus at the message list edges.

    The owner assigns ``delegate`` after construction.
    """

    def keypress(self, size, key):
        """Handle focus movement and "ctrl k"; pass other keys to ``urwid.Frame``."""
        if self.focus_position != "body":
            if key == "ctrl k":
                self.delegate.clear_editor()
                return None
            return super(ConversationFrame, self).keypress(size, key)

        owner = self.delegate
        dialog_showing = getattr(owner, "dialog_active", False) or getattr(owner, "dialog_open", False)

        # While a dialog is showing, every key goes straight to the body.
        if not dialog_showing:
            if key == "up" and owner.messagelist.top_is_visible:
                nomadnet.NomadNetworkApp.get_shared_instance().ui.main_display.frame.focus_position = "header"
                return None
            if key == "down" and owner.messagelist.bottom_is_visible:
                self.focus_position = "footer"
                return None

        return super(ConversationFrame, self).keypress(size, key)
class ConversationWidget(urwid.WidgetWrap):
def __init__(self, source_hash):
    """Build the widget for one conversation, or the empty placeholder.

    With ``source_hash`` None only a "No conversation selected" box is
    created and ``self.frame`` is None. Otherwise the conversation is
    loaded, a changed-callback is registered on it, and a frame with a
    peer info header, the message list and the minimal editor is built.

    Caching is left to the caller: ``__init__`` cannot hand back another
    instance, and returning one (as this method used to do for cached
    hashes) raises ``TypeError`` at construction time.

    :param source_hash: hex text of the peer address, or None.
    """
    self.app = nomadnet.NomadNetworkApp.get_shared_instance()
    g = self.app.ui.glyphs

    if source_hash is None:
        self.frame = None
        display_widget = urwid.LineBox(urwid.Filler(urwid.Text("\n No conversation selected"), "top"))
        super().__init__(display_widget)
        return

    self.source_hash = source_hash
    self.conversation = nomadnet.Conversation(source_hash, nomadnet.NomadNetworkApp.get_shared_instance())
    self.message_widgets = []
    self.sort_by_timestamp = False
    self.updating_message_widgets = False
    self.pending_attachments = []
    self.dialog_active = False
    self.update_message_widgets()

    self.conversation.register_changed_callback(self.conversation_changed)

    title_editor = MessageEdit(caption="", edit_text="", multiline=False)
    title_editor.delegate = self
    title_editor.name = "title_editor"

    msg_editor = MessageEdit(caption="", edit_text="", multiline=True)
    msg_editor.delegate = self
    msg_editor.name = "content_editor"

    self.peer_info_widget = urwid.AttrMap(urwid.Text(""), "msg_header_sent")
    self._update_peer_info()

    header_widgets = [self.peer_info_widget]
    if self.conversation.trust_level == DirectoryEntry.UNTRUSTED:
        header_widgets.append(urwid.AttrMap(
            urwid.Padding(
                urwid.Text(g["warning"]+" Warning: Conversation with untrusted peer "+g["warning"], align=urwid.CENTER)),
            "msg_warning_untrusted",
        ))
    header = urwid.Pile(header_widgets)

    # The minimal editor shows only the content field; the full editor
    # stacks a title row above a content row using the same msg_editor.
    self.minimal_editor = urwid.AttrMap(msg_editor, "msg_editor")
    self.minimal_editor.name = "minimal_editor"

    title_columns = urwid.Columns([
        (8, urwid.Text("Title")),
        urwid.AttrMap(title_editor, "msg_editor"),
    ])

    content_columns = urwid.Columns([
        (8, urwid.Text("Content")),
        urwid.AttrMap(msg_editor, "msg_editor")
    ])

    self.full_editor = urwid.Pile([
        title_columns,
        content_columns
    ])
    self.full_editor.name = "full_editor"

    self.content_editor = msg_editor
    self.title_editor = title_editor
    self.full_editor_active = False

    self.frame = ConversationFrame(
        self.messagelist,
        header=header,
        footer=self.minimal_editor,
        focus_part="footer"
    )
    self.frame.delegate = self

    self.display_widget = urwid.LineBox(
        self.frame
    )

    super().__init__(self.display_widget)
def _update_peer_info(self):
    """Refresh the header line with the peer's name, stamp cost and hop count.

    The display name is taken from the directory, then from the peer's
    recalled app data, and finally falls back to the pretty-printed
    destination hash. The stamp cost is taken from the message router,
    or from the recalled app data when the router has none.
    """
    glyphs = self.app.ui.glyphs
    peer_hash = bytes.fromhex(self.source_hash)
    router = self.app.message_router

    name = self.app.directory.display_name(peer_hash)

    # App data is only recalled when the name or the stamp cost is missing.
    announce_data = None
    if name is None or router.get_outbound_stamp_cost(peer_hash) is None:
        announce_data = RNS.Identity.recall_app_data(peer_hash)

    if name is None and announce_data:
        name = LXMF.display_name_from_app_data(announce_data)
    if name is None:
        name = RNS.prettyhexrep(peer_hash)

    cost = router.get_outbound_stamp_cost(peer_hash)
    if cost is None and announce_data:
        cost = LXMF.stamp_cost_from_app_data(announce_data)

    hop_count = RNS.Transport.hops_to(peer_hash)
    if hop_count < RNS.Transport.PATHFINDER_M:
        plural = "" if hop_count == 1 else "s"
        hops_text = str(hop_count)+" hop" + plural
    else:
        hops_text = "unknown"

    details = []
    if cost is not None:
        details.append("Stamp: "+str(cost))
    details.append(glyphs["speed"]+hops_text)

    left_side = " "+name
    right_side = " ".join(details)+" "
    self.peer_info_widget.original_widget.set_text(left_side+" | "+right_side)
def clear_history_dialog(self):
    """Overlay a Yes/No confirmation on the message list before clearing history.

    "Yes" clears the conversation history and then redraws; "No" only
    redraws. The redraw replaces the frame body, which removes the overlay.
    """
    def on_cancel(sender):
        # NOTE(review): other dialogs in this class track their state in
        # `dialog_active`; this one writes `dialog_open` - confirm intent.
        self.dialog_open = False
        self.conversation_changed(None)

    def on_confirm(sender):
        self.dialog_open = False
        self.conversation.clear_history()
        self.conversation_changed(None)

    prompt = urwid.Text("Clear conversation history\n", align=urwid.CENTER)
    button_row = urwid.Columns([
        (urwid.WEIGHT, 0.45, urwid.Button("Yes", on_press=on_confirm)),
        (urwid.WEIGHT, 0.1, urwid.Text("")),
        (urwid.WEIGHT, 0.45, urwid.Button("No", on_press=on_cancel)),
    ])

    dialog = DialogLineBox(urwid.Pile([prompt, button_row]), title="?")
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=34,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def _build_footer(self):
    """Return the widget to place in the frame footer.

    That is the currently active editor (full or minimal). When
    attachments are pending, a one-line indicator listing their file
    names is stacked above the editor.
    """
    glyphs = self.app.ui.glyphs
    editor = self.full_editor if self.full_editor_active else self.minimal_editor

    if not self.pending_attachments:
        return editor

    names = [os.path.basename(path) for path in self.pending_attachments]
    summary = glyphs["file"]+" "+str(len(self.pending_attachments))+" file(s): "+", ".join(names)
    indicator = urwid.AttrMap(urwid.Text(summary), "msg_header_sent")
    return urwid.Pile([indicator, editor])
def toggle_editor(self):
    """Flip between the minimal and the full editor and rebuild the footer."""
    self.full_editor_active = not self.full_editor_active
    rebuilt_footer = self._build_footer()
    self.frame.contents["footer"] = (rebuilt_footer, None)
def check_editor_allowed(self):
    """Show the editor if the peer is known to the directory, else a notice.

    Does nothing when the frame is falsy. When the directory does not
    report the peer as known, the footer is replaced by an explanatory
    warning instead of the editor.
    """
    glyphs = self.app.ui.glyphs
    if not self.frame:
        return

    directory = nomadnet.NomadNetworkApp.get_shared_instance().directory
    peer_known = directory.is_known(bytes.fromhex(self.source_hash))

    if peer_known:
        self.frame.contents["footer"] = (self._build_footer(), None)
        return

    notice_text = "".join([
        "\n"+glyphs["info"]+"\n\nYou cannot currently message this peer, since its identity keys are not known. ",
        "The keys have been requested from the network and should arrive shortly, if available. ",
        "Close this conversation and reopen it to try again.\n\n",
        "To query the network manually, select this conversation in the conversation list, ",
        "press Ctrl-E, and use the query button.\n",
    ])
    notice = urwid.AttrMap(
        urwid.Padding(urwid.Text(notice_text, align=urwid.CENTER)),
        "msg_header_caution",
    )
    self.frame.contents["footer"] = (notice, None)
def toggle_focus_area(self):
    """Move focus between the message list (body) and the editor (footer).

    The decision is based on the `name` attribute of the first focused
    widget in the frame; if that cannot be read, focus is left untouched.
    """
    try:
        focused_name = self.frame.get_focus_widgets()[0].name
    except Exception:
        # Best effort: an unnamed or missing focus widget means "do nothing".
        focused_name = ""

    if focused_name == "messagelist":
        self.frame.focus_position = "footer"
    elif focused_name in ("minimal_editor", "full_editor"):
        self.frame.focus_position = "body"
def keypress(self, size, key):
    """Handle the conversation-level shortcuts; pass every other key on.

    Handled keys return None. Unhandled keys return whatever the parent
    class's keypress returns.
    """
    # Shortcuts that map to a single argument-less method call.
    simple_actions = {
        "tab": self.toggle_focus_area,
        "ctrl w": self.close,
        "ctrl t": self.toggle_editor,
        "ctrl x": self.clear_history_dialog,
        "ctrl a": self.attach_file,
        "ctrl s": self.save_focused_attachments,
    }

    if key in simple_actions:
        simple_actions[key]()
        return None

    if key == "ctrl u":
        self.conversation.purge_failed()
        self.conversation_changed(None)
        return None

    if key == "ctrl g":
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        app.ui.main_display.sub_displays.conversations_display.toggle_fullscreen()
        return None

    if key == "ctrl o":
        # Flip the sort order, then rebuild the list with it.
        self.sort_by_timestamp = not self.sort_by_timestamp
        self.conversation_changed(None)
        return None

    return super(ConversationWidget, self).keypress(size, key)
def conversation_changed(self, conversation):
    """Refresh the peer header (once it exists) and rebuild the message list.

    The `conversation` argument is not used by this method.
    """
    header_ready = hasattr(self, "peer_info_widget")
    if header_ready:
        self._update_peer_info()

    self.update_message_widgets(replace=True)
def update_message_widgets(self, replace = False):
    """Rebuild the message widgets and the message list box.

    One LXMessageWidget is created per distinct message hash. The widgets
    are ordered by `timestamp` when `sort_by_timestamp` is set, otherwise
    by `sort_timestamp`, and the list is positioned on the last entry.

    Args:
        replace: When true, the new list is also installed as the frame
            body and the screen is redrawn.
    """
    # Flag-based guard: wait for a rebuild that is already in progress.
    while self.updating_message_widgets:
        time.sleep(0.5)

    self.updating_message_widgets = True
    try:
        self.message_widgets = []
        seen_hashes = set()
        for message in self.conversation.messages:
            message_hash = message.get_hash()
            if message_hash not in seen_hashes:
                seen_hashes.add(message_hash)
                self.message_widgets.append(LXMessageWidget(message))

            # Unload every message once it has been inspected, duplicate or not.
            message.unload()

        if self.sort_by_timestamp:
            self.message_widgets.sort(key=lambda m: m.timestamp, reverse=False)
        else:
            self.message_widgets.sort(key=lambda m: m.sort_timestamp, reverse=False)

        from nomadnet.vendor.additional_urwid_widgets import IndicativeListBox
        self.messagelist = IndicativeListBox(self.message_widgets, position = len(self.message_widgets)-1)
        self.messagelist.name = "messagelist"

        if replace:
            self.frame.contents["body"] = (self.messagelist, None)
            nomadnet.NomadNetworkApp.get_shared_instance().ui.loop.draw_screen()
    finally:
        # Always release the guard; otherwise one failed rebuild would make
        # every later call wait forever in the loop above.
        self.updating_message_widgets = False
def clear_editor(self):
    """Empty both editors, drop pending attachments and rebuild the footer."""
    for editor in (self.content_editor, self.title_editor):
        editor.set_edit_text("")

    self.pending_attachments = []
    rebuilt_footer = self._build_footer()
    self.frame.contents["footer"] = (rebuilt_footer, None)
def _collect_attachment_refs(self):
    """List every attachment in the conversation, newest message first.

    Returns:
        A list of `(label, name, conv_message, field_type, field_index)`
        tuples. `field_index` counts the "file" attachments within one
        message and is 0 for every other attachment type.
    """
    glyphs = self.app.ui.glyphs
    collected = []

    newest_first = sorted(self.conversation.messages, key=lambda m: m.sort_timestamp, reverse=True)
    for conv_message in newest_first:
        if not conv_message.has_attachments():
            continue

        file_index = 0
        for kind, name, *extra in (conv_message._cached_attachment_names or []):
            # An optional third element carries the size.
            size = extra[0] if extra else 0

            label = glyphs[kind]+" "+name
            if size > 0:
                label += " ("+_format_size(size)+")"

            if kind == "file":
                collected.append((label, name, conv_message, "file", file_index))
                file_index += 1
            else:
                collected.append((label, name, conv_message, kind, 0))

    return collected
def save_focused_attachments(self):
    """Open a dialog for copying this conversation's attachments to disk.

    Each attachment gets a checkbox. The copy button copies every ticked
    attachment into the configured attachment save path (or the downloads
    path when none is configured) and reports the outcome in the dialog.
    When the conversation has no attachments, a short notice is shown
    instead.
    """
    g = self.app.ui.glyphs
    self.dialog_active = True

    try:
        attachment_items = self._collect_attachment_refs()
    except Exception as e:
        # Best effort: a broken message must not prevent the dialog from opening.
        RNS.log("Error collecting attachments: "+str(e), RNS.LOG_ERROR)
        attachment_items = []

    save_dir = self.app.attachment_save_path if self.app.attachment_save_path else self.app.downloads_path

    def dismiss_dialog(sender):
        self.dialog_active = False
        self.conversation_changed(None)

    def show_overlay(dialog, width, height):
        # Put the dialog on top of the message list and give it focus.
        dialog.delegate = self
        overlay = urwid.Overlay(dialog, self.messagelist, align=urwid.CENTER, width=width, valign=urwid.MIDDLE, height=height, left=2, right=2)
        self.frame.contents["body"] = (overlay, self.frame.options())
        self.frame.focus_position = "body"

    if not attachment_items:
        dialog = DialogLineBox(
            urwid.Pile([
                urwid.Text("No attachments in this conversation.\n"),
                urwid.Columns([
                    (urwid.WEIGHT, 0.6, urwid.Text("")),
                    (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=dismiss_dialog)),
                ])
            ]), title="Attachments"
        )
        show_overlay(dialog, 45, urwid.PACK)
        return

    checkboxes = []
    for label, filename, conv_msg, field_type, field_index in attachment_items:
        cb = urwid.CheckBox(label, state=False)
        # Remember on the checkbox itself where its attachment lives.
        cb._attachment_filename = filename
        cb._conv_message = conv_msg
        cb._field_type = field_type
        cb._field_index = field_index
        checkboxes.append(cb)

    status_text = urwid.Text("")

    def do_save(sender):
        saved = []
        errors = []
        for cb in checkboxes:
            if not cb.get_state():
                continue
            try:
                src_path = cb._conv_message.get_attachment_file_path(cb._field_type, cb._field_index)
                if src_path and os.path.isfile(src_path):
                    saved.append(_copy_attachment_to_dest(cb._attachment_filename, src_path))
                else:
                    # A ticked attachment without a backing file is a failure,
                    # not something to skip silently.
                    errors.append(str(cb._attachment_filename)+": stored file not found")
            except Exception as e:
                errors.append(str(e))

        if saved:
            lines = [g["check"]+" Copied "+str(len(saved))+" file(s) to "+save_dir+":"]
            for p in saved:
                lines.append(" "+os.path.basename(p))
            if errors:
                lines.append(g["cross"]+" "+str(len(errors))+" failed")
            status_text.set_text("\n".join(lines))
        elif errors:
            status_text.set_text(g["cross"]+" Failed: "+errors[0])
        else:
            status_text.set_text("No files selected")

    dialog_widgets = list(checkboxes)
    dialog_widgets.append(urwid.Divider(g["divider1"]))
    dialog_widgets.append(urwid.Text("Copy to: "+save_dir))
    dialog_widgets.append(status_text)
    dialog_widgets.append(urwid.Text(""))
    dialog_widgets.append(urwid.Columns([
        (urwid.WEIGHT, 0.45, urwid.Button("Copy to Downloads", on_press=do_save)),
        (urwid.WEIGHT, 0.1, urwid.Text("")),
        (urwid.WEIGHT, 0.45, urwid.Button("Close", on_press=dismiss_dialog)),
    ]))

    dialog = DialogLineBox(urwid.ListBox(urwid.SimpleFocusListWalker(dialog_widgets)), title="Attachments")
    show_overlay(dialog, ("relative", 80), ("relative", 80))
def send_message(self):
    """Send the editor content, with any readable pending attachments.

    Nothing is sent when the content is empty. Attachments that cannot
    be read are logged and left out. The editors are cleared only when
    the conversation reports a truthy result from `send`.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    loaded = []
    for file_path in (self.pending_attachments or []):
        try:
            with open(file_path, "rb") as handle:
                payload = handle.read()
            loaded.append([os.path.basename(file_path), payload])
        except Exception as e:
            RNS.log("Error reading attachment "+str(file_path)+": "+str(e), RNS.LOG_ERROR)

    fields = {LXMF.FIELD_FILE_ATTACHMENTS: loaded} if loaded else None

    if self.conversation.send(content, title, fields=fields):
        self.clear_editor()
def attach_file(self):
    """Open the file browser as an overlay on the message list and focus it."""
    self.dialog_active = True

    overlay = urwid.Overlay(
        FileBrowserDialog(self),
        self.messagelist,
        align=urwid.CENTER,
        width=("relative", 90),
        valign=urwid.MIDDLE,
        height=("relative", 80),
        left=2,
        right=2,
    )

    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def file_browser_closed(self):
    """Called when the file browser closes: rebuild the footer and the message list."""
    self.dialog_active = False

    rebuilt_footer = self._build_footer()
    self.frame.contents["footer"] = (rebuilt_footer, None)

    # Rebuilding the body replaces the browser overlay with the message list.
    self.conversation_changed(None)
def paper_message_saved(self, path):
    """Show a dialog stating where a paper message was saved.

    Args:
        path: Location of the saved output; shown via `str(path)`.
    """
    glyphs = self.app.ui.glyphs

    def on_ok(sender):
        # NOTE(review): other dialogs in this class track their state in
        # `dialog_active`; this one writes `dialog_open` - confirm intent.
        self.dialog_open = False
        self.conversation_changed(None)

    message = urwid.Text("The paper message was saved to:\n\n"+str(path)+"\n", align=urwid.CENTER)
    ok_row = urwid.Columns([
        (urwid.WEIGHT, 0.6, urwid.Text("")),
        (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=on_ok)),
    ])

    dialog = DialogLineBox(urwid.Pile([message, ok_row]), title=glyphs["papermsg"].replace(" ", ""))
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=60,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def print_paper_message_qr(self):
    """Output the editor content as a paper message using the default mode.

    Nothing happens when the content is empty. A truthy result clears
    the editors; otherwise the failure dialog is shown.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    succeeded = self.conversation.paper_output(content, title)
    if succeeded:
        self.clear_editor()
    else:
        self.paper_message_failed()
def save_paper_message_qr(self):
    """Save the editor content as a paper message in "save_qr" mode.

    Nothing happens when the content is empty. Any result other than
    False clears the editors and is passed to the saved-confirmation
    dialog; False opens the failure dialog.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    saved_to = self.conversation.paper_output(content, title, mode="save_qr")
    if saved_to != False:
        self.clear_editor()
        self.paper_message_saved(saved_to)
    else:
        self.paper_message_failed()
def save_paper_message_uri(self):
    """Save the editor content as a paper message in "save_uri" mode.

    Nothing happens when the content is empty. Any result other than
    False clears the editors and is passed to the saved-confirmation
    dialog; False opens the failure dialog.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    saved_to = self.conversation.paper_output(content, title, mode="save_uri")
    if saved_to != False:
        self.clear_editor()
        self.paper_message_saved(saved_to)
    else:
        self.paper_message_failed()
def paper_message(self):
    """Show the dialog for choosing a paper message output method.

    Every output button first dismisses the dialog and then runs the
    matching print/save method; "Cancel" only dismisses it.
    """
    def dismiss_dialog(sender):
        # NOTE(review): other dialogs in this class track their state in
        # `dialog_active`; this one writes `dialog_open` - confirm intent.
        self.dialog_open = False
        self.conversation_changed(None)

    def print_qr(sender):
        dismiss_dialog(self)
        self.print_paper_message_qr()

    def save_qr(sender):
        dismiss_dialog(self)
        self.save_paper_message_qr()

    def save_uri(sender):
        dismiss_dialog(self)
        self.save_paper_message_uri()

    # Buttons in display order, separated by narrow empty spacer columns.
    choices = [
        ("Print QR", print_qr),
        ("Save QR", save_qr),
        ("Save URI", save_uri),
        ("Cancel", dismiss_dialog),
    ]
    button_columns = []
    for caption, handler in choices:
        if button_columns:
            button_columns.append((urwid.WEIGHT, 0.1, urwid.Text("")))
        button_columns.append((urwid.WEIGHT, 0.5, urwid.Button(caption, on_press=handler)))

    explanation = urwid.Text(
        "Select the desired paper message output method.\nSaved files will be written to:\n\n"+str(self.app.downloads_path)+"\n",
        align=urwid.CENTER,
    )

    dialog = DialogLineBox(
        urwid.Pile([explanation, urwid.Columns(button_columns)]),
        title="Create Paper Message",
    )
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=60,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def paper_message_failed(self):
    """Show an error dialog stating that the paper message could not be output."""
    def on_ok(sender):
        # NOTE(review): other dialogs in this class track their state in
        # `dialog_active`; this one writes `dialog_open` - confirm intent.
        self.dialog_open = False
        self.conversation_changed(None)

    message = urwid.Text(
        "Could not output paper message,\ncheck your settings. See the log\nfile for any error messages.\n",
        align=urwid.CENTER,
    )
    ok_row = urwid.Columns([
        (urwid.WEIGHT, 0.6, urwid.Text("")),
        (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=on_ok)),
    ])

    dialog = DialogLineBox(urwid.Pile([message, ok_row]), title="!")
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=34,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def close(self):
    """Ask the delegate to close this conversation."""
    owner = self.delegate
    owner.close_conversation(self)
class LXMessageWidget(urwid.WidgetWrap):
    """Rendered form of one conversation message.

    The widget is a pile of: a styled header line, an optional transfer
    progress line (only for an outbound message still being sent as a
    resource), the indented message text, one clickable line per
    attachment, and a trailing empty line.
    """

    def __init__(self, message):
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        glyphs = app.ui.glyphs

        self.timestamp = message.get_timestamp()
        self.sort_timestamp = message.sort_timestamp
        self.transfer_done = False
        self._live_lxm = None

        # NOTE(review): the _cached_* attributes are read after the getter
        # calls above them; presumably the getters populate them - keep this
        # order unless confirmed otherwise.
        msg_hash = message.get_hash()
        msg_state = message.get_state()
        source_hash = message._cached_source_hash
        method = message._cached_method

        sent_at = datetime.fromtimestamp(self.timestamp)
        if message.get_transport_encrypted():
            encryption_mark = " "+glyphs["encrypted"]
        else:
            encryption_mark = " "+glyphs["plaintext"]
        base_title = relative_time(self.timestamp)+" | "+sent_at.strftime(app.time_format)+encryption_mark

        # Header style and prefix depend on direction and state.
        is_outbound = False
        if source_hash is None:
            header_style = "msg_header_failed"
            prefix = glyphs["warning"]+" "
        elif app.lxmf_destination.hash == source_hash:
            is_outbound = True
            header_style, prefix = self._outbound_header(glyphs, msg_state, method)
        elif message.signature_validated():
            header_style = "msg_header_ok"
            prefix = glyphs["check"]+" "+glyphs["arrow_l"]+" "
        else:
            header_style = "msg_header_caution"
            prefix = glyphs["warning"]+" "+glyphs["arrow_l"]+" "+message.get_signature_description() + "\n "
        title_string = prefix+base_title

        msg_title = message.get_title()
        if msg_title != "":
            title_string += " | " + msg_title

        has_attachments = message.has_attachments()
        cached_names = message._cached_attachment_names or []
        show_attachments = has_attachments and cached_names
        if show_attachments:
            summary = [glyphs[kind]+" "+name for kind, name, *_ in cached_names]
            title_string += " | " + " ".join(summary)

        title = urwid.AttrMap(urwid.Text(title_string), header_style)
        self.progress_widget = urwid.Text("")
        self.progress_attr = urwid.AttrMap(self.progress_widget, "progress_full")

        # Every content line is prefixed with a space.
        indented = "\n".join(" "+line for line in message.get_content().split("\n"))

        pile_widgets = [title]

        still_sending = (
            is_outbound
            and msg_state is not None
            and msg_state < LXMF.LXMessage.SENT
            and msg_hash is not None
        )
        if still_sending:
            # Look for this message in the router's outbound queue; progress
            # is only tracked when it is represented as a resource.
            try:
                for pending in app.message_router.pending_outbound:
                    if pending.hash != msg_hash:
                        continue
                    if pending.representation == LXMF.LXMessage.RESOURCE:
                        self._live_lxm = pending
                    break
            except Exception:
                pass

            if self._live_lxm is not None:
                pct = int(self._live_lxm.progress * 100)
                bar = self._progress_bar(self._live_lxm.progress, app.ui.colormode)
                self.progress_widget.set_text(" ["+bar+"] "+str(pct)+"%")
                pile_widgets.append(self.progress_attr)
                self._start_progress_poll()

        pile_widgets.append(urwid.Text(indented))

        if show_attachments:
            file_index = 0
            for kind, name, *extra in cached_names:
                size = extra[0] if extra else 0
                label = " "+glyphs[kind]+" "+name
                if size > 0:
                    label += " ("+_format_size(size)+")"
                if kind == "file":
                    pile_widgets.append(ClickableAttachment(label, name, message, "file", file_index))
                    file_index += 1
                else:
                    pile_widgets.append(ClickableAttachment(label, name, message, kind))

        pile_widgets.append(urwid.Text(""))
        super().__init__(urwid.Pile(pile_widgets))

    @staticmethod
    def _outbound_header(glyphs, state, method):
        """Return `(header_style, title_prefix)` for a message sent by this node."""
        arrow = glyphs["arrow_r"]+" "
        if state == LXMF.LXMessage.DELIVERED:
            return "msg_header_delivered", glyphs["check"]+" "+arrow
        if state == LXMF.LXMessage.FAILED:
            return "msg_header_failed", glyphs["cross"]+" "+arrow
        if state == LXMF.LXMessage.REJECTED:
            return "msg_header_failed", glyphs["cross"]+" "+arrow+"Rejected "
        if method == LXMF.LXMessage.PROPAGATED and state == LXMF.LXMessage.SENT:
            return "msg_header_propagated", glyphs["sent"]+" "+arrow
        if method == LXMF.LXMessage.PAPER and state == LXMF.LXMessage.PAPER:
            return "msg_header_propagated", glyphs["papermsg"]+" "+arrow
        if state == LXMF.LXMessage.SENT:
            return "msg_header_sent", glyphs["sent"]+" "+arrow
        return "msg_header_sent", arrow

    @staticmethod
    def _progress_bar(progress, colormode):
        """Return a 20-cell bar for `progress`; block glyphs at 256+ colours, ASCII below."""
        bar_width = 20
        filled = int(bar_width * progress)
        if colormode >= 256:
            full_cell, empty_cell = "\u2588", "\u2591"
        else:
            full_cell, empty_cell = "#", "-"
        return full_cell * filled + empty_cell * (bar_width - filled)

    def _start_progress_poll(self):
        """Schedule the first progress poll on the UI loop; failures are ignored."""
        try:
            ui_loop = nomadnet.NomadNetworkApp.get_shared_instance().ui.loop
            if ui_loop:
                ui_loop.set_alarm_in(0.3, self._poll_progress)
        except Exception:
            pass

    def _poll_progress(self, loop=None, user_data=None):
        """Alarm callback: update the progress line and re-arm until the transfer ends."""
        if self.transfer_done:
            return

        live = self._live_lxm
        if live is None:
            self.transfer_done = True
            return

        app = nomadnet.NomadNetworkApp.get_shared_instance()
        glyphs = app.ui.glyphs
        progress = live.progress
        state = live.state
        pct = int(progress * 100)

        # A final state sets a closing text and stops the polling.
        final_text = None
        if state == LXMF.LXMessage.FAILED:
            final_text = " "+glyphs["cross"]+" Transfer failed"
        elif state == LXMF.LXMessage.REJECTED:
            final_text = " "+glyphs["cross"]+" Rejected: too large or not accepted"
        elif state >= LXMF.LXMessage.SENT:
            final_text = ""

        if final_text is not None:
            self.progress_widget.set_text(final_text)
            self.transfer_done = True
            self._live_lxm = None
            return

        bar = self._progress_bar(progress, app.ui.colormode)
        self.progress_widget.set_text(" ["+bar+"] "+str(pct)+"%")

        # Still in flight: poll again shortly and redraw now.
        try:
            ui_loop = app.ui.loop
            if ui_loop:
                ui_loop.set_alarm_in(0.3, self._poll_progress)
                ui_loop.draw_screen()
        except Exception:
            pass
class ClickableAttachment(urwid.Text):
    """Text line for one attachment; a left mouse press saves it to disk.

    The attachment is copied from its stored file when one exists,
    otherwise its bytes are read from the message. After a successful
    save the line shows the destination and further presses are ignored.
    """

    def __init__(self, label, filename, conv_message, field_type, field_index=0):
        self.filename = filename
        self.conv_message = conv_message
        self.field_type = field_type
        self.field_index = field_index
        self.saved = False
        super().__init__(label)

    def mouse_event(self, size, event, button, x, y, focus):
        is_left_press = button == 1 and urwid.util.is_mouse_press(event)
        if not is_left_press:
            return False
        self._save()
        return True

    def _embedded_data(self):
        """Return the attachment bytes carried by the message, or b"" if there are none."""
        msg = self.conv_message

        if self.field_type == "file":
            attachments = msg.get_file_attachments()
            if self.field_index >= len(attachments):
                return b""
            att = attachments[self.field_index]
            # A file attachment is expected as a list with the bytes in slot 1.
            if isinstance(att, list) and len(att) >= 2 and isinstance(att[1], bytes):
                return att[1]
            return b""

        if self.field_type == "image":
            raw = msg.get_image()
        elif self.field_type == "audio":
            raw = msg.get_audio()
        else:
            return b""
        return raw if isinstance(raw, bytes) else b""

    def _save(self):
        if self.saved:
            return

        glyphs = nomadnet.NomadNetworkApp.get_shared_instance().ui.glyphs
        try:
            src_path = self.conv_message.get_attachment_file_path(self.field_type, self.field_index)
            if src_path and os.path.isfile(src_path):
                save_path = _copy_attachment_to_dest(self.filename, src_path)
            else:
                data = self._embedded_data()
                self.conv_message.unload()
                if not data:
                    return
                save_path = _save_attachment_to_disk(self.filename, data)

            self.saved = True
            self.set_text(" "+glyphs["check"]+" Copied to: "+save_path)
        except Exception as e:
            RNS.log("Error saving attachment: "+str(e), RNS.LOG_ERROR)
            self.set_text(" "+glyphs["cross"]+" Save failed: "+str(e))
def _attachment_save_dir():
    """Return the configured attachment save path, or the downloads path if unset."""
    app = nomadnet.NomadNetworkApp.get_shared_instance()
    return app.attachment_save_path if app.attachment_save_path else app.downloads_path


def _prepare_attachment_save_path(save_dir, filename):
    """Return an unused path directly inside `save_dir` for `filename`.

    `save_dir` is created when missing. Only the final path component of
    `filename` is used, so a name containing directory parts or an
    absolute path cannot escape `save_dir`. When the target already
    exists, `_1`, `_2`, ... is inserted before the extension until a free
    name is found.
    """
    # The name comes from message data: strip any directory components
    # (treating backslashes as separators too) before joining.
    safe_name = os.path.basename(str(filename).replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "attachment"

    os.makedirs(save_dir, exist_ok=True)

    base, ext = os.path.splitext(safe_name)
    save_path = os.path.join(save_dir, safe_name)
    counter = 0
    # lexists also counts directories and dangling symlinks as taken, so an
    # existing directory of the same name is never used as the target.
    while os.path.lexists(save_path):
        counter += 1
        save_path = os.path.join(save_dir, base+"_"+str(counter)+ext)

    return save_path


def _copy_attachment_to_dest(filename, src_path):
    """Copy the file at `src_path` into the attachment save directory.

    Args:
        filename: Name to save under; only its final path component is used.
        src_path: Existing file to copy (metadata is copied as well).

    Returns:
        The path the file was copied to.
    """
    save_path = _prepare_attachment_save_path(_attachment_save_dir(), filename)
    shutil.copy2(src_path, save_path)
    return save_path


def _save_attachment_to_disk(filename, data):
    """Write `data` to a new file in the attachment save directory.

    Args:
        filename: Name to save under; only its final path component is used.
        data: Bytes to write.

    Returns:
        The path the data was written to.
    """
    save_path = _prepare_attachment_save_path(_attachment_save_dir(), filename)
    with open(save_path, "wb") as f:
        f.write(data)
    return save_path
class FileBrowserEntry(urwid.WidgetWrap):
    """One selectable row of the file browser.

    A row is the parent-directory link, a directory, or a file (optionally
    marked as selected). Enter or a left mouse press emits "click".
    """

    signals = ["click"]

    def __init__(self, name, full_path, is_dir=False, is_parent=False, selected=False):
        self.full_path = full_path
        self.name = name
        self.is_dir = is_dir
        self.is_parent = is_parent
        self.selected = selected

        glyphs = nomadnet.NomadNetworkApp.get_shared_instance().ui.glyphs

        # Precedence: parent link, then directory, then selected file, then plain file.
        if is_parent:
            label, style, focus_style = glyphs["arrow_l"]+" ..", "list_trusted", "list_focus"
        elif is_dir:
            label, style, focus_style = glyphs["arrow_r"]+" "+name+"/", "list_trusted", "list_focus"
        elif selected:
            label, style, focus_style = glyphs["check"]+" "+name, "list_trusted", "list_focus_trusted"
        else:
            label, style, focus_style = " "+name, "list_unknown", "list_focus"

        self.text_widget = urwid.SelectableIcon(label, 0)
        super().__init__(urwid.AttrMap(self.text_widget, style, focus_style))

    def keypress(self, size, key):
        # Only Enter is consumed; every other key is handed back.
        if key != "enter":
            return key
        self._emit("click")

    def mouse_event(self, size, event, button, x, y, focus):
        is_left_press = button == 1 and urwid.util.is_mouse_press(event)
        if not is_left_press:
            return False
        self._emit("click")
        return True
class FileBrowserDialog(urwid.WidgetWrap):
    """Dialog for browsing the file system and toggling files as attachments.

    The dialog edits `delegate.pending_attachments` in place and keeps the
    delegate's footer in sync. Browsing starts in the user's home
    directory; entries whose name starts with "." are not listed.
    """

    def __init__(self, delegate):
        self.delegate = delegate
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        self.g = app.ui.glyphs
        self.current_path = os.path.expanduser("~")
        self.path_label = urwid.Text("")
        self.status_label = urwid.Text("")
        self.file_walker = urwid.SimpleFocusListWalker([])
        self.file_listbox = urwid.ListBox(self.file_walker)
        self.button_columns = urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Done", on_press=self._dismiss)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Cancel", on_press=self._cancel)),
        ])
        header_pile = urwid.Pile([
            self.path_label,
            self.status_label,
            urwid.Divider(self.g["divider1"]),
        ])
        footer_pile = urwid.Pile([
            urwid.Divider(self.g["divider1"]),
            self.button_columns,
        ])
        self._populate()
        self.browser_frame = urwid.Frame(
            self.file_listbox,
            header=header_pile,
            footer=footer_pile,
        )
        linebox = urwid.LineBox(self.browser_frame, title="Attach File")
        super().__init__(linebox)

    def _update_status(self):
        """Show how many files are selected, and their names, in the status line."""
        pending = self.delegate.pending_attachments
        if pending:
            names = [os.path.basename(p) for p in pending]
            self.status_label.set_text(" "+self.g["file"]+" "+str(len(pending))+" selected: "+", ".join(names))
        else:
            self.status_label.set_text(" No files selected")

    def _populate(self):
        """Fill the list with the contents of `current_path`.

        Order: parent link (unless at the root), directories, then files.
        A directory that cannot be listed shows an error row instead of
        raising. The previous focus position is restored when it is still
        within range.
        """
        self.path_label.set_text(" "+self.current_path)
        self._update_status()

        focus_pos = None
        try:
            focus_pos = self.file_listbox.focus_position
        except Exception:
            # An empty list has no focus position.
            pass

        entries = []
        parent = os.path.dirname(self.current_path)
        if parent != self.current_path:
            entry = FileBrowserEntry("..", parent, is_parent=True)
            urwid.connect_signal(entry, "click", self._entry_clicked, entry)
            entries.append(entry)

        try:
            items = sorted(os.listdir(self.current_path))
        except OSError as e:
            # Not only permission problems: the directory may have vanished or
            # be unreadable for other reasons. Keep the parent link so the
            # user can still navigate away.
            if isinstance(e, PermissionError):
                reason = " Permission denied"
            else:
                reason = " Cannot read directory: "+(e.strerror or str(e))
            entries.append(urwid.Text(("error_text", reason)))
            self.file_walker[:] = entries
            return

        dirs = []
        files = []
        for item in items:
            if item.startswith("."):
                continue
            full = os.path.join(self.current_path, item)
            if os.path.isdir(full):
                dirs.append((item, full))
            elif os.path.isfile(full):
                files.append((item, full))

        for name, full in dirs:
            entry = FileBrowserEntry(name, full, is_dir=True)
            urwid.connect_signal(entry, "click", self._entry_clicked, entry)
            entries.append(entry)

        for name, full in files:
            is_selected = full in self.delegate.pending_attachments
            entry = FileBrowserEntry(name, full, selected=is_selected)
            urwid.connect_signal(entry, "click", self._entry_clicked, entry)
            entries.append(entry)

        if not dirs and not files:
            entries.append(urwid.Text(("inactive_text", " (empty)")))

        self.file_walker[:] = entries
        if focus_pos is not None and focus_pos < len(entries):
            self.file_listbox.set_focus(focus_pos)
        elif entries:
            self.file_listbox.set_focus(0)

    def _entry_clicked(self, entry_widget, user_data=None):
        """Enter a directory, or toggle a file in the pending attachments."""
        entry = user_data if user_data else entry_widget
        if entry.is_dir or entry.is_parent:
            self.current_path = entry.full_path
            self._populate()
        else:
            if entry.full_path in self.delegate.pending_attachments:
                self.delegate.pending_attachments.remove(entry.full_path)
            else:
                self.delegate.pending_attachments.append(entry.full_path)
            self.delegate.frame.contents["footer"] = (self.delegate._build_footer(), None)
            self._populate()

    def _dismiss(self, sender):
        """"Done": close the browser and keep the current selection."""
        self.delegate.file_browser_closed()

    def _cancel(self, sender):
        """"Cancel": clear all pending attachments, then close the browser."""
        self.delegate.pending_attachments.clear()
        self.delegate.frame.contents["footer"] = (self.delegate._build_footer(), None)
        self.delegate.file_browser_closed()

    def keypress(self, size, key):
        """Close on Esc; move between the list and the buttons with Up/Down."""
        if key == "esc":
            self.delegate.file_browser_closed()
            return

        result = super().keypress(size, key)

        # An unhandled Down in the list, or Up in the buttons, moves focus
        # to the other area.
        if result == "down" and self.browser_frame.focus_position == "body":
            self.browser_frame.focus_position = "footer"
            return
        elif result == "up" and self.browser_frame.focus_position == "footer":
            self.browser_frame.focus_position = "body"
            return
        return result
class SyncProgressBar(urwid.ProgressBar):
    """Progress bar whose text is the application's sync status."""

    def get_text(self):
        """Return the sync status, followed by the parent's text when the app asks for a percentage."""
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        status = app.get_sync_status()
        if app.sync_status_show_percent():
            return status+" "+super().get_text()
        return status