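In day-to-day work I often need small utilities such as timestamp conversion or Base64 encoding/decoding. I used to look for an online tool page with a search engine each time. With Streamlit, it is quick to build small web apps that do the same jobs. A few examples follow.

Installation: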

pip install streamlit

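A Streamlit script is a stream of commands executed from top to bottom. Whenever the value of a widget on the page changes, the script is re-run.

Run a Streamlit script: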

streamlit run main.py

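JSON Formatting

The main widgets used are:

  • st.markdown: renders Markdown text; handy for headings and similar
  • st.text_area: multi-line text input

One text area takes the raw JSON and a second one shows the formatted result.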

import streamlit as st
import json

def _format(s: str) -> str:
    try:
        pretty = json.dumps(json.loads(s), indent=4)
    except Exception as e:
        st.error(f'Error: {e}')
        pretty = ''
    return pretty

st.markdown('# JSON Formatter')
text_in = st.text_area('Input', '{}', height=100)
text_out = _format(text_in)
st.text_area(label='Output', value=text_out, height=600)
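
The formatting step is plain json and can be tried outside Streamlit. The sketch below is a variation on _format, not the code above: the name format_json and its (text, error) return shape are made up for illustration. It passes ensure_ascii=False so that non-ASCII text stays readable instead of being escaped as \uXXXX, and it reports the position of a parse error.

```python
import json
from typing import Optional, Tuple


def format_json(raw: str, indent: int = 4) -> Tuple[str, Optional[str]]:
    """Return (pretty_text, error_message); error_message is None on success."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # JSONDecodeError carries the position of the first problem.
        return '', f'line {e.lineno}, column {e.colno}: {e.msg}'
    # ensure_ascii=False keeps non-ASCII characters as-is in the output.
    return json.dumps(parsed, indent=indent, ensure_ascii=False), None


pretty, err = format_json('{"name": "工具", "ids": [1, 2]}')
print(pretty)

_, err = format_json('{"name": }')
print(err)
```

In the app, the error string could be passed to st.error and the pretty text to the output st.text_area.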

Alternatively, use the st.json widget, which comes with formatting and related features built in.

text_in = st.text_area('Input', '{}', height=100)
st.json(json.loads(text_in))

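Base64 Encoding/Decoding

This app lets you choose between encoding and decoding, so it needs one more widget than the previous example: a selection control.

  • st.radio: radio-button (single choice) widget

import base64
import streamlit as st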

def _encode(s: str) -> str:
    try:
        encoded = base64.b64encode(s.encode('utf-8')).decode('utf-8')
    except Exception as e:
        st.error(f'Error: {e}')
        encoded = ''
    return encoded

def _decode(s: str) -> str:
    try:
        decoded = base64.b64decode(s.encode('utf-8')).decode('utf-8')
    except Exception as e:
        st.error(f'Error: {e}')
        decoded = ''
    return decoded


st.markdown('# Base64 Encode/Decode')

mode = st.radio(
    'Encode/Decode',
    options=['encode', 'decode'],
)

text_in = st.text_area('Input', '')
text_out = ''
if mode == 'encode':
    text_out = _encode(text_in)
else:
    text_out = _decode(text_in)
st.text_area(label='Output', value=text_out)
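
One detail worth knowing when decoding: by default base64.b64decode silently discards characters outside the Base64 alphabet, so a mistyped input can still decode to something. The sketch below is an optional stricter variant, not the code above; the helper names b64_encode and b64_decode are made up for illustration. It uses validate=True so that bad input raises binascii.Error.

```python
import base64
import binascii


def b64_encode(text: str) -> str:
    return base64.b64encode(text.encode('utf-8')).decode('ascii')


def b64_decode(data: str) -> str:
    # validate=True turns characters outside the Base64 alphabet into an
    # error instead of silently dropping them.
    raw = base64.b64decode(data.strip(), validate=True)
    return raw.decode('utf-8')


encoded = b64_encode('你好')
print(encoded)              # 5L2g5aW9
print(b64_decode(encoded))  # 你好

try:
    b64_decode('5L2g!5aW9')
except binascii.Error as e:
    print(f'rejected: {e}')
```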

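URL Encoding/Decoding

Similar to the previous example; only the encode and decode functions differ.

import urllib.parse
import streamlit as st

def _encode(s: str) -> str: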
    try:
        encoded = urllib.parse.quote(s)
    except Exception as e:
        st.error(f'Error: {e}')
        encoded = ''
    return encoded

def _decode(s: str) -> str:
    try:
        decoded = urllib.parse.unquote(s)
    except Exception as e:
        st.error(f'Error: {e}')
        decoded = ''
    return decoded

st.markdown('# URL Encode/Decode')

mode = st.radio(
    'Encode/Decode',
    options=['encode', 'decode'],
)

text_in = st.text_area('Input', '')
text_out = ''
if mode == 'encode':
    text_out = _encode(text_in)
else:
    text_out = _decode(text_in)
st.text_area(label='Output', value=text_out)
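
urllib.parse offers a few variants of these functions, and which one is right depends on what is being encoded. The short sketch below (variable names are illustrative only) shows the difference: quote leaves "/" unescaped by default, safe='' escapes it too, and the _plus variants use "+" for spaces as in HTML form data.

```python
from urllib.parse import quote, quote_plus, unquote, unquote_plus

path = 'docs/a b?x=1'

default_quoted = quote(path)          # '/' is kept: docs/a%20b%3Fx%3D1
strict_quoted = quote(path, safe='')  # '/' is escaped: docs%2Fa%20b%3Fx%3D1
form_quoted = quote_plus('a b&c')     # space becomes '+': a+b%26c

print(default_quoted)
print(strict_quoted)
print(form_quoted)

# unquote leaves '+' alone; unquote_plus turns it back into a space.
print(unquote('a+b%26c'))       # a+b&c
print(unquote_plus('a+b%26c'))  # a b&c
```

For a tool that encodes a single query-string value, quote_plus or quote with safe='' is usually the closer fit; for a whole path, the default quote is.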

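Unit Conversion

Different kinds of units can live in separate tabs, so this example builds a multi-tab page.

  • st.tabs: creates multiple tabs
  • st.selectbox: drop-down select box

import re
import streamlit as st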

NUMERIC_PATTERN = r"-?\d+(\.\d+)?"
TIME_UNITS = {
    "second": 1.,
    "millisecond": 1e-3,
    "microsecond": 1e-6,
    "nanosecond": 1e-9,
    "minute": 60.,
    "hour": 3600.,
}

# Sizes in bytes (1024-based). Uppercase "B" means byte; lowercase "b" would mean bit.
DATA_UNITS = {
    "GB": 1024*1024*1024,
    "MB": 1024*1024,
    "KB": 1024,
    "bytes": 1,
    "bit": 0.125,
    "TB": 1024*1024*1024*1024,
}

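def _convert_time(_in: str, input_unit, output_unit) -> str:
    if _in == '':
        return ''

    # Strip thousands separators before validating, so "1,000" is accepted.
    cleaned = _in.replace(',', '')
    if re.fullmatch(NUMERIC_PATTERN, cleaned):
        # Convert to the base unit (seconds), then to the target unit.
        _out = float(cleaned) * TIME_UNITS[input_unit] / TIME_UNITS[output_unit]
        if _out.is_integer():
            return f'{int(_out)}'
        else:
            return f'{_out}'
    else:
        return 'Invalid input'


def _convert_data(_in: str, input_unit, output_unit) -> str:
    if _in == '':
        return ''

    # Strip thousands separators before validating, so "1,000" is accepted.
    cleaned = _in.replace(',', '')
    if re.fullmatch(NUMERIC_PATTERN, cleaned):
        # Convert to the base unit (bytes), then to the target unit.
        _out = float(cleaned) * DATA_UNITS[input_unit] / DATA_UNITS[output_unit]
        if _out.is_integer():
            return f'{int(_out)}'
        else:
            return f'{_out}'
    else:
        return 'Invalid input'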


st.markdown("# Unit Converter")

tab_time, tab_data = st.tabs(["Time", "Data"])

with tab_time:
    st.markdown("## Time")

    _in_time = st.text_input(label="input_time", value=1)
    input_time_unit = st.selectbox(label="input_time_unit", options=TIME_UNITS.keys())
    output_time_unit = st.selectbox(label="output_time_unit", options=TIME_UNITS.keys())
    _out_time = _convert_time(_in_time, input_time_unit, output_time_unit)
    st.text_input(label="output_time", value=_out_time, disabled=True)
    
with tab_data:
    st.markdown("## Data Storage")

    _in_data = st.text_input(label="input_data", value=1)
    input_data_unit = st.selectbox(label="input_data_unit", options=DATA_UNITS.keys())
    output_data_unit = st.selectbox(label="output_data_unit", options=DATA_UNITS.keys())
    _out_data = _convert_data(_in_data, input_data_unit, output_data_unit)
    st.text_input(label="output_data", value=_out_data, disabled=True)
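
Both converters follow the same rule: multiply by the source unit's factor to reach the base unit, then divide by the target unit's factor. Factors such as 1e-3 and 1e-6 are not exactly representable as floats, so one possible refinement is to do the arithmetic with exact fractions. The sketch below is an alternative under that assumption, not the code above; the single convert helper and the Fraction-based table are made up for illustration.

```python
from fractions import Fraction

# Each unit expressed in seconds, as exact fractions rather than floats.
TIME_UNITS = {
    'nanosecond': Fraction(1, 10**9),
    'microsecond': Fraction(1, 10**6),
    'millisecond': Fraction(1, 10**3),
    'second': Fraction(1),
    'minute': Fraction(60),
    'hour': Fraction(3600),
}


def convert(value: str, src: str, dst: str, units: dict) -> str:
    """Convert value from src to dst by going through the base unit."""
    # Fraction() parses integer and decimal strings; commas are stripped first.
    result = Fraction(value.replace(',', '')) * units[src] / units[dst]
    if result.denominator == 1:
        return str(result.numerator)
    return str(float(result))


print(convert('90', 'minute', 'hour', TIME_UNITS))             # 1.5
print(convert('1', 'millisecond', 'microsecond', TIME_UNITS))  # 1000
print(convert('1,500', 'millisecond', 'second', TIME_UNITS))   # 1.5
```

Because the unit table is a parameter, the same function could serve the data tab with a table of byte sizes.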

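Timestamp Conversion

Because a Streamlit script re-runs from top to bottom on every interaction, ordinary variables do not keep their values between runs. Values that must persist have to be stored in and read from st.session_state.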
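
The effect can be illustrated with a toy model in plain Python. This is not how Streamlit is implemented; run_script and the dict standing in for st.session_state are assumptions made purely for illustration. Each call plays the role of one re-run of the script.

```python
def run_script(session_state: dict) -> int:
    """One simulated top-to-bottom run of a script."""
    counter = 0   # ordinary variable: re-created on every run
    counter += 1

    # Persisted value: initialised once, then carried across runs.
    if 'clicks' not in session_state:
        session_state['clicks'] = 0
    session_state['clicks'] += 1
    return counter


session_state = {}  # stands in for st.session_state
local_values = [run_script(session_state) for _ in range(3)]

print(local_values)             # [1, 1, 1]
print(session_state['clicks'])  # 3
```

The "initialise only if the key is missing" check is the same pattern the converter uses with 't' not in st.session_state.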

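The timestamp converter below should be able to derive the epoch time from a time string and also derive the time string from an epoch, so a single time value is kept in the session.

  • st.session_state: per-session variables, accessible attribute-style or dict-style
  • st.button: button
  • st.columns: splits the page into columns

import streamlit as st
from datetime import datetime, time
from zoneinfo import ZoneInfo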
from zoneinfo import ZoneInfo

def _sync():
    st.session_state.t_local = st.session_state.t.astimezone(ZoneInfo('Asia/Shanghai'))
    st.session_state.t_utc = st.session_state.t.astimezone(ZoneInfo('UTC'))

def _now():
    st.session_state.t = datetime.fromtimestamp(int(datetime.now().timestamp()))
    _sync()

if 't' not in st.session_state:
    _now()

def _check(epoch, local_time: str):
    if st.session_state.t.strftime("%Y-%m-%d %H:%M:%S") != local_time:
        try:
            st.session_state.t = datetime.strptime(local_time, "%Y-%m-%d %H:%M:%S")
            _sync()
        except Exception as e:
            st.error(f'Error: {e}')
        return
    
    if int(st.session_state.t.timestamp()) != int(epoch):
        try:
            st.session_state.t = datetime.fromtimestamp(epoch)
            _sync()
        except Exception as e:
            st.error(f'Error: {e}')
        return
    
    _now()

st.markdown("# Timestamp Converter")
st.button(label='Now', type='primary', use_container_width=True, on_click=_now)
epoch = st.number_input(label="Epoch", value=int(st.session_state.t.timestamp()))


col1, col2 = st.columns(2)

with col1:
    st.markdown("## Local Time")
    
    local_time = st.text_input(label="Local Time", value=st.session_state.t_local.strftime("%Y-%m-%d %H:%M:%S"))
    st.text_input(label="ISO Format", value=st.session_state.t_local.isoformat(), disabled=True)
    st.markdown("---")
    st.text_input(label="Midnight Epoch", value=int(datetime.combine(st.session_state.t.date(), time()).timestamp()), disabled=True)
    st.text_input(label="Midnight Time", value=st.session_state.t.date().strftime("%Y-%m-%d %H:%M:%S"), disabled=True)

with col2:
    st.markdown("## UTC Time")

    utc_time = st.text_input(label="UTC Time", value=st.session_state.t_utc.strftime("%Y-%m-%d %H:%M:%S"))
    st.text_input(label="ISO Format (UTC)", value=st.session_state.t_utc.isoformat(), disabled=True)
    
st.button(label='Update', type='primary', use_container_width=True, on_click=_check, args=(epoch, local_time, ))
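
The app keeps a naive datetime, so datetime.fromtimestamp and strptime interpret values in the server's local time zone. If the server's zone might differ from the one shown on the page, the conversions can be written with an explicit zone. The sketch below is one way to do that, not the code above: the helper names are made up, and a fixed UTC+8 offset stands in for Asia/Shanghai so the example does not depend on a time zone database.

```python
from datetime import datetime, timedelta, timezone

FMT = '%Y-%m-%d %H:%M:%S'
UTC = timezone.utc
CST = timezone(timedelta(hours=8))  # fixed-offset stand-in for Asia/Shanghai


def epoch_to_str(epoch: int, tz: timezone) -> str:
    return datetime.fromtimestamp(epoch, tz=tz).strftime(FMT)


def str_to_epoch(text: str, tz: timezone) -> int:
    # strptime returns a naive datetime; attach the zone explicitly so the
    # result does not depend on the machine's local time zone.
    return int(datetime.strptime(text, FMT).replace(tzinfo=tz).timestamp())


def midnight_epoch(epoch: int, tz: timezone) -> int:
    """Epoch of 00:00:00 on the same calendar day in the given zone."""
    t = datetime.fromtimestamp(epoch, tz=tz)
    return int(t.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())


print(epoch_to_str(0, UTC))                      # 1970-01-01 00:00:00
print(epoch_to_str(0, CST))                      # 1970-01-01 08:00:00
print(str_to_epoch('1970-01-01 08:00:00', CST))  # 0
print(midnight_epoch(3600, CST))                 # -28800
```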

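Multipage App

The pages above can be combined into one multipage app.

  • st.page_link: a navigation link; the target can be a path relative to the main script or an external URL

import streamlit as st

st.markdown("# Tools")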

st.page_link("pages/1_Timestamp_Converter.py", label="Time Converter", icon="🕙")
st.page_link("pages/2_Unit_Converter.py", label="Unit Converter", icon="📏")
st.page_link("pages/3_JSON_Formatter.py", label="JSON Formatter", icon="📋")
st.page_link("pages/4_Base64_Encode_Decode.py", label="Base64 Encode/Decode", icon="📜")
st.page_link("pages/5_URL_Encode_Decode.py", label="URL Encode/Decode", icon="🌐")

Put the earlier scripts in a pages folder next to main.py, and they will be listed in the sidebar (sorted by file name).

main.py
pages
|- 1_Timestamp_Converter.py
|- 2_Unit_Converter.py
|- 3_JSON_Formatter.py
|- 4_Base64_Encode_Decode.py
|- 5_URL_Encode_Decode.py

Password Protection

A page can be protected with a password so that only people who know it can get in.

import hmac
import streamlit as st

def check_password():
    def password_entered():
        if hmac.compare_digest(st.session_state["password"], st.secrets["password"]):
            st.session_state["password_correct"] = True
            del st.session_state["password"]  # Don't store the password.
        else:
            st.session_state["password_correct"] = False

    if st.session_state.get("password_correct", False):
        return True

    st.text_input(
        "Password", type="password", on_change=password_entered, key="password"
    )
    if "password_correct" in st.session_state:
        st.error("😕 Password incorrect")
    return False


if not check_password():
    st.stop()

# the rest of the page
# ...
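
hmac.compare_digest compares in constant time, which avoids leaking how many leading characters matched. When given str arguments it only accepts ASCII, so a password containing other characters would raise TypeError. A small sketch of a wrapper that encodes to bytes first is shown below; password_matches is a hypothetical helper, not part of Streamlit or of the code above.

```python
import hmac


def password_matches(entered: str, expected: str) -> bool:
    # compare_digest accepts str only when both are ASCII; encoding to bytes
    # first lets passwords contain any characters.
    return hmac.compare_digest(entered.encode('utf-8'), expected.encode('utf-8'))


print(password_matches('s3cret', 's3cret'))  # True
print(password_matches('s3cret', 's3cre7'))  # False
print(password_matches('密码', '密码'))        # True
```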

The password itself goes in ${PWD}/.streamlit/secrets.toml:

password = "your_password"

Configuration File

Settings go in ${PWD}/.streamlit/config.toml, where you can configure the server port, log format, theme colors, and so on.

[server]
port = 8501
address = "0.0.0.0"

[logger]
level = "info"
messageFormat = "%(asctime)s %(message)s"

[theme]
# The preset Streamlit theme that your custom theme inherits from.
# One of "light" or "dark".
base="dark"

# Primary accent color for interactive elements.
primaryColor = "#81689D"

# Background color for the main content area.
backgroundColor = "#474F7A"

# Background color used for the sidebar and most interactive widgets.
secondaryBackgroundColor = "#1F2544"

# Color used for almost all text.
# textColor =

# Font family for all text in the app, except code blocks. One of "sans serif",
# "serif", or "monospace".
# font =

Deployment

The app can be hosted on Streamlit's official cloud or self-hosted. For the self-hosted case, here is a Dockerfile:

FROM python:3.10.12-slim
ENV TZ=Asia/Shanghai
ENV LANG=zh_CN.UTF-8

WORKDIR /app/webtools

COPY ./requirements.txt /app/webtools/
RUN pip3 install --upgrade pip && \
    pip3 install --no-cache-dir -r requirements.txt

COPY ./src  .
COPY secrets.toml .streamlit/
COPY config.toml .streamlit/

EXPOSE 8501
ENTRYPOINT ["streamlit", "run", "main.py"]

If the app is served through an Nginx reverse proxy, add the following to the Nginx configuration (see "Deploy streamlit with nginx + docker"):

server {
    server_name website.com;

    location / {
        proxy_pass http://127.0.0.1:8501/;
        proxy_set_header        Host $host;
        proxy_set_header        X-Real-IP $remote_addr;
        proxy_set_header        X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header        X-Forwarded-Proto $scheme;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

    location /_stcore/stream {
        proxy_pass http://127.0.0.1:8501/_stcore/stream;
        proxy_http_version 1.1;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 86400;
    }
}