Python 채팅 프로그램
2024. 2. 1. 17:27ㆍIT관련
반응형
코드가 있으니 별다른 설명은 필요 없을것 같다.
- 다수의 클라이언트를 지원한다.
- 클라이언트에서 메시지를 보내면 서버가 받고 다시 클라이언트로 응답한다.
- 한번에 최대 1024 bytes 메시지 까지만 지원한다.
- 한글 메시지는 지원하지 않는다.
서버
import socket
import threading
IP = '127.0.0.1' # IP
PORT = 5000 # PORT
def start_server_echo(conn: socket, address: tuple) -> None:
"""
클라이언트 메시지를 수신하고 메시지를 다시 클라이언트에 보낸다.
@param conn: socket instance
@param address: tuple(IP, pid)
"""
print("Connection from: " + str(address))
conn.send(f'Hello {str(address)}'.encode())
while True:
# 메시지 수신. 1024 bytes 이상은 수신 못한다.
data = conn.recv(1024).decode()
if not data:
break
print(f"from {str(address)}: {str(data)}")
conn.send(data.encode()) # 클라이언트로 메시지 발송
conn.close() # 연결 해제
def server_program() -> None:
"""
서버 소켓 실행
"""
server_socket = socket.socket()
server_socket.bind((IP, PORT))
while True:
server_socket.listen(2)
conn, address = server_socket.accept() # 신규 connection을 받는다.
# 접속된 connection을 쓰레드로 돌리고 다음 connection을 대기한다.
threading.Thread(target=start_server_echo, args=(conn, address)).start()
if __name__ == '__main__':
server_program()
클라이언트
import os
import socket
import threading
import time
IP = '127.0.0.1' # IP
PORT = 5000 # PORT
def receiving(conn: socket) -> None:
"""
서버로부터 메시지를 수신하여 출력한다.
서버와 통신이 끊어질때 sys.exit(0)를 해봤지만 이 thread만 종료될뿐
main thread는 종료되지 않아서 강제로 프로세스를 kill 시킨다.
@param conn: socket instance
"""
try:
while True:
data = conn.recv(1024).decode() # receive response
print(f'Received from server: {data}\n') # show in terminal
except:
pass
finally:
print('Disconnected!')
conn.close()
pid = os.getpid()
os.kill(pid, 2)
def sending(conn: socket) -> None:
"""
메시지를 서버로 발송한다.
서버와 통신이 끊어질때 sys.exit(0)를 해봤지만 이 thread만 종료될뿐
main thread는 종료되지 않아서 강제로 프로세스를 kill 시킨다.
@param conn: socket instance
"""
try:
while True:
message = input('-> ')
conn.send(message.encode())
except:
pass
finally:
print('Disconnected!')
conn.close()
pid = os.getpid()
os.kill(pid, 2)
def client_program() -> None:
"""
서버에 접속하여 보내기/ 받기를 별도의 쓰레드로 만들어 실행한다.
threading.Thread()에서 args는 반드시 iterable만 가능하기 때문에
args를 (client_socket, )으로 넘긴다.
"""
while True:
try:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 서버 소켓에 접속
client_socket.connect((IP, PORT))
break
except ConnectionRefusedError:
print('Connection Failed, Retrying..')
time.sleep(1)
threading.Thread(target=receiving, args=(client_socket,)).start()
threading.Thread(target=sending, args=(client_socket,)).start()
if __name__ == '__main__':
client_program()
반응형
'IT관련' 카테고리의 다른 글
카카오 메시지내 버튼 링크 오동작 처리 (0) | 2024.03.25 |
---|---|
안드로이드에서 AT 명령어 보내기 (0) | 2024.02.19 |
리모트 디버깅 포트 사용중인 크롬 브라우저 여부 확인 방법 (0) | 2024.01.31 |
Selenium chrome 실행시 "Chrome이 자동화된 테스트 소프트웨어에 의해 제어되고 있습니다." 자동화 메시지 제거 (0) | 2024.01.26 |
Requests --> Selenium 세션 쿠키 유지 (0) | 2024.01.26 |