当前位置:首页 > 技术知识 > 正文内容

Python 的网络与互联网访问模块及应用实例(一)

maynowei7个月前 (09-18)技术知识81



Python 提供了丰富的内置模块和第三方库来处理网络与互联网访问,使得从简单的 HTTP 请求到复杂的网络通信都变得相对简单。以下是常用的网络模块及其应用实例。


一、常用网络访问模块


socket - 底层网络接口,TCP/UDP 编程


http.client - 提供 HTTP 和 HTTPS 客户端功能


urllib - 处理 URL 和 HTTP 请求


ftplib - FTP 客户端操作


smtplib - 发送电子邮件


poplib - 接收电子邮件(从 POP3 服务器)


imaplib - 接收电子邮件(从 IMAP 服务器)


email - 解析和生成电子邮件


requests - 更简洁、易用的 HTTP 客户端库(第三方)


aiohttp - 异步 HTTP 客户端/服务器框架(第三方)


websockets - WebSocket 客户端/服务器实现(第三方)


paramiko - SSHv2 协议实现(第三方)


twisted - 事件驱动的网络编程框架(第三方)


二、核心模块应用实例


socket 模块:底层网络通信


TCP 服务器示例:


import socket


server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


server_address = ('localhost', 12345)


server_socket.bind(server_address)


server_socket.listen(5)


print("服务器启动,等待连接...")


while True:


client_socket, client_address = server_socket.accept()


print(f"接收到来自 {client_address} 的连接")


try:


data = client_socket.recv(1024)


print(f"收到数据: {data.decode('utf-8')}")


message = "你好!消息已收到。"


client_socket.sendall(message.encode('utf-8'))


finally:


client_socket.close()


TCP 客户端示例:


import socket


client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


server_address = ('localhost', 12345)


try:


client_socket.connect(server_address)


message = "Hello, Server!"


client_socket.sendall(message.encode('utf-8'))


data = client_socket.recv(1024)


print(f"服务器响应: {data.decode('utf-8')}")


finally:


client_socket.close()


requests 库:HTTP 客户端


GET 请求示例:


import requests


url = 'https://www.example.com '


response = requests.get(url)


if response.status_code == 200:


print(response.text)


else:


print(f"请求失败,状态码: {response.status_code}")


带参数的 GET 请求:


import requests


url = 'https://api.github.com/search/repositories '


params = {'q': 'python', 'sort': 'stars'}


headers = {'User-Agent': 'Mozilla/5.0'}


try:


response = requests.get(url, params=params, headers=headers, timeout=5)


response.raise_for_status()


data = response.json()


for repo in data['items'][:5]:


print(f"仓库名: {repo['name']}, 星数: {repo['stargazers_count']}")


except requests.exceptions.RequestException as e:


print(f"请求出错: {e}")


smtplib 和 email 模块:发送邮件


发送纯文本邮件:


import smtplib


from email.mime.text import MIMEText


from email.utils import formataddr


sender = 'your_email@example.com '


receiver = 'recipient@example.com '


smtp_pass = 'your_authorization_code'


msg = MIMEText('这是一封用 Python 发送的测试邮件。', 'plain', 'utf-8')


msg['Subject'] = 'Python SMTP 测试邮件'


msg['From'] = formataddr(('发送者名称', sender))


msg['To'] = formataddr(('接收者名称', receiver))


try:


server = smtplib.SMTP_SSL('smtp.qq.com', 465)


server.login(sender, smtp_pass)


server.sendmail(sender, [receiver], msg.as_string())


print("邮件发送成功")


except Exception as e:


print(f"邮件发送失败: {e}")


finally:


server.quit()


ftplib 模块:FTP 操作

from ftplib import FTP


ftp = FTP('ftp.example.com')


ftp.login('username', 'password')


ftp.cwd('/pub/python')


ftp.retrlines('LIST')


with open('local_file.txt', 'wb') as f:


ftp.retrbinary('RETR remote_file.txt', f.write)


with open('file_to_upload.txt', 'rb') as f:


ftp.storbinary('STOR uploaded_file.txt', f)


ftp.quit()


三、错误处理与重试机制


使用 requests 时的错误处理:


import requests


from requests.exceptions import RequestException, Timeout, HTTPError, ConnectionError


url = 'https://www.example.com '


try:


response = requests.get(url, timeout=5)


response.raise_for_status()


print(response.text)


except Timeout:


print("请求超时,请检查网络或稍后重试")


except ConnectionError:


print("网络连接错误,请检查网络设置")


except HTTPError as err:


print(f"HTTP 错误: {err}")


except RequestException as err:


print(f"请求异常: {err}")


简单的重试机制:


import requests


import time


def make_request_with_retry(url, retries=3, delay=1):


for attempt in range(1, retries + 1):


try:


response = requests.get(url, timeout=5)


response.raise_for_status()


return response


except RequestException as e:


print(f"请求失败,第 {attempt} 次尝试: {e}")


if attempt == retries:


print("已达到最大重试次数,放弃请求")


raise


time.sleep(delay)


return None


response = make_request_with_retry('https://www.example.com ')


if response is not None:


print(response.text)


四、简单应用实例:获取本机网络信息


import socket


import tkinter as tk


import tkinter.messagebox


def get_network_info():


try:


hostname = socket.gethostname()


ip_list = socket.gethostbyname_ex(hostname)[2]


return hostname, ip_list


except Exception as e:


print(f"获取网络信息失败: {e}")


return None, []


hostname, ip_list = get_network_info()


print(f"主机名: {hostname}")


print("IP 地址列表:")


for ip in ip_list:


print(f" {ip}")


五、最佳实践与注意事项


异常处理:网络操作极易因各种原因失败,务必使用 try-except 块进行妥善处理


超时设置:几乎所有网络请求都应设置合理的超时时间


资源清理:确保网络连接被正确关闭,防止资源泄漏


安全性:


处理用户输入或从网络接收的数据时需谨慎


使用 HTTPS 等加密协议传输敏感信息


妥善保管 API 密钥、密码等敏感信息


遵守规则:在进行网络爬虫或大量请求时,务必遵守网站的 robots.txt 协议


第三方库:对于常见的 HTTP 请求,优先考虑使用 requests 库而非 urllib

相关文章

聊聊并发编程: Lock(并发锁有哪些)

之前学习了如何使用synchronized关键字来实现同步访问,Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功...

Oracle 11g安装教程完整版(oracle 11g 安装教程)

由于工作需要,将安装的经验分享给大家。第一步:首先准备安装文件包:Oralce 11.2.0.4 64bit和plsqldev1405x64如图所示:第二步:将2个文件解压到同一个目录,如图所示:第三...

超详细的Oracle19c修改数据库用户名教程

概述由于开发很多视图指定了某个用户名,故需修改数据库用户名srmpro为srm。以下为操作过程..1、停止应用防止修改用户名密码后应用一直在发起错误连接,可事先查询哪个IP在连接数据库,然后断开对应连...

Docker安装Oracle 11g 数据库过程详解

1、查看docker 版本[root@node3 ~]# docker version Client: Version: 18.09.6 API version:...

Think in Mingdao——人人都是全栈工程师

文/明道云销售部顾问 文静编辑/蒋礼轩一、引言在软件开发领域,有这样一类"Think"系的书籍被广大程序员们奉为经典,如:Think in C++、Think in C#、Think...

Java集合框架:总结(java集合框架是什么?说出一些集合框架的优点)

Java集合框架这个系列做了一个整理,主要包括:Map系:HashMap, LinkedHashMap, TreeMap, WeakHashMap, EnumMap;List系:ArrayList,...