1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
# Copyright (C) 2025 Stephen Fairchild (s-fairchild@users.sourceforge.net)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program in the file entitled COPYING.
# If not, see <http://www.gnu.org/licenses/>.
__all__ = ["main"]
_description_ = "Repeated encoder activations in formats of your choice."
import re
import time
import tempfile
import time
import logging
import tempfile
from contextlib import nullcontext as nulcx
from idjc.streamspec import Defaults
from .common import *
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("testing")
def main():
streams = 6
up_time = 1
down_time = 1
port = 8185
print(_description_)
choice = lambda x: print("Yes" if x else "No")
while 1:
try:
Defaults.choose_from_menu()
except ValueError:
continue
else:
break
random = input("Want random parameters? (Y/n) ").lower() in ("y", "yes", "")
choice(random)
rslt = input(f"Enter number of streams, uptime, downtime. "
f"(Defaults are '{streams} {up_time} {down_time}') ")
match = re.match(r"^\s*(\d+)\s*(\d+)\s*(\d+)\s*$", rslt)
if match is None:
print("Using default values")
else:
try:
streams, up_time, down_time = (int(x) for x in match.groups()
if int(x) > 0)
except ValueError:
print("Using default values")
upload = input("Send to Icecast? (Y/n) ").lower() in ("y", "yes", "")
choice(upload)
noise = input("Encode random noise (y/N) ").lower() in ("y", "yes")
choice(noise)
log_level = input("Log level (dIwec) ").lower()
try:
logger.setLevel({'d': logging.DEBUG, 'i': logging.INFO,
'w': logging.WARN, 'e': logging.ERROR,
'c': logging.CRITICAL}[log_level])
except KeyError:
logger.setLevel(logging.INFO)
print(logging.getLevelName(logger.level))
with tempfile.TemporaryDirectory() as tmpdirname:
with Backend(streams, tmpdirname) as (mx_send, sc_send, receive):
ping = mk_ping(mx_send, receive)
def ping_sleep(duration):
for _ in range(duration):
ping()
time.sleep(1)
encoder = Encoder(sc_send, receive, random, streams)
source = Source(sc_send, receive, port, streams) if upload else nulcx()
try:
with IcecastServer(port=port) if upload else nulcx():
with RandomNoise(sc_send, receive) if noise else nulcx():
while 1:
with (encoder, source):
ping_sleep(up_time)
ping_sleep(down_time)
except KeyboardInterrupt:
pass
except BrokenPipeError as e:
logger.critical(e)
|