Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 173 additions & 12 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,10 +742,102 @@ def onConnected(interface):
printConfig(node.moduleConfig)

if args.configure:
with open(args.configure[0], encoding="utf8") as file:
configuration = yaml.safe_load(file)
filename = args.configure[0]

is_binary = False
if hasattr(args, "export_format") and args.export_format in ("binary", "protobuf"):
is_binary = True
elif hasattr(args, "export_format") and args.export_format == "yaml":
is_binary = False
else:
# Autodetection
try:
with open(filename, "r", encoding="utf8") as file:
configuration = yaml.safe_load(file)
if isinstance(configuration, dict):
is_binary = False
else:
is_binary = True
except Exception:
is_binary = True

if is_binary:
from meshtastic.protobuf import clientonly_pb2
# Read the binary protobuf file
with open(filename, "rb") as file:
profile = clientonly_pb2.DeviceProfile()
profile.ParseFromString(file.read())

closeNow = True
interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction()

if profile.long_name:
print(f"Setting device owner to {profile.long_name}")
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(profile.long_name)
time.sleep(0.5)

if profile.short_name:
print(f"Setting device owner short to {profile.short_name}")
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(
long_name=None, short_name=profile.short_name
)
time.sleep(0.5)

if profile.channel_url:
print(f"Setting channel url to {profile.channel_url}")
interface.getNode(args.dest, **getNode_kwargs).setURL(profile.channel_url)
time.sleep(0.5)

if profile.canned_messages:
print(f"Setting canned message messages to {profile.canned_messages}")
interface.getNode(args.dest, **getNode_kwargs).set_canned_message(profile.canned_messages)
time.sleep(0.5)

if profile.ringtone:
print(f"Setting ringtone to {profile.ringtone}")
interface.getNode(args.dest, **getNode_kwargs).set_ringtone(profile.ringtone)
time.sleep(0.5)

if profile.HasField("fixed_position"):
# Convert high-precision integer coordinates back to floating point
pos = profile.fixed_position
lat = pos.latitude_i / 1e7 if pos.latitude_i else 0.0
lon = pos.longitude_i / 1e7 if pos.longitude_i else 0.0
alt = pos.altitude if pos.altitude else 0
print(f"Fixing altitude at {alt} meters")
print(f"Fixing latitude at {lat} degrees")
print(f"Fixing longitude at {lon} degrees")
print("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)

if profile.HasField("config"):
# Dynamically iterate through populated config sections (e.g. lora, display)
localConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig
for field in profile.config.DESCRIPTOR.fields:
if profile.config.HasField(field.name):
getattr(localConfig, field.name).CopyFrom(getattr(profile.config, field.name))
interface.getNode(args.dest, **getNode_kwargs).writeConfig(field.name)
time.sleep(0.5)

if profile.HasField("module_config"):
# Dynamically iterate through populated module config sections (e.g. telemetry, serial)
moduleConfig = interface.getNode(args.dest, **getNode_kwargs).moduleConfig
for field in profile.module_config.DESCRIPTOR.fields:
if profile.module_config.HasField(field.name):
getattr(moduleConfig, field.name).CopyFrom(getattr(profile.module_config, field.name))
interface.getNode(args.dest, **getNode_kwargs).writeConfig(field.name)
time.sleep(0.5)

interface.getNode(args.dest, False, **getNode_kwargs).commitSettingsTransaction()
print("Writing modified configuration to device")

else:
with open(filename, "r", encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction()

if "owner" in configuration:
Expand Down Expand Up @@ -858,18 +950,36 @@ def onConnected(interface):
return

closeNow = True
config_txt = export_config(interface)

if args.export_config == "-":
# Output to stdout (preserves legacy use of `> file.yaml`)
print(config_txt)
is_binary = False
if hasattr(args, "export_format") and args.export_format in ("binary", "protobuf"):
is_binary = True
elif hasattr(args, "export_format") and args.export_format == "yaml":
is_binary = False
else:
is_binary = args.export_config.endswith(".cfg")

if is_binary:
config_bytes = export_profile(interface)
try:
with open(args.export_config, "w", encoding="utf-8") as f:
f.write(config_txt)
print(f"Exported configuration to {args.export_config}")
with open(args.export_config, "wb") as f:
f.write(config_bytes)
print(f"Exported profile to {args.export_config}")
except Exception as e:
meshtastic.util.our_exit(f"ERROR: Failed to write config file: {e}")
meshtastic.util.our_exit(f"ERROR: Failed to write profile file: {e}")
else:
config_txt = export_config(interface)

if args.export_config == "-":
# Output to stdout (preserves legacy use of `> file.yaml`)
print(config_txt)
else:
try:
with open(args.export_config, "w", encoding="utf-8") as f:
f.write(config_txt)
print(f"Exported configuration to {args.export_config}")
except Exception as e:
meshtastic.util.our_exit(f"ERROR: Failed to write config file: {e}")

if args.ch_set_url:
closeNow = True
Expand Down Expand Up @@ -1332,6 +1442,49 @@ def export_config(interface) -> str:
config_txt += yaml.dump(configObj)
return config_txt

def export_profile(interface) -> bytes:
"""used in --export-config for binary .cfg files"""
from meshtastic.protobuf import clientonly_pb2

profile = clientonly_pb2.DeviceProfile()

owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
canned_messages = interface.getCannedMessage()
ringtone = interface.getRingtone()

if owner:
profile.long_name = owner
if owner_short:
profile.short_name = owner_short
if channel_url:
profile.channel_url = channel_url
if canned_messages:
profile.canned_messages = canned_messages
if ringtone:
profile.ringtone = ringtone

profile.config.CopyFrom(interface.localNode.localConfig)
profile.module_config.CopyFrom(interface.localNode.moduleConfig)

pos = myinfo.get("position")
if pos:
lat = pos.get("latitude")
lon = pos.get("longitude")
alt = pos.get("altitude")

if lat or lon or alt:
if lat:
profile.fixed_position.latitude_i = int(lat * 1e7)
if lon:
profile.fixed_position.longitude_i = int(lon * 1e7)
if alt:
profile.fixed_position.altitude = int(alt)

return profile.SerializeToString()


def create_power_meter():
"""Setup the power meter."""
Expand Down Expand Up @@ -1702,15 +1855,23 @@ def addImportExportArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPar

group.add_argument(
"--configure",
help="Specify a path to a yaml(.yml) file containing the desired settings for the connected device.",
"--import-config",
dest="configure",
help="Specify a path to a configuration file to import. Autodetects format (yaml or binary protobuf).",
action="append",
)
group.add_argument(
"--export-config",
nargs="?",
const="-", # default to "-" if no value provided
metavar="FILE",
help="Export device config as YAML (to stdout if no file given)"
help="Export device config (to stdout if no file given). Autodetects format by extension if possible."
)
group.add_argument(
"--export-format",
choices=["auto", "yaml", "binary", "protobuf"],
default="auto",
help="Format for export or import. 'auto' uses file extension or contents. 'binary' or 'protobuf' forces binary format. 'yaml' forces yaml."
)
return parser

Expand Down
Loading